1 import { Subject, SubjectSubscriber } from '../Subject';
2 import { Operator } from '../Operator';
3 import { Observable } from '../Observable';
4 import { Subscriber } from '../Subscriber';
5 import { Subscription } from '../Subscription';
6 import { TeardownLogic } from '../types';
7 import { refCount as higherOrderRefCount } from '../operators/refCount';
10 * @class ConnectableObservable<T>
// An Observable subclass that pairs a `source` Observable with a Subject factory.
// Subscribers are attached to the Subject returned by getSubject(); the source
// itself is only subscribed to inside connect().
// NOTE(review): this listing omits some original lines (the embedded line numbers
// have gaps), so closing braces and a few statements are not visible here.
12 export class ConnectableObservable<T> extends Observable<T> {
// Subject most recently produced by `subjectFactory`; assigned in getSubject().
14 protected _subject: Subject<T>;
// Counter starting at 0. Elsewhere in this file it is incremented by
// RefCountOperator.call, decremented by RefCountSubscriber._unsubscribe and reset
// to 0 by ConnectableSubscriber._unsubscribe.
15 protected _refCount: number = 0;
// Subscription created by connect(); set back to null when it is found closed in
// connect() or when ConnectableSubscriber._unsubscribe runs.
16 protected _connection: Subscription;
// @param source          Observable that connect() subscribes to.
// @param subjectFactory  Zero-argument function that getSubject() calls to obtain a Subject.
// NOTE(review): the constructor body is not visible in this excerpt.
20 constructor(public source: Observable<T>,
21 protected subjectFactory: () => Subject<T>) {
// Subscribes `subscriber` to the Subject returned by getSubject() and returns the
// result of that subscribe call.
25 /** @deprecated This is an internal implementation detail, do not use. */
26 _subscribe(subscriber: Subscriber<T>) {
27 return this.getSubject().subscribe(subscriber);
// Replaces `_subject` with a fresh Subject from `subjectFactory` when there is no
// current Subject or the current one reports `isStopped`.
// NOTE(review): the return statement is not visible here; presumably it returns
// `this._subject` — confirm against the full file.
30 protected getSubject(): Subject<T> {
31 const subject = this._subject;
32 if (!subject || subject.isStopped) {
33 this._subject = this.subjectFactory();
// Sets up the connection to `source` and yields a Subscription for it.
// NOTE(review): original line 40 (between reading `_connection` and resetting
// `_isComplete`) and the final return are missing from this excerpt; the setup
// below is presumably guarded by "no existing connection" — confirm.
38 connect(): Subscription {
39 let connection = this._connection;
// Clear the completion flag before subscribing to the source.
41 this._isComplete = false;
// Store the new Subscription on the instance and keep a local reference to it.
42 connection = this._connection = new Subscription();
// Subscribe to the source through a ConnectableSubscriber that wraps the current
// Subject, and register the resulting subscription as a child of `connection`.
43 connection.add(this.source
44 .subscribe(new ConnectableSubscriber(this.getSubject(), this)));
// If the connection is already closed once the subscribe call has returned,
// drop the stored reference and hand back Subscription.EMPTY instead.
45 if (connection.closed) {
46 this._connection = null;
47 connection = Subscription.EMPTY;
// Applies the imported `refCount` operator (aliased as higherOrderRefCount) to this
// instance and returns the result typed as Observable<T>.
53 refCount(): Observable<T> {
54 return higherOrderRefCount()(this) as Observable<T>;
// Property descriptor map, built once by an immediately-invoked arrow function, that
// mirrors the members of ConnectableObservable.prototype as property descriptors.
// NOTE(review): where this map is consumed is not shown in this file; the line that
// opens the returned object literal and the closing lines are missing from this excerpt.
58 export const connectableObservableDescriptor: PropertyDescriptorMap = (() => {
// Cast to `any` so that protected members can be read off the prototype.
59 const connectableProto = <any>ConnectableObservable.prototype;
// `operator` is fixed to null (no `writable` flag is given for it).
61 operator: { value: null as null },
// Mutable per-instance state, declared writable with its initial value.
62 _refCount: { value: 0, writable: true },
63 _subject: { value: null as null, writable: true },
64 _connection: { value: null as null, writable: true },
// Methods are taken directly from the prototype.
65 _subscribe: { value: connectableProto._subscribe },
// NOTE(review): `_isComplete` is read from the prototype here, but its declaration is
// not visible in this excerpt — confirm the prototype actually carries a value for it.
66 _isComplete: { value: connectableProto._isComplete, writable: true },
67 getSubject: { value: connectableProto.getSubject },
68 connect: { value: connectableProto.connect },
69 refCount: { value: connectableProto.refCount }
// Subscriber that ConnectableObservable.connect() uses to subscribe to the source.
// It extends SubjectSubscriber, targets a Subject as its destination and keeps a
// reference to the owning ConnectableObservable so it can reset that object's state.
// NOTE(review): several original lines are missing from this excerpt (the embedded
// line numbers have gaps), including the constructor body and the `_error` body.
73 class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
// @param destination  Subject this subscriber is constructed with.
// @param connectable  Owning ConnectableObservable whose state is reset on teardown.
74 constructor(destination: Subject<T>,
75 private connectable: ConnectableObservable<T>) {
// Error hook. NOTE(review): body not visible in this excerpt.
78 protected _error(err: any): void {
// Completion hook: records completion on the owning connectable.
// NOTE(review): any statements after the flag assignment are not visible here.
82 protected _complete(): void {
83 this.connectable._isComplete = true;
// Teardown hook: detaches from the connectable, resets its ref count, Subject and
// connection, then unsubscribes the connection that was stored on it.
// NOTE(review): original lines 89 and 95 are missing; they presumably guard against
// a null `connectable` and a null `connection` respectively — confirm.
87 protected _unsubscribe() {
// `any` cast gives access to the connectable's protected fields.
88 const connectable = <any>this.connectable;
90 this.connectable = null;
// Capture the connection before the field is cleared below.
91 const connection = connectable._connection;
92 connectable._refCount = 0;
93 connectable._subject = null;
94 connectable._connection = null;
96 connection.unsubscribe();
// Operator that ties a downstream subscription to a ConnectableObservable's ref count:
// each call increments the count and, if the new subscriber is still open, connects.
// NOTE(review): the constructor body and the end of `call` (including its return
// value) are missing from this excerpt.
102 class RefCountOperator<T> implements Operator<T, T> {
// @param connectable  ConnectableObservable whose `_refCount` and connect() are used by call().
103 constructor(private connectable: ConnectableObservable<T>) {
// @param subscriber  Downstream subscriber, wrapped in a RefCountSubscriber.
// @param source      Object whose `subscribe` is invoked with the wrapper (typed `any`).
105 call(subscriber: Subscriber<T>, source: any): TeardownLogic {
107 const { connectable } = this;
// Count this subscriber; `any` cast because `_refCount` is protected.
108 (<any> connectable)._refCount++;
110 const refCounter = new RefCountSubscriber(subscriber, connectable);
111 const subscription = source.subscribe(refCounter);
// Only connect when the wrapper was not already closed during the subscribe call
// above; the resulting connection is stored on the wrapper's private `connection`
// field via an `any` cast.
113 if (!refCounter.closed) {
114 (<any> refCounter).connection = connectable.connect();
// Subscriber wrapper created by RefCountOperator.call. On teardown it decrements the
// owning ConnectableObservable's `_refCount` and may unsubscribe the shared connection.
// NOTE(review): this excerpt omits several original lines (the embedded line numbers
// have gaps), including the constructor body and the guard conditions in `_unsubscribe`.
121 class RefCountSubscriber<T> extends Subscriber<T> {
// Connection returned by connectable.connect(); assigned from outside by
// RefCountOperator.call through an `any` cast.
123 private connection: Subscription;
// @param destination  Downstream subscriber.
// @param connectable  ConnectableObservable whose `_refCount` / `_connection` are read on teardown.
125 constructor(destination: Subscriber<T>,
126 private connectable: ConnectableObservable<T>) {
// Teardown hook.
// NOTE(review): the three early `this.connection = null` assignments below (original
// lines 134, 141 and 147) look like bodies of early-exit guards whose conditions and
// returns are not visible in this excerpt — confirm against the full file before
// relying on the exact control flow.
130 protected _unsubscribe() {
132 const { connectable } = this;
134 this.connection = null;
// Drop the reference to the connectable, then read its current ref count.
138 this.connectable = null;
139 const refCount = (<any> connectable)._refCount;
141 this.connection = null;
// One fewer ref-counted subscriber.
145 (<any> connectable)._refCount = refCount - 1;
147 this.connection = null;
152 // Compare the local RefCountSubscriber's connection Subscription to the
153 // connection Subscription on the shared ConnectableObservable. In cases
154 // where the ConnectableObservable source synchronously emits values, and
155 // the RefCountSubscriber's downstream Observers synchronously unsubscribe,
156 // execution continues to here before the RefCountOperator has a chance to
157 // supply the RefCountSubscriber with the shared connection Subscription.
160 // range(0, 10).pipe(
166 // In order to account for this case, RefCountSubscriber should only dispose
167 // the ConnectableObservable's shared connection Subscription if the
168 // connection Subscription exists, *and* either:
169 // a. RefCountSubscriber doesn't have a reference to the shared connection
170 // Subscription yet, or,
171 // b. RefCountSubscriber's connection Subscription reference is identical
172 // to the shared connection Subscription
// Snapshot both the local and the shared connection before clearing the local field.
174 const { connection } = this;
175 const sharedConnection = (<any> connectable)._connection;
176 this.connection = null;
// Unsubscribe the shared connection only when it exists and the local reference is
// either absent or the very same Subscription object.
178 if (sharedConnection && (!connection || sharedConnection === connection)) {
179 sharedConnection.unsubscribe();