1 import { Subject } from '../Subject';
2 import { async } from '../scheduler/async';
3 import { Subscriber } from '../Subscriber';
4 import { isNumeric } from '../util/isNumeric';
5 import { isScheduler } from '../util/isScheduler';
6 export function windowTime(windowTimeSpan) {
8 let windowCreationInterval = null;
9 let maxWindowSize = Number.POSITIVE_INFINITY;
10 if (isScheduler(arguments[3])) {
11 scheduler = arguments[3];
13 if (isScheduler(arguments[2])) {
14 scheduler = arguments[2];
16 else if (isNumeric(arguments[2])) {
17 maxWindowSize = Number(arguments[2]);
19 if (isScheduler(arguments[1])) {
20 scheduler = arguments[1];
22 else if (isNumeric(arguments[1])) {
23 windowCreationInterval = Number(arguments[1]);
25 return function windowTimeOperatorFunction(source) {
26 return source.lift(new WindowTimeOperator(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler));
29 class WindowTimeOperator {
30 constructor(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler) {
31 this.windowTimeSpan = windowTimeSpan;
32 this.windowCreationInterval = windowCreationInterval;
33 this.maxWindowSize = maxWindowSize;
34 this.scheduler = scheduler;
36 call(subscriber, source) {
37 return source.subscribe(new WindowTimeSubscriber(subscriber, this.windowTimeSpan, this.windowCreationInterval, this.maxWindowSize, this.scheduler));
40 class CountedSubject extends Subject {
43 this._numberOfNextedValues = 0;
46 this._numberOfNextedValues++;
49 get numberOfNextedValues() {
50 return this._numberOfNextedValues;
53 class WindowTimeSubscriber extends Subscriber {
54 constructor(destination, windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler) {
56 this.destination = destination;
57 this.windowTimeSpan = windowTimeSpan;
58 this.windowCreationInterval = windowCreationInterval;
59 this.maxWindowSize = maxWindowSize;
60 this.scheduler = scheduler;
62 const window = this.openWindow();
63 if (windowCreationInterval !== null && windowCreationInterval >= 0) {
64 const closeState = { subscriber: this, window, context: null };
65 const creationState = { windowTimeSpan, windowCreationInterval, subscriber: this, scheduler };
66 this.add(scheduler.schedule(dispatchWindowClose, windowTimeSpan, closeState));
67 this.add(scheduler.schedule(dispatchWindowCreation, windowCreationInterval, creationState));
70 const timeSpanOnlyState = { subscriber: this, window, windowTimeSpan };
71 this.add(scheduler.schedule(dispatchWindowTimeSpanOnly, windowTimeSpan, timeSpanOnlyState));
75 const windows = this.windows;
76 const len = windows.length;
77 for (let i = 0; i < len; i++) {
78 const window = windows[i];
81 if (window.numberOfNextedValues >= this.maxWindowSize) {
82 this.closeWindow(window);
88 const windows = this.windows;
89 while (windows.length > 0) {
90 windows.shift().error(err);
92 this.destination.error(err);
95 const windows = this.windows;
96 while (windows.length > 0) {
97 const window = windows.shift();
102 this.destination.complete();
105 const window = new CountedSubject();
106 this.windows.push(window);
107 const destination = this.destination;
108 destination.next(window);
111 closeWindow(window) {
113 const windows = this.windows;
114 windows.splice(windows.indexOf(window), 1);
117 function dispatchWindowTimeSpanOnly(state) {
118 const { subscriber, windowTimeSpan, window } = state;
120 subscriber.closeWindow(window);
122 state.window = subscriber.openWindow();
123 this.schedule(state, windowTimeSpan);
125 function dispatchWindowCreation(state) {
126 const { windowTimeSpan, subscriber, scheduler, windowCreationInterval } = state;
127 const window = subscriber.openWindow();
129 let context = { action, subscription: null };
130 const timeSpanState = { subscriber, window, context };
131 context.subscription = scheduler.schedule(dispatchWindowClose, windowTimeSpan, timeSpanState);
132 action.add(context.subscription);
133 action.schedule(state, windowCreationInterval);
135 function dispatchWindowClose(state) {
136 const { subscriber, window, context } = state;
137 if (context && context.action && context.subscription) {
138 context.action.remove(context.subscription);
140 subscriber.closeWindow(window);
142 //# sourceMappingURL=windowTime.js.map