Giant blob of minor changes
[dotfiles/.git] / .config / coc / extensions / node_modules / coc-prettier / node_modules / rxjs / src / internal / operators / windowToggle.ts
1 import { Operator } from '../Operator';
2 import { Subscriber } from '../Subscriber';
3 import { Observable } from '../Observable';
4 import { Subject } from '../Subject';
5 import { Subscription } from '../Subscription';
6 import { OuterSubscriber } from '../OuterSubscriber';
7 import { InnerSubscriber } from '../InnerSubscriber';
8 import { subscribeToResult } from '../util/subscribeToResult';
9 import { OperatorFunction } from '../types';
10
11 /**
12  * Branch out the source Observable values as a nested Observable starting from
13  * an emission from `openings` and ending when the output of `closingSelector`
14  * emits.
15  *
16  * <span class="informal">It's like {@link bufferToggle}, but emits a nested
17  * Observable instead of an array.</span>
18  *
19  * ![](windowToggle.png)
20  *
21  * Returns an Observable that emits windows of items it collects from the source
22  * Observable. The output Observable emits windows that contain those items
23  * emitted by the source Observable between the time when the `openings`
24  * Observable emits an item and when the Observable returned by
25  * `closingSelector` emits an item.
26  *
27  * ## Example
28  * Every other second, emit the click events from the next 500ms
29  * ```ts
30  * import { fromEvent, interval, EMPTY } from 'rxjs';
31  * import { windowToggle, mergeAll } from 'rxjs/operators';
32  *
33  * const clicks = fromEvent(document, 'click');
34  * const openings = interval(1000);
35  * const result = clicks.pipe(
36  *   windowToggle(openings, i => i % 2 ? interval(500) : EMPTY),
37  *   mergeAll()
38  * );
39  * result.subscribe(x => console.log(x));
40  * ```
41  *
42  * @see {@link window}
43  * @see {@link windowCount}
44  * @see {@link windowTime}
45  * @see {@link windowWhen}
46  * @see {@link bufferToggle}
47  *
48  * @param {Observable<O>} openings An observable of notifications to start new
49  * windows.
50  * @param {function(value: O): Observable} closingSelector A function that takes
51  * the value emitted by the `openings` observable and returns an Observable,
52  * which, when it emits (either `next` or `complete`), signals that the
53  * associated window should complete.
54  * @return {Observable<Observable<T>>} An observable of windows, which in turn
55  * are Observables.
56  * @method windowToggle
57  * @owner Observable
58  */
59 export function windowToggle<T, O>(openings: Observable<O>,
60                                    closingSelector: (openValue: O) => Observable<any>): OperatorFunction<T, Observable<T>> {
61   return (source: Observable<T>) => source.lift(new WindowToggleOperator<T, O>(openings, closingSelector));
62 }
63
64 class WindowToggleOperator<T, O> implements Operator<T, Observable<T>> {
65
66   constructor(private openings: Observable<O>,
67               private closingSelector: (openValue: O) => Observable<any>) {
68   }
69
70   call(subscriber: Subscriber<Observable<T>>, source: any): any {
71     return source.subscribe(new WindowToggleSubscriber(
72       subscriber, this.openings, this.closingSelector
73     ));
74   }
75 }
76
77 interface WindowContext<T> {
78   window: Subject<T>;
79   subscription: Subscription;
80 }
81
82 /**
83  * We need this JSDoc comment for affecting ESDoc.
84  * @ignore
85  * @extends {Ignored}
86  */
87 class WindowToggleSubscriber<T, O> extends OuterSubscriber<T, any> {
88   private contexts: WindowContext<T>[] = [];
89   private openSubscription: Subscription;
90
91   constructor(destination: Subscriber<Observable<T>>,
92               private openings: Observable<O>,
93               private closingSelector: (openValue: O) => Observable<any>) {
94     super(destination);
95     this.add(this.openSubscription = subscribeToResult(this, openings, openings as any));
96   }
97
98   protected _next(value: T) {
99     const { contexts } = this;
100     if (contexts) {
101       const len = contexts.length;
102       for (let i = 0; i < len; i++) {
103         contexts[i].window.next(value);
104       }
105     }
106   }
107
108   protected _error(err: any) {
109
110     const { contexts } = this;
111     this.contexts = null;
112
113     if (contexts) {
114       const len = contexts.length;
115       let index = -1;
116
117       while (++index < len) {
118         const context = contexts[index];
119         context.window.error(err);
120         context.subscription.unsubscribe();
121       }
122     }
123
124     super._error(err);
125   }
126
127   protected _complete() {
128     const { contexts } = this;
129     this.contexts = null;
130     if (contexts) {
131       const len = contexts.length;
132       let index = -1;
133       while (++index < len) {
134         const context = contexts[index];
135         context.window.complete();
136         context.subscription.unsubscribe();
137       }
138     }
139     super._complete();
140   }
141
142   /** @deprecated This is an internal implementation detail, do not use. */
143   _unsubscribe() {
144     const { contexts } = this;
145     this.contexts = null;
146     if (contexts) {
147       const len = contexts.length;
148       let index = -1;
149       while (++index < len) {
150         const context = contexts[index];
151         context.window.unsubscribe();
152         context.subscription.unsubscribe();
153       }
154     }
155   }
156
157   notifyNext(outerValue: any, innerValue: any,
158              outerIndex: number, innerIndex: number,
159              innerSub: InnerSubscriber<T, any>): void {
160
161     if (outerValue === this.openings) {
162       let closingNotifier;
163       try {
164         const { closingSelector } = this;
165         closingNotifier = closingSelector(innerValue);
166       } catch (e) {
167         return this.error(e);
168       }
169
170       const window = new Subject<T>();
171       const subscription = new Subscription();
172       const context = { window, subscription };
173       this.contexts.push(context);
174       const innerSubscription = subscribeToResult(this, closingNotifier, context as any);
175
176       if (innerSubscription.closed) {
177         this.closeWindow(this.contexts.length - 1);
178       } else {
179         (<any>innerSubscription).context = context;
180         subscription.add(innerSubscription);
181       }
182
183       this.destination.next(window);
184     } else {
185       this.closeWindow(this.contexts.indexOf(outerValue));
186     }
187   }
188
189   notifyError(err: any): void {
190     this.error(err);
191   }
192
193   notifyComplete(inner: Subscription): void {
194     if (inner !== this.openSubscription) {
195       this.closeWindow(this.contexts.indexOf((<any> inner).context));
196     }
197   }
198
199   private closeWindow(index: number): void {
200     if (index === -1) {
201       return;
202     }
203
204     const { contexts } = this;
205     const context = contexts[index];
206     const { window, subscription } = context;
207     contexts.splice(index, 1);
208     window.complete();
209     subscription.unsubscribe();
210   }
211 }