如何将 asyncio 与现有的阻塞库一起使用?
Posted
技术标签:
【中文标题】如何将 asyncio 与现有的阻塞库一起使用?【英文标题】:How to use asyncio with existing blocking library? 【发布时间】:2017-04-25 02:29:56 【问题描述】:我有几个阻塞函数foo
、bar
并且我无法更改它们(一些我无法控制的内部库。与一个或多个网络服务通信)。我如何将它用作异步?例如。我不想做以下事情。
results = []
for inp in inps:
val = foo(inp)
result = bar(val)
results.append(result)
这将是低效的,因为我可以在等待第一个输入时调用 foo
来获取第二个输入,而对于 bar
则相同。如何包装它们以便它们可以与 asyncio 一起使用(即新的async
、await
语法)?
假设函数是可重入的。即当之前的foo
已经在处理时,再次调用foo
是可以的。
更新
使用可重复使用的装饰器扩展答案。比如点击here。
def run_in_executor(f):
@functools.wraps(f)
def inner(*args, **kwargs):
loop = asyncio.get_running_loop()
return loop.run_in_executor(None, functools.partial(f, *args, **kwargs))
return inner
【问题讨论】:
你试过看*** Documentation on asyncio吗?转到异步执行器。 *** 文档已关闭 【参考方案1】:这里有(有点)两个问题:第一,如何异步运行阻塞代码,第二,如何并发运行异步代码(asyncio 是单线程的,所以 GIL 仍然适用,所以不是 真正平行,但我离题了)。
可以使用 asyncio.ensure_future 创建并发任务,如 here 所述。
要运行同步代码,您需要run the blocking code in an executor。示例:
import concurrent.futures
import asyncio
import time
def blocking(delay):
time.sleep(delay)
print('Completed.')
async def non_blocking(loop, executor):
# Run three of the blocking tasks concurrently. asyncio.wait will
# automatically wrap these in Tasks. If you want explicit access
# to the tasks themselves, use asyncio.ensure_future, or add a
# "done, pending = asyncio.wait..." assignment
await asyncio.wait(
fs=
# Returns after delay=12 seconds
loop.run_in_executor(executor, blocking, 12),
# Returns after delay=14 seconds
loop.run_in_executor(executor, blocking, 14),
# Returns after delay=16 seconds
loop.run_in_executor(executor, blocking, 16)
,
return_when=asyncio.ALL_COMPLETED
)
loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
loop.run_until_complete(non_blocking(loop, executor))
如果您想使用 for 循环(如您的示例)安排这些任务,您有几种不同的策略,但基本方法是使用 for 循环(或列表)安排任务理解等),用 asyncio.wait 等待它们,然后然后检索结果。示例:
done, pending = await asyncio.wait(
fs=[loop.run_in_executor(executor, blocking_foo, *args) for args in inps],
return_when=asyncio.ALL_COMPLETED
)
# Note that any errors raise during the above will be raised here; to
# handle errors you will need to call task.exception() and check if it
# is not None before calling task.result()
results = [task.result() for task in done]
【讨论】:
asyncio.ensure_future 在这种情况下不是强制性的,因为asyncio.wait 已经将协程包装在任务中。 好提示!尽管对于一个不太玩具的例子, ensure_future 可能仍然对 ex 有用。取消(不考虑待处理,完成)。 “在执行程序中运行阻塞代码”链接中的锚点已过期 -- this one 目前似乎是最新的。 所以这真的不会并行吗?我怎样才能变成真正的并行代码?为什么 asyncio.sleep()。真的平行吗? @user1628688 这取决于您真的是指并行,还是实际上是并发。 See here 解释差异。 (旁注,我必须更正我原来的答案,因为我把它们倒在那里)。由于 GIL,大多数 cpython 代码不能并行,只能并发。这是语言运行时限制;这不是你可以避免低级的事情。此外,无论如何,异步/等待代码(AFAIK)本质上是单线程的。【参考方案2】:扩展已接受的答案以实际解决相关问题。
注意:需要python 3.7+
import functools
from urllib.request import urlopen
import asyncio
def legacy_blocking_function(): # You cannot change this function
r = urlopen("https://example.com")
return r.read().decode()
def run_in_executor(f):
@functools.wraps(f)
def inner(*args, **kwargs):
loop = asyncio.get_running_loop()
return loop.run_in_executor(None, lambda: f(*args, **kwargs))
return inner
@run_in_executor
def foo(arg): # Your wrapper for async use
resp = legacy_blocking_function()
return f"arglen(resp)"
@run_in_executor
def bar(arg): # Another wrapper
resp = legacy_blocking_function()
return f"len(resp)arg"
async def process_input(inp): # Modern async function (coroutine)
res = await foo(inp)
res = f"XXXresXXX"
return await bar(res)
async def main():
inputs = ["one", "two", "three"]
input_tasks = [asyncio.create_task(process_input(inp)) for inp in inputs]
print([await t for t in asyncio.as_completed(input_tasks)])
# This doesn't work as expected :(
# print([await t for t in asyncio.as_completed([process_input(inp) for inp in input_tasks])])
if __name__ == '__main__':
asyncio.run(main())
单击here 获取此示例的最新版本并发送拉取请求。
【讨论】:
我在寻找答案时遇到了这个问题。这个例子信息量很大,但并不完全正确。实际的包装函数应该包含async
/await
关键字。 def run_in_executor(f): @functools.wraps(f) async def inner(*args, **kwargs): loop = asyncio.get_running_loop() return await loop.run_in_executor(None, lambda: f(*args, **kwargs)) return inner
【参考方案3】:
import asyncio
from time import sleep
import logging
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s %(thread)s %(funcName)s %(message)s")
def long_task(t):
"""Simulate long IO bound task."""
logging.info("2. t: %s", t)
sleep(t)
logging.info("4. t: %s", t)
return t ** 2
async def main():
loop = asyncio.get_running_loop()
inputs = range(1, 5)
logging.info("1.")
futures = [loop.run_in_executor(None, long_task, i) for i in inputs]
logging.info("3.")
results = await asyncio.gather(*futures)
logging.info("5.")
for (i, result) in zip(inputs, results):
logging.info("6. Result: %s, %s", i, result)
if __name__ == "__main__":
asyncio.run(main())
输出:
2020-03-18 17:13:07,523 23964 main 1.
2020-03-18 17:13:07,524 5008 long_task 2. t: 1
2020-03-18 17:13:07,525 21232 long_task 2. t: 2
2020-03-18 17:13:07,525 22048 long_task 2. t: 3
2020-03-18 17:13:07,526 25588 long_task 2. t: 4
2020-03-18 17:13:07,526 23964 main 3.
2020-03-18 17:13:08,526 5008 long_task 4. t: 1
2020-03-18 17:13:09,526 21232 long_task 4. t: 2
2020-03-18 17:13:10,527 22048 long_task 4. t: 3
2020-03-18 17:13:11,527 25588 long_task 4. t: 4
2020-03-18 17:13:11,527 23964 main 5.
2020-03-18 17:13:11,528 23964 main 6. Result: 1, 1
2020-03-18 17:13:11,528 23964 main 6. Result: 2, 4
2020-03-18 17:13:11,529 23964 main 6. Result: 3, 9
2020-03-18 17:13:11,529 23964 main 6. Result: 4, 16
【讨论】:
以上是关于如何将 asyncio 与现有的阻塞库一起使用?的主要内容,如果未能解决你的问题,请参考以下文章
将 beginBitmapFill 与现有的影片剪辑/形状一起使用? (动画 CC)