753b16707c6bd425b72f955d094887e34a2530b0
[dotfiles/.git] / .config / coc / extensions / node_modules / coc-prettier / node_modules / rxjs / src / internal / operators / timeoutWith.ts
1 import { Operator } from '../Operator';
2 import { Subscriber } from '../Subscriber';
3 import { async } from '../scheduler/async';
4 import { Observable } from '../Observable';
5 import { isDate } from '../util/isDate';
6 import { OuterSubscriber } from '../OuterSubscriber';
7 import { subscribeToResult } from '../util/subscribeToResult';
8 import { ObservableInput, OperatorFunction, MonoTypeOperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic } from '../types';
9
10 /* tslint:disable:max-line-length */
11 export function timeoutWith<T, R>(due: number | Date, withObservable: ObservableInput<R>, scheduler?: SchedulerLike): OperatorFunction<T, T | R>;
12 /* tslint:enable:max-line-length */
13
14 /**
15  *
16  * Errors if Observable does not emit a value in given time span, in case of which
17  * subscribes to the second Observable.
18  *
19  * <span class="informal">It's a version of `timeout` operator that let's you specify fallback Observable.</span>
20  *
21  * ![](timeoutWith.png)
22  *
23  * `timeoutWith` is a variation of `timeout` operator. It behaves exactly the same,
24  * still accepting as a first argument either a number or a Date, which control - respectively -
25  * when values of source Observable should be emitted or when it should complete.
26  *
27  * The only difference is that it accepts a second, required parameter. This parameter
28  * should be an Observable which will be subscribed when source Observable fails any timeout check.
29  * So whenever regular `timeout` would emit an error, `timeoutWith` will instead start re-emitting
30  * values from second Observable. Note that this fallback Observable is not checked for timeouts
31  * itself, so it can emit values and complete at arbitrary points in time. From the moment of a second
32  * subscription, Observable returned from `timeoutWith` simply mirrors fallback stream. When that
33  * stream completes, it completes as well.
34  *
35  * Scheduler, which in case of `timeout` is provided as as second argument, can be still provided
36  * here - as a third, optional parameter. It still is used to schedule timeout checks and -
37  * as a consequence - when second Observable will be subscribed, since subscription happens
38  * immediately after failing check.
39  *
40  * ## Example
41  * Add fallback observable
42  * ```ts
43  * import { intrerval } from 'rxjs';
44  * import { timeoutWith } from 'rxjs/operators';
45  *
46  * const seconds = interval(1000);
47  * const minutes = interval(60 * 1000);
48  *
49  * seconds.pipe(timeoutWith(900, minutes))
50  *   .subscribe(
51  *     value => console.log(value), // After 900ms, will start emitting `minutes`,
52  *                                  // since first value of `seconds` will not arrive fast enough.
53  *     err => console.log(err),     // Would be called after 900ms in case of `timeout`,
54  *                                  // but here will never be called.
55  *   );
56  * ```
57  *
58  * @param {number|Date} due Number specifying period within which Observable must emit values
59  *                          or Date specifying before when Observable should complete
60  * @param {Observable<T>} withObservable Observable which will be subscribed if source fails timeout check.
61  * @param {SchedulerLike} [scheduler] Scheduler controlling when timeout checks occur.
62  * @return {Observable<T>} Observable that mirrors behaviour of source or, when timeout check fails, of an Observable
63  *                          passed as a second parameter.
64  * @method timeoutWith
65  * @owner Observable
66  */
67 export function timeoutWith<T, R>(due: number | Date,
68                                   withObservable: ObservableInput<R>,
69                                   scheduler: SchedulerLike = async): OperatorFunction<T, T | R> {
70   return (source: Observable<T>) => {
71     let absoluteTimeout = isDate(due);
72     let waitFor = absoluteTimeout ? (+due - scheduler.now()) : Math.abs(<number>due);
73     return source.lift(new TimeoutWithOperator(waitFor, absoluteTimeout, withObservable, scheduler));
74   };
75 }
76
77 class TimeoutWithOperator<T> implements Operator<T, T> {
78   constructor(private waitFor: number,
79               private absoluteTimeout: boolean,
80               private withObservable: ObservableInput<any>,
81               private scheduler: SchedulerLike) {
82   }
83
84   call(subscriber: Subscriber<T>, source: any): TeardownLogic {
85     return source.subscribe(new TimeoutWithSubscriber(
86       subscriber, this.absoluteTimeout, this.waitFor, this.withObservable, this.scheduler
87     ));
88   }
89 }
90
91 /**
92  * We need this JSDoc comment for affecting ESDoc.
93  * @ignore
94  * @extends {Ignored}
95  */
96 class TimeoutWithSubscriber<T, R> extends OuterSubscriber<T, R> {
97
98   private action: SchedulerAction<TimeoutWithSubscriber<T, R>> = null;
99
100   constructor(destination: Subscriber<T>,
101               private absoluteTimeout: boolean,
102               private waitFor: number,
103               private withObservable: ObservableInput<any>,
104               private scheduler: SchedulerLike) {
105     super(destination);
106     this.scheduleTimeout();
107   }
108
109   private static dispatchTimeout<T, R>(subscriber: TimeoutWithSubscriber<T, R>): void {
110     const { withObservable } = subscriber;
111     (<any> subscriber)._unsubscribeAndRecycle();
112     subscriber.add(subscribeToResult(subscriber, withObservable));
113   }
114
115   private scheduleTimeout(): void {
116     const { action } = this;
117     if (action) {
118       // Recycle the action if we've already scheduled one. All the production
119       // Scheduler Actions mutate their state/delay time and return themeselves.
120       // VirtualActions are immutable, so they create and return a clone. In this
121       // case, we need to set the action reference to the most recent VirtualAction,
122       // to ensure that's the one we clone from next time.
123       this.action = (<SchedulerAction<TimeoutWithSubscriber<T, R>>> action.schedule(this, this.waitFor));
124     } else {
125       this.add(this.action = (<SchedulerAction<TimeoutWithSubscriber<T, R>>> this.scheduler.schedule<TimeoutWithSubscriber<T, R>>(
126         TimeoutWithSubscriber.dispatchTimeout, this.waitFor, this
127       )));
128     }
129   }
130
131   protected _next(value: T): void {
132     if (!this.absoluteTimeout) {
133       this.scheduleTimeout();
134     }
135     super._next(value);
136   }
137
138   /** @deprecated This is an internal implementation detail, do not use. */
139   _unsubscribe() {
140     this.action = null;
141     this.scheduler = null;
142     this.withObservable = null;
143   }
144 }