为并行进程提供共享只读资源

Posted

技术标签:

【中文标题】为并行进程提供共享只读资源【英文标题】:Providing shared read-only ressources to parallel processes 【发布时间】:2021-12-23 17:48:54 【问题描述】:

我正在研究一个允许一些相当不成问题的并行化的问题。我很难弄清楚什么是合适的。 Python 中提供了并行化机制。我正在 MacOS 上使用 python 3.9。

我的管道是:

get_common_input() 以一种不易并行化的方式获取一些数据。如果这很重要,它的返回值 common_input_1 一个整数列表。 parallel_computation_1() 获取 common_input_1 和来自列表 individual_inputs 的单个输入。公共输入只能读取。 common_input_2 或多或少是从 parallel_computation_1()` 收集的输出。 parallel_computation_2() 然后再次将 common_input_2 作为只读输入,加上一些单独的输入。

我可以做到以下几点:

import multiprocessing
common_input_1 = None
common_input_2 = None

def parallel_computation_1(individual_input):
    return sum(1 for i in common_input_1 if i == individual_input)

def parallel_computation_2(individual_input):
    return individual_input in common_input_2

def main():
    multiprocessing.set_start_method('fork')
    global common_input_1
    global common_input_2
    common_input_1      = [1, 2, 3, 1, 1, 3, 1]
    individual_inputs_1 = [0,1,2,3]
    individual_inputs_2 = [0,1,2,3,4]
    with multiprocessing.Pool() as pool:
        common_input_2 = pool.map(parallel_computation_1, individual_inputs_1)
    with multiprocessing.Pool() as pool:
        common_output = pool.map(parallel_computation_2, individual_inputs_2)
    print(common_output)

if __name__ == '__main__':
    main()

正如this answer 所建议的,我使用全局变量来共享数据。如果我使用 set_start_method('fork')(这对我有用,但在 MacOS 上似乎有问题),那会起作用。

请注意,如果我删除第二个 with multiprocessing.Pool() 以仅将一个池用于两个并行任务,则将无法正常工作(进程看不到 common_input_2 的新值)。

除了使用全局变量对我来说似乎是一种糟糕的编码风格(是吗?这只是我的直觉)之外,启动新池的需要并不让我满意,因为它引入了一些可能不必要的开销。

您如何看待这些问题,尤其是。第二个?

有没有好的选择?我看到我可以使用multiprocessing.Array,但由于我的数据是列表列表,我需要将其展平为一个列表并以某种重要的方式在parallel_computation 中使用它。如果我的共享输入更加复杂,我将不得不付出相当大的努力将其包装到 multiprocessing.Valuemultiprocessing.Array 中。

【问题讨论】:

我假设您使用的是多进程方法,因为计算是 CPU 密集型的,因此与此相比,创建进程池的时间应该可以忽略不计。 关于全局变量:如果您有许多修改它们的函数(尤其是在大型项目中),它们会使代码难以遵循;在你的情况下,你没有修改状态,所以它不应该是一个问题。 @IonutTicus 但是我怀疑从全局变量读取速度很慢是对的吗? 访问全局变量确实比访问局部变量慢,因为它们的优先级,但即使你访问它数千次,它仍然可以忽略不计;您可以创建一个本地引用(最好是您要使用的数据部分)以减轻一些开销。 【参考方案1】:

您可以在创建进程池之前将output_1 定义和计算为全局变量;这样每个进程都可以访问数据;这不会导致任何内存重复,因为您没有更改该数据(写时复制)。

_output_1 = serial_computation()


def parallel_computation(input_2):
    # here you can access _output_1
    # you must not modify it as this will result in creating new copy in the child process
    ...


def main():
    input_2 = ...
    with Pool() as pool:
        output_2 = pool.map(parallel_computation, input_2)

【讨论】:

我明白了,这似乎有效(如果我使用 fork 作为启动方法;应该说我在 MacOS 上工作,但到目前为止,它有效)。如果我有并行计算 -> 公共输出 -> 另一个并行计算,我该怎么办?我必须将公共输出分配给全局变量并启动一个新池,不是吗?似乎我不能重用同一个池,b / c如果在启动池后在主进程中说全局变量,它也会被复制。是否有可能只启动一次池? 感谢您的回答顺便说一句。我根据您的建议修改了我的问题。

以上是关于为并行进程提供共享只读资源的主要内容,如果未能解决你的问题,请参考以下文章

查询处理器未能为执行并行查询启动必要的线程资源啥意思

什么是并发和并行?什么是进程和线程?进程和线程的区别

线程,进程。多进程,多线程。并发,并行的区别

python3多线程

进程与线程并行与并发的理解

进程线程关系