| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287 | 
							- 'use strict';
 
- var map = require('../')
 
-   , it = require('it-is')
 
-   , u = require('ubelt')
 
-   , spec = require('stream-spec')
 
-   , from = require('from')
 
-   , Stream = require('stream')
 
-   , es = require('event-stream')
 
- //REFACTOR THIS TEST TO USE es.readArray and es.writeArray
 
- function writeArray(array, stream) {
 
-   array.forEach( function (j) {
 
-     stream.write(j)
 
-   })
 
-   stream.end()
 
- }
 
- function readStream(stream, done) {
 
-   var array = [] 
 
-   stream.on('data', function (data) {
 
-     array.push(data)
 
-   })
 
-   stream.on('error', done)
 
-   stream.on('end', function (data) {
 
-     done(null, array)
 
-   })
 
- } 
 
- //call sink on each write,
 
- //and complete when finished.
 
- function pauseStream (prob, delay) { 
 
-   var pauseIf = (
 
-       'number' == typeof prob 
 
-     ? function () {
 
-         return Math.random() < prob
 
-       } 
 
-     : 'function' == typeof prob 
 
-     ? prob
 
-     : 0.1
 
-   )
 
-   var delayer = ( 
 
-       !delay 
 
-     ? process.nextTick
 
-     : 'number' == typeof delay 
 
-     ? function (next) { setTimeout(next, delay) }
 
-     : delay
 
-   )   
 
-   return es.through(function (data) {    
 
-     if(!this.paused && pauseIf()) {
 
-       console.log('PAUSE STREAM PAUSING')
 
-       this.pause()
 
-       var self = this
 
-       delayer(function () {
 
-         console.log('PAUSE STREAM RESUMING')
 
-         self.resume()
 
-       })
 
-     }
 
-     console.log("emit ('data', " + data + ')')
 
-     this.emit('data', data) 
 
-   })
 
- }
 
- exports ['simple map applied to a stream'] = function (test) {
 
-   var input = [1,2,3,7,5,3,1,9,0,2,4,6]
 
-   //create event stream from
 
-   var doubler = map(function (data, cb) {
 
-     cb(null, data * 2)
 
-   })
 
-   spec(doubler).through().validateOnExit()
 
-   //a map is only a middle man, so it is both readable and writable
 
-   
 
-   it(doubler).has({
 
-     readable: true,
 
-     writable: true,   
 
-   })
 
-   readStream(doubler, function (err, output) {
 
-     it(output).deepEqual(input.map(function (j) {
 
-       return j * 2
 
-     }))
 
- //    process.nextTick(x.validate)
 
-     test.done()
 
-   })
 
-   
 
-   writeArray(input, doubler)
 
-   
 
- }
 
- exports ['stream comes back in the correct order'] = function (test) {
 
-   var input = [3, 2, 1]
 
-   var delayer = map(function(data, cb){
 
-     setTimeout(function () {
 
-       cb(null, data)
 
-     }, 100 * data)
 
-   })
 
-   readStream(delayer, function (err, output) {
 
-     it(output).deepEqual(input)
 
-     test.done()
 
-   })
 
-   writeArray(input, delayer)
 
- }
 
- exports['pipe two maps together'] = function (test) {
 
-   var input = [1,2,3,7,5,3,1,9,0,2,4,6]
 
-   //create event stream from
 
-   function dd (data, cb) {
 
-     cb(null, data * 2)
 
-   }
 
-   var doubler1 = map(dd), doubler2 = map(dd)
 
-   doubler1.pipe(doubler2)
 
-   
 
-   spec(doubler1).through().validateOnExit()
 
-   spec(doubler2).through().validateOnExit()
 
-   readStream(doubler2, function (err, output) {
 
-     it(output).deepEqual(input.map(function (j) {
 
-       return j * 4
 
-     }))
 
-     test.done()
 
-   })
 
-   
 
-   writeArray(input, doubler1)
 
- }
 
- //next:
 
- //
 
- // test pause, resume and drian.
 
- //
 
- // then make a pipe joiner:
 
- //
 
- // plumber (evStr1, evStr2, evStr3, evStr4, evStr5)
 
- //
 
- // will return a single stream that write goes to the first 
 
- exports ['map will not call end until the callback'] = function (test) {
 
-   var ticker = map(function (data, cb) {
 
-     process.nextTick(function () {
 
-       cb(null, data * 2)
 
-     })
 
-   })
 
-   spec(ticker).through().validateOnExit()
 
-   ticker.write('x')
 
-   ticker.end() 
 
-   ticker.on('end', function () {
 
-     test.done()
 
-   })
 
- }
 
- exports ['emit error thrown'] = function (test) {
 
-   var err = new Error('INTENSIONAL ERROR')
 
-     , mapper = 
 
-   map(function () {
 
-     throw err
 
-   })
 
-   mapper.on('error', function (_err) {
 
-     it(_err).equal(err)  
 
-     test.done()
 
-   })
 
-   mapper.write('hello')
 
- }
 
- exports ['emit error calledback'] = function (test) {
 
-   var err = new Error('INTENSIONAL ERROR')
 
-     , mapper = 
 
-   map(function (data, callback) {
 
-     callback(err)
 
-   })
 
-   mapper.on('error', function (_err) {
 
-     it(_err).equal(err)  
 
-     test.done()
 
-   })
 
-   mapper.write('hello')
 
- }
 
- exports ['do not emit drain if not paused'] = function (test) {
 
-   var maps = map(function (data, callback) {
 
-     u.delay(callback)(null, 1)
 
-     return true
 
-   })
 
-   
 
-   spec(maps).through().pausable().validateOnExit()
 
-   maps.on('drain', function () {
 
-     it(false).ok('should not emit drain unless the stream is paused')
 
-   })
 
-   it(maps.write('hello')).equal(true)
 
-   it(maps.write('hello')).equal(true)
 
-   it(maps.write('hello')).equal(true)
 
-   setTimeout(function () {maps.end()},10)
 
-   maps.on('end', test.done)
 
- }
 
- exports ['emits drain if paused, when all '] = function (test) {
 
-   var active = 0
 
-   var drained = false
 
-   var maps = map(function (data, callback) {
 
-     active ++
 
-     u.delay(function () {
 
-       active --
 
-       callback(null, 1)
 
-     })()
 
-     console.log('WRITE', false)
 
-     return false
 
-   })
 
-   spec(maps).through().validateOnExit()
 
-   maps.on('drain', function () {
 
-     drained = true
 
-     it(active).equal(0, 'should emit drain when all maps are done')
 
-   })
 
-   it(maps.write('hello')).equal(false)
 
-   it(maps.write('hello')).equal(false)
 
-   it(maps.write('hello')).equal(false)
 
-   process.nextTick(function () {maps.end()},10)
 
-   maps.on('end', function () {
 
-     console.log('end')
 
-     it(drained).ok('shoud have emitted drain before end')
 
-     test.done() 
 
-   })
 
- }
 
- exports ['map applied to a stream with filtering'] = function (test) {
 
-   var input = [1,2,3,7,5,3,1,9,0,2,4,6]
 
-   var doubler = map(function (data, callback) {
 
-     if (data % 2)
 
-       callback(null, data * 2)
 
-     else
 
-       callback()
 
-   })
 
-   
 
-   readStream(doubler, function (err, output) {
 
-     it(output).deepEqual(input.filter(function (j) {
 
-       return j % 2
 
-     }).map(function (j) {
 
-       return j * 2
 
-     }))
 
-     test.done()
 
-   })
 
-   
 
-   spec(doubler).through().validateOnExit()
 
-   writeArray(input, doubler)
 
-   
 
- }
 
 
  |