通过并行化加速随机数生成

Posted

技术标签:

【中文标题】通过并行化加速随机数生成【英文标题】:Speeding up random number generation by parallelizing 【发布时间】:2021-12-25 22:11:06 【问题描述】:

我需要使用来自标准正态分布的随机数创建许多大型 numpy 数组(4e6、100),我正在努力加快速度。我尝试使用多个内核生成阵列的不同部分,但我没有得到预期的速度提升。是我做错了什么,还是期望以这种方式提高速度是错误的?

from numpy.random import default_rng
from multiprocessing import Pool
from time import time


def rng_mp(rng):
    return rng.standard_normal((250000, 100))


if __name__ == '__main__':

    n_proc = 4
    rngs = [default_rng(n) for n in range(n_proc)]
    rng_all = default_rng(1)

    start = time()
    result = rng_all.standard_normal((int(1e6), 100))
    print(f'Single process: time() - start:.3f seconds')

    start = time()
    with Pool(processes=n_proc) as p:
        result = p.map_async(rng_mp, rngs).get()
    print(f'MP: time() - start:.3f seconds')

    # Single process: 1.114 seconds
    # MP: 2.634 seconds

【问题讨论】:

如果你真的得到了 ~2.6 秒的多处理持续时间,这些值我不会抱怨。我在我的 3GHz 10 核 Intel Xeon W 上试过这个,它花了大约 10 秒 你也用了4核还是增加了核数?我有一个 i7-6700HQ 2.6GHz 英特尔。问题在于与单进程速度的比较,我不明白为什么多进程更慢。 NumPy 实现很可能已经使用了多个内核,因此您只是在增加开销。来自here: "... 但是现在许多架构都有一个 BLAS,它也利用了多核机器。如果你的 numpy/scipy 是使用其中之一编译的,那么 dot() 将被并行计算(如果这更快),而无需您做任何事情。..." 【参考方案1】:

这并不是对原始问题的回答 - 更多的是后续问题,我无法回答。

我重新排列了代码,试图看看这里到底发生了什么。

from numpy.random import default_rng
from concurrent.futures import ProcessPoolExecutor
import time

NPROC = 4

def rng_mp(i):
    s = time.perf_counter()
    r = default_rng(i).standard_normal((250000, 100))
    e = time.perf_counter()
    print(f'Process i e-s:.2fs')
    return r


if __name__ == '__main__':
    start = time.perf_counter()
    with ProcessPoolExecutor() as executor:
        for fr in [executor.submit(rng_mp, i) for i in range(NPROC)]:
            s = time.perf_counter()
            fr.result()
            e = time.perf_counter()
            print(f'Result time e-s:.2f')
    end = time.perf_counter()
    print(f'Overall end - start:.3f seconds')

一个典型的输出如下:

进程 0 0.33s 流程2 0.33s 流程1 0.33s 进程 3 0.33s 结果时间 2.27 结果时间 5.57 结果时间 0.00 结果时间 0.00 总时长 7.999 秒

换句话说,ring_mp() 进程执行得非常及时。 但是延迟似乎是在获取结果时,我只能猜测这与在子进程和主进程之间移动大量内存有关。 FWIW 我在 macOS 12.0.1 上使用 Python 3.9.8 运行 numpy 1.21.4。我无法解释。

更新:根据@Booboo 的回答,我改为使用 ThreadPoolExecutor(无需其他更改),结果如下:

流程 3 0.34 秒 流程1 0.35s 进程 0 0.35s 结果时间 0.35 结果时间 0.00 流程2 0.35s 结果时间 0.00 结果时间 0.00 总共 0.388 秒

【讨论】:

【参考方案2】:

我怀疑减速仅仅是因为您需要将大量数据从子进程的地址空间移回主进程。我还怀疑用于随机数生成的 C 语言实现 numpy 会释放全局解释器锁,并且使用多线程而不是多处理可以解决您的性能问题:

from numpy.random import default_rng
from multiprocessing.pool import ThreadPool
from time import time


def rng_mp(rng):
    return rng.standard_normal((250000, 100))


if __name__ == '__main__':

    n_proc = 4
    rngs = [default_rng(n) for n in range(n_proc)]
    rng_all = default_rng(1)

    start = time()
    result = rng_all.standard_normal((int(1e6), 100))
    print(f'Single process: time() - start:.3f seconds')

    start = time()
    with ThreadPool(processes=n_proc) as p:
        result = p.map_async(rng_mp, rngs).get()
    print(f'MT: time() - start:.3f seconds')

打印:

Single process: 1.210 seconds
MT: 0.413 seconds

【讨论】:

【参考方案3】:

感谢其他贡献者提出这个建议,但我找到了一种比其他建议更快的方法,因为它使用填充现有数组而不是创建新数组。它改编自 numpy 文档 here,针对二维数组进行了优化。

from numpy.random import default_rng, SeedSequence
import multiprocessing
import concurrent.futures
import numpy as np
from time import time


class MultithreadedRNG2D:
    def __init__(self, shape, seed=None, threads=None):
        if threads is None:
            threads = multiprocessing.cpu_count()
        self.threads = threads

        seq = SeedSequence(seed)
        self._random_generators = [default_rng(s)
                                   for s in seq.spawn(threads)]

        self.shape = shape
        self.executor = concurrent.futures.ThreadPoolExecutor(threads)
        self.values = np.empty(shape)
        self.steps = [(t * (shape[0] // threads), (t + 1) * (shape[0] // threads))
                      if t < (threads - 1)
                      else (t * (shape[0] // threads), shape[0])
                      for t in range(threads)]

    def fill(self):
        def _fill(random_state, out, firstrow, lastrow):
            random_state.standard_normal(out=out[firstrow:lastrow])

        futures = 
        for i in range(self.threads):
            args = (_fill,
                    self._random_generators[i],
                    self.values,
                    self.steps[i][0],
                    self.steps[i][1])
            futures[self.executor.submit(*args)] = i
        concurrent.futures.wait(futures)

    def __del__(self):
        self.executor.shutdown(False)


mrng = MultithreadedRNG2D((int(1e6), 100), seed=1, threads=4)
start = time()
mrng.fill()
print(f'MT: time() - start:.3f seconds')

# MT: 0.336 seconds

【讨论】:

说句公道话,你也应该把mrng = MultithreadedRNG2D((int(1e6), 100), seed=1, threads=4) 放在时间上 也许,但在我的情况下,我需要创建许多相同形状的数组,我只需要实例化一次,因此fill 的执行时间最重要。这也是为什么我只是在这里添加它作为参考,但没有将它作为接受的答案。【参考方案4】:

我其他答案的逻辑现在在包mtalg 中实现,该包旨在使用多线程生成随机数。

from mtalg.random import MultithreadedRNG
mrng = MultithreadedRNG(seed=1, num_threads=4)
mrng.standard_normal(size=(int(1e6), 100))

【讨论】:

以上是关于通过并行化加速随机数生成的主要内容,如果未能解决你的问题,请参考以下文章

在并行程序中播种随机数生成器

使用 scikit-learn 并行生成随机森林

以均匀随机分布生成任意斜率随机线性分布

并行化随机森林

测量随机算法中的并行加速

如何使用 CUDA 生成随机排列