Actualizacion maquina principal
[dotfiles/.git] / .config / coc / extensions / node_modules / coc-prettier / node_modules / rxjs / src / internal / observable / ConnectableObservable.ts
diff --git a/.config/coc/extensions/node_modules/coc-prettier/node_modules/rxjs/src/internal/observable/ConnectableObservable.ts b/.config/coc/extensions/node_modules/coc-prettier/node_modules/rxjs/src/internal/observable/ConnectableObservable.ts
new file mode 100644 (file)
index 0000000..4e7ffbf
--- /dev/null
@@ -0,0 +1,182 @@
+import { Subject, SubjectSubscriber } from '../Subject';
+import { Operator } from '../Operator';
+import { Observable } from '../Observable';
+import { Subscriber } from '../Subscriber';
+import { Subscription } from '../Subscription';
+import { TeardownLogic } from '../types';
+import { refCount as higherOrderRefCount } from '../operators/refCount';
+
+/**
+ * @class ConnectableObservable<T>
+ */
+export class ConnectableObservable<T> extends Observable<T> {
+
+  protected _subject: Subject<T>;
+  protected _refCount: number = 0;
+  protected _connection: Subscription;
+  /** @internal */
+  _isComplete = false;
+
+  constructor(public source: Observable<T>,
+              protected subjectFactory: () => Subject<T>) {
+    super();
+  }
+
+  /** @deprecated This is an internal implementation detail, do not use. */
+  _subscribe(subscriber: Subscriber<T>) {
+    return this.getSubject().subscribe(subscriber);
+  }
+
+  protected getSubject(): Subject<T> {
+    const subject = this._subject;
+    if (!subject || subject.isStopped) {
+      this._subject = this.subjectFactory();
+    }
+    return this._subject;
+  }
+
+  connect(): Subscription {
+    let connection = this._connection;
+    if (!connection) {
+      this._isComplete = false;
+      connection = this._connection = new Subscription();
+      connection.add(this.source
+        .subscribe(new ConnectableSubscriber(this.getSubject(), this)));
+      if (connection.closed) {
+        this._connection = null;
+        connection = Subscription.EMPTY;
+      }
+    }
+    return connection;
+  }
+
+  refCount(): Observable<T> {
+    return higherOrderRefCount()(this) as Observable<T>;
+  }
+}
+
+export const connectableObservableDescriptor: PropertyDescriptorMap = (() => {
+  const connectableProto = <any>ConnectableObservable.prototype;
+  return {
+    operator: { value: null as null },
+    _refCount: { value: 0, writable: true },
+    _subject: { value: null as null, writable: true },
+    _connection: { value: null as null, writable: true },
+    _subscribe: { value: connectableProto._subscribe },
+    _isComplete: { value: connectableProto._isComplete, writable: true },
+    getSubject: { value: connectableProto.getSubject },
+    connect: { value: connectableProto.connect },
+    refCount: { value: connectableProto.refCount }
+  };
+})();
+
+class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
+  constructor(destination: Subject<T>,
+              private connectable: ConnectableObservable<T>) {
+    super(destination);
+  }
+  protected _error(err: any): void {
+    this._unsubscribe();
+    super._error(err);
+  }
+  protected _complete(): void {
+    this.connectable._isComplete = true;
+    this._unsubscribe();
+    super._complete();
+  }
+  protected _unsubscribe() {
+    const connectable = <any>this.connectable;
+    if (connectable) {
+      this.connectable = null;
+      const connection = connectable._connection;
+      connectable._refCount = 0;
+      connectable._subject = null;
+      connectable._connection = null;
+      if (connection) {
+        connection.unsubscribe();
+      }
+    }
+  }
+}
+
+class RefCountOperator<T> implements Operator<T, T> {
+  constructor(private connectable: ConnectableObservable<T>) {
+  }
+  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
+
+    const { connectable } = this;
+    (<any> connectable)._refCount++;
+
+    const refCounter = new RefCountSubscriber(subscriber, connectable);
+    const subscription = source.subscribe(refCounter);
+
+    if (!refCounter.closed) {
+      (<any> refCounter).connection = connectable.connect();
+    }
+
+    return subscription;
+  }
+}
+
+class RefCountSubscriber<T> extends Subscriber<T> {
+
+  private connection: Subscription;
+
+  constructor(destination: Subscriber<T>,
+              private connectable: ConnectableObservable<T>) {
+    super(destination);
+  }
+
+  protected _unsubscribe() {
+
+    const { connectable } = this;
+    if (!connectable) {
+      this.connection = null;
+      return;
+    }
+
+    this.connectable = null;
+    const refCount = (<any> connectable)._refCount;
+    if (refCount <= 0) {
+      this.connection = null;
+      return;
+    }
+
+    (<any> connectable)._refCount = refCount - 1;
+    if (refCount > 1) {
+      this.connection = null;
+      return;
+    }
+
+    ///
+    // Compare the local RefCountSubscriber's connection Subscription to the
+    // connection Subscription on the shared ConnectableObservable. In cases
+    // where the ConnectableObservable source synchronously emits values, and
+    // the RefCountSubscriber's downstream Observers synchronously unsubscribe,
+    // execution continues to here before the RefCountOperator has a chance to
+    // supply the RefCountSubscriber with the shared connection Subscription.
+    // For example:
+    // ```
+    // range(0, 10).pipe(
+    //   publish(),
+    //   refCount(),
+    //   take(5),
+    // ).subscribe();
+    // ```
+    // In order to account for this case, RefCountSubscriber should only dispose
+    // the ConnectableObservable's shared connection Subscription if the
+    // connection Subscription exists, *and* either:
+    //   a. RefCountSubscriber doesn't have a reference to the shared connection
+    //      Subscription yet, or,
+    //   b. RefCountSubscriber's connection Subscription reference is identical
+    //      to the shared connection Subscription
+    ///
+    const { connection } = this;
+    const sharedConnection = (<any> connectable)._connection;
+    this.connection = null;
+
+    if (sharedConnection && (!connection || sharedConnection === connection)) {
+      sharedConnection.unsubscribe();
+    }
+  }
+}