// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

module.exports = Readable;

/*<replacement>*/
var isArray = require('isarray');
/*</replacement>*/

/*<replacement>*/
var Buffer = require('buffer').Buffer;
/*</replacement>*/

Readable.ReadableState = ReadableState;

var EE = require('events').EventEmitter;

/*<replacement>*/
if (!EE.listenerCount) EE.listenerCount = function(emitter, type) {
  return emitter.listeners(type).length;
};
/*</replacement>*/

var Stream = require('stream');

/*<replacement>*/
var util = require('core-util-is');
util.inherits = require('inherits');
/*</replacement>*/

var StringDecoder;

/*<replacement>*/
var debug = require('util');
if (debug && debug.debuglog) {
  debug = debug.debuglog('stream');
} else {
  debug = function () {};
}
/*</replacement>*/

util.inherits(Readable, Stream);

function ReadableState(options, stream) {
  var Duplex = require('./_stream_duplex');

  options = options || {};

  // the point at which it stops calling _read() to fill the buffer
  // Note: 0 is a valid value, means "don't call _read preemptively ever"
  var hwm = options.highWaterMark;
  var defaultHwm = options.objectMode ? 16 : 16 * 1024;
  this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;

  // cast to ints.
  this.highWaterMark = ~~this.highWaterMark;

  this.buffer = [];
  this.length = 0;
  this.pipes = null;
  this.pipesCount = 0;
  this.flowing = null;
  this.ended = false;
  this.endEmitted = false;
  this.reading = false;

  // a flag to be able to tell if the onwrite cb is called immediately,
  // or on a later tick.  We set this to true at first, because any
  // actions that shouldn't happen until "later" should generally also
  // not happen before the first write call.
  this.sync = true;

  // whenever we return null, then we set a flag to say
  // that we're awaiting a 'readable' event emission.
  this.needReadable = false;
  this.emittedReadable = false;
  this.readableListening = false;

  // object stream flag. Used to make read(n) ignore n and to
  // make all the buffer merging and length checks go away
  this.objectMode = !!options.objectMode;

  if (stream instanceof Duplex)
    this.objectMode = this.objectMode || !!options.readableObjectMode;

  // Crypto is kind of old and crusty.  Historically, its default string
  // encoding is 'binary' so we have to make this configurable.
  // Everything else in the universe uses 'utf8', though.
  this.defaultEncoding = options.defaultEncoding || 'utf8';

  // when piping, we only care about 'readable' events that happen
  // after read()ing all the bytes and not getting any pushback.
  this.ranOut = false;

  // the number of writers that are awaiting a drain event in .pipe()s
  this.awaitDrain = 0;

  // if true, a maybeReadMore has been scheduled
  this.readingMore = false;

  this.decoder = null;
  this.encoding = null;
  if (options.encoding) {
    if (!StringDecoder)
      StringDecoder = require('string_decoder/').StringDecoder;
    this.decoder = new StringDecoder(options.encoding);
    this.encoding = options.encoding;
  }
}

function Readable(options) {
  var Duplex = require('./_stream_duplex');

  if (!(this instanceof Readable))
    return new Readable(options);

  this._readableState = new ReadableState(options, this);

  // legacy
  this.readable = true;

  Stream.call(this);
}

// Manually shove something into the read() buffer.
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
// write() some more.
Readable.prototype.push = function(chunk, encoding) {
  var state = this._readableState;

  if (util.isString(chunk) && !state.objectMode) {
    encoding = encoding || state.defaultEncoding;
    if (encoding !== state.encoding) {
      chunk = new Buffer(chunk, encoding);
      encoding = '';
    }
  }

  return readableAddChunk(this, state, chunk, encoding, false);
};

// Unshift should *always* be something directly out of read()
Readable.prototype.unshift = function(chunk) {
  var state = this._readableState;
  return readableAddChunk(this, state, chunk, '', true);
};
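
// Usage sketch (illustrative only, not exercised by this module): a
// hypothetical Counter subclass overrides _read(n) and feeds data in via
// push().  push() returning false means the buffer has reached the
// highWaterMark, so the producer should wait until _read() is called again.
//
//   var inherits = require('inherits');
//   function Counter(opts) {
//     Readable.call(this, opts);
//     this._n = 0;
//   }
//   inherits(Counter, Readable);
//   Counter.prototype._read = function() {
//     this._n += 1;
//     if (this._n > 3)
//       this.push(null);             // no more data: signals EOF
//     else
//       this.push(String(this._n));  // strings become Buffers (utf8)
//   };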

function readableAddChunk(stream, state, chunk, encoding, addToFront) {
  var er = chunkInvalid(state, chunk);
  if (er) {
    stream.emit('error', er);
  } else if (util.isNullOrUndefined(chunk)) {
    state.reading = false;
    if (!state.ended)
      onEofChunk(stream, state);
  } else if (state.objectMode || chunk && chunk.length > 0) {
    if (state.ended && !addToFront) {
      var e = new Error('stream.push() after EOF');
      stream.emit('error', e);
    } else if (state.endEmitted && addToFront) {
      var e = new Error('stream.unshift() after end event');
      stream.emit('error', e);
    } else {
      if (state.decoder && !addToFront && !encoding)
        chunk = state.decoder.write(chunk);

      if (!addToFront)
        state.reading = false;

      // if we want the data now, just emit it.
      if (state.flowing && state.length === 0 && !state.sync) {
        stream.emit('data', chunk);
        stream.read(0);
      } else {
        // update the buffer info.
        state.length += state.objectMode ? 1 : chunk.length;
        if (addToFront)
          state.buffer.unshift(chunk);
        else
          state.buffer.push(chunk);

        if (state.needReadable)
          emitReadable(stream);
      }

      maybeReadMore(stream, state);
    }
  } else if (!addToFront) {
    state.reading = false;
  }

  return needMoreData(state);
}

// if it's past the high water mark, we can push in some more.
// Also, if we have no data yet, we can stand some
// more bytes.  This is to work around cases where hwm=0,
// such as the repl.  Also, if the push() triggered a
// readable event, and the user called read(largeNumber) such that
// needReadable was set, then we ought to push more, so that another
// 'readable' event will be triggered.
function needMoreData(state) {
  return !state.ended &&
         (state.needReadable ||
          state.length < state.highWaterMark ||
          state.length === 0);
}

// backwards compatibility.
Readable.prototype.setEncoding = function(enc) {
  if (!StringDecoder)
    StringDecoder = require('string_decoder/').StringDecoder;
  this._readableState.decoder = new StringDecoder(enc);
  this._readableState.encoding = enc;
  return this;
};
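
// Usage sketch (illustrative): after setEncoding(), read() and 'data' hand
// back decoded strings instead of Buffers, and multi-byte characters are
// never split across chunk boundaries.  Assumes r is a Readable instance
// producing utf8-encoded bytes.
//
//   r.setEncoding('utf8');
//   r.on('data', function(str) {
//     // str is a string here, not a Buffer
//   });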

// Don't raise the hwm > 8MB
var MAX_HWM = 0x800000;
function roundUpToNextPowerOf2(n) {
  if (n >= MAX_HWM) {
    n = MAX_HWM;
  } else {
    // Get the next highest power of 2
    n--;
    for (var p = 1; p < 32; p <<= 1) n |= n >> p;
    n++;
  }
  return n;
}

function howMuchToRead(n, state) {
  if (state.length === 0 && state.ended)
    return 0;

  if (state.objectMode)
    return n === 0 ? 0 : 1;

  if (isNaN(n) || util.isNull(n)) {
    // only flow one buffer at a time
    if (state.flowing && state.buffer.length)
      return state.buffer[0].length;
    else
      return state.length;
  }

  if (n <= 0)
    return 0;

  // If we're asking for more than the target buffer level,
  // then raise the water mark.  Bump up to the next highest
  // power of 2, to prevent increasing it excessively in tiny
  // amounts.
  if (n > state.highWaterMark)
    state.highWaterMark = roundUpToNextPowerOf2(n);

  // don't have that much.  return null, unless we've ended.
  if (n > state.length) {
    if (!state.ended) {
      state.needReadable = true;
      return 0;
    } else
      return state.length;
  }

  return n;
}

// you can override either this method, or the async _read(n) below.
Readable.prototype.read = function(n) {
  debug('read', n);
  var state = this._readableState;
  var nOrig = n;

  if (!util.isNumber(n) || n > 0)
    state.emittedReadable = false;

  // if we're doing read(0) to trigger a readable event, but we
  // already have a bunch of data in the buffer, then just trigger
  // the 'readable' event and move on.
  if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }

  n = howMuchToRead(n, state);

  // if we've ended, and we're now clear, then finish it up.
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }

  // All the actual chunk generation logic needs to be
  // *below* the call to _read.  The reason is that in certain
  // synthetic stream cases, such as passthrough streams, _read
  // may be a completely synchronous operation which may change
  // the state of the read buffer, providing enough data when
  // before there was *not* enough.
  //
  // So, the steps are:
  // 1. Figure out what the state of things will be after we do
  // a read from the buffer.
  //
  // 2. If that resulting state will trigger a _read, then call _read.
  // Note that this may be asynchronous, or synchronous.  Yes, it is
  // deeply ugly to write APIs this way, but that still doesn't mean
  // that the Readable class should behave improperly, as streams are
  // designed to be sync/async agnostic.
  // Take note if the _read call is sync or async (ie, if the read call
  // has returned yet), so that we know whether or not it's safe to emit
  // 'readable' etc.
  //
  // 3. Actually pull the requested chunks out of the buffer and return.

  // if we need a readable event, then we need to do some reading.
  var doRead = state.needReadable;
  debug('need readable', doRead);

  // if we currently have less than the highWaterMark, then also read some
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
    debug('length less than watermark', doRead);
  }

  // however, if we've ended, then there's no point, and if we're already
  // reading, then it's unnecessary.
  if (state.ended || state.reading) {
    doRead = false;
    debug('reading or ended', doRead);
  }

  if (doRead) {
    debug('do read');
    state.reading = true;
    state.sync = true;
    // if the length is currently zero, then we *need* a readable event.
    if (state.length === 0)
      state.needReadable = true;
    // call internal read method
    this._read(state.highWaterMark);
    state.sync = false;
  }

  // If _read pushed data synchronously, then `reading` will be false,
  // and we need to re-evaluate how much data we can return to the user.
  if (doRead && !state.reading)
    n = howMuchToRead(nOrig, state);

  var ret;
  if (n > 0)
    ret = fromList(n, state);
  else
    ret = null;

  if (util.isNull(ret)) {
    state.needReadable = true;
    n = 0;
  }

  state.length -= n;

  // If we have nothing in the buffer, then we want to know
  // as soon as we *do* get something into the buffer.
  if (state.length === 0 && !state.ended)
    state.needReadable = true;

  // If we tried to read() past the EOF, then emit end on the next tick.
  if (nOrig !== n && state.ended && state.length === 0)
    endReadable(this);

  if (!util.isNull(ret))
    this.emit('data', ret);

  return ret;
};
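
// Usage sketch (illustrative): the non-flowing consumption pattern pairs
// the 'readable' event with a read() loop; read() returns null once the
// internal buffer is drained, and 'readable' fires again when more data
// (or EOF) arrives.  Assumes r is a Readable instance.
//
//   r.on('readable', function() {
//     var chunk;
//     while (null !== (chunk = r.read())) {
//       console.log('got %d bytes', chunk.length);
//     }
//   });
//   r.on('end', function() {
//     console.log('no more data');
//   });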

function chunkInvalid(state, chunk) {
  var er = null;
  if (!util.isBuffer(chunk) &&
      !util.isString(chunk) &&
      !util.isNullOrUndefined(chunk) &&
      !state.objectMode) {
    er = new TypeError('Invalid non-string/buffer chunk');
  }
  return er;
}

function onEofChunk(stream, state) {
  if (state.decoder && !state.ended) {
    var chunk = state.decoder.end();
    if (chunk && chunk.length) {
      state.buffer.push(chunk);
      state.length += state.objectMode ? 1 : chunk.length;
    }
  }
  state.ended = true;

  // emit 'readable' now to make sure it gets picked up.
  emitReadable(stream);
}

// Don't emit readable right away in sync mode, because this can trigger
// another read() call => stack overflow.  This way, it might trigger
// a nextTick recursion warning, but that's not so bad.
function emitReadable(stream) {
  var state = stream._readableState;
  state.needReadable = false;
  if (!state.emittedReadable) {
    debug('emitReadable', state.flowing);
    state.emittedReadable = true;
    if (state.sync)
      process.nextTick(function() {
        emitReadable_(stream);
      });
    else
      emitReadable_(stream);
  }
}

function emitReadable_(stream) {
  debug('emit readable');
  stream.emit('readable');
  flow(stream);
}

// at this point, the user has presumably seen the 'readable' event,
// and called read() to consume some data.  that may have triggered
// in turn another _read(n) call, in which case reading = true if
// it's in progress.
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more preemptively.
function maybeReadMore(stream, state) {
  if (!state.readingMore) {
    state.readingMore = true;
    process.nextTick(function() {
      maybeReadMore_(stream, state);
    });
  }
}

function maybeReadMore_(stream, state) {
  var len = state.length;
  while (!state.reading && !state.flowing && !state.ended &&
         state.length < state.highWaterMark) {
    debug('maybeReadMore read 0');
    stream.read(0);
    if (len === state.length)
      // didn't get any data, stop spinning.
      break;
    else
      len = state.length;
  }
  state.readingMore = false;
}

// abstract method.  to be overridden in specific implementation classes.
// call cb(er, data) where data is <= n in length.
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
Readable.prototype._read = function(n) {
  this.emit('error', new Error('not implemented'));
};

Readable.prototype.pipe = function(dest, pipeOpts) {
  var src = this;
  var state = this._readableState;

  switch (state.pipesCount) {
    case 0:
      state.pipes = dest;
      break;
    case 1:
      state.pipes = [state.pipes, dest];
      break;
    default:
      state.pipes.push(dest);
      break;
  }
  state.pipesCount += 1;
  debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);

  var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
              dest !== process.stdout &&
              dest !== process.stderr;

  var endFn = doEnd ? onend : cleanup;
  if (state.endEmitted)
    process.nextTick(endFn);
  else
    src.once('end', endFn);

  dest.on('unpipe', onunpipe);
  function onunpipe(readable) {
    debug('onunpipe');
    if (readable === src) {
      cleanup();
    }
  }

  function onend() {
    debug('onend');
    dest.end();
  }

  // when the dest drains, it reduces the awaitDrain counter
  // on the source.  This would be more elegant with a .once()
  // handler in flow(), but adding and removing repeatedly is
  // too slow.
  var ondrain = pipeOnDrain(src);
  dest.on('drain', ondrain);

  function cleanup() {
    debug('cleanup');
    // cleanup event handlers once the pipe is broken
    dest.removeListener('close', onclose);
    dest.removeListener('finish', onfinish);
    dest.removeListener('drain', ondrain);
    dest.removeListener('error', onerror);
    dest.removeListener('unpipe', onunpipe);
    src.removeListener('end', onend);
    src.removeListener('end', cleanup);
    src.removeListener('data', ondata);

    // if the reader is waiting for a drain event from this
    // specific writer, then it would cause it to never start
    // flowing again.
    // So, if this is awaiting a drain, then we just call it now.
    // If we don't know, then assume that we are waiting for one.
    if (state.awaitDrain &&
        (!dest._writableState || dest._writableState.needDrain))
      ondrain();
  }

  src.on('data', ondata);
  function ondata(chunk) {
    debug('ondata');
    var ret = dest.write(chunk);
    if (false === ret) {
      debug('false write response, pause',
            src._readableState.awaitDrain);
      src._readableState.awaitDrain++;
      src.pause();
    }
  }

  // if the dest has an error, then stop piping into it.
  // however, don't suppress the throwing behavior for this.
  function onerror(er) {
    debug('onerror', er);
    unpipe();
    dest.removeListener('error', onerror);
    if (EE.listenerCount(dest, 'error') === 0)
      dest.emit('error', er);
  }
  // This is a brutally ugly hack to make sure that our error handler
  // is attached before any userland ones.  NEVER DO THIS.
  if (!dest._events || !dest._events.error)
    dest.on('error', onerror);
  else if (isArray(dest._events.error))
    dest._events.error.unshift(onerror);
  else
    dest._events.error = [onerror, dest._events.error];

  // Both close and finish should trigger unpipe, but only once.
  function onclose() {
    dest.removeListener('finish', onfinish);
    unpipe();
  }
  dest.once('close', onclose);
  function onfinish() {
    debug('onfinish');
    dest.removeListener('close', onclose);
    unpipe();
  }
  dest.once('finish', onfinish);

  function unpipe() {
    debug('unpipe');
    src.unpipe(dest);
  }

  // tell the dest that it's being piped to
  dest.emit('pipe', src);

  // start the flow if it hasn't been started already.
  if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }

  return dest;
};
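
// Usage sketch (illustrative): pipe() manages backpressure by counting
// outstanding 'drain' events, and by default calls dest.end() when the
// source ends; pass { end: false } to keep the destination open.  Assumes
// src is a readable stream and dest is a writable stream.
//
//   src.pipe(dest, { end: false });
//   src.on('end', function() {
//     dest.write('trailer\n');
//     dest.end();
//   });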

function pipeOnDrain(src) {
  return function() {
    var state = src._readableState;
    debug('pipeOnDrain', state.awaitDrain);
    if (state.awaitDrain)
      state.awaitDrain--;
    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
      state.flowing = true;
      flow(src);
    }
  };
}

Readable.prototype.unpipe = function(dest) {
  var state = this._readableState;

  // if we're not piping anywhere, then do nothing.
  if (state.pipesCount === 0)
    return this;

  // just one destination.  most common case.
  if (state.pipesCount === 1) {
    // passed in one, but it's not the right one.
    if (dest && dest !== state.pipes)
      return this;

    if (!dest)
      dest = state.pipes;

    // got a match.
    state.pipes = null;
    state.pipesCount = 0;
    state.flowing = false;
    if (dest)
      dest.emit('unpipe', this);
    return this;
  }

  // slow case. multiple pipe destinations.

  if (!dest) {
    // remove all.
    var dests = state.pipes;
    var len = state.pipesCount;
    state.pipes = null;
    state.pipesCount = 0;
    state.flowing = false;

    for (var i = 0; i < len; i++)
      dests[i].emit('unpipe', this);
    return this;
  }

  // try to find the right one.
  var i = indexOf(state.pipes, dest);
  if (i === -1)
    return this;

  state.pipes.splice(i, 1);
  state.pipesCount -= 1;
  if (state.pipesCount === 1)
    state.pipes = state.pipes[0];

  dest.emit('unpipe', this);

  return this;
};

// set up data events if they are asked for
// Ensure readable listeners eventually get something
Readable.prototype.on = function(ev, fn) {
  var res = Stream.prototype.on.call(this, ev, fn);

  // If listening to data, and it has not explicitly been paused,
  // then call resume to start the flow of data on the next tick.
  if (ev === 'data' && false !== this._readableState.flowing) {
    this.resume();
  }

  if (ev === 'readable' && this.readable) {
    var state = this._readableState;
    if (!state.readableListening) {
      state.readableListening = true;
      state.emittedReadable = false;
      state.needReadable = true;
      if (!state.reading) {
        var self = this;
        process.nextTick(function() {
          debug('readable nexttick read 0');
          self.read(0);
        });
      } else if (state.length) {
        emitReadable(this, state);
      }
    }
  }

  return res;
};
Readable.prototype.addListener = Readable.prototype.on;

// pause() and resume() are remnants of the legacy readable stream API
// If the user uses them, then switch into old mode.
Readable.prototype.resume = function() {
  var state = this._readableState;
  if (!state.flowing) {
    debug('resume');
    state.flowing = true;
    if (!state.reading) {
      debug('resume read 0');
      this.read(0);
    }
    resume(this, state);
  }
  return this;
};

function resume(stream, state) {
  if (!state.resumeScheduled) {
    state.resumeScheduled = true;
    process.nextTick(function() {
      resume_(stream, state);
    });
  }
}

function resume_(stream, state) {
  state.resumeScheduled = false;
  stream.emit('resume');
  flow(stream);
  if (state.flowing && !state.reading)
    stream.read(0);
}

Readable.prototype.pause = function() {
  debug('call pause flowing=%j', this._readableState.flowing);
  if (false !== this._readableState.flowing) {
    debug('pause');
    this._readableState.flowing = false;
    this.emit('pause');
  }
  return this;
};
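
// Usage sketch (illustrative): attaching a 'data' listener switches the
// stream into flowing mode; pause() and resume() throttle delivery without
// losing data, because paused chunks stay in the internal buffer.  Assumes
// r is a Readable instance.
//
//   r.on('data', function(chunk) {
//     r.pause();                      // stop 'data' events for one second
//     setTimeout(function() {
//       r.resume();
//     }, 1000);
//   });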

function flow(stream) {
  var state = stream._readableState;
  debug('flow', state.flowing);
  if (state.flowing) {
    do {
      var chunk = stream.read();
    } while (null !== chunk && state.flowing);
  }
}

// wrap an old-style stream as the async data source.
// This is *not* part of the readable stream interface.
// It is an ugly unfortunate mess of history.
Readable.prototype.wrap = function(stream) {
  var state = this._readableState;
  var paused = false;

  var self = this;
  stream.on('end', function() {
    debug('wrapped end');
    if (state.decoder && !state.ended) {
      var chunk = state.decoder.end();
      if (chunk && chunk.length)
        self.push(chunk);
    }

    self.push(null);
  });

  stream.on('data', function(chunk) {
    debug('wrapped data');
    if (state.decoder)
      chunk = state.decoder.write(chunk);
    if (!chunk || !state.objectMode && !chunk.length)
      return;

    var ret = self.push(chunk);
    if (!ret) {
      paused = true;
      stream.pause();
    }
  });

  // proxy all the other methods.
  // important when wrapping filters and duplexes.
  for (var i in stream) {
    if (util.isFunction(stream[i]) && util.isUndefined(this[i])) {
      this[i] = function(method) { return function() {
        return stream[method].apply(stream, arguments);
      }}(i);
    }
  }

  // proxy certain important events.
  var events = ['error', 'close', 'destroy', 'pause', 'resume'];
  forEach(events, function(ev) {
    stream.on(ev, self.emit.bind(self, ev));
  });

  // when we try to consume some more bytes, simply unpause the
  // underlying stream.
  self._read = function(n) {
    debug('wrapped _read', n);
    if (paused) {
      paused = false;
      stream.resume();
    }
  };

  return self;
};
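
// Usage sketch (illustrative): wrap() adapts an old-style stream (one that
// only emits 'data'/'end' and has pause()/resume(), e.g. a pre-streams2
// parser) so it can be consumed with read() or pipe().  Assumes oldStream
// is such a stream.
//
//   var r = new Readable().wrap(oldStream);
//   r.on('readable', function() {
//     var chunk = r.read();
//     // chunk is null when the internal buffer is empty
//   });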

// exposed for testing purposes only.
Readable._fromList = fromList;

// Pluck off n bytes from an array of buffers.
// Length is the combined lengths of all the buffers in the list.
function fromList(n, state) {
  var list = state.buffer;
  var length = state.length;
  var stringMode = !!state.decoder;
  var objectMode = !!state.objectMode;
  var ret;

  // nothing in the list, definitely empty.
  if (list.length === 0)
    return null;

  if (length === 0)
    ret = null;
  else if (objectMode)
    ret = list.shift();
  else if (!n || n >= length) {
    // read it all, truncate the array.
    if (stringMode)
      ret = list.join('');
    else
      ret = Buffer.concat(list, length);
    list.length = 0;
  } else {
    // read just some of it.
    if (n < list[0].length) {
      // just take a part of the first list item.
      // slice is the same for buffers and strings.
      var buf = list[0];
      ret = buf.slice(0, n);
      list[0] = buf.slice(n);
    } else if (n === list[0].length) {
      // first list is a perfect match
      ret = list.shift();
    } else {
      // complex case.
      // we have enough to cover it, but it spans past the first buffer.
      if (stringMode)
        ret = '';
      else
        ret = new Buffer(n);

      var c = 0;
      for (var i = 0, l = list.length; i < l && c < n; i++) {
        var buf = list[0];
        var cpy = Math.min(n - c, buf.length);

        if (stringMode)
          ret += buf.slice(0, cpy);
        else
          buf.copy(ret, c, 0, cpy);

        if (cpy < buf.length)
          list[0] = buf.slice(cpy);
        else
          list.shift();

        c += cpy;
      }
    }
  }

  return ret;
}

function endReadable(stream) {
  var state = stream._readableState;

  // If we get here before consuming all the bytes, then that is a
  // bug in node.  Should never happen.
  if (state.length > 0)
    throw new Error('endReadable called on non-empty stream');

  if (!state.endEmitted) {
    state.ended = true;
    process.nextTick(function() {
      // Check that we didn't get one last unshift.
      if (!state.endEmitted && state.length === 0) {
        state.endEmitted = true;
        stream.readable = false;
        stream.emit('end');
      }
    });
  }
}

function forEach (xs, f) {
  for (var i = 0, l = xs.length; i < l; i++) {
    f(xs[i], i);
  }
}

function indexOf (xs, x) {
  for (var i = 0, l = xs.length; i < l; i++) {
    if (xs[i] === x) return i;
  }
  return -1;
}