asyncio CancelledError 和 KeyboardInterrupt

Posted

技术标签:

【中文标题】asyncio CancelledError 和 KeyboardInterrupt【英文标题】:asyncio CancelledError and KeyboardInterrupt 【发布时间】:2017-05-05 12:29:46 【问题描述】:

我正在尝试两种方法来阻止无限循环运行:

supervisor_1:任务以编程方式取消 supervisor_2:任务通过 Ctrl+C 停止

虽然 supervisor_2 在中断时不会抛出任何错误,但我无法通过获取 Task was destroyed but it is pending! 来获取 supervisor_1。知道为什么吗?

代码如下:

import asyncio
import aioredis
from functools import partial



class Listener:
    def __init__(self, redis_conn):
        self.redis_conn = redis_conn

    async def forever(self, loop_name):
        counter = 0
        try:
            while True:
                print(': '.format(loop_name, counter))
                counter += 1
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print('Task Cancelled')
            self.redis_conn.close()
            await self.redis_conn.wait_closed()


async def supervisor_1(redis_conn):
    redis_conn = await redis_conn

    l = Listener(redis_conn)

    task = asyncio.ensure_future(
        asyncio.gather(l.forever('loop_1'), 
                       l.forever('loop_2')))
    await asyncio.sleep(2)
    task.cancel()


async def supervisor_2(redis_conn):
    redis_conn = await redis_conn

    l = Listener(redis_conn)
    await asyncio.gather(l.forever('loop_1'), 
                         l.forever('loop_2'))


if __name__ == '__main__':
    redis_conn = aioredis.create_pool(('localhost', 5003), db=1)

    loop = asyncio.get_event_loop()
    run = partial(supervisor_2, redis_conn=redis_conn)
    task = asyncio.ensure_future(run())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        print('Interruped !')
        task.cancel()
        loop.run_forever()
    finally:
        loop.close()

@update

感谢@Gerasimov,这是一个解决问题的版本,但不知何故仍不时在 KeyboardInterrupt 上引发错误:

async def supervisor(redis_conn):
    redis_conn = await redis_conn

    l = Listener(redis_conn)

    task = asyncio.ensure_future(
        asyncio.gather(l.forever('loop_1'), 
                       l.forever('loop_2'))
    )
    await asyncio.sleep(10)
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task

async def kill_tasks():
    pending = asyncio.Task.all_tasks()
    for task in pending:
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task 

if __name__ == '__main__':
    redis_conn = aioredis.create_pool(('localhost', 5003), db=1)

    loop = asyncio.get_event_loop()
    run = partial(supervisor, redis_conn=redis_conn)
    task = asyncio.ensure_future(run())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        print('Interruped !')
        loop.run_until_complete(kill_tasks())
    finally:
        loop.close()

【问题讨论】:

【参考方案1】:

task.cancel() 本身并没有完成任务:它只是告诉任务 CancelledError 应该在其中引发并立即返回。您应该调用它并等待,而任务实际上会被取消(虽然它会引发CancelledError)。

你也不应该在任务中抑制CancelledError

阅读this answer 我试图展示处理任务的不同方式。例如,要取消某些任务并等待它取消,您可以这样做:

from contextlib import suppress


task = ...  # remember, task doesn't suppress CancelledError itself

task.cancel()  # returns immediately, we should await task raised CancelledError.

with suppress(asyncio.CancelledError):
    await task  # or loop.run_until_complete(task) if it happens after event loop stopped

# Now when we awaited for CancelledError and handled it, 
# task is finally over and we can close event loop without warning.

【讨论】:

感谢您的链接。我用我对修复的理解更新了我的答案。但仍然出现错误(但并不总是像以前那样) @Orelus,和以前一样的错误?尝试将loop.run_until_complete(kill_tasks()) 移动到 finally 块中,就在loop.close() 之前。这应该可以解决问题。我不确定你的 run() 协程做了什么,但是当它完成时可能会发生这种情况,但有些任务不是:在这种情况下,在关闭事件循环时,即使没有 KeyboardInterrupt 发生,你也会收到警告。 请注意使用这种方法的一个缺点:如果调用任务在等待子任务清理和退出时被取消,suppress() 调用将吞噬那个 i> CancelledError 相反,事实上,await task 可以提前中止。看起来当前的 asyncio(至少从 3.8 开始)可能无法避免这种情况。

以上是关于asyncio CancelledError 和 KeyboardInterrupt的主要内容,如果未能解决你的问题,请参考以下文章

asyncio并发编程

深究Python中的asyncio库-asyncio简介与关键字

asyncio和aiohttp

Python标准模块--asyncio

asyncio

Python协程之asyncio