'use strict';

var map = require('../')
  , it = require('it-is')
  , u = require('ubelt')
  , spec = require('stream-spec')
  , from = require('from')
  , Stream = require('stream')
  , es = require('event-stream')

//REFACTOR THIS TEST TO USE es.readArray and es.writeArray

/**
 * Write every element of `array` to `stream`, in order, then end the stream.
 *
 * @param {Array} array - values to write
 * @param {Object} stream - any object exposing `write(value)` and `end()`
 */
function writeArray(array, stream) {
  array.forEach(function (j) {
    stream.write(j)
  })
  stream.end()
}

/**
 * Collect everything a stream emits and hand it to a node-style callback.
 *
 * Each 'data' payload is appended to an array. On 'end' the callback is
 * invoked as `done(null, array)`. `done` is also registered directly as the
 * 'error' listener, so an emitted error arrives as its first argument.
 *
 * @param {Object} stream - emitter of 'data', 'error' and 'end' events
 * @param {Function} done - callback `(err, array)`
 */
function readStream(stream, done) {
  var array = []

  stream.on('data', function (data) {
    array.push(data)
  })
  stream.on('error', done)
  stream.on('end', function () {
    done(null, array)
  })
}

/**
 * Build an `es.through` stream that re-emits every chunk it is given and
 * occasionally pauses itself, resuming again after a delay. Pauses, resumes
 * and emitted chunks are logged with console.log.
 *
 * @param {number|Function} [prob] - a number is the probability (compared
 *   against Math.random()) of pausing on a given write; a function is called
 *   on each write and pauses when it returns a truthy value. Any other value
 *   falls back to a probability of 0.1.
 * @param {number|Function} [delay] - falsy: resume via process.nextTick;
 *   a number: resume after that many milliseconds (setTimeout); anything
 *   else is called as `delay(resumeFn)` and must schedule `resumeFn` itself.
 * @returns {Object} the stream returned by `es.through`
 */
function pauseStream (prob, delay) {
  var pauseIf = (
      'number' == typeof prob
    ? function () {
        return Math.random() < prob
      }
    : 'function' == typeof prob
    ? prob
      // The fallback must be callable: it is invoked as pauseIf() below.
      // (Previously this was the bare number 0.1, which threw a TypeError.)
    : function () {
        return Math.random() < 0.1
      }
  )

  var delayer = (
      !delay
    ? process.nextTick
    : 'number' == typeof delay
    ? function (next) { setTimeout(next, delay) }
    : delay
  )

  return es.through(function (data) {
    // Only consider pausing when not already paused.
    if(!this.paused && pauseIf()) {
      console.log('PAUSE STREAM PAUSING')
      this.pause()
      var self = this
      delayer(function () {
        console.log('PAUSE STREAM RESUMING')
        self.resume()
      })
    }
    console.log("emit ('data', " + data + ')')
    this.emit('data', data)
  })
}

// A synchronous doubling map must be readable and writable, and must output
// every input value multiplied by two.
exports ['simple map applied to a stream'] = function (test) {

  var input = [1,2,3,7,5,3,1,9,0,2,4,6]

  //create event stream from
  var doubler = map(function (data, cb) {
    cb(null, data * 2)
  })

  spec(doubler).through().validateOnExit()

  //a map is only a middle man, so it is both readable and writable

  it(doubler).has({
    readable: true,
    writable: true,
  })

  readStream(doubler, function (err, output) {
    it(output).deepEqual(input.map(function (j) {
      return j * 2
    }))
//    process.nextTick(x.validate)
    test.done()
  })

  writeArray(input, doubler)

}

// Callbacks fire after 100ms * value, i.e. in the reverse of write order;
// the output is still expected to match the input order.
exports ['stream comes back in the correct order'] = function (test) {
  var input = [3, 2, 1]

  var delayer = map(function(data, cb){
    setTimeout(function () {
      cb(null, data)
    }, 100 * data)
  })

  readStream(delayer, function (err, output) {
    it(output).deepEqual(input)
    test.done()
  })

  writeArray(input, delayer)
}

// Two doubling maps piped together must multiply every value by four.
exports['pipe two maps together'] = function (test) {

  var input = [1,2,3,7,5,3,1,9,0,2,4,6]

  //create event stream from
  function dd (data, cb) {
    cb(null, data * 2)
  }

  var doubler1 = map(dd), doubler2 = map(dd)

  doubler1.pipe(doubler2)

  spec(doubler1).through().validateOnExit()
  spec(doubler2).through().validateOnExit()

  readStream(doubler2, function (err, output) {
    it(output).deepEqual(input.map(function (j) {
      return j * 4
    }))
    test.done()
  })

  writeArray(input, doubler1)

}

//next:
//
// test pause, resume and drain.
//
// then make a pipe joiner:
//
// plumber (evStr1, evStr2, evStr3, evStr4, evStr5)
//
// will return a single stream that write goes to the first

// end() is called while the map callback is still pending (it fires on the
// next tick); the test completes only once 'end' is emitted.
exports ['map will not call end until the callback'] = function (test) {

  var ticker = map(function (data, cb) {
    process.nextTick(function () {
      cb(null, data * 2)
    })
  })

  spec(ticker).through().validateOnExit()

  ticker.write('x')
  ticker.end()

  ticker.on('end', function () {
    test.done()
  })
}

// An exception thrown inside the map function must surface as an 'error'
// event carrying that same error object.
exports ['emit error thrown'] = function (test) {

  var err = new Error('INTENSIONAL ERROR')
    , mapper =
  map(function () {
    throw err
  })

  mapper.on('error', function (_err) {
    it(_err).equal(err)
    test.done()
  })

  mapper.write('hello')

}

// An error passed as the callback's first argument must surface as an
// 'error' event carrying that same error object.
exports ['emit error calledback'] = function (test) {

  var err = new Error('INTENSIONAL ERROR')
    , mapper =
  map(function (data, callback) {
    callback(err)
  })

  mapper.on('error', function (_err) {
    it(_err).equal(err)
    test.done()
  })

  mapper.write('hello')

}

// When the map function returns true, write() is expected to return true
// and 'drain' must never be emitted.
exports ['do not emit drain if not paused'] = function (test) {

  var maps = map(function (data, callback) {
    // NOTE(review): u.delay presumably defers the call to callback - confirm
    // against ubelt.
    u.delay(callback)(null, 1)
    return true
  })

  spec(maps).through().pausable().validateOnExit()

  maps.on('drain', function () {
    it(false).ok('should not emit drain unless the stream is paused')
  })

  it(maps.write('hello')).equal(true)
  it(maps.write('hello')).equal(true)
  it(maps.write('hello')).equal(true)

  setTimeout(function () {maps.end()},10)

  maps.on('end', test.done)
}

// When the map function returns false, write() is expected to return false,
// and 'drain' must be emitted (with no map calls still active) before 'end'.
exports ['emits drain if paused, when all '] = function (test) {
  var active = 0
  var drained = false

  var maps = map(function (data, callback) {
    active ++
    u.delay(function () {
      active --
      callback(null, 1)
    })()
    console.log('WRITE', false)
    return false
  })

  spec(maps).through().validateOnExit()

  maps.on('drain', function () {
    drained = true
    it(active).equal(0, 'should emit drain when all maps are done')
  })

  it(maps.write('hello')).equal(false)
  it(maps.write('hello')).equal(false)
  it(maps.write('hello')).equal(false)

  process.nextTick(function () {maps.end()})

  maps.on('end', function () {
    console.log('end')
    it(drained).ok('shoud have emitted drain before end')
    test.done()
  })

}

// Calling the callback with no arguments drops the item: only odd inputs
// (doubled) are expected in the output.
exports ['map applied to a stream with filtering'] = function (test) {

  var input = [1,2,3,7,5,3,1,9,0,2,4,6]

  var doubler = map(function (data, callback) {
    if (data % 2)
      callback(null, data * 2)
    else
      callback()
  })

  readStream(doubler, function (err, output) {
    it(output).deepEqual(input.filter(function (j) {
      return j % 2
    }).map(function (j) {
      return j * 2
    }))
    test.done()
  })

  spec(doubler).through().validateOnExit()

  writeArray(input, doubler)

}