1 import { Operator } from '../Operator';
2 import { Observable } from '../Observable';
3 import { Subscriber } from '../Subscriber';
4 import { ObservableInput, OperatorFunction, TeardownLogic } from '../types';
5 import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
export function exhaust<T>(): OperatorFunction<ObservableInput<T>, T>;
export function exhaust<R>(): OperatorFunction<any, R>;

/**
 * Flattens a higher-order Observable into a first-order Observable, dropping
 * every inner Observable that arrives while an earlier inner Observable is
 * still running.
 *
 * <span class="informal">Flattens an Observable-of-Observables by ignoring the
 * next inner Observables for as long as the current inner one has not
 * completed.</span>
 *
 * `exhaust` subscribes to a source that emits Observables (a higher-order
 * Observable). When the source emits an inner Observable and no other inner
 * Observable is active, the output starts emitting the items of that inner
 * Observable. Up to this point it behaves like {@link mergeAll}. The
 * difference is that any inner Observable emitted by the source while the
 * previous one has not yet completed is ignored. Once the active inner
 * Observable completes, the next inner Observable emitted by the source is
 * accepted and flattened, and the process repeats.
 *
 * ## Example
 * Run a finite timer for each click, only if there is no currently active timer
 * ```ts
 * import { fromEvent, interval } from 'rxjs';
 * import { exhaust, map, take } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const higherOrder = clicks.pipe(
 *   map((ev) => interval(1000).pipe(take(5))),
 * );
 * const result = higherOrder.pipe(exhaust());
 * result.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link combineAll}
 * @see {@link concatAll}
 * @see {@link switchAll}
 * @see {@link switchMap}
 * @see {@link mergeAll}
 * @see {@link exhaustMap}
 *
 * @return {Observable} An Observable that takes a source of Observables and
 * propagates the first inner Observable exclusively until it completes before
 * subscribing to the next.
 */
export function exhaust<T>(): OperatorFunction<any, T> {
  return (source: Observable<T>) => {
    // Each application of the operator function gets its own operator instance.
    const operator = new SwitchFirstOperator<T>();
    return source.lift(operator);
  };
}
/**
 * Operator passed to `lift` by `exhaust`. When called, it subscribes to the
 * source with a `SwitchFirstSubscriber` wrapping the downstream subscriber and
 * returns whatever that subscription call returns as the teardown.
 */
class SwitchFirstOperator<T> implements Operator<T, T> {
  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
    const exhaustingSubscriber = new SwitchFirstSubscriber(subscriber);
    return source.subscribe(exhaustingSubscriber);
  }
}
/**
 * We need this JSDoc comment for affecting ESDoc.
 *
 * Outer subscriber used by `exhaust`. It keeps at most one inner subscription
 * alive at a time: values arriving from the source while an inner
 * subscription is active are dropped, and the destination is completed only
 * once the source has completed and no inner subscription remains active.
 *
 * @ignore
 * @extends {Ignored}
 */
class SwitchFirstSubscriber<T> extends SimpleOuterSubscriber<T, T> {
  /** True once the outer source has signalled completion. */
  private hasCompleted = false;
  /** True while an inner subscription is active. */
  private hasSubscription = false;

  constructor(destination: Subscriber<T>) {
    super(destination);
  }

  /**
   * Handles a value from the outer source. The value is handed to
   * `innerSubscribe`, so it is presumably an inner Observable (or other
   * observable input) — confirm against `innerSubscribe`'s contract.
   */
  protected _next(value: T): void {
    // An inner subscription is already running: drop this one.
    if (this.hasSubscription) {
      return;
    }

    this.hasSubscription = true;
    const innerSubscriber = new SimpleInnerSubscriber(this);
    this.add(innerSubscribe(value, innerSubscriber));
  }

  /**
   * Handles completion of the outer source. Completion is forwarded right
   * away only when no inner subscription is active; otherwise it is deferred
   * until `notifyComplete` runs.
   */
  protected _complete(): void {
    this.hasCompleted = true;
    if (this.hasSubscription) {
      return;
    }
    this.destination.complete!();
  }

  /**
   * Marks the inner subscription as finished, and completes the destination
   * if the outer source has already completed.
   */
  notifyComplete(): void {
    this.hasSubscription = false;
    if (!this.hasCompleted) {
      return;
    }
    this.destination.complete!();
  }
}