使用 numpy/scipy 最小化 Python multiprocessing.Pool 中的开销

Posted

技术标签:

【中文标题】使用 numpy/scipy 最小化 Python multiprocessing.Pool 中的开销【英文标题】:Minimize overhead in Python multiprocessing.Pool with numpy/scipy 【发布时间】:2016-09-01 07:50:11 【问题描述】:

我已经花费了几个小时来尝试并行化我的数字运算代码,但这样做只会变得更慢。不幸的是,当我尝试将其简化为下面的示例时,问题就消失了,我真的不想在这里发布整个程序。所以问题是:在这种类型的程序中我应该避免哪些陷阱?

(注:Unutbu 回答后的后续在底部。)

具体情况如下:

这是关于一个模块,它定义了一个类BigData,其中包含大量内部数据。在示例中,有一个列表ff 的插值函数;在实际程序中,还有更多,例如ffA[k]ffB[k]ffC[k]。 计算将被归类为“令人尴尬的并行”:一次可以对较小的数据块进行工作。在示例中为do_chunk()。 在我的实际程序中,示例中显示的方法会导致性能最差:每个块大约 1 秒(在单线程中完成时实际计算时间大约为 0.1 秒)。因此,对于 n=50,do_single() 将在 5 秒内运行,do_multi() 将在 55 秒内运行。 我还尝试通过将xiyi 数组分割成连续的块并遍历每个块中的所有k 值来拆分工作。那效果好一点。现在,无论我使用 1、2、3 还是 4 个线程,总执行时间都没有差异。但当然,我希望看到实际的加速! 这可能是相关的:Multiprocessing.Pool makes Numpy matrix multiplication slower。然而,在程序的其他地方,我使用了一个多处理池来进行更加孤立的计算:一个看起来像def do_chunk(array1, array2, array3) 的函数(未绑定到类),并且只对该数组进行 numpy 计算。在那里,速度显着提升。 CPU 使用率与预期的并行进程数一样(三个线程的 CPU 使用率 300%)。
#!/usr/bin/python2.7

import numpy as np, time, sys
from multiprocessing import Pool
from scipy.interpolate import RectBivariateSpline

_tm=0
def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm==0: _tm = tm; return
    print("%s: %.2f seconds" % (msg, tm-_tm))
    _tm = tm

class BigData:
    def __init__(self, n):
        z = np.random.uniform(size=n*n*n).reshape((n,n,n))
        self.ff = []
        for i in range(n):
            f = RectBivariateSpline(np.arange(n), np.arange(n), z[i], kx=1, ky=1)
            self.ff.append(f)
        self.n = n

    def do_chunk(self, k, xi, yi):
        s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
        sys.stderr.write(".")
        return s

    def do_multi(self, numproc, xi, yi):
        procs = []
        pool = Pool(numproc)
        stopwatch('Pool setup')
        for k in range(self.n):
            p = pool.apply_async( _do_chunk_wrapper, (self, k, xi, yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        sum = 0.0
        for k in range(self.n):
            # Edit/bugfix: replaced p.get by procs[k].get
            sum += np.sum(procs[k].get(timeout=30)) # timeout allows ctrl-C interrupt
            if k == 0: stopwatch("\nFirst get() done")
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return sum

    def do_single(self, xi, yi):
        sum = 0.0
        for k in range(self.n):
            sum += self.do_chunk(k, xi, yi)
        stopwatch('\nAll in single process')
        return sum

def _do_chunk_wrapper(bd, k, xi, yi): # must be outside class for apply_async to chunk
    return bd.do_chunk(k, xi, yi)        

if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
    stopwatch('Initialized')
    bd.do_multi(2, xi, yi)
    bd.do_multi(3, xi, yi)
    bd.do_single(xi, yi)

输出:

Initialized: 0.06 seconds
Pool setup: 0.01 seconds
Jobs queued (2 processes): 0.03 seconds
..
First get() done: 0.34 seconds
................................................Jobs done: 7.89 seconds
Pool setup: 0.05 seconds
Jobs queued (3 processes): 0.03 seconds
..
First get() done: 0.50 seconds
................................................Jobs done: 6.19 seconds
..................................................
All in single process: 11.41 seconds

时间安排在 Intel Core i3-3227 CPU 上,2 核 4 线程,运行 64 位 Linux。对于实际程序,多进程版本(池机制,即使只使用一个内核)比单进程版本慢 10 倍。

跟进

Unutbu 的回答让我走上了正轨。在实际程序中,self 被腌制成一个 37 到 140 MB 的对象,需要传递给工作进程。更糟糕的是,Python 酸洗非常慢;酸洗本身需要几秒钟,这发生在传递给工作进程的每一块工作中。除了pickling和传递大数据对象,apply_async在Linux中的开销很小;对于一个小函数(添加几个整数参数),每对 apply_async/get 只需要 0.2 毫秒。因此,将工作分成非常小的块本身并不是问题。因此,我将所有大数组参数作为索引传输到全局变量。出于 CPU 缓存优化的目的,我将块大小保持得很小。

全局变量存储在一个全局dict中;设置工作池后,这些条目会立即在父进程中删除。只有dict 的密钥被传输到工作进程。酸洗/IPC唯一的大数据是工人创造的新数据。

#!/usr/bin/python2.7

import numpy as np, sys
from multiprocessing import Pool

_mproc_data =   # global storage for objects during multiprocessing.

class BigData:
    def __init__(self, size):
        self.blah = np.random.uniform(0, 1, size=size)

    def do_chunk(self, k, xi, yi):
        # do the work and return an array of the same shape as xi, yi
        zi = k*np.ones_like(xi)
        return zi

    def do_all_work(self, xi, yi, num_proc):
        global _mproc_data
        mp_key = str(id(self))
        _mproc_data['bd'+mp_key] = self # BigData
        _mproc_data['xi'+mp_key] = xi
        _mproc_data['yi'+mp_key] = yi
        pool = Pool(processes=num_proc)
        # processes have now inherited the global variabele; clean up in the parent process
        for v in ['bd', 'xi', 'yi']:
            del _mproc_data[v+mp_key]

        # setup indices for the worker processes (placeholder)
        n_chunks = 45
        n = len(xi)
        chunk_len = n//n_chunks
        i1list = np.arange(0,n,chunk_len)
        i2list = i1list + chunk_len
        i2list[-1] = n
        klist = range(n_chunks) # placeholder

        procs = []
        for i in range(n_chunks):
            p = pool.apply_async( _do_chunk_wrapper, (mp_key, i1list[i], i2list[i], klist[i]) )
            sys.stderr.write(".")
            procs.append(p)
        sys.stderr.write("\n")

        # allocate space for combined results
        zi = np.zeros_like(xi)

        # get data from workers and finish  
        for i, p in enumerate(procs):
            zi[i1list[i]:i2list[i]] = p.get(timeout=30) # timeout allows ctrl-C handling

        pool.close()
        pool.join()

        return zi

def _do_chunk_wrapper(key, i1, i2, k):
    """All arguments are small objects."""
    global _mproc_data
    bd = _mproc_data['bd'+key]
    xi = _mproc_data['xi'+key][i1:i2]
    yi = _mproc_data['yi'+key][i1:i2]
    return bd.do_chunk(k, xi, yi)


if __name__ == "__main__":
    xi, yi = np.linspace(1, 100, 100001), np.linspace(1, 100, 100001)
    bd = BigData(int(1e7))
    bd.do_all_work(xi, yi, 4)

这是速度测试的结果(同样,2 核,4 线程),改变工作进程的数量和块中的内存量(xiyi、@987654346 的总字节数@ 数组切片)。这些数字以“每秒百万个结果值”为单位,但这对于比较而言并不重要。 “1 个进程”所在的行是对 do_chunk 的直接调用,包含完整的输入数据,没有任何子进程。

#Proc   125K    250K    500K   1000K   unlimited
1                                      0.82 
2       4.28    1.96    1.3     1.31 
3       2.69    1.06    1.06    1.07 
4       2.17    1.27    1.23    1.28 

内存中数据大小的影响是相当显着的。 CPU 具有 3 MB 共享 L3 高速缓存,以及每个内核 256 KB L2 高速缓存。请注意,计算还需要访问 BigData 对象的几 MB 内部数据。因此,我们从中学到的是,进行这种速度测试很有用。对于这个程序,2个进程最快,4个次之,3个最慢。

【问题讨论】:

旁白:你看过dask吗?它可能会使您的许多多处理任务变得更简单。 @ajcr 还没有。但现在我想尽量减少外部依赖的数量,因为我可能需要在我没有管理员权限的服务器上运行它,并与可能有相同限制的其他人共享它。 这也让我害怕:“Dask 数组在大型数组上实现了 NumPy 接口的 子集”。这听起来像是与现有代码交互的大量潜在工作。 关于:“每个块大约 1 秒(在单线程中完成时实际计算时间大约为 0.1 秒)”?这是否意味着对 pool.apply_async(_do_chunk_wrapper, ...).get() 的一次调用需要 1 秒(在您的实际代码中)?而在单线程中调用self.do_chunk 需要 0.1 秒? @unutbu 是的。我没有加速,而是减速了 10 倍......(问题已更新以澄清) 【参考方案1】:

尽量减少进程间通信。 在multiprocessing 模块中,所有(单机)进程间通信都通过队列完成。通过队列传递的对象 被腌制。所以尝试通过队列发送更少和/或更小的对象。

不要通过队列发送selfBigData 的实例。它相当大,并且随着self 中数据量的增长而变大:

In [6]: import pickle
In [14]: len(pickle.dumps(BigData(50)))
Out[14]: 1052187

每个 时间pool.apply_async( _do_chunk_wrapper, (self, k, xi, yi)) 被调用, self 在主进程中被腌制,在工作进程中被解封。这 len(pickle.dumps(BigData(N))) 的大小增加了 N

让数据从一个全局变量中读取。在 Linux 上,您可以利用 Copy-on-Write。作为Jan-Philip Gehrcke explains:

在 fork() 之后,parent 和 child 处于等价状态。将父级的整个内存复制到 RAM 中的另一个位置是愚蠢的。这就是copy-on-write原则[进来]的地方。只要孩子不改变它的内存状态,它实际上会访问父母的内存。只有修改后,才会将相应的点点滴滴复制到孩子的内存空间中。

因此,您可以避免通过队列传递BigData 的实例 通过简单地将实例定义为全局 bd = BigData(n),(正如您已经在做的那样)并在工作进程中引用它的值(例如 _do_chunk_wrapper)。它基本上相当于从对pool.apply_async的调用中删除self

p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))

并以全局方式访问 bd,并对 do_chunk_wrapper 的呼叫签名进行必要的服务更改。

尝试将运行时间较长的函数 func 传递给 pool.apply_async。 如果您对pool.apply_async 有许多快速完成的调用,那么通过队列传递参数和返回值的开销将成为整个时间的重要组成部分。相反,如果您减少对pool.apply_async 的调用,并在返回结果之前让每个func 做更多的工作,那么进程间通信将占总时间的一小部分。

下面,我修改了_do_chunk_wrapper 以接受k_startk_end 参数,这样每次调用pool.apply_async 都会在返回结果之前计算k 的许多值的总和。


import math
import numpy as np
import time
import sys
import multiprocessing as mp
import scipy.interpolate as interpolate

_tm=0
def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm==0: _tm = tm; return
    print("%s: %.2f seconds" % (msg, tm-_tm))
    _tm = tm

class BigData:
    def __init__(self, n):
        z = np.random.uniform(size=n*n*n).reshape((n,n,n))
        self.ff = []
        for i in range(n):
            f = interpolate.RectBivariateSpline(
                np.arange(n), np.arange(n), z[i], kx=1, ky=1)
            self.ff.append(f)
        self.n = n

    def do_chunk(self, k, xi, yi):
        n = self.n
        s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
        sys.stderr.write(".")
        return s

    def do_chunk_of_chunks(self, k_start, k_end, xi, yi):
        s = sum(np.sum(np.exp(self.ff[k].ev(xi, yi)))
                    for k in range(k_start, k_end))
        sys.stderr.write(".")
        return s

    def do_multi(self, numproc, xi, yi):
        procs = []
        pool = mp.Pool(numproc)
        stopwatch('\nPool setup')
        ks = list(map(int, np.linspace(0, self.n, numproc+1)))
        for i in range(len(ks)-1):
            k_start, k_end = ks[i:i+2]
            p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        total = 0.0
        for k, p in enumerate(procs):
            total += np.sum(p.get(timeout=30)) # timeout allows ctrl-C interrupt
            if k == 0: stopwatch("\nFirst get() done")
        print(total)
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return total

    def do_single(self, xi, yi):
        total = 0.0
        for k in range(self.n):
            total += self.do_chunk(k, xi, yi)
        stopwatch('\nAll in single process')
        return total

def _do_chunk_wrapper(k_start, k_end, xi, yi): 
    return bd.do_chunk_of_chunks(k_start, k_end, xi, yi)        

if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2,m))
    stopwatch('Initialized')
    bd.do_multi(2, xi, yi)
    bd.do_multi(3, xi, yi)
    bd.do_single(xi, yi)

产量

Initialized: 0.15 seconds

Pool setup: 0.06 seconds
Jobs queued (2 processes): 0.00 seconds

First get() done: 6.56 seconds
83963796.0404
Jobs done: 0.55 seconds
..
Pool setup: 0.08 seconds
Jobs queued (3 processes): 0.00 seconds

First get() done: 5.19 seconds
83963796.0404
Jobs done: 1.57 seconds
...
All in single process: 12.13 seconds

与原代码对比:

Initialized: 0.10 seconds
Pool setup: 0.03 seconds
Jobs queued (2 processes): 0.00 seconds

First get() done: 10.47 seconds
Jobs done: 0.00 seconds
..................................................
Pool setup: 0.12 seconds
Jobs queued (3 processes): 0.00 seconds

First get() done: 9.21 seconds
Jobs done: 0.00 seconds
..................................................
All in single process: 12.12 seconds

【讨论】:

有道理,谢谢。在我的实际程序中,BigData 是 37 MB,腌制。我在想 IPC 足够快,可以在几分之一秒内处理 37 MB,但真正的瓶颈似乎是 pickle.loads(pickle.dumps(bigdata)) 在我的系统上需要 2.8 秒!现在我可以继续第四次重新实现多处理代码......我需要确保为全局变量(可能是 bigdata_ 之类的变量名)正确记账,并确保任何对变量的修改发生在父进程中。 小块的原因是我想将所有正在处理的数据保留在 CPU 缓存中。不过,内存带宽似乎是一个次要因素。 是的,这是另一个重要的考虑因素。上面,我设置了ks = list(map(int, np.linspace(0, self.n, numproc+1)))。这使得对apply_async 的调用次数等于numproc。您可能希望使用ks = list(map(int, np.linspace(0, self.n, numchunks+1))) 并尝试使用numchunks 的其他值。 我的示例代码中有一个错误;最后一个池条目总是getting。修复后,多 CPU 运行时与您的更相似。不过,我会摆脱头顶的泡菜/解泡菜! 新实现完成;在问题中添加了“跟进”部分。

以上是关于使用 numpy/scipy 最小化 Python multiprocessing.Pool 中的开销的主要内容,如果未能解决你的问题,请参考以下文章

Python地理数据处理环境的搭建和准备(numpy,scipy,matplotlib)

python numpy/scipy曲线拟合

Python (NumPy, SciPy),寻找矩阵的零空间

[使用Python,NumPy,SciPy使用矩阵乘法对矩阵进行有效切片

使用 python 3 和 numpy、scipy、pillow 和 matplotlib 在 Raspberry Pi 3B+ 上构建 docker 映像失败

在Python / Numpy / Scipy中找到两个数组之间的插值交集