如何在库函数中使用异步事件循环
Posted
技术标签:
【中文标题】如何在库函数中使用异步事件循环【英文标题】:How to use asyncio event loop in library function 【发布时间】:2017-07-10 10:53:24 【问题描述】:我正在尝试使用 asyncio 创建一个执行一些异步操作的函数,此函数的用户不需要知道 asyncio 是在幕后参与的。 我很难理解如何使用 asyncio API 完成此操作,因为大多数函数似乎都在使用 get_event_loop 访问的一些全局循环变量下运行,并且对此的调用受到此循环内的全局状态的影响。
我在这里有四个示例,其中两个(foo1 和 foo3)似乎是合理的用例,但它们都表现出非常奇怪的行为:
async def bar(loop):
# Disregard how simple this is, it's just for example
s = await asyncio.create_subprocess_exec("ls", loop=loop)
def foo1():
# Example1: Just use get_event_loop
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(bar(loop), 1000))
# On exit this is written to stderr:
# Exception ignored in: <bound method BaseEventLoop.__del__ of <_UnixSelectorEventLoop running=False closed=True debug=False>>
# Traceback (most recent call last):
# File "/usr/lib/python3.5/asyncio/base_events.py", line 510, in __del__
# File "/usr/lib/python3.5/asyncio/unix_events.py", line 65, in close
# File "/usr/lib/python3.5/asyncio/unix_events.py", line 146, in remove_signal_handler
# File "/usr/lib/python3.5/signal.py", line 47, in signal
# TypeError: signal handler must be signal.SIG_IGN, signal.SIG_DFL, or a callable object
def foo2():
# Example2: Use get_event_loop and close it when done
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(bar(loop), 1000)) # RuntimeError: Event loop is closed --- if foo2() is called twice
loop.close()
def foo3():
# Example3: Always use new_event_loop
loop = asyncio.new_event_loop()
loop.run_until_complete(asyncio.wait_for(bar(loop), 1000)) #RuntimeError: Cannot add child handler, the child watcher does not have a loop attached
loop.close()
def foo4():
# Example4: Same as foo3 but also set_event_loop to the newly created one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) # Polutes global event loop, callers of foo4 does not expect this.
loop.run_until_complete(asyncio.wait_for(bar(loop), 1000)) # OK
loop.close()
这些功能都不起作用,我没有看到任何其他明显的方法来做到这一点,应该如何使用 asyncio?似乎它只是在假设应用程序的入口点是您可以创建和关闭全局循环的唯一地方的情况下使用的。我必须摆弄事件循环策略吗?
foo3 似乎是正确的解决方案,但即使我明确地传递循环,我也会得到一个错误,因为在 create_subprocess_exec 的深处,它正在使用当前策略来获得一个新的循环,即无,这是 asyncio 子进程中的错误吗?
我在 Ubuntu 上使用 Python 3.5.3。
【问题讨论】:
你应该用 async 包装并等待所有 foo 函数中的函数,就像你对 bar 所做的那样。 你永远不应该在你没有创建的对象上调用close()
。您正在关闭一个不属于您的全局事件循环。只是不要那样做。
在 example1 中我没有关闭循环,但这让 foo1 的用户有义务关闭循环。我的库的用户不需要知道涉及到 asyncio。
【参考方案1】:
foo1 错误是因为您没有关闭事件循环,请参阅此issue。
foo2 因为你不能重复使用封闭的事件循环。
foo3,因为您没有将新事件循环设置为全局。
foo4 几乎是你想要的,你剩下要做的就是存储旧的事件循环并在 bar 执行后将其设置回全局:
import asyncio
async def bar():
# After you set new event loop global,
# there's no need to pass loop as param to bar or anywhere else.
process = await asyncio.create_subprocess_exec("ls")
await process.communicate()
def sync_exec(coro): # foo5
old_loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(coro)
finally:
loop.close()
asyncio.set_event_loop(old_loop)
sync_exec(asyncio.wait_for(bar(), 1000))
更重要的一点:不清楚为什么要在某些同步函数后面隐藏使用 asyncio,但通常这是个坏主意。一个全局事件循环的全部内容是允许用户在这个单一事件循环中运行不同的并发作业。你正试图消除这种可能性。我认为你应该重新考虑这个决定。
【讨论】:
不错的尝试,但它与 foo1() 存在相同的问题,当程序退出时,您会收到一些被忽略的异常写入 stderr,因为 old_loop 永远不会关闭。 我隐藏它的原因是因为我有多个 I/O 绑定的异步任务,我想使用 asyncio.gather 合并到一个结果中,我想将所有这些子任务作为 asyncio 运行但是如果是同步的,最终的操作作为一个整体并不重要。也许这是错误的心态,我甚至应该将最终结果公开为异步函数,尽管我认为我的大多数用户都会对这一切感到困惑,他们不想建立自己的事件循环,而只是想要一个只同步给出结果的便利函数。same problem as foo1()
- 抱歉,我无法在 Linux 上进行测试,对吗?您从我的答案中尝试了代码,但它给了您错误?如果是这样,看起来问题更多的是关于create_subprocess_exec
,而不是关于事件循环操作。你可以试试this之类的东西。这可能会在 Python 3.7 或更高版本中得到修复,请参阅this PR。
i think most of my users ... dont want to set up their own event loop
- 没有什么能阻止您提供同步执行工作的功能,但同时我认为它至少不应该是 only 选项。您的一些用户可能希望并行运行您的任务和他们自己的任务。为了实现它,您可能还应该提供协程。
刚试过 3.6.1 并且 foo1() 工作。退出时没有任何内容打印到 stderr。【参考方案2】:
升级到 Python 3.6,然后 foo1() 将工作,无需显式关闭默认事件循环。
不是我希望的答案,因为我们只使用 3.5 :(
【讨论】:
以上是关于如何在库函数中使用异步事件循环的主要内容,如果未能解决你的问题,请参考以下文章