并行化多次调用异步函数

Posted

技术标签:

【中文标题】并行化多次调用异步函数【英文标题】:Paralelize multiple calls of asynchronous function 【发布时间】:2020-06-23 18:03:51 【问题描述】:

我有以下场景:

from time import sleep

async def do_a(a):
    sleep(0.001)
    return 1*a

async def do_b(a):
    sleep(0.01)
    return 2*a

async def do_c(b):
    sleep(1)
    return 3*b


async def my_func():
    results = []
    for i in range(3):
        a = await do_a(i)
        b = await do_b(a)
        c = await do_c(b)
        results.append(c)
    return results

if __name__ == "__main__":
    import asyncio
    print(asyncio.run(my_func()))

基本上,我在循环中调用异步函数。执行上面的代码显示它在~3s 内运行。我想并行调用每个过程,因此预期时间会下降到〜1s(我知道这个开销有点过于乐观,但至少要优化一点运行时间)。我一直在研究我认为可以提供帮助的不同 python 库,但是在确定在这种情况下哪个有用时遇到了麻烦。 Python 的多处理、线程和 concurrent.futures 似乎都实现了并行/并发的一种或另一种形式。 我该怎么办?你能告诉我在这种情况下你将如何处理吗?

【问题讨论】:

哦,我打算提供一个小例子。我将睡眠调用放在我做“繁重”工作的地方(在 do_a 中查询数据库,在 do_b 中从磁盘读取,在 do_c 中进行矩阵乘法......) 【参考方案1】:

您应该使用asyncio.sleep 而不是time.sleep。如果您希望所有内容同时运行,这是您可以使用asyncio.gather 的一种方法:

import asyncio

async def do_a(a):
    await asyncio.sleep(0.001)
    return 1*a

async def do_b(a):
    await asyncio.sleep(0.01)
    return 2*a

async def do_c(b):
    await asyncio.sleep(1)
    return 3*b

async def do_abc(i):
    a = await do_a(i)
    b = await do_b(a)
    return await do_c(b)

async def my_func():
    return await asyncio.gather(*map(do_abc, range(3)))

if __name__ == "__main__":
    import asyncio
    print(asyncio.run(my_func()))
    # [0, 6, 12]

如果运行而不是 sleep 的实际代码是同步的(阻塞),您基本上会做同样的事情,只是您必须将该工作推迟到 executor。

【讨论】:

非常感谢!我仍然需要更深入地研究 executor,但似乎不是我的情况。

以上是关于并行化多次调用异步函数的主要内容,如果未能解决你的问题,请参考以下文章

我可以使用 OpenACC 并行化调用某些函数的大代码吗?

CUDA:并行化具有嵌套循环的函数调用的多个嵌套for循环

使用 Ray 并行化大型程序的正确方法

“线程安全”函数是不是依赖于并行化框架?

如何并行化 R 中包的函数

测试在内部使用 SemaphoreSlim 以实现并行化的异步方法