1 import { Operator } from '../Operator';
2 import { Subscriber } from '../Subscriber';
3 import { Observable } from '../Observable';
4 import { Subscription } from '../Subscription';
5 import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from '../types';
6 import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
9 * Ignores source values for a duration determined by another Observable, then
10 * emits the most recent value from the source Observable, then repeats this
13 * <span class="informal">It's like {@link auditTime}, but the silencing
14 * duration is determined by a second Observable.</span>
18 * `audit` is similar to `throttle`, but emits the last value from the silenced
19 * time window, instead of the first value. `audit` emits the most recent value
20 * from the source Observable on the output Observable as soon as its internal
21 * timer becomes disabled, and ignores source values while the timer is enabled.
22 * Initially, the timer is disabled. As soon as the first source value arrives,
23 * the timer is enabled by calling the `durationSelector` function with the
24 * source value, which returns the "duration" Observable. When the duration
25 * Observable emits a value or completes, the timer is disabled, then the most
26 * recent source value is emitted on the output Observable, and this process
27 * repeats for the next source value.
31 * Emit clicks at a rate of at most one click per second
33 * import { fromEvent, interval } from 'rxjs';
34 * import { audit } from 'rxjs/operators';
36 * const clicks = fromEvent(document, 'click');
37 * const result = clicks.pipe(audit(ev => interval(1000)));
38 * result.subscribe(x => console.log(x));
40 * @see {@link auditTime}
41 * @see {@link debounce}
42 * @see {@link delayWhen}
44 * @see {@link throttle}
46 * @param {function(value: T): SubscribableOrPromise} durationSelector A function
47 * that receives a value from the source Observable, for computing the silencing
48 * duration, returned as an Observable or a Promise.
49 * @return {Observable<T>} An Observable that performs rate-limiting of
50 * emissions from the source Observable.
54 export function audit<T>(durationSelector: (value: T) => SubscribableOrPromise<any>): MonoTypeOperatorFunction<T> {
// The returned function receives the source Observable and produces the audited one.
55 return function auditOperatorFunction(source: Observable<T>) {
// A fresh AuditOperator carrying `durationSelector` is handed to source.lift();
// the actual subscription to the source is made later, in AuditOperator.call().
56 return source.lift(new AuditOperator(durationSelector));
60 class AuditOperator<T> implements Operator<T, T> {
// Stores the duration selector so it can be passed to every AuditSubscriber created in call().
61 constructor(private durationSelector: (value: T) => SubscribableOrPromise<any>) {
// Subscribes a new AuditSubscriber, wrapping the given downstream subscriber, to `source`
// and returns the result of source.subscribe() as the teardown.
64 call(subscriber: Subscriber<T>, source: any): TeardownLogic {
65 return source.subscribe(new AuditSubscriber<T, T>(subscriber, this.durationSelector));
70 * We need this JSDoc comment for affecting ESDoc.
74 class AuditSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
// Reset to false in the flush path below, right before the held value is sent downstream.
77 private hasValue: boolean = false;
// Subscription to the currently active duration notifier; undefined when no duration is running.
78 private throttled?: Subscription;
// destination: the downstream subscriber values are forwarded to.
// durationSelector: called with a source value to obtain the duration notifier.
80 constructor(destination: Subscriber<T>,
81 private durationSelector: (value: T) => SubscribableOrPromise<any>) {
// Handles a value from the source. A new duration is started only when none is active.
85 protected _next(value: T): void {
88 if (!this.throttled) {
// Pull the selector into a local so it is invoked as a plain function, not as a method on `this`.
91 const { durationSelector } = this;
92 duration = durationSelector(value);
// Pass `err` to the downstream error handler and return early.
94 return this.destination.error!(err);
// Subscribe to the duration notifier with a SimpleInnerSubscriber constructed around `this`.
// NOTE(review): presumably that inner subscriber calls back into notifyNext/notifyComplete - confirm in innerSubscribe.
96 const innerSubscription = innerSubscribe(duration, new SimpleInnerSubscriber(this));
// Covers a missing subscription or one that is already closed by the time innerSubscribe returns.
97 if (!innerSubscription || innerSubscription.closed) {
// Track the subscription on this subscriber via add() and remember it in `throttled`.
100 this.add(this.throttled = innerSubscription);
// Snapshot the fields first: `value` and `throttled` are reset below but are still needed afterwards.
106 const { value, hasValue, throttled } = this;
// Detach the duration subscription, clear the field, then unsubscribe the captured reference.
108 this.remove(throttled);
109 this.throttled = undefined;
110 throttled.unsubscribe();
// State is reset before next() is called, so the fields are already clear when downstream code runs.
113 this.value = undefined;
114 this.hasValue = false;
115 this.destination.next!(value);
// Delegates to clearThrottle().
120 this.clearThrottle();
// notifyComplete simply delegates to clearThrottle().
123 notifyComplete(): void {
124 this.clearThrottle();