为啥这种 for 循环并行化在 Python 中不起作用?

Posted

技术标签:

【中文标题】为啥这种 for 循环并行化在 Python 中不起作用?【英文标题】:Why does this for-loop paralellization doesn't work in Python?为什么这种 for 循环并行化在 Python 中不起作用? 【发布时间】:2021-05-22 20:11:57 【问题描述】:

我需要浏览 10,000 个文件夹,从每个文件夹中收集一些数据,将其添加到 3 个容器(c18、c17、c16、3 个最初为空的列表中,每个列表将填充 10,000 个数字),如果没有,这将需要永远并行化。

我的目标是使用 for 循环遍历所有文件夹(for i in range(10000)),并在 for 循环的每次迭代中分别将从每个文件夹中提取的 3 个值附加到 c18、c17、c16。 我还想显示一个进度条 - 大致了解需要多长时间。

我之前从未并行化循环或包含进度条。我曾尝试使用 SO。在阅读了一些答案后,我进入了我写的点:

pool = multiprocessing.Pool(4)
pool.imap(funct, tqdm.tqdm(range(len(a0s))) # or pool.map(funct, tqdm.tqdm(range(len(a0s))))

len(a0s) 产生 10,000 个。

函数 functdef funct(i): 并执行我上面写的操作:对于使用 for 循环变量 i(当前迭代次数)定义的给定文件夹,它完成提取 3 个值并附加它们的工作到 c18、c17、c16。

我在 main() 函数中调用 pool.imap(funct, tqdm.tqdm(range(len(a0s))),并在我编写的 .py 脚本末尾:

if __name__ == '__main__':
    main()

我正在导入:

import processing
import tqdm

但是,以上所有方法都不起作用。 我该如何进行?欢迎任何帮助。 谢谢!

a0s = np.loadtxt("Intensity_Wcm2_versus_a0_10_21_10_23_range.txt", usecols=(1,)) # has 10,000 entries
pool = multiprocessing.Pool(4)

top_folder_path = os.getcwd()
base_path = top_folder_path + "/a0_"

for i in range(len(a0s)):
    results_folder = base_path + ":.4f".format(a0s[i])
    if os.path.isdir(results_folder):
        os.chdir(results_folder)
        S = happi.Open(".")
        pbb = S.ParticleBinning(0).get() # charge states diagnostic
        c18.append(pbb['data'][-1][-1]) # first -1 is for last timestep recorded by diagnostic, second -1 is for last charge state (bare ions, Ar18+)
        c17.append(pbb['data'][-1][-2])
        c16.append(pbb['data'][-1][-2])
        print("###########################################################]#########")
        print("We have done the folder number: " + str(i) + " out of: " + str(len(a0s)))
        os.chdir(top_folder_path)

    else:
        continue
 
 def funct(i):
    results_folder = base_path + ":.4f".format(a0s[i])
    if os.path.isdir(results_folder):
        os.chdir(results_folder)
        S = happi.Open(".")
        pbb = S.ParticleBinning(0).get() # charge states diagnosti
        c18_val = pbb['data'][-1][-1]
        c17_val = pbb['data'][-1][-2]
        c16_val = pbb['data'][-1][-3]
        c18.append(c18_val)
        c17.append(c17_val)
        c16.append(c16_val)
    else:
        return

def main():
    pool.imap(funct, tqdm(range(len(a0s))))

if __name__ == '__main__':
    main()

【问题讨论】:

不要描述代码。将实际代码显示为minimal reproducible example。 明白,现在就去做。 @MarkTolonen,现在看起来更好了吗?谢谢 这不是minimal reproducible example,但它更好。我会给你写一个使用 tqdm 和多处理的通用示例,因为我无法按原样运行代码。 【参考方案1】:

这是一个用于多个进度条和多处理的模板。希望能帮助到你。我将其设置为期望在每个进程中更新 10 次,并添加一个睡眠作为并行化的“工作”。

import multiprocessing as mp
import tqdm
import time
from itertools import repeat

def funct(lock,i):
    with lock:
        bar = tqdm.tqdm(position=i,total=10,leave=False,ncols=100)
    bar.set_lock(lock)
    for _ in range(10):
        time.sleep(.2)
        bar.update(1)
    bar.close()
    return i*2

def main():
    lock = mp.Manager().Lock()
    with mp.Pool() as pool:
        result = pool.starmap(funct, zip(repeat(lock),range(8)))
    print()
    print(result)

if __name__ == '__main__':
    main()

【讨论】:

非常感谢。我几乎都明白了。但是,我无法在那里得到结果。程序在加载完第一个进度条后简单地进入一个无限循环(或类似的循环)。它不打印结果。我怀疑结果应该是一个数字,因为函数总是返回 i*2。实际上,它不会打印任何我告诉它在 with mp.Pool() as pool: 指令块之后打印的内容。我需要使用 Ctrl+Z(键盘中断)退出 python

以上是关于为啥这种 for 循环并行化在 Python 中不起作用?的主要内容,如果未能解决你的问题,请参考以下文章

为啥正常的 for 循环允许为结构字段分配值,而 for range 在 Golang 中不起作用? [复制]

为啥 MPI_SEND 在我的 for 循环中不起作用?如果明确说明它工作正常

为啥多处理会减慢嵌套的 for 循环?

为啥以下简单的并行化代码比 Python 中的简单循环慢得多?

为啥我的 while 循环中的条件在 python 中不起作用?

如何将python for循环从顺序转换为并行运行