3 const stream = require('stream');
5 const PerMessageDeflate = require('./permessage-deflate');
6 const bufferUtil = require('./buffer-util');
7 const validation = require('./validation');
8 const constants = require('./constants');
11 const GET_PAYLOAD_LENGTH_16 = 1;
12 const GET_PAYLOAD_LENGTH_64 = 2;
18 * HyBi Receiver implementation.
20 * @extends stream.Writable
22 class Receiver extends stream.Writable {
24 * Creates a Receiver instance.
26 * @param {String} binaryType The type for binary data
27 * @param {Object} extensions An object containing the negotiated extensions
28 * @param {Number} maxPayload The maximum allowed message length
30 constructor (binaryType, extensions, maxPayload) {
33 this._binaryType = binaryType || constants.BINARY_TYPES[0];
34 this[constants.kWebSocket] = undefined;
35 this._extensions = extensions || {};
36 this._maxPayload = maxPayload | 0;
38 this._bufferedBytes = 0;
41 this._compressed = false;
42 this._payloadLength = 0;
43 this._mask = undefined;
49 this._totalPayloadLength = 0;
50 this._messageLength = 0;
53 this._state = GET_INFO;
58 * Implements `Writable.prototype._write()`.
60 * @param {Buffer} chunk The chunk of data to write
61 * @param {String} encoding The character encoding of `chunk`
62 * @param {Function} cb Callback
64 _write (chunk, encoding, cb) {
65 if (this._opcode === 0x08) return cb();
67 this._bufferedBytes += chunk.length;
68 this._buffers.push(chunk);
73 * Consumes `n` bytes from the buffered data.
75 * @param {Number} n The number of bytes to consume
76 * @return {Buffer} The consumed bytes
80 this._bufferedBytes -= n;
82 if (n === this._buffers[0].length) return this._buffers.shift();
84 if (n < this._buffers[0].length) {
85 const buf = this._buffers[0];
86 this._buffers[0] = buf.slice(n);
87 return buf.slice(0, n);
90 const dst = Buffer.allocUnsafe(n);
93 const buf = this._buffers[0];
95 if (n >= buf.length) {
96 this._buffers.shift().copy(dst, dst.length - n);
98 buf.copy(dst, dst.length - n, 0, n);
99 this._buffers[0] = buf.slice(n);
109 * Starts the parsing loop.
111 * @param {Function} cb Callback
119 switch (this._state) {
121 err = this.getInfo();
123 case GET_PAYLOAD_LENGTH_16:
124 err = this.getPayloadLength16();
126 case GET_PAYLOAD_LENGTH_64:
127 err = this.getPayloadLength64();
133 err = this.getData(cb);
135 default: // `INFLATING`
139 } while (this._loop);
145 * Reads the first two bytes of a frame.
147 * @return {(RangeError|undefined)} A possible error
151 if (this._bufferedBytes < 2) {
156 const buf = this.consume(2);
158 if ((buf[0] & 0x30) !== 0x00) {
160 return error(RangeError, 'RSV2 and RSV3 must be clear', true, 1002);
163 const compressed = (buf[0] & 0x40) === 0x40;
165 if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
167 return error(RangeError, 'RSV1 must be clear', true, 1002);
170 this._fin = (buf[0] & 0x80) === 0x80;
171 this._opcode = buf[0] & 0x0f;
172 this._payloadLength = buf[1] & 0x7f;
174 if (this._opcode === 0x00) {
177 return error(RangeError, 'RSV1 must be clear', true, 1002);
180 if (!this._fragmented) {
182 return error(RangeError, 'invalid opcode 0', true, 1002);
185 this._opcode = this._fragmented;
186 } else if (this._opcode === 0x01 || this._opcode === 0x02) {
187 if (this._fragmented) {
189 return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
192 this._compressed = compressed;
193 } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
196 return error(RangeError, 'FIN must be set', true, 1002);
201 return error(RangeError, 'RSV1 must be clear', true, 1002);
204 if (this._payloadLength > 0x7d) {
208 `invalid payload length ${this._payloadLength}`,
215 return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);
218 if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
219 this._masked = (buf[1] & 0x80) === 0x80;
221 if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
222 else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
223 else return this.haveLength();
227 * Gets extended payload length (7+16).
229 * @return {(RangeError|undefined)} A possible error
232 getPayloadLength16 () {
233 if (this._bufferedBytes < 2) {
238 this._payloadLength = this.consume(2).readUInt16BE(0);
239 return this.haveLength();
243 * Gets extended payload length (7+64).
245 * @return {(RangeError|undefined)} A possible error
248 getPayloadLength64 () {
249 if (this._bufferedBytes < 8) {
254 const buf = this.consume(8);
255 const num = buf.readUInt32BE(0);
258 // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
259 // if payload length is greater than this number.
261 if (num > Math.pow(2, 53 - 32) - 1) {
265 'Unsupported WebSocket frame: payload length > 2^53 - 1',
271 this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
272 return this.haveLength();
276 * Payload length has been read.
278 * @return {(RangeError|undefined)} A possible error
282 if (this._payloadLength && this._opcode < 0x08) {
283 this._totalPayloadLength += this._payloadLength;
284 if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
286 return error(RangeError, 'Max payload size exceeded', false, 1009);
290 if (this._masked) this._state = GET_MASK;
291 else this._state = GET_DATA;
300 if (this._bufferedBytes < 4) {
305 this._mask = this.consume(4);
306 this._state = GET_DATA;
312 * @param {Function} cb Callback
313 * @return {(Error|RangeError|undefined)} A possible error
317 var data = constants.EMPTY_BUFFER;
319 if (this._payloadLength) {
320 if (this._bufferedBytes < this._payloadLength) {
325 data = this.consume(this._payloadLength);
326 if (this._masked) bufferUtil.unmask(data, this._mask);
329 if (this._opcode > 0x07) return this.controlMessage(data);
331 if (this._compressed) {
332 this._state = INFLATING;
333 this.decompress(data, cb);
339 // This message is not compressed so its lenght is the sum of the payload
340 // length of all fragments.
342 this._messageLength = this._totalPayloadLength;
343 this._fragments.push(data);
346 return this.dataMessage();
352 * @param {Buffer} data Compressed data
353 * @param {Function} cb Callback
356 decompress (data, cb) {
357 const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
359 perMessageDeflate.decompress(data, this._fin, (err, buf) => {
360 if (err) return cb(err);
363 this._messageLength += buf.length;
364 if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
365 return cb(error(RangeError, 'Max payload size exceeded', false, 1009));
368 this._fragments.push(buf);
371 const er = this.dataMessage();
372 if (er) return cb(er);
379 * Handles a data message.
381 * @return {(Error|undefined)} A possible error
386 const messageLength = this._messageLength;
387 const fragments = this._fragments;
389 this._totalPayloadLength = 0;
390 this._messageLength = 0;
391 this._fragmented = 0;
392 this._fragments = [];
394 if (this._opcode === 2) {
397 if (this._binaryType === 'nodebuffer') {
398 data = toBuffer(fragments, messageLength);
399 } else if (this._binaryType === 'arraybuffer') {
400 data = toArrayBuffer(toBuffer(fragments, messageLength));
405 this.emit('message', data);
407 const buf = toBuffer(fragments, messageLength);
409 if (!validation.isValidUTF8(buf)) {
411 return error(Error, 'invalid UTF-8 sequence', true, 1007);
414 this.emit('message', buf.toString());
418 this._state = GET_INFO;
422 * Handles a control message.
424 * @param {Buffer} data Data to handle
425 * @return {(Error|RangeError|undefined)} A possible error
428 controlMessage (data) {
429 if (this._opcode === 0x08) {
432 if (data.length === 0) {
433 this.emit('conclude', 1005, '');
435 } else if (data.length === 1) {
436 return error(RangeError, 'invalid payload length 1', true, 1002);
438 const code = data.readUInt16BE(0);
440 if (!validation.isValidStatusCode(code)) {
441 return error(RangeError, `invalid status code ${code}`, true, 1002);
444 const buf = data.slice(2);
446 if (!validation.isValidUTF8(buf)) {
447 return error(Error, 'invalid UTF-8 sequence', true, 1007);
450 this.emit('conclude', code, buf.toString());
457 if (this._opcode === 0x09) this.emit('ping', data);
458 else this.emit('pong', data);
460 this._state = GET_INFO;
464 module.exports = Receiver;
467 * Builds an error object.
469 * @param {(Error|RangeError)} ErrorCtor The error constructor
470 * @param {String} message The error message
471 * @param {Boolean} prefix Specifies whether or not to add a default prefix to
473 * @param {Number} statusCode The status code
474 * @return {(Error|RangeError)} The error
477 function error (ErrorCtor, message, prefix, statusCode) {
478 const err = new ErrorCtor(
479 prefix ? `Invalid WebSocket frame: ${message}` : message
482 Error.captureStackTrace(err, error);
483 err[constants.kStatusCode] = statusCode;
488 * Makes a buffer from a list of fragments.
490 * @param {Buffer[]} fragments The list of fragments composing the message
491 * @param {Number} messageLength The length of the message
495 function toBuffer (fragments, messageLength) {
496 if (fragments.length === 1) return fragments[0];
497 if (fragments.length > 1) return bufferUtil.concat(fragments, messageLength);
498 return constants.EMPTY_BUFFER;
502 * Converts a buffer to an `ArrayBuffer`.
504 * @param {Buffer} The buffer to convert
505 * @return {ArrayBuffer} Converted buffer
507 function toArrayBuffer (buf) {
508 if (buf.byteOffset === 0 && buf.byteLength === buf.buffer.byteLength) {
512 return buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);