import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
-import { OuterSubscriber } from '../OuterSubscriber';
-import { InnerSubscriber } from '../InnerSubscriber';
-import { subscribeToResult } from '../util/subscribeToResult';
-
import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from '../types';
+import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
export interface ThrottleConfig {
leading?: boolean;
*/
export function throttle<T>(durationSelector: (value: T) => SubscribableOrPromise<any>,
config: ThrottleConfig = defaultThrottleConfig): MonoTypeOperatorFunction<T> {
- return (source: Observable<T>) => source.lift(new ThrottleOperator(durationSelector, config.leading, config.trailing));
+ return (source: Observable<T>) => source.lift(new ThrottleOperator(durationSelector, !!config.leading, !!config.trailing));
}
class ThrottleOperator<T> implements Operator<T, T> {
* @ignore
* @extends {Ignored}
*/
-class ThrottleSubscriber<T, R> extends OuterSubscriber<T, R> {
- private _throttled: Subscription;
- private _sendValue: T;
+class ThrottleSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
+ private _throttled?: Subscription;
+ private _sendValue?: T;
private _hasValue = false;
constructor(protected destination: Subscriber<T>,
const { _hasValue, _sendValue } = this;
if (_hasValue) {
this.destination.next(_sendValue);
- this.throttle(_sendValue);
+ this.throttle(_sendValue!);
}
this._hasValue = false;
- this._sendValue = null;
+ this._sendValue = undefined;
}
private throttle(value: T): void {
const duration = this.tryDurationSelector(value);
if (!!duration) {
- this.add(this._throttled = subscribeToResult(this, duration));
+ this.add(this._throttled = innerSubscribe(duration, new SimpleInnerSubscriber(this)));
}
}
- private tryDurationSelector(value: T): SubscribableOrPromise<any> {
+ private tryDurationSelector(value: T): SubscribableOrPromise<any> | null {
try {
return this.durationSelector(value);
} catch (err) {
if (_throttled) {
_throttled.unsubscribe();
}
- this._throttled = null;
+ this._throttled = undefined;
if (_trailing) {
this.send();
}
}
- notifyNext(outerValue: T, innerValue: R,
- outerIndex: number, innerIndex: number,
- innerSub: InnerSubscriber<T, R>): void {
+ notifyNext(): void {
this.throttlingDone();
}