import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
-import { OuterSubscriber } from '../OuterSubscriber';
-import { InnerSubscriber } from '../InnerSubscriber';
-import { subscribeToResult } from '../util/subscribeToResult';
import { OperatorFunction } from '../types';
+import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
/**
* Buffers the source Observable values until `closingNotifier` emits.
* @ignore
* @extends {Ignored}
*/
-class BufferSubscriber<T> extends OuterSubscriber<T, any> {
+class BufferSubscriber<T> extends SimpleOuterSubscriber<T, any> {
private buffer: T[] = [];
constructor(destination: Subscriber<T[]>, closingNotifier: Observable<any>) {
super(destination);
- this.add(subscribeToResult(this, closingNotifier));
+ this.add(innerSubscribe(closingNotifier, new SimpleInnerSubscriber(this)));
}
protected _next(value: T) {
this.buffer.push(value);
}
- notifyNext(outerValue: T, innerValue: any,
- outerIndex: number, innerIndex: number,
- innerSub: InnerSubscriber<T, any>): void {
+ notifyNext(): void {
const buffer = this.buffer;
this.buffer = [];
- this.destination.next(buffer);
+ this.destination.next!(buffer);
}
}