使用多处理时出现 PicklingError

Posted

技术标签:

【中文标题】使用多处理时出现 PicklingError【英文标题】:PicklingError when using multiprocessing 【发布时间】:2011-10-24 09:35:00 【问题描述】:

在多处理模块中使用Pool.map_async()(以及Pool.map())时遇到问题。我已经实现了一个并行循环函数,只要Pool.map_async 的函数输入是“常规”函数,它就可以正常工作。当功能是例如一个类的方法,然后我得到一个PicklingError

cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我只使用 Python 进行科学计算,所以我对酸洗的概念不是很熟悉,今天刚刚了解了一点。我已经查看了几个之前的答案,例如 Can't pickle <type 'instancemethod'> when using multiprocessing Pool.map(),但我无法弄清楚如何使它工作,即使按照答案中提供的链接进行操作也是如此。

我的代码,其目标是使用多核模拟 Normal r.v 的向量。请注意,这只是一个示例,甚至可能无法在多核上运行。

import multiprocessing as mp
import scipy as sp
import scipy.stats as spstat

def parfor(func, args, static_arg = None, nWorkers = 8, chunksize = None):
    """
    Purpose: Evaluate function using Multiple cores.

    Input:
        func       - Function to evaluate in parallel
        arg        - Array of arguments to evaluate func(arg)  
        static_arg - The "static" argument (if any), i.e. the variables that are      constant in the evaluation of func.
        nWorkers   - Number of Workers to process computations.
    Output:
        func(i, static_arg) for i in args.
    
    """
    # Prepare arguments for func: Collect arguments with static argument (if any)
    if static_arg != None:
        arguments = [[arg] + static_arg for arg in list(args)]
    else:
        arguments = args
    
    # Initialize workers
    pool = mp.Pool(processes = nWorkers) 

    # Evaluate function
    result = pool.map_async(func, arguments, chunksize = chunksize)
    pool.close()
    pool.join()

    return sp.array(result.get()).flatten() 

# First test-function. Freeze location and scale for the Normal random variates generator.
# This returns a function that is a method of the class Norm_gen. Methods cannot be pickled
# so this will give an error.
def genNorm(loc, scale):
    def subfunc(a):
        return spstat.norm.rvs(loc = loc, scale = scale, size = a)
    return subfunc

# Second test-function. The same as above but does not return a method of a class. This is a "plain" function and can be 
# pickled
def test(fargs):
    x, a, b = fargs
    return spstat.norm.rvs(size = x, loc = a, scale = b)

# Try it out.
N = 1000000

# Set arguments to function. args1 = [1, 1, 1,... ,1], the purpose is just to generate a random variable of size 1 for each 
# element in the output vector.
args1 = sp.ones(N)
static_arg = [0, 1] # standarized normal.

# This gives the PicklingError
func = genNorm(*static_arg)
sim = parfor(func, args1, static_arg = None, nWorkers = 12, chunksize = None)

# This is OK:
func = test
sim = parfor(func, args1, static_arg = static_arg, nWorkers = 12, chunksize = None)

按照Can't pickle <type 'instancemethod'> when using multiprocessing Pool.map() 中问题答案中提供的链接,Steven Bethard(几乎在最后)建议使用copy_reg 模块。他的代码是:

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

import copy_reg
import types

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

我真的不明白如何使用它。我唯一能想到的就是把它放在我的代码之前,但它没有帮助。一个简单的解决方案当然是只使用有效的解决方案,避免参与copy_reg。我更感兴趣的是让copy_reg 正常工作以充分利用多处理,而不必每次都解决问题。

【问题讨论】:

【参考方案1】:

这里的问题不是“pickle”错误消息而是概念性的: 多进程确实在“工人”不同的进程中分叉你的代码以执行 它的魔力。

然后,它通过无缝序列化和反序列化数据(即使用 pickle 的部分)将数据发送到不同进程或从不同进程发送数据。

当来回传递的部分数据是一个函数时 - 它假设被调用进程中存在一个同名的函数,并且(我猜)将函数名作为字符串传递。由于函数是无状态的,被调用的工作进程只是用它收到的数据调用同一个函数。 (Python函数不能通过pickle进行序列化,所以只在主进程和工作进程之间传递引用)

当你的函数是实例中的一个方法时——尽管当我们编写 python 代码时,它与函数很像,带有一个“自动”self 变量,但它在下面就不一样了。因为实例(对象)是有状态的。这意味着工作进程没有对象的副本,该对象是您要在另一端调用的方法的所有者。

解决将方法作为函数传递给 map_async 调用的方法也不起作用 - 因为多进程只使用函数引用,而不是传递它时的实际函数。

因此,您应该 (1) 更改您的代码,以便将函数(而不是方法)传递给工作进程,将对象保持的任何状态转换为要调用的新参数。 (2) 为 map_async 调用创建一个“目标”函数,该函数在工作进程端重建所需的对象,然后调用其中的函数。 Python 中最直接的类本身是可挑选的,因此您可以在 map_async 调用中传递作为函数所有者本身的对象 - 并且“目标”函数将在工作端调用适当的方法本身。

(2) 可能听起来“困难”,但可能就是这样——除非你的对象的类不能被腌制:

import types

def target(object, *args, **kw):
    method_name = args[0]
    return getattr(object, method_name)(*args[1:])
(...)    
#And add these 3 lines prior to your map_async call:


    # Evaluate function
    if isinstance (func, types.MethodType):
        arguments.insert(0, func.__name__)
        func = target
    result = pool.map_async(func, arguments, chunksize = chunksize)

*免责声明:我没有测试过这个

【讨论】:

感谢您的回答。我有一个问题,如果你能回答我将非常感激: 1. 你说:“(1) 要么更改你的代码,以便你将一个函数 - 而不是方法 - 传递给工作进程,......”。这就是我在第二次尝试中所做的,即使用 test() 函数对吗?我的问题是:如果我没有传递一个函数,它怎么会起作用?你的意思是我可以遇到未来的错误?我试过你的代码,它也有效,但如果我的第一个替代方案已经奏效,我看不出“复杂”的意义。 我还想指出,您的替代方案 (2) 对我不起作用,因为我的主要问题是我使用的类不可选择。我试图使用 copy_reg 来解决这个问题,这应该是可能的,因为 Steve Bethard 使用了我发布的第二个代码,并且它对他有用。再次感谢您抽出宝贵时间。 关于我的第一篇文章,我错了。我确实写了你的代码,但它没有任何效果,因为“if isinstance(func,types.MethodType):”从来都不是真的,因此代码没有被执行。对于之前没有注意到这一点,我深表歉意。

以上是关于使用多处理时出现 PicklingError的主要内容,如果未能解决你的问题,请参考以下文章

python多处理cPickle.PicklingError

Python 多处理 PicklingError:无法腌制 <type 'function'>

Python多处理PicklingError:无法腌制<type'function'>

关于tcp连接对象在多进程中的错误:pickle.PicklingError

使用一个热编码器时出现gridsearchCV错误

使用 ActionMailer 发送多部分邮件时出现问题