60ff2882f9479643edd3689c3028463a1c6e4b7e
[dotfiles/.git] / .config / coc / extensions / node_modules / coc-prettier / node_modules / rxjs / src / internal / operators / mergeScan.ts
1 import { Operator } from '../Operator';
2 import { Observable } from '../Observable';
3 import { Subscriber } from '../Subscriber';
4 import { Subscription } from '../Subscription';
5 import { subscribeToResult } from '../util/subscribeToResult';
6 import { OuterSubscriber } from '../OuterSubscriber';
7 import { InnerSubscriber } from '../InnerSubscriber';
8 import { ObservableInput, OperatorFunction } from '../types';
9
10 /**
11  * Applies an accumulator function over the source Observable where the
12  * accumulator function itself returns an Observable, then each intermediate
13  * Observable returned is merged into the output Observable.
14  *
15  * <span class="informal">It's like {@link scan}, but the Observables returned
16  * by the accumulator are merged into the outer Observable.</span>
17  *
18  * ## Example
19  * Count the number of click events
20  * ```ts
21  * import { fromEvent, of } from 'rxjs';
22  * import { mapTo, mergeScan } from 'rxjs/operators';
23  *
24  * const click$ = fromEvent(document, 'click');
25  * const one$ = click$.pipe(mapTo(1));
26  * const seed = 0;
27  * const count$ = one$.pipe(
28  *   mergeScan((acc, one) => of(acc + one), seed),
29  * );
30  * count$.subscribe(x => console.log(x));
31  *
32  * // Results:
33  * // 1
34  * // 2
35  * // 3
36  * // 4
37  * // ...and so on for each click
38  * ```
39  *
40  * @param {function(acc: R, value: T): Observable<R>} accumulator
41  * The accumulator function called on each source value.
42  * @param seed The initial accumulation value.
43  * @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of
44  * input Observables being subscribed to concurrently.
45  * @return {Observable<R>} An observable of the accumulated values.
46  * @method mergeScan
47  * @owner Observable
48  */
49 export function mergeScan<T, R>(accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
50                                 seed: R,
51                                 concurrent: number = Number.POSITIVE_INFINITY): OperatorFunction<T, R> {
52   return (source: Observable<T>) => source.lift(new MergeScanOperator(accumulator, seed, concurrent));
53 }
54
55 export class MergeScanOperator<T, R> implements Operator<T, R> {
56   constructor(private accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
57               private seed: R,
58               private concurrent: number) {
59   }
60
61   call(subscriber: Subscriber<R>, source: any): any {
62     return source.subscribe(new MergeScanSubscriber(
63       subscriber, this.accumulator, this.seed, this.concurrent
64     ));
65   }
66 }
67
68 /**
69  * We need this JSDoc comment for affecting ESDoc.
70  * @ignore
71  * @extends {Ignored}
72  */
73 export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
74   private hasValue: boolean = false;
75   private hasCompleted: boolean = false;
76   private buffer: Observable<any>[] = [];
77   private active: number = 0;
78   protected index: number = 0;
79
80   constructor(destination: Subscriber<R>,
81               private accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
82               private acc: R,
83               private concurrent: number) {
84     super(destination);
85   }
86
87   protected _next(value: any): void {
88     if (this.active < this.concurrent) {
89       const index = this.index++;
90       const destination = this.destination;
91       let ish;
92       try {
93         const { accumulator } = this;
94         ish = accumulator(this.acc, value, index);
95       } catch (e) {
96         return destination.error(e);
97       }
98       this.active++;
99       this._innerSub(ish, value, index);
100     } else {
101       this.buffer.push(value);
102     }
103   }
104
105   private _innerSub(ish: any, value: T, index: number): void {
106     const innerSubscriber = new InnerSubscriber(this, value, index);
107     const destination = this.destination as Subscription;
108     destination.add(innerSubscriber);
109     const innerSubscription = subscribeToResult<T, R>(this, ish, undefined, undefined, innerSubscriber);
110     // The returned subscription will usually be the subscriber that was
111     // passed. However, interop subscribers will be wrapped and for
112     // unsubscriptions to chain correctly, the wrapper needs to be added, too.
113     if (innerSubscription !== innerSubscriber) {
114       destination.add(innerSubscription);
115     }
116   }
117
118   protected _complete(): void {
119     this.hasCompleted = true;
120     if (this.active === 0 && this.buffer.length === 0) {
121       if (this.hasValue === false) {
122         this.destination.next(this.acc);
123       }
124       this.destination.complete();
125     }
126     this.unsubscribe();
127   }
128
129   notifyNext(outerValue: T, innerValue: R,
130              outerIndex: number, innerIndex: number,
131              innerSub: InnerSubscriber<T, R>): void {
132     const { destination } = this;
133     this.acc = innerValue;
134     this.hasValue = true;
135     destination.next(innerValue);
136   }
137
138   notifyComplete(innerSub: Subscription): void {
139     const buffer = this.buffer;
140     const destination = this.destination as Subscription;
141     destination.remove(innerSub);
142     this.active--;
143     if (buffer.length > 0) {
144       this._next(buffer.shift());
145     } else if (this.active === 0 && this.hasCompleted) {
146       if (this.hasValue === false) {
147         this.destination.next(this.acc);
148       }
149       this.destination.complete();
150     }
151   }
152 }