多处理 - 共享一个复杂的对象

Posted

技术标签:

【中文标题】多处理 - 共享一个复杂的对象【英文标题】:multiprocessing - sharing a complex object 【发布时间】:2014-01-24 04:39:51 【问题描述】:

我有一个类似dict 的大型对象,需要在多个工作进程之间共享。每个工作人员读取对象中信息的随机子集并对其进行一些计算。我想避免复制大对象,因为我的机器很快就会耗尽内存。

我正在使用this SO question 的代码,并对其进行了一些修改以使用固定大小的进程池,这更适合我的用例。然而,这似乎打破了它。

from multiprocessing import Process, Pool
from multiprocessing.managers import BaseManager

class numeri(object):
    def __init__(self):
        self.nl = []

    def getLen(self):
        return len(self.nl)

    def stampa(self):
        print self.nl

    def appendi(self, x):
        self.nl.append(x)

    def svuota(self):
        for i in range(len(self.nl)):
            del self.nl[0]

class numManager(BaseManager):
    pass

def produce(listaNumeri):
    print 'producing', id(listaNumeri)
    return id(listaNumeri)

def main():
    numManager.register('numeri', numeri, exposed=['getLen', 'appendi',
                        'svuota', 'stampa'])
    mymanager = numManager()
    mymanager.start()
    listaNumeri = mymanager.numeri()
    print id(listaNumeri)

    print '------------ Process'
    for i in range(5):
        producer = Process(target=produce, args=(listaNumeri,))
        producer.start()
        producer.join()

    print '--------------- Pool'
    pool = Pool(processes=1)
    for i in range(5):
        pool.apply_async(produce, args=(listaNumeri,)).get()

if __name__ == '__main__':
    main()

输出是

4315705168
------------ Process
producing 4315705168
producing 4315705168
producing 4315705168
producing 4315705168
producing 4315705168
--------------- Pool
producing 4299771152
producing 4315861712
producing 4299771152
producing 4315861712
producing 4299771152

如您所见,在第一种情况下,所有工作进程都获得相同的对象(通过 id)。第二种情况,id不一样。这是否意味着正在复制对象?

附:我认为这并不重要,但我使用的是joblib,它在内部使用了Pool

from joblib import delayed, Parallel

print '------------- Joblib'
        Parallel(n_jobs=4)(delayed(produce)(listaNumeri) for i in range(5))

哪个输出:

------------- Joblib
producing 4315862096
producing 4315862288
producing 4315862480
producing 4315862672
producing 4315862352

【问题讨论】:

检查zeromq.org,它以简单的方式在进程之间共享信息。 【参考方案1】:

恐怕这里几乎没有什么能像你希望那样工作:-(

首先请注意,由不同进程产生的相同id()告诉您没有关于对象是否真的是同一个对象。每个进程都有自己的虚拟地址空间,由操作系统分配。两个进程中的相同虚拟地址可以引用完全不同的物理内存位置。您的代码是否产生相同的id() 输出几乎完全是偶然的。在多次运行中,有时我会在 Process 部分看到不同的 id() 输出,在 Pool 部分看到重复的 id() 输出,反之亦然,或两者兼而有之。

其次,Manager 提供语义共享,但不提供物理共享。 numeri 实例的数据存在于管理器进程中。您的所有工作进程都会看到(副本)代理对象。这些是瘦包装器,它们转发要由管理器进程执行的所有操作。这涉及大量进程间通信,以及管理器进程内的序列化。这是编写非常慢的代码的好方法;-) 是的,numeri 数据只有一个副本,但所有工作都由单个进程(管理器进程)完成。

要更清楚地看到这一点,请进行@martineau 建议的更改,并将get_list_id() 更改为:

def get_list_id(self):  # added method
    import os
    print("get_list_id() running in process", os.getpid())
    return id(self.nl)

这是示例输出:

41543664
------------ Process
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
producing 46268496
get_list_id() running in process 5856
with list_id 44544608
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
producing 44153904
get_list_id() running in process 5856
with list_id 44544608
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
--------------- Pool
producing 41639248
get_list_id() running in process 5856
with list_id 44544608
producing 41777200
get_list_id() running in process 5856
with list_id 44544608
producing 41776816
get_list_id() running in process 5856
with list_id 44544608
producing 41777168
get_list_id() running in process 5856
with list_id 44544608
producing 41777136
get_list_id() running in process 5856
with list_id 44544608

清楚吗?每次获得相同列表 id 的原因是不是因为每个工作进程具有相同的self.nl 成员,这是因为所有numeri 方法都运行在 单个进程(管理器进程)。这就是列表 id 始终相同的原因。

如果您在 Linux 系统(支持 fork() 的操作系统)上运行,更好的办法是忘记所有这些 Manager 内容并在启动任何工作进程之前在模块级别创建复杂对象.然后工作人员将继承您的复杂对象(地址空间副本)。通常的写时复制fork() 语义将使其尽可能节省内存。如果不需要将突变折叠回复杂对象的主程序副本中,这就足够了。如果确实需要重新折叠突变,那么您又需要大量的进程间通信,而multiprocessing 的吸引力也就相应降低了。

这里没有简单的答案。不要射击信使;-)

【讨论】:

谢谢,所有三个答案都很棒,我希望我能接受所有这些。我认为这是编写慢代码的好方法,我只是尝试过。我的应用程序当前受内存限制,因为我没有足够的 RAM 来运行与内核一样多的工作人员。我希望通过减少内存使用量,我能够让更多的工作人员启动并运行。情况确实如此,只是工人速度变慢了三倍。 我必须同意这是最好的答案——但我对 import this Peters 先生的期望不会少。 ;-) @mbatchkarov,虽然Manager 听起来不适合您的问题,但还有很多其他方法可以使用multiprocessing。也许您可以打开另一个描述问题的问题,并使用适当削减的可执行代码。例如,也许主程序可以选择“随机样本”并仅将这些样本传递给工作人员,而不是传递整个对象。但更具体的答案需要更具体的问题;-) 又是好点。 ***.com/questions/20971191/… @TimPeters 我想与子进程共享父进程的 numpy 随机状态。我试过使用Manager,但仍然没有运气。你能看看我的问题here,看看你能不能提供一个解决方案?如果我每次生成随机数时都执行np.random.seed(None),我仍然可以获得不同的随机数,但这不允许我使用父进程的随机状态,这不是我想要的。非常感谢任何帮助。【参考方案2】:

如果您在代码中添加两行,您会发现这种行为非常奇怪:

def produce(listaNumeri):
    print 'producing', id(listaNumeri)
    print listaNumeri # <- New line
    return id(listaNumeri)


def main():
    numManager.register('numeri', numeri, exposed=['getLen', 'appendi', 'svuota', 'stampa', 'getAll'])
    mymanager = numManager()
    mymanager.start()
    listaNumeri = mymanager.numeri()
    print listaNumeri # <- New line
    print id(listaNumeri)

这将为您提供以下输出:

<__main__.numeri object at 0x103892990>
4354247888
------------ Process
producing 4354247888
<__main__.numeri object at 0x103892990>
producing 4354247888
<__main__.numeri object at 0x103892990>
producing 4354247888
<__main__.numeri object at 0x103892990>
producing 4354247888
<__main__.numeri object at 0x103892990>
producing 4354247888
<__main__.numeri object at 0x103892990>
--------------- Pool
producing 4352988560
<__main__.numeri object at 0x103892990>
producing 4354547664
<__main__.numeri object at 0x103892990>
producing 4352988560
<__main__.numeri object at 0x103892990>
producing 4354547664
<__main__.numeri object at 0x103892990>
producing 4352988560
<__main__.numeri object at 0x103892990>

如您所见,对象每次都是相同的但 id 并不总是相同的。此外,查看池部分使用的 id - 它在两个 id 之间来回切换。

答案是在produce 期间实际打印出__class__ 属性。每次运行,__class__ 实际上是

<class 'multiprocessing.managers.AutoProxy[numeri]'>

所以numeri 对象每次都被包裹在一个AutoProxy 中,而AutoProxy 并不总是相同的。但是,被包装的numeri 对象在每次调用produce 时都是相同的。如果您在produce 中调用一次appendi 方法,那么listaNumeri 将在您的程序结束时得到10 个项目。

【讨论】:

【参考方案3】:

您将对象实例 numeri 与其经理 listaNumeri 混淆了。这可以通过对代码进行一些小的修改来说明:

首先将get_list_id方法添加到class numeri(object),它返回正在使用的实际内部数据结构的id

    ...                                                   
    def get_list_id(self):  # added method
        return id(self.nl)

然后修改produce()使用:

def produce(listaNumeri):
    print 'producing', id(listaNumeri)
    print ' with list_id', listaNumeri.get_list_id()  # added
    return id(listaNumeri)

最后,确保将新方法作为numManager 接口的一部分公开:

def main():
    numManager.register('numeri', numeri, exposed=['getLen', 'appendi',
                                                   'svuota', 'stampa',
                                                   'get_list_id'])  # added
    ...                                                   

之后你会看到类似下面的输出:

13195568
------------ Process
producing 12739600
 with list_id 13607080
producing 12739600
 with list_id 13607080
producing 12739600
 with list_id 13607080
producing 12739600
 with list_id 13607080
producing 12739600
 with list_id 13607080
--------------- Pool
producing 13690384
 with list_id 13607080
producing 13691920
 with list_id 13607080
producing 13691888
 with list_id 13607080
producing 13691856
 with list_id 13607080
producing 13691824
 with list_id 13607080

如图所示,即使每个 Pool 进程有不同的 Manager 对象,它们都使用(共享)相同的“托管”数据对象。

【讨论】:

以上是关于多处理 - 共享一个复杂的对象的主要内容,如果未能解决你的问题,请参考以下文章

跨多进程共享基于异步等待协程的复杂对象

多处理中的共享内存对象

多线程,共享HttpServeltRequest对象时候,对象丢失现象

如何在 python 的多处理中创建嵌套共享对象?

python中的多处理-在多个进程之间共享大对象(例如pandas数据框)

Java 多线程共享对象 - 需要设计模式