Giant blob of minor changes
[dotfiles/.git] / .config / coc / extensions / node_modules / coc-prettier / node_modules / rxjs / src / internal / observable / ConnectableObservable.ts
1 import { Subject, SubjectSubscriber } from '../Subject';
2 import { Operator } from '../Operator';
3 import { Observable } from '../Observable';
4 import { Subscriber } from '../Subscriber';
5 import { Subscription } from '../Subscription';
6 import { TeardownLogic } from '../types';
7 import { refCount as higherOrderRefCount } from '../operators/refCount';
8
/**
 * An Observable whose subscribers attach to a Subject obtained from
 * `subjectFactory`, while the underlying `source` is only subscribed to
 * once `connect()` is called.
 *
 * @class ConnectableObservable<T>
 */
export class ConnectableObservable<T> extends Observable<T> {

  /** Subject handed to subscribers; replaced when missing or stopped. */
  protected _subject: Subject<T>;
  /** Subscriber count maintained by the ref-counting helpers. */
  protected _refCount = 0;
  /** Subscription created by `connect()`, or a falsy value when disconnected. */
  protected _connection: Subscription;
  /** @internal */
  _isComplete = false;

  /**
   * @param source Observable subscribed to when `connect()` runs.
   * @param subjectFactory Produces the Subject that subscribers attach to.
   */
  constructor(public source: Observable<T>,
              protected subjectFactory: () => Subject<T>) {
    super();
  }

  /** @deprecated This is an internal implementation detail, do not use. */
  _subscribe(subscriber: Subscriber<T>) {
    const subject = this.getSubject();
    return subject.subscribe(subscriber);
  }

  /**
   * Returns the current Subject, first asking `subjectFactory` for a new one
   * when there is none yet or the current one has stopped.
   */
  protected getSubject(): Subject<T> {
    const current = this._subject;
    const reusable = current && !current.isStopped;
    if (!reusable) {
      this._subject = this.subjectFactory();
    }
    return this._subject;
  }

  /**
   * Subscribes to `source` unless a connection already exists.
   *
   * @returns The existing or newly created connection, or
   * `Subscription.EMPTY` when the new connection is already closed by the
   * time the source subscription call returns.
   */
  connect(): Subscription {
    const existing = this._connection;
    if (existing) {
      return existing;
    }

    this._isComplete = false;
    const connection = new Subscription();
    this._connection = connection;

    const sourceSubscription = this.source
      .subscribe(new ConnectableSubscriber(this.getSubject(), this));
    connection.add(sourceSubscription);

    if (!connection.closed) {
      return connection;
    }

    // The connection was closed during the subscribe call above, so there is
    // nothing left to hold on to.
    this._connection = null;
    return Subscription.EMPTY;
  }

  /** Applies the `refCount` operator to this connectable. */
  refCount(): Observable<T> {
    const applyRefCount = higherOrderRefCount();
    return applyRefCount(this) as Observable<T>;
  }
}
57
/**
 * Property descriptors mirroring the members of `ConnectableObservable`.
 * Method entries are taken from the class prototype; state entries are
 * writable and start out as `null`/`0`.
 */
export const connectableObservableDescriptor: PropertyDescriptorMap = (() => {
  const proto = ConnectableObservable.prototype as any;

  // Descriptor carrying only a value (no `writable` flag).
  const fixed = (value: any): PropertyDescriptor => ({ value });
  // Descriptor whose value may be reassigned later.
  const mutable = (value: any): PropertyDescriptor => ({ value, writable: true });

  return {
    operator: fixed(null),
    _refCount: mutable(0),
    _subject: mutable(null),
    _connection: mutable(null),
    _subscribe: fixed(proto._subscribe),
    // Read from the prototype, not from an instance.
    // NOTE(review): instance field initializers do not populate the
    // prototype, so this is presumably `undefined` here - confirm.
    _isComplete: mutable(proto._isComplete),
    getSubject: fixed(proto.getSubject),
    connect: fixed(proto.connect),
    refCount: fixed(proto.refCount)
  };
})();
72
/**
 * Subscriber that `ConnectableObservable.connect()` places between the
 * source and the connectable's Subject. When the source errors, completes or
 * is unsubscribed, it resets the connectable's connection state.
 */
class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
  /**
   * @param destination Subject passed on to the `SubjectSubscriber` base.
   * @param connectable Connectable whose state is reset on teardown.
   */
  constructor(destination: Subject<T>,
              private connectable: ConnectableObservable<T>) {
    super(destination);
  }

  /** Resets the connectable before delegating the error to the base class. */
  protected _error(err: any): void {
    this._unsubscribe();
    super._error(err);
  }

  /** Flags the connectable as complete, resets it, then delegates to the base class. */
  protected _complete(): void {
    this.connectable._isComplete = true;
    this._unsubscribe();
    super._complete();
  }

  /**
   * Detaches from the connectable and clears its ref count, subject and
   * connection, unsubscribing the connection if there was one. Does nothing
   * once the connectable reference has been dropped.
   */
  protected _unsubscribe() {
    const owner = this.connectable as any;
    if (!owner) {
      return;
    }

    this.connectable = null;

    // Grab the connection before clearing the shared state so it can still be
    // torn down afterwards.
    const activeConnection = owner._connection;
    owner._refCount = 0;
    owner._subject = null;
    owner._connection = null;

    if (activeConnection) {
      activeConnection.unsubscribe();
    }
  }
}
101
/**
 * Operator that counts subscribers on a `ConnectableObservable` and connects
 * it once the subscriber has been attached to the source.
 */
class RefCountOperator<T> implements Operator<T, T> {
  /** @param connectable Connectable whose `_refCount` is tracked. */
  constructor(private connectable: ConnectableObservable<T>) {
  }

  /**
   * Increments the connectable's ref count, subscribes a `RefCountSubscriber`
   * to `source`, and connects unless that subscriber is already closed.
   *
   * @param subscriber Downstream subscriber to wrap.
   * @param source Object the wrapping subscriber is subscribed to.
   * @returns The subscription returned by `source.subscribe`.
   */
  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
    const shared = this.connectable;
    // `_refCount` is protected on the connectable, hence the `any` view.
    const sharedState = shared as any;
    sharedState._refCount++;

    const counter = new RefCountSubscriber(subscriber, shared);
    const teardown = source.subscribe(counter);

    // Only connect if the subscriber was not closed during subscription.
    if (!counter.closed) {
      (counter as any).connection = shared.connect();
    }

    return teardown;
  }
}
120
/**
 * Subscriber used by `RefCountOperator`. On unsubscription it decrements the
 * connectable's ref count and tears down the shared connection when the last
 * counted subscriber goes away.
 */
class RefCountSubscriber<T> extends Subscriber<T> {

  /** Shared connection, assigned by `RefCountOperator` after it connects. */
  private connection: Subscription;

  /**
   * @param destination Downstream subscriber passed to the base class.
   * @param connectable Connectable whose ref count this subscriber is part of.
   */
  constructor(destination: Subscriber<T>,
              private connectable: ConnectableObservable<T>) {
    super(destination);
  }

  /**
   * Releases this subscriber's share of the connectable. The local
   * `connection` reference is always cleared; the shared connection is
   * unsubscribed only when this was the last counted subscriber.
   */
  protected _unsubscribe() {
    const owner = this.connectable as any;

    // Every path below drops the local connection reference, so capture it
    // and clear it up front.
    const ownConnection = this.connection;
    this.connection = null;

    // Already released once.
    if (!owner) {
      return;
    }
    this.connectable = null;

    const countBefore = owner._refCount;
    // Nothing counted, so nothing to release.
    if (countBefore <= 0) {
      return;
    }

    owner._refCount = countBefore - 1;
    // Other subscribers still rely on the shared connection.
    if (countBefore > 1) {
      return;
    }

    // This was the last subscriber. When the source emits synchronously and
    // downstream unsubscribes synchronously, e.g.
    //
    //   range(0, 10).pipe(publish(), refCount(), take(5)).subscribe();
    //
    // execution reaches this point before RefCountOperator has assigned the
    // shared connection to this subscriber. So the shared connection is
    // disposed when it exists and either
    //   a. this subscriber holds no connection reference yet, or
    //   b. its reference is identical to the shared connection.
    const sharedConnection = owner._connection;
    if (!sharedConnection) {
      return;
    }
    if (!ownConnection || sharedConnection === ownConnection) {
      sharedConnection.unsubscribe();
    }
  }
}