共享的只读数据是不是复制到不同的进程以进行多处理?
Posted
技术标签:
【中文标题】共享的只读数据是不是复制到不同的进程以进行多处理?【英文标题】:Is shared readonly data copied to different processes for multiprocessing?共享的只读数据是否复制到不同的进程以进行多处理? 【发布时间】:2011-07-29 18:47:57 【问题描述】:我的这段代码看起来像这样:
glbl_array = # a 3 Gb array
def my_func( args, def_param = glbl_array):
#do stuff on args and def_param
if __name__ == '__main__':
pool = Pool(processes=4)
pool.map(my_func, range(1000))
有没有办法确保(或鼓励)不同的进程不会获得 glbl_array 的副本而是共享它。如果没有办法停止复制,我将使用 memmapped 数组,但我的访问模式不是很规律,所以我希望 memmapped 数组更慢。以上似乎是首先要尝试的。这是在 Linux 上。我只是想从 *** 获得一些建议,不想惹恼系统管理员。如果第二个参数是真正的不可变对象,如glbl_array.tostring()
,您认为会有帮助吗?
【问题讨论】:
我以为不同的进程不能共享内存变量 @Andrey:那你今天学到了一些东西 :) 【参考方案1】:您可以相当轻松地将来自multiprocessing
的共享内存内容与 Numpy 一起使用:
import multiprocessing
import ctypes
import numpy as np
shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array = shared_array.reshape(10, 10)
#-- edited 2015-05-01: the assert check below checks the wrong thing
# with recent versions of Numpy/multiprocessing. That no copy is made
# is indicated by the fact that the program prints the output shown below.
## No copy was made
##assert shared_array.base.base is shared_array_base.get_obj()
# Parallel processing
def my_func(i, def_param=shared_array):
shared_array[i,:] = i
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
pool.map(my_func, range(10))
print shared_array
打印
[[ 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[ 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[ 2. 2. 2. 2. 2. 2. 2. 2. 2. 2.]
[ 3. 3. 3. 3. 3. 3. 3. 3. 3. 3.]
[ 4. 4. 4. 4. 4. 4. 4. 4. 4. 4.]
[ 5. 5. 5. 5. 5. 5. 5. 5. 5. 5.]
[ 6. 6. 6. 6. 6. 6. 6. 6. 6. 6.]
[ 7. 7. 7. 7. 7. 7. 7. 7. 7. 7.]
[ 8. 8. 8. 8. 8. 8. 8. 8. 8. 8.]
[ 9. 9. 9. 9. 9. 9. 9. 9. 9. 9.]]
不过,Linux 在fork()
上具有写时复制语义,因此即使不使用multiprocessing.Array
,除非写入数据,否则数据不会被复制。
【讨论】:
太棒了!这是一个很好的答案。疑问一,shared_array、shared_array_base的定义是否需要if __name__ == '__main__':
保护。我担心的是,每次加载模块时,它们都会被重新定义并花费额外的空间。但我很可能错了。
唯一的约束wrt。多处理是在调用pool.map
之前定义了shared_array_base。 fork()
和 multiprocessing.Pool
不会重新导入模块,所以你唯一需要注意的是 my_func()
内部的内存分配。
请注意,在 Python 中 fork() 实际上意味着访问时复制(因为仅访问对象会更改其引用计数)。
复制只会复制引用计数整数所在的内存页。因此不会复制 Numpy 数组中的数据。
知道了。你应该使用 np.frombuffer(shared_array_base.get_obj()) 而不是 np.ctypeslib.as_array【参考方案2】:
以下代码适用于 Win7 和 Mac(可能适用于 linux,但未经测试)。
import multiprocessing
import ctypes
import numpy as np
#-- edited 2015-05-01: the assert check below checks the wrong thing
# with recent versions of Numpy/multiprocessing. That no copy is made
# is indicated by the fact that the program prints the output shown below.
## No copy was made
##assert shared_array.base.base is shared_array_base.get_obj()
shared_array = None
def init(shared_array_base):
global shared_array
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array = shared_array.reshape(10, 10)
# Parallel processing
def my_func(i):
shared_array[i, :] = i
if __name__ == '__main__':
shared_array_base = multiprocessing.Array(ctypes.c_double, 10*10)
pool = multiprocessing.Pool(processes=4, initializer=init, initargs=(shared_array_base,))
pool.map(my_func, range(10))
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array = shared_array.reshape(10, 10)
print shared_array
【讨论】:
【参考方案3】:对于那些坚持使用不支持fork()
(除非使用 CygWin)的 Windows 的人,pv 的答案不起作用。全局变量不可用于子进程。
相反,您必须在 Pool
/Process
的初始化过程中传递共享内存,如下所示:
#! /usr/bin/python
import time
from multiprocessing import Process, Queue, Array
def f(q,a):
m = q.get()
print m
print a[0], a[1], a[2]
m = q.get()
print m
print a[0], a[1], a[2]
if __name__ == '__main__':
a = Array('B', (1, 2, 3), lock=False)
q = Queue()
p = Process(target=f, args=(q,a))
p.start()
q.put([1, 2, 3])
time.sleep(1)
a[0:3] = (4, 5, 6)
q.put([4, 5, 6])
p.join()
(它不是 numpy 也不是好的代码,但它说明了这一点 ;-)
【讨论】:
【参考方案4】:如果您正在寻找一种在 Windows 上高效工作的选项,并且适用于不规则访问模式、分支和其他可能需要基于共享内存矩阵和进程的组合分析不同矩阵的场景 -本地数据,ParallelRegression 包中的 mathDict 工具包旨在处理这种确切的情况。
【讨论】:
以上是关于共享的只读数据是不是复制到不同的进程以进行多处理?的主要内容,如果未能解决你的问题,请参考以下文章