在 Python 中通过线程/核心/节点并行化 for 循环
Posted
技术标签:
【中文标题】在 Python 中通过线程/核心/节点并行化 for 循环【英文标题】:Parallelize for-loop in Python over threads/cores/nodes 【发布时间】:2019-12-17 14:33:09 【问题描述】:我打算在 Python 中并行化一个 for 循环,如下所示处理大型数据数组。线程/核心/节点上的并行化如何适合此代码,以及如何实现它?任何建议表示赞赏。谢谢!
所有输入都是具有以下典型大小的 NumPy 数组:
vector_data (int64): 1M x 3
matrix (float64): 0.1M x 0.1M x 3
根据帖子的回答进行编辑:
运行时性能测试表明multiprocessing
会导致显着减速,并且内存要求更高。
from timeit import timeit
from multiprocessing import Pool
import numpy as np
from numba import jit
def OP():
N = len(matrix_data)
pop_array = np.zeros((N, N))
for vector in vector_data:
vector_2 = np.dot(vector, vector)
pop_array += (np.exp(-vector_2) / vector_2
* np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))
return pop_array
def worker(vector):
vector_2 = np.dot(vector, vector)
return (np.exp(-vector_2) / vector_2
* np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))
def f1():
N = len(matrix_data)
pop_array = np.zeros((N, N))
with Pool() as pool:
results = pool.map(worker, vector_data)
for res in results:
pop_array += res
return pop_array
def f2():
N = len(matrix_data)
pop_array = np.zeros((N, N))
with Pool() as pool:
for result in pool.imap(worker, vector_data):
pop_array += result
return pop_array
jit(parallel=True)
def f3():
N = len(matrix_data)
pop_array = np.zeros((N, N))
for vector in vector_data:
vector_2 = np.dot(vector, vector)
pop_array += (np.exp(-vector_2) / vector_2
* np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))
return pop_array
max_vector_index = 150
vector_size = int(1E3)
matrix_size = int(1E2)
vector_shape = vector_size, 3
matrix_shape = matrix_size, matrix_size, 3
vector_data = np.random.randint(-max_vector_index, max_vector_index+1, vector_shape)
matrix_data = np.random.random(matrix_shape)
print(f'OP: timeit(OP, number=10):.3e sec')
print(f'f1: timeit(f1, number=10):.3e sec')
print(f'f2: timeit(f2, number=10):.3e sec')
print(f'f3: timeit(f3, number=10):.3e sec')
以下是样本运行的运行时间成本:
vector_size = int(1E2)
matrix_size = int(1E1)
OP: 9.527e-02 sec
f1: 2.402e+00 sec (25.21x)
f2: 2.269e+00 sec (23.82x)
f3: 3.414e-02 sec (0.36x)
OP: 43.0 MiB
f1: 41.9 MiB (0.97x)
f2: 41.9 MiB (0.97x)
vector_size = int(1E3)
matrix_size = int(1E2)
OP: 1.420e+00 sec
f1: 1.448e+01 sec (10.20x)
f2: 2.051e+01 sec (14.44x)
f3: 1.213e+00 sec (0.86x)
OP: 43.4 MiB
f1: 119.0 MiB (2.74x)
f2: 43.8 MiB (1x)
vector_size = int(1E4)
matrix_size = int(1E3)
OP: 5.116e+02 sec
f1: 8.902e+02 sec (1.74x)
f2: 6.509e+02 sec (1.27x)
OP: 73.9 MiB
f1: 76402.1 MiB (1033x)
f2: 209.7 MiB (2.84x)
【问题讨论】:
numba
provides some support for parallelization
您似乎在重复和重复与 2017 年及更早的相同类型的问题,没有花费任何精力来有效地解决这一整类性能问题,而是通过再次询问 - 或者我们错过了一些研究和测试调整以避免已知的for循环性能不佳? --- 详情参考:***.com/questions/44888667/…
numpy 函数确实已经并行运行。您如何期望进一步的并行化更有效?你会期望额外的开销,对吧?
你可以试试ray,它是分布式计算的,也可以在单机上运行
【参考方案1】:
您可以使用multiprocessing
Pool
。然后您可以使用map
方法在可迭代对象上运行函数。因此,您可以首先创建要传递给工作人员的函数,以处理可迭代对象中的每个元素:
def worker(vector):
vector_2 = np.dot(vector, vector)
return (np.exp(-vector_2) / vector_2
* np.cos(np.tensordot(matrix, vector, axes=([2], [0]))))
现在您可以创建Pool
以在每个向量上运行此函数。它将返回结果列表,然后我们可以将这些结果添加到pop_array
。像这样:
from multiprocessing import Pool
def par_fun(vector_data, matrix):
N = len(matrixA)
pop_array = np.zeros((N, N))
with Pool() as pool:
results = pool.map(worker, vector_data)
for res in results:
pop_array += res
return pop_array
另一种可能更简洁的方法是使用imap
。来自文档:
请注意,对于非常长的迭代,它可能会导致高内存使用。 考虑使用带有显式 chunksize 的 imap() 或 imap_unordered() 提高效率的选择。
还有:
chunksize 参数与map() 使用的参数相同 方法。对于非常长的迭代,使用较大的 chunksize 值可以 比使用默认值
1
更快地完成作业大大。
所以你可以使用这个代码:
def par_fun(vector_data, matrix):
N = len(matrixA)
pop_array = np.zeros((N, N))
pool_size = None
chunksize = 1
with Pool(pool_size) as pool:
for result in pool.imap(worker, vector_data, chunksize=chunksize):
pop_array += result
return pop_array
并使用不同的 pool_size
和 chunksize
值来获得最佳结果。
另一种选择是使用线程而不是进程。进程具有可能影响运行时的创建和维护开销。要将代码更改为使用线程,只需将导入更改为使用 dummy
包装器:
from multiprocessing.dummy import Pool
其余代码保持不变
【讨论】:
您是否介意分别对设置成本进行基准测试(时间 [us],空间 [GB])和分别 [us] 中的实际计算部分持续时间? 换句话说,发布关于您“支付”多少 [us] 时间以使流程“运行”并计算超过 1E3 的结果的基准硬事实, 1E6、1E9 [B] 大小的数据?如果没有这部分故事,承诺作为建议是不公平的,是吗?如果没有这些隐藏的(有时是残酷的)成本,那么讲完整的故事是公平的,不是吗? @user3666197 你如何测试内存成本? @Viswanath 通过分析 - 这是一个明显的工具/使用外部监控系统报告的与进程相关的分配更糟糕。 @user3666197 谢谢。我已经分析了内存要求的代码,结果发现multiprocessing
没有帮助。欢迎提出任何建议。
@Viswanath 好的,已经分析了代码... 结果是..... ... [us] ... [GB] for pure -serial run 和 ... [us] ... [GB] 分别用于 multiprocessing.Pool()-sizing 运行的大小?定量结果很重要。以上是关于在 Python 中通过线程/核心/节点并行化 for 循环的主要内容,如果未能解决你的问题,请参考以下文章
MPI + 线程并行化与仅 MPI 相比有啥优势(如果有)?
一行 Python 实现并行化 -- 日常多线程操作的新思路