// write data to it, and it'll emit data in 512 byte blocks.
// if you .end() or .flush(), it'll emit whatever it's got,
// padded with nulls to 512 bytes.

module.exports = BlockStream

var Stream = require("stream").Stream
var inherits = require("inherits")
var assertOk = require("assert").ok
// Ad-hoc tracing hook: logs to stderr only when DEBUG is set in the env.
var debug = process.env.DEBUG ? console.error : function () {}

/**
 * A stream that re-chunks whatever is written to it into fixed-size blocks.
 *
 * @param {number} [size=512] Size in bytes of every emitted "data" block.
 * @param {Object} [opt] Options.
 * @param {boolean} [opt.nopad] When truthy, a flush emits the trailing
 *   partial block as-is instead of padding it with zero bytes.
 */
function BlockStream (size, opt) {
  this.writable = this.readable = true
  this._opt = opt || {}
  this._chunkSize = size || 512
  // read position inside this._buffer[0]
  this._offset = 0
  // queue of pending Buffers, and the number of unread bytes they hold
  this._buffer = []
  this._bufferLength = 0
  // Source of padding bytes; Buffer.alloc() is guaranteed zero-filled.
  this._zeroes = this._opt.nopad ? false : Buffer.alloc(this._chunkSize)
}

inherits(BlockStream, Stream)

/**
 * Queue data, emitting as many full blocks as are available.
 *
 * @param {Buffer|*} c Data to queue. Non-Buffer truthy values are converted
 *   via string concatenation; falsy values and empty buffers are ignored.
 * @returns {boolean} false when a full block is waiting but the stream is
 *   paused (a "drain" event follows once it has been emitted), else true.
 * @throws {Error} if called after end().
 */
BlockStream.prototype.write = function (c) {
  if (this._ended) throw new Error("BlockStream: write after end")
  if (c && !Buffer.isBuffer(c)) c = Buffer.from(c + "")
  // Guard against null/undefined so an empty write is a no-op, not a crash.
  if (c && c.length) {
    this._buffer.push(c)
    this._bufferLength += c.length
  }
  if (this._bufferLength >= this._chunkSize) {
    if (this._paused) {
      this._needDrain = true
      return false
    }
    this._emitChunk()
  }
  return true
}

/** Stop emitting "data" until resume() is called. */
BlockStream.prototype.pause = function () {
  this._paused = true
}

/** Resume emission and immediately emit whatever full blocks are queued. */
BlockStream.prototype.resume = function () {
  this._paused = false
  return this._emitChunk()
}

/**
 * Mark the stream as ended and flush what is buffered.
 *
 * @param {Buffer|*} [chunk] Optional final data to write first. A function
 *   in this position is ignored rather than written.
 */
BlockStream.prototype.end = function (chunk) {
  // A function here must not be written as data. (The original assigned it
  // to an undeclared `cb`, leaking a global / throwing in strict mode.)
  if (typeof chunk === "function") chunk = null
  if (chunk) this.write(chunk)
  this._ended = true
  this.flush()
}

/** Emit everything buffered, padding the last block unless `nopad` is set. */
BlockStream.prototype.flush = function () {
  this._emitChunk(true)
}

/**
 * Core emission loop: emits every complete block currently buffered.
 *
 * @param {boolean} [flush] When truthy, pad (or, with `nopad`, emit as-is)
 *   the trailing partial block, and keep emitting even if a "data" listener
 *   pauses the stream part-way through.
 */
BlockStream.prototype._emitChunk = function (flush) {
  // Pad first, even when paused, so the padding is already queued by the
  // time the stream is resumed.
  if (flush && this._zeroes) {
    var padBytes = this._bufferLength % this._chunkSize
    if (padBytes !== 0) padBytes = this._chunkSize - padBytes
    if (padBytes > 0) {
      this._buffer.push(this._zeroes.slice(0, padBytes))
      this._bufferLength += padBytes
    }
  }

  // _emitting guards against re-entry from inside a "data" listener.
  if (this._emitting || this._paused) return
  this._emitting = true

  var bufferIndex = 0
  while (this._bufferLength >= this._chunkSize &&
         (flush || !this._paused)) {
    var out = null
    var outOffset = 0
    var outHas = this._chunkSize

    while (outHas > 0 && (flush || !this._paused)) {
      var cur = this._buffer[bufferIndex]
      var curHas = cur.length - this._offset

      // If cur can't fill the whole block we must copy several buffers into
      // one. Once copying has started it must continue, since buffers can't
      // be joined like cons strings. Otherwise slice (or reuse) cur to avoid
      // an unnecessary copy.
      if (out || curHas < outHas) {
        out = out || Buffer.alloc(this._chunkSize)
        cur.copy(out, outOffset,
                 this._offset, this._offset + Math.min(curHas, outHas))
      } else if (cur.length === outHas && this._offset === 0) {
        // shortcut -- cur is exactly one block, and no offset.
        out = cur
      } else {
        out = cur.slice(this._offset, this._offset + outHas)
      }

      if (curHas > outHas) {
        // cur still has unread bytes; remember how far we got.
        this._offset += outHas
        outHas = 0
      } else {
        // cur is fully consumed; move on to the next queued buffer.
        outHas -= curHas
        outOffset += curHas
        bufferIndex++
        this._offset = 0
      }
    }

    this._bufferLength -= this._chunkSize
    assertOk(out.length === this._chunkSize)
    this.emit("data", out)
  }

  // Whatever is left is less than one block, or we were paused mid-way.
  this._buffer = this._buffer.slice(bufferIndex)
  if (this._paused) {
    // Was misspelled `_needsDrain`, which nothing ever read.
    this._needDrain = true
    this._emitting = false
    return
  }

  // Without null-padding, the trailing partial block has to be emitted
  // as-is. `_ended` is checked as well as `flush` because end() may have
  // been called while paused (or from inside a "data" listener), in which
  // case its flush returned early and this is the call that must finish up.
  var pending = this._buffer.length
  if ((flush || this._ended) && !this._zeroes && pending) {
    if (pending === 1) {
      if (this._offset) {
        this.emit("data", this._buffer[0].slice(this._offset))
      } else {
        this.emit("data", this._buffer[0])
      }
    } else {
      var tail = Buffer.alloc(this._bufferLength)
      var tailOffset = 0
      for (var i = 0; i < pending; i++) {
        var piece = this._buffer[i]
        var pieceHas = piece.length - this._offset
        piece.copy(tail, tailOffset, this._offset)
        // only the first queued buffer can have a non-zero read offset
        this._offset = 0
        tailOffset += pieceHas
        this._bufferLength -= pieceHas
      }
      this.emit("data", tail)
    }
    // truncate
    this._buffer.length = 0
    this._bufferLength = 0
    this._offset = 0
  }

  // Now either drained or ended: we've flushed out all that we can so far.
  if (this._needDrain) {
    this._needDrain = false
    this.emit("drain")
  }

  if ((this._bufferLength === 0) && this._ended && !this._endEmitted) {
    this._endEmitted = true
    this.emit("end")
  }

  this._emitting = false
}