与输入迭代相比,multiprocessing.Pool 返回不同长度的输出
Posted
技术标签:
【中文标题】与输入迭代相比,multiprocessing.Pool 返回不同长度的输出【英文标题】:multiprocessing.Pool returns different length of output compared to the input iterable 【发布时间】:2021-12-07 12:17:39 【问题描述】:我编写了一个 Python 程序,我想在调用程序 (MyProgram.__call__()
) 时使用 multiprocessing.Pool
对其进行并行化。预期的输出是一个字典列表 (dicts
),其长度与输入列表 images
相同。但是,当我使用 20 cpus 的 multiprocessing.Pool
使用长度为 60 的输入对其进行测试时,我得到的输出长度仅为 41。
下面是我的代码:
from acat.utilities import neighbor_shell_list, get_adj_matrix, get_max_delta_sum_path
from acat.build.adlayer import StochasticPatternGenerator as SPG
from acat.build.ordering import RandomOrderingGenerator as ROG
from ase.build import fcc111
from ase.io import read
from multiprocessing import Pool
import networkx as nx
import numpy as np
import os
class MyProgram(object):
def __init__(self, alpha=.75, n_jobs=os.cpu_count()):
self.alpha = alpha
self.n_jobs = n_jobs
def __call__(self, images):
# Parallelization
pool = Pool(self.n_jobs)
dicts = pool.map(self.get_dict, images)
return dicts
def get_dict(self, atoms):
d =
numbers = atoms.numbers
nblist = neighbor_shell_list(atoms, dx=0.3, neighbor_number=1, mic=True)
A = get_adj_matrix(nblist)
for i in range(len(A)):
nbrs = np.where(A[i] == 1)[0]
An = A[nbrs,:][:,nbrs]
Gn = nx.from_numpy_matrix(An)
path = max(nx.all_simple_paths(Gn, source=0, target=next(Gn.neighbors(0))),
key=lambda x: len(x))
path_numbers = list(numbers[nbrs[path]])
sorted_numbers = get_max_delta_sum_path(path_numbers)
lab1 = str(numbers[i])
lab2 = lab1 + ':' + ','.join(map(str, sorted_numbers))
labs = [lab1, lab2]
for idx, lab in enumerate(labs):
if idx == 0:
factor = 1
elif idx == 1:
factor = self.alpha
if lab in d:
d[lab] += factor
else:
d[lab] = factor
return d
if __name__ == '__main__':
MP = MyProgram(alpha=.75, n_jobs=20)
slab = fcc111('Pt', (4, 4, 4))
slab.center(vacuum=5., axis=2)
rog = ROG(slab, elements=['Ni', 'Pt'])
rog.run(num_gen=10)
slabs = read('orderings.traj', index=':')
spg = SPG(slabs, surface='fcc111',
adsorbate_species=['CO','OH','C'],
min_adsorbate_distance=3.,
composition_effect=True)
spg.run(num_gen=60, action='add', unique=False)
images = read('patterns.traj', index=':')
dicts = MP(images)
print(len(images))
print(len(dicts))
输出
60
41
有谁知道为什么multiprocessing.Pool
从输入返回不同长度的输出?不幸的是,我在使用简化代码时无法重现这种现象。但是如果有人想运行我的代码,你只需要通过pip3 install acat
安装acat
。提前致谢。
【问题讨论】:
【参考方案1】:尝试将 call 更改为:
with Pool(self.n_jobs) as pool:
dicts = pool.map(self.get_dict, images)
return dicts
我怀疑问题是__call__
在所有工作完成之前返回。 len
可能以某种方式只看到已完成的工作,而不是所有工作。
【讨论】:
我不太确定这是问题所在,(尽管我不是 100% 确定,并且无论如何使用with
是一种很好的做法)。 pool.map
通常应该等待正确数量的输出。它在计算开始时需要len(iterable)
,并等待那么多结果......
感谢您的建议,但这仍然不能解决问题。我也试过pool.close()
和pool.terminate()
,但都没有帮助。当我尝试在每个self.get_dict
中输入print(d)
时,它只打印出14 个结果,甚至少于输出的长度。非常混乱。
对不起。值得一试。这真的很奇怪。
@FrankYellin 我想通了。这是next(Gn.neighbors(0))
的问题。由于其中一些生成器是空的,它会引发StopIteration
错误,但由于某种原因,此错误不会显示在multiprocessing.Pool
中。我将开始投票结束这篇文章。
我想知道multiprocessing.Pool中是否有需要报告的错误。我很高兴你发现了问题。真的没有必要结束这个问题。以上是关于与输入迭代相比,multiprocessing.Pool 返回不同长度的输出的主要内容,如果未能解决你的问题,请参考以下文章
逐行迭代文本文件的内容 - 是不是有最佳实践? (与 PMD 的 AssignmentInOperand 相比)
为什么我的递归Fibonacci实现与迭代实现相比如此之慢?