2c3e96bc2eb356fabd6b71f680443934655aefbc
[dotfiles/.git] / .config / coc / extensions / node_modules / coc-prettier / node_modules / rxjs / src / internal / operators / bufferWhen.ts
1 import { Operator } from '../Operator';
2 import { Subscriber } from '../Subscriber';
3 import { Observable } from '../Observable';
4 import { Subscription } from '../Subscription';
5 import { OuterSubscriber } from '../OuterSubscriber';
6 import { InnerSubscriber } from '../InnerSubscriber';
7 import { subscribeToResult } from '../util/subscribeToResult';
8 import { OperatorFunction } from '../types';
9
10 /**
11  * Buffers the source Observable values, using a factory function of closing
12  * Observables to determine when to close, emit, and reset the buffer.
13  *
14  * <span class="informal">Collects values from the past as an array. When it
15  * starts collecting values, it calls a function that returns an Observable that
16  * tells when to close the buffer and restart collecting.</span>
17  *
18  * ![](bufferWhen.png)
19  *
20  * Opens a buffer immediately, then closes the buffer when the observable
21  * returned by calling `closingSelector` function emits a value. When it closes
22  * the buffer, it immediately opens a new buffer and repeats the process.
23  *
24  * ## Example
25  *
26  * Emit an array of the last clicks every [1-5] random seconds
27  *
28  * ```ts
29  * import { fromEvent, interval } from 'rxjs';
30  * import { bufferWhen } from 'rxjs/operators';
31  *
32  * const clicks = fromEvent(document, 'click');
33  * const buffered = clicks.pipe(bufferWhen(() =>
34  *   interval(1000 + Math.random() * 4000)
35  * ));
36  * buffered.subscribe(x => console.log(x));
37  * ```
38  *
39  *
40  * @see {@link buffer}
41  * @see {@link bufferCount}
42  * @see {@link bufferTime}
43  * @see {@link bufferToggle}
44  * @see {@link windowWhen}
45  *
46  * @param {function(): Observable} closingSelector A function that takes no
47  * arguments and returns an Observable that signals buffer closure.
48  * @return {Observable<T[]>} An observable of arrays of buffered values.
49  * @method bufferWhen
50  * @owner Observable
51  */
52 export function bufferWhen<T>(closingSelector: () => Observable<any>): OperatorFunction<T, T[]> {
53   return function (source: Observable<T>) {
54     return source.lift(new BufferWhenOperator(closingSelector));
55   };
56 }
57
58 class BufferWhenOperator<T> implements Operator<T, T[]> {
59
60   constructor(private closingSelector: () => Observable<any>) {
61   }
62
63   call(subscriber: Subscriber<T[]>, source: any): any {
64     return source.subscribe(new BufferWhenSubscriber(subscriber, this.closingSelector));
65   }
66 }
67
68 /**
69  * We need this JSDoc comment for affecting ESDoc.
70  * @ignore
71  * @extends {Ignored}
72  */
73 class BufferWhenSubscriber<T> extends OuterSubscriber<T, any> {
74   private buffer: T[];
75   private subscribing: boolean = false;
76   private closingSubscription: Subscription;
77
78   constructor(destination: Subscriber<T[]>, private closingSelector: () => Observable<any>) {
79     super(destination);
80     this.openBuffer();
81   }
82
83   protected _next(value: T) {
84     this.buffer.push(value);
85   }
86
87   protected _complete() {
88     const buffer = this.buffer;
89     if (buffer) {
90       this.destination.next(buffer);
91     }
92     super._complete();
93   }
94
95   /** @deprecated This is an internal implementation detail, do not use. */
96   _unsubscribe() {
97     this.buffer = null;
98     this.subscribing = false;
99   }
100
101   notifyNext(outerValue: T, innerValue: any,
102              outerIndex: number, innerIndex: number,
103              innerSub: InnerSubscriber<T, any>): void {
104     this.openBuffer();
105   }
106
107   notifyComplete(): void {
108     if (this.subscribing) {
109       this.complete();
110     } else {
111       this.openBuffer();
112     }
113   }
114
115   openBuffer() {
116     let { closingSubscription } = this;
117
118     if (closingSubscription) {
119       this.remove(closingSubscription);
120       closingSubscription.unsubscribe();
121     }
122
123     const buffer = this.buffer;
124     if (this.buffer) {
125       this.destination.next(buffer);
126     }
127
128     this.buffer = [];
129
130     let closingNotifier;
131     try {
132       const { closingSelector } = this;
133       closingNotifier = closingSelector();
134     } catch (err) {
135       return this.error(err);
136     }
137     closingSubscription = new Subscription();
138     this.closingSubscription = closingSubscription;
139     this.add(closingSubscription);
140     this.subscribing = true;
141     closingSubscription.add(subscribeToResult(this, closingNotifier));
142     this.subscribing = false;
143   }
144 }