1 import { subscribeToResult } from '../util/subscribeToResult';
2 import { OuterSubscriber } from '../OuterSubscriber';
3 import { InnerSubscriber } from '../InnerSubscriber';
4 import { map } from './map';
5 import { from } from '../observable/from';
/**
 * Builds an operator function that projects each source value with `project`
 * and hands the work to a {@link MergeMapOperator} via `source.lift`.
 *
 * Call shapes handled here:
 *  - `mergeMap(project)`                       -> unlimited concurrency
 *  - `mergeMap(project, concurrent)`           -> numeric second argument is the limit
 *  - `mergeMap(project, undefined, concurrent)`
 *  - `mergeMap(project, resultSelector, concurrent)` -> result-selector form
 *
 * @param {function(*, number): *} project Called with `(value, index)`; its
 *   result is passed to `from` in the result-selector form, or forwarded
 *   untouched to `MergeMapOperator` otherwise.
 * @param {(function(*, *, number, number): *)|number} [resultSelector] Either a
 *   function called with `(outerValue, innerValue, outerIndex, innerIndex)`, or
 *   a number that is used as the concurrency limit.
 * @param {number} [concurrent=Number.POSITIVE_INFINITY] Concurrency limit handed
 *   to `MergeMapOperator`; ignored when `resultSelector` is a number.
 * @returns {function(*): *} A function that takes a source and returns whatever
 *   `source.pipe` / `source.lift` returns.
 */
export function mergeMap(project, resultSelector, concurrent = Number.POSITIVE_INFINITY) {
    if (typeof resultSelector === 'function') {
        // Result-selector form: re-enter mergeMap with a projection that pairs
        // every inner value with the outer value that produced it.
        return (source) => source.pipe(mergeMap((a, i) => from(project(a, i)).pipe(map((b, ii) => resultSelector(a, b, i, ii))), concurrent));
    }
    // A numeric second argument takes precedence over the third parameter.
    const limit = typeof resultSelector === 'number' ? resultSelector : concurrent;
    return (source) => source.lift(new MergeMapOperator(project, limit));
}
/**
 * Operator object passed to `source.lift` by `mergeMap`. It only carries the
 * projection and the concurrency limit until a subscription is made.
 */
export class MergeMapOperator {
    /**
     * @param {function(*, number): *} project Projection forwarded to each
     *   `MergeMapSubscriber` this operator creates.
     * @param {number} [concurrent=Number.POSITIVE_INFINITY] Concurrency limit
     *   forwarded alongside `project`.
     */
    constructor(project, concurrent = Number.POSITIVE_INFINITY) {
        this.project = project;
        this.concurrent = concurrent;
    }
    /**
     * Subscribes a new `MergeMapSubscriber`, wrapping `observer`, to `source`.
     *
     * @param {*} observer Destination handed to the new subscriber.
     * @param {{subscribe: function(*): *}} source Object being subscribed to.
     * @returns {*} Whatever `source.subscribe` returns.
     */
    call(observer, source) {
        return source.subscribe(new MergeMapSubscriber(observer, this.project, this.concurrent));
    }
}
// MergeMapSubscriber: the subscriber that MergeMapOperator.call attaches to the source.
// It applies the projection to incoming values, subscribes to each result through
// subscribeToResult, forwards inner values to the destination, and parks values in
// this.buffer while this.active is not below this.concurrent.
//
// NOTE(review): the gutter numbers on these lines are not contiguous (25 -> 27,
// 29 -> 35, 39 -> 44, 76 -> 78, ...), so statements, method headers and closing
// braces are absent from this listing. Comments below describe only what is visible;
// anything said about the absent lines is an assumption to confirm against the
// complete file.
24 export class MergeMapSubscriber extends OuterSubscriber {
// Stores the projection and the concurrency limit; the limit defaults to unlimited.
// NOTE(review): line 26 is absent - presumably the super(...) call; lines 30-34 are
// absent - presumably where buffer, active and index (all read below) are initialised.
25 constructor(destination, project, concurrent = Number.POSITIVE_INFINITY) {
27 this.project = project;
28 this.concurrent = concurrent;
// Set to true further down (line 65) and consulted in notifyComplete.
29 this.hasCompleted = false;
// NOTE(review): the method header for the next lines is absent; notifyComplete calls
// this._next(...), so this is presumably the body of _next(value) - confirm.
// Below the limit the value is handled now; the handling call itself is not visible.
35 if (this.active < this.concurrent) {
// Presumably the else branch: at the limit, the value is queued for later.
39 this.buffer.push(value);
// NOTE(review): header absent again - presumably a separate helper that projects one value.
// Each projected value consumes the next index.
44 const index = this.index++;
46 result = this.project(value, index);
// Presumably inside a catch block (err is not declared on any visible line):
// a failure is reported to the destination as an error.
49 this.destination.error(err);
53 this._innerSub(result, value, index);
// Subscribes to one projected result (ish) on behalf of the outer value and index.
55 _innerSub(ish, value, index) {
56 const innerSubscriber = new InnerSubscriber(this, value, index);
57 const destination = this.destination;
// Register the inner subscriber with the destination before subscribing.
58 destination.add(innerSubscriber);
59 const innerSubscription = subscribeToResult(this, ish, undefined, undefined, innerSubscriber);
// subscribeToResult may hand back something other than the inner subscriber;
// in that case the returned subscription is registered with the destination as well.
60 if (innerSubscription !== innerSubscriber) {
61 destination.add(innerSubscription);
// NOTE(review): header absent - presumably the source-completion handler.
// Records completion; the destination is completed right away only when nothing
// is active and nothing is buffered.
65 this.hasCompleted = true;
66 if (this.active === 0 && this.buffer.length === 0) {
67 this.destination.complete();
// Forwards each inner value to the destination; the other arguments are unused.
71 notifyNext(outerValue, innerValue, outerIndex, innerIndex, innerSub) {
72 this.destination.next(innerValue);
// Runs when an inner subscription completes: removes it, then either starts the next
// buffered value or, when nothing is active and completion was recorded, completes
// the destination.
74 notifyComplete(innerSub) {
75 const buffer = this.buffer;
76 this.remove(innerSub);
// NOTE(review): line 77 is absent - presumably where this.active is decremented; confirm.
78 if (buffer.length > 0) {
79 this._next(buffer.shift());
81 else if (this.active === 0 && this.hasCompleted) {
82 this.destination.complete();
/**
 * Alias of {@link mergeMap}: the same function exported under a second name.
 */
export const flatMap = mergeMap;
87 //# sourceMappingURL=mergeMap.js.map