Giant blob of minor changes
[dotfiles/.git] / .config / coc / extensions / node_modules / coc-prettier / node_modules / rxjs / src / internal / operators / bufferTime.ts
1 import { Operator } from '../Operator';
2 import { async } from '../scheduler/async';
3 import { Observable } from '../Observable';
4 import { Subscriber } from '../Subscriber';
5 import { Subscription } from '../Subscription';
6 import { isScheduler } from '../util/isScheduler';
7 import { OperatorFunction, SchedulerAction, SchedulerLike } from '../types';
8
9 /* tslint:disable:max-line-length */
10 export function bufferTime<T>(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
11 export function bufferTime<T>(bufferTimeSpan: number, bufferCreationInterval: number | null | undefined, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
12 export function bufferTime<T>(bufferTimeSpan: number, bufferCreationInterval: number | null | undefined, maxBufferSize: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
13 /* tslint:enable:max-line-length */
14
15 /**
16  * Buffers the source Observable values for a specific time period.
17  *
18  * <span class="informal">Collects values from the past as an array, and emits
19  * those arrays periodically in time.</span>
20  *
21  * ![](bufferTime.png)
22  *
23  * Buffers values from the source for a specific time duration `bufferTimeSpan`.
24  * Unless the optional argument `bufferCreationInterval` is given, it emits and
25  * resets the buffer every `bufferTimeSpan` milliseconds. If
26  * `bufferCreationInterval` is given, this operator opens the buffer every
27  * `bufferCreationInterval` milliseconds and closes (emits and resets) the
28  * buffer every `bufferTimeSpan` milliseconds. When the optional argument
29  * `maxBufferSize` is specified, the buffer will be closed either after
30  * `bufferTimeSpan` milliseconds or when it contains `maxBufferSize` elements.
31  *
32  * ## Examples
33  *
34  * Every second, emit an array of the recent click events
35  *
36  * ```ts
37  * import { fromEvent } from 'rxjs';
38  * import { bufferTime } from 'rxjs/operators';
39  *
40  * const clicks = fromEvent(document, 'click');
41  * const buffered = clicks.pipe(bufferTime(1000));
42  * buffered.subscribe(x => console.log(x));
43  * ```
44  *
45  * Every 5 seconds, emit the click events from the next 2 seconds
46  *
47  * ```ts
48  * import { fromEvent } from 'rxjs';
49  * import { bufferTime } from 'rxjs/operators';
50  *
51  * const clicks = fromEvent(document, 'click');
52  * const buffered = clicks.pipe(bufferTime(2000, 5000));
53  * buffered.subscribe(x => console.log(x));
54  * ```
55  *
56  * @see {@link buffer}
57  * @see {@link bufferCount}
58  * @see {@link bufferToggle}
59  * @see {@link bufferWhen}
60  * @see {@link windowTime}
61  *
62  * @param {number} bufferTimeSpan The amount of time to fill each buffer array.
63  * @param {number} [bufferCreationInterval] The interval at which to start new
64  * buffers.
65  * @param {number} [maxBufferSize] The maximum buffer size.
66  * @param {SchedulerLike} [scheduler=async] The scheduler on which to schedule the
67  * intervals that determine buffer boundaries.
68  * @return {Observable<T[]>} An observable of arrays of buffered values.
69  * @method bufferTime
70  * @owner Observable
71  */
72 export function bufferTime<T>(bufferTimeSpan: number): OperatorFunction<T, T[]> {
73   let length: number = arguments.length;
74
75   let scheduler: SchedulerLike = async;
76   if (isScheduler(arguments[arguments.length - 1])) {
77     scheduler = arguments[arguments.length - 1];
78     length--;
79   }
80
81   let bufferCreationInterval: number = null;
82   if (length >= 2) {
83     bufferCreationInterval = arguments[1];
84   }
85
86   let maxBufferSize: number = Number.POSITIVE_INFINITY;
87   if (length >= 3) {
88     maxBufferSize = arguments[2];
89   }
90
91   return function bufferTimeOperatorFunction(source: Observable<T>) {
92     return source.lift(new BufferTimeOperator<T>(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler));
93   };
94 }
95
96 class BufferTimeOperator<T> implements Operator<T, T[]> {
97   constructor(private bufferTimeSpan: number,
98               private bufferCreationInterval: number,
99               private maxBufferSize: number,
100               private scheduler: SchedulerLike) {
101   }
102
103   call(subscriber: Subscriber<T[]>, source: any): any {
104     return source.subscribe(new BufferTimeSubscriber(
105       subscriber, this.bufferTimeSpan, this.bufferCreationInterval, this.maxBufferSize, this.scheduler
106     ));
107   }
108 }
109
110 class Context<T> {
111   buffer: T[] = [];
112   closeAction: Subscription;
113 }
114
115 interface DispatchCreateArg<T> {
116   bufferTimeSpan: number;
117   bufferCreationInterval: number;
118   subscriber: BufferTimeSubscriber<T>;
119   scheduler: SchedulerLike;
120 }
121
122 interface DispatchCloseArg<T> {
123   subscriber: BufferTimeSubscriber<T>;
124   context: Context<T>;
125 }
126
127 /**
128  * We need this JSDoc comment for affecting ESDoc.
129  * @ignore
130  * @extends {Ignored}
131  */
132 class BufferTimeSubscriber<T> extends Subscriber<T> {
133   private contexts: Array<Context<T>> = [];
134   private timespanOnly: boolean;
135
136   constructor(destination: Subscriber<T[]>,
137               private bufferTimeSpan: number,
138               private bufferCreationInterval: number,
139               private maxBufferSize: number,
140               private scheduler: SchedulerLike) {
141     super(destination);
142     const context = this.openContext();
143     this.timespanOnly = bufferCreationInterval == null || bufferCreationInterval < 0;
144     if (this.timespanOnly) {
145       const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan };
146       this.add(context.closeAction = scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
147     } else {
148       const closeState = { subscriber: this, context };
149       const creationState: DispatchCreateArg<T> = { bufferTimeSpan, bufferCreationInterval, subscriber: this, scheduler };
150       this.add(context.closeAction = scheduler.schedule<DispatchCloseArg<T>>(dispatchBufferClose, bufferTimeSpan, closeState));
151       this.add(scheduler.schedule<DispatchCreateArg<T>>(dispatchBufferCreation, bufferCreationInterval, creationState));
152     }
153   }
154
155   protected _next(value: T) {
156     const contexts = this.contexts;
157     const len = contexts.length;
158     let filledBufferContext: Context<T>;
159     for (let i = 0; i < len; i++) {
160       const context = contexts[i];
161       const buffer = context.buffer;
162       buffer.push(value);
163       if (buffer.length == this.maxBufferSize) {
164         filledBufferContext = context;
165       }
166     }
167
168     if (filledBufferContext) {
169       this.onBufferFull(filledBufferContext);
170     }
171   }
172
173   protected _error(err: any) {
174     this.contexts.length = 0;
175     super._error(err);
176   }
177
178   protected _complete() {
179     const { contexts, destination } = this;
180     while (contexts.length > 0) {
181       const context = contexts.shift();
182       destination.next(context.buffer);
183     }
184     super._complete();
185   }
186
187   /** @deprecated This is an internal implementation detail, do not use. */
188   _unsubscribe() {
189     this.contexts = null;
190   }
191
192   protected onBufferFull(context: Context<T>) {
193     this.closeContext(context);
194     const closeAction = context.closeAction;
195     closeAction.unsubscribe();
196     this.remove(closeAction);
197
198     if (!this.closed && this.timespanOnly) {
199       context = this.openContext();
200       const bufferTimeSpan = this.bufferTimeSpan;
201       const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan };
202       this.add(context.closeAction = this.scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
203     }
204   }
205
206   openContext(): Context<T> {
207     const context: Context<T> = new Context<T>();
208     this.contexts.push(context);
209     return context;
210   }
211
212   closeContext(context: Context<T>) {
213     this.destination.next(context.buffer);
214     const contexts = this.contexts;
215
216     const spliceIndex = contexts ? contexts.indexOf(context) : -1;
217     if (spliceIndex >= 0) {
218       contexts.splice(contexts.indexOf(context), 1);
219     }
220   }
221 }
222
223 function dispatchBufferTimeSpanOnly(this: SchedulerAction<any>, state: any) {
224   const subscriber: BufferTimeSubscriber<any> = state.subscriber;
225
226   const prevContext = state.context;
227   if (prevContext) {
228     subscriber.closeContext(prevContext);
229   }
230
231   if (!subscriber.closed) {
232     state.context = subscriber.openContext();
233     state.context.closeAction = this.schedule(state, state.bufferTimeSpan);
234   }
235 }
236
237 function dispatchBufferCreation<T>(this: SchedulerAction<DispatchCreateArg<T>>, state: DispatchCreateArg<T>) {
238   const { bufferCreationInterval, bufferTimeSpan, subscriber, scheduler } = state;
239   const context = subscriber.openContext();
240   const action = <SchedulerAction<DispatchCreateArg<T>>>this;
241   if (!subscriber.closed) {
242     subscriber.add(context.closeAction = scheduler.schedule<DispatchCloseArg<T>>(dispatchBufferClose, bufferTimeSpan, { subscriber, context }));
243     action.schedule(state, bufferCreationInterval);
244   }
245 }
246
247 function dispatchBufferClose<T>(arg: DispatchCloseArg<T>) {
248   const { subscriber, context } = arg;
249   subscriber.closeContext(context);
250 }