我如何*正确*在循环中运行 asyncio/aiohttp 请求?

Posted

技术标签:

【中文标题】我如何*正确*在循环中运行 asyncio/aiohttp 请求?【英文标题】:How do I *properly* run an asyncio/aiohttp request in a loop? 【发布时间】:2018-04-17 04:08:13 【问题描述】:

我正在尝试同时请求一堆 URL,但是这些 URL 是从列表中构建的。目前我正在遍历列表并(我认为)将它们添加到队列中。它绝对比 requests.get 快 10 倍,但是我不确定我是否正确执行此操作,因此可以对其进行优化。我对其进行了分析,并注意到它在并发请求完成后仍有 90% 的时间处于锁定状态,即开始 -> 10+ 个并发请求 -> 锁定 5 秒左右 -> 完成

此外,此代码最后会生成 Unclosed client session 消息。知道为什么吗?很确定这是正确使用上下文管理器。

我已经搜索并没有找到这个确切的问题

 import signal
 import sys
 import asyncio
 import aiohttp
 import json
 import requests

 lists = ['eth', 'btc', 'xmr', 'req', 'xlm', 'etc', 'omg', 'neo', 'btc', 'xmr', 'req', 'xlm', 'etc', 'omg', 'neo']

 loop = asyncio.get_event_loop()
 client = aiohttp.ClientSession(loop=loop)

 async def fetch(client, url):
     async with client.get(url) as resp:
         assert resp.status == 200
         return await resp.text()

 async def main(loop=loop, url=None):
     async with aiohttp.ClientSession(loop=loop) as client:
         html = await fetch(client, url)
         print(html)

 def signal_handler(signal, frame):
     loop.stop()
     client.close()
     sys.exit(0)

 signal.signal(signal.SIGINT, signal_handler)
 tasks = []
 for item in lists:
     url = "url/endpoint/coin_name".format(
                     url='https://coincap.io',
                     endpoint='page',
                     coin_name=item.upper()
                 )
     print(url)
     tasks.append(
         asyncio.ensure_future(main(url=url))
     )

 loop.run_until_complete(asyncio.gather(*tasks))

【问题讨论】:

【参考方案1】:

看起来你所拥有的东西是有效的,但是你认为你做的一切都不是很正确:

您创建了一个从未使用过的客户端,并且没有正确关闭(导致 Unclosed client session)警告 您正在为每个请求创建一个客户端,这比重用客户端效率低得多。 您没有在正在运行的事件循环中运行大部分代码。 没有必要的信号处理程序,如果你有长时间运行的异步任务,你可能想要使用add_signal_handler

这是我对您的代码的简化:

import asyncio
import aiohttp

lists = ['eth', 'btc', 'xmr', 'req', 'xlm', 'etc', 'omg', 'neo', 'btc', 'xmr', 'req', 'xlm', 'etc', 'omg', 'neo']


async def fetch(client, item):
    url = 'https://coincap.io/endpoint/coin_name'.format(
        endpoint='page',
        coin_name=item.upper()
    )
    async with client.get(url) as resp:
        assert resp.status == 200
        html = await resp.text()
        print(html)


async def main():
    async with aiohttp.ClientSession() as client:
        await asyncio.gather(*[
            asyncio.ensure_future(fetch(client, item))
            for item in lists
        ])


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

如果您想随后处理 html,您可以在 fetch 协程中进行处理,也可以对来自 gather 的所有结果进行操作。

【讨论】:

很好的答案,只是一个小问题:create_task() 应该优先使用 ensure_future() 如果你知道你有一个协程对象 - rationale by Guido。在这种情况下,两者都不需要,因为asyncio.gather(和asyncio.wait等)将正确处理传递的协程对象,或任何其他可以转换为Future的对象。 是的。我本来想改的,但忘了,在这种情况下ensure_future 只会调用create_task,但还是使用create_task 更好。 但是在这里你不需要它 - asyncio.gather(*(fetch(client, item) for item in lists)) 应该可以正常工作。 gather 是 explicitly documented 接受协程或期货。

以上是关于我如何*正确*在循环中运行 asyncio/aiohttp 请求?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 discord.py 机器人中正确使用任务/事件循环?

Qt5 - QML:如何正确连接 ProgressBar 和 Button 以进行长时间运行的循环计算

您如何在方案中运行等效的嵌套 for 循环

运行 SYCL 代码时结果不正确。在尝试并行化循环时

我正在尝试创建多个循环,但是如果一个循环返回null,则不会运行其余的循环。我该如何解决?

如何在 for 循环中并行处理。我的代码不正确地并行操作