1 var Stream = require('stream')
5 // a stream that does nothing but re-emit the input.
6 // useful for aggregating a series of changing but not ending streams into one stream)
8 exports = module.exports = through
9 through.through = through
11 //create a readable writable stream.
13 function through (write, end, opts) {
14 write = write || function (data) { this.queue(data) }
15 end = end || function () { this.queue(null) }
17 var ended = false, destroyed = false, buffer = [], _ended = false
18 var stream = new Stream()
19 stream.readable = stream.writable = true
22 // stream.autoPause = !(opts && opts.autoPause === false)
23 stream.autoDestroy = !(opts && opts.autoDestroy === false)
25 stream.write = function (data) {
26 write.call(this, data)
31 while(buffer.length && !stream.paused) {
32 var data = buffer.shift()
34 return stream.emit('end')
36 stream.emit('data', data)
40 stream.queue = stream.push = function (data) {
41 // console.error(ended)
42 if(_ended) return stream
43 if(data === null) _ended = true
49 //this will be registered as the first 'end' listener
50 //must call destroy next tick, to make sure we're after any
51 //stream piped from here.
52 //this is only a problem if end is not emitted synchronously.
53 //a nicer way to do this is to make sure this is the last listener for 'end'
55 stream.on('end', function () {
56 stream.readable = false
57 if(!stream.writable && stream.autoDestroy)
58 process.nextTick(function () {
64 stream.writable = false
66 if(!stream.readable && stream.autoDestroy)
70 stream.end = function (data) {
73 if(arguments.length) stream.write(data)
74 _end() // will emit or queue
78 stream.destroy = function () {
83 stream.writable = stream.readable = false
88 stream.pause = function () {
89 if(stream.paused) return
94 stream.resume = function () {
100 //may have become paused again,
101 //as drain emits 'data'.