JS/TS 中使用 async/await 的异步有界队列
Posted
技术标签:
【中文标题】JS/TS 中使用 async/await 的异步有界队列【英文标题】:Asynchronous Bounded Queue in JS/TS using async/await 【发布时间】:2018-10-27 04:43:48 【问题描述】:我正试图绕过async/await
,我有以下代码:
class AsyncQueue<T>
queue = Array<T>()
maxSize = 1
async enqueue(x: T)
if (this.queue.length > this.maxSize)
// Block until available
this.queue.unshift(x)
async dequeue()
if (this.queue.length == 0)
// Block until available
return this.queue.pop()!
async function produce<T>(q: AsyncQueue, x: T)
await q.enqueue(x)
async function consume<T>(q: AsyncQueue): T
return await q.dequeue()
// Expecting 3 4 in the console
(async () =>
const q = new AsyncQueue<number>()
consume(q).then(console.log)
consume(q).then(console.log)
produce(q, 3)
produce(q, 4)
consume(q).then(console.log)
consume(q).then(console.log)
)()
当然,我的问题在于代码的“阻塞直到可用”部分。我期望能够“停止”执行直到发生某些事情(例如,出队停止直到存在入队,反之亦然,因为有可用空间)。我觉得我可能需要为此使用协程,但我真的想确保我不会在这里错过任何 async/await
魔法。
【问题讨论】:
你不想想要block
,那会冻结脚本——你应该有enqueue
和dequeue
await
承诺一旦解决,无论他们'正在等待可用。此外,您应该使用()
调用构造函数
看来我正在将async/await
游戏推得越来越远,我不清楚这一切将如何解决。
看看可能重复的How to implement a pseudo blocking async queue in JS/TS?
不,你不需要协程,你只需要 new Promise
构造函数来等待外部发生的事情。但是,您正在实现协程,CSP-style。
【参考方案1】:
2019 年 4 月 17 日更新: 长话短说,下面的 AsyncSemaphore 实现中有一个错误,它是使用 property-based 测试捕获的。 You can read all about this "tale" here。这是固定版本:
class AsyncSemaphore
private promises = Array<() => void>()
constructor(private permits: number)
signal()
this.permits += 1
if (this.promises.length > 0) this.promises.pop()!()
async wait()
this.permits -= 1
if (this.permits < 0 || this.promises.length > 0)
await new Promise(r => this.promises.unshift(r))
最后,经过相当大的努力,并受到@Titian 回答的启发,我想我解决了这个问题。代码中充满了调试消息,但它可能用于教学关于控制流的目的:
class AsyncQueue<T>
waitingEnqueue = new Array<() => void>()
waitingDequeue = new Array<() => void>()
enqueuePointer = 0
dequeuePointer = 0
queue = Array<T>()
maxSize = 1
trace = 0
async enqueue(x: T)
this.trace += 1
const localTrace = this.trace
if ((this.queue.length + 1) > this.maxSize || this.waitingDequeue.length > 0)
console.debug(`[$localTrace] Producer Waiting`)
this.dequeuePointer += 1
await new Promise(r => this.waitingDequeue.unshift(r))
this.waitingDequeue.pop()
console.debug(`[$localTrace] Producer Ready`)
this.queue.unshift(x)
console.debug(`[$localTrace] Enqueueing $x Queue is now [$this.queue.join(', ')]`)
if (this.enqueuePointer > 0)
console.debug(`[$localTrace] Notify Consumer`)
this.waitingEnqueue[this.enqueuePointer-1]()
this.enqueuePointer -= 1
async dequeue()
this.trace += 1
const localTrace = this.trace
console.debug(`[$localTrace] Queue length before pop: $this.queue.length`)
if (this.queue.length == 0 || this.waitingEnqueue.length > 0)
console.debug(`[$localTrace] Consumer Waiting`)
this.enqueuePointer += 1
await new Promise(r => this.waitingEnqueue.unshift(r))
this.waitingEnqueue.pop()
console.debug(`[$localTrace] Consumer Ready`)
const x = this.queue.pop()!
console.debug(`[$localTrace] Queue length after pop: $this.queue.length Popping $x`)
if (this.dequeuePointer > 0)
console.debug(`[$localTrace] Notify Producer`)
this.waitingDequeue[this.dequeuePointer - 1]()
this.dequeuePointer -= 1
return x
更新:这是一个使用 AsyncSemaphore
的干净版本,它真正封装了通常使用并发原语完成的方式,但适应了异步 CPS 单线程事件循环™ 风格的 javascript 与 async/await
。可以看到AsyncQueue
的逻辑变得直观了很多,通过Promises的双重同步委托给了两个semaphores:
class AsyncSemaphore
private promises = Array<() => void>()
constructor(private permits: number)
signal()
this.permits += 1
if (this.promises.length > 0) this.promises.pop()()
async wait()
if (this.permits == 0 || this.promises.length > 0)
await new Promise(r => this.promises.unshift(r))
this.permits -= 1
class AsyncQueue<T>
private queue = Array<T>()
private waitingEnqueue: AsyncSemaphore
private waitingDequeue: AsyncSemaphore
constructor(readonly maxSize: number)
this.waitingEnqueue = new AsyncSemaphore(0)
this.waitingDequeue = new AsyncSemaphore(maxSize)
async enqueue(x: T)
await this.waitingDequeue.wait()
this.queue.unshift(x)
this.waitingEnqueue.signal()
async dequeue()
await this.waitingEnqueue.wait()
this.waitingDequeue.signal()
return this.queue.pop()!
更新 2: 上面的代码中似乎隐藏了一个微妙的错误,当尝试使用大小为 0 的 AsyncQueue
时,这一点变得很明显。语义确实有意义:它是没有任何缓冲区的队列,发布者总是等待消费者存在。阻止它工作的行是:
await this.waitingEnqueue.wait()
this.waitingDequeue.signal()
如果您仔细观察,您会发现dequeue()
与enqueue()
并不完全对称。事实上,如果交换这两条指令的顺序:
this.waitingDequeue.signal()
await this.waitingEnqueue.wait()
然后一切都恢复正常;在我看来,在实际等待enqueuing
发生之前,我们会发出信号表明对dequeuing()
感兴趣。
如果没有经过广泛的测试,我仍然不确定这不会重新引入细微的错误。我将把它作为一个挑战;)
【讨论】:
很高兴看到您对答案有所帮助。我赶上了其他事情,没有时间调试它。我删除了原来的答案,因为它正在投票,当之无愧。以上是关于JS/TS 中使用 async/await 的异步有界队列的主要内容,如果未能解决你的问题,请参考以下文章
js异步回调Async/Await与Promise区别 新学习使用Async/Await