Actualizacion maquina principal
[dotfiles/.git] / .config / coc / extensions / node_modules / coc-prettier / node_modules / rxjs / src / internal / observable / dom / WebSocketSubject.ts
1 import { Subject, AnonymousSubject } from '../../Subject';
2 import { Subscriber } from '../../Subscriber';
3 import { Observable } from '../../Observable';
4 import { Subscription } from '../../Subscription';
5 import { Operator } from '../../Operator';
6 import { ReplaySubject } from '../../ReplaySubject';
7 import { Observer, NextObserver } from '../../types';
8
9 /**
10  * WebSocketSubjectConfig is a plain Object that allows us to make our
11  * webSocket configurable.
12  *
13  * <span class="informal">Provides flexibility to {@link webSocket}</span>
14  *
15  * It defines a set of properties to provide custom behavior in specific
16  * moments of the socket's lifecycle. When the connection opens we can
17  * use `openObserver`, when the connection is closed `closeObserver`, if we
18  * are interested in listening for data comming from server: `deserializer`,
19  * which allows us to customize the deserialization strategy of data before passing it
20  * to the socket client. By default `deserializer` is going to apply `JSON.parse` to each message comming
21  * from the Server.
22  *
23  * ## Example
24  * **deserializer**, the default for this property is `JSON.parse` but since there are just two options
25  * for incomming data, either be text or binarydata. We can apply a custom deserialization strategy
26  * or just simply skip the default behaviour.
27  * ```ts
28  * import { webSocket } from 'rxjs/webSocket';
29  *
30  * const wsSubject = webSocket({
31  *     url: 'ws://localhost:8081',
32  * //Apply any transformation of your choice.
33  *     deserializer: ({data}) => data
34  * });
35  *
36  * wsSubject.subscribe(console.log);
37  *
38  * // Let's suppose we have this on the Server: ws.send("This is a msg from the server")
39  * //output
40  * //
41  * // This is a msg from the server
42  * ```
43  *
44  * **serializer** allows us tom apply custom serialization strategy but for the outgoing messages
45  * ```ts
46  * import { webSocket } from 'rxjs/webSocket';
47  *
48  * const wsSubject = webSocket({
49  *     url: 'ws://localhost:8081',
50  * //Apply any transformation of your choice.
51  *     serializer: msg => JSON.stringify({channel: "webDevelopment", msg: msg})
52  * });
53  *
54  * wsSubject.subscribe(() => subject.next("msg to the server"));
55  *
56  * // Let's suppose we have this on the Server: ws.send("This is a msg from the server")
57  * //output
58  * //
59  * // {"channel":"webDevelopment","msg":"msg to the server"}
60  * ```
61  *
62  * **closeObserver** allows us to set a custom error when an error raise up.
63  * ```ts
64  * import { webSocket } from 'rxjs/webSocket';
65  *
66  * const wsSubject = webSocket({
67  *     url: 'ws://localhost:8081',
68  *     closeObserver: {
69         next(closeEvent) {
70             const customError = { code: 6666, reason: "Custom evil reason" }
71             console.log(`code: ${customError.code}, reason: ${customError.reason}`);
72         }
73     }
74  * });
75  *
76  * //output
77  * // code: 6666, reason: Custom evil reason
78  * ```
79  *
80  * **openObserver**, Let's say we need to make some kind of init task before sending/receiving msgs to the
81  * webSocket or sending notification that the connection was successful, this is when
82  * openObserver is usefull for.
83  * ```ts
84  * import { webSocket } from 'rxjs/webSocket';
85  *
86  * const wsSubject = webSocket({
87  *     url: 'ws://localhost:8081',
88  *     openObserver: {
89  *         next: () => {
90  *             console.log('connetion ok');
91  *         }
92  *     },
93  * });
94  *
95  * //output
96  * // connetion ok`
97  * ```
98  * */
99
100 export interface WebSocketSubjectConfig<T> {
101   /** The url of the socket server to connect to */
102   url: string;
103   /** The protocol to use to connect */
104   protocol?: string | Array<string>;
105   /** @deprecated use {@link deserializer} */
106   resultSelector?: (e: MessageEvent) => T;
107   /**
108    * A serializer used to create messages from passed values before the
109    * messages are sent to the server. Defaults to JSON.stringify.
110    */
111   serializer?: (value: T) => WebSocketMessage;
112   /**
113    * A deserializer used for messages arriving on the socket from the
114    * server. Defaults to JSON.parse.
115    */
116   deserializer?: (e: MessageEvent) => T;
117   /**
118    * An Observer that watches when open events occur on the underlying web socket.
119    */
120   openObserver?: NextObserver<Event>;
121   /**
122    * An Observer than watches when close events occur on the underlying webSocket
123    */
124   closeObserver?: NextObserver<CloseEvent>;
125   /**
126    * An Observer that watches when a close is about to occur due to
127    * unsubscription.
128    */
129   closingObserver?: NextObserver<void>;
130   /**
131    * A WebSocket constructor to use. This is useful for situations like using a
132    * WebSocket impl in Node (WebSocket is a DOM API), or for mocking a WebSocket
133    * for testing purposes
134    */
135   WebSocketCtor?: { new(url: string, protocols?: string|string[]): WebSocket };
136   /** Sets the `binaryType` property of the underlying WebSocket. */
137   binaryType?: 'blob' | 'arraybuffer';
138 }
139
140 const DEFAULT_WEBSOCKET_CONFIG: WebSocketSubjectConfig<any> = {
141   url: '',
142   deserializer: (e: MessageEvent) => JSON.parse(e.data),
143   serializer: (value: any) => JSON.stringify(value),
144 };
145
146 const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT =
147   'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }';
148
149 export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView;
150
151 export class WebSocketSubject<T> extends AnonymousSubject<T> {
152
153   private _config: WebSocketSubjectConfig<T>;
154
155   /** @deprecated This is an internal implementation detail, do not use. */
156   _output: Subject<T>;
157
158   private _socket: WebSocket;
159
160   constructor(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) {
161     super();
162     if (urlConfigOrSource instanceof Observable) {
163       this.destination = destination;
164       this.source = urlConfigOrSource as Observable<T>;
165     } else {
166       const config = this._config = { ...DEFAULT_WEBSOCKET_CONFIG };
167       this._output = new Subject<T>();
168       if (typeof urlConfigOrSource === 'string') {
169         config.url = urlConfigOrSource;
170       } else {
171         for (let key in urlConfigOrSource) {
172           if (urlConfigOrSource.hasOwnProperty(key)) {
173             config[key] = urlConfigOrSource[key];
174           }
175         }
176       }
177
178       if (!config.WebSocketCtor && WebSocket) {
179         config.WebSocketCtor = WebSocket;
180       } else if (!config.WebSocketCtor) {
181         throw new Error('no WebSocket constructor can be found');
182       }
183       this.destination = new ReplaySubject();
184     }
185   }
186
187   lift<R>(operator: Operator<T, R>): WebSocketSubject<R> {
188     const sock = new WebSocketSubject<R>(this._config as WebSocketSubjectConfig<any>, <any> this.destination);
189     sock.operator = operator;
190     sock.source = this;
191     return sock;
192   }
193
194   private _resetState() {
195     this._socket = null;
196     if (!this.source) {
197       this.destination = new ReplaySubject();
198     }
199     this._output = new Subject<T>();
200   }
201
202   /**
203    * Creates an {@link Observable}, that when subscribed to, sends a message,
204    * defined by the `subMsg` function, to the server over the socket to begin a
205    * subscription to data over that socket. Once data arrives, the
206    * `messageFilter` argument will be used to select the appropriate data for
207    * the resulting Observable. When teardown occurs, either due to
208    * unsubscription, completion or error, a message defined by the `unsubMsg`
209    * argument will be send to the server over the WebSocketSubject.
210    *
211    * @param subMsg A function to generate the subscription message to be sent to
212    * the server. This will still be processed by the serializer in the
213    * WebSocketSubject's config. (Which defaults to JSON serialization)
214    * @param unsubMsg A function to generate the unsubscription message to be
215    * sent to the server at teardown. This will still be processed by the
216    * serializer in the WebSocketSubject's config.
217    * @param messageFilter A predicate for selecting the appropriate messages
218    * from the server for the output stream.
219    */
220   multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) {
221     const self = this;
222     return new Observable((observer: Observer<any>) => {
223       try {
224         self.next(subMsg());
225       } catch (err) {
226         observer.error(err);
227       }
228
229       const subscription = self.subscribe(x => {
230         try {
231           if (messageFilter(x)) {
232             observer.next(x);
233           }
234         } catch (err) {
235           observer.error(err);
236         }
237       },
238         err => observer.error(err),
239         () => observer.complete());
240
241       return () => {
242         try {
243           self.next(unsubMsg());
244         } catch (err) {
245           observer.error(err);
246         }
247         subscription.unsubscribe();
248       };
249     });
250   }
251
252   private _connectSocket() {
253     const { WebSocketCtor, protocol, url, binaryType } = this._config;
254     const observer = this._output;
255
256     let socket: WebSocket = null;
257     try {
258       socket = protocol ?
259         new WebSocketCtor(url, protocol) :
260         new WebSocketCtor(url);
261       this._socket = socket;
262       if (binaryType) {
263         this._socket.binaryType = binaryType;
264       }
265     } catch (e) {
266       observer.error(e);
267       return;
268     }
269
270     const subscription = new Subscription(() => {
271       this._socket = null;
272       if (socket && socket.readyState === 1) {
273         socket.close();
274       }
275     });
276
277     socket.onopen = (e: Event) => {
278       const { _socket } = this;
279       if (!_socket) {
280         socket.close();
281         this._resetState();
282         return;
283       }
284       const { openObserver } = this._config;
285       if (openObserver) {
286         openObserver.next(e);
287       }
288
289       const queue = this.destination;
290
291       this.destination = Subscriber.create<T>(
292         (x) => {
293           if (socket.readyState === 1) {
294             try {
295               const { serializer } = this._config;
296               socket.send(serializer(x));
297               } catch (e) {
298               this.destination.error(e);
299             }
300           }
301         },
302         (e) => {
303           const { closingObserver } = this._config;
304           if (closingObserver) {
305             closingObserver.next(undefined);
306           }
307           if (e && e.code) {
308             socket.close(e.code, e.reason);
309           } else {
310             observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
311           }
312           this._resetState();
313         },
314         () => {
315           const { closingObserver } = this._config;
316           if (closingObserver) {
317             closingObserver.next(undefined);
318           }
319           socket.close();
320           this._resetState();
321         }
322       ) as Subscriber<any>;
323
324       if (queue && queue instanceof ReplaySubject) {
325         subscription.add((<ReplaySubject<T>>queue).subscribe(this.destination));
326       }
327     };
328
329     socket.onerror = (e: Event) => {
330       this._resetState();
331       observer.error(e);
332     };
333
334     socket.onclose = (e: CloseEvent) => {
335       this._resetState();
336       const { closeObserver } = this._config;
337       if (closeObserver) {
338         closeObserver.next(e);
339       }
340       if (e.wasClean) {
341         observer.complete();
342       } else {
343         observer.error(e);
344       }
345     };
346
347     socket.onmessage = (e: MessageEvent) => {
348       try {
349         const { deserializer } = this._config;
350         observer.next(deserializer(e));
351       } catch (err) {
352         observer.error(err);
353       }
354     };
355   }
356
357   /** @deprecated This is an internal implementation detail, do not use. */
358   _subscribe(subscriber: Subscriber<T>): Subscription {
359     const { source } = this;
360     if (source) {
361       return source.subscribe(subscriber);
362     }
363     if (!this._socket) {
364       this._connectSocket();
365     }
366     this._output.subscribe(subscriber);
367     subscriber.add(() => {
368       const { _socket } = this;
369       if (this._output.observers.length === 0) {
370         if (_socket && _socket.readyState === 1) {
371           _socket.close();
372         }
373         this._resetState();
374       }
375     });
376     return subscriber;
377   }
378
379   unsubscribe() {
380     const { _socket } = this;
381     if (_socket && _socket.readyState === 1) {
382       _socket.close();
383     }
384     this._resetState();
385     super.unsubscribe();
386   }
387 }