multiprocessing.Pool - PicklingError: Can't pickle <type 'thread.lock'>: 属性查找 thread.lock 失败

Posted

技术标签:

【中文标题】multiprocessing.Pool - PicklingError: Can\'t pickle <type \'thread.lock\'>: 属性查找 thread.lock 失败【英文标题】:multiprocessing.Pool - PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failedmultiprocessing.Pool - PicklingError: Can't pickle <type 'thread.lock'>: 属性查找 thread.lock 失败 【发布时间】:2011-12-13 12:08:21 【问题描述】:

multiprocessing.Pool 快把我逼疯了... 我想升级很多包,对于每一个包,我都必须检查是否有更高的版本。这是由check_one 函数完成的。 主要代码在Updater.update 方法中:在那里我创建了Pool 对象并调用map() 方法。

代码如下:

def check_one(args):
    res, total, package, version = args
    i = res.qsize()
    logger.info('\r[0:.1% - 1, 2 / 3]',
        i / float(total), package, i, total, addn=False)
    try:
        json = PyPIJson(package).retrieve()
        new_version = Version(json['info']['version'])
    except Exception as e:
        logger.error('Error: Failed to fetch data for 0 (1)', package, e)
        return
    if new_version > version:
        res.put_nowait((package, version, new_version, json))

class Updater(FileManager):

    # __init__ and other methods...

    def update(self):    
        logger.info('Searching for updates')
        packages = Queue.Queue()
        data = ((packages, self.set_len, dist.project_name, Version(dist.version)) \
            for dist in self.working_set)
        pool = multiprocessing.Pool()
        pool.map(check_one, data)
        pool.close()
        pool.join()
        while True:
            try:
                package, version, new_version, json = packages.get_nowait()
            except Queue.Empty:
                break
            txt = 'A new release is avaiable for 0: 1!s (old 2), update'.format(package,
                                                                                      new_version,
                                                                                      version)
            u = logger.ask(txt, bool=('upgrade version', 'keep working version'), dont_ask=self.yes)
            if u:
                self.upgrade(package, json, new_version)
            else:
                logger.info('0 has not been upgraded', package)
        self._clean()
        logger.success('Updating finished successfully')

当我运行它时,我得到了这个奇怪的错误:

Searching for updates
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/dist-packages/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

【问题讨论】:

【参考方案1】:

多处理通过mp.SimpleQueue 将任务(包括check_onedata)传递给工作进程。与Queue.Queues 不同,放在mp.SimpleQueue 中的所有内容都必须是可选择的。 Queue.Queues 不可取:

import multiprocessing as mp
import Queue

def foo(queue):
    pass

pool=mp.Pool()
q=Queue.Queue()

pool.map(foo,(q,))

产生此异常:

UnpickleableError: Cannot pickle <type 'thread.lock'> objects

您的data 包括packages,它是一个Queue.Queue。这可能是问题的根源。


这是一个可能的解决方法:Queue 用于两个目的:

    找出大致尺寸(通过调用qsize) 存储结果以供日后检索。

我们可以使用mp.Value,而不是调用qsize,在多个进程之间共享一个值。

我们可以(并且应该)只返回来自对check_one 的调用的值,而不是将结果存储在队列中。 pool.map 将结果收集到自己制作的队列中,并将结果作为pool.map 的返回值返回。

例如:

import multiprocessing as mp
import Queue
import random
import logging

# logger=mp.log_to_stderr(logging.DEBUG)
logger = logging.getLogger(__name__)


qsize = mp.Value('i', 1)
def check_one(args):
    total, package, version = args
    i = qsize.value
    logger.info('\r[0:.1% - 1, 2 / 3]'.format(
        i / float(total), package, i, total))
    new_version = random.randrange(0,100)
    qsize.value += 1
    if new_version > version:
        return (package, version, new_version, None)
    else:
        return None

def update():    
    logger.info('Searching for updates')
    set_len=10
    data = ( (set_len, 'project-0'.format(i), random.randrange(0,100))
             for i in range(set_len) )
    pool = mp.Pool()
    results = pool.map(check_one, data)
    pool.close()
    pool.join()
    for result in results:
        if result is None: continue
        package, version, new_version, json = result
        txt = 'A new release is avaiable for 0: 1!s (old 2), update'.format(
            package, new_version, version)
        logger.info(txt)
    logger.info('Updating finished successfully')

if __name__=='__main__':
    logging.basicConfig(level=logging.DEBUG)
    update()

【讨论】:

谢谢!但是现在我怎样才能填充我的队列?我不能让它成为全球性的......我也试图让check_one成为一种方法(使用这个黑客:***.com/questions/1816958/…),但同样,它没有用...... 我在这里找到了解决方案:***.com/questions/3217002/…。我必须使用multiprocessing.Manager().Queue() 而不是multiprocessing.Queue 至于解决方法,我认为您是对的。我会修复我的代码。再次感谢您! 我知道这个问题的地址是multiprocessing.Pool,但是如果我有一个实际的threading.Lock,我该怎么办?【参考方案2】:

在对类似问题进行了大量挖掘之后......

事实证明,任何碰巧包含 threading.Condition() 对象的对象都永远不会与 multiprocessing.Pool 一起使用。

这是一个例子

import multiprocessing as mp
import threading

class MyClass(object):
   def __init__(self):
      self.cond = threading.Condition()

def foo(mc):
   pass

pool=mp.Pool()
mc=MyClass()
pool.map(foo,(mc,))

我在 Python 2.7.5 上运行过这个程序并遇到了同样的错误:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
self.run()
  File "/usr/lib64/python2.7/threading.py", line 764, in run
self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

但后来在 python 3.4.1 上运行,这个问题已经修复。

虽然对于我们这些仍在使用 2.7.x 的人来说,我还没有遇到任何有用的解决方法。

【讨论】:

从 Python 3.4.3 开始,threading.Lock 仍然不能腌制(尽管正如你提到的,threading.Condition 可以)。 我确认threading.Lock 对象仍然不能在 Python 3.6.2 中被腌制。 嗯...仍然在 3.7 版本 3.6.8 并且现在有这个问题,挖掘堆栈溢出...... 当我尝试将 threading.RLock 传递给 Pool 初始化程序时,我遇到了同样的问题。切换到 multiprocessing.RLock 后,一切都按预期工作。【参考方案3】:

我在 docker 上使用 python 3.6 版遇到了这个问题。将版本更改为 3.7.3 并解决。

【讨论】:

面对 Python 3.7.6 中的问题

以上是关于multiprocessing.Pool - PicklingError: Can't pickle <type 'thread.lock'>: 属性查找 thread.lock 失败的主要内容,如果未能解决你的问题,请参考以下文章

Python 多进程编程之multiprocessing--Pool

我们啥时候应该调用 multiprocessing.Pool.join?

`multiprocessing.Pool.map()` 似乎安排错误

multiprocessing.Pool() 比只使用普通函数慢

python之multiprocessing:multiprocessing.Pool

multiprocessing.pool.ApplyResult 的文档在哪里?