| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 | 
							- 'use strict'
 
- const DEFAULT_OPTIONS = {
 
-           workerOptions               : {}
 
-         , maxCallsPerWorker           : Infinity
 
-         , maxConcurrentWorkers        : (require('os').cpus() || { length: 1 }).length
 
-         , maxConcurrentCallsPerWorker : 10
 
-         , maxConcurrentCalls          : Infinity
 
-         , maxCallTime                 : Infinity // exceed this and the whole worker is terminated
 
-         , maxRetries                  : Infinity
 
-         , forcedKillTime              : 100
 
-         , autoStart                   : false
 
-         , onChild                     : function() {}
 
-       }
 
// worker launcher (local module) and the error types used by the farm
const fork = require('./fork')
const errno = require('errno')

const TimeoutError = errno.create('TimeoutError')
const ProcessTerminatedError = errno.create('ProcessTerminatedError')
const MaxConcurrentCallsError = errno.create('MaxConcurrentCallsError')
 
// A farm of worker processes.
//   options - overrides merged over DEFAULT_OPTIONS (neither input is mutated)
//   path    - handed to fork() whenever a worker is started
// The remaining bookkeeping state is created by setup().
function Farm (options, path) {
  this.activeCalls = 0 // calls sent to workers and not yet answered
  this.path = path
  this.options = Object.assign({}, DEFAULT_OPTIONS, options)
}
 
// make a handle to pass back in the form of an external API
//
// Returns a function bound to this farm that queues a call to `method`
// (undefined for a single-function export). A trailing function argument is
// taken as the node-style callback; every other argument is forwarded to the
// worker untouched.
//
// When active + queued calls have reached options.maxConcurrentCalls the call
// is rejected with a MaxConcurrentCallsError: delivered asynchronously to the
// callback when there is one, thrown synchronously otherwise.
Farm.prototype.mkhandle = function (method) {
  // Used when the caller passed no callback: a worker error must not vanish
  // silently, so it is rethrown (from the tick in which the result arrives).
  const rethrow = function (err) {
    if (err) throw err
  }

  return (...args) => {
    const last = args[args.length - 1]
    const hasCallback = typeof last === 'function'

    if (this.activeCalls + this.callQueue.length >= this.options.maxConcurrentCalls) {
      const err = new MaxConcurrentCallsError('Too many concurrent calls (active: ' + this.activeCalls + ', queued: ' + this.callQueue.length + ')')
      if (hasCallback) {
        return process.nextTick(last.bind(null, err))
      }
      throw err
    }

    // Only strip the last argument when it really is a callback; popping
    // unconditionally would drop a genuine argument and leave a non-callable
    // `callback` that blows up when the result comes back.
    const callback = hasCallback ? args.pop() : rethrow

    this.addCall({
      method: method,
      callback: callback,
      args: args,
      retries: 0
    })
  }
}
 
// a constructor of sorts: builds the public interface and resets the farm's
// bookkeeping state. With no `methods` the interface is a single handle;
// otherwise it is an object with one handle per listed method name.
Farm.prototype.setup = function (methods) {
  const iface = methods ? {} : this.mkhandle()

  if (methods) {
    methods.forEach((name) => {
      iface[name] = this.mkhandle(name)
    })
  }

  this.searchStart = -1 // rotation offset used by childKeys()
  this.childId = -1 // id of the most recently started worker
  this.children = {} // live workers keyed by id
  this.activeChildren = 0
  this.callQueue = [] // calls waiting for a worker

  if (this.options.autoStart) {
    // bring the pool up to full size straight away
    while (this.activeChildren < this.options.maxConcurrentWorkers) {
      this.startChild()
    }
  }

  return iface
}
 
// when a child exits, check if there are any outstanding jobs and requeue them
//
// Calls that have already been retried options.maxRetries times are failed
// with a ProcessTerminatedError (via receive()); the rest go back to the
// front of the queue. The child is then removed and the queue re-processed
// if anything was requeued.
Farm.prototype.onExit = function (childId) {
  // delay this to give any sends a chance to finish
  setTimeout(() => {
    const child = this.children[childId]
    let requeued = false

    if (child && child.activeCalls) {
      child.calls.forEach((call, idx) => {
        if (!call) return // slot already answered and deleted

        if (call.retries >= this.options.maxRetries) {
          this.receive({
            idx: idx,
            child: childId,
            args: [new ProcessTerminatedError('cancel after ' + call.retries + ' retries!')]
          })
          return
        }

        // The call is no longer in flight: drop its pending maxCallTime timer
        // and give back the active slot that send() took, otherwise
        // activeCalls grows by one on every retry (send() increments it again
        // when the call is re-dispatched) and the maxConcurrentCalls check
        // over-counts.
        clearTimeout(call.timer)
        this.activeCalls--
        call.retries++
        this.callQueue.unshift(call)
        requeued = true
      })
    }

    this.stopChild(childId)
    if (requeued) this.processQueue()
  }, 10)
}
 
// start a new worker: fork it, wire up its 'message' and 'exit' events and
// register it under the next child id
Farm.prototype.startChild = function () {
  const id = ++this.childId
  const forked = fork(this.path, this.options.workerOptions)
  const worker = {
    send: forked.send,
    child: forked.child,
    calls: [], // every call sent to this worker, indexed by send order
    activeCalls: 0, // calls sent and not yet answered
    exitCode: null // stays null until the process has exited
  }

  this.options.onChild(forked.child)

  forked.child.on('message', (data) => {
    // only messages tagged as belonging to the farm are handled here
    if (data.owner === 'farm') {
      this.receive(data)
    }
  })

  forked.child.once('exit', (code) => {
    worker.exitCode = code
    this.onExit(id)
  })

  this.activeChildren++
  this.children[id] = worker
}
 
// stop a worker, identified by id: send it a 'die' event, schedule a SIGKILL
// in case it has not exited after options.forcedKillTime ms, and forget it.
// Unknown ids are ignored.
Farm.prototype.stopChild = function (childId) {
  const worker = this.children[childId]
  if (!worker) return

  worker.send({ owner: 'farm', event: 'die' })

  const killTimer = setTimeout(() => {
    // exitCode is filled in by the 'exit' handler; null means still running
    if (worker.exitCode === null) {
      worker.child.kill('SIGKILL')
    }
  }, this.options.forcedKillTime)
  // the pending kill must not keep the event loop alive on its own
  killTimer.unref()

  delete this.children[childId]
  this.activeChildren--
}
 
// called from a child process, the data contains information needed to
// look up the child and the original call so we can invoke the callback

// native error constructors that a serialised error may be revived as;
// any other type falls back to a plain Error
const REVIVABLE_ERROR_TYPES = new Map([
  ['TypeError', TypeError],
  ['RangeError', RangeError],
  ['EvalError', EvalError],
  ['ReferenceError', ReferenceError],
  ['SyntaxError', SyntaxError],
  ['URIError', URIError]
])

Farm.prototype.receive = function (data) {
  const { idx, args } = data
  const childId = data.child
  const child = this.children[childId]

  if (!child) {
    return console.error(
      'Worker Farm: Received message for unknown child. ' +
      'This is likely as a result of premature child death, ' +
      'the operation will have been re-queued.'
    )
  }

  const call = child.calls[idx]

  if (!call) {
    return console.error(
      'Worker Farm: Received message for unknown index for existing child. ' +
      'This should not happen!'
    )
  }

  // the call has been answered, so its timeout no longer applies
  if (this.options.maxCallTime !== Infinity) {
    clearTimeout(call.timer)
  }

  // a first argument tagged with $error is a serialised error: rebuild a
  // real Error object of the matching native type from it
  const remote = args[0]
  if (remote && remote.$error == '$error') {
    const ErrorType = REVIVABLE_ERROR_TYPES.get(remote.type) || Error
    const revived = new ErrorType(remote.message)

    revived.type = remote.type
    revived.stack = remote.stack
    // Copy any custom properties to pass it on.
    Object.keys(remote).forEach((key) => {
      revived[key] = remote[key]
    })

    args[0] = revived
  }

  // the callback runs on the next tick, after the bookkeeping below
  process.nextTick(() => {
    call.callback.apply(null, args)
  })

  delete child.calls[idx]
  child.activeCalls--
  this.activeCalls--

  const quotaReached = child.calls.length >= this.options.maxCallsPerWorker
  const nothingPending = Object.keys(child.calls).length === 0
  if (quotaReached && nothingPending) {
    // this child has finished its run, kill it
    this.stopChild(childId)
  }

  // allow any outstanding calls to be processed
  this.processQueue()
}
 
// Invoked by the maxCallTime timer set in send(). Fails every call that is
// outstanding on the worker with a TimeoutError and then stops the worker.
// Does nothing when the worker is already gone.
Farm.prototype.childTimeout = function (childId) {
  const child = this.children[childId]

  if (!child) return

  // Snapshot the outstanding indices first: receive() deletes entries and,
  // through processQueue(), may push new ones while we iterate. Keys are
  // passed as strings, exactly as a for...in over the array would yield them.
  Object.keys(child.calls).forEach((idx) => {
    this.receive({
      idx: idx,
      child: childId,
      args: [new TimeoutError('worker call timed out!')]
    })
  })

  // Each receive() above freed a slot on this worker and ran processQueue(),
  // which may have dispatched queued calls to the very worker we are about
  // to stop. Once the worker is removed nothing would ever answer or requeue
  // them, so move them back to the front of the queue (order preserved) and
  // undo the accounting done by send().
  const stranded = child.calls.filter(Boolean)
  stranded.forEach((call) => {
    clearTimeout(call.timer)
    child.activeCalls--
    this.activeCalls--
  })
  this.callQueue.unshift(...stranded)

  this.stopChild(childId)

  if (stranded.length) this.processQueue()
}
 
// send a call to a worker, identified by id: record it on the worker, bump
// the active-call counters, post the message and, when options.maxCallTime
// is finite, arm the timeout for this call
Farm.prototype.send = function (childId, call) {
  const worker = this.children[childId]
  // the call's slot in worker.calls doubles as its id in the message
  const idx = worker.calls.push(call) - 1

  worker.activeCalls++
  this.activeCalls++

  worker.send({
    owner: 'farm',
    idx: idx,
    child: childId,
    method: call.method,
    args: call.args
  })

  if (this.options.maxCallTime === Infinity) return

  call.timer = setTimeout(
    this.childTimeout.bind(this, childId),
    this.options.maxCallTime
  )
}
 
// a list of active worker ids, in order, but the starting offset is
// shifted each time this method is called, so we work our way through
// all workers when handing out jobs
Farm.prototype.childKeys = function () {
  const ids = Object.keys(this.children)

  // advance the rotation offset, wrapping round once it reaches the last id
  this.searchStart = this.searchStart >= ids.length - 1
    ? 0
    : this.searchStart + 1

  const offset = this.searchStart
  // ids from the offset onwards first, then the ones that precede it
  return ids.slice(offset).concat(ids.slice(0, offset))
}
 
// Calls are added to a queue, this processes the queue and is called
// whenever there might be a chance to send more calls to the workers.
// The various options all impact on when we're able to send calls,
// they may need to be kept in a queue until a worker is ready.
Farm.prototype.processQueue = function () {
  // nothing waiting: the only thing left to do is finish a pending end()
  if (!this.callQueue.length) {
    return this.ending && this.end()
  }

  // grow the pool by at most one worker per pass
  if (this.activeChildren < this.options.maxConcurrentWorkers) {
    this.startChild()
  }

  // one pass over the workers, at most one call handed to each
  for (const key of this.childKeys()) {
    const childId = +key
    const worker = this.children[childId]
    const hasCapacity =
      worker.activeCalls < this.options.maxConcurrentCallsPerWorker &&
      worker.calls.length < this.options.maxCallsPerWorker

    if (!hasCapacity) continue

    this.send(childId, this.callQueue.shift())

    if (!this.callQueue.length) {
      return this.ending && this.end()
    }
  }

  if (this.ending) {
    this.end()
  }
}
 
// add a new call to the call queue, then trigger a process of the queue;
// once the farm is ending, nothing new is queued and end() is re-run instead
Farm.prototype.addCall = function (call) {
  if (!this.ending) {
    this.callQueue.push(call)
    this.processQueue()
    return
  }

  return this.end() // don't add anything new to the queue
}
 
// kills child workers when they're all done
//
// `this.ending` tracks the shutdown state:
//   undefined - end() has not been requested
//   true      - ending, no completion callback
//   function  - ending, call this once every worker is idle and stopped
//   false     - the completion callback has run; later calls are ignored
//
// Idle workers are stopped on every call; workers with calls in flight are
// left running, and end() is invoked again from processQueue() / addCall()
// until none remain.
Farm.prototype.end = function (callback) {
  if (this.ending === false) return

  if (callback) {
    this.ending = callback
  } else if (this.ending == null) {
    this.ending = true
  }

  let complete = true

  Object.keys(this.children).forEach((id) => {
    const child = this.children[id]
    if (!child) return

    if (child.activeCalls) {
      complete = false
    } else {
      this.stopChild(id)
    }
  })

  if (complete && typeof this.ending === 'function') {
    process.nextTick(() => {
      // end() can be re-entered before this tick runs, scheduling a second
      // tick; by then `ending` is already false and calling it would throw.
      if (typeof this.ending !== 'function') return

      this.ending()
      this.ending = false
    })
  }
}
 
// public API: the Farm constructor, with TimeoutError exposed as a property
Farm.TimeoutError = TimeoutError
module.exports = Farm
 
 
  |