Giant blob of minor changes
[dotfiles/.git] / .config / coc / extensions / node_modules / coc-prettier / node_modules / rxjs / src / internal / operators / windowWhen.ts
1 import { Operator } from '../Operator';
2 import { Subscriber } from '../Subscriber';
3 import { Observable } from '../Observable';
4 import { Subject } from '../Subject';
5 import { Subscription } from '../Subscription';
6 import { OuterSubscriber } from '../OuterSubscriber';
7 import { InnerSubscriber } from '../InnerSubscriber';
8 import { subscribeToResult } from '../util/subscribeToResult';
9 import { OperatorFunction } from '../types';
10
11 /**
12  * Branch out the source Observable values as a nested Observable using a
13  * factory function of closing Observables to determine when to start a new
14  * window.
15  *
16  * <span class="informal">It's like {@link bufferWhen}, but emits a nested
17  * Observable instead of an array.</span>
18  *
19  * ![](windowWhen.png)
20  *
21  * Returns an Observable that emits windows of items it collects from the source
22  * Observable. The output Observable emits connected, non-overlapping windows.
23  * It emits the current window and opens a new one whenever the Observable
24  * produced by the specified `closingSelector` function emits an item. The first
25  * window is opened immediately when subscribing to the output Observable.
26  *
27  * ## Example
28  * Emit only the first two clicks events in every window of [1-5] random seconds
29  * ```ts
30  * import { fromEvent, interval } from 'rxjs';
31  * import { windowWhen, map, mergeAll, take } from 'rxjs/operators';
32  *
33  * const clicks = fromEvent(document, 'click');
34  * const result = clicks.pipe(
35  *   windowWhen(() => interval(1000 + Math.random() * 4000)),
36  *   map(win => win.pipe(take(2))),     // each window has at most 2 emissions
37  *   mergeAll()                         // flatten the Observable-of-Observables
38  * );
39  * result.subscribe(x => console.log(x));
40  * ```
41  *
42  * @see {@link window}
43  * @see {@link windowCount}
44  * @see {@link windowTime}
45  * @see {@link windowToggle}
46  * @see {@link bufferWhen}
47  *
48  * @param {function(): Observable} closingSelector A function that takes no
49  * arguments and returns an Observable that signals (on either `next` or
50  * `complete`) when to close the previous window and start a new one.
51  * @return {Observable<Observable<T>>} An observable of windows, which in turn
52  * are Observables.
53  * @method windowWhen
54  * @owner Observable
55  */
56 export function windowWhen<T>(closingSelector: () => Observable<any>): OperatorFunction<T, Observable<T>> {
57   return function windowWhenOperatorFunction(source: Observable<T>) {
58     return source.lift(new WindowOperator<T>(closingSelector));
59   };
60 }
61
62 class WindowOperator<T> implements Operator<T, Observable<T>> {
63   constructor(private closingSelector: () => Observable<any>) {
64   }
65
66   call(subscriber: Subscriber<Observable<T>>, source: any): any {
67     return source.subscribe(new WindowSubscriber(subscriber, this.closingSelector));
68   }
69 }
70
71 /**
72  * We need this JSDoc comment for affecting ESDoc.
73  * @ignore
74  * @extends {Ignored}
75  */
76 class WindowSubscriber<T> extends OuterSubscriber<T, any> {
77   private window?: Subject<T>;
78   private closingNotification?: Subscription;
79
80   constructor(protected destination: Subscriber<Observable<T>>,
81               private closingSelector: () => Observable<any>) {
82     super(destination);
83     this.openWindow();
84   }
85
86   notifyNext(_outerValue: T, _innerValue: any,
87              _outerIndex: number, _innerIndex: number,
88              innerSub: InnerSubscriber<T, any>): void {
89     this.openWindow(innerSub);
90   }
91
92   notifyError(error: any): void {
93     this._error(error);
94   }
95
96   notifyComplete(innerSub: InnerSubscriber<T, any>): void {
97     this.openWindow(innerSub);
98   }
99
100   protected _next(value: T): void {
101     this.window!.next(value);
102   }
103
104   protected _error(err: any): void {
105     this.window!.error(err);
106     this.destination.error(err);
107     this.unsubscribeClosingNotification();
108   }
109
110   protected _complete(): void {
111     this.window!.complete();
112     this.destination.complete();
113     this.unsubscribeClosingNotification();
114   }
115
116   private unsubscribeClosingNotification(): void {
117     if (this.closingNotification) {
118       this.closingNotification.unsubscribe();
119     }
120   }
121
122   private openWindow(innerSub: InnerSubscriber<T, any> | null = null): void {
123     if (innerSub) {
124       this.remove(innerSub);
125       innerSub.unsubscribe();
126     }
127
128     const prevWindow = this.window;
129     if (prevWindow) {
130       prevWindow.complete();
131     }
132
133     const window = this.window = new Subject<T>();
134     this.destination.next(window);
135
136     let closingNotifier;
137     try {
138       const { closingSelector } = this;
139       closingNotifier = closingSelector();
140     } catch (e) {
141       this.destination.error(e);
142       this.window.error(e);
143       return;
144     }
145     this.add(this.closingNotification = subscribeToResult(this, closingNotifier));
146   }
147 }