// /common/BlockingQueue.js /** * 一个异步阻塞队列,适用于uniapp和小程序环境。 * 核心逻辑与原版完全相同,仅将私有字段 '#' 替换为下划线 '_' 以增强兼容性。 */ export default class BlockingQueue { // 使用下划线 '_' 代替 '#' 作为私有属性的约定 _items = []; _waiters = []; // {resolve, reject, min, timer, onTimeout} /* 空队列一次性闸门 */ _emptyPromise = null; _emptyResolve = null; /* 生产者:把数据塞进去 */ enqueue(item, ...restItems) { if (restItems.length === 0) { this._items.push(item); } // 如果有额外参数,批量处理所有项 else { const items = [item, ...restItems].filter(i => i); if (items.length === 0) return; this._items.push(...items); } // 若有空队列闸门,一次性放行所有等待者 if (this._emptyResolve) { this._emptyResolve(); this._emptyResolve = null; this._emptyPromise = null; } // 唤醒所有正在等的 waiter this._wakeWaiters(); } /* 消费者:min 条或 timeout ms 先到谁 */ async dequeue(min = 1, timeout = Infinity, onTimeout = null) { // 1. 若空,等第一次数据到达(所有调用共享同一个 promise) if (this._items.length === 0) { await this._waitForFirstItem(); } // 立即满足 if (this._items.length >= min) { return this._flush(); } // 需要等待 return new Promise((resolve, reject) => { let timer = null; const waiter = { resolve, reject, min, onTimeout, timer }; // 超时逻辑 if (Number.isFinite(timeout)) { waiter.timer = setTimeout(() => { this._removeWaiter(waiter); if (onTimeout) onTimeout(this._items.length); resolve(this._flush()); }, timeout); } this._waiters.push(waiter); }); } /* 内部方法:同样使用 '_' 前缀 */ _waitForFirstItem() { if (!this._emptyPromise) { this._emptyPromise = new Promise(r => (this._emptyResolve = r)); } return this._emptyPromise; } _wakeWaiters() { for (let i = this._waiters.length - 1; i >= 0; i--) { const w = this._waiters[i]; if (this._items.length >= w.min) { this._removeWaiter(w); w.resolve(this._flush()); } } } _removeWaiter(waiter) { const idx = this._waiters.indexOf(waiter); if (idx !== -1) { this._waiters.splice(idx, 1); if (waiter.timer) clearTimeout(waiter.timer); } } _flush() { const snapshot = [...this._items]; this._items.length = 0; return snapshot; } /* 当前缓存长度(不含等待者) */ get length() { return this._items.length; } }