与输入迭代相比,multiprocessing.Pool 返回不同长度的输出

Posted

技术标签:

【中文标题】与输入迭代相比,multiprocessing.Pool 返回不同长度的输出【英文标题】:multiprocessing.Pool returns different length of output compared to the input iterable 【发布时间】:2021-12-07 12:17:39 【问题描述】:

我编写了一个 Python 程序,我想在调用程序 (MyProgram.__call__()) 时使用 multiprocessing.Pool 对其进行并行化。预期的输出是一个字典列表 (dicts),其长度与输入列表 images 相同。但是,当我使用 20 cpus 的 multiprocessing.Pool 使用长度为 60 的输入对其进行测试时,我得到的输出长度仅为 41。

下面是我的代码:

from acat.utilities import neighbor_shell_list, get_adj_matrix, get_max_delta_sum_path
from acat.build.adlayer import StochasticPatternGenerator as SPG
from acat.build.ordering import RandomOrderingGenerator as ROG
from ase.build import fcc111
from ase.io import read
from multiprocessing import Pool
import networkx as nx
import numpy as np
import os

class MyProgram(object):

    def __init__(self, alpha=.75, n_jobs=os.cpu_count()):
        self.alpha = alpha
        self.n_jobs = n_jobs

    def __call__(self, images):
        # Parallelization
        pool = Pool(self.n_jobs)
        dicts = pool.map(self.get_dict, images)
        return dicts

    def get_dict(self, atoms):
        d = 
        numbers = atoms.numbers
        nblist = neighbor_shell_list(atoms, dx=0.3, neighbor_number=1, mic=True)
        A = get_adj_matrix(nblist)
        for i in range(len(A)):
            nbrs = np.where(A[i] == 1)[0]
            An = A[nbrs,:][:,nbrs]
            Gn = nx.from_numpy_matrix(An)
            path = max(nx.all_simple_paths(Gn, source=0, target=next(Gn.neighbors(0))),
                       key=lambda x: len(x))
            path_numbers = list(numbers[nbrs[path]])
            sorted_numbers = get_max_delta_sum_path(path_numbers)
            lab1 = str(numbers[i])
            lab2 = lab1 + ':' + ','.join(map(str, sorted_numbers))
            labs = [lab1, lab2]
            for idx, lab in enumerate(labs):
                if idx == 0:
                    factor = 1
                elif idx == 1:
                    factor = self.alpha
                if lab in d:
                    d[lab] += factor
                else:
                    d[lab] = factor
        return d

if __name__ == '__main__':
    MP = MyProgram(alpha=.75, n_jobs=20)
    slab = fcc111('Pt', (4, 4, 4))
    slab.center(vacuum=5., axis=2)
    rog = ROG(slab, elements=['Ni', 'Pt'])
    rog.run(num_gen=10)
    slabs = read('orderings.traj', index=':')
    spg = SPG(slabs, surface='fcc111',
              adsorbate_species=['CO','OH','C'],
              min_adsorbate_distance=3.,
              composition_effect=True)
    spg.run(num_gen=60, action='add', unique=False)
    images = read('patterns.traj', index=':')
    dicts = MP(images)
    print(len(images))
    print(len(dicts))

输出

60
41

有谁知道为什么multiprocessing.Pool 从输入返回不同长度的输出?不幸的是,我在使用简化代码时无法重现这种现象。但是如果有人想运行我的代码,你只需要通过pip3 install acat 安装acat。提前致谢。

【问题讨论】:

【参考方案1】:

尝试将 call 更改为:

with Pool(self.n_jobs) as pool:
    dicts = pool.map(self.get_dict, images)
return dicts

我怀疑问题是__call__ 在所有工作完成之前返回。 len 可能以某种方式只看到已完成的工作,而不是所有工作。

【讨论】:

我不太确定这是问题所在,(尽管我不是 100% 确定,并且无论如何使用 with 是一种很好的做法)。 pool.map 通常应该等待正确数量的输出。它在计算开始时需要len(iterable),并等待那么多结果...... 感谢您的建议,但这仍然不能解决问题。我也试过pool.close()pool.terminate(),但都没有帮助。当我尝试在每个self.get_dict 中输入print(d) 时,它只打印出14 个结果,甚至少于输出的长度。非常混乱。 对不起。值得一试。这真的很奇怪。 @FrankYellin 我想通了。这是next(Gn.neighbors(0)) 的问题。由于其中一些生成器是空的,它会引发StopIteration 错误,但由于某种原因,此错误不会显示在multiprocessing.Pool 中。我将开始投票结束这篇文章。 我想知道multiprocessing.Pool中是否有需要报告的错误。我很高兴你发现了问题。真的没有必要结束这个问题。

以上是关于与输入迭代相比,multiprocessing.Pool 返回不同长度的输出的主要内容,如果未能解决你的问题,请参考以下文章

逐行迭代文本文件的内容 - 是不是有最佳实践? (与 PMD 的 AssignmentInOperand 相比)

与迭代两个大型 Pandas 数据框相比,效率更高

为什么我的递归Fibonacci实现与迭代实现相比如此之慢?

与非多处理情况相比,使用 AllenNLP 解码的多处理是缓慢的

迭代器运算

multiprocessing模块