import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
-import { subscribeToResult } from '../util/subscribeToResult';
-import { OuterSubscriber } from '../OuterSubscriber';
-import { InnerSubscriber } from '../InnerSubscriber';
import { ObservableInput, OperatorFunction } from '../types';
+import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe';
/**
* Applies an accumulator function over the source Observable where the
* @ignore
* @extends {Ignored}
*/
-export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
+export class MergeScanSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
private hasValue: boolean = false;
private hasCompleted: boolean = false;
private buffer: Observable<any>[] = [];
const { accumulator } = this;
ish = accumulator(this.acc, value, index);
} catch (e) {
- return destination.error(e);
+ return destination.error!(e);
}
this.active++;
- this._innerSub(ish, value, index);
+ this._innerSub(ish);
} else {
this.buffer.push(value);
}
}
- private _innerSub(ish: any, value: T, index: number): void {
- const innerSubscriber = new InnerSubscriber(this, value, index);
+ private _innerSub(ish: any): void {
+ const innerSubscriber = new SimpleInnerSubscriber(this);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
- const innerSubscription = subscribeToResult<T, R>(this, ish, undefined, undefined, innerSubscriber);
+ const innerSubscription = innerSubscribe(ish, innerSubscriber);
// The returned subscription will usually be the subscriber that was
// passed. However, interop subscribers will be wrapped and for
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
this.hasCompleted = true;
if (this.active === 0 && this.buffer.length === 0) {
if (this.hasValue === false) {
- this.destination.next(this.acc);
+ this.destination.next!(this.acc);
}
- this.destination.complete();
+ this.destination.complete!();
}
this.unsubscribe();
}
- notifyNext(outerValue: T, innerValue: R,
- outerIndex: number, innerIndex: number,
- innerSub: InnerSubscriber<T, R>): void {
+ notifyNext(innerValue: R): void {
const { destination } = this;
this.acc = innerValue;
this.hasValue = true;
- destination.next(innerValue);
+ destination.next!(innerValue);
}
- notifyComplete(innerSub: Subscription): void {
+ notifyComplete(): void {
const buffer = this.buffer;
- const destination = this.destination as Subscription;
- destination.remove(innerSub);
this.active--;
if (buffer.length > 0) {
this._next(buffer.shift());
} else if (this.active === 0 && this.hasCompleted) {
if (this.hasValue === false) {
- this.destination.next(this.acc);
+ this.destination.next!(this.acc);
}
- this.destination.complete();
+ this.destination.complete!();
}
}
}