在 Python 中通过线程/核心/节点并行化 for 循环

Posted

技术标签:

【中文标题】在 Python 中通过线程/核心/节点并行化 for 循环【英文标题】:Parallelize for-loop in Python over threads/cores/nodes 【发布时间】:2019-12-17 14:33:09 【问题描述】:

我打算在 Python 中并行化一个 for 循环,如下所示处理大型数据数组。线程/核心/节点上的并行化如何适合此代码,以及如何实现它?任何建议表示赞赏。谢谢!

所有输入都是具有以下典型大小的 NumPy 数组:

vector_data (int64): 1M x 3
matrix (float64): 0.1M x 0.1M x 3

根据帖子的回答进行编辑:

运行时性能测试表明multiprocessing 会导致显着减速,并且内存要求更高。

from timeit import timeit
from multiprocessing import Pool

import numpy as np
from numba import jit

def OP():
    N = len(matrix_data)
    pop_array = np.zeros((N, N))
    for vector in vector_data:
        vector_2 = np.dot(vector, vector)
        pop_array += (np.exp(-vector_2) / vector_2
                      * np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))
    return pop_array

def worker(vector):
    vector_2 = np.dot(vector, vector)
    return (np.exp(-vector_2) / vector_2
            * np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))

def f1():
    N = len(matrix_data)
    pop_array = np.zeros((N, N))

    with Pool() as pool:
        results = pool.map(worker, vector_data)

    for res in results:
        pop_array += res

    return pop_array

def f2():
    N = len(matrix_data)
    pop_array = np.zeros((N, N))

    with Pool() as pool:
        for result in pool.imap(worker, vector_data):
            pop_array += result

    return pop_array

jit(parallel=True)
def f3():
    N = len(matrix_data)
    pop_array = np.zeros((N, N)) 
    for vector in vector_data:
        vector_2 = np.dot(vector, vector)
        pop_array += (np.exp(-vector_2) / vector_2
                      * np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))
    return pop_array

max_vector_index = 150
vector_size = int(1E3)
matrix_size = int(1E2)

vector_shape = vector_size, 3
matrix_shape = matrix_size, matrix_size, 3

vector_data = np.random.randint(-max_vector_index, max_vector_index+1, vector_shape)
matrix_data = np.random.random(matrix_shape)

print(f'OP: timeit(OP, number=10):.3e sec')
print(f'f1: timeit(f1, number=10):.3e sec')
print(f'f2: timeit(f2, number=10):.3e sec')
print(f'f3: timeit(f3, number=10):.3e sec')

以下是样本运行的运行时间成本:

vector_size = int(1E2)
matrix_size = int(1E1)

OP: 9.527e-02 sec
f1: 2.402e+00 sec (25.21x)
f2: 2.269e+00 sec (23.82x)
f3: 3.414e-02 sec (0.36x)

OP: 43.0 MiB
f1: 41.9 MiB (0.97x)
f2: 41.9 MiB (0.97x)
vector_size = int(1E3)
matrix_size = int(1E2)

OP: 1.420e+00 sec
f1: 1.448e+01 sec (10.20x)
f2: 2.051e+01 sec (14.44x)
f3: 1.213e+00 sec (0.86x)

OP: 43.4 MiB
f1: 119.0 MiB (2.74x)
f2: 43.8 MiB (1x)
vector_size = int(1E4)
matrix_size = int(1E3)

OP: 5.116e+02 sec
f1: 8.902e+02 sec (1.74x)
f2: 6.509e+02 sec (1.27x)

OP: 73.9 MiB
f1: 76402.1 MiB (1033x)
f2: 209.7 MiB (2.84x)

【问题讨论】:

numba provides some support for parallelization 您似乎在重复和重复与 2017 年及更早的相同类型的问题,没有花费任何精力来有效地解决这一整类性能问题,而是通过再次询问 - 或者我们错过了一些研究和测试调整以避免已知的for循环性能不佳? --- 详情参考:***.com/questions/44888667/… numpy 函数确实已经并行运行。您如何期望进一步的并行化更有效?你会期望额外的开销,对吧? 你可以试试ray,它是分布式计算的,也可以在单机上运行 【参考方案1】:

您可以使用multiprocessing Pool。然后您可以使用map 方法在可迭代对象上运行函数。因此,您可以首先创建要传递给工作人员的函数,以处理可迭代对象中的每个元素:

def worker(vector):
    vector_2 = np.dot(vector, vector)
    return (np.exp(-vector_2) / vector_2
            * np.cos(np.tensordot(matrix, vector, axes=([2], [0]))))

现在您可以创建Pool 以在每个向量上运行此函数。它将返回结果列表,然后我们可以将这些结果添加到pop_array。像这样:

from multiprocessing import Pool

def par_fun(vector_data, matrix):
    N = len(matrixA)
    pop_array = np.zeros((N, N))

    with Pool() as pool:
        results = pool.map(worker, vector_data)

    for res in results:
        pop_array += res

    return pop_array

另一种可能更简洁的方法是使用imap。来自文档:

请注意,对于非常长的迭代,它可能会导致高内存使用。 考虑使用带有显式 chunksize 的 imap() 或 imap_unordered() 提高效率的选择。

还有:

chunksize 参数与map() 使用的参数相同 方法。对于非常长的迭代,使用较大的 chunksize 值可以 比使用默认值1 更快地完成作业大大

所以你可以使用这个代码:

def par_fun(vector_data, matrix):
    N = len(matrixA)
    pop_array = np.zeros((N, N))

    pool_size = None
    chunksize = 1

    with Pool(pool_size) as pool:
        for result in pool.imap(worker, vector_data, chunksize=chunksize):
            pop_array += result

    return pop_array

并使用不同的 pool_sizechunksize 值来获得最佳结果。


另一种选择是使用线程而不是进程。进程具有可能影响运行时的创建和维护开销。要将代码更改为使用线程,只需将导入更改为使用 dummy 包装器:

from multiprocessing.dummy import Pool

其余代码保持不变

【讨论】:

您是否介意分别对设置成本进行基准测试(时间 [us],空间 [GB])分别 [us] 中的实际计算部分持续时间? 换句话说,发布关于您“支付”多少 [us] 时间以使流程“运行”并计算超过 1E3 的结果的基准硬事实, 1E6、1E9 [B] 大小的数据?如果没有这部分故事,承诺作为建议是不公平的,是吗?如果没有这些隐藏的(有时是残酷的)成本,那么讲完整的故事是公平的,不是吗? @user3666197 你如何测试内存成本? @Viswanath 通过分析 - 这是一个明显的工具/使用外部监控系统报告的与进程相关的分配更糟糕。 @user3666197 谢谢。我已经分析了内存要求的代码,结果发现multiprocessing 没有帮助。欢迎提出任何建议。 @Viswanath 好的,已经分析了代码... 结果是..... ... [us] ... [GB] for pure -serial run 和 ... [us] ... [GB] 分别用于 multiprocessing.Pool()-sizing 运行的大小?定量结果很重要。

以上是关于在 Python 中通过线程/核心/节点并行化 for 循环的主要内容,如果未能解决你的问题,请参考以下文章

MPI + 线程并行化与仅 MPI 相比有啥优势(如果有)?

实际案例:在现有代码中通过async/await实现并行

Java编程思想学习(十六) 并发编程

一行 Python 实现并行化 -- 日常多线程操作的新思路

qt中通过重写run方法创建线程与通过movetothread方法有啥区别

如何在 Python 中通过 apply_async() 传递 gurobipy.Model 变量?