asyncio 任务已被销毁,但它处于挂起状态
Posted
技术标签:
【中文标题】asyncio 任务已被销毁,但它处于挂起状态【英文标题】:asyncio task was destroyed but it is pending 【发布时间】:2020-10-22 08:04:54 【问题描述】:我正在编写一个示例程序,它以块的形式从数据源(csv 或 rdbms)中读取数据,进行一些转换并通过套接字将其发送到服务器。
但是因为 csv 非常大,为了测试目的,我想在几块之后打破阅读。 不幸的是,出了点问题,我不知道是什么以及如何解决它。可能我必须取消一些,但现在确定在哪里以及如何取消。我收到以下错误:
Task was destroyed but it is pending!
task: <Task pending coro=<<async_generator_athrow without __name__>()>>
示例代码为:
import asyncio
import json
async def readChunks():
# this is basically a dummy alternative for reading csv in chunks
df = ["chunk_" + str(x) : [r for r in range(10)] for x in range(10)]
for chunk in df:
await asyncio.sleep(0.001)
yield chunk
async def send(row):
j = json.dumps(row)
print(f"to be sent: j")
await asyncio.sleep(0.001)
async def main():
i = 0
async for chunk in readChunks():
for k, v in chunk.items():
await asyncio.gather(send(k:v))
i += 1
if i > 5:
break
#print(f"item in main via async generator is chunk")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
【问题讨论】:
Unfortunately something goes wrong...
- 出了什么问题?行为不端如何?你知道你的协程是否按照你想要的方式工作吗?您是否尝试通过检查中间结果来追踪错误?
你为什么不使用高级api - asyncio.run()
?
【参考方案1】:
这行得通...
import asyncio
import json
import logging
logging.basicConfig(format='%(asctime)s.%(msecs)03d %(message)s',
datefmt='%S')
root = logging.getLogger()
root.setLevel(logging.INFO)
async def readChunks():
# this is basically a dummy alternative for reading csv in chunks
df = ["chunk_" + str(x) : [r for r in range(10)] for x in range(10)]
for chunk in df:
await asyncio.sleep(0.002)
root.info('readChunks: next chunk coming')
yield chunk
async def send(row):
j = json.dumps(row)
root.info(f"to be sent: j")
await asyncio.sleep(0.002)
async def main():
i = 0
root.info('main: starting to read chunks')
async for chunk in readChunks():
for k, v in chunk.items():
root.info(f'main: sending an item')
#await asyncio.gather(send(k:v))
stuff = await send(k:v)
i += 1
if i > 5:
break
#print(f"item in main via async generator is chunk")
##loop = asyncio.get_event_loop()
##loop.run_until_complete(main())
##loop.close()
if __name__ == '__main__':
asyncio.run(main())
...至少它会运行并完成。
bugs.python.org/issue38013 中描述了通过退出 async for
循环来停止异步生成器的问题,并且看起来它已在 3.7.5 中修复。
但是,使用
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(main())
loop.close()
我得到一个调试错误,但在 Python 3.8 中没有异常。
Task was destroyed but it is pending!
task: <Task pending name='Task-8' coro=<<async_generator_athrow without __name__>()>>
使用更高级别的 API asyncio.run(main())
with debugging ON 我确实没有收到调试消息。如果您打算尝试升级到 Python 3.7.5-9,您可能仍应使用 asyncio.run()
。
【讨论】:
你有调试吗? - 让我试试 - 我正在使用 3,8。 在调试模式开启的情况下没有任何报告 - 不确定我是否可以在我的计算机上没有 3.7 的情况下跟踪该问题,我将查看文档。 @MisterMiyagi - 看起来是 - bugs.python.org/issue38013 - 它在 3.7.1 中出现,然后在 3.7.5 中修复。跨度> 忘了提及我使用的是 3.7,因为使用 3.8 我无法安装一些可用 C 代码提供的库。 @spyder - 看起来它已在 3.7.5 中修复 - 如果它不会破坏您的库,请升级到 3.7.5-3.7.9。【参考方案2】:许多async
资源,例如生成器,需要借助事件循环进行清理。当async for
循环通过break
停止迭代异步生成器时,生成器仅由垃圾收集器清理。这意味着任务处于挂起状态(等待事件循环)但被销毁(被垃圾收集器)。
最直接的解决方法是显式地aclose
生成器:
async def main():
i = 0
aiter = readChunks() # name iterator in order to ...
try:
async for chunk in aiter:
...
i += 1
if i > 5:
break
finally:
await aiter.aclose() # ... clean it up when done
可以使用asyncstdlib
简化这些模式(免责声明:我维护这个库)。 asyncstdlib.islice
允许在干净地关闭生成器之前获取固定数量的项目:
import asyncstdlib as a
async def main():
async for chunk in a.islice(readChunks(), 5):
...
如果break
条件是动态的,scoping the iterator 在任何情况下都保证清理:
import asyncstdlib as a
async def main():
async with a.scoped_iter(readChunks()) as aiter:
async for idx, chunk in a.enumerate(aiter):
...
if idx >= 5:
break
【讨论】:
我不知道asyncstdlib
,干得好 Miyagi 先生!
谢谢!认为要取消某些事情并将某些事情转换为任务。不知道是什么以及如何。您是否知道任何详细但逐步的指南,它会越来越深入并引导我穿越异步世界?
还发现这似乎在没有“最终”的情况下工作:await asyncio.gather(send(k:v), return_exceptions = True)
我可以把它放在我的代码或每个有async for
循环的任务中吗?【参考方案3】:
您的 readChunks
正在异步运行并且您的循环。如果没有完成程序,你就是在破坏它。
这就是为什么它给asyncio task was destroyed but it is pending
简而言之,异步任务在后台执行其工作,但您通过中断循环(停止程序)将其杀死。
【讨论】:
【参考方案4】:问题很简单。您确实提前退出了循环,但异步生成器尚未用尽(待处理):
...
if i > 5:
break
...
【讨论】:
@7u5h4r 和 Alex:是的,我的意图是打破迭代。但我的做法是……至少很糟糕。我正在寻找合适的教授。大大地。所以想找方法在哪里,怎么取消,哪一部分。以上是关于asyncio 任务已被销毁,但它处于挂起状态的主要内容,如果未能解决你的问题,请参考以下文章
取消 asyncio 任务导致“任务被破坏但它处于挂起状态”
Python3 asyncio“任务已被破坏,但处于待处理状态”,具有某些特定条件
Python asyncio - 使用Task的循环退出已被销毁,但它正在等待处理