var eos = require('end-of-stream')
var shift = require('stream-shift')

module.exports = each

/**
 * Feed the chunks of a readable stream to `fn`, one at a time.
 *
 * A new chunk is only pulled once `fn` has invoked the callback it was
 * given, so at most one `fn` call is in flight at any moment.
 *
 * @param {object} stream - Stream to consume. It must emit 'readable'; its
 *   `destroy(err)` method is called if `fn` reports an error before the
 *   stream has ended.
 * @param {function(*, function(Error=))} fn - Called as `fn(data, next)` for
 *   every chunk. Call `next()` to continue or `next(err)` to abort.
 * @param {function(Error=)} [cb] - Optional. Called with the first recorded
 *   error (or `null`) once the stream has finished and no `fn` call is
 *   still pending.
 * @returns {object} The same `stream` that was passed in.
 */
function each (stream, fn, cb) {
  // True when the last read found nothing, i.e. the next 'readable' event
  // should trigger a new read.
  var want = true
  // First error seen, either from the stream (via eos) or from `fn`.
  var error = null
  // Set once end-of-stream has reported that the stream finished.
  var ended = false
  // True while a `fn` call is waiting for its callback to be invoked.
  var running = false
  // True only while `fn` is executing synchronously inside read(); lets
  // afterRead() leave the continuation to the loop in read() instead of
  // re-entering it recursively.
  var calling = false

  stream.on('readable', onreadable)
  // Kick off immediately in case data is already buffered.
  onreadable()

  // Completion is only tracked when the caller asked for a callback.
  if (cb) eos(stream, {readable: true, writable: false}, done)
  return stream

  // end-of-stream handler: remember the first error and report completion,
  // deferring to afterRead() while a `fn` call is still pending.
  function done (err) {
    if (!error) error = err
    ended = true
    if (!running) cb(error)
  }

  function onreadable () {
    if (want) read()
  }

  // Callback handed to `fn`; receives the optional error from the consumer.
  function afterRead (err) {
    running = false

    if (err) {
      error = err
      if (ended) return cb(error)
      // Stream is still live: tear it down with the consumer's error.
      // NOTE(review): presumably this makes end-of-stream fire done() so
      // that cb is reached - confirm against the stream implementation.
      stream.destroy(err)
      return
    }

    if (ended) return cb(error)
    // If `fn` called back synchronously, the while loop in read() is still
    // on the stack and will pick up the next chunk itself.
    if (!calling) read()
  }

  // Pull chunks until one is in flight, the stream ended, or nothing is
  // available right now.
  function read () {
    while (!running && !ended) {
      want = false

      var data = shift(stream)
      // `ended` is re-checked because it may have flipped during shift().
      if (ended) return
      if (data === null) {
        // Nothing available; wait for the next 'readable' event.
        want = true
        return
      }

      running = true
      calling = true
      fn(data, afterRead)
      calling = false
    }
  }
}