在 python 多处理中传递共享内存变量

Posted

技术标签:

【中文标题】在 python 多处理中传递共享内存变量【英文标题】:Passing shared memory variables in python multiprocessing 【发布时间】:2021-05-28 10:08:53 【问题描述】:

我有一堆文件,我想使用 Python 的多处理并行读取这些文件,并将所有数据收集到单个 NumPy 数组中。为此,我想定义一个共享内存 NumPy 数组并将其切片传递给不同的进程以并行读取。下面的代码中给出了我正在尝试做的玩具说明,我尝试使用多处理来修改 numpy 数组。

示例 1:


import numpy as np
import multiprocessing

def do_stuff(i, arr):
    arr[:]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    # Need to fill this array in parallel
    arr = np.zeros(4)
    p = multiprocessing.Pool(4)
    # Passing slices to arr to modify using multiprocessing
    for i in idx:
        p.apply(do_stuff, args=(i,arr[i:i+1]))
    p.close()
    p.join()
    print(arr)

在这段代码中,我希望 arr 用 0、1、2、3 填充。但是,这会将 arr 打印为全零。看了here的答案后,我用multiprocessing.Array定义了共享内存变量,修改了我的代码如下

示例 2:

import numpy as np
import multiprocessing

def do_stuff(i, arr):
    arr[:]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    p = multiprocessing.Pool(4)
    # Shared memory Array
    shared = multiprocessing.Array('d', 4)
    arr = np.ctypeslib.as_array(shared.get_obj())

    for i in idx:
        p.apply(do_stuff, args=(i,arr[i:i+1]))
    p.close()
    p.join()
    print(arr)

这也会为 arr 打印全零。但是,当我在 main 之外定义数组并使用 pool.map 时,代码可以工作。例如,以下代码有效

示例 3:

import numpy as np
import multiprocessing

shared = multiprocessing.Array('d', 4)
arr = np.ctypeslib.as_array(shared.get_obj())

def do_stuff(i):
    arr[i]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    p = multiprocessing.Pool(4)
    shared = multiprocessing.Array('d', 4)
    p.map(do_stuff, idx)
    p.close()
    p.join()
    print(arr)
             

这将打印 [0,1,2,3]。

我对这一切感到非常困惑。我的问题是:

    当我定义 arr = np.zeros(4) 时,哪个处理器拥有这个变量?然后,当我将此数组的切片发送到不同的处理器时,如果未在这些处理器上定义此变量,则会发送什么。

    为什么示例 2 不起作用,而示例 3 起作用?

我正在研究 Linux 和 Python/3.7/4

【问题讨论】:

【参考方案1】:

当我定义 arr = np.zeros(4) 时,哪个处理器拥有这个变量?

只有主进程才能访问它。如果您使用“fork”作为启动方法,则子进程可以访问所有内容,但是一旦尝试修改它,它就会在修改之前复制到它自己的私有内存空间(写入时复制)。如果您有大型只读数组,这会减少开销,但对于将数据写回这些数组没有太大帮助。

如果这些处理器上未定义此变量,则会发送什么。

在通过管道和pickle 从主进程发送参数后重新构造参数时,在子进程中创建一个新数组。数据被序列化为文本并重新构造,因此除了切片中数据的值之外没有其他信息。这是一个全新的对象。

为什么示例 2 不起作用,而示例 3 起作用?

示例 3 有效,因为在“分叉”时(您调用 Pool 的那一刻),arr 已经创建,并将被共享。使用Array 创建它也很重要,因此当您尝试修改数据时,数据是共享的(具体机制很复杂)。

示例 2 的工作方式与示例 1 的工作方式不同:您将数组的切片作为参数传递,该参数将转换为全新的对象,因此您的 do_stuff 函数中的 arr 是只是主进程中arr[i:i+1] 的副本。在调用 Pool 之前创建将在进程之间共享的任何内容仍然很重要(如果您依赖“fork”来共享数据),但这不是此示例不起作用的原因。

您应该知道:示例 3 仅适用于您在 linux 上,并且默认启动方法是 fork。这不是首选的启动方法,因为在锁定状态下复制锁定对象可能会发生死锁。这在 Windows 上根本不起作用,并且默认情况下在 MacOS 3.8 及更高版本上也不起作用。

所有这一切的最佳解决方案(最可移植)是将Array 本身作为参数传递,并在子进程内重新构造 numpy 数组。这具有复杂性,“共享对象”只能在创建子进程时作为参数传递。如果您使用Process,这没什么大不了的,但是使用Pool,您基本上必须将任何共享对象作为参数传递给初始化函数,并将重新构造的数组作为孩子的全局变量范围。例如,在此示例中,尝试将 buf 作为参数传递给 p.mapp.apply 时会出错,但在将 buf 作为 initargs=(buf,) 传递给 Pool() 时不会出错

import numpy as np
from multiprocessing import Pool, Array

def init_child(buf):
    global arr #use global context (for each process) to pass arr to do_stuff
    arr = np.frombuffer(buf.get_obj(), dtype='d')

def do_stuff(i):
    global arr
    arr[i]=i

if __name__ == '__main__':
    idx = [0,1,2,3]
    
    buf = Array('d', 4)
    arr = np.frombuffer(buf.get_obj(), dtype='d')
    arr[:] = 0
    
    #"with" context is easier than writing "close" and "join" all the time
    with Pool(4, initializer=init_child, initargs=(buf,)) as p:
        for i in idx:
            p.apply(do_stuff, args=(i,)) #you could pass more args to get slice indices too
    print(arr)

对于 3.8 及更高版本,有一个新模块优于 Array 或任何其他 sharedctypes 类,称为:shared_memory。这使用起来有点复杂,并且有一些额外的依赖于操作系统的麻烦,但理论上它的开销更低且速度更快。如果你想进入兔子洞,我已经写了一个关于shared_memory 主题的few answers,如果你想看看我的答案,我最近一直在回答很多关于并发的问题从上一两个月开始。

【讨论】:

感谢您的详细回答,这对我的理解很有帮助。您提出的解决方案有效。但是,您说我的问题中的示例 2 不起作用,因为 arr 仅在分叉之后创建。我尝试在调用 Pool 之前移动 arr 定义,但这个示例仍然不起作用。你能解释一下为什么会这样吗?我还将查看您共享的链接,以提高我对这一切的理解, @DeepakDalakoti 示例 2 与示例 1 基本相同。arr 在全局范围内的位置无关紧要,因为参数会在 do_stuff 函数的本地范围内覆盖它.如果你省略传递arr,它不会被覆盖,你会得到全局版本。我有点错过了,可能应该在我的回答中提到它......

以上是关于在 python 多处理中传递共享内存变量的主要内容,如果未能解决你的问题,请参考以下文章

了解多处理:Python 中的共享内存管理、锁和队列

Python - 多处理和共享内存

python学习笔记——多进程中共享内存Value & Array

如何在 python 中跨多处理器共享内存?

多处理 - 具有多维 numpy 数组的共享内存

在 Python 多处理中将 Pool.map 与共享内存数组结合起来