为并行进程提供共享只读资源
Posted
技术标签:
【中文标题】为并行进程提供共享只读资源【英文标题】:Providing shared read-only ressources to parallel processes 【发布时间】:2021-12-23 17:48:54 【问题描述】:我正在研究一个允许一些相当不成问题的并行化的问题。我很难弄清楚什么是合适的。 Python 中提供了并行化机制。我正在 MacOS 上使用 python 3.9。
我的管道是:
get_common_input()
以一种不易并行化的方式获取一些数据。如果这很重要,它的返回值 common_input_1
一个整数列表。
parallel_computation_1()
获取 common_input_1
和来自列表 individual_inputs
的单个输入。公共输入只能读取。
common_input_2
或多或少是从 parallel_computation_1()` 收集的输出。
parallel_computation_2()
然后再次将 common_input_2
作为只读输入,加上一些单独的输入。
我可以做到以下几点:
import multiprocessing
common_input_1 = None
common_input_2 = None
def parallel_computation_1(individual_input):
return sum(1 for i in common_input_1 if i == individual_input)
def parallel_computation_2(individual_input):
return individual_input in common_input_2
def main():
multiprocessing.set_start_method('fork')
global common_input_1
global common_input_2
common_input_1 = [1, 2, 3, 1, 1, 3, 1]
individual_inputs_1 = [0,1,2,3]
individual_inputs_2 = [0,1,2,3,4]
with multiprocessing.Pool() as pool:
common_input_2 = pool.map(parallel_computation_1, individual_inputs_1)
with multiprocessing.Pool() as pool:
common_output = pool.map(parallel_computation_2, individual_inputs_2)
print(common_output)
if __name__ == '__main__':
main()
正如this answer 所建议的,我使用全局变量来共享数据。如果我使用 set_start_method('fork')
(这对我有用,但在 MacOS 上似乎有问题),那会起作用。
请注意,如果我删除第二个 with multiprocessing.Pool()
以仅将一个池用于两个并行任务,则将无法正常工作(进程看不到 common_input_2
的新值)。
除了使用全局变量对我来说似乎是一种糟糕的编码风格(是吗?这只是我的直觉)之外,启动新池的需要并不让我满意,因为它引入了一些可能不必要的开销。
您如何看待这些问题,尤其是。第二个?
有没有好的选择?我看到我可以使用multiprocessing.Array
,但由于我的数据是列表列表,我需要将其展平为一个列表并以某种重要的方式在parallel_computation
中使用它。如果我的共享输入更加复杂,我将不得不付出相当大的努力将其包装到 multiprocessing.Value
或 multiprocessing.Array
中。
【问题讨论】:
我假设您使用的是多进程方法,因为计算是 CPU 密集型的,因此与此相比,创建进程池的时间应该可以忽略不计。 关于全局变量:如果您有许多修改它们的函数(尤其是在大型项目中),它们会使代码难以遵循;在你的情况下,你没有修改状态,所以它不应该是一个问题。 @IonutTicus 但是我怀疑从全局变量读取速度很慢是对的吗? 访问全局变量确实比访问局部变量慢,因为它们的优先级,但即使你访问它数千次,它仍然可以忽略不计;您可以创建一个本地引用(最好是您要使用的数据部分)以减轻一些开销。 【参考方案1】:您可以在创建进程池之前将output_1
定义和计算为全局变量;这样每个进程都可以访问数据;这不会导致任何内存重复,因为您没有更改该数据(写时复制)。
_output_1 = serial_computation()
def parallel_computation(input_2):
# here you can access _output_1
# you must not modify it as this will result in creating new copy in the child process
...
def main():
input_2 = ...
with Pool() as pool:
output_2 = pool.map(parallel_computation, input_2)
【讨论】:
我明白了,这似乎有效(如果我使用 fork 作为启动方法;应该说我在 MacOS 上工作,但到目前为止,它有效)。如果我有并行计算 -> 公共输出 -> 另一个并行计算,我该怎么办?我必须将公共输出分配给全局变量并启动一个新池,不是吗?似乎我不能重用同一个池,b / c如果在启动池后在主进程中说全局变量,它也会被复制。是否有可能只启动一次池? 感谢您的回答顺便说一句。我根据您的建议修改了我的问题。以上是关于为并行进程提供共享只读资源的主要内容,如果未能解决你的问题,请参考以下文章