优雅关闭 asyncio 协程

Posted

技术标签:

【中文标题】优雅关闭 asyncio 协程【英文标题】:Graceful shutdown of asyncio coroutines 【发布时间】:2016-09-21 21:42:22 【问题描述】:

我目前在关闭应用程序的 CTRL-C 期间关闭 asyncio 协程时遇到问题。以下代码是我现在所拥有的精简版:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import asyncio
import time
import functools
import signal


class DummyProtocol(asyncio.Protocol):

    def __init__(self, *args, **kwargs):
        self._shutdown = asyncio.Event()
        self._response = asyncio.Queue(maxsize=1)
        super().__init__(*args, **kwargs)

    def connection_made(self, transport):
        self.transport = transport

    def close(self):
        print("Closing protocol")
        self._shutdown.set()

    def data_received(self, data):

        #data = b'OK MPD '

        # Start listening for commands after a successful handshake
        if data.startswith(b'OK MPD '):
            print("Ready for sending commands")
            self._proxy_task = asyncio.ensure_future(self._send_commands())
            return

        # saving response for later consumption in self._send_commands
        self._response.put_nowait(data)

    async def _send_commands(self):

        while not self._shutdown.is_set():

            print("Waiting for commands coming in ...")

            command = None

            # listen for commands coming in from the global command queue. Only blocking 1sec.
            try:
                command = await asyncio.wait_for(cmd_queue.get(), timeout=1)
            except asyncio.TimeoutError:
                continue

            # sending the command over the pipe
            self.transport.write(command)

            # waiting for the response. Blocking until response is complete.
            res = await self._response.get()
            # put it into the global response queue
            res_queue.put_nowait(res)


async def connect(loop):
    c = lambda: DummyProtocol()
    t = asyncio.Task(loop.create_connection(c, '192.168.1.143', '6600'))
    try:
        # Wait for 3 seconds, then raise TimeoutError
        trans, proto = await asyncio.wait_for(t, timeout=3)
        print("Connected to <192.168.1.143:6600>.")
        return proto
    except (asyncio.TimeoutError, OSError) as e:
        print("Could not connect to <192.168.1.143:6600>. Trying again ...")
        if isinstance(e, OSError):
            log.exception(e)


def shutdown(proto, loop):
    # http://***.com/a/30766124/1230358
    print("Shutdown of DummyProtocol initialized ...")
    proto.close()
    # give the coros time to finish
    time.sleep(2)

    # cancel all other tasks
    # for task in asyncio.Task.all_tasks():
    #    task.cancel()

    # stopping the event loop
    if loop:
        print("Stopping event loop ...")
        loop.stop()

    print("Shutdown complete ...")    


if __name__ == "__main__":

    loop = asyncio.get_event_loop()

    cmd_queue = asyncio.Queue()
    res_queue = asyncio.Queue()

    dummy_proto = loop.run_until_complete(connect(loop))

    for signame in ('SIGINT','SIGTERM'):
        loop.add_signal_handler(getattr(signal, signame), functools.partial(shutdown, dummy_proto, loop))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        loop.close()

如果按下 CTRL-C,我会得到以下输出:

Connected to <192.168.1.143:6600>.
Ready for sending commands
Waiting for commands coming in ...
Waiting for commands coming in ...
Waiting for commands coming in ...
Waiting for commands coming in ...
^CShutdown of DummyProtocol initialized ...
Closing protocol
Stopping event loop ...
Shutdown complete ...
Task was destroyed but it is pending!
task: <Task pending coro=<DummyProtocol._send_commands() running at ./dummy.py:45> wait_for=<Future pending cb=[Task._wakeup()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/queues.py:168> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:344]>
Exception ignored in: <generator object Queue.get at 0x10594b468>
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/queues.py", line 170, in get
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 227, in cancel
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 447, in call_soon
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed
RuntimeError: Event loop is closed

我对 asyncio 不是很有经验,所以我很确定我在这里遗漏了一些重要的东西。真正让我头疼的是Shutdown complete ...之后的输出部分。从Task was destroyed but it is pending!开始,我不得不承认我不知道发生了什么。我查看了其他问题,但无法正常工作。那么,为什么这段代码会输出 Task was destroyed but it is pending! aso. 之类的东西,以及如何干净地关闭协程?

感谢您的帮助!

【问题讨论】:

【参考方案1】:

Task was destroyed but it is pending! 是什么意思?

如果此时您的程序完成了一些异步任务仍未完成,您将收到此警告。需要此警告是因为某些正在运行的任务可能无法正确释放某些资源。

有两种常见的解决方法:

    您可以等待任务自行完成 您可以取消任务并等待它们完成

异步和阻塞同步操作

让我们看看你的代码:

def shutdown(proto, loop):
    print("Shutdown of DummyProtocol initialized ...")
    proto.close()

    time.sleep(2)
    # ...

time.sleep(2) - 这一行不会给协程时间来完成。它只会冻结你所有的程序两秒钟。这段时间什么都不会发生。

这是因为您的事件循环在您调用time.sleep(2) 的同一进程中运行。你不应该在你的 asyncio 程序中以这种方式调用长时间运行的同步操作。请阅读this answer 了解异步代码的工作原理。

我们如何等待任务完成

让我们尝试修改shutdown函数。这不是异步函数,你不能await 里面的东西。要执行一些异步代码,我们需要手动执行:停止当前正在运行的循环(因为它已经在运行),创建一些异步函数来等待任务完成,传递该函数以在事件循环中执行。

def shutdown(proto, loop):
    print("Shutdown of DummyProtocol initialized ...")

    # Set shutdown event: 
    proto.close()

    # Stop loop:
    loop.stop()

    # Find all running tasks:
    pending = asyncio.Task.all_tasks()

    # Run loop until tasks done:
    loop.run_until_complete(asyncio.gather(*pending))

    print("Shutdown complete ...")    

您也可以取消任务并等待它们完成。详情请见this answer。

清理操作的位置

我对信号不太熟悉,但你真的需要它来捕捉 CTRL-C 吗?每当KeyboardInterrupt 发生时,它都会在你运行事件循环的那一行抛出(在你的代码中它是loop.run_forever())。我在这里可能错了,但处理这种情况的常用方法是将所有清理操作放在 finally 块中。

比如你can seeaiohttp是怎么做到的:

try:
    loop.run_forever()
except KeyboardInterrupt:  # pragma: no branch
    pass
finally:
    srv.close()
    loop.run_until_complete(srv.wait_closed())
    loop.run_until_complete(app.shutdown())
    loop.run_until_complete(handler.finish_connections(shutdown_timeout))
    loop.run_until_complete(app.cleanup())
loop.close()

【讨论】:

这是一个深思熟虑且令人兴奋的答案!非常感谢您的帮助和时间,您花在写作和思考这个问题上!今天下午我必须试一试,但据我现在可以说,它确实解决了我所有的问题。 从 Python 3.7 开始,建议使用asyncio.all_tasks(),而不是asyncio.Task.all_tasks()。后者已弃用,将来将被删除:docs.python.org/3/library/… 什么是proto @alper 它是一个变量名,是“protocol”的缩写。它来自原始代码 sn-p,与关闭协程无关。【参考方案2】:

要完成接受的答案,您可以使用aiorun,它可以很好地为您解决这个问题:https://github.com/cjrh/aiorun

【讨论】:

以上是关于优雅关闭 asyncio 协程的主要内容,如果未能解决你的问题,请参考以下文章

Python3异步编程

Asyncio 与 Gevent [关闭]

python协程(4):asyncio

带有nest_asyncio包的asyncio的异步/等待总是返回协程'get1'从未等待

asyncio - 多次等待协程(周期性任务)

时间循环 asyncio 协程