| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 | 'use strict'const DEFAULT_OPTIONS = {          workerOptions               : {}        , maxCallsPerWorker           : Infinity        , maxConcurrentWorkers        : (require('os').cpus() || { length: 1 }).length        , maxConcurrentCallsPerWorker : 10        , maxConcurrentCalls          : Infinity        , maxCallTime                 : Infinity // exceed this and the whole worker is terminated        , maxRetries                  : Infinity        , forcedKillTime              : 100        , autoStart                   : false        , onChild                     : function() {}      }const fork                    = require('./fork')    , TimeoutError            = require('errno').create('TimeoutError')    , ProcessTerminatedError  = require('errno').create('ProcessTerminatedError')    , MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError')function Farm (options, path) {  this.options     = Object.assign({}, DEFAULT_OPTIONS, options)  this.path        = path  this.activeCalls = 0}// make a handle to pass back in the form of an external APIFarm.prototype.mkhandle = function (method) {  return function () {    let args = Array.prototype.slice.call(arguments)    if (this.activeCalls + this.callQueue.length >= this.options.maxConcurrentCalls) {      let err = new MaxConcurrentCallsError('Too many concurrent calls (active: ' + this.activeCalls + ', queued: ' + this.callQueue.length + ')')      if (typeof args[args.length - 1] == 'function')        return process.nextTick(args[args.length - 1].bind(null, err))      throw err    }    this.addCall({        method   : method      , callback : args.pop()      , args     : args      , retries  : 0    })  }.bind(this)}// a constructor of sortsFarm.prototype.setup = function (methods) {  let iface  if (!methods) { // single-function export    iface = this.mkhandle()  } else { // multiple functions on the export    iface = {}    methods.forEach(function (m) {      iface[m] = this.mkhandle(m)    }.bind(this))  }  this.searchStart    = -1  this.childId        = -1  this.children       = {}  this.activeChildren = 0  this.callQueue      = []  if (this.options.autoStart) {    while (this.activeChildren < this.options.maxConcurrentWorkers)      this.startChild()  }  return iface}// when a child exits, check if there are any outstanding jobs and requeue themFarm.prototype.onExit = function (childId) {  // delay this to give any sends a chance to finish  setTimeout(function () {    let doQueue = false    if (this.children[childId] && this.children[childId].activeCalls) {      this.children[childId].calls.forEach(function (call, i) {        if (!call) return        else if (call.retries >= this.options.maxRetries) {          this.receive({              idx   : i            , child : childId            , args  : [ new ProcessTerminatedError('cancel after ' + call.retries + ' retries!') ]          })        } else {          call.retries++          this.callQueue.unshift(call)          doQueue = true        }      }.bind(this))    }    this.stopChild(childId)    doQueue && this.processQueue()  }.bind(this), 10)}// start a new workerFarm.prototype.startChild = function () {  this.childId++  let forked = fork(this.path, this.options.workerOptions)    , id     = this.childId    , c      = {          send        : forked.send        , child       : forked.child        , calls       : []        , activeCalls : 0        , exitCode    : null      }  this.options.onChild(forked.child);  forked.child.on('message', function(data) {    if (data.owner !== 'farm') {      return;    }    this.receive(data);  }.bind(this))  forked.child.once('exit', function (code) {    c.exitCode = code    this.onExit(id)  }.bind(this))  this.activeChildren++  this.children[id] = c}// stop a worker, identified by idFarm.prototype.stopChild = function (childId) {  let child = this.children[childId]  if (child) {    child.send({owner: 'farm', event: 'die'})    setTimeout(function () {      if (child.exitCode === null)        child.child.kill('SIGKILL')    }, this.options.forcedKillTime).unref()    ;delete this.children[childId]    this.activeChildren--  }}// called from a child process, the data contains information needed to// look up the child and the original call so we can invoke the callbackFarm.prototype.receive = function (data) {  let idx     = data.idx    , childId = data.child    , args    = data.args    , child   = this.children[childId]    , call  if (!child) {    return console.error(        'Worker Farm: Received message for unknown child. '      + 'This is likely as a result of premature child death, '      + 'the operation will have been re-queued.'    )  }  call = child.calls[idx]  if (!call) {    return console.error(        'Worker Farm: Received message for unknown index for existing child. '      + 'This should not happen!'    )  }  if (this.options.maxCallTime !== Infinity)    clearTimeout(call.timer)  if (args[0] && args[0].$error == '$error') {    let e = args[0]    switch (e.type) {      case 'TypeError': args[0] = new TypeError(e.message); break      case 'RangeError': args[0] = new RangeError(e.message); break      case 'EvalError': args[0] = new EvalError(e.message); break      case 'ReferenceError': args[0] = new ReferenceError(e.message); break      case 'SyntaxError': args[0] = new SyntaxError(e.message); break      case 'URIError': args[0] = new URIError(e.message); break      default: args[0] = new Error(e.message)    }    args[0].type = e.type    args[0].stack = e.stack    // Copy any custom properties to pass it on.    Object.keys(e).forEach(function(key) {      args[0][key] = e[key];    });  }  process.nextTick(function () {    call.callback.apply(null, args)  })  ;delete child.calls[idx]  child.activeCalls--  this.activeCalls--  if (child.calls.length >= this.options.maxCallsPerWorker      && !Object.keys(child.calls).length) {    // this child has finished its run, kill it    this.stopChild(childId)  }  // allow any outstanding calls to be processed  this.processQueue()}Farm.prototype.childTimeout = function (childId) {  let child = this.children[childId]    , i  if (!child)    return  for (i in child.calls) {    this.receive({        idx   : i      , child : childId      , args  : [ new TimeoutError('worker call timed out!') ]    })  }  this.stopChild(childId)}// send a call to a worker, identified by idFarm.prototype.send = function (childId, call) {  let child = this.children[childId]    , idx   = child.calls.length  child.calls.push(call)  child.activeCalls++  this.activeCalls++  child.send({      owner  : 'farm'    , idx    : idx    , child  : childId    , method : call.method    , args   : call.args  })  if (this.options.maxCallTime !== Infinity) {    call.timer =      setTimeout(this.childTimeout.bind(this, childId), this.options.maxCallTime)  }}// a list of active worker ids, in order, but the starting offset is// shifted each time this method is called, so we work our way through// all workers when handing out jobsFarm.prototype.childKeys = function () {  let cka = Object.keys(this.children)    , cks  if (this.searchStart >= cka.length - 1)    this.searchStart = 0  else    this.searchStart++  cks = cka.splice(0, this.searchStart)  return cka.concat(cks)}// Calls are added to a queue, this processes the queue and is called// whenever there might be a chance to send more calls to the workers.// The various options all impact on when we're able to send calls,// they may need to be kept in a queue until a worker is ready.Farm.prototype.processQueue = function () {  let cka, i = 0, childId  if (!this.callQueue.length)    return this.ending && this.end()  if (this.activeChildren < this.options.maxConcurrentWorkers)    this.startChild()  for (cka = this.childKeys(); i < cka.length; i++) {    childId = +cka[i]    if (this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker        && this.children[childId].calls.length < this.options.maxCallsPerWorker) {      this.send(childId, this.callQueue.shift())      if (!this.callQueue.length)        return this.ending && this.end()    } /*else {      console.log(        , this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker        , this.children[childId].calls.length < this.options.maxCallsPerWorker        , this.children[childId].calls.length , this.options.maxCallsPerWorker)    }*/  }  if (this.ending)    this.end()}// add a new call to the call queue, then trigger a process of the queueFarm.prototype.addCall = function (call) {  if (this.ending)    return this.end() // don't add anything new to the queue  this.callQueue.push(call)  this.processQueue()}// kills child workers when they're all doneFarm.prototype.end = function (callback) {  let complete = true  if (this.ending === false)    return  if (callback)    this.ending = callback  else if (this.ending == null)    this.ending = true  Object.keys(this.children).forEach(function (child) {    if (!this.children[child])      return    if (!this.children[child].activeCalls)      this.stopChild(child)    else      complete = false  }.bind(this))  if (complete && typeof this.ending == 'function') {    process.nextTick(function () {      this.ending()      this.ending = false    }.bind(this))  }}module.exports              = Farmmodule.exports.TimeoutError = TimeoutError
 |