1 import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
// Operator factory: returns a function that, given a source, lifts an
// ExpandOperator onto it via `source.lift(...)`.
//
// Parameters:
//   project    - callback stored on the operator; the subscriber below calls it
//                as `project(value, index)`.
//   concurrent - concurrency limit; defaults to Number.POSITIVE_INFINITY.
//   scheduler  - optional; passed through unchanged to ExpandOperator.
//
// NOTE(review): the embedded line numbers jump from 4 to 6, so this function's
// closing brace appears to be missing from this extract.
2 export function expand(project, concurrent = Number.POSITIVE_INFINITY, scheduler) {
// Normalise the limit: any falsy value (0, null, NaN, ...) or any number below 1
// is replaced with POSITIVE_INFINITY, i.e. treated as "no limit".
3 concurrent = (concurrent || 0) < 1 ? Number.POSITIVE_INFINITY : concurrent;
4 return (source) => source.lift(new ExpandOperator(project, concurrent, scheduler));
// Holds the configuration passed to `expand` and, when `call` is invoked,
// subscribes the source with a freshly built ExpandSubscriber.
//
// NOTE(review): embedded lines 11, 14 and 15 are absent from this extract;
// they are presumably the closing braces of the constructor, `call`, and the
// class itself - confirm against the original file.
6 export class ExpandOperator {
7 constructor(project, concurrent, scheduler) {
8 this.project = project;
9 this.concurrent = concurrent;
10 this.scheduler = scheduler;
// Wraps `subscriber` in an ExpandSubscriber configured with the stored
// project/concurrent/scheduler and returns whatever `source.subscribe` returns.
12 call(subscriber, source) {
13 return source.subscribe(new ExpandSubscriber(subscriber, this.project, this.concurrent, this.scheduler));
// Subscriber that emits each incoming value to the destination, runs `project`
// on it, subscribes to the projection result, and feeds values coming back from
// those inner subscriptions through the same path again (see notifyNext).
//
// NOTE(review): this extract is missing many lines (the embedded numbering has
// gaps). Several method headers, closing braces and statements are not
// visible; the comments below describe only what the visible lines show and
// mark everything else as an assumption.
16 export class ExpandSubscriber extends SimpleOuterSubscriber {
17 constructor(destination, project, concurrent, scheduler) {
// NOTE(review): embedded line 18 is missing - presumably the super(...) call.
19 this.project = project;
20 this.concurrent = concurrent;
21 this.scheduler = scheduler;
// NOTE(review): embedded lines 22-23 are missing. `this.index` and
// `this.active` are read further down but never initialised in the visible
// lines, so they are presumably set here - confirm.
24 this.hasCompleted = false;
// Only taken when a finite concurrency limit was supplied. The body (embedded
// line 26) is missing; presumably it creates `this.buffer`, which is pushed to
// and shifted from below - confirm.
25 if (concurrent < Number.POSITIVE_INFINITY) {
// Scheduler work callback: unpacks the state object built in the scheduled
// branch below and performs the inner subscription on the originating subscriber.
29 static dispatch(arg) {
30 const { subscriber, result, value, index } = arg;
31 subscriber.subscribeToProjection(result, value, index);
// NOTE(review): the method header (embedded line 33) is missing. notifyNext and
// the buffer-draining code call `this._next(...)`, so this is presumably the
// body of `_next(value)` - confirm.
34 const destination = this.destination;
// NOTE(review): the body of this closed-destination branch (embedded lines
// 36-38) is not visible.
35 if (destination.closed) {
// Post-increment: `index` receives the current counter value.
39 const index = this.index++;
// Below the concurrency limit: emit the value downstream, then project it.
40 if (this.active < this.concurrent) {
41 destination.next(value);
// NOTE(review): embedded line 42 is missing (possibly the start of a try block).
43 const { project } = this;
44 const result = project(value, index);
// Without a scheduler the projection result is subscribed to synchronously.
45 if (!this.scheduler) {
46 this.subscribeToProjection(result, value, index);
// With a scheduler (the `else` lines are missing from this extract), the inner
// subscription is deferred: `dispatch` is scheduled with delay 0 and the value
// returned by `schedule` is added to the destination.
49 const state = { subscriber: this, result, value, index };
50 const destination = this.destination;
51 destination.add(this.scheduler.schedule(ExpandSubscriber.dispatch, 0, state));
// NOTE(review): embedded lines 52-58 are missing. The push below is presumably
// the branch taken when the concurrency limit has been reached - confirm.
59 this.buffer.push(value);
// Subscribes to a projection result through innerSubscribe, using a
// SimpleInnerSubscriber that wraps this subscriber, and adds the returned value
// to the destination. `value` and `index` are not used in the visible lines.
// NOTE(review): embedded line 63 is missing; no update of `this.active` is
// visible anywhere in this extract, so it may happen there - confirm.
62 subscribeToProjection(result, value, index) {
64 const destination = this.destination;
65 destination.add(innerSubscribe(result, new SimpleInnerSubscriber(this)));
// NOTE(review): method header (embedded line 67) is missing. The visible lines
// record completion and complete the destination once `active` is 0.
68 this.hasCompleted = true;
69 if (this.hasCompleted && this.active === 0) {
70 this.destination.complete();
// Values produced by inner subscriptions are routed back through `_next`,
// which is what makes the expansion recursive.
74 notifyNext(innerValue) {
75 this._next(innerValue);
// NOTE(review): method header (embedded line 77) and embedded line 79 are
// missing. The visible lines take one queued value (if any) from the buffer and
// pass it to `_next`, then complete the destination once completion has been
// recorded and `active` is 0.
78 const buffer = this.buffer;
// `buffer` can be undefined (it is only conditionally created), hence the guard.
80 if (buffer && buffer.length > 0) {
81 this._next(buffer.shift());
83 if (this.hasCompleted && this.active === 0) {
84 this.destination.complete();
88 //# sourceMappingURL=expand.js.map