123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- // /common/BlockingQueue.js
- /**
- * 一个异步阻塞队列,适用于uniapp和小程序环境。
- * 核心逻辑与原版完全相同,仅将私有字段 '#' 替换为下划线 '_' 以增强兼容性。
- */
- export default class BlockingQueue {
- // 使用下划线 '_' 代替 '#' 作为私有属性的约定
- _items = [];
- _waiters = []; // {resolve, reject, min, timer, onTimeout}
- /* 空队列一次性闸门 */
- _emptyPromise = null;
- _emptyResolve = null;
- /* 生产者:把数据塞进去 */
- enqueue(item, ...restItems) {
- if (restItems.length === 0) {
- this._items.push(item);
- }
- // 如果有额外参数,批量处理所有项
- else {
- const items = [item, ...restItems].filter(i => i);
- if (items.length === 0) return;
- this._items.push(...items);
- }
- // 若有空队列闸门,一次性放行所有等待者
- if (this._emptyResolve) {
- this._emptyResolve();
- this._emptyResolve = null;
- this._emptyPromise = null;
- }
- // 唤醒所有正在等的 waiter
- this._wakeWaiters();
- }
- /* 消费者:min 条或 timeout ms 先到谁 */
- async dequeue(min = 1, timeout = Infinity, onTimeout = null) {
- // 1. 若空,等第一次数据到达(所有调用共享同一个 promise)
- if (this._items.length === 0) {
- await this._waitForFirstItem();
- }
- // 立即满足
- if (this._items.length >= min) {
- return this._flush();
- }
- // 需要等待
- return new Promise((resolve, reject) => {
- let timer = null;
- const waiter = { resolve, reject, min, onTimeout, timer };
- // 超时逻辑
- if (Number.isFinite(timeout)) {
- waiter.timer = setTimeout(() => {
- this._removeWaiter(waiter);
- if (onTimeout) onTimeout(this._items.length);
- resolve(this._flush());
- }, timeout);
- }
- this._waiters.push(waiter);
- });
- }
- /* 内部方法:同样使用 '_' 前缀 */
- _waitForFirstItem() {
- if (!this._emptyPromise) {
- this._emptyPromise = new Promise(r => (this._emptyResolve = r));
- }
- return this._emptyPromise;
- }
- _wakeWaiters() {
- for (let i = this._waiters.length - 1; i >= 0; i--) {
- const w = this._waiters[i];
- if (this._items.length >= w.min) {
- this._removeWaiter(w);
- w.resolve(this._flush());
- }
- }
- }
- _removeWaiter(waiter) {
- const idx = this._waiters.indexOf(waiter);
- if (idx !== -1) {
- this._waiters.splice(idx, 1);
- if (waiter.timer) clearTimeout(waiter.timer);
- }
- }
- _flush() {
- const snapshot = [...this._items];
- this._items.length = 0;
- return snapshot;
- }
- /* 当前缓存长度(不含等待者) */
- get length() {
- return this._items.length;
- }
- }
|