| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 | 
							- var Stream = require('stream')
 
- // through
 
- //
 
- // a stream that does nothing but re-emit the input.
 
- // useful for aggregating a series of changing but not ending streams into one stream)
 
- exports = module.exports = through
 
- through.through = through
 
- //create a readable writable stream.
 
- function through (write, end, opts) {
 
-   write = write || function (data) { this.queue(data) }
 
-   end = end || function () { this.queue(null) }
 
-   var ended = false, destroyed = false, buffer = [], _ended = false
 
-   var stream = new Stream()
 
-   stream.readable = stream.writable = true
 
-   stream.paused = false
 
- //  stream.autoPause   = !(opts && opts.autoPause   === false)
 
-   stream.autoDestroy = !(opts && opts.autoDestroy === false)
 
-   stream.write = function (data) {
 
-     write.call(this, data)
 
-     return !stream.paused
 
-   }
 
-   function drain() {
 
-     while(buffer.length && !stream.paused) {
 
-       var data = buffer.shift()
 
-       if(null === data)
 
-         return stream.emit('end')
 
-       else
 
-         stream.emit('data', data)
 
-     }
 
-   }
 
-   stream.queue = stream.push = function (data) {
 
- //    console.error(ended)
 
-     if(_ended) return stream
 
-     if(data === null) _ended = true
 
-     buffer.push(data)
 
-     drain()
 
-     return stream
 
-   }
 
-   //this will be registered as the first 'end' listener
 
-   //must call destroy next tick, to make sure we're after any
 
-   //stream piped from here.
 
-   //this is only a problem if end is not emitted synchronously.
 
-   //a nicer way to do this is to make sure this is the last listener for 'end'
 
-   stream.on('end', function () {
 
-     stream.readable = false
 
-     if(!stream.writable && stream.autoDestroy)
 
-       process.nextTick(function () {
 
-         stream.destroy()
 
-       })
 
-   })
 
-   function _end () {
 
-     stream.writable = false
 
-     end.call(stream)
 
-     if(!stream.readable && stream.autoDestroy)
 
-       stream.destroy()
 
-   }
 
-   stream.end = function (data) {
 
-     if(ended) return
 
-     ended = true
 
-     if(arguments.length) stream.write(data)
 
-     _end() // will emit or queue
 
-     return stream
 
-   }
 
-   stream.destroy = function () {
 
-     if(destroyed) return
 
-     destroyed = true
 
-     ended = true
 
-     buffer.length = 0
 
-     stream.writable = stream.readable = false
 
-     stream.emit('close')
 
-     return stream
 
-   }
 
-   stream.pause = function () {
 
-     if(stream.paused) return
 
-     stream.paused = true
 
-     return stream
 
-   }
 
-   stream.resume = function () {
 
-     if(stream.paused) {
 
-       stream.paused = false
 
-       stream.emit('resume')
 
-     }
 
-     drain()
 
-     //may have become paused again,
 
-     //as drain emits 'data'.
 
-     if(!stream.paused)
 
-       stream.emit('drain')
 
-     return stream
 
-   }
 
-   return stream
 
- }
 
 
  |