取消 asyncio 任务导致“任务被破坏但它处于挂起状态”

Posted

技术标签:

【中文标题】取消 asyncio 任务导致“任务被破坏但它处于挂起状态”【英文标题】:cancelled asyncio tasks result in "Task was destroyed but it is pending" 【发布时间】:2020-12-14 13:22:46 【问题描述】:

我正在尝试编写一个作业调度程序,它可以有效地跨 N 个内核调度 M 个作业。一项工作完成后,应立即开始一项新工作。此外,我们应该支持超时,以便没有任务花费超过一定时间。这就是我想出的 for 主循环:

import asyncio
import sys

max_concurrency = 4

async def _sleep_asynchronously(time):
    index, seconds = time
    await asyncio.sleep(seconds)
    return (index, seconds)

def select_invocations(waiting, num_running):
    count = max_concurrency - num_running
    selected = waiting[:count]
    waiting = waiting[count:]
    return selected, waiting

async def _run_everything_asynchronously():
    tasks = []
    timeouts = [ 4, 3, 1, 2, 0.5, 7, 0.25, 3, 2, 1, 4.5, 5]
    timeouts = list(enumerate(timeouts))

    pending, waiting = select_invocations(tasks, 0)
    running = _sleep_asynchronously(timeout) for timeout in timeouts

    while len(running) > 0:
        try:
            done, running = await asyncio.wait(running, timeout=0.5, return_when=asyncio.FIRST_COMPLETED)
            if not done:
                for r in running:
                    r.cancel()
                    await r
            else:
                for d in done:
                    index, timeout = await d
                    print("Index  finished waiting for  seconds".format(index, timeout))

        except asyncio.CancelledError as e:
            running.clear()

        if len(waiting) > 0:
            pending, waiting = select_invocations(tasks, len(running))
            running = _sleep_asynchronously(timeout) for timeout in timeouts

if 'win32' in sys.platform:
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

loop = asyncio.get_event_loop()
rc = loop.run_until_complete(_run_everything_asynchronously())
loop.close()

sys.exit(0)

如果我运行它,这是我的输出:

Index 6 finished waiting for 0.25 seconds
Index 4 finished waiting for 0.5 seconds
Index 9 finished waiting for 1 seconds
Index 2 finished waiting for 1 seconds
Task was destroyed but it is pending!
task: <Task pending coro=<_sleep_asynchronously() done, defined at D:\src\asynciotest.py:6> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E74D13BDF8>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<_sleep_asynchronously() done, defined at D:\src\asynciotest.py:6> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E74D352438>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<_sleep_asynchronously() done, defined at D:\src\asynciotest.py:6> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E74D37F9D8>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<_sleep_asynchronously() done, defined at D:\src\asynciotest.py:6> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E74D37FBE8>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<_sleep_asynchronously() done, defined at D:\src\asynciotest.py:6> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E74D3A15B8>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<_sleep_asynchronously() done, defined at D:\src\asynciotest.py:6> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E74D3E7498>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<_sleep_asynchronously() done, defined at D:\src\asynciotest.py:6> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E74D3E74C8>()]>>

我在这里缺少什么?如何正确清理被取消的任务?

【问题讨论】:

【参考方案1】:

当你取消一个任务并在上面await,除非被抓到,否则它会抛出一个CancelledError

您当前的代码采用第一个CancelledError 并清除运行列表,而不取消其余部分:

except asyncio.CancelledError as e:
    running.clear()

与其等待,不如将它们全部取消:

for r in running:
    r.cancel()

【讨论】:

好的,很好。我的代码还有另一个问题,但对于这种情况并不重要。在我的实际程序中,这些异步函数是子进程,即使我按照您建议的方式进行操作,所有取消的子进程仍然会抱怨。我不得不加回抛出的等待,但是用一个什么都不做的 except 子句包装那个等待。所以看起来(至少对于子进程),等待仍然是必要的。 因为取消没有完成。取消后可以await asyncio.wait(runners)

以上是关于取消 asyncio 任务导致“任务被破坏但它处于挂起状态”的主要内容,如果未能解决你的问题,请参考以下文章

Python asyncio/discord.py - 循环退出,任务被破坏,但它处于待处理状态

python asyncio,如何从另一个线程创建和取消任务

asyncio 任务已被销毁,但它处于挂起状态

线程和 asyncio:任务已销毁,但处于挂起状态

python-asyncio

asyncio CancelledError 和 KeyboardInterrupt