多处理队列已满

Posted

技术标签:

【中文标题】多处理队列已满【英文标题】:multiprocessing queue full 【发布时间】:2015-10-11 17:07:39 【问题描述】:

我正在使用 concurrent.futures 来实现多处理。我收到一个 queue.Full 错误,这很奇怪,因为我只分配了 10 个作业。

A_list = [np.random.rand(2000, 2000) for i in range(10)]

with ProcessPoolExecutor() as pool:
    pool.map(np.linalg.svd, A_list)

错误:

Exception in thread Thread-9:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 921, in _bootstrap_inner
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 869, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/concurrent/futures/process.py", line 251, in _queue_management_worker
    shutdown_worker()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/concurrent/futures/process.py", line 209, in shutdown_worker
    call_queue.put_nowait(None)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 131, in put_nowait
    return self.put(obj, False)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 82, in put
    raise Full
queue.Full

【问题讨论】:

如果你使用一个小得多的数组,你会得到同样的错误吗? 我在较小的阵列上没有得到错误。我能走的最大的是~200x200。 只有当 Pool 因工作进程崩溃而中断时才会运行失败的对 shutdown_worker 的调用 - 所以您需要追查的真正问题是为什么会发生这种情况。 【参考方案1】:

简答 我相信管道尺寸限制是根本原因。除了将数据分解成更小的块并迭代处理它们之外,您对此无能为力。这意味着您可能需要找到一种新算法,该算法一次可以处理 2000x2000 数组的一小部分,才能找到奇异值组合。

详情 让我们马上明白一件事:你正在处理大量信息。仅仅因为您只使用十个项目并不意味着它是微不足道的。这些项目中的每一个都是一个 2000x2000 数组,其中包含 4,000,000 个浮点数,每个浮点数通常为 64 位,因此您看到每个数组大约 244MB,以及在Numpy's ndarrays 中标记的其他数据。

ProcessPoolExecutor 通过启动一个单独的线程来管理工作进程来工作。管理线程使用multiprocesing.Queue 将作业传递给工作人员,称为_call_queue。这些multiprocessing.Queues 实际上只是pipes 的精美包装,而您试图传递给工作人员的ndarray 可能太大而无法正确处理管道。

阅读Python Issue 8426 表明,即使您可以为您的操作系统查找一些标称管道大小限制,也很难准确地确定您的管道有多大。有太多变数使它变得简单。即使将事物从队列中拉出的顺序也会在底层管道中引发竞争条件,从而触发奇怪的错误。

我怀疑您的某个工作人员正在从其_call_queue 中获取不完整或损坏的对象,因为该队列的管道中充满了您的巨型对象。该工作人员以不干净的方式死亡,工作队列管理器检测到此故障,因此它放弃工作并告诉剩余的工作人员退出。但它通过将poison pills 传递给_call_queue 来做到这一点,_call_queue仍然充满了你的巨型 ndarrays。这就是为什么您得到完整队列异常的原因 - 您的数据填满了队列,然后管理线程尝试使用同一个队列将控制消息传递给其他工作人员。

我认为这是一个典型的例子,说明了在程序中不同实体之间混合数据和控制流的潜在危险。您的大数据不仅阻止了工作人员接收更多数据,还阻止了经理与工作人员的控制通信,因为他们使用相同的路径。

我无法重现你的失败,所以我不能确定这一切都是正确的。但是,您可以使此代码与 200x200 数组 (~2.5MB) 一起工作,这一事实似乎支持了这一理论。标称管道大小限制似乎以 KB 或最多几 MB 为单位,具体取决于操作系统和体系结构。如此大量的数据可以通过管道这一事实并不令人惊讶,尤其是当您考虑到如果消费者持续接收数据时,并非所有 2.5MB 都需要立即真正放入管道中。它为您可以通过管道连续获取的数据量提出了一个合理的上限。

【讨论】:

【参考方案2】:

我最近在调试一个通过管道发送各种 GB 数据的 python3.6 程序时偶然发现了这一点。这是我发现的(希望它可以节省其他人的时间!)。

就像skrrgwasme 所说,如果队列管理器在发送毒丸时无法获取信号量,则raises 出现队列已满错误。 信号量的acquire call 是非阻塞的,它会导致管理器失败(由于数据和控制流共享相同的队列,它无法发送“控制”命令)。请注意,上面的链接是指 python 3.6.0

现在我想知道为什么我的队列管理器会发送毒丸。一定有其他的失败! 显然发生了一些异常(在其他一些子进程中?在父进程中?),队列管理器试图清理并关闭所有子进程。在这一点上,我有兴趣找到这个根本原因。

调试根本原因

我最初尝试记录子进程中的所有异常,但显然那里没有发生显式错误。 来自issue 3895:

请注意,当 unpickle 结果失败时,multiprocessing.Pool 也会被破坏。

似乎 py36 中的多处理模块已损坏,因为它无法正确捕获和处理序列化错误。

不幸的是,由于时间限制,我没有设法自己复制和验证问题,而是更愿意跳到行动点和更好的编程实践(不要通过管道发送所有数据 :)。这里有几个想法:

    尝试腌制应该通过管道运行的数据。由于我的数据(数百 GB)的巨大性质和时间限制,我无法找到哪些记录是不可序列化的。 将调试器放入 python3.6 并打印原始异常。

行动要点

    尽可能改造您的程序以减少通过管道发送的数据。

    阅读issue 3895 后,问题似乎与酸洗错误有关。另一种选择(和良好的编程实践)可能是使用不同的方式传输数据。例如,可以让子进程写入文件并返回父进程的路径(这可能只是一个小字符串,可能是几个字节)。

    等待以后的 python 版本。显然,在issue 3895 的上下文中,这已在 python 版本标签 v3.7.0b3 上得到修复。 Full 异常将是 shutdown_worker 内的 handled。在撰写本文时,Python 的当前维护版本是 3.6.5

【讨论】:

以上是关于多处理队列已满的主要内容,如果未能解决你的问题,请参考以下文章

JAVA进阶-多线程

如何检测多处理。管道已满?

多线程7-阻塞队列

关于高性能流数据处理的问题

Java线程池中的四种拒绝策略

Java多线程-----线程池详解