如何使用 Python 多处理 Pool.map 在 for 循环中填充 numpy 数组
Posted
技术标签:
【中文标题】如何使用 Python 多处理 Pool.map 在 for 循环中填充 numpy 数组【英文标题】:How to use Python multiprocessing Pool.map to fill numpy array in a for loop 【发布时间】:2014-11-11 08:28:28 【问题描述】:我想在 for 循环中填充一个 2D-numpy 数组,并通过使用多处理来加快计算。
import numpy
from multiprocessing import Pool
array_2D = numpy.zeros((20,10))
pool = Pool(processes = 4)
def fill_array(start_val):
return range(start_val,start_val+10)
list_start_vals = range(40,60)
for line in xrange(20):
array_2D[line,:] = pool.map(fill_array,list_start_vals)
pool.close()
print array_2D
执行它的效果是 Python 运行了 4 个子进程并占用了 4 个 CPU 核心,但是执行没有完成,并且没有打印数组。如果我尝试将数组写入磁盘,则没有任何反应。
谁能告诉我为什么?
【问题讨论】:
你还记得你是如何运行这段代码的吗?在命令行、jupyter 还是脚本中? 【参考方案1】:问题是由于在for循环中运行pool.map
,map()方法的结果在功能上等同于内置的map(),除了个别任务是并行运行的。
因此,在您的情况下, pool.map(fill_array,list_start_vals) 将被调用 20 次,并开始为 for 循环的每次迭代并行运行,下面的代码应该可以工作
代码:
#!/usr/bin/python
import numpy
from multiprocessing import Pool
def fill_array(start_val):
return range(start_val,start_val+10)
if __name__ == "__main__":
array_2D = numpy.zeros((20,10))
pool = Pool(processes = 4)
list_start_vals = range(40,60)
# running the pool.map in a for loop is wrong
#for line in xrange(20):
# array_2D[line,:] = pool.map(fill_array,list_start_vals)
# get the result of pool.map (list of values returned by fill_array)
# in a pool_result list
pool_result = pool.map(fill_array,list_start_vals)
# the pool is processing its inputs in parallel, close() and join()
#can be used to synchronize the main process
#with the task processes to ensure proper cleanup.
pool.close()
pool.join()
# Now assign the pool_result to your numpy
for line,result in enumerate(pool_result):
array_2D[line,:] = result
print array_2D
【讨论】:
感谢您的回复。不幸的是,效果是一样的。 Python 启动子进程并占用 PC,但没有任何反应。我在 Windows 7 机器上运行代码(双核 CPU,超线程 => 几乎是四核),Python 2.7.5 32 位,我使用 SpyderLib 作为编程接口。 @MoTSCHIGGE 我运行了我在 Windows 环境中发布的代码,它似乎正在运行,我认为您正在运行没有 if "main"==__name__ 的代码: , 如果是这种情况,代码将在 windows 中无限期运行,请参考 Stack Overflow 链接,了解 windows 中 if 条件的重要性***.com/questions/20222534/… 我只是尝试运行上面的示例代码,包括 "if name == "main": " 但没有任何反应。我不知道这里出了什么问题..【参考方案2】:以下作品。首先,将代码的主要部分保护在主块内是一个好主意,以避免奇怪的副作用。 poo.map()
的结果是一个列表,其中包含迭代器 list_start_vals
中每个值的评估,因此您不必在之前创建 array_2D
。
import numpy as np
from multiprocessing import Pool
def fill_array(start_val):
return list(range(start_val, start_val+10))
if __name__=='__main__':
pool = Pool(processes=4)
list_start_vals = range(40, 60)
array_2D = np.array(pool.map(fill_array, list_start_vals))
pool.close() # ATTENTION HERE
print array_2D
也许您在使用pool.close()
时会遇到问题,您可以从@hpaulj 的cmets 中删除此行以防遇到问题...
【讨论】:
对于较大的数组,我收到错误Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored
。 apply_async
没有给出这个警告。
没有pool.close()
命令,我看不到Error
。
@hpaulj 感谢您的反馈...我尝试生成一个数组 10000 X 10000
没有问题,将 60 更改为 10040 并将 10 更改为 10000...
可能是机器大小和速度的问题。我的比较老了。
在进一步测试中,如果映射太慢,pool.join()
似乎更重要。【参考方案3】:
如果还想使用数组填充,可以使用pool.apply_async
代替pool.map
。根据 Saullo 的回答:
import numpy as np
from multiprocessing import Pool
def fill_array(start_val):
return range(start_val, start_val+10)
if __name__=='__main__':
pool = Pool(processes=4)
list_start_vals = range(40, 60)
array_2D = np.zeros((20,10))
for line, val in enumerate(list_start_vals):
result = pool.apply_async(fill_array, [val])
array_2D[line,:] = result.get()
pool.close()
print array_2D
这比map
运行得慢一点。但它不会像我测试地图版本那样产生运行时错误:Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored
【讨论】:
以上是关于如何使用 Python 多处理 Pool.map 在 for 循环中填充 numpy 数组的主要内容,如果未能解决你的问题,请参考以下文章
在 Python 多处理中将 Pool.map 与共享内存数组结合起来
CHUNKSIZE无关的多处理在Python / pool.map?