asyncio.gather() 在具有协程字段的 dict 列表上?
Posted
技术标签:
【中文标题】asyncio.gather() 在具有协程字段的 dict 列表上?【英文标题】:asyncio.gather() on list of dict which has a field of coroutine? 【发布时间】:2020-11-11 05:54:06 【问题描述】:我有以下两个异步函数
from tornado.httpclient import AsyncHTTPClient
async def get_categories(): # return a list of str
# ....
http = AsyncHTTPClient()
resp = await http.fetch(....)
return [....]
async def get_details(category): # return a list of dict
# ....
http = AsyncHTTPClient()
resp = await http.fetch(....)
return [....]
现在我需要创建一个函数来获取所有类别的详细信息(同时运行 http fetch)并将它们组合在一起。
async def get_all_details():
categories = await get_categories()
tasks = list(map(lambda x: 'category': x, 'task':get_details(x), categories))
r = await asyncio.gather(*tasks) # error
# need to return [
# 'category':'aaa', 'detail':'aaa detail 1',
# 'category':'aaa', 'detail':'aaa detail 2',
# 'category':'bbb', 'detail':'bbb detail 1',
# 'category':'bbb', 'detail':'bbb detail 2',
# 'category':'bbb', 'detail':'bbb detail 3',
# 'category':'ccc', 'detail':'ccc detail 1',
# 'category':'ccc', 'detail':'aaa detail 2',
# ]
但是,列表行返回错误:
TypeError: unhashable type: 'dict'
tasks
具有以下值:
['category': 'aaa',
'task': <coroutine object get_docker_list at 0x000001B12B8560C0>,
'category': 'bbb',
'task': <coroutine object get_docker_list at 0x000001B12B856F40>,
'category': 'ccc',
'task': <coroutine object get_docker_list at 0x000001B12B856740>]
顺便说一句,这是一种限制 http fetch 调用的方法吗?例如,最多同时运行四个提取。
【问题讨论】:
【参考方案1】:gather
接受协程(或其他可等待)参数并以相同顺序返回其结果的元组。您正在向它传递一系列 dicts,其中一些值是协程。 gather
不知道如何处理它并试图将 dicts 视为等待对象,但很快就会失败。
生成字典列表的正确方法是只将协程传递给gather
,等待结果,然后将它们处理成新的字典:
async def get_all_details():
category_list = await get_categories()
details_list = await asyncio.gather(
*[get_details(category) for category in category_list]
)
return [
'category': category, 'details': details
for (category, details) in zip(category_list, details_list)
]
顺便说一句,这是一种限制 http fetch 调用的方法吗?例如,最多同时运行四个提取。
限制并行调用的方便且惯用的方法是使用semaphore:
async def get_details(category, limit):
# acquiring the semaphore passed as `limit` will allow at most a
# fixed number of coroutines to proceed concurrently
async with limit:
... the rest of the code ...
async def get_all_details():
limit = asyncio.Semaphore(4)
category_list = await get_categories()
details_list = await asyncio.gather(
*[get_details(category, limit) for category in category_list]
)
... the rest of the code ...
【讨论】:
顺便说一句,我应该创建一个全局http = AsyncHTTPClient()
以便创建更少的 ``AsyncHTTPClient` 对象吗?
@ca9163d9 我没有使用tornado 的http 客户端的经验,但通常会在main()
中创建一个客户端并将其传递给各种协程。这允许不同的 fetch 重用 TCP 连接并尽可能共享与客户端相关的资源。以上是关于asyncio.gather() 在具有协程字段的 dict 列表上?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Python 3.8 中为 asyncio.gather 构建任务列表