1 import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe';
/**
 * Builds the operator function for `mergeScan`.
 *
 * The returned function takes `source` and calls `source.lift` with a new
 * `MergeScanOperator` constructed from `accumulator`, `seed` and `concurrent`.
 * `concurrent` defaults to `Number.POSITIVE_INFINITY` when it is omitted.
 *
 * NOTE(review): the numbering embedded in this listing jumps from 3 to 5, so
 * the line closing this function is presumably just not shown here.
 */
2 export function mergeScan(accumulator, seed, concurrent = Number.POSITIVE_INFINITY) {
3 return (source) => source.lift(new MergeScanOperator(accumulator, seed, concurrent));
/**
 * Operator object holding the `mergeScan` configuration.
 *
 * Its `call` method subscribes `source` with a new `MergeScanSubscriber`
 * that wraps the given `subscriber`.
 *
 * NOTE(review): the embedded line numbers skip 8, 10, 13 and 14, so some lines
 * of this class (closing braces, and presumably the `this.seed` assignment
 * that `call` relies on) are not visible in this listing.
 */
5 export class MergeScanOperator {
// Stores the configuration on the instance; only the `accumulator` and
// `concurrent` assignments are visible here.
6 constructor(accumulator, seed, concurrent) {
7 this.accumulator = accumulator;
9 this.concurrent = concurrent;
// Subscribes `source` with a MergeScanSubscriber built from `subscriber`,
// `this.accumulator`, `this.seed` and `this.concurrent`, and returns whatever
// `source.subscribe` returns.
11 call(subscriber, source) {
12 return source.subscribe(new MergeScanSubscriber(subscriber, this.accumulator, this.seed, this.concurrent));
/**
 * Subscriber that feeds each incoming value, together with the current
 * accumulated value `this.acc`, to `accumulator`, and forwards values reported
 * through `notifyNext` to `this.destination`.
 *
 * State read or written by the visible lines:
 *  - `acc`          latest accumulated value (overwritten in `notifyNext`)
 *  - `active`       compared against `concurrent` before the accumulator runs
 *  - `buffer`       values that were pushed instead of being processed
 *  - `index`        counter passed to `accumulator`, post-incremented per call
 *  - `hasValue`     starts `false`; checked before emitting `acc` at the end
 *  - `hasCompleted` starts `false`; set to `true` in the completion handling
 *
 * NOTE(review): the embedded line numbers skip many values, so several method
 * headers, closing braces and statements (including the initialisation of
 * `acc`, `buffer`, `active` and `index`, and any updates to `active` and
 * `hasValue`) are not visible in this listing. Comments below describe only
 * the lines that are shown.
 */
15 export class MergeScanSubscriber extends SimpleOuterSubscriber {
16 constructor(destination, accumulator, acc, concurrent) {
18 this.accumulator = accumulator;
20 this.concurrent = concurrent;
21 this.hasValue = false;
22 this.hasCompleted = false;
// Handling of an incoming `value` (method header not visible; presumably
// `_next`, which is called further down with a buffered value).
// While fewer than `concurrent` items are active, the accumulator is invoked
// with the current `acc`, the value and a running index.
28 if (this.active < this.concurrent) {
29 const index = this.index++;
30 const destination = this.destination;
33 const { accumulator } = this;
34 ish = accumulator(this.acc, value, index);
// Reports `e` to the destination and returns early. Presumably this sits in a
// catch around the accumulator call; the enclosing lines are not visible.
37 return destination.error(e);
// Queues the value in `buffer`; presumably the branch taken when the
// concurrency limit has been reached (the `else` is not visible).
43 this.buffer.push(value);
// Subscribes to `ish` through a SimpleInnerSubscriber whose parent is this
// subscriber. The inner subscriber is added to the destination first.
47 const innerSubscriber = new SimpleInnerSubscriber(this);
48 const destination = this.destination;
49 destination.add(innerSubscriber);
50 const innerSubscription = innerSubscribe(ish, innerSubscriber);
// If innerSubscribe handed back a different object than the inner subscriber,
// that object is added to the destination as well.
51 if (innerSubscription !== innerSubscriber) {
52 destination.add(innerSubscription);
// Completion handling (method header not visible): records completion, and
// when nothing is active and nothing is buffered, finishes the destination.
56 this.hasCompleted = true;
57 if (this.active === 0 && this.buffer.length === 0) {
// `hasValue` still false: send the current `acc` downstream before completing.
58 if (this.hasValue === false) {
59 this.destination.next(this.acc);
61 this.destination.complete();
// Receives a value from an inner subscription: it becomes the new `acc` and is
// forwarded to the destination.
65 notifyNext(innerValue) {
66 const { destination } = this;
67 this.acc = innerValue;
69 destination.next(innerValue);
// Follow-up work after an inner subscription finishes (method header not
// visible; presumably `notifyComplete`). If values are waiting in `buffer`,
// the oldest one is removed and passed to `_next`.
72 const buffer = this.buffer;
74 if (buffer.length > 0) {
75 this._next(buffer.shift());
// Otherwise, once nothing is active and completion has been recorded, apply
// the same final emission as above: send `acc` if `hasValue` is still false,
// then complete the destination.
77 else if (this.active === 0 && this.hasCompleted) {
78 if (this.hasValue === false) {
79 this.destination.next(this.acc);
81 this.destination.complete();
85 //# sourceMappingURL=mergeScan.js.map