Giant blob of minor changes
[dotfiles/.git] / .config / coc / extensions / node_modules / coc-prettier / node_modules / rxjs / src / internal / operators / timeoutWith.ts
1 import { Operator } from '../Operator';
2 import { Subscriber } from '../Subscriber';
3 import { async } from '../scheduler/async';
4 import { Observable } from '../Observable';
5 import { isDate } from '../util/isDate';
6 import { ObservableInput, OperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic } from '../types';
7 import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
8
9 /* tslint:disable:max-line-length */
10 export function timeoutWith<T, R>(due: number | Date, withObservable: ObservableInput<R>, scheduler?: SchedulerLike): OperatorFunction<T, T | R>;
11 /* tslint:enable:max-line-length */
12
13 /**
14  *
15  * Errors if Observable does not emit a value in given time span, in case of which
16  * subscribes to the second Observable.
17  *
18  * <span class="informal">It's a version of `timeout` operator that let's you specify fallback Observable.</span>
19  *
20  * ![](timeoutWith.png)
21  *
22  * `timeoutWith` is a variation of `timeout` operator. It behaves exactly the same,
23  * still accepting as a first argument either a number or a Date, which control - respectively -
24  * when values of source Observable should be emitted or when it should complete.
25  *
26  * The only difference is that it accepts a second, required parameter. This parameter
27  * should be an Observable which will be subscribed when source Observable fails any timeout check.
28  * So whenever regular `timeout` would emit an error, `timeoutWith` will instead start re-emitting
29  * values from second Observable. Note that this fallback Observable is not checked for timeouts
30  * itself, so it can emit values and complete at arbitrary points in time. From the moment of a second
31  * subscription, Observable returned from `timeoutWith` simply mirrors fallback stream. When that
32  * stream completes, it completes as well.
33  *
34  * Scheduler, which in case of `timeout` is provided as as second argument, can be still provided
35  * here - as a third, optional parameter. It still is used to schedule timeout checks and -
36  * as a consequence - when second Observable will be subscribed, since subscription happens
37  * immediately after failing check.
38  *
39  * ## Example
40  * Add fallback observable
41  * ```ts
42  * import { interval } from 'rxjs';
43  * import { timeoutWith } from 'rxjs/operators';
44  *
45  * const seconds = interval(1000);
46  * const minutes = interval(60 * 1000);
47  *
48  * seconds.pipe(timeoutWith(900, minutes))
49  *   .subscribe(
50  *     value => console.log(value), // After 900ms, will start emitting `minutes`,
51  *                                  // since first value of `seconds` will not arrive fast enough.
52  *     err => console.log(err),     // Would be called after 900ms in case of `timeout`,
53  *                                  // but here will never be called.
54  *   );
55  * ```
56  *
57  * @param {number|Date} due Number specifying period within which Observable must emit values
58  *                          or Date specifying before when Observable should complete
59  * @param {Observable<T>} withObservable Observable which will be subscribed if source fails timeout check.
60  * @param {SchedulerLike} [scheduler] Scheduler controlling when timeout checks occur.
61  * @return {Observable<T>} Observable that mirrors behaviour of source or, when timeout check fails, of an Observable
62  *                          passed as a second parameter.
63  * @method timeoutWith
64  * @owner Observable
65  */
66 export function timeoutWith<T, R>(due: number | Date,
67                                   withObservable: ObservableInput<R>,
68                                   scheduler: SchedulerLike = async): OperatorFunction<T, T | R> {
69   return (source: Observable<T>) => {
70     let absoluteTimeout = isDate(due);
71     let waitFor = absoluteTimeout ? (+due - scheduler.now()) : Math.abs(<number>due);
72     return source.lift(new TimeoutWithOperator(waitFor, absoluteTimeout, withObservable, scheduler));
73   };
74 }
75
76 class TimeoutWithOperator<T> implements Operator<T, T> {
77   constructor(private waitFor: number,
78               private absoluteTimeout: boolean,
79               private withObservable: ObservableInput<any>,
80               private scheduler: SchedulerLike) {
81   }
82
83   call(subscriber: Subscriber<T>, source: any): TeardownLogic {
84     return source.subscribe(new TimeoutWithSubscriber(
85       subscriber, this.absoluteTimeout, this.waitFor, this.withObservable, this.scheduler
86     ));
87   }
88 }
89
90 /**
91  * We need this JSDoc comment for affecting ESDoc.
92  * @ignore
93  * @extends {Ignored}
94  */
95 class TimeoutWithSubscriber<T, R> extends SimpleOuterSubscriber<T, R> {
96
97   private action?: SchedulerAction<TimeoutWithSubscriber<T, R>>;
98
99   constructor(destination: Subscriber<T>,
100               private absoluteTimeout: boolean,
101               private waitFor: number,
102               private withObservable: ObservableInput<any>,
103               private scheduler: SchedulerLike) {
104     super(destination);
105     this.scheduleTimeout();
106   }
107
108   private static dispatchTimeout<T, R>(subscriber: TimeoutWithSubscriber<T, R>): void {
109     const { withObservable } = subscriber;
110     subscriber._unsubscribeAndRecycle();
111     subscriber.add(innerSubscribe(withObservable, new SimpleInnerSubscriber(subscriber)));
112   }
113
114   private scheduleTimeout(): void {
115     const { action } = this;
116     if (action) {
117       // Recycle the action if we've already scheduled one. All the production
118       // Scheduler Actions mutate their state/delay time and return themeselves.
119       // VirtualActions are immutable, so they create and return a clone. In this
120       // case, we need to set the action reference to the most recent VirtualAction,
121       // to ensure that's the one we clone from next time.
122       this.action = (<SchedulerAction<TimeoutWithSubscriber<T, R>>> action.schedule(this, this.waitFor));
123     } else {
124       this.add(this.action = (<SchedulerAction<TimeoutWithSubscriber<T, R>>> this.scheduler.schedule<TimeoutWithSubscriber<T, R>>(
125         TimeoutWithSubscriber.dispatchTimeout as any, this.waitFor, this
126       )));
127     }
128   }
129
130   protected _next(value: T): void {
131     if (!this.absoluteTimeout) {
132       this.scheduleTimeout();
133     }
134     super._next(value);
135   }
136
137   /** @deprecated This is an internal implementation detail, do not use. */
138   _unsubscribe() {
139     this.action = undefined;
140     this.scheduler = null!;
141     this.withObservable = null!;
142   }
143 }