--- /dev/null
+import { Subject, SubjectSubscriber } from '../Subject';
+import { Operator } from '../Operator';
+import { Observable } from '../Observable';
+import { Subscriber } from '../Subscriber';
+import { Subscription } from '../Subscription';
+import { TeardownLogic } from '../types';
+import { refCount as higherOrderRefCount } from '../operators/refCount';
+
+/**
+ * @class ConnectableObservable<T>
+ */
+export class ConnectableObservable<T> extends Observable<T> {
+
+ protected _subject: Subject<T>;
+ protected _refCount: number = 0;
+ protected _connection: Subscription;
+ /** @internal */
+ _isComplete = false;
+
+ constructor(public source: Observable<T>,
+ protected subjectFactory: () => Subject<T>) {
+ super();
+ }
+
+ /** @deprecated This is an internal implementation detail, do not use. */
+ _subscribe(subscriber: Subscriber<T>) {
+ return this.getSubject().subscribe(subscriber);
+ }
+
+ protected getSubject(): Subject<T> {
+ const subject = this._subject;
+ if (!subject || subject.isStopped) {
+ this._subject = this.subjectFactory();
+ }
+ return this._subject;
+ }
+
+ connect(): Subscription {
+ let connection = this._connection;
+ if (!connection) {
+ this._isComplete = false;
+ connection = this._connection = new Subscription();
+ connection.add(this.source
+ .subscribe(new ConnectableSubscriber(this.getSubject(), this)));
+ if (connection.closed) {
+ this._connection = null;
+ connection = Subscription.EMPTY;
+ }
+ }
+ return connection;
+ }
+
+ refCount(): Observable<T> {
+ return higherOrderRefCount()(this) as Observable<T>;
+ }
+}
+
+export const connectableObservableDescriptor: PropertyDescriptorMap = (() => {
+ const connectableProto = <any>ConnectableObservable.prototype;
+ return {
+ operator: { value: null as null },
+ _refCount: { value: 0, writable: true },
+ _subject: { value: null as null, writable: true },
+ _connection: { value: null as null, writable: true },
+ _subscribe: { value: connectableProto._subscribe },
+ _isComplete: { value: connectableProto._isComplete, writable: true },
+ getSubject: { value: connectableProto.getSubject },
+ connect: { value: connectableProto.connect },
+ refCount: { value: connectableProto.refCount }
+ };
+})();
+
+class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
+ constructor(destination: Subject<T>,
+ private connectable: ConnectableObservable<T>) {
+ super(destination);
+ }
+ protected _error(err: any): void {
+ this._unsubscribe();
+ super._error(err);
+ }
+ protected _complete(): void {
+ this.connectable._isComplete = true;
+ this._unsubscribe();
+ super._complete();
+ }
+ protected _unsubscribe() {
+ const connectable = <any>this.connectable;
+ if (connectable) {
+ this.connectable = null;
+ const connection = connectable._connection;
+ connectable._refCount = 0;
+ connectable._subject = null;
+ connectable._connection = null;
+ if (connection) {
+ connection.unsubscribe();
+ }
+ }
+ }
+}
+
+class RefCountOperator<T> implements Operator<T, T> {
+ constructor(private connectable: ConnectableObservable<T>) {
+ }
+ call(subscriber: Subscriber<T>, source: any): TeardownLogic {
+
+ const { connectable } = this;
+ (<any> connectable)._refCount++;
+
+ const refCounter = new RefCountSubscriber(subscriber, connectable);
+ const subscription = source.subscribe(refCounter);
+
+ if (!refCounter.closed) {
+ (<any> refCounter).connection = connectable.connect();
+ }
+
+ return subscription;
+ }
+}
+
+class RefCountSubscriber<T> extends Subscriber<T> {
+
+ private connection: Subscription;
+
+ constructor(destination: Subscriber<T>,
+ private connectable: ConnectableObservable<T>) {
+ super(destination);
+ }
+
+ protected _unsubscribe() {
+
+ const { connectable } = this;
+ if (!connectable) {
+ this.connection = null;
+ return;
+ }
+
+ this.connectable = null;
+ const refCount = (<any> connectable)._refCount;
+ if (refCount <= 0) {
+ this.connection = null;
+ return;
+ }
+
+ (<any> connectable)._refCount = refCount - 1;
+ if (refCount > 1) {
+ this.connection = null;
+ return;
+ }
+
+ ///
+ // Compare the local RefCountSubscriber's connection Subscription to the
+ // connection Subscription on the shared ConnectableObservable. In cases
+ // where the ConnectableObservable source synchronously emits values, and
+ // the RefCountSubscriber's downstream Observers synchronously unsubscribe,
+ // execution continues to here before the RefCountOperator has a chance to
+ // supply the RefCountSubscriber with the shared connection Subscription.
+ // For example:
+ // ```
+ // range(0, 10).pipe(
+ // publish(),
+ // refCount(),
+ // take(5),
+ // ).subscribe();
+ // ```
+ // In order to account for this case, RefCountSubscriber should only dispose
+ // the ConnectableObservable's shared connection Subscription if the
+ // connection Subscription exists, *and* either:
+ // a. RefCountSubscriber doesn't have a reference to the shared connection
+ // Subscription yet, or,
+ // b. RefCountSubscriber's connection Subscription reference is identical
+ // to the shared connection Subscription
+ ///
+ const { connection } = this;
+ const sharedConnection = (<any> connectable)._connection;
+ this.connection = null;
+
+ if (sharedConnection && (!connection || sharedConnection === connection)) {
+ sharedConnection.unsubscribe();
+ }
+ }
+}