/**
 * Support for concurrent task management and synchronization in web
 * applications.
 *
 * @author Dave Longley
 * @author David I. Lehn <dlehn@digitalbazaar.com>
 *
 * Copyright (c) 2009-2013 Digital Bazaar, Inc.
 */
var forge = require('./forge');
require('./debug');
require('./log');
require('./util');

// logging category
var cat = 'forge.task';

// verbose level
// 0: off, 1: a little, 2: a whole lot
// Verbose debug logging is surrounded by a level check to avoid the
// performance issues with even calling the logging code regardless if it
// is actually logged.  For performance reasons this should not be set to 2
// for production use.
// ex: if(sVL >= 2) forge.log.verbose(....)
var sVL = 0;

// track tasks for debugging
var sTasks = {};
var sNextTaskId = 0;
// debug access
forge.debug.set(cat, 'tasks', sTasks);

// a map of task type to task queue
var sTaskQueues = {};
// debug access
forge.debug.set(cat, 'queues', sTaskQueues);

// name for unnamed tasks
var sNoTaskName = '?';

// maximum number of doNext() recursions before a context swap occurs
// FIXME: might need to tweak this based on the browser
var sMaxRecursions = 30;

// time slice for doing tasks before a context swap occurs
// FIXME: might need to tweak this based on the browser
var sTimeSlice = 20;

/**
 * Task states.
 *
 * READY: ready to start processing
 * RUNNING: task or a subtask is running
 * BLOCKED: task is waiting to acquire N permits to continue
 * SLEEPING: task is sleeping for a period of time
 * DONE: task is done
 * ERROR: task has an error
 */
var READY = 'ready';
var RUNNING = 'running';
var BLOCKED = 'blocked';
var SLEEPING = 'sleeping';
var DONE = 'done';
var ERROR = 'error';

/**
 * Task actions.  Used to control state transitions.
 *
 * STOP: stop processing
 * START: start processing tasks
 * BLOCK: block task from continuing until 1 or more permits are released
 * UNBLOCK: release one or more permits
 * SLEEP: sleep for a period of time
 * WAKEUP: wakeup early from SLEEPING state
 * CANCEL: cancel further tasks
 * FAIL: a failure occurred
 */
var STOP = 'stop';
var START = 'start';
var BLOCK = 'block';
var UNBLOCK = 'unblock';
var SLEEP = 'sleep';
var WAKEUP = 'wakeup';
var CANCEL = 'cancel';
var FAIL = 'fail';

/**
 * State transition table.
 *
 * nextState = sStateTable[currentState][action]
 *
 * Note: READY only defines STOP, START, CANCEL and FAIL; looking up any
 * other action for a READY task yields undefined.
 */
var sStateTable = {};

sStateTable[READY] = {};
sStateTable[READY][STOP] = READY;
sStateTable[READY][START] = RUNNING;
sStateTable[READY][CANCEL] = DONE;
sStateTable[READY][FAIL] = ERROR;

sStateTable[RUNNING] = {};
sStateTable[RUNNING][STOP] = READY;
sStateTable[RUNNING][START] = RUNNING;
sStateTable[RUNNING][BLOCK] = BLOCKED;
sStateTable[RUNNING][UNBLOCK] = RUNNING;
sStateTable[RUNNING][SLEEP] = SLEEPING;
sStateTable[RUNNING][WAKEUP] = RUNNING;
sStateTable[RUNNING][CANCEL] = DONE;
sStateTable[RUNNING][FAIL] = ERROR;

sStateTable[BLOCKED] = {};
sStateTable[BLOCKED][STOP] = BLOCKED;
sStateTable[BLOCKED][START] = BLOCKED;
sStateTable[BLOCKED][BLOCK] = BLOCKED;
sStateTable[BLOCKED][UNBLOCK] = BLOCKED;
sStateTable[BLOCKED][SLEEP] = BLOCKED;
sStateTable[BLOCKED][WAKEUP] = BLOCKED;
sStateTable[BLOCKED][CANCEL] = DONE;
sStateTable[BLOCKED][FAIL] = ERROR;

sStateTable[SLEEPING] = {};
sStateTable[SLEEPING][STOP] = SLEEPING;
sStateTable[SLEEPING][START] = SLEEPING;
sStateTable[SLEEPING][BLOCK] = SLEEPING;
sStateTable[SLEEPING][UNBLOCK] = SLEEPING;
sStateTable[SLEEPING][SLEEP] = SLEEPING;
sStateTable[SLEEPING][WAKEUP] = SLEEPING;
sStateTable[SLEEPING][CANCEL] = DONE;
sStateTable[SLEEPING][FAIL] = ERROR;

sStateTable[DONE] = {};
sStateTable[DONE][STOP] = DONE;
sStateTable[DONE][START] = DONE;
sStateTable[DONE][BLOCK] = DONE;
sStateTable[DONE][UNBLOCK] = DONE;
sStateTable[DONE][SLEEP] = DONE;
sStateTable[DONE][WAKEUP] = DONE;
sStateTable[DONE][CANCEL] = DONE;
sStateTable[DONE][FAIL] = ERROR;

sStateTable[ERROR] = {};
sStateTable[ERROR][STOP] = ERROR;
sStateTable[ERROR][START] = ERROR;
sStateTable[ERROR][BLOCK] = ERROR;
sStateTable[ERROR][UNBLOCK] = ERROR;
sStateTable[ERROR][SLEEP] = ERROR;
sStateTable[ERROR][WAKEUP] = ERROR;
sStateTable[ERROR][CANCEL] = ERROR;
sStateTable[ERROR][FAIL] = ERROR;

/**
 * Creates a new task.
 *
 * The task is assigned the next sequential id and registered in the
 * module's debug task map.
 *
 * @param options options for this task
 *   run: the run function for the task (required)
 *   name: a human readable name for the task (optional)
 *   parent: parent of this task (optional)
 *
 * @return the empty task.
 */
var Task = function(options) {
  // task id
  this.id = -1;

  // task name
  this.name = options.name || sNoTaskName;

  // task has no parent
  this.parent = options.parent || null;

  // save run function
  this.run = options.run;

  // create a queue of subtasks to run
  this.subtasks = [];

  // error flag
  this.error = false;

  // state of the task
  this.state = READY;

  // number of times the task has been blocked (also the number
  // of permits needed to be released to continue running)
  this.blocks = 0;

  // timeout id when sleeping
  this.timeoutId = null;

  // no swap time yet
  this.swapTime = null;

  // no user data
  this.userData = null;

  // initialize task
  // FIXME: deal with overflow
  this.id = sNextTaskId++;
  sTasks[this.id] = this;
  if(sVL >= 1) {
    forge.log.verbose(cat, '[%s][%s] init', this.id, this.name, this);
  }
};

/**
 * Logs debug information on this task and the system state.
 *
 * @param msg an optional message to log first (default: '').
 */
Task.prototype.debug = function(msg) {
  msg = msg || '';
  forge.log.debug(cat, msg,
    '[%s][%s] task:', this.id, this.name, this,
    'subtasks:', this.subtasks.length,
    'queue:', sTaskQueues);
};

/**
 * Adds a subtask to run after task.doNext() or task.fail() is called.
 *
 * @param name human readable name for this task (optional).
 * @param subrun a function to run that takes the current task as
 *          its first parameter.
 *
 * @return the current task (useful for chaining next() calls).
 */
Task.prototype.next = function(name, subrun) {
  // juggle parameters if it looks like no name is given
  if(typeof(name) === 'function') {
    subrun = name;

    // inherit parent's name
    name = this.name;
  }
  // create subtask, set parent to this task, propagate callbacks
  var subtask = new Task({
    run: subrun,
    name: name,
    parent: this
  });
  // start subtasks running
  subtask.state = RUNNING;
  subtask.type = this.type;
  subtask.successCallback = this.successCallback || null;
  subtask.failureCallback = this.failureCallback || null;

  // queue a new subtask
  this.subtasks.push(subtask);

  return this;
};

/**
 * Adds subtasks to run in parallel after task.doNext() or task.fail()
 * is called.
 *
 * @param name human readable name for this task (optional).
 * @param subrun functions to run that take the current task as
 *          their first parameter.
 *
 * @return the current task (useful for chaining next() calls).
 */
Task.prototype.parallel = function(name, subrun) {
  // juggle parameters if it looks like no name is given
  if(forge.util.isArray(name)) {
    subrun = name;

    // inherit parent's name
    name = this.name;
  }
  // Wrap parallel tasks in a regular task so they are started at the
  // proper time.
  return this.next(name, function(task) {
    // block waiting for subtasks
    var ptask = task;
    ptask.block(subrun.length);

    // we pass the iterator from the loop below as a parameter
    // to a function because it is otherwise included in the
    // closure and changes as the loop changes -- causing i
    // to always be set to its highest value
    var startParallelTask = function(pname, pi) {
      forge.task.start({
        type: pname,
        run: function(task) {
          subrun[pi](task);
        },
        success: function(task) {
          ptask.unblock();
        },
        failure: function(task) {
          ptask.unblock();
        }
      });
    };

    for(var i = 0; i < subrun.length; i++) {
      // Type must be unique so task starts in parallel:
      //    name + private string + task id + sub-task index
      // start tasks in parallel and unblock when they finish
      var pname = name + '__parallel-' + task.id + '-' + i;
      var pi = i;
      startParallelTask(pname, pi);
    }
  });
};

/**
 * Stops a running task.
 */
Task.prototype.stop = function() {
  this.state = sStateTable[this.state][STOP];
};

/**
 * Starts running a task.
 *
 * The task's run function is only invoked if the START transition leaves
 * the task in the RUNNING state.
 */
Task.prototype.start = function() {
  this.error = false;
  this.state = sStateTable[this.state][START];

  // try to restart
  if(this.state === RUNNING) {
    // record when this time slice began; assigning to `this.start` here
    // would replace this method with a Date on the instance and leave
    // swapTime unset
    this.swapTime = +new Date();
    this.run(this);
    runNext(this, 0);
  }
};

/**
 * Blocks a task until one or more permits have been released. The
 * task will not resume until the requested number of permits have
 * been released with call(s) to unblock().
 *
 * @param n number of permits to wait for (default: 1).
 */
Task.prototype.block = function(n) {
  n = typeof(n) === 'undefined' ? 1 : n;
  this.blocks += n;
  if(this.blocks > 0) {
    this.state = sStateTable[this.state][BLOCK];
  }
};

/**
 * Releases a permit to unblock a task. If a task was blocked by
 * requesting N permits via block(), then it will only continue
 * running once enough permits have been released via unblock() calls.
 *
 * If multiple processes need to synchronize with a single task then
 * use a condition variable (see forge.task.createCondition). It is
 * an error to unblock a task more times than it has been blocked.
 *
 * @param n number of permits to release (default: 1).
 *
 * @return the current block count (task is unblocked when count is 0)
 */
Task.prototype.unblock = function(n) {
  n = typeof(n) === 'undefined' ? 1 : n;
  this.blocks -= n;
  if(this.blocks === 0 && this.state !== DONE) {
    this.state = RUNNING;
    runNext(this, 0);
  }
  return this.blocks;
};

/**
 * Sleep for a period of time before resuming tasks.
 *
 * @param n number of milliseconds to sleep (default: 0).
 */
Task.prototype.sleep = function(n) {
  n = typeof(n) === 'undefined' ? 0 : n;
  this.state = sStateTable[this.state][SLEEP];
  var self = this;
  this.timeoutId = setTimeout(function() {
    self.timeoutId = null;
    self.state = RUNNING;
    runNext(self, 0);
  }, n);
};

/**
 * Waits on a condition variable until notified. The next task will
 * not be scheduled until notification. A condition variable can be
 * created with forge.task.createCondition().
 *
 * Once cond.notify() is called, the task will continue.
 *
 * @param cond the condition variable to wait on.
 */
Task.prototype.wait = function(cond) {
  cond.wait(this);
};

/**
 * If sleeping, wakeup and continue running tasks.
 */
Task.prototype.wakeup = function() {
  if(this.state === SLEEPING) {
    // clearTimeout is the counterpart of the setTimeout used in sleep()
    clearTimeout(this.timeoutId);
    this.timeoutId = null;
    this.state = RUNNING;
    runNext(this, 0);
  }
};

/**
 * Cancel all remaining subtasks of this task.
 */
Task.prototype.cancel = function() {
  this.state = sStateTable[this.state][CANCEL];
  // remove permits needed (the permit counter is `blocks`)
  this.blocks = 0;
  // cancel timeouts
  if(this.timeoutId !== null) {
    clearTimeout(this.timeoutId);
    this.timeoutId = null;
  }
  // remove subtasks
  this.subtasks = [];
};

/**
 * Finishes this task with failure and sets error flag. The entire
 * task will be aborted unless the next task that should execute
 * is passed as a parameter. This allows levels of subtasks to be
 * skipped. For instance, to abort only this task's subtasks, then
 * call fail(task.parent). To abort this task's subtasks and its
 * parent's subtasks, call fail(task.parent.parent). To abort
 * all tasks and simply call the task callback, call fail() or
 * fail(null).
 *
 * The task callback (success or failure) will always, eventually, be
 * called.
 *
 * @param next the task to continue at, or null to abort entirely.
 */
Task.prototype.fail = function(next) {
  // set error flag
  this.error = true;

  // finish task
  finish(this, true);

  if(next) {
    // propagate task info
    next.error = this.error;
    next.swapTime = this.swapTime;
    next.userData = this.userData;

    // do next task as specified
    runNext(next, 0);
  } else {
    if(this.parent !== null) {
      // finish root task (ensures it is removed from task queue)
      var parent = this.parent;
      while(parent.parent !== null) {
        // propagate task info
        parent.error = this.error;
        parent.swapTime = this.swapTime;
        parent.userData = this.userData;
        parent = parent.parent;
      }
      finish(parent, true);
    }

    // call failure callback if one exists
    if(this.failureCallback) {
      this.failureCallback(this);
    }
  }
};

/**
 * Asynchronously start a task.
 *
 * The START transition is applied immediately; the run function is
 * invoked from a zero-delay timeout, and only if the task is still
 * RUNNING at that point.
 *
 * @param task the task to start.
 */
var start = function(task) {
  task.error = false;
  task.state = sStateTable[task.state][START];
  setTimeout(function() {
    if(task.state === RUNNING) {
      task.swapTime = +new Date();
      task.run(task);
      runNext(task, 0);
    }
  }, 0);
};

/**
 * Run the next subtask or finish this task.
 *
 * @param task the task to process.
 * @param recurse the recursion count.
 */
var runNext = function(task, recurse) {
  // get time since last context swap (ms), if enough time has passed set
  // swap to true to indicate that doNext was performed asynchronously
  // also, if recurse is too high do asynchronously
  var swap =
    (recurse > sMaxRecursions) ||
    (+new Date() - task.swapTime) > sTimeSlice;

  var doNext = function(recurse) {
    recurse++;
    if(task.state === RUNNING) {
      if(swap) {
        // update swap time
        task.swapTime = +new Date();
      }

      if(task.subtasks.length > 0) {
        // run next subtask
        var subtask = task.subtasks.shift();
        subtask.error = task.error;
        subtask.swapTime = task.swapTime;
        subtask.userData = task.userData;
        subtask.run(subtask);
        if(!subtask.error) {
          runNext(subtask, recurse);
        }
      } else {
        finish(task);

        if(!task.error) {
          // chain back up and run parent
          if(task.parent !== null) {
            // propagate task info
            task.parent.error = task.error;
            task.parent.swapTime = task.swapTime;
            task.parent.userData = task.userData;

            // no subtasks left, call run next subtask on parent
            runNext(task.parent, recurse);
          }
        }
      }
    }
  };

  if(swap) {
    // we're swapping, so run asynchronously; the timer callback runs on a
    // fresh stack, so restart the recursion count at 0 (passing doNext
    // directly would leave `recurse` undefined and turn the count into NaN,
    // disabling the sMaxRecursions check)
    setTimeout(function() {
      doNext(0);
    }, 0);
  } else {
    // not swapping, so run synchronously
    doNext(recurse);
  }
};

/**
 * Finishes a task and looks for the next task in the queue to start.
 *
 * @param task the task to finish.
 * @param suppressCallbacks true to suppress callbacks.
 */
var finish = function(task, suppressCallbacks) {
  // subtask is now done
  task.state = DONE;

  delete sTasks[task.id];
  if(sVL >= 1) {
    forge.log.verbose(cat, '[%s][%s] finish',
      task.id, task.name, task);
  }

  // only do queue processing for root tasks
  if(task.parent === null) {
    // report error if queue is missing
    if(!(task.type in sTaskQueues)) {
      forge.log.error(cat,
        '[%s][%s] task queue missing [%s]',
        task.id, task.name, task.type);
    } else if(sTaskQueues[task.type].length === 0) {
      // report error if queue is empty
      forge.log.error(cat,
        '[%s][%s] task queue empty [%s]',
        task.id, task.name, task.type);
    } else if(sTaskQueues[task.type][0] !== task) {
      // report error if this task isn't the first in the queue
      forge.log.error(cat,
        '[%s][%s] task not first in queue [%s]',
        task.id, task.name, task.type);
    } else {
      // remove ourselves from the queue
      sTaskQueues[task.type].shift();
      // clean up queue if it is empty
      if(sTaskQueues[task.type].length === 0) {
        if(sVL >= 1) {
          forge.log.verbose(cat, '[%s][%s] delete queue [%s]',
            task.id, task.name, task.type);
        }
        /* Note: Only a task can delete a queue of its own type. This
         is used as a way to synchronize tasks. If a queue for a certain
         task type exists, then a task of that type is running.
         */
        delete sTaskQueues[task.type];
      } else {
        // dequeue the next task and start it
        if(sVL >= 1) {
          forge.log.verbose(cat,
            '[%s][%s] queue start next [%s] remain:%s',
            task.id, task.name, task.type,
            sTaskQueues[task.type].length);
        }
        sTaskQueues[task.type][0].start();
      }
    }

    if(!suppressCallbacks) {
      // call final callback if one exists
      if(task.error && task.failureCallback) {
        task.failureCallback(task);
      } else if(!task.error && task.successCallback) {
        task.successCallback(task);
      }
    }
  }
};

/* Tasks API */
module.exports = forge.task = forge.task || {};

/**
 * Starts a new task that will run the passed function asynchronously.
 *
 * In order to finish the task, either task.doNext() or task.fail()
 * *must* be called.
 *
 * NOTE(review): no doNext() method is defined on Task in this file; after
 * run() returns, subtasks are advanced by the internal runNext() unless
 * the task has been blocked, put to sleep, or failed.
 *
 * The task must have a type (a string identifier) that can be used to
 * synchronize it with other tasks of the same type. That type can also
 * be used to cancel tasks that haven't started yet.
 *
 * To start a task, the following object must be provided as a parameter
 * (each function takes a task object as its first parameter):
 *
 * {
 *   type: the type of task.
 *   run: the function to run to execute the task.
 *   success: a callback to call when the task succeeds (optional).
 *   failure: a callback to call when the task fails (optional).
 * }
 *
 * An optional `name` may also be given; it defaults to '?'.
 *
 * @param options the object as described above.
 */
forge.task.start = function(options) {
  // create a new task
  var task = new Task({
    run: options.run,
    name: options.name || sNoTaskName
  });
  task.type = options.type;
  task.successCallback = options.success || null;
  task.failureCallback = options.failure || null;

  // append the task onto the appropriate queue
  if(!(task.type in sTaskQueues)) {
    if(sVL >= 1) {
      forge.log.verbose(cat, '[%s][%s] create queue [%s]',
        task.id, task.name, task.type);
    }
    // create the queue with the new task
    sTaskQueues[task.type] = [task];
    start(task);
  } else {
    // push the task onto the queue, it will be run after a task
    // with the same type completes
    sTaskQueues[task.type].push(task);
  }
};

/**
 * Cancels all tasks of the given type that haven't started yet.
 *
 * @param type the type of task to cancel.
 */
forge.task.cancel = function(type) {
  // find the task queue
  if(type in sTaskQueues) {
    // empty all but the current task from the queue
    sTaskQueues[type] = [sTaskQueues[type][0]];
  }
};

/**
 * Creates a condition variable to synchronize tasks. To make a task wait
 * on the condition variable, call task.wait(condition). To notify all
 * tasks that are waiting, call condition.notify().
 *
 * @return the condition variable.
 */
forge.task.createCondition = function() {
  var cond = {
    // all tasks that are blocked
    tasks: {}
  };

  /**
   * Causes the given task to block until notify is called. If the task
   * is already waiting on this condition then this is a no-op.
   *
   * @param task the task to cause to wait.
   */
  cond.wait = function(task) {
    // only block once
    if(!(task.id in cond.tasks)) {
      task.block();
      cond.tasks[task.id] = task;
    }
  };

  /**
   * Notifies all waiting tasks to wake up.
   */
  cond.notify = function() {
    // since unblock() will run the next task from here, make sure to
    // clear the condition's blocked task list before unblocking
    var tmp = cond.tasks;
    cond.tasks = {};
    for(var id in tmp) {
      tmp[id].unblock();
    }
  };

  return cond;
};