如何使用共享内存而不是通过多个进程之间的酸洗来传递对象

Posted

技术标签:

【中文标题】如何使用共享内存而不是通过多个进程之间的酸洗来传递对象【英文标题】:How to use shared memory instead of passing objects via pickling between multiple processes 【发布时间】:2019-05-06 01:30:07 【问题描述】:

我正在研究一个以加法模型为中心的 CPU 密集型机器学习问题。由于加法是主要操作,我可以将输入数据分成几部分并生成多个模型,然后通过覆盖的__add__ 方法合并它们。

与多处理相关的代码如下所示:

def pool_worker(filename, doshuffle):
    print(f"Processing file: filename")
    with open(filename, 'r') as f:
        partial = FragmentModel(order=args.order, indata=f, shuffle=doshuffle)
        return partial

def generateModel(is_mock=False, save=True):
    model = None
    with ThreadPool(args.nthreads) as pool:
        from functools import partial
        partial_models = pool.imap_unordered(partial(pool_worker, doshuffle=is_mock), args.input)
        i = 0
        for m in partial_models:
            logger.info(f'Starting to merge model i')
            if model is None:
                import copy
                model = copy.deepcopy(m)
            else:
                model += m
            logger.info(f'Done merging...')
            i += 1

    return model

问题在于,随着模型阶数的增加,内存消耗呈指数级增长,因此在阶数 4 时,模型的每个实例大约为 4-5 GB,这会导致线程池崩溃,因为中间模型对象不可腌制。

我读到了一点,它似乎即使酸洗不是问题,传递这样的数据仍然非常低效,正如this answer所评论的那样。

但是,关于如何为此目的使用共享内存的指导很少。是否可以避免这个问题而不必更改模型对象的内部结构?

【问题讨论】:

这回答了如何使用共享内存在进程之间共享数据并关闭酸洗:***.com/a/14135569/9521723 @SimonF 问题之间存在关键区别,您链接的问题是指子进程引用(即读取但不写入)大对象。就我而言,我想返回大对象,我的子进程彼此独立地获取输入数据。 使用多处理模块。阅读其文档以了解如何操作。 按照@Dima_Tisnek 的建议使用文件是正确的选择。云服务通常以文件格式存储大数据,尤其是当您的单个块达到 GB 时。在所有块转储到文件系统后,可以进行合并。 @knh190 问题是大型numpy数组都是自定义对象中的变量 【参考方案1】:

您应该使用 Manager 代理对象来共享可编辑对象:https://docs.python.org/3/library/multiprocessing.html#multiprocessing-managers 访问锁将由该 Manager 代理对象处理。

在Customized managers 部分有一个例子,应该适合你:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

之后,您必须从不同的进程(如using a remote manager 所示)连接到该管理器并根据需要进行编辑。

【讨论】:

【参考方案2】:

查看ray 项目,这是一个使用apache arrow 进行序列化的分布式执行框架。如果您正在使用 numpy 数组,它尤其棒,因此它是 ML 工作流程的绝佳工具。

这是来自object serialization 上的文档的 sn-p

在 Ray 中,我们使用 Apache Arrow 数据优化 numpy 数组 格式。当我们从对象反序列化一个 numpy 数组列表时 存储,我们仍然创建一个 numpy 数组对象的 Python 列表。然而, 而不是复制每个 numpy 数组,每个 numpy 数组对象都包含一个 指向共享内存中保存的相关数组的指针。有一些 这种序列化形式的优势。

反序列化可以非常快。 内存在进程之间共享 因此工作进程都可以读取相同的数据而无需复制 它。

在我看来,它比用于并行执行的多处理库更容易使用,尤其是在寻求使用共享内存时,tutorial 中的用法介绍。

【讨论】:

【参考方案3】:

使用文件!

不,真的,使用文件——它们很高效(操作系统将缓存内容),并且允许您处理更大的问题(数据集不必放入 RAM)。

使用任何https://docs.scipy.org/doc/numpy-1.15.0/reference/routines.io.html 将numpy 数组转储/加载到文件中,并且只在进程之间传递文件名。

附:基准序列化方法,取决于中间数组大小,最快的可能是“原始”(无转换开销)或“压缩”(如果文件最终被写入磁盘)或其他东西。 IIRC 加载“原始”文件可能需要提前知道数据格式(尺寸、大小)。

【讨论】:

如果有问题的 numpy 数组是自定义对象中的变量,那将如何工作? 您可以使用内存映射来代替文件。 Python 有 mmap module,numpy 有 memmap 模块 (example)。 @posdef 首先将 numpy 数组从对象写入文件或 mmap。然后,为每个子进程或线程提供它负责的文件/mmap 的部分(偏移量)。使用mmap/numpy.memmap 之类的东西,只有访问的索引才会加载到内存中。 @posdef 你必须保存加载这些对象;鉴于您已经覆盖了 __add__ 方法,我假设实现在您的控制之下。如果每个片段只有一个ndarray,那很简单。如果还有更多,请考虑 pandas 以方便使用,或者修改一些自定义的东西以提高速度。 @knh190 我不熟悉这些库,但这里是文档,其中包含使用数组的 numpy.memmap 示例:docs.scipy.org/doc/numpy/reference/generated/…【参考方案4】:

从 Python 3.8 开始,multiprocessing.shared_memory 支持进程之间的直接内存共享,类似于 C 或 Java 中的“真实”多线程。直接内存共享比通过文件、套接字或数据复制序列化/反序列化共享要快得多。

它的工作原理是提供一个共享内存缓冲区,不同的进程可以通过基本的SharedMemory 类实例或更高级的SharedMemoryManager 类实例来声明和声明变量。可以使用内置的ShareableList 方便地声明基本 python 数据类型中的变量。 numpy.ndarray等高级数据类型中的变量,可以通过在声明时指定内存缓冲区来共享。

numpy.ndarray 为例:

import numpy as np
from multiprocessing import shared_memory

# setting up
data = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
d_shape = (len(data),)
d_type = np.int64
d_size = np.dtype(d_type).itemsize * np.prod(d_shape)

# IN THE MAIN PROCESS
# allocate new shared memory
shm = shared_memory.SharedMemory(create=True, size=d_size)
shm_name = shm.name
# numpy array on shared memory buffer
a = np.ndarray(shape=d_shape, dtype=d_type, buffer=shm.buf)
# copy data into shared memory ndarray once
a[:] = data[:]

# IN ANOTHER PROCESS
# reuse existing shared memory
ex_shm = shared_memory.SharedMemory(name=shm_name)
# numpy array b uses the same memory buffer as a
b = np.ndarray(shape=d_shape, dtype=d_type, buffer=ex_shm.buf)
# changes in b will be reflected in a and vice versa...
ex_shm.close()  # close after using

# IN THE MAIN PROCESS
shm.close()  # close after using
shm.unlink()  # free memory

在上面的代码中,ab 数组使用相同的底层内存,并且可以直接更新相同的内存,这在机器学习中非常有用。但是,您应该注意并发更新问题并决定如何处理它们,方法是使用Lock/partitioned accesses/或接受随机更新(所谓的 HogWild 样式)。

【讨论】:

我在 python 3.9 上运行它并得到:AttributeError: module 'multiprocessing' has no attribute 'shared_memory' @A.Genchev 请尝试from multiprocessing import shared_memory【参考方案5】:

使用多线程。

尝试使用多线程而不是多处理,因为线程可以在本地共享主进程的内存。

如果担心python的GIL机制,或许可以求助numbanogil

【讨论】:

以上是关于如何使用共享内存而不是通过多个进程之间的酸洗来传递对象的主要内容,如果未能解决你的问题,请参考以下文章

“共享内存模型”与“消息传递模型”

如何在多个进程之间共享缓存?

c# 通过内存映射实现文件共享内存

通过使用 mmap() 在进程之间共享内存

Linux:是不是可以在进程之间共享代码?

运行中nginx进程间的关系