// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

// A bit simpler than readable streams.
// Implement an async ._write(chunk, cb), and it'll handle all
// the drain event emission and buffering.

module.exports = Writable;

/*<replacement>*/
var Buffer = require('buffer').Buffer;
/*</replacement>*/

Writable.WritableState = WritableState;


/*<replacement>*/
var util = require('core-util-is');
util.inherits = require('inherits');
/*</replacement>*/

var Stream = require('stream');

util.inherits(Writable, Stream);

// A single queued write: the chunk, the encoding it was written with,
// and the callback to invoke once it has been handled.  Instances are
// pushed onto state.buffer while another write is in flight or the
// stream is corked.
function WriteReq(chunk, encoding, cb) {
  this.chunk = chunk;
  this.encoding = encoding;
  this.callback = cb;
}

// Bookkeeping for the writable side of a stream.
//   options - optional settings object; the keys read here are
//             highWaterMark, objectMode, writableObjectMode (Duplex only),
//             decodeStrings and defaultEncoding.
//   stream  - the owning stream; captured by the onwrite closure below.
function WritableState(options, stream) {
  // required lazily, inside the function, rather than at module load
  var Duplex = require('./_stream_duplex');

  options = options || {};

  // the point at which write() starts returning false
  // Note: 0 is a valid value, means that we always return false if
  // the entire buffer is not flushed immediately on write()
  var hwm = options.highWaterMark;
  var defaultHwm = options.objectMode ? 16 : 16 * 1024;
  this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;

  // object stream flag to indicate whether or not this stream
  // contains buffers or objects.
  this.objectMode = !!options.objectMode;

  if (stream instanceof Duplex)
    this.objectMode = this.objectMode || !!options.writableObjectMode;

  // cast to ints.
  this.highWaterMark = ~~this.highWaterMark;

  this.needDrain = false;
  // at the start of calling end()
  this.ending = false;
  // when end() has been called, and returned
  this.ended = false;
  // when 'finish' is emitted
  this.finished = false;

  // should we decode strings into buffers before passing to _write?
  // this is here so that some node-core streams can optimize string
  // handling at a lower level.
  var noDecode = options.decodeStrings === false;
  this.decodeStrings = !noDecode;

  // Crypto is kind of old and crusty.  Historically, its default string
  // encoding is 'binary' so we have to make this configurable.
  // Everything else in the universe uses 'utf8', though.
  this.defaultEncoding = options.defaultEncoding || 'utf8';

  // not an actual buffer we keep track of, but a measurement
  // of how much we're waiting to get pushed to some underlying
  // socket or file.
  this.length = 0;

  // a flag to see when we're in the middle of a write.
  this.writing = false;

  // when true all writes will be buffered until .uncork() call
  this.corked = 0;

  // a flag to be able to tell if the onwrite cb is called immediately,
  // or on a later tick.  We set this to true at first, because any
  // actions that shouldn't happen until "later" should generally also
  // not happen before the first write call.
  this.sync = true;

  // a flag to know if we're processing previously buffered items, which
  // may call the _write() callback in the same tick, so that we don't
  // end up in an overlapped onwrite situation.
  this.bufferProcessing = false;

  // the callback that's passed to _write(chunk,cb)
  this.onwrite = function(er) {
    onwrite(stream, er);
  };

  // the callback that the user supplies to write(chunk,encoding,cb)
  this.writecb = null;

  // the amount that is being written when _write is called.
  this.writelen = 0;

  // queue of WriteReq entries waiting for their turn
  this.buffer = [];

  // number of pending user-supplied write callbacks
  // this must be 0 before 'finish' can be emitted
  this.pendingcb = 0;

  // emit prefinish if the only thing we're waiting for is _write cbs
  // This is relevant for synchronous Transform streams
  this.prefinished = false;

  // True if the error was already emitted and should not be thrown again
  this.errorEmitted = false;
}

// Writable stream constructor.  May be called without `new`; in that case
// a new Writable is created and returned.
function Writable(options) {
  var Duplex = require('./_stream_duplex');

  // Writable ctor is applied to Duplexes, though they're not
  // instanceof Writable, they're instanceof Readable.
  if (!(this instanceof Writable) && !(this instanceof Duplex))
    return new Writable(options);

  this._writableState = new WritableState(options, this);

  // legacy.
  this.writable = true;

  Stream.call(this);
}

// Otherwise people can pipe Writable streams, which is just wrong.
Writable.prototype.pipe = function() {
  this.emit('error', new Error('Cannot pipe. Not readable.'));
};


// Report a write() that arrived after end(): 'error' is emitted right
// away, and the write callback receives the same error on the next tick.
function writeAfterEnd(stream, state, cb) {
  var er = new Error('write after end');
  // TODO: defer error events consistently everywhere, not just the cb
  stream.emit('error', er);
  process.nextTick(function() {
    cb(er);
  });
}

// If we get something that is not a buffer, string, null, or undefined,
// and we're not in objectMode, then that's an error.
// Otherwise stream chunks are all considered to be of length=1, and the
// watermarks determine how many objects to keep in the buffer, rather than
// how many bytes or characters.
// Returns true when the chunk is acceptable; otherwise emits 'error',
// schedules cb(er) for the next tick and returns false.
function validChunk(stream, state, chunk, cb) {
  var valid = true;
  if (!util.isBuffer(chunk) &&
      !util.isString(chunk) &&
      !util.isNullOrUndefined(chunk) &&
      !state.objectMode) {
    var er = new TypeError('Invalid non-string/buffer chunk');
    stream.emit('error', er);
    process.nextTick(function() {
      cb(er);
    });
    valid = false;
  }
  return valid;
}

// Queue or dispatch a chunk.  `encoding` and `cb` are optional, and
// `encoding` may be omitted with `cb` passed in its place.
// Returns false when the write was rejected or when the buffered length
// has reached highWaterMark; returns true otherwise.
Writable.prototype.write = function(chunk, encoding, cb) {
  var state = this._writableState;
  var ret = false;

  if (util.isFunction(encoding)) {
    cb = encoding;
    encoding = null;
  }

  if (util.isBuffer(chunk))
    encoding = 'buffer';
  else if (!encoding)
    encoding = state.defaultEncoding;

  // always have a callable cb so the internals never need to check
  if (!util.isFunction(cb))
    cb = function() {};

  if (state.ended)
    writeAfterEnd(this, state, cb);
  else if (validChunk(this, state, chunk, cb)) {
    state.pendingcb++;
    ret = writeOrBuffer(this, state, chunk, encoding, cb);
  }

  return ret;
};

// Increment the cork counter; while it is non-zero, writes are buffered.
Writable.prototype.cork = function() {
  var state = this._writableState;

  state.corked++;
};

// Decrement the cork counter and, once it reaches zero with nothing in
// flight, flush whatever was buffered in the meantime.
Writable.prototype.uncork = function() {
  var state = this._writableState;

  if (state.corked) {
    state.corked--;

    if (!state.writing &&
        !state.corked &&
        !state.finished &&
        !state.bufferProcessing &&
        state.buffer.length)
      clearBuffer(this, state);
  }
};

// Turn a string chunk into a Buffer, unless the stream is in objectMode
// or was configured with decodeStrings: false.  Other chunks are returned
// untouched.
function decodeChunk(state, chunk, encoding) {
  if (!state.objectMode &&
      state.decodeStrings !== false &&
      util.isString(chunk)) {
    chunk = new Buffer(chunk, encoding);
  }
  return chunk;
}

// if we're already writing something, then just put this
// in the queue, and wait our turn.  Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, chunk, encoding, cb) {
  chunk = decodeChunk(state, chunk, encoding);
  if (util.isBuffer(chunk))
    encoding = 'buffer';
  var len = state.objectMode ? 1 : chunk.length;

  state.length += len;

  var ret = state.length < state.highWaterMark;
  // we must ensure that previous needDrain will not be reset to false.
  if (!ret)
    state.needDrain = true;

  if (state.writing || state.corked)
    state.buffer.push(new WriteReq(chunk, encoding, cb));
  else
    doWrite(stream, state, false, len, chunk, encoding, cb);

  return ret;
}

// Hand a chunk (or, when `writev` is true, the array of queued entries)
// to the stream's _write()/_writev().  state.sync is true only for the
// duration of that call, which lets onwrite() tell whether it was invoked
// synchronously.
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  state.writelen = len;
  state.writecb = cb;
  state.writing = true;
  state.sync = true;
  if (writev)
    stream._writev(chunk, state.onwrite);
  else
    stream._write(chunk, encoding, state.onwrite);
  state.sync = false;
}

// Failure path of onwrite(): deliver the error to the write callback
// (deferred to the next tick when the failure was reported synchronously),
// then flag errorEmitted and emit 'error' on the stream.
function onwriteError(stream, state, sync, er, cb) {
  if (sync)
    process.nextTick(function() {
      state.pendingcb--;
      cb(er);
    });
  else {
    state.pendingcb--;
    cb(er);
  }

  stream._writableState.errorEmitted = true;
  stream.emit('error', er);
}

// Reset the in-flight write bookkeeping and subtract the completed
// write's length from the buffered total.
function onwriteStateUpdate(state) {
  state.writing = false;
  state.writecb = null;
  state.length -= state.writelen;
  state.writelen = 0;
}

// Invoked (via state.onwrite) when _write()/_writev() reports completion.
function onwrite(stream, er) {
  var state = stream._writableState;
  // capture these before onwriteStateUpdate() clears writecb
  var sync = state.sync;
  var cb = state.writecb;

  onwriteStateUpdate(state);

  if (er)
    onwriteError(stream, state, sync, er, cb);
  else {
    // Check if we're actually ready to finish, but don't emit yet
    var finished = needFinish(stream, state);

    if (!finished &&
        !state.corked &&
        !state.bufferProcessing &&
        state.buffer.length) {
      clearBuffer(stream, state);
    }

    if (sync) {
      process.nextTick(function() {
        afterWrite(stream, state, finished, cb);
      });
    } else {
      afterWrite(stream, state, finished, cb);
    }
  }
}

// Post-write housekeeping: maybe emit 'drain', run the write callback,
// then check whether the stream can finish.
function afterWrite(stream, state, finished, cb) {
  if (!finished)
    onwriteDrain(stream, state);
  state.pendingcb--;
  cb();
  finishMaybe(stream, state);
}

// Must force callback to be called on nextTick, so that we don't
// emit 'drain' before the write() consumer gets the 'false' return
// value, and has a chance to attach a 'drain' listener.
function onwriteDrain(stream, state) {
  if (state.length === 0 && state.needDrain) {
    state.needDrain = false;
    stream.emit('drain');
  }
}


// if there's something in the buffer waiting, then process it
function clearBuffer(stream, state) {
  state.bufferProcessing = true;

  if (stream._writev && state.buffer.length > 1) {
    // Fast case, write everything using _writev()
    var cbs = [];
    for (var c = 0; c < state.buffer.length; c++)
      cbs.push(state.buffer[c].callback);

    // count the one we are adding, as well.
    // TODO(isaacs) clean this up
    state.pendingcb++;
    doWrite(stream, state, true, state.length, state.buffer, '', function(err) {
      for (var i = 0; i < cbs.length; i++) {
        state.pendingcb--;
        cbs[i](err);
      }
    });

    // Clear buffer
    state.buffer = [];
  } else {
    // Slow case, write chunks one-by-one
    for (var c = 0; c < state.buffer.length; c++) {
      var entry = state.buffer[c];
      var chunk = entry.chunk;
      var encoding = entry.encoding;
      var cb = entry.callback;
      var len = state.objectMode ? 1 : chunk.length;

      doWrite(stream, state, false, len, chunk, encoding, cb);

      // if we didn't call the onwrite immediately, then
      // it means that we need to wait until it does.
      // also, that means that the chunk and cb are currently
      // being processed, so move the buffer counter past them.
      if (state.writing) {
        c++;
        break;
      }
    }

    // drop the entries that were handed off; keep the rest queued
    if (c < state.buffer.length)
      state.buffer = state.buffer.slice(c);
    else
      state.buffer.length = 0;
  }

  state.bufferProcessing = false;
}

// Default implementation: reports an error through the callback.
// Implementations are expected to override this.
Writable.prototype._write = function(chunk, encoding, cb) {
  cb(new Error('not implemented'));
};

// Optional batch write hook; clearBuffer() uses it only when it is set.
Writable.prototype._writev = null;

// Signal that no more data will be written.  All arguments are optional:
// a final chunk, its encoding, and a callback for when the stream has
// finished.  A function may be passed in place of chunk or encoding.
Writable.prototype.end = function(chunk, encoding, cb) {
  var state = this._writableState;

  if (util.isFunction(chunk)) {
    cb = chunk;
    chunk = null;
    encoding = null;
  } else if (util.isFunction(encoding)) {
    cb = encoding;
    encoding = null;
  }

  if (!util.isNullOrUndefined(chunk))
    this.write(chunk, encoding);

  // .end() fully uncorks
  if (state.corked) {
    state.corked = 1;
    this.uncork();
  }

  // ignore unnecessary end() calls.
  if (!state.ending && !state.finished)
    endWritable(this, state, cb);
};


// True when end() has been started, nothing is buffered or in flight,
// and 'finish' has not been emitted yet.
function needFinish(stream, state) {
  return (state.ending &&
          state.length === 0 &&
          !state.finished &&
          !state.writing);
}

// Emit 'prefinish' at most once per stream.
function prefinish(stream, state) {
  if (!state.prefinished) {
    state.prefinished = true;
    stream.emit('prefinish');
  }
}

// If the stream is ready to finish, emit 'prefinish', and also 'finish'
// once no user callbacks are still pending.  Returns the needFinish()
// result.
function finishMaybe(stream, state) {
  var need = needFinish(stream, state);
  if (need) {
    if (state.pendingcb === 0) {
      prefinish(stream, state);
      state.finished = true;
      stream.emit('finish');
    } else
      prefinish(stream, state);
  }
  return need;
}

// Begin ending the stream.  When given, cb runs on the next tick if the
// stream has already finished, or on the 'finish' event otherwise.
function endWritable(stream, state, cb) {
  state.ending = true;
  finishMaybe(stream, state);
  if (cb) {
    if (state.finished)
      process.nextTick(cb);
    else
      stream.once('finish', cb);
  }
  state.ended = true;
}