'use strict';

Object.defineProperty(exports, "__esModule", {
    value: true
});
exports.default = queue;

const _onlyOnce = require('./onlyOnce.js');

const _onlyOnce2 = _interopRequireDefault(_onlyOnce);

const _setImmediate = require('./setImmediate.js');

const _setImmediate2 = _interopRequireDefault(_setImmediate);

const _DoublyLinkedList = require('./DoublyLinkedList.js');

const _DoublyLinkedList2 = _interopRequireDefault(_DoublyLinkedList);

const _wrapAsync = require('./wrapAsync.js');

const _wrapAsync2 = _interopRequireDefault(_wrapAsync);

/**
 * Normalises a required module so that `.default` is always available:
 * ES-module-shaped exports are returned untouched, anything else is wrapped.
 *
 * @param {*} obj - The value returned by `require`.
 * @returns {Object} `obj` itself when it is truthy and has `__esModule`,
 *     otherwise `{ default: obj }`.
 */
function _interopRequireDefault(obj) {
    return obj && obj.__esModule ? obj : { default: obj };
}

/**
 * Builds a task queue that hands batches of queued task data to `worker`.
 *
 * The worker is invoked as `worker(dataArray, callback)`, where `dataArray`
 * holds the `data` of every task in the batch and `callback(err, ...results)`
 * completes the whole batch. Each task's own callback receives the same
 * arguments, and an `error` event is emitted per task when `err` is non-null.
 *
 * @param {Function} worker - Batch processor. It is passed through the
 *     `wrapAsync` helper before use (presumably so async functions can be
 *     used as workers - confirm against `./wrapAsync.js`).
 * @param {number} [concurrency=1] - Maximum number of batches in flight.
 *     `null`/`undefined` fall back to 1.
 * @param {number} [payload] - Maximum number of tasks per batch; when falsy,
 *     every pending task is taken in a single batch.
 * @returns {Object} The queue object: `push`, `pushAsync`, `unshift`,
 *     `unshiftAsync`, `remove`, `kill`, `process`, `pause`, `resume`,
 *     `length`, `running`, `workersList`, `idle`, the read-only event
 *     registration methods `saturated`, `unsaturated`, `empty`, `drain`,
 *     `error`, and the properties `concurrency`, `payload`, `buffer`,
 *     `started`, `paused`. The object is iterable over its pending tasks.
 * @throws {RangeError} If `concurrency` is exactly `0`.
 */
function queue(worker, concurrency, payload) {
    if (concurrency == null) {
        concurrency = 1;
    } else if (concurrency === 0) {
        throw new RangeError('Concurrency must not be zero');
    }

    const _worker = (0, _wrapAsync2.default)(worker);
    // Number of batches handed to the worker that have not completed yet.
    let numRunning = 0;
    // Task items currently being worked on, in the order they were started.
    const workersList = [];
    // Registered handlers, keyed by event name.
    const events = {
        error: [],
        drain: [],
        saturated: [],
        unsaturated: [],
        empty: []
    };

    // Appends `handler` to the listeners of `event`.
    function on(event, handler) {
        events[event].push(handler);
    }

    // Registers `handler` so that it is removed right before its first call.
    function once(event, handler) {
        const handleAndRemove = (...args) => {
            off(event, handleAndRemove);
            handler(...args);
        };
        events[event].push(handleAndRemove);
    }

    // Removes listeners: all of them when `event` is omitted, all listeners
    // of `event` when `handler` is omitted, otherwise just `handler`.
    function off(event, handler) {
        if (!event) {
            Object.keys(events).forEach(ev => {
                events[ev] = [];
            });
            return;
        }
        if (!handler) {
            events[event] = [];
            return;
        }
        // A new array is assigned (not mutated in place), so a `trigger`
        // that is currently iterating the old array is unaffected.
        events[event] = events[event].filter(ev => ev !== handler);
    }

    // Calls every listener of `event` with `args`, in registration order.
    function trigger(event, ...args) {
        events[event].forEach(handler => handler(...args));
    }

    // True while a deferred `q.process()` call is already queued.
    let processingScheduled = false;

    /**
     * Queues a single task and schedules processing.
     *
     * @param {*} data - Task payload handed to the worker.
     * @param {boolean} insertAtFront - Add to the front instead of the back.
     * @param {boolean} rejectOnError - When true the returned promise rejects
     *     on a worker error and `callback` is not used for the task.
     * @param {Function} [callback] - Per-task completion callback.
     * @returns {Promise|undefined} A promise when `rejectOnError` is true or
     *     no callback was supplied; otherwise `undefined`.
     * @throws {Error} If `callback` is non-null and not a function.
     */
    function _insert(data, insertAtFront, rejectOnError, callback) {
        if (callback != null && typeof callback !== 'function') {
            throw new Error('task callback must be a function');
        }
        q.started = true;

        // Settlement functions of the promise created at the end of this
        // function; the executor runs synchronously, so both are assigned
        // before `_insert` returns.
        let res;
        let rej;

        function promiseCallback(err, ...args) {
            // we don't care about the error, let the global error handler
            // deal with it
            if (err) return rejectOnError ? rej(err) : res();
            // Zero or one result resolves to that value, several to an array.
            if (args.length <= 1) return res(args[0]);
            res(args);
        }

        const item = q._createTaskItem(
            data,
            rejectOnError ? promiseCallback : callback || promiseCallback
        );

        if (insertAtFront) {
            q._tasks.unshift(item);
        } else {
            q._tasks.push(item);
        }

        // Coalesce several inserts into a single deferred process() call.
        if (!processingScheduled) {
            processingScheduled = true;
            (0, _setImmediate2.default)(() => {
                processingScheduled = false;
                q.process();
            });
        }

        if (rejectOnError || !callback) {
            return new Promise((resolve, reject) => {
                res = resolve;
                rej = reject;
            });
        }
    }

    /**
     * Creates the completion callback for one batch of task items.
     *
     * @param {Array} tasks - The task items handed to the worker together.
     * @returns {Function} `(err, ...args)` callback that finishes the batch.
     */
    function _createCB(tasks) {
        return function (err, ...args) {
            numRunning -= 1;

            for (const task of tasks) {
                const index = workersList.indexOf(task);
                if (index === 0) {
                    workersList.shift();
                } else if (index > 0) {
                    workersList.splice(index, 1);
                }

                task.callback(err, ...args);

                if (err != null) {
                    trigger('error', err, task.data);
                }
            }

            if (numRunning <= q.concurrency - q.buffer) {
                trigger('unsaturated');
            }

            if (q.idle()) {
                trigger('drain');
            }
            q.process();
        };
    }

    // For an empty task array on an idle queue, schedules a `drain` event
    // and returns true so the caller can skip the insert; otherwise false.
    function _maybeDrain(data) {
        if (data.length === 0 && q.idle()) {
            // call drain immediately if there are no tasks
            (0, _setImmediate2.default)(() => trigger('drain'));
            return true;
        }
        return false;
    }

    /**
     * Builds the public registration method for the event `name`.
     *
     * Called with a handler, it replaces every existing listener of that
     * event with the handler. Called without one, it returns a promise
     * settled by the next emission: rejected with the first emitted argument
     * when that is truthy, otherwise resolved with the second.
     */
    const eventMethod = name => handler => {
        if (!handler) {
            return new Promise((resolve, reject) => {
                once(name, (err, data) => {
                    if (err) return reject(err);
                    resolve(data);
                });
            });
        }
        off(name);
        on(name, handler);
    };

    // Re-entrancy guard for q.process().
    let isProcessing = false;

    const q = {
        _tasks: new _DoublyLinkedList2.default(),
        _createTaskItem(data, callback) {
            return {
                data,
                callback
            };
        },
        *[Symbol.iterator]() {
            yield* q._tasks[Symbol.iterator]();
        },
        concurrency,
        payload,
        // Margin used by the `unsaturated` check in the batch callback.
        buffer: concurrency / 4,
        started: false,
        paused: false,
        // Adds one task, or each element of an array, to the back.
        push(data, callback) {
            if (Array.isArray(data)) {
                if (_maybeDrain(data)) return;
                return data.map(datum => _insert(datum, false, false, callback));
            }
            return _insert(data, false, false, callback);
        },
        // Like push(), but the returned promise(s) reject on worker error.
        pushAsync(data, callback) {
            if (Array.isArray(data)) {
                if (_maybeDrain(data)) return;
                return data.map(datum => _insert(datum, false, true, callback));
            }
            return _insert(data, false, true, callback);
        },
        // Removes every event listener and empties the pending task list.
        kill() {
            off();
            q._tasks.empty();
        },
        // Adds one task, or each element of an array, to the front.
        unshift(data, callback) {
            if (Array.isArray(data)) {
                if (_maybeDrain(data)) return;
                return data.map(datum => _insert(datum, true, false, callback));
            }
            return _insert(data, true, false, callback);
        },
        // Like unshift(), but the returned promise(s) reject on worker error.
        unshiftAsync(data, callback) {
            if (Array.isArray(data)) {
                if (_maybeDrain(data)) return;
                return data.map(datum => _insert(datum, true, true, callback));
            }
            return _insert(data, true, true, callback);
        },
        // Delegates to the task list's remove() with the given predicate.
        remove(testFn) {
            q._tasks.remove(testFn);
        },
        // Starts batches until the queue is paused, the concurrency limit is
        // reached, or no tasks remain.
        process() {
            // Avoid trying to start too many processing operations. This can occur
            // when callbacks resolve synchronously (#1267).
            if (isProcessing) {
                return;
            }
            isProcessing = true;
            try {
                while (!q.paused && numRunning < q.concurrency && q._tasks.length) {
                    const tasks = [];
                    const data = [];
                    let batchSize = q._tasks.length;
                    if (q.payload) batchSize = Math.min(batchSize, q.payload);
                    for (let i = 0; i < batchSize; i++) {
                        const node = q._tasks.shift();
                        tasks.push(node);
                        workersList.push(node);
                        data.push(node.data);
                    }

                    numRunning += 1;

                    if (q._tasks.length === 0) {
                        trigger('empty');
                    }

                    if (numRunning === q.concurrency) {
                        trigger('saturated');
                    }

                    const cb = (0, _onlyOnce2.default)(_createCB(tasks));
                    _worker(data, cb);
                }
            } finally {
                // Always release the guard: if the worker or an event handler
                // throws synchronously, leaving the flag set would make every
                // later process() call return early and stall the queue.
                isProcessing = false;
            }
        },
        // Number of tasks still waiting to be started.
        length() {
            return q._tasks.length;
        },
        // Number of batches currently in flight.
        running() {
            return numRunning;
        },
        // The live array of task items currently being worked on.
        workersList() {
            return workersList;
        },
        // True when nothing is waiting and nothing is in flight.
        idle() {
            return q._tasks.length + numRunning === 0;
        },
        pause() {
            q.paused = true;
        },
        // Clears the paused flag and schedules processing; no-op when the
        // queue is not paused.
        resume() {
            if (q.paused === false) {
                return;
            }
            q.paused = false;
            (0, _setImmediate2.default)(q.process);
        }
    };

    // define these as fixed properties, so people get useful errors when updating
    Object.defineProperties(q, {
        saturated: {
            writable: false,
            value: eventMethod('saturated')
        },
        unsaturated: {
            writable: false,
            value: eventMethod('unsaturated')
        },
        empty: {
            writable: false,
            value: eventMethod('empty')
        },
        drain: {
            writable: false,
            value: eventMethod('drain')
        },
        error: {
            writable: false,
            value: eventMethod('error')
        }
    });

    return q;
}

module.exports = exports.default;