1 import { Operator } from '../Operator';
2 import { Subscriber } from '../Subscriber';
3 import { async } from '../scheduler/async';
4 import { Observable } from '../Observable';
5 import { isDate } from '../util/isDate';
6 import { OuterSubscriber } from '../OuterSubscriber';
7 import { subscribeToResult } from '../util/subscribeToResult';
8 import { ObservableInput, OperatorFunction, MonoTypeOperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic } from '../types';
10 /* tslint:disable:max-line-length */
11 export function timeoutWith<T, R>(due: number | Date, withObservable: ObservableInput<R>, scheduler?: SchedulerLike): OperatorFunction<T, T | R>;
12 /* tslint:enable:max-line-length */
16 * Errors if Observable does not emit a value in given time span, in case of which
17 * subscribes to the second Observable.
19 * <span class="informal">It's a version of `timeout` operator that let's you specify fallback Observable.</span>
21 * ![](timeoutWith.png)
23 * `timeoutWith` is a variation of `timeout` operator. It behaves exactly the same,
24 * still accepting as a first argument either a number or a Date, which control - respectively -
25 * when values of source Observable should be emitted or when it should complete.
27 * The only difference is that it accepts a second, required parameter. This parameter
28 * should be an Observable which will be subscribed when source Observable fails any timeout check.
29 * So whenever regular `timeout` would emit an error, `timeoutWith` will instead start re-emitting
30 * values from second Observable. Note that this fallback Observable is not checked for timeouts
31 * itself, so it can emit values and complete at arbitrary points in time. From the moment of a second
32 * subscription, Observable returned from `timeoutWith` simply mirrors fallback stream. When that
33 * stream completes, it completes as well.
35 * Scheduler, which in case of `timeout` is provided as as second argument, can be still provided
36 * here - as a third, optional parameter. It still is used to schedule timeout checks and -
37 * as a consequence - when second Observable will be subscribed, since subscription happens
38 * immediately after failing check.
41 * Add fallback observable
43 * import { intrerval } from 'rxjs';
44 * import { timeoutWith } from 'rxjs/operators';
46 * const seconds = interval(1000);
47 * const minutes = interval(60 * 1000);
49 * seconds.pipe(timeoutWith(900, minutes))
51 * value => console.log(value), // After 900ms, will start emitting `minutes`,
52 * // since first value of `seconds` will not arrive fast enough.
53 * err => console.log(err), // Would be called after 900ms in case of `timeout`,
54 * // but here will never be called.
58 * @param {number|Date} due Number specifying period within which Observable must emit values
59 * or Date specifying before when Observable should complete
60 * @param {Observable<T>} withObservable Observable which will be subscribed if source fails timeout check.
61 * @param {SchedulerLike} [scheduler] Scheduler controlling when timeout checks occur.
62 * @return {Observable<T>} Observable that mirrors behaviour of source or, when timeout check fails, of an Observable
63 * passed as a second parameter.
67 export function timeoutWith<T, R>(due: number | Date,
68 withObservable: ObservableInput<R>,
69 scheduler: SchedulerLike = async): OperatorFunction<T, T | R> {
70 return (source: Observable<T>) => {
71 let absoluteTimeout = isDate(due);
72 let waitFor = absoluteTimeout ? (+due - scheduler.now()) : Math.abs(<number>due);
73 return source.lift(new TimeoutWithOperator(waitFor, absoluteTimeout, withObservable, scheduler));
77 class TimeoutWithOperator<T> implements Operator<T, T> {
78 constructor(private waitFor: number,
79 private absoluteTimeout: boolean,
80 private withObservable: ObservableInput<any>,
81 private scheduler: SchedulerLike) {
84 call(subscriber: Subscriber<T>, source: any): TeardownLogic {
85 return source.subscribe(new TimeoutWithSubscriber(
86 subscriber, this.absoluteTimeout, this.waitFor, this.withObservable, this.scheduler
92 * We need this JSDoc comment for affecting ESDoc.
96 class TimeoutWithSubscriber<T, R> extends OuterSubscriber<T, R> {
98 private action: SchedulerAction<TimeoutWithSubscriber<T, R>> = null;
100 constructor(destination: Subscriber<T>,
101 private absoluteTimeout: boolean,
102 private waitFor: number,
103 private withObservable: ObservableInput<any>,
104 private scheduler: SchedulerLike) {
106 this.scheduleTimeout();
109 private static dispatchTimeout<T, R>(subscriber: TimeoutWithSubscriber<T, R>): void {
110 const { withObservable } = subscriber;
111 (<any> subscriber)._unsubscribeAndRecycle();
112 subscriber.add(subscribeToResult(subscriber, withObservable));
115 private scheduleTimeout(): void {
116 const { action } = this;
118 // Recycle the action if we've already scheduled one. All the production
119 // Scheduler Actions mutate their state/delay time and return themeselves.
120 // VirtualActions are immutable, so they create and return a clone. In this
121 // case, we need to set the action reference to the most recent VirtualAction,
122 // to ensure that's the one we clone from next time.
123 this.action = (<SchedulerAction<TimeoutWithSubscriber<T, R>>> action.schedule(this, this.waitFor));
125 this.add(this.action = (<SchedulerAction<TimeoutWithSubscriber<T, R>>> this.scheduler.schedule<TimeoutWithSubscriber<T, R>>(
126 TimeoutWithSubscriber.dispatchTimeout, this.waitFor, this
131 protected _next(value: T): void {
132 if (!this.absoluteTimeout) {
133 this.scheduleTimeout();
138 /** @deprecated This is an internal implementation detail, do not use. */
141 this.scheduler = null;
142 this.withObservable = null;