BlockingQueue.js 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. // /common/BlockingQueue.js
  2. /**
  3. * 一个异步阻塞队列,适用于uniapp和小程序环境。
  4. * 核心逻辑与原版完全相同,仅将私有字段 '#' 替换为下划线 '_' 以增强兼容性。
  5. */
  6. export default class BlockingQueue {
  7. // 使用下划线 '_' 代替 '#' 作为私有属性的约定
  8. _items = [];
  9. _waiters = []; // {resolve, reject, min, timer, onTimeout}
  10. /* 空队列一次性闸门 */
  11. _emptyPromise = null;
  12. _emptyResolve = null;
  13. /* 生产者:把数据塞进去 */
  14. enqueue(item, ...restItems) {
  15. if (restItems.length === 0) {
  16. this._items.push(item);
  17. }
  18. // 如果有额外参数,批量处理所有项
  19. else {
  20. const items = [item, ...restItems].filter(i => i);
  21. if (items.length === 0) return;
  22. this._items.push(...items);
  23. }
  24. // 若有空队列闸门,一次性放行所有等待者
  25. if (this._emptyResolve) {
  26. this._emptyResolve();
  27. this._emptyResolve = null;
  28. this._emptyPromise = null;
  29. }
  30. // 唤醒所有正在等的 waiter
  31. this._wakeWaiters();
  32. }
  33. /* 消费者:min 条或 timeout ms 先到谁 */
  34. async dequeue(min = 1, timeout = Infinity, onTimeout = null) {
  35. // 1. 若空,等第一次数据到达(所有调用共享同一个 promise)
  36. if (this._items.length === 0) {
  37. await this._waitForFirstItem();
  38. }
  39. // 立即满足
  40. if (this._items.length >= min) {
  41. return this._flush();
  42. }
  43. // 需要等待
  44. return new Promise((resolve, reject) => {
  45. let timer = null;
  46. const waiter = { resolve, reject, min, onTimeout, timer };
  47. // 超时逻辑
  48. if (Number.isFinite(timeout)) {
  49. waiter.timer = setTimeout(() => {
  50. this._removeWaiter(waiter);
  51. if (onTimeout) onTimeout(this._items.length);
  52. resolve(this._flush());
  53. }, timeout);
  54. }
  55. this._waiters.push(waiter);
  56. });
  57. }
  58. /* 内部方法:同样使用 '_' 前缀 */
  59. _waitForFirstItem() {
  60. if (!this._emptyPromise) {
  61. this._emptyPromise = new Promise(r => (this._emptyResolve = r));
  62. }
  63. return this._emptyPromise;
  64. }
  65. _wakeWaiters() {
  66. for (let i = this._waiters.length - 1; i >= 0; i--) {
  67. const w = this._waiters[i];
  68. if (this._items.length >= w.min) {
  69. this._removeWaiter(w);
  70. w.resolve(this._flush());
  71. }
  72. }
  73. }
  74. _removeWaiter(waiter) {
  75. const idx = this._waiters.indexOf(waiter);
  76. if (idx !== -1) {
  77. this._waiters.splice(idx, 1);
  78. if (waiter.timer) clearTimeout(waiter.timer);
  79. }
  80. }
  81. _flush() {
  82. const snapshot = [...this._items];
  83. this._items.length = 0;
  84. return snapshot;
  85. }
  86. /* 当前缓存长度(不含等待者) */
  87. get length() {
  88. return this._items.length;
  89. }
  90. }