并行请求在使用 asyncio 恰好 100 个请求后无限阻塞

Posted

技术标签:

【中文标题】并行请求在使用 asyncio 恰好 100 个请求后无限阻塞【英文标题】:Parallel requests block infinitely after exactly 100 requests using asyncio 【发布时间】:2020-08-17 09:00:04 【问题描述】:

我尝试过同时使用 httpx 和 aiohttp,两者都有这个硬编码限制。

import asyncio

import aiohttp
import httpx


async def main():
    client = aiohttp.ClientSession() 
    # client = httpx.AsyncClient(timeout=None)

    coros = [
        client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",
            params="symbol": "ADANIENT.NS", "interval": "2m", "range": "60d",,
        )
        for _ in range(500)
    ]

    for i, coro in enumerate(asyncio.as_completed(coros)):
        await coro
        print(i, end=", ")


asyncio.run(main())

输出 -

0、1、2、3、4、5、6、7、8、9、10、11、12、13、14、15、16、17、18、19、20、21、22、23 , 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48 , 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73 , 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98 , 99

两个库都停留在 99

如果每个请求都使用新的 Session,则不会发生这种情况。

我做错了什么? asyncio 的重点不就是让这样的事情变得简单吗?


我尝试使用线程、zmq 和请求重新编写它,效果很好 -

import zmq

N_WORKERS = 100
N_ITERS = 500

ctx = zmq.Context.instance()


def worker():
    client = requests.Session()

    pull = ctx.socket(zmq.PULL)
    pull.connect("inproc://#1")

    push = ctx.socket(zmq.PUSH)
    push.connect("inproc://#2")

    while True:
        if not pull.recv_pyobj():
            return

        r = client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",
            params="symbol": "ADANIENT.NS", "interval": "2m", "range": "60d",,
        )
        push.send_pyobj(r.content)


def ventilator():
    push = ctx.socket(zmq.PUSH)
    push.bind("inproc://#1")

    # distribute tasks to all workers
    for _ in range(N_ITERS):
        push.send_pyobj(True)

    # close down workers
    for _ in range(N_WORKERS):
        push.send_pyobj(False)



# start workers & ventilator
threads = [Thread(target=worker) for _ in range(N_WORKERS)]
threads.append(Thread(target=ventilator))
for t in threads:
    t.start()

# pull results from workers
pull = ctx.socket(zmq.PULL)
pull.bind("inproc://#2")

for i in range(N_ITERS):
    pull.recv_pyobj()
    print(i, end=", ")

# wait for workers to exit
for t in threads:
    t.join()

【问题讨论】:

【参考方案1】:

问题是您client.get(...) 返回一个请求对象,该对象带有操作系统级套接字的实时句柄。未能关闭该对象会导致 aiohttp 用尽套接字,即达到连接器限制,即100 by default。

要解决此问题,您需要关闭client.get() 返回的对象,或使用async with,这将确保对象在with 块完成后立即关闭。例如:

async def get(client):
    async with client.get(
            "https://query1.finance.yahoo.com/v8/finance/chart/",
            params="symbol": "ADANIENT.NS", "interval": "2m", "range": "60d",) as resp:
        pass

async def main():
    async with aiohttp.ClientSession() as client:
        coros = [get(client) for _ in range(500)]
        for i, coro in enumerate(asyncio.as_completed(coros)):
            await coro
            print(i, end=", ", flush=True)

asyncio.run(main())

另外,aiohttp.ClientSession对象也要关闭,也可以使用async with来完成,如上图。

【讨论】:

@DevAggarwal 您可以使用None 而不是math.inf,这可能只是偶然。我认为它不会取消共享会话的使用,因为会话包含连接器以外的东西。话虽如此,在了解为什么会发生阻塞之前,我仍然不会提高限制,甚至可能将其作为错误报告给 aiohttp 维护者。 @DevAggarwal 我发现了问题所在,问题是您从未关闭client.get() 返回的对象。请参阅编辑后的答案。 @DevAggarwal 一个有趣的挑战——让我试试吧!首先,我再次查看了您的示例,并意识到它实际上是可运行的,即它不需要一些内部设置。我运行了它,果然,重现了这个问题。然后我修改了代码以使用更小的限制 TCPConnector 来实例化会话,比如 10。仍然是相同的挂起 - 所以它不是服务器端块,也不太可能是 aiohttp 错误。 我挠了挠头想,什么可能会阻止连接器创建新连接?正如您在问题中所说,这种并行下载几乎是使用 aiohttp 可以尝试的最基本的东西,并且有很多教程如何做到这一点,运行得很好。 (我自己在这里写了一些。)所以我把注意力转移到你用下载的数据做什么 - 并且发现 - 什么都没有!有一个client.get,就在那里,传递给as_completed,就像那样。 我突然意识到client.get 不返回数据,而是一个知道如何检索数据的响应句柄。这个句柄有一个连接句柄,它肯定必须关闭才能释放该连接(关闭或重用于另一个请求)。在这一点上,我几乎可以肯定我找到了问题,因为指向该问题的标志堆积如山。就像您曾经看过 aiohttp 代码一样,它总是有所有这些 async with 块来处理响应 - 它们被精确地编写以确保响应对象被释放。所以,就是这样,希望对您有所帮助。

以上是关于并行请求在使用 asyncio 恰好 100 个请求后无限阻塞的主要内容,如果未能解决你的问题,请参考以下文章

一走进多线程

为啥 asyncio 的 run_in_executor 在发出 HTTP 请求时很少提供并行化?

在 Python 中使用 asyncio 并行化 Web 任务

使用 aiohttp/asyncio 发出 100 万个请求 - 字面意思

我如何在异步中使用请求?

为啥在 python 中对 asyncio 服务器的多个请求的时间会增加?