Python多处理PicklingError:无法腌制<type'function'>

Posted

技术标签:

【中文标题】Python多处理PicklingError:无法腌制<type\'function\'>【英文标题】:Python multiprocessing PicklingError: Can't pickle <type 'function'>Python多处理PicklingError:无法腌制<type'function'> 【发布时间】:2012-01-10 14:28:25 【问题描述】:

很抱歉,我无法用更简单的示例重现该错误,并且我的代码太复杂而无法发布。如果我在 IPython shell 而不是常规的 Python 中运行程序,一切都会顺利进行。

我查阅了有关此问题的一些以前的注释。它们都是由使用池调用类函数中定义的函数引起的。但对我来说不是这样。

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

如果有任何帮助,我将不胜感激。

更新:我pickle的函数定义在模块的顶层。虽然它调用了一个包含嵌套函数的函数。即f()调用g()调用h(),它有一个嵌套函数i(),我正在调用pool.apply_async(f)f()g()h() 都是在顶层定义的。我用这种模式尝试了更简单的例子,但它确实有效。

【问题讨论】:

***/接受的答案很好,但这可能意味着您需要重新构建代码,这可能会很痛苦。我建议任何有此问题的人也阅读使用dillpathos 的附加答案。但是,在使用 vtkobjects 时,我对任何解决方案都不走运 :( 任何人都设法在并行处理 vtkPolyData 时运行 python 代码? 【参考方案1】:

这是list of what can be pickled。特别是,只有在模块的顶层定义的函数才是可挑选的。

这段代码:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

产生与您发布的错误几乎相同的错误:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

问题在于pool 方法都使用mp.SimpleQueue 将任务传递给工作进程。通过mp.SimpleQueue 的所有内容都必须是可挑选的,而foo.work 是不可挑选的,因为它没有在模块的顶层定义。

可以通过在顶层定义一个函数来修复它,调用foo.work()

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

请注意,foo 是可挑选的,因为 Foo 是在顶层定义的,而 foo.__dict__ 是可挑选的。

【讨论】:

感谢您的回复。我更新了我的问题。不过,我不认为这是原因 要获得 PicklingError 必须在队列中放入不可腌制的东西。它可以是函数或其参数。要了解更多有关问题的信息,我建议复制您的程序,然后开始对其进行精简,使其更简单更简单,每次重新运行程序以查看问题是否仍然存在。当它变得非常简单时,您要么自己发现了问题,要么可以在这里发布一些内容。 另外:如果你在模块的顶层定义了一个函数,但它被修饰了,那么引用将指向修饰器的输出,无论如何你都会得到这个错误。 只晚了 5 年,但我刚刚遇到了这个问题。事实证明,“***”必须比平时更直接地理解:在我看来,函数定义必须在 池的初始化之前(即pool = Pool() 行here )。我没想到,这可能是OP问题持续存在的原因。 特别是,函数只有在模块的顶层定义时才可挑选。 似乎将functool.partial 应用于顶层的结果函数也是可腌制的,即使它是在另一个函数中定义的。【参考方案2】:

我会使用pathos.multiprocesssing,而不是multiprocessingpathos.multiprocessing 是使用 dillmultiprocessing 的一个分支。 dill 可以在 python 中序列化几乎任何东西,因此您可以并行发送更多内容。 pathos fork 还可以直接使用多个参数函数,这是您对类方法的需要。

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

在此处获取pathos(如果您愿意,dill): https://github.com/uqfoundation

【讨论】:

辛苦了。对于其他人,我通过以下方式安装了这两个库:sudo pip install git+https://github.com/uqfoundation/dill.git@mastersudo pip install git+https://github.com/uqfoundation/pathos.git@master @AlexanderMcFarlane 我不会安装带有sudo 的python 包(尤其是来自github 等外部资源)。相反,我建议运行:pip install --user git+... 仅使用 pip install pathos 并不能正常工作,并给出以下消息:Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos) pip install pathos 现在可以工作了,pathos 与 python 3 兼容。 @DanielGoldfarb: multiprocessmultiprocessing 的一个分支,其中dill 在代码中的多个位置替换了pickle……但基本上就是这样。 pathosmultiprocess 上提供了一些额外的 API 层,并且还有额外的后端。但是,这就是它的要点。【参考方案3】:

multiprocessing 出现此问题时,一个简单的解决方案是从Pool 切换到ThreadPool。这可以在不更改导入以外的代码的情况下完成-

from multiprocessing.pool import ThreadPool as Pool

这是因为 ThreadPool 与主线程共享内存,而不是创建一个新进程——这意味着不需要酸洗。

这种方法的缺点是 python 不是处理线程的最佳语言 - 它使用称为全局解释器锁的东西来保持线程安全,这可能会减慢这里的一些用例。但是,如果您主要与其他系统交互(运行 HTTP 命令、与数据库通信、写入文件系统),那么您的代码可能不受 CPU 约束并且不会受到太大影响。事实上,我在编写 HTTP/HTTPS 基准测试时发现,这里使用的线程模型具有更少的开销和延迟,因为创建新进程的开销远高于创建新线程的开销,否则程序只是等待 HTTP回复。

因此,如果您在 python 用户空间中处理大量内容,这可能不是最好的方法。

【讨论】:

但是你只使用一个 CPU(至少对于使用 GIL 的常规 Python 版本),这有点违背目的。 这真的取决于目的是什么。 Global Interpreter Lock 确实意味着一次只有一个实例可以运行 python 代码,但对于严重阻塞的操作(文件系统访问、下载大文件或多个文件、运行外部代码),GIL 最终不是问题。在某些情况下,打开新进程(而不是线程)的开销超过了 GIL 开销。 确实如此,谢谢。您仍然可能希望在答案中包含一个警告。如今,处理能力的提高主要以更多而不是更强大的 CPU 内核的形式出现,从多核切换到单核执行是一个相当重要的副作用。 好点 - 我已经用更多细节更新了答案。我确实想指出,虽然切换到线程多处理不会使 python 仅在单个内核上运行。【参考方案4】:

正如其他人所说,multiprocessing 只能将 Python 对象传输到可以腌制的工作进程。如果你不能按照 unutbu 的描述重新组织你的代码,你可以使用dills 扩展的 pickling/unpickling 功能来传输数据(尤其是代码数据),如下所示。

这个方案只需要安装dill,不需要像pathos那样安装其他库:

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()

【讨论】:

我是dillpathos 的作者......虽然你是对的,但在我的回答中使用pathos 不是更好、更干净、更灵活吗?或者也许我有点偏见…… 在撰写本文时我并不知道pathos 的状态,我想提出一个非常接近答案的解决方案。现在我已经看到了你的解决方案,我同意这是要走的路。 我阅读了您的解决方案,然后说,Doh… I didn't even think of doing it like that. 这有点酷。 感谢您的发帖,我使用这种方法来处理/取消处理无法腌制的参数:***.com/questions/27883574/… @rocksportrocker。我正在阅读此示例,但无法理解为什么有明确的 for 循环。我通常会看到并行例程获取一个列表并返回一个没有循环的列表。【参考方案5】:

我发现我也可以通过尝试在其上使用探查器,在一段完美运行的代码上准确地生成该错误输出。

请注意,这是在 Windows 上(分支不太优雅)。

我在跑步:

python -m profile -o output.pstats <script> 

发现去掉profiling去掉了错误,放置profiling又恢复了。也让我发疯,因为我知道过去的代码可以工作。我正在检查是否有东西更新了 pool.py... 然后有一种下沉的感觉并消除了分析,就是这样。

在这里张贴存档以防其他人遇到它。

【讨论】:

哇,感谢您的提及!在最后一个小时左右,它让我发疯了;我尝试了一切,直到一个非常简单的例子 - 似乎没有任何工作。但我也让探查器在我的批处理文件中运行:( 哦,非常感谢。不过,这听起来确实很愚蠢,因为它是如此出乎意料。我认为应该在文档中提到它。我所拥有的只是一个 import pdb 语句,而一个只有 pass 的简单***函数不能“pickle”。【参考方案6】:
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

如果您在传递给异步作业的模型对象中有任何内置函数,也会出现此错误。

因此,请务必检查传递的 模型对象 没有内置函数。 (在我们的例子中,我们在模型内部使用django-model-utils 的FieldTracker() 函数来跟踪某个字段)。这是相关 GitHub 问题的 link。

【讨论】:

【参考方案7】:

这个解决方案只需要安装 dill 不需要其他库作为 pathos

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

它也适用于 numpy 数组。

【讨论】:

【参考方案8】:

以@rocksportrocker 解决方案为基础, 在发送和接收结果时进行 dill 是有意义的。

import dill
import itertools
def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
                  [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)

【讨论】:

【参考方案9】:

正如@penky Suresh 在answer 中所建议的那样,不要使用内置关键字。

显然args 是处理多处理时的内置关键字


class TTS:
    def __init__(self):
        pass

    def process_and_render_items(self):
        multiprocessing_args = ["a": "b", "c": "d", "e": "f", "g": "h"]

        with ProcessPoolExecutor(max_workers=10) as executor:
          # Using args here is fine. 
            future_processes = 
              executor.submit(TTS.process_and_render_item, args)
                for args in multiprocessing_args
            

            for future in as_completed(future_processes):
                try:
                    data = future.result()
                except Exception as exc:
                    print(f"Generated an exception: exc")
                else:
                   print(f"Generated data for comment process: future")
 

    # Dont use 'args' here. It seems to be a built-in keyword.
    # Changing 'args' to 'arg' worked for me.
    def process_and_render_item(arg):
        print(arg)
      # This will print "a": "b", "c": "d" for the first process
      # and "e": "f", "g": "h" for the second process.



PS:制表符/空格可能有点偏离。

【讨论】:

这是一个不好的例子。代码不完整。 multiprocessing_args 未定义,TTS 未定义。它也与问题无关,这与酸洗功能有关。您还使用 python 2.7 回复了 9 年的帖子。如果我可以对此投反对票,我会的。 @TLK3,你是对的。我已经修改了代码并添加了 cmets。希望它现在更有意义。我意识到我正在回复旧帖子,但人们仍在旧帖子中寻找新答案。

以上是关于Python多处理PicklingError:无法腌制<type'function'>的主要内容,如果未能解决你的问题,请参考以下文章

python多处理cPickle.PicklingError

事件处理程序中的 Python Kivy PicklingError:无法腌制 <type 'code'>:属性查找 __builtin__.code 失败

使用多处理时出现 PicklingError

pickle.PicklingError:无法腌制未打开读取的文件

PySpark / Glue:PicklingError:无法序列化对象:TypeError:无法腌制thread.lock对象

_pickle.PicklingError:无法序列化对象:TypeError:无法腌制_thread.RLock对象