多处理 - 具有多维 numpy 数组的共享内存

Posted

技术标签:

【中文标题】多处理 - 具有多维 numpy 数组的共享内存【英文标题】:Multiprocessing - shared memory with multidimensional numpy array 【发布时间】:2018-10-20 13:46:12 【问题描述】:

我需要并行处理一个非常大的 numpy 数组 (55x117x256x256)。尝试使用通常的多处理方法传递它会产生 AssertionError,我理解这是因为数组太大而无法复制到每个进程中。因此,我想尝试将共享内存与多处理一起使用。 (我对其他方法持开放态度,只要它们不太复杂)。

我看到一些关于使用 python 多处理的共享内存方法的问题,例如

import numpy as np
import multiprocessing as mp

unsharedData = np.zeros((10,))
sharedData = mp.Array('d', unsharedData)

这似乎工作正常。但是,我还没有看到使用多维数组完成此操作的示例。

我尝试将多维数组粘贴到mp.Array 中,这给了我TypeError: only size-1 arrays can be converted to Python scalars

unsharedData2 = np.zeros((10,10))
sharedData2 = mp.Array('d', unsharedData2)## Gives TypeError

我可以展平阵列,但如果可以避免,我宁愿不这样做。

是否有一些技巧可以让 multiprocessing Array 处理多维数据?

【问题讨论】:

近期相关问题***.com/q/50235377/901925 【参考方案1】:

虽然已经给出了多处理的答案,但存在使用ray 的替代方案,这是一个替代的多处理框架。

使用ray,您可以使用obj_ref = ray.put(obj) 将任何对象放入只读共享内存中。好消息是 ray 内置了对从共享内存中零拷贝检索 numpy 数组的支持。

使用共享内存的光线实现会有一点开销,但考虑到数组如此之大,这可能不会成为问题。

import numpy as np
import ray

@ray.remote
def function(arr, num: int):
    # array is automatically retrieved if a reference is passed to
    # a remote function, you could do this manually with ray.get(ref)
    return arr.mean() + num

if __name__ == '__main__':
    ray.init()
    # generate array and place into shared memory, return reference
    array_ref = ray.put(np.random.randn(55, 117, 256, 256))
    # multiple processes operating on shared array
    results = ray.get([function.remote(array_ref, i) for i in range(8)])
    print(results)

【讨论】:

【参考方案2】:

您可以使用与Array 关联的get_obj() 方法在共享相同内存的每个进程中创建一个新的多维numpy 数组,该方法返回呈现缓冲区接口的ctypes 数组。

请看下面的例子:

import ctypes as c
import numpy as np
import multiprocessing as mp


unsharedData2 = np.zeros((10, 10))
n, m = unsharedData2.shape[0], unsharedData2.shape[1]


def f1(mp_arr):
    #in each new process create a new numpy array as follows:
    arr = np.frombuffer(mp_arr.get_obj())
    b = arr.reshape((n, m))# mp_arr arr and b share the same memory
    b[2][1] = 3


def f2(mp_arr):
    #in each new process create a new numpy array as follows:
    arr = np.frombuffer(mp_arr.get_obj())
    b = arr.reshape((n, m)) # mp_arr arr and b share the same memory
    b[1][1] = 2


if __name__ == '__main__':
    mp_arr = mp.Array(c.c_double, n*m)
    p = mp.Process(target=f1, args=(mp_arr,))
    q = mp.Process(target=f2, args=(mp_arr,))
    p.start()
    q.start()
    p.join()
    q.join()
    arr = np.frombuffer(mp_arr.get_obj())
    b = arr.reshape((10, 10))
    print(b)
    '''
    [[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 2. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 3. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]]
    '''

【讨论】:

【参考方案3】:

您可以使用 np.reshape((-1,))np.ravel 而不是 np.flatten 来制作数组的一维 view,而无需进行 flatten 所做的不必要复制:

import numpy as np
import multiprocessing as mp

unsharedData2 = np.zeros((10, 10))
ravel_copy = np.ravel(unsharedData2)
reshape_copy2 = unsharedData2.reshape((-1,))
ravel_copy[11] = 1.0       # -> saves 1.0 in unsharedData2 at [1, 1]
reshape_copy2[22] = 2.0    # -> saves 2.0 in unsharedData2 at [2, 2]
sharedData2 = mp.Array('d', ravel_copy)
sharedData2 = mp.Array('d', reshape_copy2)

【讨论】:

以上是关于多处理 - 具有多维 numpy 数组的共享内存的主要内容,如果未能解决你的问题,请参考以下文章

在 python 多处理中传递共享内存变量

python之numpy多维数组

多处理中的共享内存对象

包含具有可变形状的多维 numpy 数组的 numpy 数组

在多处理进程之间共享大型只读 Numpy 数组

20200111(Numpy)