multiprocessing.Pool 跨子进程共享内存中只读的大型列表列表

Posted

技术标签:

【中文标题】multiprocessing.Pool 跨子进程共享内存中只读的大型列表列表【英文标题】:multiprocessing.Pool sharing large lists of lists read-only in memory across child process 【发布时间】:2022-01-24 01:06:01 【问题描述】:

我正被这个问题困扰。

我有一大串列表,我想用并行代码访问这些列表以执行 CPU 密集型操作。为了做到这一点,我正在尝试使用multiprocessing.Pool,问题是我还需要在我的子进程中查看这个庞大的列表列表。

由于“列表列表”不规则(例如:[[1, 2], [1, 2, 3]]),我无法将它们存储为mp.Array,并且如前所述,我没有使用mp.Process,所以我没有找出在此任务中使用mp.Manager 的方法。保留这个列表列表对我来说很重要,因为我正在应用一个函数,该函数使用 from operator import itemgetter 基于索引进行查询。

这是我试图实现的一个虚构示例:

import multiprocessing as mp
from operator import itemgetter
import numpy as np

def foo(indexes):
    # here I must guarantee read acess for big_list_of_lists on every child process somehow
    # as this code would work with only with one child process using global variables but would fail
    # with larger data.
    store_tuples = itemgetter(*indexes)(big_list_of_lists)
    return np.mean([item for sublista in store_tuples for item in sublista])

def main():
    # big_list_of_lists is the varible that I want to share across my child process
    big_list_of_lists = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]

    ctx = mp.get_context('spawn')
    # big_list_of_lists elements are also passed as args
    pool = mp.Pool(ctx.Semaphore(mp.cpu_count()).get_value())
    res=list(pool.map(foo, big_list_of_lists))
    pool.close()
    pool.join()

    return res

if __name__ is '__main__':
    print(main())
# desired output is equivalente to:
# a = []
# for i in big_list_of_lists:
#     store_tuples = itemgetter(*i)(big_list_of_lists)
#     a.append(np.mean([item for sublista in store_tuples for item in sublista]))
# 'a' would be equal to [1.8, 1.5714285714285714, 2.0, 1.75]

其他细节:最好使用 python 3.6 实现解决方案,并且必须在 windows 上运行

非常感谢!

【问题讨论】:

【参考方案1】:

使用mp.Managermp.Manager.listmp.Manager.lists 对我来说似乎工作正常。我相信这不会将列表复制到每个进程。

重要的一行是:

big_list_of_lists_proxy = manager.list([manager.list(sublist) for sublist in big_list_of_lists])

您可能想要使用,具体取决于您的用例:

big_list_of_lists_proxy = manager.list(big_list_of_lists)

每个子列表是否应该是代理取决于每个子列表是否很大,以及是否完整读取。如果子列表很大,那么将列表对象传输到需要它的每个进程(O(n) 复杂性)的成本很高,因此应该使用来自管理器的代理列表,但是如果无论如何都需要每个元素,使用代理没有任何好处。

import multiprocessing as mp
from operator import itemgetter
import numpy as np
from functools import partial


def foo(indexes, big_list_of_lists):
    # here I must guarantee read acess for big_list_of_lists on every child process somehow
    # as this code would work with only with one child process using global variables but would fail
    # with larger data.
    store_tuples = itemgetter(*indexes)(big_list_of_lists)
    return np.mean([item for sublista in store_tuples for item in sublista])


def main():
    # big_list_of_lists is the varible that I want to share across my child process
    big_list_of_lists = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]
    ctx = mp.get_context('spawn')
    with ctx.Manager() as manager:
        big_list_of_lists_proxy = manager.list([manager.list(sublist) for sublist in big_list_of_lists])
        # big_list_of_lists elements are also passed as args
        pool = ctx.Pool(ctx.Semaphore(mp.cpu_count()).get_value())
        res = list(pool.map(partial(foo, big_list_of_lists=big_list_of_lists_proxy), big_list_of_lists))
        pool.close()
        pool.join()

    return res


if __name__ == '__main__':
    print(main())
# desired output is equivalente to:
# a = []
# for i in big_list_of_lists:
#     store_tuples = itemgetter(*i)(big_list_of_lists)
#     a.append(np.mean([item for sublista in store_tuples for item in sublista]))
# 'a' would be equal to [1.8, 1.5714285714285714, 2.0, 1.75]

【讨论】:

嗨@Oli,谢谢你的回答! Apperenty 你的解决方案正在工作,我正在做更多的测试以 100% 确定。但我无法理解“洞中的王牌”。您为什么要为主 manager.list 中的每个子列表创建一个 manager.list? 子列表不是绝对必要的manager.lists - 在这个特定的例子中,我认为它没有任何优势,因为从foo 的列表中获取的每个子列表都是全部使用。我的想法是,由于big_list_of_lists 可能很大,子列表也可能很大,因此将它们设置为代理是有利的,这样它们就不必被序列化并发送到需要从中读取的每个进程. 谢谢,这对我的问题很有意义

以上是关于multiprocessing.Pool 跨子进程共享内存中只读的大型列表列表的主要内容,如果未能解决你的问题,请参考以下文章

详解multiprocessing多进程-Pool进程池模块

multiprocessing.Pool:如何在旧进程完成时启动新进程?

如果子进程导致分段错误,multiprocessing.Pool 将挂起

python多进程multiprocessing Pool相关问题

使用 python multiprocessing.Pool 进入睡眠状态的子进程

python小随笔进程池 multiprocessing.Pool的简单实现与踩过的坑