如何使用 Python 多处理 Pool.map 在 for 循环中填充 numpy 数组

Posted

技术标签:

【中文标题】如何使用 Python 多处理 Pool.map 在 for 循环中填充 numpy 数组【英文标题】:How to use Python multiprocessing Pool.map to fill numpy array in a for loop 【发布时间】:2014-11-11 08:28:28 【问题描述】:

我想在 for 循环中填充一个 2D-numpy 数组,并通过使用多处理来加快计算。

import numpy
from multiprocessing import Pool


array_2D = numpy.zeros((20,10))
pool = Pool(processes = 4)

def fill_array(start_val):
    return range(start_val,start_val+10)

list_start_vals = range(40,60)
for line in xrange(20):
    array_2D[line,:] = pool.map(fill_array,list_start_vals)
pool.close()

print array_2D

执行它的效果是 Python 运行了 4 个子进程并占用了 4 个 CPU 核心,但是执行没有完成,并且没有打印数组。如果我尝试将数组写入磁盘,则没有任何反应。

谁能告诉我为什么?

【问题讨论】:

你还记得你是如何运行这段代码的吗?在命令行、jupyter 还是脚本中? 【参考方案1】:

问题是由于在for循环中运行pool.map,map()方法的结果在功能上等同于内置的map(),除了个别任务是并行运行的。 因此,在您的情况下, pool.map(fill_array,list_start_vals) 将被调用 20 次,并开始为 for 循环的每次迭代并行运行,下面的代码应该可以工作

代码:

#!/usr/bin/python

import numpy
from multiprocessing import Pool

def fill_array(start_val):
    return range(start_val,start_val+10)

if __name__ == "__main__":
    array_2D = numpy.zeros((20,10))
    pool = Pool(processes = 4)    
    list_start_vals = range(40,60)

    # running the pool.map in a for loop is wrong
    #for line in xrange(20):
    #    array_2D[line,:] = pool.map(fill_array,list_start_vals)

    # get the result of pool.map (list of values returned by fill_array)
    # in a pool_result list 
    pool_result = pool.map(fill_array,list_start_vals)

    # the pool is processing its inputs in parallel, close() and join() 
    #can be used to synchronize the main process 
    #with the task processes to ensure proper cleanup.
    pool.close()
    pool.join()

    # Now assign the pool_result to your numpy
    for line,result in enumerate(pool_result):
        array_2D[line,:] = result

    print array_2D

【讨论】:

感谢您的回复。不幸的是,效果是一样的。 Python 启动子进程并占用 PC,但没有任何反应。我在 Windows 7 机器上运行代码(双核 CPU,超线程 => 几乎是四核),Python 2.7.5 32 位,我使用 SpyderLib 作为编程接口。 @MoTSCHIGGE 我运行了我在 Windows 环境中发布的代码,它似乎正在运行,我认为您正在运行没有 if "main"==__name__ 的代码: , 如果是这种情况,代码将在 windows 中无限期运行,请参考 Stack Overflow 链接,了解 windows 中 if 条件的重要性***.com/questions/20222534/… 我只是尝试运行上面的示例代码,包括 "if name == "main": " 但没有任何反应。我不知道这里出了什么问题..【参考方案2】:

以下作品。首先,将代码的主要部分保护在主块内是一个好主意,以避免奇怪的副作用。 poo.map() 的结果是一个列表,其中包含迭代器 list_start_vals 中每个值的评估,因此您不必在之前创建 array_2D

import numpy as np
from multiprocessing import Pool

def fill_array(start_val):
    return list(range(start_val, start_val+10))

if __name__=='__main__':
    pool = Pool(processes=4)
    list_start_vals = range(40, 60)
    array_2D = np.array(pool.map(fill_array, list_start_vals))
    pool.close() # ATTENTION HERE
    print array_2D

也许您在使用pool.close() 时会遇到问题,您可以从@hpaulj 的cmets 中删除此行以防遇到问题...

【讨论】:

对于较大的数组,我收到错误 Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignoredapply_async 没有给出这个警告。 没有pool.close() 命令,我看不到Error @hpaulj 感谢您的反馈...我尝试生成一个数组 10000 X 10000 没有问题,将 60 更改为 10040 并将 10 更改为 10000... 可能是机器大小和速度的问题。我的比较老了。 在进一步测试中,如果映射太慢,pool.join() 似乎更重要。【参考方案3】:

如果还想使用数组填充,可以使用pool.apply_async 代替pool.map。根据 Saullo 的回答:

import numpy as np
from multiprocessing import Pool

def fill_array(start_val):
    return range(start_val, start_val+10)

if __name__=='__main__':
    pool = Pool(processes=4)
    list_start_vals = range(40, 60)
    array_2D = np.zeros((20,10))
    for line, val in enumerate(list_start_vals):
        result = pool.apply_async(fill_array, [val])
        array_2D[line,:] = result.get()
    pool.close()
    print array_2D

这比map 运行得慢一点。但它不会像我测试地图版本那样产生运行时错误:Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored

【讨论】:

以上是关于如何使用 Python 多处理 Pool.map 在 for 循环中填充 numpy 数组的主要内容,如果未能解决你的问题,请参考以下文章

在 Python 多处理中将 Pool.map 与共享内存数组结合起来

CHUNKSIZE无关的多处理在Python / pool.map?

Python - 多进程map的使用方法

在集成 Python 的多处理中使用 Pool.map 时,程序运行速度越来越慢

多处理:如何在类中定义的函数上使用 Pool.map?

python multiprocessing pool.map() 等到方法完成