1 import { OuterSubscriber } from '../OuterSubscriber';
2 import { subscribeToResult } from '../util/subscribeToResult';
/**
 * Creates an operator function that lifts an ExpandOperator onto a source.
 *
 * @param project Forwarded unchanged to ExpandOperator.
 * @param concurrent Concurrency limit; defaults to Number.POSITIVE_INFINITY.
 *   Falsy values and values below 1 are replaced by Number.POSITIVE_INFINITY.
 * @param scheduler Optional; defaults to undefined and is forwarded unchanged
 *   to ExpandOperator.
 * @returns A function that takes `source` and returns
 *   `source.lift(new ExpandOperator(project, concurrent, scheduler))`.
 *
 * NOTE(review): this listing does not show the function's closing brace;
 * confirm against the complete file.
 */
3 export function expand(project, concurrent = Number.POSITIVE_INFINITY, scheduler = undefined) {
// `concurrent || 0` maps any falsy input (undefined, null, 0, NaN) to 0, so
// those and every value below 1 end up meaning "no limit".
4 concurrent = (concurrent || 0) < 1 ? Number.POSITIVE_INFINITY : concurrent;
5 return (source) => source.lift(new ExpandOperator(project, concurrent, scheduler));
/**
 * Operator object that carries the expand configuration (project, concurrent,
 * scheduler) and, when called, subscribes an ExpandSubscriber to a source.
 *
 * NOTE(review): closing braces of the constructor, of `call` and of the class
 * are not visible in this listing; confirm against the complete file.
 */
7 export class ExpandOperator {
// Stores the three arguments on the instance so that call() can hand them to
// each ExpandSubscriber it creates.
8 constructor(project, concurrent, scheduler) {
9 this.project = project;
10 this.concurrent = concurrent;
11 this.scheduler = scheduler;
// Wraps `subscriber` in a new ExpandSubscriber configured with the stored
// values, subscribes it to `source`, and returns the result of
// source.subscribe().
13 call(subscriber, source) {
14 return source.subscribe(new ExpandSubscriber(subscriber, this.project, this.concurrent, this.scheduler));
/**
 * Subscriber created by ExpandOperator.call(). For each value it handles it
 * forwards the value to `destination`, runs `project(value, index)` on it and
 * subscribes to the result; values produced by those inner subscriptions are
 * fed back through the same path (see notifyNext), which makes the expansion
 * recursive.
 *
 * NOTE(review): this listing omits several lines of the class (closing braces,
 * at least two method headers and a few statements). Comments below describe
 * only the visible lines; anything inferred about hidden lines is marked as
 * an assumption.
 */
17 export class ExpandSubscriber extends OuterSubscriber {
// Stores the configuration and marks the subscriber as not yet completed.
// NOTE(review): no super(...) call or initialization of `this.index` /
// `this.active` is visible here although both fields are read below —
// presumably on the omitted lines; confirm.
18 constructor(destination, project, concurrent, scheduler) {
20 this.project = project;
21 this.concurrent = concurrent;
22 this.scheduler = scheduler;
25 this.hasCompleted = false;
// Entered only when a finite concurrency limit was given. The body is not
// visible; presumably it creates `this.buffer`, which is used further down —
// TODO confirm.
26 if (concurrent < Number.POSITIVE_INFINITY) {
// Callback handed to scheduler.schedule() below: unpacks the scheduled state
// object and performs the deferred subscribeToProjection() call on the
// subscriber stored in it.
30 static dispatch(arg) {
31 const { subscriber, result, value, index } = arg;
32 subscriber.subscribeToProjection(result, value, index);
// NOTE(review): the header of the method containing the following lines is
// not visible; it is presumably `_next(value)`, which notifyNext() and
// notifyComplete() call — confirm.
35 const destination = this.destination;
// Guard for an already-closed destination; the body of this branch is not
// visible in this listing.
36 if (destination.closed) {
// Post-increment: this value gets the current counter as its index.
40 const index = this.index++;
// Below the concurrency limit: emit the value downstream, then project it.
41 if (this.active < this.concurrent) {
42 destination.next(value);
44 const { project } = this;
45 const result = project(value, index);
// Without a scheduler the projected result is subscribed to directly.
46 if (!this.scheduler) {
47 this.subscribeToProjection(result, value, index);
// Otherwise (presumably an else branch whose `else` line is not visible) the
// subscription is deferred: ExpandSubscriber.dispatch is scheduled with 0 as
// the second argument and this state object, and whatever schedule() returns
// is added to the destination.
50 const state = { subscriber: this, result, value, index };
51 const destination = this.destination;
52 destination.add(this.scheduler.schedule(ExpandSubscriber.dispatch, 0, state));
// Presumably the branch taken when the concurrency limit is reached: the
// value is queued and later drained by notifyComplete() — confirm, the
// enclosing `else` is not visible.
60 this.buffer.push(value);
// Subscribes this subscriber to `result` via subscribeToResult() and adds the
// returned value to the destination.
// NOTE(review): one line between the header and the next statement is missing
// from this listing.
63 subscribeToProjection(result, value, index) {
65 const destination = this.destination;
66 destination.add(subscribeToResult(this, result, value, index));
// NOTE(review): the header of the method containing the next three lines is
// not visible (presumably the completion handler — confirm). It records
// completion and completes the destination if `active` is 0. Since
// hasCompleted was just set to true, the condition effectively tests only
// `this.active === 0`.
69 this.hasCompleted = true;
70 if (this.hasCompleted && this.active === 0) {
71 this.destination.complete();
// A value emitted by an inner subscription is routed back through _next(), so
// it is treated like any other incoming value.
75 notifyNext(outerValue, innerValue, outerIndex, innerIndex, innerSub) {
76 this._next(innerValue);
// Called when an inner subscription completes: removes it from the
// destination, hands the oldest buffered value (if any) to _next(), and
// completes the destination once hasCompleted is set and `active` is 0.
// NOTE(review): one line after destination.remove(...) is missing from this
// listing.
78 notifyComplete(innerSub) {
79 const buffer = this.buffer;
80 const destination = this.destination;
81 destination.remove(innerSub);
// `buffer &&` guards against the buffer being absent.
83 if (buffer && buffer.length > 0) {
84 this._next(buffer.shift());
86 if (this.hasCompleted && this.active === 0) {
87 this.destination.complete();
91 //# sourceMappingURL=expand.js.map