1 import { Observable } from '../Observable';
2 import { ObservableInput, SchedulerLike, ObservedValueOf } from '../types';
3 import { isScheduler } from '../util/isScheduler';
4 import { isArray } from '../util/isArray';
5 import { Subscriber } from '../Subscriber';
6 import { OuterSubscriber } from '../OuterSubscriber';
7 import { Operator } from '../Operator';
8 import { InnerSubscriber } from '../InnerSubscriber';
9 import { subscribeToResult } from '../util/subscribeToResult';
10 import { fromArray } from './fromArray';
/* tslint:disable:max-line-length */

// If called with a single array, it "auto-spreads" the array, with result selector
/** @deprecated resultSelector no longer supported, pipe to map instead */
export function combineLatest<O1 extends ObservableInput<any>, R>(sources: [O1], resultSelector: (v1: ObservedValueOf<O1>) => R, scheduler?: SchedulerLike): Observable<R>;
/** @deprecated resultSelector no longer supported, pipe to map instead */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, R>(sources: [O1, O2], resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>) => R, scheduler?: SchedulerLike): Observable<R>;
/** @deprecated resultSelector no longer supported, pipe to map instead */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, R>(sources: [O1, O2, O3], resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>) => R, scheduler?: SchedulerLike): Observable<R>;
/** @deprecated resultSelector no longer supported, pipe to map instead */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, R>(sources: [O1, O2, O3, O4], resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>) => R, scheduler?: SchedulerLike): Observable<R>;
/** @deprecated resultSelector no longer supported, pipe to map instead */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, R>(sources: [O1, O2, O3, O4, O5], resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>, v5: ObservedValueOf<O5>) => R, scheduler?: SchedulerLike): Observable<R>;
/** @deprecated resultSelector no longer supported, pipe to map instead */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>, R>(sources: [O1, O2, O3, O4, O5, O6], resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>, v5: ObservedValueOf<O5>, v6: ObservedValueOf<O6>) => R, scheduler?: SchedulerLike): Observable<R>;
/** @deprecated resultSelector no longer supported, pipe to map instead */
export function combineLatest<O extends ObservableInput<any>, R>(sources: O[], resultSelector: (...args: ObservedValueOf<O>[]) => R, scheduler?: SchedulerLike): Observable<R>;

// standard call, but with a result selector
/** @deprecated resultSelector no longer supported, pipe to map instead */
export function combineLatest<O1 extends ObservableInput<any>, R>(v1: O1, resultSelector: (v1: ObservedValueOf<O1>) => R, scheduler?: SchedulerLike): Observable<R>;
/** @deprecated resultSelector no longer supported, pipe to map instead */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, R>(v1: O1, v2: O2, resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>) => R, scheduler?: SchedulerLike): Observable<R>;
/** @deprecated resultSelector no longer supported, pipe to map instead */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, R>(v1: O1, v2: O2, v3: O3, resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>) => R, scheduler?: SchedulerLike): Observable<R>;
/** @deprecated resultSelector no longer supported, pipe to map instead */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, R>(v1: O1, v2: O2, v3: O3, v4: O4, resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>) => R, scheduler?: SchedulerLike): Observable<R>;
/** @deprecated resultSelector no longer supported, pipe to map instead */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, R>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>, v5: ObservedValueOf<O5>) => R, scheduler?: SchedulerLike): Observable<R>;
/** @deprecated resultSelector no longer supported, pipe to map instead */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>, R>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6, resultSelector: (v1: ObservedValueOf<O1>, v2: ObservedValueOf<O2>, v3: ObservedValueOf<O3>, v4: ObservedValueOf<O4>, v5: ObservedValueOf<O5>, v6: ObservedValueOf<O6>) => R, scheduler?: SchedulerLike): Observable<R>;

// With a scheduler (deprecated)
/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
export function combineLatest<O1 extends ObservableInput<any>>(sources: [O1], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>]>;
/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(sources: [O1, O2], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>]>;
/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(sources: [O1, O2, O3], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>]>;
/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(sources: [O1, O2, O3, O4], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>]>;
/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>]>;
/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5, O6], scheduler: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>, ObservedValueOf<O6>]>;
/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
export function combineLatest<O extends ObservableInput<any>>(sources: O[], scheduler: SchedulerLike): Observable<ObservedValueOf<O>[]>;

// Preferred form: sources passed as a single array, no result selector, no scheduler.
// These are the only signatures in this list that carry no deprecation notice.
export function combineLatest<O1 extends ObservableInput<any>>(sources: [O1]): Observable<[ObservedValueOf<O1>]>;
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(sources: [O1, O2]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>]>;
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(sources: [O1, O2, O3]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>]>;
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(sources: [O1, O2, O3, O4]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>]>;
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>]>;
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5, O6]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>, ObservedValueOf<O6>]>;
export function combineLatest<O extends ObservableInput<any>>(sources: O[]): Observable<ObservedValueOf<O>[]>;

// Sources passed as separate arguments, optionally followed by a scheduler (deprecated)
/** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
export function combineLatest<O1 extends ObservableInput<any>>(v1: O1, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>]>;
/** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(v1: O1, v2: O2, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>]>;
/** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>]>;
/** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>]>;
/** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>]>;
/** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
export function combineLatest<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6, scheduler?: SchedulerLike): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>, ObservedValueOf<O6>]>;

// Loosely typed catch-all signatures for any number of arguments (all deprecated)
/** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
export function combineLatest<O extends ObservableInput<any>>(...observables: O[]): Observable<any[]>;

/** @deprecated Pass arguments in a single array instead `combineLatest([a, b, c])` */
export function combineLatest<O extends ObservableInput<any>, R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): Observable<R>;

/** @deprecated resultSelector no longer supported, pipe to map instead */
export function combineLatest<O extends ObservableInput<any>, R>(array: O[], resultSelector: (...values: ObservedValueOf<O>[]) => R, scheduler?: SchedulerLike): Observable<R>;

/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
export function combineLatest<O extends ObservableInput<any>>(...observables: Array<O | SchedulerLike>): Observable<any[]>;

/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
export function combineLatest<O extends ObservableInput<any>, R>(...observables: Array<O | ((...values: ObservedValueOf<O>[]) => R) | SchedulerLike>): Observable<R>;

/** @deprecated Passing a scheduler here is deprecated, use {@link subscribeOn} and/or {@link observeOn} instead */
export function combineLatest<R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R) | SchedulerLike>): Observable<R>;
/* tslint:enable:max-line-length */
/**
 * Combines multiple Observables to create an Observable whose values are
 * calculated from the latest values of each of its input Observables.
 *
 * <span class="informal">Whenever any input Observable emits a value, it
 * computes a formula using the latest values from all the inputs, then emits
 * the output of that formula.</span>
 *
 * ![](combineLatest.png)
 *
 * `combineLatest` combines the values from all the Observables passed as
 * arguments. This is done by subscribing to each Observable in order and,
 * whenever any Observable emits, collecting an array of the most recent
 * values from each Observable. So if you pass `n` Observables to the operator,
 * the returned Observable will always emit an array of `n` values, in an order
 * corresponding to the order of the passed Observables (the value from the first
 * Observable in the first place and so on).
 *
 * Static version of `combineLatest` accepts either an array of Observables
 * or each Observable can be put directly as an argument. Note that an array of
 * Observables is a good choice if you don't know beforehand how many Observables
 * you will combine. Passing an empty array will result in an Observable that
 * completes immediately.
 *
 * To ensure the output array always has the same length, `combineLatest` will
 * actually wait for all input Observables to emit at least once,
 * before it starts emitting results. This means if some Observable emits
 * values before other Observables started emitting, all these values but the last
 * will be lost. On the other hand, if some Observable does not emit a value but
 * completes, the resulting Observable will complete at the same moment without
 * emitting anything, since it will now be impossible to include a value from the
 * completed Observable in the resulting array. Also, if some input Observable does
 * not emit any value and never completes, `combineLatest` will also never emit
 * and never complete, since, again, it will wait for all streams to emit some
 * value.
 *
 * If at least one Observable was passed to `combineLatest` and all passed Observables
 * emitted something, the resulting Observable will complete when all combined
 * streams complete. So even if some Observable completes, the result of
 * `combineLatest` will still emit values when other Observables do. In case
 * of a completed Observable, its value from now on will always be the last
 * emitted value. On the other hand, if any Observable errors, `combineLatest`
 * will error immediately as well, and all other Observables will be unsubscribed.
 *
 * `combineLatest` accepts as an optional parameter a `project` function, which takes
 * as arguments all values that would normally be emitted by the resulting Observable.
 * `project` can return any kind of value, which will then be emitted by the Observable
 * instead of the default array. Note that `project` does not take as argument that array
 * of values, but the values themselves. That means the default `project` can be imagined
 * as a function that takes all its arguments and puts them into an array.
 *
 * Note that the overload signatures mark the `project` (result selector) argument,
 * the scheduler argument, and passing the inputs as separate arguments as deprecated:
 * pass a single array and pipe the result to `map` instead.
 *
 * ## Examples
 * ### Combine two timer Observables
 * ```ts
 * import { combineLatest, timer } from 'rxjs';
 *
 * const firstTimer = timer(0, 1000); // emit 0, 1, 2... after every second, starting from now
 * const secondTimer = timer(500, 1000); // emit 0, 1, 2... after every second, starting 0.5s from now
 * const combinedTimers = combineLatest([firstTimer, secondTimer]);
 * combinedTimers.subscribe(value => console.log(value));
 * // Logs
 * // [0, 0] after 0.5s
 * // [1, 0] after 1s
 * // [1, 1] after 1.5s
 * // [2, 1] after 2s
 * ```
 *
 * ### Combine an array of Observables
 * ```ts
 * import { combineLatest, of } from 'rxjs';
 * import { delay, startWith } from 'rxjs/operators';
 *
 * const observables = [1, 5, 10].map(
 *   n => of(n).pipe(
 *     delay(n * 1000),   // emit 0 and then emit n after n seconds
 *     startWith(0),
 *   )
 * );
 * const combined = combineLatest(observables);
 * combined.subscribe(value => console.log(value));
 * // Logs
 * // [0, 0, 0] immediately
 * // [1, 0, 0] after 1s
 * // [1, 5, 0] after 5s
 * // [1, 5, 10] after 10s
 * ```
 *
 * ### Use map operator to dynamically calculate the Body-Mass Index
 * ```ts
 * import { combineLatest, of } from 'rxjs';
 * import { map } from 'rxjs/operators';
 *
 * const weight = of(70, 72, 76, 79, 75);
 * const height = of(1.76, 1.77, 1.78);
 * const bmi = combineLatest([weight, height]).pipe(
 *   map(([w, h]) => w / (h * h)),
 * );
 * bmi.subscribe(x => console.log('BMI is ' + x));
 *
 * // With output to console:
 * // BMI is 24.212293388429753
 * // BMI is 23.93948099205209
 * // BMI is 23.671253629592222
 * ```
 *
 * @see {@link combineAll}
 * @see {@link withLatestFrom}
 *
 * @param {ObservableInput} observable1 An input Observable to combine with other Observables.
 * @param {ObservableInput} observable2 An input Observable to combine with other Observables.
 * More than one input Observables may be given as arguments
 * or an array of Observables may be given as the first argument.
 * @param {function} [project] An optional function to project the values from
 * the combined latest values into a new value on the output Observable.
 * @param {SchedulerLike} [scheduler=null] The {@link SchedulerLike} to use for subscribing to
 * each input Observable.
 * @return {Observable} An Observable of projected values from the most recent
 * values from each input Observable, or an array of the most recent values from
 * each input Observable.
 */
export function combineLatest<O extends ObservableInput<any>, R>(
  ...observables: (O | ((...values: ObservedValueOf<O>[]) => R) | SchedulerLike)[]
): Observable<R> {
  let resultSelector: (...values: Array<any>) => R = null;
  let scheduler: SchedulerLike = null;

  // The optional trailing arguments are peeled off from the end: a scheduler,
  // when present, is always last, so it has to be removed before looking for
  // a result selector.
  if (isScheduler(observables[observables.length - 1])) {
    scheduler = observables.pop() as SchedulerLike;
  }

  if (typeof observables[observables.length - 1] === 'function') {
    resultSelector = observables.pop() as (...values: Array<any>) => R;
  }

  // if the first and only other argument besides the resultSelector is an array
  // assume it's been called with `combineLatest([obs1, obs2, obs3], resultSelector)`
  if (observables.length === 1 && isArray(observables[0])) {
    observables = observables[0] as any;
  }

  // The list of inputs is itself turned into a source and lifted through
  // CombineLatestOperator; the subscriber it installs does the actual combining.
  return fromArray(observables, scheduler).lift(new CombineLatestOperator<ObservedValueOf<O>, R>(resultSelector));
}
/**
 * Operator that places a {@link CombineLatestSubscriber} between a source and
 * the downstream subscriber, handing it the optional result selector.
 */
export class CombineLatestOperator<T, R> implements Operator<T, R> {
  /**
   * @param resultSelector Optional function forwarded unchanged to every
   * {@link CombineLatestSubscriber} this operator creates.
   */
  constructor(private resultSelector?: (...values: Array<any>) => R) {
  }

  /**
   * Subscribes to `source` with a new {@link CombineLatestSubscriber} that
   * wraps `subscriber`.
   *
   * @param subscriber The downstream subscriber that receives the combined output.
   * @param source The source to subscribe to; typed `any`, only `subscribe` is called on it.
   * @return Whatever `source.subscribe` returns.
   */
  call(subscriber: Subscriber<R>, source: any): any {
    return source.subscribe(new CombineLatestSubscriber(subscriber, this.resultSelector));
  }
}
// Sentinel stored in `values` for an input that has not emitted anything yet.
// NOTE(review): no declaration of `NONE` is visible in the reviewed listing, so
// it is declared here; if the module already declares it elsewhere, drop this line.
const NONE = {};

/**
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> {
  // Number of subscribed inputs that have not completed yet.
  private active: number = 0;
  // Latest value per input, indexed by input position; NONE until the first emission.
  private values: any[] = [];
  // Inputs collected from the outer source, in arrival order.
  private observables: any[] = [];
  // Number of inputs that still have to emit their first value.
  private toRespond: number;

  /**
   * @param destination The downstream subscriber that receives the combined output.
   * @param resultSelector Optional function applied to the latest values (spread
   * as separate arguments); its return value is emitted instead of the array.
   */
  constructor(destination: Subscriber<R>, private resultSelector?: (...values: Array<any>) => R) {
    super(destination);
  }

  /**
   * Records one input and reserves its slot in `values`. Nothing is subscribed
   * here; that happens in `_complete`, once the whole list of inputs is known.
   */
  protected _next(observable: any) {
    this.values.push(NONE);
    this.observables.push(observable);
  }

  /**
   * Called when the outer source (the list of inputs) completes: completes the
   * destination right away if there are no inputs, otherwise subscribes to
   * every collected input.
   */
  protected _complete() {
    const observables = this.observables;
    const len = observables.length;
    if (len === 0) {
      this.destination.complete();
    } else {
      this.active = len;
      this.toRespond = len;
      for (let i = 0; i < len; i++) {
        const observable = observables[i];
        // The position `i` is passed along so that `notifyNext` can tell, via
        // its `outerIndex` argument, which slot of `values` to update.
        this.add(subscribeToResult(this, observable, observable, i));
      }
    }
  }

  /**
   * Called when one input completes; the destination completes only once the
   * last active input has completed.
   */
  notifyComplete(unused: Subscriber<R>): void {
    if ((this.active -= 1) === 0) {
      this.destination.complete();
    }
  }

  /**
   * Called when an input emits: stores the value in that input's slot and, once
   * every input has emitted at least once, emits the combined result.
   *
   * @param innerValue The value just emitted by the input.
   * @param outerIndex Position of the emitting input, as passed in `_complete`.
   */
  notifyNext(outerValue: T, innerValue: R,
             outerIndex: number, innerIndex: number,
             innerSub: InnerSubscriber<T, R>): void {
    const values = this.values;
    const oldVal = values[outerIndex];
    // The pending counter only drops on an input's first emission, i.e. when
    // its slot still holds the NONE sentinel.
    const toRespond = !this.toRespond
      ? 0
      : oldVal === NONE ? --this.toRespond : this.toRespond;
    values[outerIndex] = innerValue;

    if (toRespond === 0) {
      if (this.resultSelector) {
        this._tryResultSelector(values);
      } else {
        // Emit a copy so the internal `values` array is never handed downstream.
        this.destination.next(values.slice());
      }
    }
  }

  /**
   * Applies the result selector to the latest values. An exception thrown by
   * the selector is sent to the destination as an error and nothing is emitted.
   */
  private _tryResultSelector(values: any[]) {
    let result: any;
    try {
      result = this.resultSelector.apply(this, values);
    } catch (err) {
      this.destination.error(err);
      return;
    }
    this.destination.next(result);
  }
}