What's the correct way to clean up after an interrupted event loop?

Posted: 2015-06-10 19:30:45

[Question]:

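I have an event loop that runs some coroutines as part of a command-line tool. The user can interrupt the tool with the usual Ctrl + C, at which point I want to clean up properly after the interrupted event loop.

Here's what I tried.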

import asyncio


@asyncio.coroutine
def shleepy_time(seconds):
    print("Shleeping for {s} seconds...".format(s=seconds))
    yield from asyncio.sleep(seconds)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # Side note: Apparently, async() will be deprecated in 3.4.4.
    # See: https://docs.python.org/3.4/library/asyncio-task.html#asyncio.async
    tasks = [
        asyncio.async(shleepy_time(seconds=5)),
        asyncio.async(shleepy_time(seconds=10))
    ]

    try:
        loop.run_until_complete(asyncio.gather(*tasks))
    except KeyboardInterrupt as e:
        print("Caught keyboard interrupt. Canceling tasks...")

        # This doesn't seem to be the correct solution.
        for t in tasks:
            t.cancel()
    finally:
        loop.close()

Running this and hitting Ctrl + C yields:

$ python3 asyncio-keyboardinterrupt-example.py 
Shleeping for 5 seconds...
Shleeping for 10 seconds...
^CCaught keyboard interrupt. Canceling tasks...
Task was destroyed but it is pending!
task: <Task pending coro=<shleepy_time() running at asyncio-keyboardinterrupt-example.py:7> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(1)() at /usr/local/Cellar/python3/3.4.3/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/tasks.py:587]>
Task was destroyed but it is pending!
task: <Task pending coro=<shleepy_time() running at asyncio-keyboardinterrupt-example.py:7> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(0)() at /usr/local/Cellar/python3/3.4.3/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/tasks.py:587]>

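Clearly, I'm not cleaning up correctly. I thought calling cancel() on the tasks might be the way to do it.

What's the correct way to clean up after an interrupted event loop?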

[Question comments]:

In case it matters, I'm running Python 3.4.3 on OS X 10.10.3.

[Answer 1]:

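When you press CTRL+C, the event loop gets stopped, so your calls to t.cancel() don't actually take effect. For the tasks to be cancelled, you need to start the loop back up again.

Here's how you can handle it: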

import asyncio

@asyncio.coroutine
def shleepy_time(seconds):
    print("Shleeping for {s} seconds...".format(s=seconds))
    yield from asyncio.sleep(seconds)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # Side note: Apparently, async() will be deprecated in 3.4.4.
    # See: https://docs.python.org/3.4/library/asyncio-task.html#asyncio.async
    tasks = asyncio.gather(
        asyncio.async(shleepy_time(seconds=5)),
        asyncio.async(shleepy_time(seconds=10))
    )

    try:
        loop.run_until_complete(tasks)
    except KeyboardInterrupt as e:
        print("Caught keyboard interrupt. Canceling tasks...")
        tasks.cancel()
        loop.run_forever()
        tasks.exception()
    finally:
        loop.close()

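Once we catch KeyboardInterrupt, we call tasks.cancel() and then start the loop up again. run_forever will actually exit as soon as tasks gets cancelled (note that cancelling the Future returned by asyncio.gather also cancels all the Futures inside of it), because the interrupted loop.run_until_complete call added a done_callback to tasks that stops the loop. So, when we cancel tasks, that callback fires and the loop stops. At that point we call tasks.exception, just to avoid getting a warning about not fetching the exception from the _GatheringFuture.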
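This answer targets Python 3.4: asyncio.async is a syntax error from Python 3.7 on (async became a keyword) and @asyncio.coroutine was removed in 3.11. Newer versions of run_until_complete also remove their stop callback in a finally block, so the leftover-callback trick that ends run_forever() above may not apply there. A minimal sketch of the same idea, assuming Python 3.8+, re-runs the loop with run_until_complete on a fresh gather(..., return_exceptions=True) instead. The Ctrl+C is simulated by a hypothetical _simulate_ctrl_c callback, and the cancelled() snapshots show that cancellation only completes once the loop runs again.

```python
import asyncio


async def shleepy_time(seconds):
    print("Shleeping for {s} seconds...".format(s=seconds))
    await asyncio.sleep(seconds)


def _simulate_ctrl_c():
    # Hypothetical stand-in for the user pressing Ctrl+C: a KeyboardInterrupt
    # raised from a loop callback propagates out of run_until_complete().
    raise KeyboardInterrupt


def main():
    loop = asyncio.new_event_loop()
    tasks = [loop.create_task(shleepy_time(s)) for s in (5, 10)]
    outer = asyncio.gather(*tasks)
    loop.call_later(0.05, _simulate_ctrl_c)
    before = after = None
    try:
        loop.run_until_complete(outer)
    except KeyboardInterrupt:
        print("Caught keyboard interrupt. Canceling tasks...")
        for t in tasks:
            t.cancel()
        # cancel() only requests cancellation; the tasks are still pending here.
        before = [t.cancelled() for t in tasks]
        # Run the loop again so CancelledError is thrown into each coroutine.
        loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
        after = [t.cancelled() for t in tasks]
        # outer now holds a CancelledError; fetch it so asyncio does not warn
        # that the exception was never retrieved.
        if outer.done() and not outer.cancelled():
            outer.exception()
    finally:
        loop.close()
    return before, after


if __name__ == "__main__":
    print(main())
```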

[Comments]:

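Ah, so nothing happens to tasks outside the event loop, not even cancellation, right? That sounds like a simple rule to keep in mind.

Just tried it, and it seems to work as advertised. Sweet! Side note: I also noticed that if you pass return_exceptions=True to gather(), you can omit the call to tasks.exception(), since the exceptions are returned as results.

@NickChammas Right, the loop has to be running for the cancellation to take effect (as noted in the docs). And yes, in general, nothing happens to an asyncio.Task unless the loop is actively driving it. Using return_exceptions=True is a nice trick, as long as you're OK with real exceptions (i.e. anything other than CancelledError) raised by your wrapped coroutines being returned as results instead of actually being raised.

dano - Does the behavior you describe here apply equally when a coroutine raises a regular exception (as opposed to the user raising a keyboard interrupt)? I'm finding that the call to loop.run_forever() just keeps going and the canceled tasks run anyway. Is that expected?

@NickChammas Actually, you could make a good case that the _GatheringFuture returned by asyncio.gather should be enhanced to support calling cancel() on its children even when the _GatheringFuture itself is already done, to support this use case. Though I suppose if you want that behavior, you're better off using asyncio.wait with the FIRST_EXCEPTION option.

[Answer 2]: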

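Note for Python 3.7+: the following is now implemented as part of the standard library asyncio.run function. Once you're ready to upgrade, replace the code below with sys.exit(asyncio.run(amain())) (inside amain, use asyncio.get_running_loop() instead of a loop argument). If you want to print the message, just move the try…except clause into amain.

Updated for Python 3.6+: added a call to loop.shutdown_asyncgens to avoid memory leaks from asynchronous generators that were not fully consumed.

Inspired by some of the other answers, the following solution should work in almost all cases and does not depend on you manually keeping track of the tasks that need to be cleaned up on Ctrl+C: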

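import asyncio
import sys

# Targets Python 3.6: asyncio.Task.all_tasks was removed in 3.9 and the
# loop= argument of asyncio.gather in 3.10 (use asyncio.run on 3.7+).
loop = asyncio.get_event_loop()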
try:
    # Here `amain(loop)` is the core coroutine that may spawn any
    # number of tasks
    sys.exit(loop.run_until_complete(amain(loop)))
except KeyboardInterrupt:
    # Optionally show a message if the shutdown may take a while
    print("Attempting graceful shutdown, press Ctrl+C again to exit…", flush=True)
    
    # Do not show `asyncio.CancelledError` exceptions during shutdown
    # (a lot of these may be generated, skip this if you prefer to see them)
    def shutdown_exception_handler(loop, context):
        if "exception" not in context \
        or not isinstance(context["exception"], asyncio.CancelledError):
            loop.default_exception_handler(context)
    loop.set_exception_handler(shutdown_exception_handler)
    
    # Handle shutdown gracefully by waiting for all tasks to be cancelled
    tasks = asyncio.gather(*asyncio.Task.all_tasks(loop=loop), loop=loop, return_exceptions=True)
    tasks.add_done_callback(lambda t: loop.stop())
    tasks.cancel()
    
    # Keep the event loop running until it is either destroyed or all
    # tasks have really terminated
    while not tasks.done() and not loop.is_closed():
        loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

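The code above gets all current tasks from the event loop using asyncio.Task.all_tasks and combines them into a single future using asyncio.gather. All tasks in that future (which are all currently running tasks) are then cancelled using the future's .cancel() method. The return_exceptions=True then ensures that all the received asyncio.CancelledError exceptions are stored instead of causing the future to become errored.

The code above also overrides the default exception handler to prevent the generated asyncio.CancelledError exceptions from being logged.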
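On Python 3.7 and later, the asyncio.gather documentation notes that cancelling the gather itself propagates the cancellation regardless of return_exceptions, so a cancelled combined future finishes with CancelledError rather than with a list of results. A minimal sketch of the same cleanup that instead cancels each task individually and only then gathers them (roughly what asyncio.run does internally); cancel_all_tasks, sleeper and demo are hypothetical names, and the loop is run only briefly to stand in for the work interrupted by Ctrl+C.

```python
import asyncio


async def sleeper(seconds):
    await asyncio.sleep(seconds)


def cancel_all_tasks(loop):
    """Cancel every pending task of a stopped loop and wait for them to finish."""
    pending = asyncio.all_tasks(loop)  # Python 3.7+; replaces asyncio.Task.all_tasks
    for task in pending:
        task.cancel()
    if not pending:
        return []
    # Gather the tasks themselves rather than cancelling the gather future,
    # so return_exceptions=True turns each CancelledError into a result.
    return loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))


def demo():
    loop = asyncio.new_event_loop()
    try:
        for seconds in (5, 10):
            loop.create_task(sleeper(seconds))
        # Run briefly so both tasks are mid-sleep, as if Ctrl+C arrived now.
        loop.run_until_complete(asyncio.sleep(0.01))
        results = cancel_all_tasks(loop)
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        loop.close()
    return results


if __name__ == "__main__":
    print(demo())
```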

Update December 17, 2020: removed the compatibility code for Python 3.5.

[Comments]:

Apparently the core team discourages using a separate loop that is not asyncio.get_event_loop() (hence all these loop=loop keyword arguments).

[Answer 3]:

Python 3.7+ 中,建议您使用asyncio.run 来启动异步主函数。

asyncio.run 将负责为您的程序创建事件循环,并确保在主函数退出时关闭事件循环并清除所有任务(包括由于KeyboardInterrupt 异常)。

大致类似于以下内容(参见asyncio/runners.py):

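import asyncio
import contextlib


def run(coro, *, debug=False):
    """`asyncio.run` is new in Python 3.7"""
    # asyncio.run creates a fresh loop instead of reusing get_event_loop()
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        loop.set_debug(debug)
        return loop.run_until_complete(coro)
    finally:
        try:
            # Cancel whatever is still pending and let it finish cancelling
            all_tasks = asyncio.gather(*asyncio.all_tasks(loop), return_exceptions=True)
            all_tasks.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                loop.run_until_complete(all_tasks)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            asyncio.set_event_loop(None)
            loop.close()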
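To see that cleanup happen, here is a minimal sketch (Python 3.7+) in which main() returns while two background tasks are still sleeping; asyncio.run then cancels them, and their except/finally blocks run before the loop is closed. worker, background and cleanup_log are illustrative names only.

```python
import asyncio

cleanup_log = []
background = set()  # keep strong references to fire-and-forget tasks


async def worker(name):
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        cleanup_log.append(name + " cancelled")
        raise  # re-raise so the task really ends up cancelled
    finally:
        cleanup_log.append(name + " cleaned up")


async def main():
    for name in ("a", "b"):
        background.add(asyncio.create_task(worker(name)))
    await asyncio.sleep(0)  # let the workers start and reach their sleep
    return "main done"


if __name__ == "__main__":
    print(asyncio.run(main()))
    print(sorted(cleanup_log))
```

Re-raising CancelledError matters: a coroutine that swallows it keeps running, and the cleanup in asyncio.run then waits for it.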

[Comments]:

If you KeyboardInterrupt asyncio.run(main()) and main handles asyncio.CancelledError, how do you access its return value?

@Kenny I'm not sure I fully understand your question, but you can catch asyncio.CancelledError like any other exception: try: asyncio.run(main()); except asyncio.CancelledError as e: print(f'Cancelled: {e}')

[Answer 4]:

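Unless you're on Windows, set up event-loop-based signal handlers for SIGINT (and also SIGTERM, so that you can run it as a service). In these handlers, you can either exit the event loop immediately or start some kind of cleanup sequence and exit later.

Example from the official Python documentation: https://docs.python.org/3.4/library/asyncio-eventloop.html#set-signal-handlers-for-sigint-and-sigterm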
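A minimal sketch of that approach, assuming a Unix platform (loop.add_signal_handler raises NotImplementedError on Windows), Python 3.7+ and the main thread: SIGINT and SIGTERM set an asyncio.Event from inside the loop, and the main coroutine then cancels its worker and waits for it before returning. worker, main and demo are hypothetical names; demo sends SIGINT to its own process to stand in for Ctrl+C.

```python
import asyncio
import os
import signal


async def worker(log):
    try:
        await asyncio.sleep(3600)
    finally:
        log.append("worker cleaned up")


async def main(log):
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    signals = (signal.SIGINT, signal.SIGTERM)
    for sig in signals:
        # The callback runs as a normal loop callback, not inside the raw
        # signal handler, so it may safely touch asyncio objects.
        loop.add_signal_handler(sig, stop.set)
    task = asyncio.create_task(worker(log))
    try:
        await stop.wait()
        log.append("signal received")
    finally:
        for sig in signals:
            loop.remove_signal_handler(sig)
        task.cancel()
        # Wait for the worker's cleanup; return_exceptions swallows CancelledError.
        await asyncio.gather(task, return_exceptions=True)
    return log


async def demo():
    # Simulate Ctrl+C shortly after main() has installed its handlers.
    asyncio.get_running_loop().call_later(0.05, os.kill, os.getpid(), signal.SIGINT)
    return await main([])


if __name__ == "__main__":
    print(asyncio.run(demo()))
```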

[Comments]:

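Could you explain how this works, and why it's preferable to simply catching KeyboardInterrupt as I do in my example? The interrupt itself seems to work fine; cleaning up the leftover tasks seems to be the problem. Is it because the event loop itself doesn't handle the interrupt?

Usually when you have an event loop, you should handle all kinds of events inside the event loop. I can't say why KeyboardInterrupt specifically would be a problem. Consider that it may interrupt any code executing in the event loop (but I can't say for sure, since I don't know the details of the design).

How it works doesn't seem to be described in the docs. I guess it's the "self-pipe" trick; if you want to know, you should look at the Python source code.

[Answer 5]: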

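Using the signal module to set an asyncio.Event on the signal.SIGINT signal (Ctrl + C) can be a clean way to tell all your asynchronous code to stop naturally. This is especially important because some libraries like aiohttp need a chance to run cleanup tasks before the event loop closes.

Here is an example that uses the aiohttp library. An asyncio.sleep(5) keeps the connection from being returned to the pool, giving the user a chance to press Ctrl+C to simulate a KeyboardInterrupt exception.

Example code: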

import logging
import asyncio
import signal
import random

import aiohttp

logging.basicConfig(level="INFO", format="%(asctime)s %(threadName)-10s %(name)-10s %(levelname)-8s: %(message)s")
logger = logging.getLogger("root")

stop_event = asyncio.Event()

async def get_json(aiohttp_session):

    logger.info("making http request")

    params = {"value": random.randint(0, 1000)}
    async with aiohttp_session.get(f'https://httpbin.org/get', params=params) as response:

        # async with response:
        j = await response.json()
        logger.info("get data: `%s`", j["args"])
        await asyncio.sleep(5)

async def run():

    while not stop_event.is_set():
        async with aiohttp.ClientSession() as aiohttp_session:

            await get_json(aiohttp_session)

    logger.info("stop event was set, sleeping to let aiohttp close it's connections")
    await asyncio.sleep(0.1)
    logger.info("sleep finished, returning")


def inner_ctrl_c_signal_handler(sig, frame):
    '''
    function that gets called when the user issues a
    keyboard interrupt (ctrl+c)
    '''

    logger.info("SIGINT caught!")
    stop_event.set()

# experiment with commenting out this line and ctrl+c-ing the script
# to see how you get an "event loop is closed" error
signal.signal(signal.SIGINT, inner_ctrl_c_signal_handler)

asyncio.run(run())

Without the signal.signal call:

> python C:\Users\mark\Temp\test_aiohttp.py
2021-03-06 22:21:08,684 MainThread root       INFO    : making http request
2021-03-06 22:21:09,132 MainThread root       INFO    : get data: `{'value': '500'}`
Traceback (most recent call last):
  File "C:\Users\auror\Temp\test_aiohttp.py", line 52, in <module>
    asyncio.run(run())
  File "c:\python39\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "c:\python39\lib\asyncio\base_events.py", line 629, in run_until_complete
    self.run_forever()
  File "c:\python39\lib\asyncio\windows_events.py", line 316, in run_forever
    super().run_forever()
  File "c:\python39\lib\asyncio\base_events.py", line 596, in run_forever
    self._run_once()
  File "c:\python39\lib\asyncio\base_events.py", line 1854, in _run_once
    event_list = self._selector.select(timeout)
  File "c:\python39\lib\asyncio\windows_events.py", line 434, in select
    self._poll(timeout)
  File "c:\python39\lib\asyncio\windows_events.py", line 783, in _poll
    status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
KeyboardInterrupt
Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x000001CFFD75BB80>
Traceback (most recent call last):
  File "c:\python39\lib\asyncio\proactor_events.py", line 116, in __del__
    self.close()
  File "c:\python39\lib\asyncio\proactor_events.py", line 108, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "c:\python39\lib\asyncio\base_events.py", line 746, in call_soon
    self._check_closed()
  File "c:\python39\lib\asyncio\base_events.py", line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

With it:


> python C:\Users\mark\Temp\test_aiohttp.py
2021-03-06 22:20:29,656 MainThread root       INFO    : making http request
2021-03-06 22:20:30,106 MainThread root       INFO    : get data: `{'value': '367'}`
2021-03-06 22:20:35,122 MainThread root       INFO    : making http request
2021-03-06 22:20:35,863 MainThread root       INFO    : get data: `{'value': '489'}`
2021-03-06 22:20:38,695 MainThread root       INFO    : SIGINT caught!
2021-03-06 22:20:40,867 MainThread root       INFO    : stop event was set, sleeping to let aiohttp close it's connections
2021-03-06 22:20:40,962 MainThread root       INFO    : sleep finished, returning
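The same cooperative-shutdown idea can be tried without aiohttp or network access. A minimal sketch, assuming Python 3.8+ (for signal.raise_signal): FakeSession is a hypothetical stand-in for a client session whose async exit still needs the loop, and demo raises SIGINT in-process to stand in for Ctrl+C. Unlike the code above, the handler hands the Event.set call to the loop with call_soon_threadsafe, and the previous SIGINT handler is restored afterwards.

```python
import asyncio
import signal


class FakeSession:
    """Hypothetical stand-in for a client session that must be closed in the loop."""

    def __init__(self, log):
        self.log = log

    async def __aenter__(self):
        self.log.append("open")
        return self

    async def __aexit__(self, *exc_info):
        await asyncio.sleep(0)  # closing needs the event loop to still be running
        self.log.append("closed")

    async def fetch(self):
        await asyncio.sleep(0.01)  # pretend to do some I/O


async def run(stop_event, log):
    # Check the flag between iterations so each session is closed normally.
    while not stop_event.is_set():
        async with FakeSession(log) as session:
            await session.fetch()
    log.append("stopped cleanly")
    return log


async def main(log):
    loop = asyncio.get_running_loop()
    stop_event = asyncio.Event()

    def on_sigint(signum, frame):
        # signal.signal handlers run in the main thread between bytecodes,
        # possibly in the middle of loop internals; defer to the loop.
        loop.call_soon_threadsafe(stop_event.set)

    previous = signal.signal(signal.SIGINT, on_sigint)
    try:
        return await run(stop_event, log)
    finally:
        # Restore the previous Ctrl+C behaviour (or the default one).
        restore = previous if previous is not None else signal.default_int_handler
        signal.signal(signal.SIGINT, restore)


async def demo():
    # Simulate Ctrl+C after a few iterations.
    asyncio.get_running_loop().call_later(0.05, signal.raise_signal, signal.SIGINT)
    return await main([])


if __name__ == "__main__":
    print(asyncio.run(demo()))
```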
