Giant blob of minor changes
[dotfiles/.git] / .config / coc / extensions / node_modules / coc-prettier / node_modules / rxjs / src / internal / operators / bufferWhen.ts
1 import { Operator } from '../Operator';
2 import { Subscriber } from '../Subscriber';
3 import { Observable } from '../Observable';
4 import { Subscription } from '../Subscription';
5 import { OperatorFunction } from '../types';
6 import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
7
8 /**
9  * Buffers the source Observable values, using a factory function of closing
10  * Observables to determine when to close, emit, and reset the buffer.
11  *
12  * <span class="informal">Collects values from the past as an array. When it
13  * starts collecting values, it calls a function that returns an Observable that
14  * tells when to close the buffer and restart collecting.</span>
15  *
16  * ![](bufferWhen.png)
17  *
18  * Opens a buffer immediately, then closes the buffer when the observable
19  * returned by calling `closingSelector` function emits a value. When it closes
20  * the buffer, it immediately opens a new buffer and repeats the process.
21  *
22  * ## Example
23  *
24  * Emit an array of the last clicks every [1-5] random seconds
25  *
26  * ```ts
27  * import { fromEvent, interval } from 'rxjs';
28  * import { bufferWhen } from 'rxjs/operators';
29  *
30  * const clicks = fromEvent(document, 'click');
31  * const buffered = clicks.pipe(bufferWhen(() =>
32  *   interval(1000 + Math.random() * 4000)
33  * ));
34  * buffered.subscribe(x => console.log(x));
35  * ```
36  *
37  *
38  * @see {@link buffer}
39  * @see {@link bufferCount}
40  * @see {@link bufferTime}
41  * @see {@link bufferToggle}
42  * @see {@link windowWhen}
43  *
44  * @param {function(): Observable} closingSelector A function that takes no
45  * arguments and returns an Observable that signals buffer closure.
46  * @return {Observable<T[]>} An observable of arrays of buffered values.
47  * @method bufferWhen
48  * @owner Observable
49  */
50 export function bufferWhen<T>(closingSelector: () => Observable<any>): OperatorFunction<T, T[]> {
51   return function (source: Observable<T>) {
52     return source.lift(new BufferWhenOperator(closingSelector));
53   };
54 }
55
56 class BufferWhenOperator<T> implements Operator<T, T[]> {
57
58   constructor(private closingSelector: () => Observable<any>) {
59   }
60
61   call(subscriber: Subscriber<T[]>, source: any): any {
62     return source.subscribe(new BufferWhenSubscriber(subscriber, this.closingSelector));
63   }
64 }
65
66 /**
67  * We need this JSDoc comment for affecting ESDoc.
68  * @ignore
69  * @extends {Ignored}
70  */
71 class BufferWhenSubscriber<T> extends SimpleOuterSubscriber<T, any> {
72   private buffer?: T[];
73   private subscribing: boolean = false;
74   private closingSubscription?: Subscription;
75
76   constructor(destination: Subscriber<T[]>, private closingSelector: () => Observable<any>) {
77     super(destination);
78     this.openBuffer();
79   }
80
81   protected _next(value: T) {
82     this.buffer!.push(value);
83   }
84
85   protected _complete() {
86     const buffer = this.buffer;
87     if (buffer) {
88       this.destination.next!(buffer);
89     }
90     super._complete();
91   }
92
93   /** @deprecated This is an internal implementation detail, do not use. */
94   _unsubscribe() {
95     this.buffer = undefined;
96     this.subscribing = false;
97   }
98
99   notifyNext(): void {
100     this.openBuffer();
101   }
102
103   notifyComplete(): void {
104     if (this.subscribing) {
105       this.complete();
106     } else {
107       this.openBuffer();
108     }
109   }
110
111   openBuffer() {
112     let { closingSubscription } = this;
113
114     if (closingSubscription) {
115       this.remove(closingSubscription);
116       closingSubscription.unsubscribe();
117     }
118
119     const buffer = this.buffer;
120     if (this.buffer) {
121       this.destination.next!(buffer);
122     }
123
124     this.buffer = [];
125
126     let closingNotifier;
127     try {
128       const { closingSelector } = this;
129       closingNotifier = closingSelector();
130     } catch (err) {
131       return this.error(err);
132     }
133     closingSubscription = new Subscription();
134     this.closingSubscription = closingSubscription;
135     this.add(closingSubscription);
136     this.subscribing = true;
137     closingSubscription.add(innerSubscribe(closingNotifier, new SimpleInnerSubscriber(this)));
138     this.subscribing = false;
139   }
140 }