在进程之间共享一个复杂的对象?

Posted

技术标签:

【中文标题】在进程之间共享一个复杂的对象?【英文标题】:Sharing a complex object between processes? 【发布时间】:2011-04-09 22:58:24 【问题描述】:

我有一个相当复杂的 Python 对象,我需要在多个进程之间共享它。我使用multiprocessing.Process 启动这些进程。当我与其中的multiprocessing.Queuemultiprocessing.Pipe 共享一个对象时,它们共享得很好。但是当我尝试与其他非多处理模块对象共享一个对象时,似乎 Python 分叉了这些对象。这是真的吗?

我尝试使用 multiprocessing.Value。但我不确定应该是什么类型?我的对象类称为 MyClass。但是当我尝试multiprocess.Value(MyClass, instance) 时,它失败了:

TypeError: this type has no size

知道发生了什么吗?

【问题讨论】:

相关:***.com/questions/659865/… 【参考方案1】:

经过大量研究和测试,我发现“经理”在非复杂对象级别完成这项工作。

下面的代码显示对象inst在进程之间共享,这意味着inst的属性var在子进程更改时在外部更改。

from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager

class SimpleClass(object):
    def __init__(self):
        self.var = 0

    def set(self, value):
        self.var = value

    def get(self):
        return self.var


def change_obj_value(obj):
    obj.set(100)


if __name__ == '__main__':
    BaseManager.register('SimpleClass', SimpleClass)
    manager = BaseManager()
    manager.start()
    inst = manager.SimpleClass()

    p = Process(target=change_obj_value, args=[inst])
    p.start()
    p.join()

    print inst                    # <__main__.SimpleClass object at 0x10cf82350>
    print inst.get()              # 100

好的,如果你只需要分享简单的对象,上面的代码就足够了

为什么不复杂?因为如果您的对象是嵌套的(对象内的对象)它可能会失败

from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager

class GetSetter(object):
    def __init__(self):
        self.var = None

    def set(self, value):
        self.var = value

    def get(self):
        return self.var


class ChildClass(GetSetter):
    pass

class ParentClass(GetSetter):
    def __init__(self):
        self.child = ChildClass()
        GetSetter.__init__(self)

    def getChild(self):
        return self.child


def change_obj_value(obj):
    obj.set(100)
    obj.getChild().set(100)


if __name__ == '__main__':
    BaseManager.register('ParentClass', ParentClass)
    manager = BaseManager()
    manager.start()
    inst2 = manager.ParentClass()

    p2 = Process(target=change_obj_value, args=[inst2])
    p2.start()
    p2.join()

    print inst2                    # <__main__.ParentClass object at 0x10cf82350>
    print inst2.getChild()         # <__main__.ChildClass object at 0x10cf6dc50>
    print inst2.get()              # 100
    #good!

    print inst2.getChild().get()   # None
    #bad! you need to register child class too but there's almost no way to do it
    #even if you did register child class, you may get PicklingError :)

我认为这种行为的主要原因是因为Manager 只是建立在管道/队列等低级通信工具之上的直板。

因此,这种方法推荐用于多处理情况。如果您可以使用 lock/semaphore/pipe/queue 等低级工具或 Redis 队列Redis 发布/订阅 /strong> 用于复杂的用例(只是我的建议,哈哈)。

【讨论】:

如何共享复杂对象? 我认为这是一个更好的答案,因为实际代码,2个简单和复杂的例子。所选答案中链接的文档并没有很好地解释这一点。能够使用此策略通过 Tensorboard 编写器。我要澄清一下,如果自定义对象考虑到并行处理(例如 Tensorboard 编写器),这很好。【参考方案2】:

您可以使用 Python 的 multiprocessing "Manager" 类和您定义的代理类来执行此操作。请参阅 Python 文档中的 Proxy Objects。

您要做的是为您的自定义对象定义一个代理类,然后使用“远程管理器”共享该对象——查看“Using a remote manager”部分中同一链接文档页面中的示例,其中文档展示了如何共享远程队列。您将做同样的事情,但您对your_manager_instance.register() 的调用将在其参数列表中包含您的自定义代理类。

通过这种方式,您将设置服务器以与自定义代理共享自定义对象。您的客户端需要访问服务器(同样,请参阅优秀的文档示例,了解如何设置客户端/服务器对远程队列的访问,但您不是共享 Queue,而是共享对特定类的访问)。

【讨论】:

这个问题中的代码帮助我补充了文档页面。这是一个带有自定义类的示例。 ***.com/questions/11951750/…【参考方案3】:

这是我专门为此制作的一个 python 包(在进程之间共享复杂对象)。

git:https://github.com/dRoje/pipe-proxy

这个想法是你为你的对象创建一个代理并将它传递给一个进程。然后你使用代理,就像你有对原始对象的引用一样。虽然你只能使用方法调用,所以访问对象变量是通过抛出 setter 和 getter 来完成的。

假设我们有一个名为“example”的对象,创建代理和代理监听器很容易:

from pipeproxy import proxy 
example = Example() 
exampleProxy, exampleProxyListener = proxy.createProxy(example) 

现在您将代理发送到另一个进程。

p = Process(target=someMethod, args=(exampleProxy,)) p.start()

在其他过程中使用它,就像使用原始对象一样(示例):

def someMethod(exampleProxy):
    ...
    exampleProxy.originalExampleMethod()
    ...

但你确实要在主进程中听:

exampleProxyListener.listen()

阅读更多内容并在此处查找示例:

http://matkodjipalo.com/index.php/2017/11/12/proxy-solution-python-multiprocessing/

【讨论】:

这个还在维护吗?【参考方案4】:

在 Python 3.6 中,文档说:

3.6 版更改:共享对象可以嵌套。例如,共享容器对象(如共享列表)可以包含其他共享对象,这些共享对象都将由 SyncManager 管理和同步。

只要通过 SyncManager 创建实例,您就应该能够使对象相互引用。不过,在另一种类型的对象的方法中动态创建一种类型的对象可能仍然是不可能的或非常棘手的。

编辑:我在 python 3.6.5 和 3.6.7 中偶然发现了这个问题Multiprocessing managers and custom classes。需要检查python 3.7

编辑 2:由于其他一些问题,我目前无法使用 python3.7 进行测试。 https://***.com/a/50878600/7541006 中提供的解决方法对我来说很好

【讨论】:

【参考方案5】:

我尝试使用 BaseManager 并注册我的自定义类以使其快乐,并解决了上面提到的关于嵌套类的问题。

我认为主要原因与所说的嵌套类无关,而是python采用低级的通信机制。原因是python使用一些类似套接字的通信机制来同步在低级别的服务器进程中修改自定义类。我认为它封装了一些 rpc 方法,使其对用户透明,就好像他们调用了嵌套类对象的本地方法一样。

所以,当你想修改、检索你的自定义对象或一些第三方对象时,你应该在你的进程中定义一些接口来与之通信,而不是直接获取或设置值。

然而在嵌套对象中操作多嵌套对象时,可以忽略上面提到的问题,就像你在普通例程中所做的那样,因为你在注册类中的嵌套对象不再是代理对象,on该操作将不会再次通过类似套接字的通信例程并且已本地化。

这是我为解决问题而编写的可行代码。

from multiprocessing import Process, Manager, Lock
from multiprocessing.managers import BaseManager
import numpy as np

class NestedObj(object):
       def __init__(self):
                self.val = 1

class CustomObj(object):
        def __init__(self, numpy_obj):
                self.numpy_obj = numpy_obj
                self.nested_obj = NestedObj()

        def set_value(self, p, q, v):
                self.numpy_obj[p, q] = v

        def get_obj(self):
                return self.numpy_obj

        def get_nested_obj(self):
                return self.nested_obj.val

class CustomProcess(Process):
        def __init__(self, obj, p, q, v):
                super(CustomProcess, self).__init__()
                self.obj = obj
                self.index = p, q
                self.v = v

        def run(self):
                self.obj.set_value(*self.index, self.v)



if __name__=="__main__":
        BaseManager.register('CustomObj', CustomObj)
        manager = BaseManager()
        manager.start()
        data = [[0 for x in range(10)] for y in range(10)]
        matrix = np.matrix(data)
        custom_obj = manager.CustomObj(matrix)
        print(custom_obj.get_obj())
        process_list = []
        for p in range(10):
                for q in range(10):
                        proc = CustomProcess(custom_obj, p, q, 10*p+q)
                        process_list.append(proc)
        for x in range(100):
                process_list[x].start()
        for x in range(100):
                process_list[x].join()
        print(custom_obj.get_obj())
        print(custom_obj.get_nested_obj())

【讨论】:

有任何问题请发至zcj5918@163.com 如何让它与远程管理器一起工作?【参考方案6】:

为了避免共享资源的一些麻烦,您可以尝试在由例如映射的函数的返回语句中收集需要访问单例资源的数据。 pool.imap_unordered 然后在一个循环中进一步处理它来检索部分结果:

for result in in pool.imap_unordered(process_function, iterable_data):
    do_something(result)

如果返回的数据不多,那么执行此操作可能不会有太多开销。

【讨论】:

以上是关于在进程之间共享一个复杂的对象?的主要内容,如果未能解决你的问题,请参考以下文章

多处理 - 共享一个复杂的对象

与另一个进程共享复杂对象

在进程之间共享活动对象的正确方法是啥?

多进程在进程之间共享不可序列化的对象

在工作进程之间共享对象

锁对象只能通过继承在进程之间共享