1 import { SubjectSubscriber } from '../Subject';
2 import { Observable } from '../Observable';
3 import { Subscriber } from '../Subscriber';
4 import { Subscription } from '../Subscription';
5 import { refCount as higherOrderRefCount } from '../operators/refCount';
// ConnectableObservable: an Observable subclass constructed from a source and
// a subject factory. Subscribers are attached to a subject produced by the
// factory, and the source is subscribed to separately through a connection
// stored in `_connection`.
// NOTE(review): the number embedded at the start of each line skips values
// (e.g. 7 -> 10, 15 -> 18, 20 -> 25, 33 -> 39), so method headers, guards,
// returns and closing braces appear to be missing from this extract. Restore
// them from the original file before relying on this text.
6 export class ConnectableObservable extends Observable {
// source: the object whose `subscribe` is called when connecting (see below).
// subjectFactory: called with no arguments to create the subject.
7 constructor(source, subjectFactory) {
10 this.subjectFactory = subjectFactory;
// Starts as false; ConnectableSubscriber later sets this flag to true.
12 this._isComplete = false;
// Subscribes the given subscriber to the subject returned by getSubject().
14 _subscribe(subscriber) {
15 return this.getSubject().subscribe(subscriber);
// Replace the stored subject with a fresh one from subjectFactory when there
// is no stored subject or the stored one reports `isStopped`.
18 const subject = this._subject;
19 if (!subject || subject.isStopped) {
20 this._subject = this.subjectFactory();
// Start from whatever connection is currently stored.
25 let connection = this._connection;
// Reset the completion flag and store a new Subscription as the connection.
// NOTE(review): the condition guarding this section is not visible here.
27 this._isComplete = false;
28 connection = this._connection = new Subscription();
// Subscribe to the source through a ConnectableSubscriber that wraps the
// current subject, and add the resulting subscription to the connection.
29 connection.add(this.source
30 .subscribe(new ConnectableSubscriber(this.getSubject(), this)));
// If the connection is already closed at this point, clear the stored
// connection and fall back to Subscription.EMPTY.
31 if (connection.closed) {
32 this._connection = null;
33 connection = Subscription.EMPTY;
// Applies the imported refCount operator (aliased higherOrderRefCount) to
// this instance and returns the result.
39 return higherOrderRefCount()(this);
// Map of property descriptors (`{ value, writable }` entries) whose method
// values are taken from ConnectableObservable.prototype.
// NOTE(review): presumably consumed by Object.create/Object.defineProperties
// to turn another object into a connectable — confirm against callers.
// NOTE(review): embedded line numbers skip 44 and 54-55, so the `return {`
// line and the closing of the arrow function are not visible in this extract.
42 export const connectableObservableDescriptor = (() => {
43 const connectableProto = ConnectableObservable.prototype;
// `operator` is null and not writable in this descriptor.
45 operator: { value: null },
// Mutable bookkeeping fields, initialised to 0 / null.
46 _refCount: { value: 0, writable: true },
47 _subject: { value: null, writable: true },
48 _connection: { value: null, writable: true },
49 _subscribe: { value: connectableProto._subscribe },
// NOTE(review): the constructor assigns `_isComplete` on instances; whether
// the prototype itself defines it is not visible here, so this value may be
// undefined — confirm.
50 _isComplete: { value: connectableProto._isComplete, writable: true },
51 getSubject: { value: connectableProto.getSubject },
52 connect: { value: connectableProto.connect },
53 refCount: { value: connectableProto.refCount }
// ConnectableSubscriber: SubjectSubscriber subclass that holds a reference to
// the owning connectable and resets that connectable's shared state.
// NOTE(review): embedded line numbers skip values (57 -> 59, 59 -> 66,
// 66 -> 71, ...), so the super() call and the method headers are not visible
// in this extract.
56 class ConnectableSubscriber extends SubjectSubscriber {
// destination: not used in the visible lines (presumably passed to super —
// confirm). connectable: the owner whose state is reset below.
57 constructor(destination, connectable) {
59 this.connectable = connectable;
// Flags the owning connectable as complete.
66 this.connectable._isComplete = true;
// Teardown: detach from the connectable, then reset its shared state.
71 const connectable = this.connectable;
// Drop our own reference to the connectable.
73 this.connectable = null;
// Capture the connection before the field is cleared so it can still be
// unsubscribed afterwards.
74 const connection = connectable._connection;
75 connectable._refCount = 0;
76 connectable._subject = null;
77 connectable._connection = null;
// NOTE(review): any null check around this call is not visible here.
79 connection.unsubscribe();
// RefCountOperator: operator object holding a connectable. Its `call`
// increments the connectable's `_refCount`, subscribes a RefCountSubscriber
// to the given source, and connects the connectable.
// NOTE(review): embedded line numbers skip 87 and 95-98, so closing braces
// and the return of `call` are not visible in this extract.
84 class RefCountOperator {
85 constructor(connectable) {
86 this.connectable = connectable;
// subscriber: downstream subscriber wrapped by the RefCountSubscriber.
// source: the object that the RefCountSubscriber is subscribed to.
88 call(subscriber, source) {
89 const { connectable } = this;
// Count this subscription before subscribing.
90 connectable._refCount++;
91 const refCounter = new RefCountSubscriber(subscriber, connectable);
92 const subscription = source.subscribe(refCounter);
// Connect only when the subscriber is still open after subscribing, and keep
// the returned connection on the subscriber.
93 if (!refCounter.closed) {
94 refCounter.connection = connectable.connect();
// RefCountSubscriber: Subscriber subclass that decrements the connectable's
// `_refCount` on teardown and may unsubscribe the shared connection.
// NOTE(review): embedded line numbers skip values (100 -> 102, 102 -> 105,
// 105 -> 107, ...), so the super() call, the method header and the guard
// conditions around the early exits are not visible in this extract.
99 class RefCountSubscriber extends Subscriber {
// destination: not used in the visible lines (presumably passed to super —
// confirm). connectable: the shared connectable being reference-counted.
100 constructor(destination, connectable) {
102 this.connectable = connectable;
105 const { connectable } = this;
// NOTE(review): presumably the branch taken when `connectable` is already
// null; its condition is not visible.
107 this.connection = null;
// Drop our reference to the connectable.
110 this.connectable = null;
111 const refCount = connectable._refCount;
// NOTE(review): guard condition not visible; clears the local connection.
113 this.connection = null;
// Record that one fewer subscriber is counted.
116 connectable._refCount = refCount - 1;
// NOTE(review): guard condition not visible; clears the local connection.
118 this.connection = null;
// Read both connections before clearing the local one.
121 const { connection } = this;
122 const sharedConnection = connectable._connection;
123 this.connection = null;
// Unsubscribe the shared connection only when it exists and either this
// subscriber had no connection of its own or its connection is that same
// shared one.
124 if (sharedConnection && (!connection || sharedConnection === connection)) {
125 sharedConnection.unsubscribe();
129 //# sourceMappingURL=ConnectableObservable.js.map