1 import { Subject } from './Subject';
2 import { queue } from './scheduler/queue';
3 import { Subscription } from './Subscription';
4 import { ObserveOnSubscriber } from './operators/observeOn';
5 import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';
6 import { SubjectSubscription } from './SubjectSubscription';
/**
 * Subject variant that keeps emitted values in `this._events` and re-delivers
 * them to each new subscriber inside `_subscribe`.
 *
 * NOTE(review): the line numbers embedded in this listing skip (8 -> 10 -> 12,
 * 17 -> 20, ...), so some statements, `else` lines and closing braces are not
 * visible here. The comments below describe only the visible statements;
 * anything that depends on a gap is marked as an assumption to confirm.
 */
7 export class ReplaySubject extends Subject {
/**
 * @param {number} [bufferSize=Number.POSITIVE_INFINITY] Requested buffer size;
 *   values below 1 are stored as 1 in `_bufferSize`.
 * @param {number} [windowTime=Number.POSITIVE_INFINITY] Requested time window;
 *   values below 1 are stored as 1 in `_windowTime`.
 * @param [scheduler] Stored as `this.scheduler`; its `now()` is used for
 *   timestamps and it is handed to `ObserveOnSubscriber` in `_subscribe`.
 */
8 constructor(bufferSize = Number.POSITIVE_INFINITY, windowTime = Number.POSITIVE_INFINITY, scheduler) {
10 this.scheduler = scheduler;
12 this._infiniteTimeWindow = false;
13 this._bufferSize = bufferSize < 1 ? 1 : bufferSize;
14 this._windowTime = windowTime < 1 ? 1 : windowTime;
// The comparison uses the raw `windowTime` argument, not the clamped field.
15 if (windowTime === Number.POSITIVE_INFINITY) {
16 this._infiniteTimeWindow = true;
// `next` is chosen once here, so no per-emission mode check is needed.
17 this.next = this.nextInfiniteTimeWindow;
// NOTE(review): presumably the `else` branch of the check above (the `else`
// line itself is not visible) - confirm against the full file.
20 this.next = this.nextTimeWindow;
/**
 * `next` implementation installed when the time window is infinite.
 * NOTE(review): only the buffer read and the size check are visible; the
 * statement that stores `value`, the body of the `if`, and any forwarding of
 * `value` are in the gaps - confirm against the full file.
 */
23 nextInfiniteTimeWindow(value) {
24 const _events = this._events;
// Triggers once the buffer holds more than `_bufferSize` entries.
26 if (_events.length > this._bufferSize) {
/**
 * `next` implementation installed when a finite time window is configured.
 * Appends `value` wrapped in a `ReplayEvent` stamped with `_getNow()`, then
 * trims the buffer. Any statement after the trim is not visible here.
 */
31 nextTimeWindow(value) {
32 this._events.push(new ReplayEvent(this._getNow(), value));
33 this._trimBufferThenGetEvents();
/**
 * Registers `subscriber` and replays the buffered events to it.
 *
 * @param subscriber Receives the replayed values and, depending on state,
 *   an error or completion notification.
 * @throws {ObjectUnsubscribedError} Under a condition that is not visible in
 *   this listing (the guarding `if` is in a gap).
 * NOTE(review): the declaration of `subscription` and the `return` statement
 * are not visible; presumably `subscription` is what gets returned - confirm.
 */
36 _subscribe(subscriber) {
37 const _infiniteTimeWindow = this._infiniteTimeWindow;
// With a finite window the buffer is trimmed first, and the trim's return
// value is what gets replayed.
38 const _events = _infiniteTimeWindow ? this._events : this._trimBufferThenGetEvents();
39 const scheduler = this.scheduler;
// Length is captured once; the replay loops below are bounded by it.
40 const len = _events.length;
43 throw new ObjectUnsubscribedError();
// A stopped or errored subject hands out the shared empty subscription and
// does not register the subscriber in `observers`.
45 else if (this.isStopped || this.hasError) {
46 subscription = Subscription.EMPTY;
// NOTE(review): presumably the final `else` branch (its `else` line is not
// visible): register the subscriber and build its subscription.
49 this.observers.push(subscriber);
50 subscription = new SubjectSubscription(this, subscriber);
// `subscriber.add` is resolved on the original subscriber before the argument
// reassigns the local, so the wrapper is added to the original and every later
// `subscriber.*` call in this method goes to the `ObserveOnSubscriber`.
// NOTE(review): presumably guarded by a scheduler check that is not visible.
53 subscriber.add(subscriber = new ObserveOnSubscriber(subscriber, scheduler));
55 if (_infiniteTimeWindow) {
// Buffer entries are passed to `next` as-is; stops early if the subscriber closes.
56 for (let i = 0; i < len && !subscriber.closed; i++) {
57 subscriber.next(_events[i]);
// Other mode: each buffer entry is unwrapped through its `.value` property.
61 for (let i = 0; i < len && !subscriber.closed; i++) {
62 subscriber.next(_events[i].value);
// NOTE(review): the condition guarding this call is not visible; the
// `else if` that follows suggests it is an error-state check - confirm.
66 subscriber.error(this.thrownError);
68 else if (this.isStopped) {
69 subscriber.complete();
// NOTE(review): the header of the method containing this `return` is not
// visible; the `this._getNow()` calls elsewhere suggest it is `_getNow`.
// Reads the current time from `this.scheduler`, or from the imported `queue`
// scheduler when `this.scheduler` is falsy.
74 return (this.scheduler || queue).now();
/**
 * Removes entries from the front of `this._events` in place.
 * NOTE(review): the declaration of `spliceCount`, the body of the `while`
 * loop and the `return` statement are not visible; `_subscribe` uses the
 * return value as the event list, so presumably `_events` is returned.
 */
76 _trimBufferThenGetEvents() {
77 const now = this._getNow();
78 const _bufferSize = this._bufferSize;
79 const _windowTime = this._windowTime;
80 const _events = this._events;
81 const eventsCount = _events.length;
// Scans from the front of the buffer, bounded by the entry count.
83 while (spliceCount < eventsCount) {
// True when the entry's age (`now - time`) is still below the window.
// NOTE(review): presumably the scan stops at the first such entry and
// otherwise advances `spliceCount` - those lines are not visible.
84 if ((now - _events[spliceCount].time) < _windowTime) {
// Size cap: remove at least enough entries to bring the count down to `_bufferSize`.
89 if (eventsCount > _bufferSize) {
90 spliceCount = Math.max(spliceCount, eventsCount - _bufferSize);
// `splice` is skipped when nothing needs to be removed.
92 if (spliceCount > 0) {
93 _events.splice(0, spliceCount);
/**
 * NOTE(review): the enclosing class header and this constructor's body are not
 * visible in this listing. The only two-argument construction in view is
 * `new ReplayEvent(this._getNow(), value)`, and buffer entries are later read
 * through `.time` and `.value`, so this is presumably the `ReplayEvent`
 * constructor storing both arguments under those names - confirm.
 *
 * @param time Timestamp supplied by the caller.
 * @param value The emitted value paired with that timestamp.
 */
99 constructor(time, value) {
104 //# sourceMappingURL=ReplaySubject.js.map