具有大对象的 Python 多处理:防止对象的复制/序列化

Posted

技术标签:

【中文标题】具有大对象的 Python 多处理:防止对象的复制/序列化【英文标题】:Python multiprocessing with large objects: prevent copying/serialization of object 【发布时间】:2020-05-01 00:09:28 【问题描述】:

我已经针对较大对象的一些问题实现了多处理,如下所示:

import time
import pathos.multiprocessing as mp
from functools import partial
from random import randrange


class RandomNumber():
    def __init__(self, object_size=100):
        self.size = bytearray(object_size*10**6)  # 100 MB size
        self.foo = None

    def do_something(self, *args, **kwargs):
        self.foo = randrange(1, 10)
        time.sleep(0.5)  # wait for 0.5 seconds
        return self


def wrapper(random_number, *args, **kwargs):
    return random_number.do_something(*args, **kwargs)


if __name__ == '__main__':
    # create data
    numbers = [RandomNumber() for m in range(0, 9)]
    kwds = 'add': randrange(1, 10)

    # calculate
    pool = mp.Pool(processes=mp.cpu_count())
    result = pool.map_async(partial(wrapper, **kwds), numbers)
    try:
        result = result.get()
    except:
        pass

    # print result
    my_results = [i.foo for i in result]
    print(my_results)

    pool.close()
    pool.join()

产生类似的东西:

[8, 7, 8, 3, 1, 2, 6, 4, 8]

现在的问题是,当对象非常小时,与使用列表推导相比,我在性能上有了巨大的改进,而这种改进在更大的对象大小时变成了相反的情况,例如100 MB 或更大。

从documentation 和其他问题中,我发现这是由于使用 pickle/dill 对单个对象进行序列化以便将它们传递给池中的工作人员造成的。换句话说:对象被复制,这个 IO 操作成为瓶颈,因为它比实际计算更耗时。

我已经尝试使用multiprocessing.Manager 处理同一个对象,但这会导致运行时间更长。

问题是我被绑定到一个特定的类结构(这里通过RandomNumber() 表示),我无法改变它..

现在我的问题是:是否有任何方法或概念来规避这种行为,并且只在do_something() 上调用我,而无需序列化或复制的开销?

欢迎任何提示。提前致谢!

【问题讨论】:

这是否涉及 IO 或 CPU 密集型任务 这是一个占用大量 CPU 资源的任务。我将数学求解器应用于特定问题,而数据负责对象的内存大小。 【参考方案1】:

您需要使用Batch processing。不要为每个号码创建销毁工人。 根据cpu_count 制作有限的工人。然后将一个列表传递给每个工作并处理它们。使用map 并传递一个包含batches 数字的列表。

【讨论】:

您能否提供一个最小示例或有关如何实现此功能的链接? @CordKaldemeyer 我不在电脑前....将尝试给出一个想法....从您的主列表创建子列表....现在将每个子列表传递给 map....in您的函数在 Len 列出并执行某些操作时执行...一旦您运行此命令...您将获得批处理或子列表的最佳大小.... 当开始运行这个......玩batchsize......你最终会找到这个任务的正确大小【参考方案2】:

我从concurrent.futures 库中找到了一个使用多处理或多线程的解决方案,它不需要腌制对象。在我的例子中,使用ThreadPoolExecutor 的多线程比使用ProcessPoolExecutor 的多线程带来了明显的优势。

import time
from random import randrange
import concurrent.futures as cf


class RandomNumber():
    def __init__(self, object_size=100):
        self.size = bytearray(object_size*10**6)  # 100 MB size
        self.foo = None

    def do_something(self, *args, **kwargs):
        self.foo = randrange(1, 10)
        time.sleep(0.5)  # wait for 0.5 seconds
        return self


def wrapper(random_number, *args, **kwargs):
    return random_number.do_something(*args, **kwargs)


if __name__ == '__main__':
    # create data
    numbers = [RandomNumber() for m in range(0, 100)]
    kwds = 'add': randrange(1, 10)

    # run
    with cf.ThreadPoolExecutor(max_workers=3) as executor:
        result = executor.map(wrapper, numbers, timeout=5*60)

    # print result
    my_results = [i.foo for i in result]
    print(my_results)

产量:

[3, 3, 1, 1, 3, 7, 7, 6, 7, 5, 9, 5, 6, 5, 6, 9, 1, 5, 1, 7, 5, 3, 6, 2, 9, 2, 1, 2, 5, 1, 7, 9, 2, 9, 4, 9, 8, 5, 2, 1, 7, 8, 5, 1, 4, 5, 8, 2, 2, 5, 3, 6, 3, 2, 5, 3, 1, 9, 6, 7, 2, 4, 1, 5, 4, 4, 4, 9, 3, 1, 5, 6, 6, 8, 4, 4, 8, 7, 5, 9, 7, 8, 6, 2, 3, 1, 7, 2, 4, 8, 3, 6, 4, 1, 7, 7, 3, 4, 1, 2]

real    0m21.100s
user    0m1.100s
sys 0m2.896s

尽管如此,如果我有太多对象(此处为numbers),这仍然会导致内存泄漏,并且如果必须交换内存(即系统冻结直到任务完成。

关于如何防止这种情况的任何提示?

【讨论】:

以上是关于具有大对象的 Python 多处理:防止对象的复制/序列化的主要内容,如果未能解决你的问题,请参考以下文章

python中的多处理-在多个进程之间共享大对象(例如pandas数据框)

Python创建具有相同值的新对象

多处理 - 共享一个复杂的对象

我可以有两个具有相同属性名称的对象吗? [复制]

python是不是有检查对象是不是具有属性的简写? [复制]

具有分布式集群的 Python 多处理