asyncio 任务已被销毁,但它处于挂起状态

Posted

技术标签:

【中文标题】asyncio 任务已被销毁,但它处于挂起状态【英文标题】:asyncio task was destroyed but it is pending 【发布时间】:2020-10-22 08:04:54 【问题描述】:

我正在编写一个示例程序,它以块的形式从数据源(csv 或 rdbms)中读取数据,进行一些转换并通过套接字将其发送到服务器。

但是因为 csv 非常大,为了测试目的,我想在几块之后打破阅读。 不幸的是,出了点问题,我不知道是什么以及如何解决它。可能我必须取消一些,但现在确定在哪里以及如何取消。我收到以下错误:

Task was destroyed but it is pending!
task: <Task pending coro=<<async_generator_athrow without __name__>()>>

示例代码为:

import asyncio
import json

async def readChunks():
  # this is basically a dummy alternative for reading csv in chunks
  df = ["chunk_" + str(x) : [r for r in range(10)] for x in range(10)]
  for chunk in df:
    await asyncio.sleep(0.001)
    yield chunk

async def send(row):
    j = json.dumps(row)
    print(f"to be sent: j")
    await asyncio.sleep(0.001)


async def main():
    i = 0
    async for chunk in readChunks():
        for k, v in chunk.items():
            await asyncio.gather(send(k:v))
        i += 1
        if i > 5:
            break
        #print(f"item in main via async generator is chunk")
    

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

【问题讨论】:

Unfortunately something goes wrong... - 出了什么问题?行为不端如何?你知道你的协程是否按照你想要的方式工作吗?您是否尝试通过检查中间结果来追踪错误? 你为什么不使用高级api - asyncio.run() 【参考方案1】:

这行得通...

import asyncio
import json
import logging

logging.basicConfig(format='%(asctime)s.%(msecs)03d %(message)s',
                    datefmt='%S')
root = logging.getLogger()
root.setLevel(logging.INFO)

async def readChunks():
  # this is basically a dummy alternative for reading csv in chunks
  df = ["chunk_" + str(x) : [r for r in range(10)] for x in range(10)]
  for chunk in df:
    await asyncio.sleep(0.002)
    root.info('readChunks: next chunk coming')
    yield chunk

async def send(row):
    j = json.dumps(row)
    root.info(f"to be sent: j")
    await asyncio.sleep(0.002)


async def main():
    i = 0
    root.info('main: starting to read chunks')
    async for chunk in readChunks():
        for k, v in chunk.items():
            root.info(f'main: sending an item')
            #await asyncio.gather(send(k:v))
            stuff = await send(k:v)
        i += 1
        if i > 5:
            break
        #print(f"item in main via async generator is chunk")

##loop = asyncio.get_event_loop()
##loop.run_until_complete(main())
##loop.close()

if __name__ == '__main__':

    asyncio.run(main())

...至少它会运行并完成。


bugs.python.org/issue38013 中描述了通过退出 async for 循环来停止异步生成器的问题,并且看起来它已在 3.7.5 中修复。

但是,使用

loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(main())
loop.close()

我得到一个调试错误,但在 Python 3.8 中没有异常。

Task was destroyed but it is pending!
task: <Task pending name='Task-8' coro=<<async_generator_athrow without __name__>()>>

使用更高级别的 API asyncio.run(main()) with debugging ON 我确实没有收到调试消息。如果您打算尝试升级到 Python 3.7.5-9,您可能仍应使用 asyncio.run()

【讨论】:

你有调试吗? - 让我试试 - 我正在使用 3,8。 在调试模式开启的情况下没有任何报告 - 不确定我是否可以在我的计算机上没有 3.7 的情况下跟踪该问题,我将查看文档。 @MisterMiyagi - 看起来是 - bugs.python.org/issue38013 - 它在 3.7.1 中出现,然后在 3.7.5 中修复。跨度> 忘了提及我使用的是 3.7,因为使用 3.8 我无法安装一些可用 C 代码提供的库。 @spyder - 看起来它已在 3.7.5 中修复 - 如果它不会破坏您的库,请升级到 3.7.5-3.7.9。【参考方案2】:

许多async 资源,例如生成器,需要借助事件循环进行清理。当async for 循环通过break 停止迭代异步生成器时,生成器仅由垃圾收集器清理。这意味着任务处于挂起状态(等待事件循环)但被销毁(被垃圾收集器)。

最直接的解决方法是显式地aclose 生成器:

async def main():
    i = 0
    aiter = readChunks()      # name iterator in order to ...
    try:
        async for chunk in aiter:
            ...
            i += 1
            if i > 5:
                break
    finally:
        await aiter.aclose()  # ... clean it up when done

可以使用asyncstdlib 简化这些模式(免责声明:我维护这个库)。 asyncstdlib.islice 允许在干净地关闭生成器之前获取固定数量的项目:

import asyncstdlib as a

async def main():
    async for chunk in a.islice(readChunks(), 5):
        ...

如果break 条件是动态的,scoping the iterator 在任何情况下都保证清理:

import asyncstdlib as a

async def main():
    async with a.scoped_iter(readChunks()) as aiter:
        async for idx, chunk in a.enumerate(aiter):
            ...
            if idx >= 5:
                break

【讨论】:

我不知道 asyncstdlib,干得好 Miyagi 先生! 谢谢!认为要取消某些事情并将某些事情转换为任务。不知道是什么以及如何。您是否知道任何详细但逐步的指南,它会越来越深入并引导我穿越异步世界? 还发现这似乎在没有“最终”的情况下工作:await asyncio.gather(send(k:v), return_exceptions = True) 我可以把它放在我的代码或每个有async for循环的任务中吗?【参考方案3】:

您的 readChunks 正在异步运行并且您的循环。如果没有完成程序,你就是在破坏它。

这就是为什么它给asyncio task was destroyed but it is pending

简而言之,异步任务在后台执行其工作,但您通过中断循环(停止程序)将其杀死。

【讨论】:

【参考方案4】:

问题很简单。您确实提前退出了循环,但异步生成器尚未用尽(待处理):

...
if i > 5:
    break
...

【讨论】:

@7u5h4r 和 Alex:是的,我的意图是打破迭代。但我的做法是……至少很糟糕。我正在寻找合适的教授。大大地。所以想找方法在哪里,怎么取消,哪一部分。

以上是关于asyncio 任务已被销毁,但它处于挂起状态的主要内容,如果未能解决你的问题,请参考以下文章

ASYNCIO:[错误] 任务被破坏,但它处于挂起状态

取消 asyncio 任务导致“任务被破坏但它处于挂起状态”

Python3 asyncio“任务已被破坏,但处于待处理状态”,具有某些特定条件

Python asyncio - 使用Task的循环退出已被销毁,但它正在等待处理

Python asyncio/discord.py - 循环退出,任务被破坏,但它处于待处理状态

9Python Asyncio异步编程-事件循环详解