1 import { Subject, AnonymousSubject } from '../../Subject';
2 import { Subscriber } from '../../Subscriber';
3 import { Observable } from '../../Observable';
4 import { Subscription } from '../../Subscription';
5 import { Operator } from '../../Operator';
6 import { ReplaySubject } from '../../ReplaySubject';
7 import { Observer, NextObserver } from '../../types';
10 * WebSocketSubjectConfig is a plain Object that allows us to make our
11 * webSocket configurable.
13 * <span class="informal">Provides flexibility to {@link webSocket}</span>
15 * It defines a set of properties to provide custom behavior in specific
16 * moments of the socket's lifecycle. When the connection opens we can
17 * use `openObserver`, when the connection is closed `closeObserver`, if we
18 * are interested in listening for data comming from server: `deserializer`,
19 * which allows us to customize the deserialization strategy of data before passing it
20 * to the socket client. By default `deserializer` is going to apply `JSON.parse` to each message comming
24 * **deserializer**, the default for this property is `JSON.parse` but since there are just two options
25 * for incomming data, either be text or binarydata. We can apply a custom deserialization strategy
26 * or just simply skip the default behaviour.
28 * import { webSocket } from 'rxjs/webSocket';
30 * const wsSubject = webSocket({
31 * url: 'ws://localhost:8081',
32 * //Apply any transformation of your choice.
33 * deserializer: ({data}) => data
36 * wsSubject.subscribe(console.log);
38 * // Let's suppose we have this on the Server: ws.send("This is a msg from the server")
41 * // This is a msg from the server
44 * **serializer** allows us tom apply custom serialization strategy but for the outgoing messages
46 * import { webSocket } from 'rxjs/webSocket';
48 * const wsSubject = webSocket({
49 * url: 'ws://localhost:8081',
50 * //Apply any transformation of your choice.
51 * serializer: msg => JSON.stringify({channel: "webDevelopment", msg: msg})
54 * wsSubject.subscribe(() => subject.next("msg to the server"));
56 * // Let's suppose we have this on the Server: ws.send("This is a msg from the server")
59 * // {"channel":"webDevelopment","msg":"msg to the server"}
62 * **closeObserver** allows us to set a custom error when an error raise up.
64 * import { webSocket } from 'rxjs/webSocket';
66 * const wsSubject = webSocket({
67 * url: 'ws://localhost:8081',
70 const customError = { code: 6666, reason: "Custom evil reason" }
71 console.log(`code: ${customError.code}, reason: ${customError.reason}`);
77 * // code: 6666, reason: Custom evil reason
80 * **openObserver**, Let's say we need to make some kind of init task before sending/receiving msgs to the
81 * webSocket or sending notification that the connection was successful, this is when
82 * openObserver is usefull for.
84 * import { webSocket } from 'rxjs/webSocket';
86 * const wsSubject = webSocket({
87 * url: 'ws://localhost:8081',
90 * console.log('connetion ok');
100 export interface WebSocketSubjectConfig<T> {
101 /** The url of the socket server to connect to */
103 /** The protocol to use to connect */
104 protocol?: string | Array<string>;
105 /** @deprecated use {@link deserializer} */
106 resultSelector?: (e: MessageEvent) => T;
108 * A serializer used to create messages from passed values before the
109 * messages are sent to the server. Defaults to JSON.stringify.
111 serializer?: (value: T) => WebSocketMessage;
113 * A deserializer used for messages arriving on the socket from the
114 * server. Defaults to JSON.parse.
116 deserializer?: (e: MessageEvent) => T;
118 * An Observer that watches when open events occur on the underlying web socket.
120 openObserver?: NextObserver<Event>;
122 * An Observer than watches when close events occur on the underlying webSocket
124 closeObserver?: NextObserver<CloseEvent>;
126 * An Observer that watches when a close is about to occur due to
129 closingObserver?: NextObserver<void>;
131 * A WebSocket constructor to use. This is useful for situations like using a
132 * WebSocket impl in Node (WebSocket is a DOM API), or for mocking a WebSocket
133 * for testing purposes
135 WebSocketCtor?: { new(url: string, protocols?: string|string[]): WebSocket };
136 /** Sets the `binaryType` property of the underlying WebSocket. */
137 binaryType?: 'blob' | 'arraybuffer';
140 const DEFAULT_WEBSOCKET_CONFIG: WebSocketSubjectConfig<any> = {
142 deserializer: (e: MessageEvent) => JSON.parse(e.data),
143 serializer: (value: any) => JSON.stringify(value),
146 const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT =
147 'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }';
149 export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView;
151 export class WebSocketSubject<T> extends AnonymousSubject<T> {
153 private _config: WebSocketSubjectConfig<T>;
155 /** @deprecated This is an internal implementation detail, do not use. */
158 private _socket: WebSocket;
160 constructor(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) {
162 if (urlConfigOrSource instanceof Observable) {
163 this.destination = destination;
164 this.source = urlConfigOrSource as Observable<T>;
166 const config = this._config = { ...DEFAULT_WEBSOCKET_CONFIG };
167 this._output = new Subject<T>();
168 if (typeof urlConfigOrSource === 'string') {
169 config.url = urlConfigOrSource;
171 for (let key in urlConfigOrSource) {
172 if (urlConfigOrSource.hasOwnProperty(key)) {
173 config[key] = urlConfigOrSource[key];
178 if (!config.WebSocketCtor && WebSocket) {
179 config.WebSocketCtor = WebSocket;
180 } else if (!config.WebSocketCtor) {
181 throw new Error('no WebSocket constructor can be found');
183 this.destination = new ReplaySubject();
187 lift<R>(operator: Operator<T, R>): WebSocketSubject<R> {
188 const sock = new WebSocketSubject<R>(this._config as WebSocketSubjectConfig<any>, <any> this.destination);
189 sock.operator = operator;
194 private _resetState() {
197 this.destination = new ReplaySubject();
199 this._output = new Subject<T>();
203 * Creates an {@link Observable}, that when subscribed to, sends a message,
204 * defined by the `subMsg` function, to the server over the socket to begin a
205 * subscription to data over that socket. Once data arrives, the
206 * `messageFilter` argument will be used to select the appropriate data for
207 * the resulting Observable. When teardown occurs, either due to
208 * unsubscription, completion or error, a message defined by the `unsubMsg`
209 * argument will be send to the server over the WebSocketSubject.
211 * @param subMsg A function to generate the subscription message to be sent to
212 * the server. This will still be processed by the serializer in the
213 * WebSocketSubject's config. (Which defaults to JSON serialization)
214 * @param unsubMsg A function to generate the unsubscription message to be
215 * sent to the server at teardown. This will still be processed by the
216 * serializer in the WebSocketSubject's config.
217 * @param messageFilter A predicate for selecting the appropriate messages
218 * from the server for the output stream.
220 multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) {
222 return new Observable((observer: Observer<any>) => {
229 const subscription = self.subscribe(x => {
231 if (messageFilter(x)) {
238 err => observer.error(err),
239 () => observer.complete());
243 self.next(unsubMsg());
247 subscription.unsubscribe();
252 private _connectSocket() {
253 const { WebSocketCtor, protocol, url, binaryType } = this._config;
254 const observer = this._output;
256 let socket: WebSocket = null;
259 new WebSocketCtor(url, protocol) :
260 new WebSocketCtor(url);
261 this._socket = socket;
263 this._socket.binaryType = binaryType;
270 const subscription = new Subscription(() => {
272 if (socket && socket.readyState === 1) {
277 socket.onopen = (e: Event) => {
278 const { _socket } = this;
284 const { openObserver } = this._config;
286 openObserver.next(e);
289 const queue = this.destination;
291 this.destination = Subscriber.create<T>(
293 if (socket.readyState === 1) {
295 const { serializer } = this._config;
296 socket.send(serializer(x));
298 this.destination.error(e);
303 const { closingObserver } = this._config;
304 if (closingObserver) {
305 closingObserver.next(undefined);
308 socket.close(e.code, e.reason);
310 observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
315 const { closingObserver } = this._config;
316 if (closingObserver) {
317 closingObserver.next(undefined);
322 ) as Subscriber<any>;
324 if (queue && queue instanceof ReplaySubject) {
325 subscription.add((<ReplaySubject<T>>queue).subscribe(this.destination));
329 socket.onerror = (e: Event) => {
334 socket.onclose = (e: CloseEvent) => {
336 const { closeObserver } = this._config;
338 closeObserver.next(e);
347 socket.onmessage = (e: MessageEvent) => {
349 const { deserializer } = this._config;
350 observer.next(deserializer(e));
357 /** @deprecated This is an internal implementation detail, do not use. */
358 _subscribe(subscriber: Subscriber<T>): Subscription {
359 const { source } = this;
361 return source.subscribe(subscriber);
364 this._connectSocket();
366 this._output.subscribe(subscriber);
367 subscriber.add(() => {
368 const { _socket } = this;
369 if (this._output.observers.length === 0) {
370 if (_socket && _socket.readyState === 1) {
380 const { _socket } = this;
381 if (_socket && _socket.readyState === 1) {