minor adjustment to readme
[dotfiles/.git] / .config / coc / extensions / node_modules / coc-prettier / node_modules / rxjs / src / internal / operators / delay.ts
1 import { async } from '../scheduler/async';
2 import { isDate } from '../util/isDate';
3 import { Operator } from '../Operator';
4 import { Subscriber } from '../Subscriber';
5 import { Subscription } from '../Subscription';
6 import { Notification } from '../Notification';
7 import { Observable } from '../Observable';
8 import { MonoTypeOperatorFunction, PartialObserver, SchedulerAction, SchedulerLike, TeardownLogic } from '../types';
9
10 /**
11  * Delays the emission of items from the source Observable by a given timeout or
12  * until a given Date.
13  *
14  * <span class="informal">Time shifts each item by some specified amount of
15  * milliseconds.</span>
16  *
17  * ![](delay.png)
18  *
19  * If the delay argument is a Number, this operator time shifts the source
20  * Observable by that amount of time expressed in milliseconds. The relative
21  * time intervals between the values are preserved.
22  *
23  * If the delay argument is a Date, this operator time shifts the start of the
24  * Observable execution until the given date occurs.
25  *
26  * ## Examples
27  * Delay each click by one second
28  * ```ts
29  * import { fromEvent } from 'rxjs';
30  * import { delay } from 'rxjs/operators';
31  *
32  * const clicks = fromEvent(document, 'click');
33  * const delayedClicks = clicks.pipe(delay(1000)); // each click emitted after 1 second
34  * delayedClicks.subscribe(x => console.log(x));
35  * ```
36  *
37  * Delay all clicks until a future date happens
38  * ```ts
39  * import { fromEvent } from 'rxjs';
40  * import { delay } from 'rxjs/operators';
41  *
42  * const clicks = fromEvent(document, 'click');
43  * const date = new Date('March 15, 2050 12:00:00'); // in the future
44  * const delayedClicks = clicks.pipe(delay(date)); // click emitted only after that date
45  * delayedClicks.subscribe(x => console.log(x));
46  * ```
47  *
48  * @see {@link debounceTime}
49  * @see {@link delayWhen}
50  *
51  * @param {number|Date} delay The delay duration in milliseconds (a `number`) or
52  * a `Date` until which the emission of the source items is delayed.
53  * @param {SchedulerLike} [scheduler=async] The {@link SchedulerLike} to use for
54  * managing the timers that handle the time-shift for each item.
55  * @return {Observable} An Observable that delays the emissions of the source
56  * Observable by the specified timeout or Date.
57  * @method delay
58  * @owner Observable
59  */
60 export function delay<T>(delay: number|Date,
61                          scheduler: SchedulerLike = async): MonoTypeOperatorFunction<T> {
62   const absoluteDelay = isDate(delay);
63   const delayFor = absoluteDelay ? (+delay - scheduler.now()) : Math.abs(<number>delay);
64   return (source: Observable<T>) => source.lift(new DelayOperator(delayFor, scheduler));
65 }
66
67 class DelayOperator<T> implements Operator<T, T> {
68   constructor(private delay: number,
69               private scheduler: SchedulerLike) {
70   }
71
72   call(subscriber: Subscriber<T>, source: any): TeardownLogic {
73     return source.subscribe(new DelaySubscriber(subscriber, this.delay, this.scheduler));
74   }
75 }
76
77 interface DelayState<T> {
78   source: DelaySubscriber<T>;
79   destination: PartialObserver<T>;
80   scheduler: SchedulerLike;
81 }
82
83 /**
84  * We need this JSDoc comment for affecting ESDoc.
85  * @ignore
86  * @extends {Ignored}
87  */
88 class DelaySubscriber<T> extends Subscriber<T> {
89   private queue: Array<DelayMessage<T>> = [];
90   private active: boolean = false;
91   private errored: boolean = false;
92
93   private static dispatch<T>(this: SchedulerAction<DelayState<T>>, state: DelayState<T>): void {
94     const source = state.source;
95     const queue = source.queue;
96     const scheduler = state.scheduler;
97     const destination = state.destination;
98
99     while (queue.length > 0 && (queue[0].time - scheduler.now()) <= 0) {
100       queue.shift().notification.observe(destination);
101     }
102
103     if (queue.length > 0) {
104       const delay = Math.max(0, queue[0].time - scheduler.now());
105       this.schedule(state, delay);
106     } else {
107       this.unsubscribe();
108       source.active = false;
109     }
110   }
111
112   constructor(destination: Subscriber<T>,
113               private delay: number,
114               private scheduler: SchedulerLike) {
115     super(destination);
116   }
117
118   private _schedule(scheduler: SchedulerLike): void {
119     this.active = true;
120     const destination = this.destination as Subscription;
121     destination.add(scheduler.schedule<DelayState<T>>(DelaySubscriber.dispatch, this.delay, {
122       source: this, destination: this.destination, scheduler: scheduler
123     }));
124   }
125
126   private scheduleNotification(notification: Notification<T>): void {
127     if (this.errored === true) {
128       return;
129     }
130
131     const scheduler = this.scheduler;
132     const message = new DelayMessage(scheduler.now() + this.delay, notification);
133     this.queue.push(message);
134
135     if (this.active === false) {
136       this._schedule(scheduler);
137     }
138   }
139
140   protected _next(value: T) {
141     this.scheduleNotification(Notification.createNext(value));
142   }
143
144   protected _error(err: any) {
145     this.errored = true;
146     this.queue = [];
147     this.destination.error(err);
148     this.unsubscribe();
149   }
150
151   protected _complete() {
152     this.scheduleNotification(Notification.createComplete());
153     this.unsubscribe();
154   }
155 }
156
157 class DelayMessage<T> {
158   constructor(public readonly time: number,
159               public readonly notification: Notification<T>) {
160   }
161 }