立即从同步代码执行异步回调

Posted

技术标签:

【中文标题】立即从同步代码执行异步回调【英文标题】:Execute async callback from synchronous code immediately 【发布时间】:2019-05-23 08:06:58 【问题描述】:

问题

我有一个库,目前不支持异步,需要从异步代码中调用。异步代码通过处理程序调用库(下面代码中的handler 函数)。在处理程序执行时,库会定期调用回调 (callback_wrapper) 以报告进度。

同步处理程序在ThreadPoolExecutor 中执行,以便主事件循环能够在处理程序运行时处理更多事件。

发生的情况是同步回调立即执行,但异步回调仅在主处理程序执行后才执行。期望的结果是立即执行的异步回调。

我猜事件循环在run_in_executor 调用时被阻塞了,但我不知道如何解决这个问题。

代码

import asyncio
import time
from concurrent.futures.thread import ThreadPoolExecutor

loop = asyncio.get_event_loop()


def handler():
    print('handler started')
    callback_wrapper()
    time.sleep(1)
    print('handler stopped')


async def callback():
    print('callback')


def callback_wrapper():
    print('callback wrapper started')
    asyncio.ensure_future(callback(), loop=loop)
    print('callback wrapper stopped')


async def main():
    handler()


with ThreadPoolExecutor() as pool:
    async def thread_handler():
        await loop.run_in_executor(pool, handler)


    loop.run_until_complete(main())

输出

handler started
callback wrapper started
callback wrapper stopped
handler stopped
callback

期望的输出

handler started
callback wrapper started
callback
callback wrapper stopped
handler stopped

【问题讨论】:

这行不通。你的 main() 只是名义上的协程 - 它不等待任何东西,它调用一个阻塞函数。您可能应该使用两个线程,一个运行 asyncio 事件循环,另一个运行阻塞代码。然后您可以使用asyncio.run_coroutine_threadsafe 将任务提交到事件循环并阻塞(您的线程)直到它们完成。 @user4815162342 谢谢,您的意见帮助我走上了正轨! 【参考方案1】:

感谢@user4815162342 的输入,我想出了以下解决方案:

import asyncio
import time
from concurrent.futures.thread import ThreadPoolExecutor

loop = asyncio.get_event_loop()


def handler():
    print('handler started')
    callback_wrapper()
    time.sleep(1)
    print('handler stopped')


async def callback():
    print('callback')


def callback_wrapper():
    print('callback wrapper started')
    asyncio.run_coroutine_threadsafe(callback(), loop).result()
    print('callback wrapper stopped')


async def main():
    await thread_handler()


with ThreadPoolExecutor() as pool:
    async def thread_handler():
        await loop.run_in_executor(pool, handler)


    loop.run_until_complete(main())

产生期望的结果:

handler started
callback wrapper started
callback
callback wrapper stopped
handler stopped

【讨论】:

我认为您在调用run_coroutine_threadsafe 时还需要调用.result()。 (它返回一个 concurrent.futures.Future 对象,该对象提供了一个阻塞 result() 方法。)此外,您的 thread_handler 协程似乎没有在任何地方使用。 @user4815162342 你是对的,我已经更新了上面的代码 看起来不错,我相信应该可以正常工作。您甚至可以使用concurrent.futures.wait() 等启动多个协程并en masse 等待它们。

以上是关于立即从同步代码执行异步回调的主要内容,如果未能解决你的问题,请参考以下文章

JavaScript 异步操作之回调函数

同步/异步 异步回调 协成 线程队列

如何识别回调是同步执行还是异步执行? [复制]

如何识别回调是同步执行还是异步执行? [复制]

Android,Kotlin妙用suspendCancellableCoroutine,同步执行异步回调代码

同步调用,异步回调和 Future 模式