4 * https://github.com/teambition/merge2
6 * Copyright (c) 2014-2020 Teambition
7 * Licensed under the MIT license.
9 const Stream = require('stream')
10 const PassThrough = Stream.PassThrough
11 const slice = Array.prototype.slice
13 module.exports = merge2
// Setup: capture the call arguments, inspect the last one as a possible options
// object, apply option defaults, and create the PassThrough stream that sources
// are merged into.
// NOTE(review): the embedded line numbers skip values (e.g. 21 -> 27), so some
// original lines, including the enclosing function header and the body of the
// `if` on line 21, are not visible in this listing.
16 const streamsQueue = []
17 const args = slice.call(arguments)
19 let options = args[args.length - 1]
// The last argument is considered an options object only when it is truthy,
// is not an array, and has no `pipe` property.
21 if (options && !Array.isArray(options) && options.pipe == null) {
// `end` counts as enabled unless it is exactly false; `pipeError` counts as
// enabled only when it is exactly true.
27 const doEnd = options.end !== false
28 const doPipeError = options.pipeError === true
// Defaults are written onto the options object itself when the caller left
// them null/undefined.
29 if (options.objectMode == null) {
30 options.objectMode = true
32 if (options.highWaterMark == null) {
33 options.highWaterMark = 64 * 1024
// The same options object is handed to PassThrough (called without `new`).
35 const mergedStream = PassThrough(options)
// Appends every argument to streamsQueue after passing it through
// pauseStreams together with the shared options object.
// NOTE(review): the lines after 39 are elided in this listing, so whatever
// follows the loop (and the return value) is not visible here.
37 function addStream () {
38 for (let i = 0, len = arguments.length; i < len; i++) {
39 streamsQueue.push(pauseStreams(arguments[i], options))
// Takes the next entry off streamsQueue and pipes its stream(s) into
// mergedStream.
// NOTE(review): the embedded line numbers skip values throughout this block
// (e.g. 45 -> 51, 63 -> 70), so guards, closing braces and the headers of the
// helpers referenced below (`onend`, and whatever wraps the counter check on
// line 63) are not visible in this listing.
45 function mergeStream () {
51 let streams = streamsQueue.shift()
// endStream is deferred to the next tick; the condition guarding this call is
// on an elided line — presumably the "queue is empty" case, TODO confirm.
53 process.nextTick(endStream)
// Non-array queue entries get special handling; the branch body is elided.
56 if (!Array.isArray(streams)) {
// The counter starts one above the number of streams in this entry and is
// decremented on line 63; what happens once it reaches zero is elided.
60 let pipesCount = streams.length + 1
63 if (--pipesCount > 0) {
// Wires a single source stream into mergedStream.
70 function pipe (stream) {
// Detach the listeners that are attached further down (lines 87-91).
72 stream.removeListener('merge2UnpipeEnd', onend)
73 stream.removeListener('end', onend)
75 stream.removeListener('error', onerror)
// Re-emits a source stream's error on mergedStream.
79 function onerror (err) {
80 mergedStream.emit('error', err)
// Checks whether the source has already emitted 'end'; the branch body is
// elided.
83 if (stream._readableState.endEmitted) {
// `onend` is registered for both the normal 'end' event and the custom
// 'merge2UnpipeEnd' event.
87 stream.on('merge2UnpipeEnd', onend)
88 stream.on('end', onend)
91 stream.on('error', onerror)
// `end: false` keeps mergedStream open when this source finishes.
94 stream.pipe(mergedStream, { end: false })
95 // compatible for old stream
// Loops over every stream in the current entry; the loop body is elided.
99 for (let i = 0; i < streams.length; i++) {
// Emits 'queueDrain' on mergedStream.
// NOTE(review): lines 107 and 110+ are elided in this listing, so any other
// work done here (for example anything depending on `doEnd`) is not visible.
106 function endStream () {
108 // emit 'queueDrain' when all streams merged.
109 mergedStream.emit('queueDrain')
// Final wiring of mergedStream.
// NOTE(review): the embedded line numbers skip values (e.g. 118 -> 122), so
// the guard around the last call and the function's return are not visible.
// A limit of 0 disables EventEmitter's max-listeners warning.
115 mergedStream.setMaxListeners(0)
// Expose addStream on the merged stream as `add`.
116 mergedStream.add = addStream
// When a source is unpiped from mergedStream, emit the custom
// 'merge2UnpipeEnd' event on that source.
117 mergedStream.on('unpipe', function (stream) {
118 stream.emit('merge2UnpipeEnd')
// Queue the streams captured in `args` through addStream.
122 addStream.apply(null, args)
127 // check and pause streams for pipe.
// Validates a stream, or each element of an array of streams.
//   streams - a single stream-like object or an array of them
//   options - passed to PassThrough when an old-style stream is wrapped
// Throws Error('Only readable stream can be merged.') when a non-array value
// lacks `_readableState`, `pause` or `pipe`.
// NOTE(review): the embedded line numbers skip values (132 -> 134, 135 -> 139)
// and the listing stops at 140, so the closing braces, the `else` that
// presumably introduces the array loop, and the return are not visible here.
128 function pauseStreams (streams, options) {
129 if (!Array.isArray(streams)) {
130 // Backwards-compat with old-style streams
// An object with `pipe` but no `_readableState` is piped into a new
// PassThrough, and that PassThrough replaces it.
131 if (!streams._readableState && streams.pipe) {
132 streams = streams.pipe(PassThrough(options))
134 if (!streams._readableState || !streams.pause || !streams.pipe) {
135 throw new Error('Only readable stream can be merged.')
// Array input: each element is replaced in place by the result of a recursive
// call, so the caller's array is mutated.
139 for (let i = 0, len = streams.length; i < len; i++) {
140 streams[i] = pauseStreams(streams[i], options)