1 import { Operator } from '../Operator';
2 import { Observable } from '../Observable';
3 import { Subscriber } from '../Subscriber';
4 import { Subscription } from '../Subscription';
5 import { OuterSubscriber } from '../OuterSubscriber';
6 import { subscribeToResult } from '../util/subscribeToResult';
7 import { ObservableInput, OperatorFunction, TeardownLogic } from '../types';
export function exhaust<T>(): OperatorFunction<ObservableInput<T>, T>;
export function exhaust<R>(): OperatorFunction<any, R>;

/**
 * Flattens a higher-order Observable into a first-order Observable, discarding
 * every inner Observable that arrives while an earlier inner Observable has
 * not yet completed.
 *
 * <span class="informal">Turns an Observable-of-Observables into a flat stream,
 * skipping inner Observables that show up while the current one is still
 * running.</span>
 *
 * `exhaust` subscribes to a source that emits Observables, i.e. a higher-order
 * Observable. When the source emits an inner Observable and no other inner
 * Observable is active, the output begins relaying the items of that inner
 * Observable. Up to this point the behavior resembles {@link mergeAll}. The
 * difference is that `exhaust` ignores any inner Observable emitted while the
 * active one has not yet completed. Once the active inner Observable
 * completes, the next one emitted by the source is accepted and flattened,
 * and the cycle repeats.
 *
 * ## Example
 * Run a finite timer for each click, only if there is no currently active timer
 * ```ts
 * import { fromEvent, interval } from 'rxjs';
 * import { exhaust, map, take } from 'rxjs/operators';
 *
 * const clicks = fromEvent(document, 'click');
 * const higherOrder = clicks.pipe(
 *   map((ev) => interval(1000).pipe(take(5))),
 * );
 * const result = higherOrder.pipe(exhaust());
 * result.subscribe(x => console.log(x));
 * ```
 *
 * @see {@link combineAll}
 * @see {@link concatAll}
 * @see {@link switchAll}
 * @see {@link switchMap}
 * @see {@link mergeAll}
 * @see {@link exhaustMap}
 *
 * @return {OperatorFunction} An operator function that, applied to a source of
 * Observables, yields an Observable which propagates the first inner
 * Observable exclusively until it completes before subscribing to the next.
 */
export function exhaust<T>(): OperatorFunction<any, T> {
  return (source: Observable<T>) => {
    // The operator holds no state of its own; a fresh instance is created for
    // every application of the returned operator function.
    const exhaustOperator = new SwitchFirstOperator<T>();
    return source.lift(exhaustOperator);
  };
}
/**
 * Operator handed to `lift` by `exhaust`. When called, it wraps the downstream
 * subscriber in a `SwitchFirstSubscriber` and subscribes that wrapper to the
 * source.
 */
class SwitchFirstOperator<T> implements Operator<T, T> {
  /**
   * @param subscriber The downstream subscriber that receives flattened values.
   * @param source The object to subscribe to (typed `any` here).
   * @return Whatever `source.subscribe` returns, used as the teardown logic.
   */
  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
    const exhaustSubscriber = new SwitchFirstSubscriber(subscriber);
    return source.subscribe(exhaustSubscriber);
  }
}
/**
 * We need this JSDoc comment for affecting ESDoc.
 *
 * Subscriber implementing `exhaust`: it subscribes to an incoming inner value
 * only when no inner subscription is currently active, and completes the
 * destination once both the outer source has completed and no inner
 * subscription remains active.
 */
class SwitchFirstSubscriber<T> extends OuterSubscriber<T, T> {
  /** Set once the outer source has signalled completion. */
  private hasCompleted = false;
  /** True while an inner subscription is considered active. */
  private hasSubscription = false;

  constructor(destination: Subscriber<T>) {
    super(destination);
  }

  /**
   * Handles a value from the outer source. The value is dropped while an inner
   * subscription is active; otherwise it becomes the active inner source.
   */
  protected _next(value: T): void {
    if (this.hasSubscription) {
      return;
    }
    // Raise the flag before subscribing so that values arriving during the
    // subscribe call are already treated as "busy".
    this.hasSubscription = true;
    const innerSubscription = subscribeToResult(this, value);
    this.add(innerSubscription);
  }

  /**
   * Handles completion of the outer source. The destination is completed
   * immediately only when no inner subscription is active; otherwise
   * completion is deferred to `notifyComplete`.
   */
  protected _complete(): void {
    this.hasCompleted = true;
    if (this.hasSubscription) {
      return;
    }
    this.destination.complete();
  }

  /**
   * Releases the finished inner subscription and marks this subscriber as idle
   * again; if the outer source has already completed, completes the
   * destination. Presumably invoked when the inner source subscribed through
   * `subscribeToResult` completes (the caller is not visible here).
   */
  notifyComplete(innerSub: Subscription): void {
    this.remove(innerSub);
    this.hasSubscription = false;
    if (!this.hasCompleted) {
      return;
    }
    this.destination.complete();
  }
}