import { Observable } from '../Observable';
import { Subscription } from '../Subscription';
import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from '../types';
-
-import { OuterSubscriber } from '../OuterSubscriber';
-import { subscribeToResult } from '../util/subscribeToResult';
+import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
/**
* Ignores source values for a duration determined by another Observable, then
* @ignore
* @extends {Ignored}
*/
-class AuditSubscriber<T, R> extends OuterSubscriber<T, R> {
+class AuditSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
- private value: T;
+ private value?: T;
private hasValue: boolean = false;
- private throttled: Subscription;
+ private throttled?: Subscription;
constructor(destination: Subscriber<T>,
private durationSelector: (value: T) => SubscribableOrPromise<any>) {
const { durationSelector } = this;
duration = durationSelector(value);
} catch (err) {
- return this.destination.error(err);
+ return this.destination.error!(err);
}
- const innerSubscription = subscribeToResult(this, duration);
+ const innerSubscription = innerSubscribe(duration, new SimpleInnerSubscriber(this));
if (!innerSubscription || innerSubscription.closed) {
this.clearThrottle();
} else {
const { value, hasValue, throttled } = this;
if (throttled) {
this.remove(throttled);
- this.throttled = null;
+ this.throttled = undefined;
throttled.unsubscribe();
}
if (hasValue) {
- this.value = null;
+ this.value = undefined;
this.hasValue = false;
- this.destination.next(value);
+ this.destination.next!(value);
}
}
- notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
+ notifyNext(): void {
this.clearThrottle();
}