| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 | var tape = require('tape')var through = require('through2')var pumpify = require('./')var stream = require('stream')var duplexify = require('duplexify')tape('basic', function(t) {  t.plan(3)  var pipeline = pumpify(    through(function(data, enc, cb) {      t.same(data.toString(), 'hello')      cb(null, data.toString().toUpperCase())    }),    through(function(data, enc, cb) {      t.same(data.toString(), 'HELLO')      cb(null, data.toString().toLowerCase())    })  )  pipeline.write('hello')  pipeline.on('data', function(data) {    t.same(data.toString(), 'hello')    t.end()  })})tape('3 times', function(t) {  t.plan(4)  var pipeline = pumpify(    through(function(data, enc, cb) {      t.same(data.toString(), 'hello')      cb(null, data.toString().toUpperCase())    }),    through(function(data, enc, cb) {      t.same(data.toString(), 'HELLO')      cb(null, data.toString().toLowerCase())    }),    through(function(data, enc, cb) {      t.same(data.toString(), 'hello')      cb(null, data.toString().toUpperCase())    })  )  pipeline.write('hello')  pipeline.on('data', function(data) {    t.same(data.toString(), 'HELLO')    t.end()  })})tape('destroy', function(t) {  var test = through()  test.destroy = function() {    t.ok(true)    t.end()  }  var pipeline = pumpify(through(), test)  pipeline.destroy()})tape('close', function(t) {  var test = through()  var pipeline = pumpify(through(), test)  pipeline.on('error', function(err) {    t.same(err.message, 'lol')    t.end()  })  test.emit('error', new Error('lol'))})tape('end waits for last one', function(t) {  var ran = false  var a = through()  var b = through()  var c = through(function(data, enc, cb) {    setTimeout(function() {      ran = true      cb()    }, 100)  })  var pipeline = pumpify(a, b, c)  pipeline.write('foo')  pipeline.end(function() {    t.ok(ran)    t.end()  })  t.ok(!ran)})tape('always wait for finish', function(t) {  var a = new stream.Readable()  a._read = function() {}  a.push('hello')  var pipeline = pumpify(a, through(), through())  var ran = false  pipeline.on('finish', function() {    t.ok(ran)    t.end()  })  setTimeout(function() {    ran = true    a.push(null)  }, 100)})tape('async', function(t) {  var pipeline = pumpify()  t.plan(4)  pipeline.write('hello')  pipeline.on('data', function(data) {    t.same(data.toString(), 'HELLO')    t.end()  })  setTimeout(function() {    pipeline.setPipeline(      through(function(data, enc, cb) {        t.same(data.toString(), 'hello')        cb(null, data.toString().toUpperCase())      }),      through(function(data, enc, cb) {        t.same(data.toString(), 'HELLO')        cb(null, data.toString().toLowerCase())      }),      through(function(data, enc, cb) {        t.same(data.toString(), 'hello')        cb(null, data.toString().toUpperCase())      })    )  }, 100)})tape('early destroy', function(t) {  var a = through()  var b = through()  var c = through()  b.destroy = function() {    t.ok(true)    t.end()  }  var pipeline = pumpify()  pipeline.destroy()  setTimeout(function() {    pipeline.setPipeline(a, b, c)  }, 100)})tape('preserves error', function (t) {  var a = through()  var b = through(function (data, enc, cb) {    cb(new Error('stop'))  })  var c = through()  var s = pumpify()  s.on('error', function (err) {    t.same(err.message, 'stop')    t.end()  })  s.setPipeline(a, b, c)  s.resume()  s.write('hi')})tape('preserves error again', function (t) {  var ws = new stream.Writable()  var rs = new stream.Readable({highWaterMark: 16})  ws._write = function (data, enc, cb) {    cb(null)  }  rs._read = function () {    process.nextTick(function () {      rs.push('hello world')    })  }  var pumpifyErr = pumpify(    through(),    through(function(chunk, _, cb) {      cb(new Error('test'))    }),    ws  )  rs.pipe(pumpifyErr)    .on('error', function (err) {      t.ok(err)      t.ok(err.message !== 'premature close', 'does not close with premature close')      t.end()    })})tape('returns error from duplexify', function (t) {  var a = through()  var b = duplexify()  var s = pumpify()  s.setPipeline(a, b)  s.on('error', function (err) {    t.same(err.message, 'stop')    t.end()  })  s.write('data')  // Test passes if `.end()` is not called  s.end()  b.setWritable(through())  setImmediate(function () {    b.destroy(new Error('stop'))  })})
 |