多处理池比手动实例化多个进程慢得多

Posted

技术标签:

【中文标题】多处理池比手动实例化多个进程慢得多【英文标题】:Multiprocessing Pool much slower than manually instantiating multiple Processes 【发布时间】:2021-01-29 21:19:37 【问题描述】:

我正在从一个大文件中读取一个块,将其作为行列表加载到内存中,然后在每一行上处理一个任务。

顺序解决方案耗时太长,因此我开始研究如何并行化它。

我想出的第一个解决方案是使用 Process 并管理列表中的每个子进程。

import multiprocessing as mp

BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()


def read_in_chunks(file_object, chunk_size=1024):
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


with open(BIG_FILE_PATH, encoding="Latin-1") as file:
    for piece in read_in_chunks(file, CHUNKSIZE):
        jobs = []
        piece_list = piece.splitlines()
        piece_list_len = len(piece_list)
        item_delta = round(piece_list_len/N_PROCESSES)
        start = 0
        for process in range(N_PROCESSES):
            finish = start + item_delta
            p = mp.Process(target=work, args=(piece_list[start:finish]))
            start = finish
            jobs.append(p)
            p.start()
        for job in jobs:
            job.join()

它在大约 2498 毫秒内完成每个块。

然后我发现了 Pool 工具来自动管理切片。

import multiprocessing as mp

BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()


def read_in_chunks(file_object, chunk_size=1024):
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


with open(BIG_FILE_PATH, encoding="Latin-1") as file:
    with mp.Pool(N_PROCESSES) as pool:
        for piece in read_in_chunks(file, CHUNKSIZE):
            piece_list = piece.splitlines()
            pool.map(work, piece_list)

它在大约 15540 毫秒内完成每个块,比手动慢 6 倍,但仍然比顺序快。

我是否使用了错误的池? 有没有更好或更快的方法来做到这一点?

感谢您的阅读。

更新

正如 Hannu 建议的那样,游泳池的开销很大。

Process 方法调用的工作函数需要一个行列表。

由于 Pool 决定切片的方式,由 Pool 方法调用的工作函数需要一行。

我不太确定如何让池一次给某个工人多条线路。

那应该能解决问题吧?

更新 2

最后一个问题,还有第三种更好的方法吗?

【问题讨论】:

您正在循环中创建Pool。因此,它被一次又一次地创建。在开始循环之前创建一次,如here所示。 哦不,我怎么看不到!谢谢,但运行时间不变。 【参考方案1】:

我对此并不完全确定,但在我看来,您的计划在提交给工人的内容方面存在重大差异。

在您的 Process 方法中,您似乎提交了大量行:

p = mp.Process(target=work, args=(piece_list[start:finish]))

但是当你使用 Pool 时,你会这样做:

for piece in read_in_chunks(file, CHUNKSIZE):
    piece_list = piece.splitlines()
    pool.map(work, piece_list)

读取您的文件分块,但是当您使用splitlines 时,您的piece_list 可迭代提交 个单位。

这意味着在您的流程方法中,您提交与 CPU 数量一样多的子任务,但在您的池方法中,您提交与源数据行数一样多的任务。如果您有很多行,这将在您的池中产生大量编排开销,因为每个工作人员一次只处理一行,然后完成,返回结果,然后池将另一行提交给新释放的工作人员。

如果这就是这里发生的事情,它肯定解释了为什么 Pool 需要更长的时间才能完成。

如果您将阅读器用作可迭代对象并跳过分行部分会发生什么:

pool.map(work, read_in_chunks(file, CHUNKSIZE))

【讨论】:

如果我按照您的建议使用阅读器,内存会饱和并开始交换,直到速度非常慢。但我确实明白你对开销的看法。事实上,Process 方法 work 函数需要一个行列表。而池方法 work 函数需要一行,因为池是如何迭代块的。 那么我建议降低 CHUNKSIZE。您的 read_in_chunks 是一个迭代器,因此将它与 map 一起使用应该绝对没问题。它不会在内存中读取您的文件,因此内存问题是由于您的工作人员在他们的盘子上太多或有太多工作人员造成的。尝试更小块或更少的工人,看看会发生什么。这通常需要反复试验才能找到最佳位置。 但是 Pool 中没有任何内容规定您的工作人员应该只处理一行。您应该能够使用与 Process 相同的 worker 并提交一个块而不是一行。 我同意你的看法。但无论 CHUNKSIZE 是多少,我相信池都是从迭代器读取直到 EOF。它永远不会停止阅读并开始工作。【参考方案2】:

我不知道这是否可行,但你可以试试这个吗?

if __name__ == "__main__":
    with open(BIG_FILE_PATH, encoding="Latin-1") as file:
        with mp.Pool(N_PROCESSES) as pool:
            for piece in read_in_chunks(file, CHUNKSIZE):
                piece_list = piece.splitlines()
            pool.map(work, piece_list)

我的推理:1. pool.map() ,只需要一次,你的代码正在循环它 2。我的猜测是循环使它变慢 3。因为并行处理应该更快呵呵

【讨论】:

您正在使用超出其范围的变量piece_list。我在整个文件的子集块上使用 map 因为文件太大而无法完全加载到内存中。【参考方案3】:

天啊!这是一个很有趣的过程,但仍然很有趣。

Pool.map 正在从迭代器中获取、腌制和分别将每个项目传递给每个工人。一旦工人完成,冲洗并重复,get -> pickle -> pass。这会产生明显的间接费用。

这实际上是有意的,因为 Pool.map 不够聪明,无法知道迭代器的长度,也无法有效地制作列表并在其中传递每个列表(chunk ) 给工人。

但是,它可以提供帮助。 简单地将列表转换为具有列表理解的块列表(lists)就像一个魅力,并将开销减少到与 Process 方法相同的水平。

import multiprocessing as mp

BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()


def read_in_chunks(file_object, chunk_size=1024):
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


with open(BIG_FILE_PATH, encoding="Latin-1") as file:
    with mp.Pool(N_PROCESSES) as pool:
        for piece in read_in_chunks(file, CHUNKSIZE):
            piece_list = piece.splitlines()
            piece_list_len = len(piece_list)
            item_delta = round(piece_list_len / N_PROCESSES)
            pool.map(work, [piece_list[i:i + item_delta] for i in range(0, piece_list_len, item_delta)])

这个具有列表迭代器列表的池与 Process 方法的运行时间完全相同。

【讨论】:

以上是关于多处理池比手动实例化多个进程慢得多的主要内容,如果未能解决你的问题,请参考以下文章

为啥以下简单的并行化代码比 Python 中的简单循环慢得多?

Intel 上的多线程比 AMD 慢得多

servlet 单例多线程

Servlet 单例多线程详解

Servlet 单例多线程详解

使用 HikariCP 到多个数据库的连接池