立即从同步代码执行异步回调
Posted
技术标签:
【中文标题】立即从同步代码执行异步回调【英文标题】:Execute async callback from synchronous code immediately 【发布时间】:2019-05-23 08:06:58 【问题描述】:问题
我有一个库,目前不支持异步,需要从异步代码中调用。异步代码通过处理程序调用库(下面代码中的handler
函数)。在处理程序执行时,库会定期调用回调 (callback_wrapper
) 以报告进度。
同步处理程序在ThreadPoolExecutor
中执行,以便主事件循环能够在处理程序运行时处理更多事件。
发生的情况是同步回调立即执行,但异步回调仅在主处理程序执行后才执行。期望的结果是立即执行的异步回调。
我猜事件循环在run_in_executor
调用时被阻塞了,但我不知道如何解决这个问题。
代码
import asyncio
import time
from concurrent.futures.thread import ThreadPoolExecutor
loop = asyncio.get_event_loop()
def handler():
print('handler started')
callback_wrapper()
time.sleep(1)
print('handler stopped')
async def callback():
print('callback')
def callback_wrapper():
print('callback wrapper started')
asyncio.ensure_future(callback(), loop=loop)
print('callback wrapper stopped')
async def main():
handler()
with ThreadPoolExecutor() as pool:
async def thread_handler():
await loop.run_in_executor(pool, handler)
loop.run_until_complete(main())
输出
handler started
callback wrapper started
callback wrapper stopped
handler stopped
callback
期望的输出
handler started
callback wrapper started
callback
callback wrapper stopped
handler stopped
【问题讨论】:
这行不通。你的main()
只是名义上的协程 - 它不等待任何东西,它调用一个阻塞函数。您可能应该使用两个线程,一个运行 asyncio 事件循环,另一个运行阻塞代码。然后您可以使用asyncio.run_coroutine_threadsafe
将任务提交到事件循环并阻塞(您的线程)直到它们完成。
@user4815162342 谢谢,您的意见帮助我走上了正轨!
【参考方案1】:
感谢@user4815162342 的输入,我想出了以下解决方案:
import asyncio
import time
from concurrent.futures.thread import ThreadPoolExecutor
loop = asyncio.get_event_loop()
def handler():
print('handler started')
callback_wrapper()
time.sleep(1)
print('handler stopped')
async def callback():
print('callback')
def callback_wrapper():
print('callback wrapper started')
asyncio.run_coroutine_threadsafe(callback(), loop).result()
print('callback wrapper stopped')
async def main():
await thread_handler()
with ThreadPoolExecutor() as pool:
async def thread_handler():
await loop.run_in_executor(pool, handler)
loop.run_until_complete(main())
产生期望的结果:
handler started
callback wrapper started
callback
callback wrapper stopped
handler stopped
【讨论】:
我认为您在调用run_coroutine_threadsafe
时还需要调用.result()
。 (它返回一个 concurrent.futures.Future
对象,该对象提供了一个阻塞 result()
方法。)此外,您的 thread_handler
协程似乎没有在任何地方使用。
@user4815162342 你是对的,我已经更新了上面的代码
看起来不错,我相信应该可以正常工作。您甚至可以使用concurrent.futures.wait()
等启动多个协程并en masse 等待它们。以上是关于立即从同步代码执行异步回调的主要内容,如果未能解决你的问题,请参考以下文章