| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 | //filter will reemit the data if cb(err,pass) pass is truthy// reduce is more tricky// maybe we want to group the reductions or emit progress updates occasionally// the most basic reduce just emits one 'data' event after it has recieved 'end'var Stream = require('stream').Stream//create an event stream and apply function to each .write//emitting each response as data//unless it's an empty callbackmodule.exports = function (mapper) {  var stream = new Stream()    , inputs = 0    , outputs = 0    , ended = false    , paused = false    , destroyed = false    , lastWritten = 0    , inNext = false  // Items that are not ready to be written yet (because they would come out of  // order) get stuck in a queue for later.  var writeQueue = {}  stream.writable = true  stream.readable = true  function queueData (data, number) {    var nextToWrite = lastWritten + 1    if (number === nextToWrite) {      // If it's next, and its not undefined write it      if (data !== undefined) {        stream.emit.apply(stream, ['data', data])      }      lastWritten ++      nextToWrite ++    } else {      // Otherwise queue it for later.      writeQueue[number] = data    }    // If the next value is in the queue, write it    if (writeQueue.hasOwnProperty(nextToWrite)) {      var dataToWrite = writeQueue[nextToWrite]      delete writeQueue[nextToWrite]      return queueData(dataToWrite, nextToWrite)    }    outputs ++    if(inputs === outputs) {      if(paused) paused = false, stream.emit('drain') //written all the incoming events      if(ended) end()    }  }  function next (err, data, number) {    if(destroyed) return    inNext = true    if(err) {      return inNext = false, stream.emit.apply(stream, ['error', err])    }    queueData(data, number)    inNext = false  }  // Wrap the mapper function by calling its callback with the order number of  // the item in the stream.  function wrappedMapper (input, number, callback) {    return mapper.call(null, input, function(err, data){      callback(err, data, number)    })  }  stream.write = function (data) {    if(ended) throw new Error('map stream is not writable')    inNext = false    inputs ++    try {      //catch sync errors and handle them like async errors      var written = wrappedMapper(data, inputs, next)      paused = (written === false)      return !paused    } catch (err) {      //if the callback has been called syncronously, and the error      //has occured in an listener, throw it again.      if(inNext)        throw err      next(err)      return !paused    }  }  function end (data) {    //if end was called with args, write it,     ended = true //write will emit 'end' if ended is true    stream.writable = false    if(data !== undefined) {      return queueData(data, inputs)    } else if (inputs == outputs) { //wait for processing       stream.readable = false, stream.emit('end'), stream.destroy()     }  }  stream.end = function (data) {    if(ended) return    end()  }  stream.destroy = function () {    ended = destroyed = true    stream.writable = stream.readable = paused = false    process.nextTick(function () {      stream.emit('close')    })  }  stream.pause = function () {    paused = true  }  stream.resume = function () {    paused = false  }  return stream}
 |