pickle.PicklingError: Can't pickle: it's not the same object as



【中文标题】pickle.PicklingError: Can\'t pickle: it\'s not the same object as【英文标题】:pickle.PicklingError: Can't pickle: it's not the same object aspickle.PicklingError: Can't pickle: it's not the same object as 【发布时间】:2021-08-21 15:22:57 【问题描述】:

我们正在尝试将元类用于自定义后端选择(multiprocessing.Processthreading.Thread)。此实现背后的基本原理是扩展 Process/Thread 的功能以供我们自定义使用。虽然以下代码适用于fork(unix 中的默认值)。但是对于spawn(windows 中的默认值),我得到一个错误。

pickle.PicklingError: Can't pickle <class '__main__.DProcess'>: it's not the same object as __main__.DProcess

错误来自pickle module,因为对象不一样。

obj: <class '__main__.DProcess'>, 
obj.__dict__: '__module__': '__main__', 'run': <function DProcess.run at 0x7fa76ccd97a0>, '__doc__': None, '__slotnames__': []
hash(obj): 5875061359185

obj2: <class '__main__.DProcess'>, 
obj2.__dict__: '__module__': '__main__', 'run': <function DProcess.run at 0x7fa76ccd97a0>, '__dict__': <attribute '__dict__' of 'DProcess' objects>, '__weakref__': <attribute '__weakref__' of 'DProcess' objects>, '__doc__': None, 
hash(obj2): 5875061305336


    为什么这两个对象不同?在类对象上从 pickle 模块执行 save_global 不会失败。是因为__call__ 的实现吗?我该如何解决? 为什么不为 fork 执行此检查?


class Backend(type):
    _cache = 

    def __new__(cls, name, bases, dct):
        _cls = super().__new__(cls, name, bases, dct)
        # store the subclass dict to be used during __call__
            name: 'cls': cls, 'name': name, 'bases': bases, 'dct': dct
        return _cls

    def __call__(cls, *args, **kwargs) -> 'Backend':
            # check arg & select the base class
            if args[0] == 'process':
                import multiprocessing
                _cls = multiprocessing.Process
            elif args[0] == 'thread':
                import threading
                _cls = threading.Thread
        except KeyError:
            print('Please pass process or thread as the 1st arg')

        for c in cls.mro()[-2::-1]:
            # pick args from __new__ and call type()
            arg_cls = Backend._cache[c.__name__]['cls']
            arg_name = Backend._cache[c.__name__]['name']
            arg_dct = Backend._cache[c.__name__]['dct']
            _cls = super().__new__(arg_cls, arg_name, (_cls,), arg_dct)

        return type.__call__(_cls, *args[1:], **kwargs)

class DProcess(metaclass=Backend):
    def run(self):
        print('we are in dprocess')

if __name__ == '__main__':
    from multiprocessing import set_start_method as _set_start_method



如果您不需要元类,则不应使用它--有更好的模式可以满足您的需求。首先:你真的需要从线程或进程继承吗?也许更好的选择是将这些作为 DProcess 类的关联属性,然后 - 它可以作为普通类属性工作。

由于两者的重要接口基本上是设置目标可调用,startjoin 你可以为它们创建代理方法,或者直接在类属性中调用方法。


class DProcess():
    def __init__(self, backend):
        if backend == "process":
            self.backend_cls = multiprocessing.Process
        elif backend == "thread":
            self.backend_cls = threading.Thread
        self.worker = self.backend_cls(target=self.run)
    def start(self):
        # or just call "instance.worker.start()" from outside
    def join(self):
        return self.worker.join()
    def run(self):
        print('we are in dprocess')

现在,您的原始代码失败的原因是因为它是错误的:您实际上确实在每次实例化时为 DProcess 创建了一个 new 同级类 DProcess,动态调用元类__call__上的super().__new__

因此,在您的核心中声明的class DProcess 是一类。但是每次您尝试实例化它时,都会创建一个新的类对象,并且它会被实例化——这就是 pickle 所抱怨的。 (虽然我们在这里:fork 的多处理只是在新进程上拥有完全相同的对象,而 Windows 方式必须从头开始一个新进程,并序列化对象以便将它们发送到新进程 - 它不是“检查”——DProcess 的“幽灵兄弟”不能被 Pickle 反序列化,因为它不存在于其他进程中。

现在,如果你真的想让你的类继承自 Thread 或 Process,你可以只创建这两个类,然后使用工厂函数来选择你想要的。 虽然有一个函数来创建两个相似的类并将其放入列表或全局字典中是微不足道的,但 Pickle 不太喜欢:它需要在模块的顶层声明要腌制的实例或类(以便类的限定名称可以让您回到类构造函数)。即使在那里,也无需重复代码 - 您可以使用 mixin 类和您的通用代码,并使用两行代码创建您的 ProcessDworker 和 ThreadDWorker(然后可以通过工厂函数选择):

class Stub:
    """just needed in case some linter or static checker complain about
    these methods not being present in the mixin
    But you could also declare these as @abstractmethod 
    to ensure just a proper class incorporating Thread or Process can
    be instantiated
    def run(self): pass
    def start(self): pass
    def join(self): pass

class DProcessMixin(Stub):
    def __init__(self, *args, **kw):
        # whatever code you need to setup yoru worker - like creating queues, and such
        super().__init__(self, ...)
class ThreadDprocess(DProcessMixin, threading.Thread):
    queue_class = threading.Queue

class ProcessDProcess(DProcessMixin, threadng.
    queue_class = multiprocessing.Queue

def DProcess(*args, backend, **kwargs):
    if backend == "process":
        cls = ProcessDProcess
    elif backend == "thread":
        cls = ThreadDprocess
    return cls(*args, **kwargs)

最后,如果你真的想使用元类,只需感知元类中的__call__ 方法与上一个示例中的Dprocess 工厂函数的位置相同。如果您预先创建这两个类并实际缓存它们,并在模块globals 中使用实名设置它们,它将起作用。但是如果你回到你的“缓存”,你会发现它是假的:它甚至不能真正为同一个元类中的多个类“缓存”信息:你的缓存应该将类名作为键,作为值您可能有另一个字典,其中包含每个类的“名称、基数、命名空间”值。顺便说一句,您还将传递给元类__new__cls arg 与类本身混淆了——这也是错误的。简而言之:我认为您对类机制的工作原理没有足够的了解来围绕它构建代码,而且由于您的问题似乎仅通过组合就可以轻松解决,因此应该不行。


感谢您的回答。我敢肯定,使用工厂方法构建这样的模式会更容易。我想学习使用上面的代码使用“元类”。评论:“你的“缓存”你可以看到它是假的”:这是因为我删除了部分代码以确保问题只关注问题。无论如何,我现在已经将其更新为原始代码。 是的 - 缓存现在看起来好多了。不过,您存储的“cls”值是元类本身,而不是创建的类。

以上是关于pickle.PicklingError: Can't pickle: it's not the same object as的主要内容,如果未能解决你的问题,请参考以下文章



自定义 sklearn 管道变压器给出“pickle.PicklingError”



尝试从 BigQuery 读取表并使用 Airflow 将其保存为数据框时出现 _pickle.PicklingError