Multiprocessing.Pool 使 Numpy 矩阵乘法变慢

Posted

技术标签:

【中文标题】Multiprocessing.Pool 使 Numpy 矩阵乘法变慢【英文标题】:Multiprocessing.Pool makes Numpy matrix multiplication slower 【发布时间】:2013-03-03 01:59:06 【问题描述】:

所以,我在玩 multiprocessing.PoolNumpy,但似乎我错过了一些重要的点。为什么pool 版本慢得多?我查看了htop,我可以看到创建了几个进程,但它们都共享一个 CPU,加起来约为 100%。

$ cat test_multi.py 
import numpy as np
from timeit import timeit
from multiprocessing import Pool


def mmul(matrix):
    for i in range(100):
        matrix = matrix * matrix
    return matrix

if __name__ == '__main__':
    matrices = []
    for i in range(4):
        matrices.append(np.random.random_integers(100, size=(1000, 1000)))

    pool = Pool(8)
    print timeit(lambda: map(mmul, matrices), number=20)
    print timeit(lambda: pool.map(mmul, matrices), number=20)

$ python test_multi.py 
16.0265390873
19.097837925

[更新]

更改为 timeit 用于基准测试流程 具有多个我的核心的初始化池 改变了计算,以便有更多的计算和更少的内存传输(我希望)

仍然没有变化。 pool 版本仍然较慢,我可以在 htop 中看到只使用了一个内核,并且产生了多个进程。

[更新2]

目前我正在阅读@Jan-Philip Gehrcke 建议使用multiprocessing.Process()Queue。但与此同时,我想知道:

    为什么我的示例适用于 tiago?它无法在我的机器1 上运行的原因可能是什么? 在我的示例代码中,进程之间是否有任何复制?我打算让我的代码给每个线程一个矩阵列表的矩阵。 我的代码是不是一个不好的例子,因为我使用了Numpy

我了解到,当其他人知道我的最终目标时,通常会有更好的答案:我有很多文件,这些文件是 atm 以串行方式加载和处理的。处理是 CPU 密集型的,所以我认为并行化可以获得很多。我的目标是调用并行分析文件的python函数。此外,我认为这个函数只是 C 代码的一个接口,这会有所不同。

1 Ubuntu 12.04、Python 2.7.3、i7 860 @ 2.80 - 如果您需要更多信息,请发表评论。

[更新3]

以下是 Stefano 示例代码的结果。由于某种原因,没有加速。 :/

testing with 16 matrices
base  4.27
   1  5.07
   2  4.76
   4  4.71
   8  4.78
  16  4.79
testing with 32 matrices
base  8.82
   1 10.39
   2 10.58
   4 10.73
   8  9.46
  16  9.54
testing with 64 matrices
base 17.38
   1 19.34
   2 19.62
   4 19.59
   8 19.39
  16 19.34

[更新 4] 回复Jan-Philip Gehrcke's comment

抱歉,我没有让自己更清楚。正如我在更新 2 中所写,我的主要目标是并行化第 3 方 Python 库函数的许多串行调用。这个函数是一些 C 代码的接口。有人建议我使用Pool,但这不起作用,所以我尝试了一些更简单的方法,如上图所示的numpy 示例。但在那里我也无法实现性能改进,即使它看起来“令人难以接受的可并行化”。所以我想我一定错过了一些重要的事情。这个信息就是我在这个问题和赏金中寻找的信息。

[更新 5]

感谢您的大力投入。但是阅读你的答案只会给我带来更多的问题。出于这个原因,我将阅读basics 并在我对我不知道的内容有更清晰的了解时创建新的 SO 问题。

【问题讨论】:

我猜创建进程的开销会杀死你。尝试使用timeit 模块或至少将pool = Pool() 函数移出计时程序。 我可能是错的,但我怀疑大部分时间都花在了在进程之间来回发送矩阵。 但是所有进程/线程不应该在它们自己的矩阵上工作吗?就像每个进程从列表中获取一个矩阵并使用它? 但是你必须在不同的进程之间传递它们(即复制内存)。矩阵乘法相当快(根据您的时间安排大约需要 6 毫秒),因此这种开销很大。 我改变了例子,这样计算量更大,内存传输更少。 【参考方案1】:

关于您的所有进程都在同一个 CPU 上运行这一事实,see my answer here。

在导入期间,numpy 会更改父进程的 CPU 亲和性,这样当您稍后使用Pool 时,它产生的所有工作进程最终将争夺同一个核心,而不是使用所有您机器上可用的内核。

您可以在导入numpy 后调用taskset 重置CPU 亲和性,以便使用所有内核:

import numpy as np
import os
from timeit import timeit
from multiprocessing import Pool


def mmul(matrix):
    for i in range(100):
        matrix = matrix * matrix
    return matrix

if __name__ == '__main__':

    matrices = []
    for i in range(4):
        matrices.append(np.random.random_integers(100, size=(1000, 1000)))

    print timeit(lambda: map(mmul, matrices), number=20)

    # after importing numpy, reset the CPU affinity of the parent process so
    # that it will use all cores
    os.system("taskset -p 0xff %d" % os.getpid())

    pool = Pool(8)
    print timeit(lambda: pool.map(mmul, matrices), number=20)

输出:

    $ python tmp.py                                     
    12.4765810966
    pid 29150's current affinity mask: 1
    pid 29150's new affinity mask: ff
    13.4136221409

如果您在运行此脚本时使用top 观察 CPU 使用情况,您应该会看到它在执行“并行”部分时使用了所有内核。正如其他人指出的那样,在您的原始示例中,酸洗数据、流程创建等所涉及的开销可能超过了并行化可能带来的任何好处。

编辑:我怀疑单个进程似乎始终更快的部分原因是numpy 可能有一些技巧可以加快它无法使用的逐元素矩阵乘法当作业分布在多个内核上时。

例如,如果我只使用普通的 Python 列表来计算斐波那契数列,我可以从并行化中获得巨大的加速。同样,如果我以不利用矢量化的方式进行逐元素乘法,我会在并行版本中获得类似的加速:

import numpy as np
import os
from timeit import timeit
from multiprocessing import Pool

def fib(dummy):
    n = [1,1]
    for ii in xrange(100000):
        n.append(n[-1]+n[-2])

def silly_mult(matrix):
    for row in matrix:
        for val in row:
            val * val

if __name__ == '__main__':

    dt = timeit(lambda: map(fib, xrange(10)), number=10)
    print "Fibonacci, non-parallel: %.3f" %dt

    matrices = [np.random.randn(1000,1000) for ii in xrange(10)]
    dt = timeit(lambda: map(silly_mult, matrices), number=10)
    print "Silly matrix multiplication, non-parallel: %.3f" %dt

    # after importing numpy, reset the CPU affinity of the parent process so
    # that it will use all CPUS
    os.system("taskset -p 0xff %d" % os.getpid())

    pool = Pool(8)

    dt = timeit(lambda: pool.map(fib,xrange(10)), number=10)
    print "Fibonacci, parallel: %.3f" %dt

    dt = timeit(lambda: pool.map(silly_mult, matrices), number=10)
    print "Silly matrix multiplication, parallel: %.3f" %dt

输出:

$ python tmp.py
Fibonacci, non-parallel: 32.449
Silly matrix multiplication, non-parallel: 40.084
pid 29528's current affinity mask: 1
pid 29528's new affinity mask: ff
Fibonacci, parallel: 9.462
Silly matrix multiplication, parallel: 12.163

【讨论】:

我认为这个答案的第一句话几乎就是整个答案。一切都在同一个核心上运行,因此它稍微慢一些(因为有额外的开销)而不是更快(因为没有并行化)。 其实我还是认为这更可能是与numpy的怪癖有关,而不仅仅是与CPU利用率有关。即使当我并行化 Framester 的原始代码以便它实际上利用我所有的 CPU 时,我仍然发现它比串行运行它稍微慢一些。只有当我故意避免numpy特别擅长的事情时,我才能看到并行化带来的任何性能提升。 你是对的;抱歉,我读得不够远,我刚刚开始测试我自己的琐碎/愚蠢的示例代码。没关系。 :) 为了比较,你必须展示当你离开os.system("taskset -p 0xff %d" % os.getpid())时会发生什么。 为什么?如果我不考虑那条线,那么(至少在我的机器上)只会使用一个内核,所以我当然看不到并行版本有任何加速。【参考方案2】:

通信开销和计算加速之间不可预测的竞争绝对是这里的问题。你所观察的一切都很好。您是否获得净加速取决于许多因素,并且必须正确量化(就像您所做的那样)。

那么为什么 multiprocessing 在你的情况下如此“出乎意料地慢”? multiprocessingmapmap_async 函数实际上通过连接管道的管道来回腌制 Python 对象父进程与子进程。这可能需要相当长的时间。在那段时间,子进程几乎无事可做,这就是在htop 中看到的。在不同的系统之间,可能存在相当大的管道传输性能差异,这也是为什么对于某些人来说,你的池代码比你的单 CPU 代码快,尽管对你来说不是(其他因素可能在这里起作用,这只是举例说明效果)。

你能做些什么来让它更快?

    不要在符合 POSIX 的系统上腌制输入。

    如果您使用的是 Unix,则可以通过利用 POSIX 的进程分叉行为(写入时复制内存)来解决父->子通信开销:

    全局可访问变量中创建要在父进程中处理的工作输入(例如大型矩阵列表)。然后通过自己调用multiprocessing.Process() 创建工作进程。在子项中,从全局变量中获取作业输入。简单地说,这使得子进程访问父进程的内存没有任何通信开销(*,解释如下)。通过例如将结果发送回父级multiprocessing.Queue。这将节省大量的通信开销,尤其是在输出与输入相比较小的情况下。此方法不适用于例如Windows,因为multiprocessing.Process() 在那里创建了一个全新的 Python 进程,它不继承父进程的状态。

    利用 numpy 多线程。 根据您的实际计算任务,涉及multiprocessing 可能根本无济于事。如果您自己编译 numpy 并启用 OpenMP 指令,那么大型矩阵上的操作可能会变得非常高效地多线程(并且分布在许多 CPU 内核上;GIL 在这里不是限制因素)。基本上,这是在 numpy/scipy 的上下文中可以最有效地使用多个 CPU 内核。

*孩子一般不能直接访问父母的记忆。但是,在fork() 之后,父母和孩子处于等效状态。将父级的整个内存复制到 RAM 中的另一个位置是愚蠢的。这就是写时复制原则介入的原因。只要子进程不更改它的内存状态,它实际上会访问父进程的内存。只有修改后,才会将相应的点点滴滴复制到孩子的内存空间中。

主要修改:

让我添加一段代码,它使用多个工作进程处理大量输入数据,并遵循“1. 不要在 POSIX 兼容系统上腌制输入”的建议。此外,传输回工作管理器(父进程)的信息量非常少。这个例子的繁重计算部分是单值分解。它可以大量使用 OpenMP。我已经多次执行了这个例子:

一旦有 1、2 或 4 个工作进程和OMP_NUM_THREADS=1,因此每个工作进程创建的最大负载为 100%。在那里,提到的工作人员数量的计算时间扩展行为几乎是线性的,并且净加速因子对应于所涉及的工作人员数量。 使用 1、2 或 4 个工作进程和OMP_NUM_THREADS=4,这样每个进程创建的最大负载为 400%(通过生成 4 个 OpenMP 线程)。我的机器有 16 个真正的核心,因此 4 个最大负载为 400% 的进程将几乎从机器中获得最大的性能。缩放不再是完全线性的,加速因素不是所涉及的工作人员数量,但与OMP_NUM_THREADS=1 相比,绝对计算时间显着减少,并且时间仍然随着工作进程数量的增加而显着减少。 曾经有较大的输入数据、4 个内核和OMP_NUM_THREADS=4。它导致平均系统负载为 1253 %。 使用与上次相同的设置,但 OMP_NUM_THREADS=5。它导致平均系统负载为 1598%,这表明我们从那台 16 核机器上得到了一切。但是,与后一种情况相比,实际计算时间并没有改善。

代码:

import os
import time
import math
import numpy as np
from numpy.linalg import svd as svd
import multiprocessing


# If numpy is compiled for OpenMP, then make sure to control
# the number of OpenMP threads via the OMP_NUM_THREADS environment
# variable before running this benchmark.


MATRIX_SIZE = 1000
MATRIX_COUNT = 16


def rnd_matrix():
    offset = np.random.randint(1,10)
    stretch = 2*np.random.rand()+0.1
    return offset + stretch * np.random.rand(MATRIX_SIZE, MATRIX_SIZE)


print "Creating input matrices in parent process."
# Create input in memory. Children access this input.
INPUT = [rnd_matrix() for _ in xrange(MATRIX_COUNT)]


def worker_function(result_queue, worker_index, chunk_boundary):
    """Work on a certain chunk of the globally defined `INPUT` list.
    """
    result_chunk = []
    for m in INPUT[chunk_boundary[0]:chunk_boundary[1]]:
        # Perform single value decomposition (CPU intense).
        u, s, v = svd(m)
        # Build single numeric value as output.
        output =  int(np.sum(s))
        result_chunk.append(output)
    result_queue.put((worker_index, result_chunk))


def work(n_workers=1):
    def calc_chunksize(l, n):
        """Rudimentary function to calculate the size of chunks for equal 
        distribution of a list `l` among `n` workers.
        """
        return int(math.ceil(len(l)/float(n)))

    # Build boundaries (indices for slicing) for chunks of `INPUT` list.
    chunk_size = calc_chunksize(INPUT, n_workers)
    chunk_boundaries = [
        (i, i+chunk_size) for i in xrange(0, len(INPUT), chunk_size)]

    # When n_workers and input list size are of same order of magnitude,
    # the above method might have created less chunks than workers available. 
    if n_workers != len(chunk_boundaries):
        return None

    result_queue = multiprocessing.Queue()
    # Prepare child processes.
    children = []
    for worker_index in xrange(n_workers):
        children.append(
            multiprocessing.Process(
                target=worker_function,
                args=(
                    result_queue,
                    worker_index,
                    chunk_boundaries[worker_index],
                    )
                )
            )

    # Run child processes.
    for c in children:
        c.start()

    # Create result list of length of `INPUT`. Assign results upon arrival.
    results = [None] * len(INPUT)

    # Wait for all results to arrive.
    for _ in xrange(n_workers):
        worker_index, result_chunk = result_queue.get(block=True)
        chunk_boundary = chunk_boundaries[worker_index]
        # Store the chunk of results just received to the overall result list.
        results[chunk_boundary[0]:chunk_boundary[1]] = result_chunk

    # Join child processes (clean up zombies).
    for c in children:
        c.join()
    return results


def main():
    durations = []
    n_children = [1, 2, 4]
    for n in n_children:
        print "Crunching input with %s child(ren)." % n
        t0 = time.time()
        result = work(n)
        if result is None:
            continue
        duration = time.time() - t0
        print "Result computed by %s child process(es): %s" % (n, result)
        print "Duration: %.2f s" % duration
        durations.append(duration)
    normalized_durations = [durations[0]/d for d in durations]
    for n, normdur in zip(n_children, normalized_durations):
        print "%s-children speedup: %.2f" % (n, normdur)


if __name__ == '__main__':
    main()

输出:

$ export OMP_NUM_THREADS=1
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 16.66 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 8.27 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 4.37 s
1-children speedup: 1.00
2-children speedup: 2.02
4-children speedup: 3.81
48.75user 1.75system 0:30.00elapsed 168%CPU (0avgtext+0avgdata 1007936maxresident)k
0inputs+8outputs (1major+809308minor)pagefaults 0swaps

$ export OMP_NUM_THREADS=4
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 8.62 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 4.92 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 2.95 s
1-children speedup: 1.00
2-children speedup: 1.75
4-children speedup: 2.92
106.72user 3.07system 0:17.19elapsed 638%CPU (0avgtext+0avgdata 1022240maxresident)k
0inputs+8outputs (1major+841915minor)pagefaults 0swaps

$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [21762, 26806, 10148, 22947, 20900, 8161, 20168, 17439, 23497, 26360, 6789, 11216, 12769, 23022, 26221, 20480, 19140, 13757, 23692, 19541, 24644, 21251, 21000, 21687, 32187, 5639, 23314, 14678, 18289, 12493, 29766, 14987, 12580, 17988, 20853, 4572, 16538, 13284, 18612, 28617, 19017, 23145, 11183, 21018, 10922, 11709, 27895, 8981]
Duration: 12.69 s
4-children speedup: 1.00
174.03user 4.40system 0:14.23elapsed 1253%CPU (0avgtext+0avgdata 2887456maxresident)k
0inputs+8outputs (1major+1211632minor)pagefaults 0swaps

$ export OMP_NUM_THREADS=5
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [19528, 17575, 21792, 24303, 6352, 22422, 25338, 18183, 15895, 19644, 20161, 22556, 24657, 30571, 13940, 18891, 10866, 21363, 20585, 15289, 6732, 10851, 11492, 29146, 12611, 15022, 18967, 25171, 10759, 27283, 30413, 14519, 25456, 18934, 28445, 12768, 28152, 24055, 9285, 26834, 27731, 33398, 10172, 22364, 12117, 14967, 18498, 8111]
Duration: 13.08 s
4-children speedup: 1.00
230.16user 5.98system 0:14.77elapsed 1598%CPU (0avgtext+0avgdata 2898640maxresident)k
0inputs+8outputs (1major+1219611minor)pagefaults 0swaps

【讨论】:

关于 point2 的其他问题:***.com/questions/15531556/… +1:最合理的解释。让我补充一下,除了在 numpy 中允许 OpenMP 之外,还应该使用供应商 blas 库(如果可用)。 我想知道您在numpy 中并行化矩阵操作时没有看到很多明显的性能提升的原因实际上可能是因为 numpy 使用外部 BLAS 和 LAPACK 库,这些库通常被编译为同时使用多个内核。如果您尝试并行运行 svd 之类的东西(它使用 LAPACK),也许每个工作人员仍然表现得好像在多个内核上执行一样,并且执行“次优”的事情,例如写入彼此的缓存等。 @ali_m:在第一个示例中,我们看到了理想的缩放比例(1-children speedup:1.00,2-children speedup:2.02,4-children speedup:3.81)。我猜你在说的是:如果有 4 个孩子/OMP_NUM_THREADS=1,计算持续时间:4.37 秒与 OMP_NUM_THREADS=4 的 2.95 秒。是的,到目前为止,这并不是因素 4 的变化(这本来是理想的)。然而,这是意料之中的。由于大型矩阵上的 SVD 涉及在 RAM、缓存和寄存器之间转移大量数据,因此相应的管道(尤其是 CPU 和 RAM 之间,即 Hypertransport/Quickpath/FSB)是瓶颈。很简单。 感谢示例代码。不幸的是,有时代码在“使用 1 个孩子(ren)处理输入”之后停止并永远停留在那里。但我还没有检查我的 numpy 版本的 OMP 支持。【参考方案3】:

您的代码是正确的。我只是在我的系统上运行它(有 2 个内核,超线程)并获得了以下结果:

$ python test_multi.py 
30.8623809814
19.3914041519

我查看了进程,正如预期的那样,并行部分显示多个进程以接近 100% 的速度工作。这必须是您的系统或 python 安装中的内容。

【讨论】:

感谢您尝试我的代码 +1 和您的评估。知道可能出了什么问题,或者我可以用谷歌搜索什么吗? 不确定是什么问题。你用的是什么系统?我会尝试除 Pool 之外的其他 multiprocessing 方法来启动,甚至是 Pool 使用不同的进程处理共享数组的一部分。【参考方案4】:

默认情况下,Pool 仅使用 n 个进程,其中 n 是您机器上的 CPU 数量。您需要指定您希望它使用多少个进程,例如Pool(5)

See here for more info

【讨论】:

【参考方案5】:

测量算术吞吐量是一项非常困难的任务:基本上你的测试用例太简单了,我看到了很多问题。

首先你要测试整数算术:有什么特殊原因吗?使用浮点,您可以获得在许多不同架构中具有可比性的结果。

第二个matrix = matrix*matrix覆盖输入参数(矩阵是通过ref而不是值传递),每个样本都要处理不同的数据...

最后的测试应该在更广泛的问题规模和工人数量上进行,以便掌握总体趋势。

这是我修改后的测试脚本

import numpy as np
from timeit import timeit
from multiprocessing import Pool

def mmul(matrix):
    mymatrix = matrix.copy()
    for i in range(100):
        mymatrix *= mymatrix
    return mymatrix

if __name__ == '__main__':

    for n in (16, 32, 64):
        matrices = []
        for i in range(n):
            matrices.append(np.random.random_sample(size=(1000, 1000)))

        stmt = 'from __main__ import mmul, matrices'
        print 'testing with', n, 'matrices'
        print 'base',
        print '%5.2f' % timeit('r = map(mmul, matrices)', setup=stmt, number=1)

        stmt = 'from __main__ import mmul, matrices, pool'
        for i in (1, 2, 4, 8, 16):
            pool = Pool(i)
            print "%4d" % i, 
            print '%5.2f' % timeit('r = pool.map(mmul, matrices)', setup=stmt, number=1)
            pool.close()
            pool.join()

和我的结果:

$ python test_multi.py 
testing with 16 matrices
base  5.77
   1  6.72
   2  3.64
   4  3.41
   8  2.58
  16  2.47
testing with 32 matrices
base 11.69
   1 11.87
   2  9.15
   4  5.48
   8  4.68
  16  3.81
testing with 64 matrices
base 22.36
   1 25.65
   2 15.60
   4 12.20
   8  9.28
  16  9.04

[更新] 我在家里的另一台计算机上运行此示例,获得一致的减速:

testing with 16 matrices
base  2.42
   1  2.99
   2  2.64
   4  2.80
   8  2.90
  16  2.93
testing with 32 matrices
base  4.77
   1  6.01
   2  5.38
   4  5.76
   8  6.02
  16  6.03
testing with 64 matrices
base  9.92
   1 12.41
   2 10.64
   4 11.03
   8 11.55
  16 11.59

我不得不承认,我不知道该怪谁(numpy、python、编译器、内核)...

【讨论】:

谢谢,但我收到以下错误消息:'Exception RuntimeError: RuntimeError('cannot join current thread',) in ignored' @Framester 请在pool.close() 之后添加pool.join();如果运行时间很短,您可以增加 timeit 中的迭代次数。 除了代码没有人可以责怪! :) 我在现代 16 核 E5-2650 系统上进行了尝试。我观察到大小为 2 和 4 的 mp 池的加速。在此之上,执行时间再次变得更糟。这段代码的并行化方法到目前为止效率不高。 Stefano:您在一台计算机上观察到的速度提升与所涉及的内核数量完全不成线性关系。解释两台计算机之间差异的合理理论:在第一个示例中,单核速度与管道传输性能之间的比率小于第二个示例。【参考方案6】:

既然你提到你有很多文件,我建议以下解决方案;

制作文件名列表。 编写一个函数来加载和处理一个名为输入参数的文件。 使用Pool.map() 将该函数应用于文件列表。

由于现在每个实例都加载自己的文件,因此传递的唯一数据是文件名,而不是(可能很大的)numpy 数组。

【讨论】:

【参考方案7】:

我还注意到,当我在 Pool.map() 函数中运行 numpy 矩阵乘法时,它在某些机器上的运行速度要慢得多。我的目标是使用 Pool.map() 并行化我的工作,并在我机器的每个核心上运行一个进程。当事情运行得很快时,numpy 矩阵乘法只是并行执行的整体工作的一小部分。当我查看进程的 CPU 使用率时,我可以看到每个进程都可以使用例如在运行缓慢的机器上 400+% CPU,但在快速运行的机器上总是 stop numpy from multithreading。事实证明,在我的 Pool.map() 运行缓慢的机器上,numpy 被设置为多线程。显然,如果您已经在使用 Pool.map() 进行并行化,那么让 numpy 也并行化只会产生干扰。在运行我的 Python 代码之前,我刚刚调用了 export MKL_NUM_THREADS=1,它在任何地方都运行得很快。

【讨论】:

【参考方案8】:

解决方案

在任何计算之前设置以下环境变量(对于一些早期版本的numpy,您可能需要在执行import numpy之前设置它们):

os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"

它是如何工作的

numpy 的实现已经使用多线程和优化库,例如 OpenMP 或 MKL 或 OpenBLAS 等。这就是为什么我们自己实现多处理并没有看到太大的改进。更糟糕的是,我们遭受了太多线程。例如,如果我的机器有 8 个 CPU 内核,当我编写 single 处理代码时,numpy 可能会使用 8 个线程进行计算。然后我使用多处理启动 8 个进程,得到 64 个线程。这是没有好处的,并且线程之间的上下文切换和其他开销可能会花费更多时间。通过设置上述环境变量,我们将每个进程的线程数限制为 1,因此我们得到了最有效的总线程数。

代码示例

from timeit import timeit
from multiprocessing import Pool
import sys
import os

import numpy as np

def matmul(_):
    matrix = np.ones(shape=(1000, 1000))
    _ = np.matmul(matrix, matrix)

def mixed(_):
    matrix = np.ones(shape=(1000, 1000))
    _ = np.matmul(matrix, matrix)

    s = 0
    for i in range(1000000):
        s += i

if __name__ == '__main__':
    if sys.argv[1] == "--set-num-threads":
        os.environ["OMP_NUM_THREADS"] = "1"
        os.environ["MKL_NUM_THREADS"] = "1"
        os.environ["OPENBLAS_NUM_THREADS"] = "1"
        os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
        os.environ["NUMEXPR_NUM_THREADS"] = "1"

    if sys.argv[2] == "matmul":
        f = matmul
    elif sys.argv[2] == "mixed":
        f = mixed

    print("Serial:")
    print(timeit(lambda: list(map(f, [0] * 8)), number=20))

    with Pool(8) as pool:
        print("Multiprocessing:")
        print(timeit(lambda: pool.map(f, [0] * 8), number=20))

我在具有 8 个 vCPU(不一定意味着 8 个内核)的 AWS p3.2xlarge 实例上测试了代码:

$ python test_multi.py --no-set-num-threads matmul
Serial:
3.3447616740000115
Multiprocessing:
3.5941055110000093

$ python test_multi.py --set-num-threads matmul
Serial:
9.464500446000102
Multiprocessing:
2.570238267999912

在设置这些环境变量之前,串口版本和多进程版本没有太大区别,都在3秒左右,通常多进程版本比较慢,就像OP演示的一样。设置好线程数后,我们看到串口版本耗时 9.46 秒,变慢了很多!这证明 numpy 即使在使用单个进程时也正在使用多线程。 multiprocessing 版本耗时 2.57 秒,稍微改进了一点,这可能是因为我的实现节省了跨线程数据传输时间。

由于 numpy 已经在使用并行化,因此该示例没有显示多处理的强大功能。当普通的 Python 密集型 CPU 计算与 numpy 操作混合时,多处理是最有益的。例如

$ python test_multi.py --no-set-num-threads mixed
Serial:
12.380275611000116
Multiprocessing:
8.190792100999943

$ python test_multi.py --set-num-threads mixed
Serial:
18.512066430999994
Multiprocessing:
4.8058130150000125

这里将线程数设置为 1 的多处理是最快的。

备注:这也适用于其他一些 CPU 计算库,例如 PyTorch。

【讨论】:

以上是关于Multiprocessing.Pool 使 Numpy 矩阵乘法变慢的主要内容,如果未能解决你的问题,请参考以下文章

如何将二维数组作为 multiprocessing.Array 传递给 multiprocessing.Pool?

Python 多进程编程之multiprocessing--Pool

我们啥时候应该调用 multiprocessing.Pool.join?

`multiprocessing.Pool.map()` 似乎安排错误

multiprocessing.Pool() 比只使用普通函数慢

python之multiprocessing:multiprocessing.Pool