具有大型数组的 Windows 上的 Python 多处理

Posted

技术标签:

【中文标题】具有大型数组的 Windows 上的 Python 多处理【英文标题】:Python multiprocessing on windows with large arrays 【发布时间】:2014-01-15 14:23:20 【问题描述】:

我使用python的多处理模块在linux平台上编写了一个脚本。当我尝试在 Windows 上运行该程序时,这并不能直接运行,我发现这与在 Windows 上如何生成子进程有关。可以腌制使用的对象似乎很重要。

我的主要问题是,我使用的是大型 numpy 数组。似乎有了一定的尺寸,它们就不再可以挑选了。要将其分解为一个简单的脚本,我想做这样的事情:

### Import modules

import numpy as np
import multiprocessing as mp

number_of_processes = 4

if __name__ == '__main__':

    def reverse_np_array(arr):
        arr = arr + 1
        return arr

    a = np.ndarray((200,1024,1280),dtype=np.uint16)

    def put_into_queue(_Queue,arr):
        _Queue.put(reverse_np_array(arr))


    Queue_list = []
    Process_list = []
    list_of_arrays = []

    for i in range(number_of_processes):
        Queue_list.append(mp.Queue())


    for i in range(number_of_processes):
        Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],a)))

    for i in range(number_of_processes):
        Process_list[i].start()

    for i in range(number_of_processes):
        list_of_arrays.append(Queue_list[i].get())

    for i in range(number_of_processes):
        Process_list[i].join()

我收到以下错误消息:

Traceback (most recent call last):
  File "Windows_multi.py", line 34, in <module>
    Process_list[i].start()
  File "C:\Program Files\Anaconda32\lib\multiprocessing\process.py", line 130, i
n start
    self._popen = Popen(self)
  File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 277, i
n __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 199, i
n dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Program Files\Anaconda32\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())

所以我基本上是在创建一个大型数组,我需要在所有进程中使用该数组进行计算并返回它。

一个重要的事情似乎是在声明if __name__ = '__main__':之前编写函数的定义

如果我将数组减少到 (50,1024,1280),整个事情就可以工作了。 但是,即使启动了 4 个进程并且 4 个内核正在工作,它也比仅针对一个内核(在 Windows 上)编写没有多处理的代码要慢。所以我想我这里还有一个问题。

后面我的真实程序中的函数在一个cython模块中。

我正在使用带有 python 32 位的 anaconda 包,因为我无法使用 64 位版本编译我的 cython 包(我将在不同的线程中询问)。

欢迎任何帮助!

谢谢! 菲利普

更新:

我犯的第一个错误是在__main__ 中有一个“put_into_queue”函数定义。

然后我按照建议引入了共享数组,但是,它使用了大量内存,并且使用的内存随着我使用的进程而扩展(当然不应该是这种情况)。 有什么想法我在这里做错了吗?我将共享数组的定义放在哪里(__main__ 内部或外部)似乎并不重要,但我认为它应该在__main__ 中。从这篇文章中得到这个:Is shared readonly data copied to different processes for Python multiprocessing?

import numpy as np
import multiprocessing as mp
import ctypes


shared_array_base = mp.Array(ctypes.c_uint, 1280*1024*20)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
#print shared_array
shared_array = shared_array.reshape(20,1024,1280)

number_of_processes = 4

def put_into_queue(_Queue,arr):
    _Queue.put(reverse_np_array(arr))
def reverse_np_array(arr):
    arr = arr + 1 + np.random.rand()
    return arr
if __name__ == '__main__':


    #print shared_arra

    #a = np.ndarray((50,1024,1280),dtype=np.uint16)


    Queue_list = []
    Process_list = []
    list_of_arrays = []

    for i in range(number_of_processes):
        Queue_list.append(mp.Queue())


    for i in range(number_of_processes):
        Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],shared_array)))

    for i in range(number_of_processes):
        Process_list[i].start()

    for i in range(number_of_processes):
       list_of_arrays.append(Queue_list[i].get())

    for i in range(number_of_processes):
        Process_list[i].join()

【问题讨论】:

这个问题的答案对你有帮助吗? ***.com/a/14593135/513688 这个想法是创建父子都可以写入的共享数组,而不是使用酸洗。 您好,感谢您的回答,我尝试使用共享数组,但它不起作用,见上文。有谁知道为什么?干杯 您将共享数组放入队列中。链接的示例不这样做。从一个工作示例开始,验证它是否有效,然后进行一些小改动,直到它不再表现出您想要/期望的样子。 感谢您的提示!只是为了验证我是否正确理解了多处理和队列:如果我想拥有需要输出的并行进程,我必须使用队列,对吗?否则我无法获取数据?线程和/或队列(不是 mp.queue)模块是否更适合我的应用程序?因为我只想对数组的部分(其中“部分”等于核心数量)进行独立操作。只是认为我可能值得退后一步,检查我是否使用了正确的模块。再次感谢! 【参考方案1】:

您没有包含完整的回溯;结束是最重要的。在我的 32 位 Python 上,我得到了最终以

结尾的相同回溯
  File "C:\Python27\lib\pickle.py", line 486, in save_string
    self.write(BINSTRING + pack("<i", n) + obj)
MemoryError

MemoryError 是例外,它表示您的内存不足。

64 位 Python 可以解决这个问题,但在进程之间发送大量数据很容易成为multiprocessing 的严重瓶颈。

【讨论】:

感谢您的回复!是的,没错。但是我现在该如何解决这个问题呢?必须有一种优雅的方式来处理这个问题,我认为人们处理更大的数组。这不会是我的案例的瓶颈,因为我只发送一次数组(向前和向后)。 @Fips 一个可能的解决方案是Use numpy array in shared memory

以上是关于具有大型数组的 Windows 上的 Python 多处理的主要内容,如果未能解决你的问题,请参考以下文章

在Windows上的python 2.7中列出具有Unicode名称的文件

具有堆分配的大型数组的分段错误

在 Python 中通过线程/核心/节点并行化 for 循环

处理大型数组上的大量查询的数据结构

iOS 上的解析平台:大型多对多的关系、联接或数组?

为啥大型本地数组会使我的程序崩溃,而全局数组却不会? [复制]