如何在 python 中并行化以下代码片段?

Posted

技术标签:

【中文标题】如何在 python 中并行化以下代码片段?【英文标题】:How can I parallelize the following snippet of code in python? 【发布时间】:2021-05-16 00:07:41 【问题描述】:

我有一堆仅按行执行的矩阵乘法运算。我想知道如何通过并行化加速计算:

data = np.random.randint(1, 100, (100000, 800))
indices_1 = np.equal(data, 1)
A = np.zeros((100000, 100))
B = np.random.randn(800, 100)

for i in range(100000):
   ones = indices_1[i]
   not_ones = ~indices_1[i]
   B_ones = B[ones]
   B_not_ones = B[not_ones]
   A[i] = (data[i][not_ones] @ B_not_ones) @ np.linalg.inv(B_not_ones.T @ B_not_ones)  
   data[i][ones] = A[i] @ B_ones.T
    

我尝试了多处理器,但由于某种原因,它的性能并不比顺序处理器好。这是我的多处理器实现:

from multiprocessing.pool import ThreadPool, Pool
pool = ThreadPool() # can also use Pool

def f(i):
   ones = indices_1[i]
   not_ones = ~indices_1[i]
   B_ones = B[ones]
   B_not_ones = B[not_ones]
   A[i] = (data[i][not_ones] @ B_not_ones) @ np.linalg.inv(B_not_ones.T @ B_not_ones)  
   data[i][ones] = A[i] @ B_ones.T


pool.map(f, range(100000))

两者的运行时间相同(大约 32 秒)。 concurrent.futures 等其他并行化方法并没有提高运行时间(如下所示):

with concurrent.futures.ThreadPoolExecutor() as executor:
     result = executor.map(f, range(100000))

我也尝试申请dask,但无法让他们的框架在我的情况下工作。任何帮助都感激不尽!谢谢!

【问题讨论】:

多处理必须在进程之间复制您的数据。它不适合处理单个大块数据。 GPU 是否适合您?在 Google colab 上用 GPU 版本尝试过你的代码? 我会完全删除 for 循环,让 numpy 处理矩阵运算。 @ZeelBharatkumarPatel1931006 我刚刚在 Google colab 上尝试过使用 GPU,两者的运行时间都下降到了 28,但多处理器并没有改善运行时间。 你必须使用多线程模块,因为在多处理中每个进程工作者都有自己的内存并且你不会得到你想要的结果,你可以使用 cocurrent.futures.ThreadPoolExecutor 【参考方案1】:
import numpy as np
import multiprocessing as mp


data = list(np.random.randint(1, 100, (100000, 800)))
indices_1 = np.equal(data, 1)
A = list(np.zeros((100000, 100)))
B = np.random.randn(800, 100)


def f(data, A, i):
    ones = indices_1[i]
    not_ones = ~indices_1[i]
    B_ones = B[ones]
    B_not_ones = B[not_ones]
    A[i] = (data[i][not_ones] @ B_not_ones) @ np.linalg.inv(B_not_ones.T @ B_not_ones)
    data[i][ones] = A[i] @ B_ones.T

with mp.Manager() as manager:
    data_global = manager.list(data)
    A_global = manager.list(A)

    with mp.Pool() as p:
        results = [ p.apply_async(f, (data_global, A_global, i,)) for i in range(100000) ]
        for i in results:
            i.wait()

    data_global = list(data_global)
    A_global = list(A_global)

【讨论】:

感谢您提供代码,但仍然没有运气:( 我尝试将 max_workers 从 2 更改为 cpu_count()*4,但它们的运行时间都比顺序对应的要长。跨度> 好的,让我在本地运行你的代码,代码中的@符号是什么? 相当于 np.matmul() (我在 NumPy 文档中读过) Python 中的多线程受 Gil 限制。虽然它可以提供性能改进,但它并不能立即等同于 n 次并行执行。 我更新了代码,使用 4 核 CPU 的速度提高了大约 6-7 倍,您能否验证输出与顺序输出相同

以上是关于如何在 python 中并行化以下代码片段?的主要内容,如果未能解决你的问题,请参考以下文章

在 Python 多处理进程中运行较慢的 OpenCV 代码片段

在 Python 中通过线程/核心/节点并行化 for 循环

如何在 Python 3.7 中快速并行化函数?

如何并行化一个简单的 Python 循环?

如何在 fasta 文件中并行化计算,其中每个处理器采用一个序列

如何在 python 的类中使用光线并行性?