如何在 JS/TS 中实现伪阻塞异步队列?

Posted

技术标签:

【中文标题】如何在 JS/TS 中实现伪阻塞异步队列?【英文标题】:How to implement a pseudo blocking async queue in JS/TS? 【发布时间】:2018-04-19 19:48:11 【问题描述】:

所以这里有一个矛盾:我想在 javascript/typescript 中创建一个异步阻塞队列(如果你可以在没有 typescript 的情况下实现它,那很好)。基本上我想实现类似 Java 的 BlockingQueue expect 之类的东西,而不是它实际上是阻塞的,它是异步的,我可以等待出队。

这是我要实现的接口:

interface AsyncBlockingQueue<T> 
  enqueue(t: T): void;
  dequeue(): Promise<T>;

我会这样使用它:

// enqueue stuff somewhere else

async function useBlockingQueue() 
  // as soon as something is enqueued, the promise will be resolved:
  const value = await asyncBlockingQueue.dequeue();
  // this will cause it to await for a second value
  const secondValue = await asyncBlockingQueue.dequeue();

有什么想法吗?

【问题讨论】:

【参考方案1】:

实际上很简单,dequeue 将创建一个 enqueue 将解决的承诺。我们只需要将解析器保持在队列中 - 并且还要关心值在出队之前入队的情况,将已经实现的承诺保持在队列中。

class AsyncBlockingQueue 
  constructor() 
    // invariant: at least one of the arrays is empty
    this.resolvers = [];
    this.promises = [];
  
  _add() 
    this.promises.push(new Promise(resolve => 
      this.resolvers.push(resolve);
    ));
  
  enqueue(t) 
    // if (this.resolvers.length) this.resolvers.shift()(t);
    // else this.promises.push(Promise.resolve(t));
    if (!this.resolvers.length) this._add();
    this.resolvers.shift()(t);
  
  dequeue() 
    if (!this.promises.length) this._add();
    return this.promises.shift();
  
  // now some utilities:
  isEmpty()  // there are no values available
    return !this.promises.length; // this.length <= 0
  
  isBlocked()  // it's waiting for values
    return !!this.resolvers.length; // this.length < 0
  
  get length() 
    return this.promises.length - this.resolvers.length;
  
  [Symbol.asyncIterator]() 
    // Todo: Use AsyncIterator.from()
    return 
      next: () => this.dequeue().then(value => (done: false, value)),
      [Symbol.asyncIterator]()  return this; ,
    ;
  

我不知道 TypeScript,但想必添加必要的类型注释很简单。

为了获得更好的性能,请使用带有循环缓冲区而不是普通数组的 Queue 实现,例如this one。您也可以只使用一个队列并记住您当前存储的是 Promise 还是解析器。

【讨论】:

我要感谢 *** 让我添加更多 ! 以达到最少字符数 @Bergi 在这个解决方案中,为什么enqueue 中有检查if (!this.resolvers.length)?该逻辑似乎对 Promise(和解析器)的创建进行了编码,然后立即删除了解析器。我错过了什么?解析器数组的长度是否曾经超过 1? @52d6c6af 当dequeue() 调用多于enqueue() 调用时,解析器的长度大于0,当enqueue() 调用多于@ 时,promise 的长度大于0 987654332@ 来电。 @52d6c6af 不,我认为这与我知道的 the trampolines 没有任何关系【参考方案2】:

这只是@Bergi 的回答,但使用 typescript + 泛型并进行了一些修改,以使其在我的 typescript 窥视的严格模式下工作。

class AsyncBlockingQueue<T> 
  private _promises: Promise<T>[];
  private _resolvers: ((t: T) => void)[];

  constructor() 
    this._resolvers = [];
    this._promises = [];
  

  private _add() 
    this._promises.push(new Promise(resolve => 
      this._resolvers.push(resolve);
    ));
  

  enqueue(t: T) 
    if (!this._resolvers.length) this._add();
    const resolve = this._resolvers.shift()!;
    resolve(t);
  

  dequeue() 
    if (!this._promises.length) this._add();
    const promise = this._promises.shift()!;
    return promise;
  

  isEmpty() 
    return !this._promises.length;
  

  isBlocked() 
    return !!this._resolvers.length;
  

  get length() 
    return this._promises.length - this._resolvers.length;
  

【讨论】:

"should never occur" 真的是 "can never occur" - 有一个检查会在数组之前为空时添加元素被删除:-) @Bergi 已更新以匹配。打字稿类型 Array.prototype.shift 为 &lt;T&gt;() =&gt; T | undefined 所以我不得不从类型中删除 undefined 。不过我看到了你的支票:)

以上是关于如何在 JS/TS 中实现伪阻塞异步队列?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 asyncio 中使用阻塞函数

系统学习消息队列分享(十) 如何实现高性能的异步网络传输?

深入理解消息队列:如何实现高性能的异步网络传输?

redis实现异步队列

通过阻塞队列实现生产者和消费者异步解耦

Swoole 的异步 Task 任务详解