JS/TS 中使用 async/await 的异步有界队列

Posted

技术标签:

【中文标题】JS/TS 中使用 async/await 的异步有界队列【英文标题】:Asynchronous Bounded Queue in JS/TS using async/await 【发布时间】:2018-10-27 04:43:48 【问题描述】:

我正试图绕过async/await,我有以下代码:

class AsyncQueue<T> 
    queue = Array<T>()
    maxSize = 1

    async enqueue(x: T) 
        if (this.queue.length > this.maxSize) 
            // Block until available
        

        this.queue.unshift(x)
    

    async dequeue() 
        if (this.queue.length == 0) 
            // Block until available
        

        return this.queue.pop()!
    


async function produce<T>(q: AsyncQueue, x: T) 
    await q.enqueue(x)


async function consume<T>(q: AsyncQueue): T 
    return await q.dequeue()


// Expecting 3 4 in the console
(async () => 
    const q = new AsyncQueue<number>()
    consume(q).then(console.log)
    consume(q).then(console.log)
    produce(q, 3)
    produce(q, 4)
    consume(q).then(console.log)
    consume(q).then(console.log)
)()

当然,我的问题在于代码的“阻塞直到可用”部分。我期望能够“停止”执行直到发生某些事情(例如,出队停止直到存在入队,反之亦然,因为有可用空间)。我觉得我可能需要为此使用协程,但我真的想确保我不会在这里错过任何 async/await 魔法。

【问题讨论】:

不想想要block,那会冻结脚本——你应该有enqueuedequeueawait承诺一旦解决,无论他们'正在等待可用。此外,您应该使用() 调用构造函数 看来我正在将async/await 游戏推得越来越远,我不清楚这一切将如何解决。 看看可能重复的How to implement a pseudo blocking async queue in JS/TS? 不,你不需要协程,你只需要 new Promise 构造函数来等待外部发生的事情。但是,您正在实现协程,CSP-style。 【参考方案1】:

2019 年 4 月 17 日更新: 长话短说,下面的 AsyncSemaphore 实现中有一个错误,它是使用 property-based 测试捕获的。 You can read all about this "tale" here。这是固定版本:

class AsyncSemaphore 
    private promises = Array<() => void>()

    constructor(private permits: number) 

    signal() 
        this.permits += 1
        if (this.promises.length > 0) this.promises.pop()!()
    

    async wait() 
        this.permits -= 1
        if (this.permits < 0 || this.promises.length > 0)
            await new Promise(r => this.promises.unshift(r))
    


最后,经过相当大的努力,并受到@Titian 回答的启发,我想我解决了这个问题。代码中充满了调试消息,但它可能用于教学关于控制流的目的:

class AsyncQueue<T> 
    waitingEnqueue = new Array<() => void>()
    waitingDequeue = new Array<() => void>()
    enqueuePointer = 0
    dequeuePointer = 0
    queue = Array<T>()
    maxSize = 1
    trace = 0

    async enqueue(x: T) 
        this.trace += 1
        const localTrace = this.trace

        if ((this.queue.length + 1) > this.maxSize || this.waitingDequeue.length > 0) 
            console.debug(`[$localTrace] Producer Waiting`)
            this.dequeuePointer += 1
            await new Promise(r => this.waitingDequeue.unshift(r))
            this.waitingDequeue.pop()
            console.debug(`[$localTrace] Producer Ready`)
        

        this.queue.unshift(x)
        console.debug(`[$localTrace] Enqueueing $x Queue is now [$this.queue.join(', ')]`)

        if (this.enqueuePointer > 0) 
            console.debug(`[$localTrace] Notify Consumer`)
            this.waitingEnqueue[this.enqueuePointer-1]()
            this.enqueuePointer -= 1
        
    

    async dequeue() 
        this.trace += 1
        const localTrace = this.trace

        console.debug(`[$localTrace] Queue length before pop: $this.queue.length`)

        if (this.queue.length == 0 || this.waitingEnqueue.length > 0) 
            console.debug(`[$localTrace] Consumer Waiting`)
            this.enqueuePointer += 1
            await new Promise(r => this.waitingEnqueue.unshift(r))
            this.waitingEnqueue.pop()
            console.debug(`[$localTrace] Consumer Ready`)
        

        const x = this.queue.pop()!
        console.debug(`[$localTrace] Queue length after pop: $this.queue.length Popping $x`)

        if (this.dequeuePointer > 0) 
            console.debug(`[$localTrace] Notify Producer`)
            this.waitingDequeue[this.dequeuePointer - 1]()
            this.dequeuePointer -= 1
        

        return x
    


更新:这是一个使用 AsyncSemaphore 的干净版本,它真正封装了通常使用并发原语完成的方式,但适应了异步 CPS 单线程事件循环™ 风格的 javascriptasync/await。可以看到AsyncQueue的逻辑变得直观了很多,通过Promises的双重同步委托给了两个semaphores:

class AsyncSemaphore 
    private promises = Array<() => void>()

    constructor(private permits: number) 

    signal() 
        this.permits += 1
        if (this.promises.length > 0) this.promises.pop()()
    

    async wait() 
        if (this.permits == 0 || this.promises.length > 0)
            await new Promise(r => this.promises.unshift(r))
        this.permits -= 1
    


class AsyncQueue<T> 
    private queue = Array<T>()
    private waitingEnqueue: AsyncSemaphore
    private waitingDequeue: AsyncSemaphore

    constructor(readonly maxSize: number) 
        this.waitingEnqueue = new AsyncSemaphore(0)
        this.waitingDequeue = new AsyncSemaphore(maxSize)
    

    async enqueue(x: T) 
        await this.waitingDequeue.wait()
        this.queue.unshift(x)
        this.waitingEnqueue.signal()
    

    async dequeue() 
        await this.waitingEnqueue.wait()
        this.waitingDequeue.signal()
        return this.queue.pop()!
    


更新 2: 上面的代码中似乎隐藏了一个微妙的错误,当尝试使用大小为 0 的 AsyncQueue 时,这一点变得很明显。语义确实有意义:它是没有任何缓冲区的队列,发布者总是等待消费者存在。阻止它工作的行是:

await this.waitingEnqueue.wait()
this.waitingDequeue.signal()

如果您仔细观察,您会发现dequeue()enqueue() 并不完全对称。事实上,如果交换这两条指令的顺序:

this.waitingDequeue.signal()
await this.waitingEnqueue.wait()

然后一切都恢复正常;在我看来,在实际等待enqueuing 发生之前,我们会发出信号表明对dequeuing() 感兴趣。

如果没有经过广泛的测试,我仍然不确定这不会重新引入细微的错误。我将把它作为一个挑战;)

【讨论】:

很高兴看到您对答案有所帮助。我赶上了其他事情,没有时间调试它。我删除了原来的答案,因为它正在投票,当之无愧。

以上是关于JS/TS 中使用 async/await 的异步有界队列的主要内容,如果未能解决你的问题,请参考以下文章

异步操作要了解的ES7的async/await

js异步回调Async/Await与Promise区别 新学习使用Async/Await

异步编程之Async,Await和ConfigureAwait的关系

async / await对异步的处理

C#异步方法async/await的三种返回类型

async 和 await