| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 | 
							- //filter will reemit the data if cb(err,pass) pass is truthy
 
- // reduce is more tricky
 
- // maybe we want to group the reductions or emit progress updates occasionally
 
- // the most basic reduce just emits one 'data' event after it has recieved 'end'
 
- var Stream = require('stream').Stream
 
- //create an event stream and apply function to each .write
 
- //emitting each response as data
 
- //unless it's an empty callback
 
- module.exports = function (mapper) {
 
-   var stream = new Stream()
 
-     , inputs = 0
 
-     , outputs = 0
 
-     , ended = false
 
-     , paused = false
 
-     , destroyed = false
 
-     , lastWritten = 0
 
-     , inNext = false
 
-   // Items that are not ready to be written yet (because they would come out of
 
-   // order) get stuck in a queue for later.
 
-   var writeQueue = {}
 
-   stream.writable = true
 
-   stream.readable = true
 
-   function queueData (data, number) {
 
-     var nextToWrite = lastWritten + 1
 
-     if (number === nextToWrite) {
 
-       // If it's next, and its not undefined write it
 
-       if (data !== undefined) {
 
-         stream.emit.apply(stream, ['data', data])
 
-       }
 
-       lastWritten ++
 
-       nextToWrite ++
 
-     } else {
 
-       // Otherwise queue it for later.
 
-       writeQueue[number] = data
 
-     }
 
-     // If the next value is in the queue, write it
 
-     if (writeQueue.hasOwnProperty(nextToWrite)) {
 
-       var dataToWrite = writeQueue[nextToWrite]
 
-       delete writeQueue[nextToWrite]
 
-       return queueData(dataToWrite, nextToWrite)
 
-     }
 
-     outputs ++
 
-     if(inputs === outputs) {
 
-       if(paused) paused = false, stream.emit('drain') //written all the incoming events
 
-       if(ended) end()
 
-     }
 
-   }
 
-   function next (err, data, number) {
 
-     if(destroyed) return
 
-     inNext = true
 
-     if(err) {
 
-       return inNext = false, stream.emit.apply(stream, ['error', err])
 
-     }
 
-     queueData(data, number)
 
-     inNext = false
 
-   }
 
-   // Wrap the mapper function by calling its callback with the order number of
 
-   // the item in the stream.
 
-   function wrappedMapper (input, number, callback) {
 
-     return mapper.call(null, input, function(err, data){
 
-       callback(err, data, number)
 
-     })
 
-   }
 
-   stream.write = function (data) {
 
-     if(ended) throw new Error('map stream is not writable')
 
-     inNext = false
 
-     inputs ++
 
-     try {
 
-       //catch sync errors and handle them like async errors
 
-       var written = wrappedMapper(data, inputs, next)
 
-       paused = (written === false)
 
-       return !paused
 
-     } catch (err) {
 
-       //if the callback has been called syncronously, and the error
 
-       //has occured in an listener, throw it again.
 
-       if(inNext)
 
-         throw err
 
-       next(err)
 
-       return !paused
 
-     }
 
-   }
 
-   function end (data) {
 
-     //if end was called with args, write it, 
 
-     ended = true //write will emit 'end' if ended is true
 
-     stream.writable = false
 
-     if(data !== undefined) {
 
-       return queueData(data, inputs)
 
-     } else if (inputs == outputs) { //wait for processing 
 
-       stream.readable = false, stream.emit('end'), stream.destroy() 
 
-     }
 
-   }
 
-   stream.end = function (data) {
 
-     if(ended) return
 
-     end()
 
-   }
 
-   stream.destroy = function () {
 
-     ended = destroyed = true
 
-     stream.writable = stream.readable = paused = false
 
-     process.nextTick(function () {
 
-       stream.emit('close')
 
-     })
 
-   }
 
-   stream.pause = function () {
 
-     paused = true
 
-   }
 
-   stream.resume = function () {
 
-     paused = false
 
-   }
 
-   return stream
 
- }
 
 
  |