Actualizacion maquina principal
[dotfiles/.git] / .config / coc / extensions / node_modules / coc-prettier / node_modules / rxjs / src / internal / operators / mergeMap.ts
1 import { Observable } from '../Observable';
2 import { Operator } from '../Operator';
3 import { Subscriber } from '../Subscriber';
4 import { Subscription } from '../Subscription';
5 import { subscribeToResult } from '../util/subscribeToResult';
6 import { OuterSubscriber } from '../OuterSubscriber';
7 import { InnerSubscriber } from '../InnerSubscriber';
8 import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
9 import { map } from './map';
10 import { from } from '../observable/from';
11
12 /* tslint:disable:max-line-length */
13 export function mergeMap<T, O extends ObservableInput<any>>(project: (value: T, index: number) => O, concurrent?: number): OperatorFunction<T, ObservedValueOf<O>>;
14 /** @deprecated resultSelector no longer supported, use inner map instead */
15 export function mergeMap<T, O extends ObservableInput<any>>(project: (value: T, index: number) => O, resultSelector: undefined, concurrent?: number): OperatorFunction<T, ObservedValueOf<O>>;
16 /** @deprecated resultSelector no longer supported, use inner map instead */
17 export function mergeMap<T, R, O extends ObservableInput<any>>(project: (value: T, index: number) => O, resultSelector: (outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R, concurrent?: number): OperatorFunction<T, R>;
18 /* tslint:enable:max-line-length */
19
20 /**
21  * Projects each source value to an Observable which is merged in the output
22  * Observable.
23  *
24  * <span class="informal">Maps each value to an Observable, then flattens all of
25  * these inner Observables using {@link mergeAll}.</span>
26  *
27  * ![](mergeMap.png)
28  *
29  * Returns an Observable that emits items based on applying a function that you
30  * supply to each item emitted by the source Observable, where that function
31  * returns an Observable, and then merging those resulting Observables and
32  * emitting the results of this merger.
33  *
34  * ## Example
35  * Map and flatten each letter to an Observable ticking every 1 second
36  * ```ts
37  * import { of, interval } from 'rxjs';
38  * import { mergeMap, map } from 'rxjs/operators';
39  *
40  * const letters = of('a', 'b', 'c');
41  * const result = letters.pipe(
42  *   mergeMap(x => interval(1000).pipe(map(i => x+i))),
43  * );
44  * result.subscribe(x => console.log(x));
45  *
46  * // Results in the following:
47  * // a0
48  * // b0
49  * // c0
50  * // a1
51  * // b1
52  * // c1
53  * // continues to list a,b,c with respective ascending integers
54  * ```
55  *
56  * @see {@link concatMap}
57  * @see {@link exhaustMap}
58  * @see {@link merge}
59  * @see {@link mergeAll}
60  * @see {@link mergeMapTo}
61  * @see {@link mergeScan}
62  * @see {@link switchMap}
63  *
64  * @param {function(value: T, ?index: number): ObservableInput} project A function
65  * that, when applied to an item emitted by the source Observable, returns an
66  * Observable.
67  * @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input
68  * Observables being subscribed to concurrently.
69  * @return {Observable} An Observable that emits the result of applying the
70  * projection function (and the optional deprecated `resultSelector`) to each item
71  * emitted by the source Observable and merging the results of the Observables
72  * obtained from this transformation.
73  */
74 export function mergeMap<T, R, O extends ObservableInput<any>>(
75   project: (value: T, index: number) => O,
76   resultSelector?: ((outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R) | number,
77   concurrent: number = Number.POSITIVE_INFINITY
78 ): OperatorFunction<T, ObservedValueOf<O>|R> {
79   if (typeof resultSelector === 'function') {
80     // DEPRECATED PATH
81     return (source: Observable<T>) => source.pipe(
82       mergeMap((a, i) => from(project(a, i)).pipe(
83         map((b: any, ii: number) => resultSelector(a, b, i, ii)),
84       ), concurrent)
85     );
86   } else if (typeof resultSelector === 'number') {
87     concurrent = resultSelector;
88   }
89   return (source: Observable<T>) => source.lift(new MergeMapOperator(project, concurrent));
90 }
91
92 export class MergeMapOperator<T, R> implements Operator<T, R> {
93   constructor(private project: (value: T, index: number) => ObservableInput<R>,
94               private concurrent: number = Number.POSITIVE_INFINITY) {
95   }
96
97   call(observer: Subscriber<R>, source: any): any {
98     return source.subscribe(new MergeMapSubscriber(
99       observer, this.project, this.concurrent
100     ));
101   }
102 }
103
104 /**
105  * We need this JSDoc comment for affecting ESDoc.
106  * @ignore
107  * @extends {Ignored}
108  */
109 export class MergeMapSubscriber<T, R> extends OuterSubscriber<T, R> {
110   private hasCompleted: boolean = false;
111   private buffer: T[] = [];
112   private active: number = 0;
113   protected index: number = 0;
114
115   constructor(destination: Subscriber<R>,
116               private project: (value: T, index: number) => ObservableInput<R>,
117               private concurrent: number = Number.POSITIVE_INFINITY) {
118     super(destination);
119   }
120
121   protected _next(value: T): void {
122     if (this.active < this.concurrent) {
123       this._tryNext(value);
124     } else {
125       this.buffer.push(value);
126     }
127   }
128
129   protected _tryNext(value: T) {
130     let result: ObservableInput<R>;
131     const index = this.index++;
132     try {
133       result = this.project(value, index);
134     } catch (err) {
135       this.destination.error(err);
136       return;
137     }
138     this.active++;
139     this._innerSub(result, value, index);
140   }
141
142   private _innerSub(ish: ObservableInput<R>, value: T, index: number): void {
143     const innerSubscriber = new InnerSubscriber(this, value, index);
144     const destination = this.destination as Subscription;
145     destination.add(innerSubscriber);
146     const innerSubscription = subscribeToResult<T, R>(this, ish, undefined, undefined, innerSubscriber);
147     // The returned subscription will usually be the subscriber that was
148     // passed. However, interop subscribers will be wrapped and for
149     // unsubscriptions to chain correctly, the wrapper needs to be added, too.
150     if (innerSubscription !== innerSubscriber) {
151       destination.add(innerSubscription);
152     }
153   }
154
155   protected _complete(): void {
156     this.hasCompleted = true;
157     if (this.active === 0 && this.buffer.length === 0) {
158       this.destination.complete();
159     }
160     this.unsubscribe();
161   }
162
163   notifyNext(outerValue: T, innerValue: R,
164              outerIndex: number, innerIndex: number,
165              innerSub: InnerSubscriber<T, R>): void {
166     this.destination.next(innerValue);
167   }
168
169   notifyComplete(innerSub: Subscription): void {
170     const buffer = this.buffer;
171     this.remove(innerSub);
172     this.active--;
173     if (buffer.length > 0) {
174       this._next(buffer.shift());
175     } else if (this.active === 0 && this.hasCompleted) {
176       this.destination.complete();
177     }
178   }
179 }
180
181 /**
182  * @deprecated renamed. Use {@link mergeMap}
183  */
184 export const flatMap = mergeMap;