1 import { Subject } from '../Subject';
2 import { Operator } from '../Operator';
3 import { Subscriber } from '../Subscriber';
4 import { Observable } from '../Observable';
5 import { ConnectableObservable, connectableObservableDescriptor } from '../observable/ConnectableObservable';
6 import { MonoTypeOperatorFunction, OperatorFunction, UnaryFunction, ObservedValueOf, ObservableInput } from '../types';
8 /* tslint:disable:max-line-length */
9 export function multicast<T>(subject: Subject<T>): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
10 export function multicast<T, O extends ObservableInput<any>>(subject: Subject<T>, selector: (shared: Observable<T>) => O): UnaryFunction<Observable<T>, ConnectableObservable<ObservedValueOf<O>>>;
11 export function multicast<T>(subjectFactory: (this: Observable<T>) => Subject<T>): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
12 export function multicast<T, O extends ObservableInput<any>>(SubjectFactory: (this: Observable<T>) => Subject<T>, selector: (shared: Observable<T>) => O): OperatorFunction<T, ObservedValueOf<O>>;
13 /* tslint:enable:max-line-length */
16 * Returns an Observable that emits the results of invoking a specified selector on items
17 * emitted by a ConnectableObservable that shares a single subscription to the underlying stream.
21 * @param {Function|Subject} subjectOrSubjectFactory - Factory function to create an intermediate subject through
22 * which the source sequence's elements will be multicast to the selector function
23 * or Subject to push source elements into.
24 * @param {Function} [selector] - Optional selector function that can use the multicasted source stream
25 * as many times as needed, without causing multiple subscriptions to the source stream.
26 * Subscribers to the given source will receive all notifications of the source from the
27 * time of the subscription forward.
28 * @return {Observable} An Observable that emits the results of invoking the selector
29 * on the items emitted by a `ConnectableObservable` that shares a single subscription to
30 * the underlying stream.
34 export function multicast<T, R>(subjectOrSubjectFactory: Subject<T> | (() => Subject<T>),
35 selector?: (source: Observable<T>) => Observable<R>): OperatorFunction<T, R> {
36 return function multicastOperatorFunction(source: Observable<T>): Observable<R> {
37 let subjectFactory: () => Subject<T>;
38 if (typeof subjectOrSubjectFactory === 'function') {
39 subjectFactory = <() => Subject<T>>subjectOrSubjectFactory;
41 subjectFactory = function subjectFactory() {
42 return <Subject<T>>subjectOrSubjectFactory;
46 if (typeof selector === 'function') {
47 return source.lift(new MulticastOperator(subjectFactory, selector));
50 const connectable: any = Object.create(source, connectableObservableDescriptor);
51 connectable.source = source;
52 connectable.subjectFactory = subjectFactory;
54 return <ConnectableObservable<R>> connectable;
58 export class MulticastOperator<T, R> implements Operator<T, R> {
59 constructor(private subjectFactory: () => Subject<T>,
60 private selector: (source: Observable<T>) => Observable<R>) {
62 call(subscriber: Subscriber<R>, source: any): any {
63 const { selector } = this;
64 const subject = this.subjectFactory();
65 const subscription = selector(subject).subscribe(subscriber);
66 subscription.add(source.subscribe(subject));