使用 multiprocessing.map 时如何节省内存?

Posted

技术标签:

【中文标题】使用 multiprocessing.map 时如何节省内存?【英文标题】:How to save memory when using multiprocessing.map? 【发布时间】:2021-12-04 11:14:42 【问题描述】:

我有一个函数,它基本上采用一对整数 (x,y) 并生成一个包含 3000 个元素的向量。所以,我用了:

pool_obj=multiprocessing.Pool()
result=np.array(pool_obj.map(f, RANGE))

其中RANGE 是 x,y 可能分别取的两组值的笛卡尔积。

我的问题是我只需要np.sum(result,axis=0) 3000 长。我想对所有 x 和 y 求和。总共有 1000x1000 对 (x,y)。使用这种方法将创建一个 1000000x3000 大并且超过内存限制的超大数组。

我该如何解决这个问题?

【问题讨论】:

如果RANGE 是一个生成器而不是一个数组,pool.map 应该只根据需要动态生成输入。那么只要f的输出很小,就不应该有过多的内存使用。 Pool.map 的文档指出“请注意,对于非常长的迭代,它可能会导致高内存使用。考虑使用带有显式块大小选项的 imap()imap_unordered() 以提高效率。 ”。你试过这些吗? 还有一个额外的要求,即迭代器必须具有__len__ 方法,否则池提前计算所有输入。这是因为它在确定块大小、输出大小以及检查输入是否为空时尝试使用len 非常感谢! imap_unordered 正是我想要的! 【参考方案1】:

使用x, y 对的生成器减少输入大小,同时使用imap 减少输出大小的示例(在返回主进程时减少数据)

import multiprocessing as mp
import numpy as np
from time import sleep

class yield_xy:
    """
    Generator for x, y pairs prevents all pairs of x and y from being generated
    at the start of the map call. In this example it would only be a million
    floats, so on the order of 4-8 Mb of data, but if x, and y are bigger
    (or maybe you have a z) this could dramatically reduce input data size
    """
    def __init__(self, x, y):
        self._x = x
        self._y = y
        
    def __len__(self): #map, imap, map_async, starmap etc.. need the input size ahead of time
        return len(self._x) * len(self._y)
    
    def __iter__(self): #simple generator needs storage x + y rather than x * y
        for x in self._x:
            for y in self._y:
                yield x, y

def task(args):
    x, y = args
    return (np.zeros(3000) + x) * y


def main():
    x = np.arange(0,1000)
    y = np.sin(x)
    
    out = np.zeros(3000)
    
    with mp.Pool() as pool:
        for result in pool.imap(task, yield_xy(x, y)):
            out += result #accumulate results
    return out


if __name__ == "__main__":
    result = main()

【讨论】:

以上是关于使用 multiprocessing.map 时如何节省内存?的主要内容,如果未能解决你的问题,请参考以下文章

使用 UIKeyCommand 时如何处理大小写字符

使用 ActiveMerchant 时如何处理超时?

使用 Kafka Streams DSL 时如何处理错误和不提交

使用 seaborn 绘图时如何处理缺失值?

使用路径前缀规则时如何用 Traefik 重写路径?

使用 git 子模块时如何处理绝对导入