1 import { Observable } from '../Observable';
2 import { isArray } from '../util/isArray';
3 import { fromArray } from './fromArray';
4 import { Operator } from '../Operator';
5 import { Subscriber } from '../Subscriber';
6 import { Subscription } from '../Subscription';
7 import { TeardownLogic, ObservableInput } from '../types';
8 import { OuterSubscriber } from '../OuterSubscriber';
9 import { InnerSubscriber } from '../InnerSubscriber';
10 import { subscribeToResult } from '../util/subscribeToResult';
12 // tslint:disable:max-line-length
// Overloads taking a single tuple-typed array of one to five inputs: the result
// element type is the union of the inputs' element types.
13 export function race<A>(arg: [ObservableInput<A>]): Observable<A>;
14 export function race<A, B>(arg: [ObservableInput<A>, ObservableInput<B>]): Observable<A | B>;
15 export function race<A, B, C>(arg: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>]): Observable<A | B | C>;
16 export function race<A, B, C, D>(arg: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>, ObservableInput<D>]): Observable<A | B | C | D>;
17 export function race<A, B, C, D, E>(arg: [ObservableInput<A>, ObservableInput<B>, ObservableInput<C>, ObservableInput<D>, ObservableInput<E>]): Observable<A | B | C | D | E>;
// Array fallbacks: an array of `ObservableInput<T>` keeps the element type `T`;
// an array of `ObservableInput<any>` is typed as `Observable<{}>`.
18 export function race<T>(arg: ObservableInput<T>[]): Observable<T>;
19 export function race(arg: ObservableInput<any>[]): Observable<{}>;
// Overloads taking one to five inputs as separate positional arguments, with the
// same union result typing as the tuple forms above.
21 export function race<A>(a: ObservableInput<A>): Observable<A>;
22 export function race<A, B>(a: ObservableInput<A>, b: ObservableInput<B>): Observable<A | B>;
23 export function race<A, B, C>(a: ObservableInput<A>, b: ObservableInput<B>, c: ObservableInput<C>): Observable<A | B | C>;
24 export function race<A, B, C, D>(a: ObservableInput<A>, b: ObservableInput<B>, c: ObservableInput<C>, d: ObservableInput<D>): Observable<A | B | C | D>;
25 export function race<A, B, C, D, E>(a: ObservableInput<A>, b: ObservableInput<B>, c: ObservableInput<C>, d: ObservableInput<D>, e: ObservableInput<E>): Observable<A | B | C | D | E>;
26 // tslint:enable:max-line-length
// General overloads. The two array forms take the same parameter types as the
// `arg` array overloads above (only the parameter name differs); the two rest
// forms accept any number of positional inputs.
28 export function race<T>(observables: ObservableInput<T>[]): Observable<T>;
29 export function race(observables: ObservableInput<any>[]): Observable<{}>;
30 export function race<T>(...observables: ObservableInput<T>[]): Observable<T>;
31 export function race(...observables: ObservableInput<any>[]): Observable<{}>;
34 * Returns an Observable that mirrors the first source Observable to emit an item.
37 * ### Subscribes to the observable that was the first to start emitting.
40 * import { race, interval } from 'rxjs';
41 * import { mapTo } from 'rxjs/operators';
43 * const obs1 = interval(1000).pipe(mapTo('fast one'));
44 * const obs2 = interval(3000).pipe(mapTo('medium one'));
45 * const obs3 = interval(5000).pipe(mapTo('slow one'));
47 * race(obs3, obs1, obs2)
49 * winner => console.log(winner)
53 * // a series of 'fast one'
56 * @param {...Observables} ...observables The source Observables to race against each other; the first one to emit is mirrored.
57 * @return {Observable} an Observable that mirrors the output of the first Observable to emit an item.
// Implementation signature behind the `race` overloads. From the visible lines:
//  - when called with exactly one argument that is an array, that array is
//    used as the list of inputs;
//  - when called with exactly one argument that is not an array, that argument
//    itself is returned, cast to `Observable<T>`;
//  - otherwise the inputs are wrapped with `fromArray` and lifted through a
//    new `RaceOperator`.
// NOTE(review): the `else` and closing-brace lines are not present in this
// excerpt, so the branch structure above is inferred from statement order --
// confirm against the full file.
62 export function race<T>(...observables: ObservableInput<any>[]): Observable<T> {
63 // if the only argument is an array, it was most likely called with
64 // `race([obs1, obs2, ...])`
65 if (observables.length === 1) {
66 if (isArray(observables[0])) {
67 observables = observables[0] as Observable<any>[];
// NOTE(review): the lone argument is handed back with only a type cast, so a
// non-Observable `ObservableInput` would be returned unconverted -- presumably
// callers pass a real Observable here; verify.
69 return observables[0] as Observable<T>;
// General case: wrap the inputs with `fromArray` (second argument passed as
// `undefined`) and apply `RaceOperator` via `lift`.
73 return fromArray(observables, undefined).lift(new RaceOperator<T>());
// Operator used by `race`. `call` subscribes to `source` with a new
// `RaceSubscriber` wrapping the downstream `subscriber`, and returns whatever
// that `subscribe` call returns as the teardown logic.
// NOTE(review): closing-brace lines are not present in this excerpt.
76 export class RaceOperator<T> implements Operator<T, T> {
77 call(subscriber: Subscriber<T>, source: any): TeardownLogic {
78 return source.subscribe(new RaceSubscriber(subscriber));
83 * This JSDoc comment is required because it affects how ESDoc handles the class below.
// Subscriber behind `RaceOperator`. It buffers every value received from its
// source in `observables` (`_next`); when the source completes (`_complete`) it
// subscribes to the buffered values in order via `subscribeToResult`. The first
// inner value to arrive (`notifyNext`) causes every other recorded inner
// subscription to be unsubscribed and removed; inner values are passed on to
// `destination`.
// NOTE(review): several lines of this class (the constructor body, some branch
// lines and all closing braces) are not present in this excerpt; the comments
// below describe only the visible statements.
87 export class RaceSubscriber<T> extends OuterSubscriber<T, T> {
// Becomes true once the first inner value has been received (see notifyNext).
88 private hasFirst: boolean = false;
// Values received from the source before it completes; set to null in _complete.
89 private observables: Observable<any>[] = [];
// Inner subscriptions created in _complete, in creation order; set to null in
// notifyNext once the first inner value has arrived.
90 private subscriptions: Subscription[] = [];
// NOTE(review): the constructor body is not visible in this excerpt.
92 constructor(destination: Subscriber<T>) {
// Buffers each value from the source; nothing is subscribed to here.
96 protected _next(observable: any): void {
97 this.observables.push(observable);
// Runs when the source completes: subscribes to the buffered values.
100 protected _complete() {
101 const observables = this.observables;
102 const len = observables.length;
// NOTE(review): the condition guarding this call is not visible in the excerpt
// -- presumably it applies only when nothing was buffered (len === 0); confirm.
105 this.destination.complete!();
// Subscribe in order, stopping as soon as `hasFirst` is set by notifyNext.
107 for (let i = 0; i < len && !this.hasFirst; i++) {
108 const observable = observables[i];
// The loop index `i` is passed as the last argument -- presumably it is what
// notifyNext later receives as `outerIndex`; verify against subscribeToResult.
109 const subscription = subscribeToResult(this, observable, undefined, i)!;
// `subscriptions` is set to null by notifyNext, hence the guard before pushing.
111 if (this.subscriptions) {
112 this.subscriptions.push(subscription);
114 this.add(subscription);
// Release the buffered inputs once they have been processed.
116 this.observables = null!;
// Receives a value from an inner subscription. On the first such value, every
// recorded subscription whose position differs from `outerIndex` is
// unsubscribed and removed, and the `subscriptions` list is dropped.
120 notifyNext(_outerValue: T, innerValue: T,
121 outerIndex: number): void {
122 if (!this.hasFirst) {
123 this.hasFirst = true;
125 for (let i = 0; i < this.subscriptions.length; i++) {
126 if (i !== outerIndex) {
127 let subscription = this.subscriptions[i];
129 subscription.unsubscribe();
130 this.remove(subscription);
134 this.subscriptions = null!;
// Pass the inner value on to the downstream subscriber.
137 this.destination.next!(innerValue);