import { ObservableInput, PartialObserver, ObservedValueOf } from '../types';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
-import { OuterSubscriber } from '../OuterSubscriber';
-import { InnerSubscriber } from '../InnerSubscriber';
-import { subscribeToResult } from '../util/subscribeToResult';
import { iterator as Symbol_iterator } from '../../internal/symbol/iterator';
+import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe';
/* tslint:disable:max-line-length */
/** @deprecated resultSelector is no longer supported, pipe to map instead */
export class ZipOperator<T, R> implements Operator<T, R> {
- resultSelector: (...values: Array<any>) => R;
+ resultSelector?: (...values: Array<any>) => R;
constructor(resultSelector?: (...values: Array<any>) => R) {
this.resultSelector = resultSelector;
* @extends {Ignored}
*/
export class ZipSubscriber<T, R> extends Subscriber<T> {
- private values: any;
- private resultSelector: (...values: Array<any>) => R;
private iterators: LookAheadIterator<any>[] = [];
private active = 0;
constructor(destination: Subscriber<R>,
- resultSelector?: (...values: Array<any>) => R,
+ private resultSelector?: (...values: Array<any>) => R,
values: any = Object.create(null)) {
super(destination);
- this.resultSelector = (typeof resultSelector === 'function') ? resultSelector : null;
- this.values = values;
+ this.resultSelector = (typeof resultSelector === 'function') ? resultSelector : undefined;
}
protected _next(value: any) {
this.unsubscribe();
if (len === 0) {
- this.destination.complete();
+ this.destination.complete!();
return;
}
let iterator: ZipBufferIterator<any, any> = <any>iterators[i];
if (iterator.stillUnsubscribed) {
const destination = this.destination as Subscription;
- destination.add(iterator.subscribe(iterator, i));
+ destination.add(iterator.subscribe());
} else {
this.active--; // not an observable
}
notifyInactive() {
this.active--;
if (this.active === 0) {
- this.destination.complete();
+ this.destination.complete!();
}
}
}
if (result.done) {
- destination.complete();
+ destination.complete!();
return;
}
if (this.resultSelector) {
this._tryresultSelector(args);
} else {
- destination.next(args);
+ destination.next!(args);
}
if (shouldComplete) {
- destination.complete();
+ destination.complete!();
}
}
protected _tryresultSelector(args: any[]) {
let result: any;
try {
- result = this.resultSelector.apply(this, args);
+ result = this.resultSelector!.apply(this, args);
} catch (err) {
- this.destination.error(err);
+ this.destination.error!(err);
return;
}
- this.destination.next(result);
+ this.destination.next!(result);
}
}
return result;
}
- hasCompleted() {
+ hasCompleted(): boolean {
const nextResult = this.nextResult;
- return nextResult && nextResult.done;
+ return Boolean(nextResult && nextResult.done);
}
}
* @ignore
* @extends {Ignored}
*/
-class ZipBufferIterator<T, R> extends OuterSubscriber<T, R> implements LookAheadIterator<T> {
+class ZipBufferIterator<T, R> extends SimpleOuterSubscriber<T, R> implements LookAheadIterator<T> {
stillUnsubscribed = true;
buffer: T[] = [];
isComplete = false;
if (buffer.length === 0 && this.isComplete) {
return { value: null, done: true };
} else {
- return { value: buffer.shift(), done: false };
+ return { value: buffer.shift()!, done: false };
}
}
this.isComplete = true;
this.parent.notifyInactive();
} else {
- this.destination.complete();
+ this.destination.complete!();
}
}
- notifyNext(outerValue: T, innerValue: any,
- outerIndex: number, innerIndex: number,
- innerSub: InnerSubscriber<T, R>): void {
+ notifyNext(innerValue: any): void {
this.buffer.push(innerValue);
this.parent.checkIterators();
}
- subscribe(value: any, index: number) {
- return subscribeToResult<any, any>(this, this.observable, this, index);
+ subscribe() {
+ return innerSubscribe(this.observable, new SimpleInnerSubscriber(this));
}
}