3 const Limiter = require('async-limiter');
4 const zlib = require('zlib');
6 const bufferUtil = require('./buffer-util');
7 const constants = require('./constants');
// Four bytes 0x00 0x00 0xff 0xff. `_decompress` writes them to the inflate
// stream after the final fragment; `_compress` drops the last 4 bytes of its
// output on the final fragment (presumably this same sequence - confirm).
9 const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]);
// Single zero byte passed to the callback by `_compress` when there is no
// data to compress.
10 const EMPTY_BLOCK = Buffer.from([0x00]);
// Symbols used as property keys for per-stream state kept on the zlib
// stream objects.
// Back-reference from the inflate stream to its PerMessageDeflate instance.
12 const kPerMessageDeflate = Symbol('permessage-deflate');
// Set to true before a write/flush cycle and back to false once it is done.
13 const kWriteInProgress = Symbol('write-in-progress');
// Set when a close is requested while a write is in progress; checked in
// the flush callbacks.
14 const kPendingClose = Symbol('pending-close');
// Running byte count of the chunks received from the stream.
15 const kTotalLength = Symbol('total-length');
// Callback of the current `_decompress` call, invoked by `inflateOnError`.
16 const kCallback = Symbol('callback');
// Chunks emitted by the stream during the current call.
17 const kBuffers = Symbol('buffers');
// Error recorded by `inflateOnData` when the payload limit is exceeded.
18 const kError = Symbol('error');
21 // We limit zlib concurrency, which prevents severe memory fragmentation
22 // as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
23 // and https://github.com/websockets/ws/issues/1202
25 // Intentionally global; it's the global thread pool that's an issue.
30 * permessage-deflate implementation.
32 class PerMessageDeflate {
34 * Creates a PerMessageDeflate instance.
36 * @param {Object} options Configuration options
37 * @param {Boolean} options.serverNoContextTakeover Request/accept disabling
38 * of server context takeover
39 * @param {Boolean} options.clientNoContextTakeover Advertise/acknowledge
40 * disabling of client context takeover
41 * @param {(Boolean|Number)} options.serverMaxWindowBits Request/confirm the
42 * use of a custom server window size
43 * @param {(Boolean|Number)} options.clientMaxWindowBits Advertise support
44 * for, or request, a custom client window size
45 * @param {Object} options.zlibDeflateOptions Options to pass to zlib on deflate
46 * @param {Object} options.zlibInflateOptions Options to pass to zlib on inflate
47 * @param {Number} options.threshold Size (in bytes) below which messages
48 * should not be compressed
49 * @param {Number} options.concurrencyLimit The number of concurrent calls to zlib
51 * @param {Boolean} isServer Create the instance in either server or client mode
53 * @param {Number} maxPayload The maximum allowed message length
// Stores the payload limit, the options object, the compression threshold
// and the server/client flag, and creates the shared zlib limiter.
// NOTE(review): the fallback arms of both ternaries and whatever guards the
// limiter creation are not visible in this extract - confirm against the
// full file.
55 constructor (options, isServer, maxPayload) {
// `| 0` coerces to a 32-bit integer, so undefined and NaN become 0.
// `inflateOnData` tests `_maxPayload < 1`, which appears to mean "no limit".
56 this._maxPayload = maxPayload | 0;
// A missing options argument is replaced by an empty object.
57 this._options = options || {};
// An explicitly supplied threshold (anything but undefined, so 0 too) is
// used as given.
58 this._threshold = this._options.threshold !== undefined
59 ? this._options.threshold
61 this._isServer = !!isServer;
// Same pattern for the limiter concurrency: an explicit `concurrencyLimit`
// is used as given.
68 const concurrency = this._options.concurrencyLimit !== undefined
69 ? this._options.concurrencyLimit
// `zlibLimiter` is not declared in this block; the comment near the top of
// the file describes it as intentionally global.
71 zlibLimiter = new Limiter({ concurrency });
// Static getter that returns the extension name string.
78 static get extensionName () {
79 return 'permessage-deflate';
83 * Create an extension negotiation offer.
85 * @return {Object} Extension parameters
// Body of the method that builds the negotiation offer: each truthy option
// is copied onto `params` under its snake_case extension parameter name.
// NOTE(review): the method header, the creation of `params` and the return
// statement are not visible in this extract.
91 if (this._options.serverNoContextTakeover) {
92 params.server_no_context_takeover = true;
94 if (this._options.clientNoContextTakeover) {
95 params.client_no_context_takeover = true;
// A truthy `serverMaxWindowBits` (true or a number) is forwarded unchanged.
97 if (this._options.serverMaxWindowBits) {
98 params.server_max_window_bits = this._options.serverMaxWindowBits;
100 if (this._options.clientMaxWindowBits) {
101 params.client_max_window_bits = this._options.clientMaxWindowBits;
// `== null` matches both null and undefined: when the option was not set at
// all, the parameter is still included with the value `true`. Other falsy
// values (false, 0) leave it out.
102 } else if (this._options.clientMaxWindowBits == null) {
103 params.client_max_window_bits = true;
110 * Accept an extension negotiation offer/response.
112 * @param {Array} configurations The extension negotiation offers/response
113 * @return {Object} Accepted configuration
// Normalizes the received configurations, then stores in `this.params` the
// result of `acceptAsServer` or `acceptAsClient`, chosen by `_isServer`.
// Errors thrown by `normalizeParams` or the accept helpers are not caught
// in the visible lines.
// NOTE(review): the return statement is not visible in this extract; the
// doc comment above says the accepted configuration is returned.
116 accept (configurations) {
117 configurations = this.normalizeParams(configurations);
119 this.params = this._isServer
120 ? this.acceptAsServer(configurations)
121 : this.acceptAsClient(configurations);
127 * Releases all resources used by the extension.
// Body of the resource-release method. The same steps are applied to the
// inflate stream and then to the deflate stream.
// NOTE(review): the method header, any null guards and the else lines are
// not visible in this extract, so the exact branching must be confirmed.
// While a write is in progress the stream is only flagged; the flush
// callbacks in `_decompress` and `_compress` check `kPendingClose` and close
// the stream themselves.
133 if (this._inflate[kWriteInProgress]) {
134 this._inflate[kPendingClose] = true;
// Close the stream and drop the reference (presumably the no-write-in-
// progress branch).
136 this._inflate.close();
137 this._inflate = null;
141 if (this._deflate[kWriteInProgress]) {
142 this._deflate[kPendingClose] = true;
144 this._deflate.close();
145 this._deflate = null;
151 * Accept an extension negotiation offer.
153 * @param {Array} offers The extension negotiation offers
154 * @return {Object} Accepted configuration
// Server side of the negotiation: selects one offer with `Array#find`,
// then writes the server options onto it.
// NOTE(review): the `if (` opener, the values returned by the predicate,
// the guard around the throw and the final return are not visible in this
// extract.
157 acceptAsServer (offers) {
158 const opts = this._options;
159 const accepted = offers.find((params) => {
// The condition below combines three cases that appear to disqualify an
// offer:
//  1. the offer has `server_no_context_takeover` but the option is
//     explicitly false;
//  2. the offer has `server_max_window_bits` but the option is false, or is
//     a number larger than the offered value;
//  3. the `clientMaxWindowBits` option is a number but the offer has no
//     `client_max_window_bits`.
161 (opts.serverNoContextTakeover === false &&
162 params.server_no_context_takeover) ||
163 (params.server_max_window_bits &&
164 (opts.serverMaxWindowBits === false ||
165 (typeof opts.serverMaxWindowBits === 'number' &&
166 opts.serverMaxWindowBits > params.server_max_window_bits))) ||
167 (typeof opts.clientMaxWindowBits === 'number' &&
168 !params.client_max_window_bits)
// Raised when no offer was selected (guard not visible here).
177 throw new Error('None of the extension offers can be accepted');
// Write the server options onto the selected offer. Note that this mutates
// the offer object taken from the `offers` array.
180 if (opts.serverNoContextTakeover) {
181 accepted.server_no_context_takeover = true;
183 if (opts.clientNoContextTakeover) {
184 accepted.client_no_context_takeover = true;
186 if (typeof opts.serverMaxWindowBits === 'number') {
187 accepted.server_max_window_bits = opts.serverMaxWindowBits;
189 if (typeof opts.clientMaxWindowBits === 'number') {
190 accepted.client_max_window_bits = opts.clientMaxWindowBits;
// With no numeric option, the parameter is removed from the accepted
// configuration when the offer carried it without a value (`true`) or when
// the option is explicitly false.
192 accepted.client_max_window_bits === true ||
193 opts.clientMaxWindowBits === false
195 delete accepted.client_max_window_bits;
202 * Accept the extension negotiation response.
204 * @param {Array} response The extension negotiation response
205 * @return {Object} Accepted configuration
// Client side of the negotiation: validates the first entry of the response
// against the client options.
// NOTE(review): the `if (` / `} else if (` openers, the `throw new Error(`
// opener for the second message and the final return are not visible in
// this extract.
208 acceptAsClient (response) {
// Only the first entry of the response array is read.
209 const params = response[0];
// The response contains `client_no_context_takeover` although the option
// was explicitly set to false.
212 this._options.clientNoContextTakeover === false &&
213 params.client_no_context_takeover
215 throw new Error('Unexpected parameter "client_no_context_takeover"');
// The response has no `client_max_window_bits`: fall back to the numeric
// option when one is configured.
218 if (!params.client_max_window_bits) {
219 if (typeof this._options.clientMaxWindowBits === 'number') {
220 params.client_max_window_bits = this._options.clientMaxWindowBits;
// The response has the parameter: the condition below matches when the
// option is false, or when the response value is larger than the
// configured number.
223 this._options.clientMaxWindowBits === false ||
224 (typeof this._options.clientMaxWindowBits === 'number' &&
225 params.client_max_window_bits > this._options.clientMaxWindowBits)
228 'Unexpected or invalid parameter "client_max_window_bits"'
236 * Normalize parameters.
238 * @param {Array} configurations The extension negotiation offers/response
239 * @return {Array} The offers/response with normalized parameters
// Validates every parameter of every configuration and returns the same
// `configurations` array.
// NOTE(review): several lines are not visible in this extract: the
// definition of `num`, any reassignment of `value` or write-back into
// `params`, and the openers of the throw statements around the
// "Invalid value" messages. Confirm those against the full file.
242 normalizeParams (configurations) {
243 configurations.forEach((params) => {
244 Object.keys(params).forEach((key) => {
245 var value = params[key];
// `value.length` is read here, so each parameter value is presumably an
// array of occurrences; more than one occurrence is rejected.
247 if (value.length > 1) {
248 throw new Error(`Parameter "${key}" must have only a single value`);
253 if (key === 'client_max_window_bits') {
// A value other than `true` must be an integer from 8 to 15.
254 if (value !== true) {
256 if (!Number.isInteger(num) || num < 8 || num > 15) {
258 `Invalid value for parameter "${key}": ${value}`
// A value-less `client_max_window_bits` (`true`) is rejected when this
// instance is not a server.
262 } else if (!this._isServer) {
264 `Invalid value for parameter "${key}": ${value}`
// `server_max_window_bits` gets the same 8..15 integer check.
267 } else if (key === 'server_max_window_bits') {
269 if (!Number.isInteger(num) || num < 8 || num > 15) {
271 `Invalid value for parameter "${key}": ${value}`
// The two no_context_takeover parameters must be exactly `true`.
276 key === 'client_no_context_takeover' ||
277 key === 'server_no_context_takeover'
279 if (value !== true) {
281 `Invalid value for parameter "${key}": ${value}`
// Any other key is rejected.
285 throw new Error(`Unknown parameter "${key}"`);
292 return configurations;
296 * Decompress data. Concurrency limited by async-limiter.
298 * @param {Buffer} data Compressed data
299 * @param {Boolean} fin Specifies whether or not this is the last fragment
300 * @param {Function} callback Callback
// Queues a job on the shared `zlibLimiter`; the job runs `_decompress` and
// forwards its `(err, result)` to `callback`.
// NOTE(review): the call that releases the limiter slot (`done`) is not
// visible in this extract - confirm it is invoked on every path.
303 decompress (data, fin, callback) {
304 zlibLimiter.push((done) => {
305 this._decompress(data, fin, (err, result) => {
307 callback(err, result);
313 * Compress data. Concurrency limited by async-limiter.
315 * @param {Buffer} data Data to compress
316 * @param {Boolean} fin Specifies whether or not this is the last fragment
317 * @param {Function} callback Callback
// Queues a job on the shared `zlibLimiter`; the job runs `_compress` and
// forwards its `(err, result)` to `callback`.
// NOTE(review): the call that releases the limiter slot (`done`) is not
// visible in this extract - confirm it is invoked on every path.
320 compress (data, fin, callback) {
321 zlibLimiter.push((done) => {
322 this._compress(data, fin, (err, result) => {
324 callback(err, result);
332 * @param {Buffer} data Compressed data
333 * @param {Boolean} fin Specifies whether or not this is the last fragment
334 * @param {Function} callback Callback
// Inflates one fragment with a lazily created `zlib.InflateRaw` stream and
// passes the concatenated output to `callback`.
// NOTE(review): several lines are not visible in this extract: the second
// arm of the `windowBits` ternary, the error guard and its callback/return
// inside the flush callback, and the `if (` / `else` around the close and
// reset steps.
337 _decompress (data, fin, callback) {
// Incoming data was compressed by the remote peer, so a server reads the
// `client_*` parameters and a client reads the `server_*` ones.
338 const endpoint = this._isServer ? 'client' : 'server';
// Create the stream on first use, or after it was closed and set to null.
340 if (!this._inflate) {
341 const key = `${endpoint}_max_window_bits`;
// zlib default window size when no numeric value was negotiated.
342 const windowBits = typeof this.params[key] !== 'number'
343 ? zlib.Z_DEFAULT_WINDOWBITS
// `windowBits` is merged last, so it overrides any `windowBits` given in
// `zlibInflateOptions`.
346 this._inflate = zlib.createInflateRaw(
347 Object.assign({}, this._options.zlibInflateOptions, { windowBits })
// Per-stream state read by `inflateOnData` and `inflateOnError`, whose
// `this` is the stream.
349 this._inflate[kPerMessageDeflate] = this;
350 this._inflate[kTotalLength] = 0;
351 this._inflate[kBuffers] = [];
352 this._inflate.on('error', inflateOnError);
353 this._inflate.on('data', inflateOnData);
// Stored so that `inflateOnError` can report a failure for this call.
356 this._inflate[kCallback] = callback;
357 this._inflate[kWriteInProgress] = true;
359 this._inflate.write(data);
// On the last fragment the 4-byte trailer is fed to the stream after the
// payload.
360 if (fin) this._inflate.write(TRAILER);
362 this._inflate.flush(() => {
// `kError` is set by `inflateOnData` when the payload limit is exceeded.
363 const err = this._inflate[kError];
// Error path (guard not visible): close and drop the stream.
366 this._inflate.close();
367 this._inflate = null;
// Join the chunks collected by `inflateOnData` into one buffer.
372 const data = bufferUtil.concat(
373 this._inflate[kBuffers],
374 this._inflate[kTotalLength]
// The stream is closed when the message is complete and no context takeover
// was negotiated for the peer, or when a close was requested during the
// write.
378 (fin && this.params[`${endpoint}_no_context_takeover`]) ||
379 this._inflate[kPendingClose]
381 this._inflate.close();
382 this._inflate = null;
// Reset the per-call state on the stream (presumably the branch where the
// stream is kept for reuse).
384 this._inflate[kWriteInProgress] = false;
385 this._inflate[kTotalLength] = 0;
386 this._inflate[kBuffers] = [];
389 callback(null, data);
396 * @param {Buffer} data Data to compress
397 * @param {Boolean} fin Specifies whether or not this is the last fragment
398 * @param {Function} callback Callback
// Deflates one fragment with a lazily created `zlib.DeflateRaw` stream and
// passes the concatenated output to `callback`.
// NOTE(review): several lines are not visible in this extract: the return
// after the empty-input shortcut, the second arm of the `windowBits`
// ternary, part of the options object passed to `createDeflateRaw`, and the
// `if (` / `else` around the close and reset steps.
401 _compress (data, fin, callback) {
// Nothing to compress: hand back `EMPTY_BLOCK` asynchronously.
402 if (!data || data.length === 0) {
403 process.nextTick(callback, null, EMPTY_BLOCK);
// Outgoing data is compressed by this side, so a server reads the
// `server_*` parameters and a client reads the `client_*` ones.
407 const endpoint = this._isServer ? 'server' : 'client';
// Create the stream on first use, or after it was closed and set to null.
409 if (!this._deflate) {
410 const key = `${endpoint}_max_window_bits`;
// zlib default window size when no numeric value was negotiated.
411 const windowBits = typeof this.params[key] !== 'number'
412 ? zlib.Z_DEFAULT_WINDOWBITS
415 this._deflate = zlib.createDeflateRaw(
417 // TODO deprecate memLevel/level and recommend zlibDeflateOptions instead
419 memLevel: this._options.memLevel,
420 level: this._options.level
422 this._options.zlibDeflateOptions,
427 this._deflate[kTotalLength] = 0;
428 this._deflate[kBuffers] = [];
431 // `zlib.DeflateRaw` emits an `'error'` event only when an attempt to use
432 // it is made after it has already been closed. This cannot happen here,
433 // so we only add a listener for the `'data'` event.
435 this._deflate.on('data', deflateOnData);
438 this._deflate[kWriteInProgress] = true;
440 this._deflate.write(data);
// A sync flush makes zlib emit the output for everything written so far.
441 this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
// This `data` shadows the `data` parameter: from here on it is the
// compressed output collected by `deflateOnData`.
442 var data = bufferUtil.concat(
443 this._deflate[kBuffers],
444 this._deflate[kTotalLength]
// On the last fragment the final 4 bytes of the output are dropped
// (presumably the same sequence as `TRAILER` - confirm).
447 if (fin) data = data.slice(0, data.length - 4);
// The stream is closed when the message is complete and no context takeover
// was negotiated for this side, or when a close was requested during the
// write.
450 (fin && this.params[`${endpoint}_no_context_takeover`]) ||
451 this._deflate[kPendingClose]
453 this._deflate.close();
454 this._deflate = null;
// Reset the per-call state on the stream (presumably the branch where the
// stream is kept for reuse).
456 this._deflate[kWriteInProgress] = false;
457 this._deflate[kTotalLength] = 0;
458 this._deflate[kBuffers] = [];
461 callback(null, data);
466 module.exports = PerMessageDeflate;
469 * The listener of the `zlib.DeflateRaw` stream `'data'` event.
471 * @param {Buffer} chunk A chunk of data
// Registered with `.on('data', ...)` on the deflate stream in `_compress`,
// so `this` is that stream. Collects the chunk and adds its length to the
// running total.
474 function deflateOnData (chunk) {
475 this[kBuffers].push(chunk);
476 this[kTotalLength] += chunk.length;
480 * The listener of the `zlib.InflateRaw` stream `'data'` event.
482 * @param {Buffer} chunk A chunk of data
// Registered with `.on('data', ...)` on the inflate stream in
// `_decompress`, so `this` is that stream. Collects output while the
// payload limit is respected and records an error once it is exceeded.
// NOTE(review): the `if (` opener and the early return after the push are
// not visible in this extract.
485 function inflateOnData (chunk) {
// The length is added before the check, so the chunk that crosses the limit
// is counted but not stored.
486 this[kTotalLength] += chunk.length;
// A limit below 1 appears to mean "no limit"; otherwise the running total
// must stay within `_maxPayload`.
489 this[kPerMessageDeflate]._maxPayload < 1 ||
490 this[kTotalLength] <= this[kPerMessageDeflate]._maxPayload
492 this[kBuffers].push(chunk);
// Limit exceeded: store the error on the stream, where the flush callback
// in `_decompress` reads it, tag it with status code 1009 and stop
// listening for further chunks.
496 this[kError] = new RangeError('Max payload size exceeded');
497 this[kError][constants.kStatusCode] = 1009;
498 this.removeListener('data', inflateOnData);
503 * The listener of the `zlib.InflateRaw` stream `'error'` event.
505 * @param {Error} err The emitted error
// Registered with `.on('error', ...)` on the inflate stream in
// `_decompress`, so `this` is that stream. Drops the owner reference to the
// stream, tags the error with status code 1007 and reports it through the
// callback stored by `_decompress`.
508 function inflateOnError (err) {
510 // There is no need to call `Zlib#close()` as the handle is automatically
511 // closed when an error is emitted.
// Setting `_inflate` to null makes the next `_decompress` call create a
// fresh stream.
513 this[kPerMessageDeflate]._inflate = null;
514 err[constants.kStatusCode] = 1007;
515 this[kCallback](err);