多进程在进程之间共享不可序列化的对象

Posted

技术标签:

【中文标题】多进程在进程之间共享不可序列化的对象【英文标题】:Multiprocessing Share Unserializable Objects Between Processes 【发布时间】:2014-03-24 23:10:26 【问题描述】:

三个问题可能重复(但过于具体):

How to properly set up multiprocessing proxy objects for objects that already exist Share object with process (multiprocess) Can I use a ProcessPoolExecutor from within a Future?

通过回答这个问题可以回答所有其他三个问题。 希望我说清楚:

一旦我在由多处理创建的某个进程中创建了一个对象:

    如何将对该对象的引用传递给其他进程? (不那么重要)如何确保在我持有引用时此进程不会终止?

示例 1(已解决)

from concurrent.futures import *

def f(v):
    return lambda: v * v

if __name__ == '__main__':
    with ThreadPoolExecutor(1) as e: # works with ThreadPoolExecutor
        l = list(e.map(f, [1,2,3,4]))
    print([g() for g in l]) # [1, 4, 9, 16]

示例 2

假设f 返回一个具有可变状态的对象。这个相同的对象应该可以从其他进程访问。

示例 3

我有一个对象,它有一个打开的文件和一个锁 - 我如何授予对其他进程的访问权限?

提醒

我不希望出现此特定错误。或针对此特定用例的解决方案。该解决方案应该足够通用,以便在进程之间共享不可移动的对象。对象可能在任何进程中创建。使所有对象都可移动并保留身份的解决方案也很好。

欢迎任何提示,任何指向如何实现解决方案的部分解决方案或代码片段都是值得的。所以我们可以一起制定解决方案。

这是一个尝试来解决这个问题,但没有多处理:https://github.com/niccokunzmann/pynet/blob/master/documentation/done/tools.rst

问题

您希望其他进程如何处理引用?

引用可以传递给使用多处理创建的任何其他进程(重复 3)。可以访问属性,调用引用。访问的属性可能是也可能不是代理。

只使用代理有什么问题?

也许没有问题,只有挑战。我的印象是代理有一个管理器,并且一个管理器有自己的进程,因此不可序列化的对象必须被序列化和传输(使用 StacklessPython/fork 部分解决)。 还存在特殊对象的代理 - 为所有对象构建代理很难但并非不可能(可解决)。

解决方案? - 代理 + 经理?

Eric Urban 表明序列化不是问题。真正的挑战在于示例 2 和 3:状态的同步。我对解决方案的想法是为经理创建一个特殊的代理类。这个代理类

    采用不可序列化对象的构造函数 获取一个可序列化的对象并将其传输到管理器进程。 (问题)根据1.不可序列化对象必须在管理器进程中创建。

【问题讨论】:

应该编辑问题以解释您希望其他进程对引用执行什么操作。只将它们传回原始流程? 已编辑。如果这不能回答问题,请告诉我,谢谢。 只使用代理有什么问题? 我编辑了这个问题。谢谢你的回答,很有见地。 所以我想在上一篇文章中说的是,我没有看到任何例子表明将对象转移到管理器而不是首先在那里创建它真的更好。 【参考方案1】:

大多数时候,将现有对象的引用传递给另一个进程并不是很理想。相反,您创建要在进程之间共享的类:

class MySharedClass:
    # stuff...

然后你做一个这样的代理管理器:

import multiprocessing.managers as m
class MyManager(m.BaseManager):
    pass # Pass is really enough. Nothing needs to be done here.

然后您在该 Manager 上注册您的课程,如下所示:

MyManager.register("MySharedClass", MySharedClass)

然后,一旦管理器被实例化并启动,使用manager.start(),您就可以使用manager.MySharedClass 创建您的类的共享实例。这应该适用于所有需求。返回的代理与原始对象完全一样,除了documentation 中描述的一些例外情况。

【讨论】:

这太棒了!我对其进行了测试,并且效果很好。 codepad.org/zW2LU6XV 仍然存在并发问题。但是这些都可以。 这并不能解决问题。我已将此代码用作具有(模拟)数据库游标的 MySharedClass 的模板。如果我尝试在 MySharedClass 方法中返回它,我会收到 Unserializable Message 错误。 @sinwav 据我了解,在进程之间共享数据库游标是不可能的。无论您在进程之间使用哪种传输机制,在某些时候对象都需要以某种方式进行序列化。 Python 使用酸洗来达到这个目的。如果你不能腌制某些东西,那是有原因的。使用数据库游标的问题是,游标仅在创建它的连接上有效,但该连接仅在一个进程中打开。由此得出结论,数据库游标仅在创建它的进程中有效。此处无法分享。【参考方案2】:

在阅读此答案之前,请注意其中解释的解决方案很糟糕。请注意答案末尾的警告。

我找到了一种通过multiprocessing.Array 共享对象状态的方法。 所以我制作了这个类,它通过所有进程透明地共享它的状态:

import multiprocessing as m
import pickle

class Store:
    pass

class Shareable:
    def __init__(self, size = 2**10):
        object.__setattr__(self, 'store', m.Array('B', size))
        o = Store() # This object will hold all shared values
        s = pickle.dumps(o)
        store(object.__getattribute__(self, 'store'), s)

    def __getattr__(self, name):
        s = load(object.__getattribute__(self, 'store'))
        o = pickle.loads(s)
        return getattr(o, name)

    def __setattr__(self, name, value):
        s = load(object.__getattribute__(self, 'store'))
        o = pickle.loads(s)
        setattr(o, name, value)
        s = pickle.dumps(o)
        store(object.__getattribute__(self, 'store'), s)

def store(arr, s):
    for i, ch in enumerate(s):
        arr[i] = ch

def load(arr):
    l = arr[:]
    return bytes(arr)

您可以将此类(及其子类)的实例传递给任何其他进程,并且它将通过所有进程同步其状态。 这是用这段代码测试过的:

class Foo(Shareable):
    def __init__(self):
        super().__init__()
        self.f = 1

    def foo(self):
        self.f += 1

def f(s):
    s.f += 1

if __name__ == '__main__':
    import multiprocessing as m
    import time
    s = Foo()
    print(s.f)
    p = m.Process(target=f, args=(s,))
    p.start()
    time.sleep(1)
    print(s.f)

这个类的“魔力”在于它将所有属性存储在类Store 的另一个实例中。这门课不是很特别。它只是一些可以具有任意属性的类。 (一个 dict 也可以。)

但是,这个类有一些非常讨厌的怪癖。我找到了两个。

第一个怪癖是您必须指定Store 实例最多占用多少空间。这是因为multiprocessing.Array 具有静态大小。所以里面可以腌制的对象只能是数组那么大。

第二个怪癖是您不能将此类与 ProcessPoolExecutors 或简单的 Pools 一起使用。如果你尝试这样做,你会得到一个错误:

>>> s = Foo()
>>> with ProcessPoolExecutor(1) as e:
...     e.submit(f, args=(s,))
... 
<Future at 0xb70fe20c state=running>
Traceback (most recent call last):
<omitted>
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance

警告 您可能不应该使用这种方法,因为它使用无法控制的内存量,与使用代理相比过于复杂(请参阅我的其他答案)并且可能会以惊人的方式崩溃。

【讨论】:

【参考方案3】:

只需使用无堆栈 python。你可以用pickle 序列化几乎任何东西,包括函数。在这里,我使用pickle 模块对lambda 进行序列化和反序列化。这类似于您在示例中尝试执行的操作。

这里是 Stackless Python http://www.stackless.com/wiki/Download的下载链接

Python 2.7.5 Stackless 3.1b3 060516 (default, Sep 23 2013, 20:17:03) 
[GCC 4.6.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> f = 5
>>> g = lambda : f * f
>>> g()
25
>>> import pickle
>>> p = pickle.dumps(g)
>>> m = pickle.loads(p)
>>> m()
25
>>> 

【讨论】:

+1 这很好,但是 1. 保留身份 m 是 g 和 2. 如果我序列化函数并在另一个进程中反序列化它,它会在原始进程中被调用吗? - 不。但是,如果您需要在进程关闭时保存函数,这绝对是一个不错的解决方案。

以上是关于多进程在进程之间共享不可序列化的对象的主要内容,如果未能解决你的问题,请参考以下文章

Java多线程详解

Java多线程

与另一个进程共享复杂对象

对象的传递为啥要 序列化 呢

关于tcp连接对象在多进程中的错误:pickle.PicklingError

Linux多进程与多线程之间的区别