使用 aiohttp/asyncio 发出 100 万个请求 - 字面意思
Posted
技术标签:
【中文标题】使用 aiohttp/asyncio 发出 100 万个请求 - 字面意思【英文标题】:Making 1 milion requests with aiohttp/asyncio - literally 【发布时间】:2016-12-14 08:29:10 【问题描述】:我跟进了本教程:https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html,当我处理 50 000 个请求时,一切正常。但我需要进行 100 万次 API 调用,然后我对这段代码有疑问:
url = "http://some_url.com/?id="
tasks = set()
sem = asyncio.Semaphore(MAX_SIM_CONNS)
for i in range(1, LAST_ID + 1):
task = asyncio.ensure_future(bound_fetch(sem, url.format(i)))
tasks.add(task)
responses = asyncio.gather(*tasks)
return await responses
因为 Python 需要创建 100 万个任务,它基本上只是滞后,然后在终端中打印 Killed
消息。有没有办法使用由预制的 URL 集(或列表)生成的生成器?谢谢。
【问题讨论】:
信号量不是为了那个吗?即使我将信号量设置为 10,我也会收到延迟和“Killed”消息。 我尝试了链接中的示例并且它有效。正如作者所说,1000000 个请求大约需要 11-12 分钟。我什至调整了它以使用 python3.4。在这里工作。也许您的代码在其他地方还有其他问题。你能发布整个代码吗? 你能在程序被杀死和/或 Python 解释器得到的完整错误后发布dmesg
命令输出吗?应该是 MemoryError。
【参考方案1】:
一次安排所有 100 万个任务
这是您正在谈论的代码。它最多需要 3 GB RAM,因此如果您的可用内存不足,它很容易被操作系统终止。
import asyncio
from aiohttp import ClientSession
MAX_SIM_CONNS = 50
LAST_ID = 10**6
async def fetch(url, session):
async with session.get(url) as response:
return await response.read()
async def bound_fetch(sem, url, session):
async with sem:
await fetch(url, session)
async def fetch_all():
url = "http://localhost:8080/?id="
tasks = set()
async with ClientSession() as session:
sem = asyncio.Semaphore(MAX_SIM_CONNS)
for i in range(1, LAST_ID + 1):
task = asyncio.create_task(bound_fetch(sem, url.format(i), session))
tasks.add(task)
return await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(fetch_all())
使用队列简化工作
这是我对如何使用asyncio.Queue 将 URL 传递给工作任务的建议。队列按需要填充,没有预先制作的 URL 列表。
它只需要 30 MB RAM :)
import asyncio
from aiohttp import ClientSession
MAX_SIM_CONNS = 50
LAST_ID = 10**6
async def fetch(url, session):
async with session.get(url) as response:
return await response.read()
async def fetch_worker(url_queue):
async with ClientSession() as session:
while True:
url = await url_queue.get()
try:
if url is None:
# all work is done
return
response = await fetch(url, session)
# ...do something with the response
finally:
url_queue.task_done()
# calling task_done() is necessary for the url_queue.join() to work correctly
async def fetch_all():
url = "http://localhost:8080/?id="
url_queue = asyncio.Queue(maxsize=100)
worker_tasks = []
for i in range(MAX_SIM_CONNS):
wt = asyncio.create_task(fetch_worker(url_queue))
worker_tasks.append(wt)
for i in range(1, LAST_ID + 1):
await url_queue.put(url.format(i))
for i in range(MAX_SIM_CONNS):
# tell the workers that the work is done
await url_queue.put(None)
await url_queue.join()
await asyncio.gather(*worker_tasks)
if __name__ == '__main__':
asyncio.run(fetch_all())
【讨论】:
【参考方案2】:asyncio 受内存限制(与任何其他程序一样)。您无法生成更多内存可以容纳的任务。我的猜测是你达到了内存限制。查看 dmesg 了解更多信息。
100 万个 RPS 并不意味着有 100 万个任务。一个任务可以在同一秒内完成多个请求。
【讨论】:
以上是关于使用 aiohttp/asyncio 发出 100 万个请求 - 字面意思的主要内容,如果未能解决你的问题,请参考以下文章
谁能说出WiFi4、WiFi5、WiFi6在100兆宽带下的实际发出网速是多少?
使用 curl 发出请求,相当于 REST_CLIENT 请求