使用多处理并发执行一个 for 循环

Posted

技术标签:

【中文标题】使用多处理并发执行一个 for 循环【英文标题】:Using multiprocessing to concurrently execute a for-loop 【发布时间】:2022-01-03 23:01:58 【问题描述】:

我正在尝试在我的程序中实现多处理。 最初,我编写了这段代码。

pool = mp.Pool(mp.cpu_count())

for i in range(0, 10000):
    bid = i
    ask = i
    pool.apply_async(function1, args=(bid, ask,))
    pool.apply_async(function2, args=(bid, ask,))
    pool.apply_async(function3, args=(bid, ask,))
    pool.close()
    pool.join()

这给了我一个错误:

Python ValueError: Pool is still running

所以我将代码修改为:

for i in range(0, 10000):
    bid = i
    ask = i
    pool = mp.Pool(mp.cpu_count())
    pool.apply_async(function1, args=(bid, ask,))
    pool.apply_async(function2, args=(bid, ask,))
    pool.apply_async(function3, args=(bid, ask,))
    pool.close()
    pool.join()

这根本不执行并显示一个空白终端。

我想要实现的是对于范围内的每个值,我想并行运行 3 个函数,只有在执行这 3 个函数之后,它才应该移动到 range(0,1000) 中的下一个 i 值。

【问题讨论】:

如果你想要同步工作流,为什么要使用异步方法? 你应该使用 async io 而不是 mp 考虑使用多处理模块。参考this答案 你的循环中有pool.close()。池关闭后,您将永远无法向其发送更多工作。你的意思是closejoin 在循环之外吗?您将运行 30000 个任务。 多处理用于 CPU 绑定任务。线程用于网络绑定任务。多处理在后台实现线程。如果他想并行调用多个函数,他应该使用 async io。 【参考方案1】:

这是一个你可以适应的模式:

from concurrent.futures import ProcessPoolExecutor

def func1(a, b):
    pass

def func2(a, b):
    pass

def func3(a, b):
    pass

def main():
    with ProcessPoolExecutor() as executor: # work manager
        for i in range(1_000):
            futures = []
            for func in [func1, func2, func3]:
                futures.append(executor.submit(func, i, i))
            for future in futures:
                future.result() # wait for process to terminate

if __name__ == '__main__':
    main()

【讨论】:

以上是关于使用多处理并发执行一个 for 循环的主要内容,如果未能解决你的问题,请参考以下文章

一个for循环的Python脚本程序中如何加入多进程(并发进程)呢,急急急,在线等?

c# 并发编程系列之三:使用 Parallel 开始第1个多线程编码

python实现多线程并发执行

OpenCL – 执行动态 for 循环

python循环怎么用多线程去运行

shell脚本并发