是否可以限制在 asyncio 中同时运行的协程数量?

Posted

技术标签:

【中文标题】是否可以限制在 asyncio 中同时运行的协程数量?【英文标题】:Is it possible to limit the number of coroutines running corcurrently in asyncio? 【发布时间】:2018-05-12 17:14:06 【问题描述】:

我已经使用 asyncio 编写了我的脚本,但发现同时运行的协程数量太多,并且经常会挂起。

所以我想限制并发的协程数量,一旦达到限制,我想等待任何一个协程完成后再执行另一个。

我当前的代码如下所示:

loop = asyncio.get_event_loop()
p = map(my_func, players)
result = loop.run_until_complete(asyncio.gather(*p))

async def my_func(player):
    # something done with `await`

players 的类型为 list,包含许多元素(例如 12000)。在asyncio.gather(*p)中同时运行所有这些需要大量的计算资源,所以我希望同时运行的玩家数量为200。一旦达到199,我希望另一个协程开始执行。

这在 asyncio 中可行吗?

【问题讨论】:

asyncio queueing 库可能对您有用吗? 协程实际上并不是同时运行的,如果你是这么想的话。他们轮流进行。 @castis 谢谢,让我看看... 这能回答你的问题吗? How to limit concurrency with Python asyncio? 【参考方案1】:

我可以建议使用asyncio.BoundedSemaphore

import asyncio

async def my_func(player, asyncio_semaphore):
    async with asyncio_semaphore:
        # do stuff

async def main():
    asyncio_semaphore = asyncio.BoundedSemaphore(200)
    jobs = []
    for i in range(12000):
        jobs.append(asyncio.ensure_future(my_func(players[i], asyncio_semaphore)))
    await asyncio.gather(*jobs)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    loop.run_until_complete(main())

这样一来,只有 200 个并发任务可以获取信号量并使用系统资源,而手头有 12000 个任务。

【讨论】:

请注意,您不需要BoundedSemaphore - 普通的Semaphore(200) 将具有相同的效果。 BoundedSemaphore 有不同的用途 - 它被设计为与普通的 Semaphore 不同,当信号量释放 的次数超过它被获取的次数时引发异常(而不是阻塞)。仅使用with 获取/释放它时不会发生这种情况。 @user4815162342 感谢您的信息!我去看看。 非常适合我,非常感谢。【参考方案2】:

您可以包装您的收集并强制执行信号量:

import asyncio

async def semaphore_gather(num, coros, return_exceptions=False):
    semaphore = asyncio.Semaphore(num)

    async def _wrap_coro(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(
        *(_wrap_coro(coro) for coro in coros), return_exceptions=return_exceptions
    )

# async def a():
#     return 1

# print(asyncio.run(semaphore_gather(10, [a() for _ in range(100)])))
# [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

【讨论】:

这是一个很好的选择,感谢您提供了一个不需要进入我所有 coros 的选项。 仅供参考,我最近遇到了aioitertools.asyncio.gather,允许以更便宜的方式限制并发。不过请谨慎使用,因为它是一个相当自定义的实现。【参考方案3】:

您可能需要考虑将aiostream.stream.map 与task_limit 参数一起使用:

from aiostream import stream, pipe

async def main():
    xs = stream.iterate(players)
    ys = stream.map(xs, my_func, task_limit=100)
    zs = stream.list(ys)
    results = await zs

使用管道的方法相同:

async def main():
    results = await (
        stream.iterate(players) | 
        pipe.map(my_func, task_limit=100) |
        pipe.list())

请参阅aiostream project page 和documentation 了解更多信息。

免责声明:我是项目维护者。

【讨论】:

以上是关于是否可以限制在 asyncio 中同时运行的协程数量?的主要内容,如果未能解决你的问题,请参考以下文章

golang中最大协程数的限制(线程)

使用 asyncio.gather() 的协程/期货的经过时间

Python中的协程与asyncio原理

Python中的协程与asyncio原理

Python中的协程与asyncio原理

asyncio 异步编程