在多处理进程之间共享大型只读 Numpy 数组
Posted
技术标签:
【中文标题】在多处理进程之间共享大型只读 Numpy 数组【英文标题】:Share Large, Read-Only Numpy Array Between Multiprocessing Processes 【发布时间】:2013-07-21 01:15:10 【问题描述】:我有一个 60GB SciPy 数组(矩阵),我必须在 5+ multiprocessing
Process
对象之间共享。我看过 numpy-sharedmem 并阅读了 SciPy 列表中的 this discussion。似乎有两种方法——numpy-sharedmem
和使用 multiprocessing.RawArray()
并将 NumPy dtype
s 映射到 ctype
s。现在,numpy-sharedmem
似乎是要走的路,但我还没有看到一个好的参考示例。我不需要任何类型的锁,因为数组(实际上是矩阵)将是只读的。现在,由于它的大小,我想避免复制。 听起来正确的方法是将数组的 only 副本创建为sharedmem
数组,然后将其传递给Process
对象?几个具体问题:
将 sharedmem 句柄实际传递给 sub-Process()
es 的最佳方法是什么?我需要一个队列来传递一个数组吗?管道会更好吗?我可以将它作为参数传递给 Process()
子类的 init(我假设它是腌制的)吗?
在我上面链接的讨论中,提到 numpy-sharedmem
不是 64 位安全的?我肯定使用了一些不可 32 位寻址的结构。
RawArray()
方法是否存在权衡?更慢、更麻烦?
numpy-sharedmem 方法是否需要任何 ctype-to-dtype 映射?
有没有人有一些开源代码的例子?我是一个动手能力很强的人,如果没有任何好的例子可以看,很难做到这一点。
如果我可以提供任何其他信息来帮助其他人澄清这一点,请发表评论,我会添加。谢谢!
这需要在 Ubuntu Linux 和也许 Mac OS 上运行,但可移植性并不是一个大问题。
【问题讨论】:
如果不同的进程要写入该数组,预计multiprocessing
会为每个进程复制整个内容。
@tiago:“我不需要任何类型的锁,因为数组(实际上是矩阵)将是只读的”
@tiago:此外,只要未明确告知(通过target_function
的参数),多处理就不会进行复制。操作系统只会在修改时将部分父内存复制到子内存空间。
这里是a RawArray-based example that should work both on *nix and Windows, and it also supports writing to the array。
我之前问过few questions 这个问题。我的解决方案可以在这里找到:github.com/david-hoffman/peaks/blob/…(对不起,代码是一场灾难)。
【参考方案1】:
为什么不使用多线程? 主进程的资源可以被其线程本地共享,因此多线程显然是共享主进程拥有的对象的更好方法。
如果担心python的GIL机制,或许可以求助nogil
的numba
。
【讨论】:
【参考方案2】:您可能还会发现查看pyro 的文档很有用,就好像您可以适当地划分您的任务一样,您可以使用它在不同机器上以及在同一台机器的不同内核上执行不同的部分。
【讨论】:
【参考方案3】:@Velimir Mlaker 给出了很好的答案。我想我可以添加一些 cmets 和一个小例子。
(我在 sharedmem 上找不到太多文档 - 这些是我自己实验的结果。)
-
您需要在子进程启动时传递句柄,还是在它启动后传递句柄?如果只是前者,您可以将
target
和args
参数用于Process
。这可能比使用全局变量更好。
从您链接的讨论页面看来,对 64 位 Linux 的支持似乎是不久前添加到 sharedmem 中的,因此这可能不是问题。
我不知道这个。
没有。请参阅下面的示例。
示例
#!/usr/bin/env python
from multiprocessing import Process
import sharedmem
import numpy
def do_work(data, start):
data[start] = 0;
def split_work(num):
n = 20
width = n/num
shared = sharedmem.empty(n)
shared[:] = numpy.random.rand(1, n)[0]
print "values are %s" % shared
processes = [Process(target=do_work, args=(shared, i*width)) for i in xrange(num)]
for p in processes:
p.start()
for p in processes:
p.join()
print "values are %s" % shared
print "type is %s" % type(shared[0])
if __name__ == '__main__':
split_work(4)
输出
values are [ 0.81397784 0.59667692 0.10761908 0.6736734 0.46349645 0.98340718
0.44056863 0.10701816 0.67167752 0.29158274 0.22242552 0.14273156
0.34912309 0.43812636 0.58484507 0.81697513 0.57758441 0.4284959
0.7292129 0.06063283]
values are [ 0. 0.59667692 0.10761908 0.6736734 0.46349645 0.
0.44056863 0.10701816 0.67167752 0.29158274 0. 0.14273156
0.34912309 0.43812636 0.58484507 0. 0.57758441 0.4284959
0.7292129 0.06063283]
type is <type 'numpy.float64'>
这个related question 可能有用。
【讨论】:
【参考方案4】:你可能对我写的一小段代码感兴趣:github.com/vmlaker/benchmark-sharedmem
唯一感兴趣的文件是main.py
。这是numpy-sharedmem 的基准——代码只是通过管道将数组(numpy
或sharedmem
)传递给生成的进程。工作人员只需在数据上调用sum()
。我只对比较两种实现之间的数据通信时间感兴趣。
我还写了另一个更复杂的代码:github.com/vmlaker/sherlock。
在这里,我使用numpy-sharedmem 模块通过 OpenCV 进行实时图像处理——图像是 NumPy 数组,根据 OpenCV 较新的cv2
API。这些图像,实际上是其中的引用,通过从multiprocessing.Manager
创建的字典对象在进程之间共享(与使用队列或管道相反)。与使用普通 NumPy 数组相比,我得到了很大的性能改进。
管道与队列:
根据我的经验,使用 Pipe 的 IPC 比 Queue 更快。这是有道理的,因为 Queue 添加了锁定以确保多个生产者/消费者的安全。管道没有。但是,如果您只有两个进程来回通信,则使用 Pipe 是安全的,或者,如文档所述:
...同时使用管道的不同端的进程不存在损坏风险。
sharedmem
安全:
sharedmem
模块的主要问题是程序不正常退出时内存泄漏的可能性。这在一个冗长的讨论here 中有描述。尽管在 2011 年 4 月 10 日,Sturla 提到了内存泄漏的修复,但从那以后我仍然遇到了泄漏,使用两个 repos,Sturla Molden 自己在 GitHub (github.com/sturlamolden/sharedmem-numpy) 和 Chris Lee-Messer 在 Bitbucket (bitbucket.org/cleemesser/numpy-sharedmem) 上的。
【讨论】:
谢谢,非常有用。不过,sharedmem
中的内存泄漏听起来很重要。有解决这个问题的线索吗?
除了注意到泄漏之外,我还没有在代码中寻找它。我在上面的“sharedmem 安全”下添加了我的答案,sharedmem
模块的两个开源存储库的维护者,以供参考。【参考方案5】:
如果您的数组那么大,您可以使用numpy.memmap
。例如,如果您有一个存储在磁盘中的数组,比如'test.array'
,即使在“写入”模式下,您也可以使用同步进程来访问其中的数据,但您的情况更简单,因为您只需要“读取”模式。
创建数组:
a = np.memmap('test.array', dtype='float32', mode='w+', shape=(100000,1000))
然后您可以像处理普通数组一样填充这个数组。例如:
a[:10,:100]=1.
a[10:,100:]=2.
删除变量a
时,数据会存储到磁盘中。
稍后您可以使用多个进程来访问test.array
中的数据:
# read-only mode
b = np.memmap('test.array', dtype='float32', mode='r', shape=(100000,1000))
# read and writing mode
c = np.memmap('test.array', dtype='float32', mode='r+', shape=(100000,1000))
相关答案:
Working with big data in python and numpy, not enough ram, how to save partial results on disc?
Is it possible to map a discontiuous data on disk to an array with python?
【讨论】:
那么在这种情况下,所有进程都可以访问同一个np.memmap
对象而无需复制,也不必以某种方式传递该对象?【参考方案6】:
如果您使用的是 Linux(或任何符合 POSIX 的系统),则可以将此数组定义为全局变量。 multiprocessing
在 Linux 上启动新子进程时使用 fork()
。新生成的子进程会自动与其父进程共享内存(copy-on-write 机制)。
既然你说“我不需要任何类型的锁,因为数组(实际上是一个矩阵)将是只读的”利用这种行为将是一种非常简单但非常有效的方法:所有孩子当读取这个大型 numpy 数组时,进程将访问物理内存中的相同数据。
不要将您的数组交给Process()
构造函数,这会将数据指示multiprocessing
到pickle
给孩子,这在您的情况下效率极低或不可能。在 Linux 上,在fork()
之后,子元素是使用相同物理内存的父元素的精确副本,因此您需要做的就是确保可以从target
函数中访问“包含”矩阵的 Python 变量你交给Process()
。这通常可以通过“全局”变量来实现。
示例代码:
from multiprocessing import Process
from numpy import random
global_array = random.random(10**4)
def child():
print sum(global_array)
def main():
processes = [Process(target=child) for _ in xrange(10)]
for p in processes:
p.start()
for p in processes:
p.join()
if __name__ == "__main__":
main()
在不支持fork()
的Windows 上——multiprocessing
正在使用win32 API 调用CreateProcess
。它从任何给定的可执行文件创建一个全新的进程。这就是为什么在 Windows 上,如果需要在父进程运行时创建的数据,需要将数据腌制给子进程。
【讨论】:
Copy-on-write 将复制包含引用计数器的页面(因此每个分叉的 python 都有自己的引用计数器),但它不会复制整个数据数组。 我要补充一点,我使用模块级变量比使用全局变量更成功......即将变量添加到分支之前的全局范围内的模块中 对偶然发现此问题/答案的人的警告:如果您碰巧将 OpenBLAS 链接的 Numpy 用于其 multithreadead 操作,请确保在使用时禁用其多线程(导出 OPENBLAS_NUM_THREADS=1)multiprocessing
或子进程在对共享的全局数组/矩阵执行线性代数运算时可能最终挂起(通常使用 one 处理器的 1/n,而不是 n 个处理器)。 known multithreaded conflict with OpenBLAS 似乎扩展到 Python multiprocessing
谁能解释为什么 python 不只使用操作系统fork
来传递给Process
的参数,而不是序列化它们?也就是说,在调用child
之前 不能将fork
应用于父进程,以便操作系统仍然可以使用参数值吗?似乎比序列化更高效?
我们都知道fork()
在 Windows 上不可用,我的回答和 cmets 中多次说明了这一点。我知道这是您最初的问题,我在 this 上方回答了四个 cmets:“折衷方案是默认情况下在两个平台上使用相同的参数传输方法,以获得更好的可维护性并确保平等行为。”。两种方式都有其优点和缺点,这就是为什么在 Python 3 中用户可以更灵活地选择方法。没有讨论细节,这个讨论是没有成效的,我们不应该在这里做。以上是关于在多处理进程之间共享大型只读 Numpy 数组的主要内容,如果未能解决你的问题,请参考以下文章
multiprocessing.Pool 跨子进程共享内存中只读的大型列表列表