multiprocessing:maxtasksperchild和chunksize冲突?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了multiprocessing:maxtasksperchild和chunksize冲突?相关的知识,希望对你有一定的参考价值。
我正在使用multiprocessing
中的Python 3.7
模块。我的代码未按预期工作(请参阅此问题here)。有人建议将maxtasksperchild
设置为1。然后,在阅读文档时,我认为最好也将chunksize
设置为1。这是相关的代码部分:
# Parallel Entropy Calculation
# ============================
node_combinations = [(i, j) for i in g.nodes for j in g.nodes]
pool = Pool(maxtaskperchild=1)
start = datetime.datetime.now()
logging.info("Start time: %s", start)
print("Start time: ", start)
results = pool.starmap(g._log_probability_path_ij, node_combinations, chunksize=1)
end = datetime.datetime.now()
print("End time: ", end)
print("Run time: ", end - start)
logging.info("End time: %s", end)
logging.info("Total run time: %s", start)
pool.close()
pool.join()
这事与愿违。仅设置maxtasksperchild
或仅设置chunksize
即可在预期的时间内完成工作(对于用于测试代码的较小数据集)。设置都无法完成,几秒钟后什么也没真正运行(我用htop
检查了内核是否在工作)。
问题
maxtasksperchild
和chunksize
设置在一起时是否冲突?他们是否做同样的事情?
maxtasksperchild
级别的Pool()
和chunksize
方法级别的Pool
?
============================================== ========
编辑
我了解从所提供的代码摘录中可能无法进行调试,请在下面找到完整的代码。模块graph
和graphfile
只是我编写的GitHub中的小程序库。如果您希望运行代码,则可以使用上述GitHub存储库中data/
目录中的任何文件。使用F2可以更好地进行简短测试,但是F1和F3会在HPC中造成麻烦。
import graphfile
import graph
from multiprocessing.pool import Pool
import datetime
import logging
def remove_i_and_f(edges):
new_edges = dict()
for k,v in edges.items():
if 'i' in k:
continue
elif 'f' in k:
key = (k[0],k[0])
new_edges[key] = v
else:
new_edges[k] = v
return new_edges
if __name__ == "__main__":
import sys
# Read data
# =========
graph_to_study = sys.argv[1]
full_path = "/ComplexNetworkEntropy/"
file = graphfile.GraphFile(full_path + "data/" + graph_to_study + ".txt")
edges = file.read_edges_from_file()
# logging
# =======
d = datetime.date.today().strftime("%Y_%m_%d")
log_filename = full_path + "results/" + d + "_probabilities_log_" + graph_to_study + ".log"
logging.basicConfig(filename=log_filename, level=logging.INFO, format='%(asctime)s === %(message)s')
logging.info("Graph to study: %s", graph_to_study)
logging.info("Date: %s", d)
# Process data
# ==============
edges = remove_i_and_f(edges)
g = graph.Graph(edges)
# Parallel Entropy Calculation
# ============================
node_combinations = [(i, j) for i in g.nodes for j in g.nodes]
pool = Pool(maxtasksperchild=1)
start = datetime.datetime.now()
logging.info("Start time: %s", start)
print("Start time: ", start)
results = pool.starmap(g._log_probability_path_ij, node_combinations, chunksize=1)
end = datetime.datetime.now()
print("End time: ", end)
print("Run time: ", end - start)
logging.info("End time: %s", end)
logging.info("Total run time: %s", start)
pool.close()
pool.join()
maxtasksperchild
确保在完成一定数量的任务后重新启动工作程序。换句话说,它会在运行给定函数的maxtaskperchild
迭代后终止该进程。提供此服务是为了防止由于长期运行的服务实施不当而导致的资源泄漏。
chunksize
将给定的集合/迭代器分为多个任务。然后,它将整个组运送到内部管道上,以减少进程间通信(IPC)的开销。收集元素仍将按1处理。chunksize
如果您有大量的小元素集合,并且IPC开销相对于元素本身的处理而言非常重要,则很有用。副作用是同一进程将处理整个块。
将两个参数都设置为1会极大地增加进程轮换和IPC,这两个进程都占用大量资源,尤其是在具有大量内核的机器上。
以上是关于multiprocessing:maxtasksperchild和chunksize冲突?的主要内容,如果未能解决你的问题,请参考以下文章
如何释放 multiprocessing.sharedctypes.RawValue 和 multiprocessing.sharedctypes.RawArray?