'use strict'
/*
 * merge2
 * https://github.com/teambition/merge2
 *
 * Copyright (c) 2014-2020 Teambition
 * Licensed under the MIT license.
 */
const Stream = require('stream')
const PassThrough = Stream.PassThrough
const slice = Array.prototype.slice

module.exports = merge2

/**
 * Merge several readable streams into a single PassThrough stream.
 *
 * Every argument is either a readable stream or an array of readable streams.
 * Arguments are queued and consumed one queue entry at a time: the next entry
 * is only started once every stream of the current entry has ended (or has
 * been unpiped from the merged stream). Streams inside one array entry are
 * piped into the merged stream together.
 *
 * The last argument is treated as an options object when it is truthy, is not
 * an array and has no `pipe` property. Recognised options:
 *   - `end` (default `true`): when not `false`, the merged stream is ended
 *     once the queue has been drained.
 *   - `pipeError` (default `false`): when exactly `true`, 'error' events of
 *     the source streams are re-emitted on the merged stream.
 *   - `objectMode` (default `true`) and `highWaterMark` (default 64 KiB) are
 *     passed, with any other key, to the PassThrough constructor.
 *
 * The options object supplied by the caller is never modified; defaults are
 * applied to a shallow copy of its own enumerable properties.
 *
 * @param {...(Stream|Stream[]|Object)} streams - streams to merge, optionally
 *   followed by an options object.
 * @returns {Stream.PassThrough} the merged stream, extended with an
 *   `add(...streams)` method that queues more streams and returns the merged
 *   stream. It emits 'queueDrain' each time the queue has been emptied.
 * @throws {Error} if an argument is neither a readable stream nor an array of
 *   readable streams.
 */
function merge2 () {
  const streamsQueue = []
  const args = slice.call(arguments)
  let merging = false
  let options = args[args.length - 1]

  if (options && !Array.isArray(options) && options.pipe == null) {
    args.pop()
    // Work on a shallow copy so the defaults below are not written onto the
    // caller's object (which may be shared between calls, or frozen).
    options = Object.assign({}, options)
  } else {
    options = {}
  }

  const doEnd = options.end !== false
  const doPipeError = options.pipeError === true
  if (options.objectMode == null) {
    options.objectMode = true
  }
  if (options.highWaterMark == null) {
    options.highWaterMark = 64 * 1024
  }
  const mergedStream = PassThrough(options)

  // Queue each argument (paused) and start merging if nothing is in progress.
  // Exposed as `mergedStream.add`, hence the `return this`.
  function addStream () {
    for (let i = 0, len = arguments.length; i < len; i++) {
      streamsQueue.push(pauseStreams(arguments[i], options))
    }
    mergeStream()
    return this
  }

  // Take the next queue entry and pipe all of its streams into mergedStream.
  // No-op while a previous entry is still being merged.
  function mergeStream () {
    if (merging) {
      return
    }
    merging = true

    let streams = streamsQueue.shift()
    if (!streams) {
      // Queue is empty: finish on the next tick. `merging` stays true until
      // endStream runs.
      process.nextTick(endStream)
      return
    }
    if (!Array.isArray(streams)) {
      streams = [streams]
    }

    // One count per stream plus one for the trailing next() call below, so
    // the entry cannot complete before every stream has been set up (and an
    // empty array entry completes immediately).
    let pipesCount = streams.length + 1

    function next () {
      if (--pipesCount > 0) {
        return
      }
      merging = false
      mergeStream()
    }

    function pipe (stream) {
      // Runs once per stream: on 'end' or on unpipe, whichever comes first;
      // the listeners are removed so the other event cannot call next() again.
      function onend () {
        stream.removeListener('merge2UnpipeEnd', onend)
        stream.removeListener('end', onend)
        if (doPipeError) {
          stream.removeListener('error', onerror)
        }
        next()
      }
      function onerror (err) {
        mergedStream.emit('error', err)
      }
      // skip ended stream
      if (stream._readableState.endEmitted) {
        return next()
      }

      stream.on('merge2UnpipeEnd', onend)
      stream.on('end', onend)

      if (doPipeError) {
        stream.on('error', onerror)
      }

      // `end: false` keeps mergedStream open for the remaining queue entries.
      stream.pipe(mergedStream, { end: false })
      // compatible for old stream
      stream.resume()
    }

    for (let i = 0; i < streams.length; i++) {
      pipe(streams[i])
    }

    next()
  }

  function endStream () {
    merging = false
    // emit 'queueDrain' when all streams merged.
    mergedStream.emit('queueDrain')
    if (doEnd) {
      mergedStream.end()
    }
  }

  // 0 removes the listener limit on the merged stream.
  mergedStream.setMaxListeners(0)
  mergedStream.add = addStream
  // Tell the source it was unpiped so its onend handler advances the queue.
  mergedStream.on('unpipe', function (stream) {
    stream.emit('merge2UnpipeEnd')
  })

  if (args.length) {
    addStream.apply(null, args)
  }
  return mergedStream
}

/**
 * Check and pause streams for pipe.
 *
 * A single stream is validated and paused. An object that has `pipe` but no
 * `_readableState` (an old-style stream) is first piped into a new PassThrough
 * built from `options`, and that PassThrough is used in its place. An array is
 * processed recursively and updated in place.
 *
 * @param {Stream|Array} streams - a stream, or a (possibly nested) array of
 *   streams.
 * @param {Object} options - options for the PassThrough wrapper created for
 *   old-style streams.
 * @returns {Stream|Array} the paused stream (or its wrapper), or the same
 *   array with each element replaced by its paused stream.
 * @throws {Error} if a value lacks `_readableState`, `pause` or `pipe`.
 */
function pauseStreams (streams, options) {
  if (!Array.isArray(streams)) {
    // Backwards-compat with old-style streams
    if (!streams._readableState && streams.pipe) {
      streams = streams.pipe(PassThrough(options))
    }
    if (!streams._readableState || !streams.pause || !streams.pipe) {
      throw new Error('Only readable stream can be merged.')
    }
    streams.pause()
  } else {
    for (let i = 0, len = streams.length; i < len; i++) {
      streams[i] = pauseStreams(streams[i], options)
    }
  }
  return streams
}