在 python 多处理中传递共享内存变量
Posted
技术标签:
【中文标题】在 python 多处理中传递共享内存变量【英文标题】:Passing shared memory variables in python multiprocessing 【发布时间】:2021-05-28 10:08:53 【问题描述】:我有一堆文件,我想使用 Python 的多处理并行读取这些文件,并将所有数据收集到单个 NumPy 数组中。为此,我想定义一个共享内存 NumPy 数组并将其切片传递给不同的进程以并行读取。下面的代码中给出了我正在尝试做的玩具说明,我尝试使用多处理来修改 numpy 数组。
示例 1:
import numpy as np
import multiprocessing
def do_stuff(i, arr):
arr[:]=i
return
def print_error(err):
print(err)
if __name__ == '__main__':
idx = [0,1,2,3]
# Need to fill this array in parallel
arr = np.zeros(4)
p = multiprocessing.Pool(4)
# Passing slices to arr to modify using multiprocessing
for i in idx:
p.apply(do_stuff, args=(i,arr[i:i+1]))
p.close()
p.join()
print(arr)
在这段代码中,我希望 arr 用 0、1、2、3 填充。但是,这会将 arr 打印为全零。看了here的答案后,我用multiprocessing.Array定义了共享内存变量,修改了我的代码如下
示例 2:
import numpy as np
import multiprocessing
def do_stuff(i, arr):
arr[:]=i
return
def print_error(err):
print(err)
if __name__ == '__main__':
idx = [0,1,2,3]
p = multiprocessing.Pool(4)
# Shared memory Array
shared = multiprocessing.Array('d', 4)
arr = np.ctypeslib.as_array(shared.get_obj())
for i in idx:
p.apply(do_stuff, args=(i,arr[i:i+1]))
p.close()
p.join()
print(arr)
这也会为 arr 打印全零。但是,当我在 main 之外定义数组并使用 pool.map 时,代码可以工作。例如,以下代码有效
示例 3:
import numpy as np
import multiprocessing
shared = multiprocessing.Array('d', 4)
arr = np.ctypeslib.as_array(shared.get_obj())
def do_stuff(i):
arr[i]=i
return
def print_error(err):
print(err)
if __name__ == '__main__':
idx = [0,1,2,3]
p = multiprocessing.Pool(4)
shared = multiprocessing.Array('d', 4)
p.map(do_stuff, idx)
p.close()
p.join()
print(arr)
这将打印 [0,1,2,3]。
我对这一切感到非常困惑。我的问题是:
当我定义 arr = np.zeros(4) 时,哪个处理器拥有这个变量?然后,当我将此数组的切片发送到不同的处理器时,如果未在这些处理器上定义此变量,则会发送什么。
为什么示例 2 不起作用,而示例 3 起作用?
我正在研究 Linux 和 Python/3.7/4
【问题讨论】:
【参考方案1】:当我定义 arr = np.zeros(4) 时,哪个处理器拥有这个变量?
只有主进程才能访问它。如果您使用“fork”作为启动方法,则子进程可以访问所有内容,但是一旦尝试修改它,它就会在修改之前复制到它自己的私有内存空间(写入时复制)。如果您有大型只读数组,这会减少开销,但对于将数据写回这些数组没有太大帮助。
如果这些处理器上未定义此变量,则会发送什么。
在通过管道和pickle
从主进程发送参数后重新构造参数时,在子进程中创建一个新数组。数据被序列化为文本并重新构造,因此除了切片中数据的值之外没有其他信息。这是一个全新的对象。
为什么示例 2 不起作用,而示例 3 起作用?
示例 3 有效,因为在“分叉”时(您调用 Pool
的那一刻),arr
已经创建,并将被共享。使用Array
创建它也很重要,因此当您尝试修改数据时,数据是共享的(具体机制很复杂)。
示例 2 的工作方式与示例 1 的工作方式不同:您将数组的切片作为参数传递,该参数将转换为全新的对象,因此您的 do_stuff
函数中的 arr
是只是主进程中arr[i:i+1]
的副本。在调用 Pool
之前创建将在进程之间共享的任何内容仍然很重要(如果您依赖“fork”来共享数据),但这不是此示例不起作用的原因。
您应该知道:示例 3 仅适用于您在 linux 上,并且默认启动方法是 fork
。这不是首选的启动方法,因为在锁定状态下复制锁定对象可能会发生死锁。这在 Windows 上根本不起作用,并且默认情况下在 MacOS 3.8 及更高版本上也不起作用。
所有这一切的最佳解决方案(最可移植)是将Array
本身作为参数传递,并在子进程内重新构造 numpy 数组。这具有复杂性,“共享对象”只能在创建子进程时作为参数传递。如果您使用Process
,这没什么大不了的,但是使用Pool
,您基本上必须将任何共享对象作为参数传递给初始化函数,并将重新构造的数组作为孩子的全局变量范围。例如,在此示例中,尝试将 buf
作为参数传递给 p.map
或 p.apply
时会出错,但在将 buf
作为 initargs=(buf,)
传递给 Pool()
时不会出错
import numpy as np
from multiprocessing import Pool, Array
def init_child(buf):
global arr #use global context (for each process) to pass arr to do_stuff
arr = np.frombuffer(buf.get_obj(), dtype='d')
def do_stuff(i):
global arr
arr[i]=i
if __name__ == '__main__':
idx = [0,1,2,3]
buf = Array('d', 4)
arr = np.frombuffer(buf.get_obj(), dtype='d')
arr[:] = 0
#"with" context is easier than writing "close" and "join" all the time
with Pool(4, initializer=init_child, initargs=(buf,)) as p:
for i in idx:
p.apply(do_stuff, args=(i,)) #you could pass more args to get slice indices too
print(arr)
对于 3.8 及更高版本,有一个新模块优于 Array
或任何其他 sharedctypes
类,称为:shared_memory
。这使用起来有点复杂,并且有一些额外的依赖于操作系统的麻烦,但理论上它的开销更低且速度更快。如果你想进入兔子洞,我已经写了一个关于shared_memory
主题的few answers,如果你想看看我的答案,我最近一直在回答很多关于并发的问题从上一两个月开始。
【讨论】:
感谢您的详细回答,这对我的理解很有帮助。您提出的解决方案有效。但是,您说我的问题中的示例 2 不起作用,因为 arr 仅在分叉之后创建。我尝试在调用 Pool 之前移动 arr 定义,但这个示例仍然不起作用。你能解释一下为什么会这样吗?我还将查看您共享的链接,以提高我对这一切的理解, @DeepakDalakoti 示例 2 与示例 1 基本相同。arr
在全局范围内的位置无关紧要,因为参数会在 do_stuff
函数的本地范围内覆盖它.如果你省略传递arr
,它不会被覆盖,你会得到全局版本。我有点错过了,可能应该在我的回答中提到它......以上是关于在 python 多处理中传递共享内存变量的主要内容,如果未能解决你的问题,请参考以下文章