多处理队列已满
Posted
技术标签:
【中文标题】多处理队列已满【英文标题】:multiprocessing queue full 【发布时间】:2015-10-11 17:07:39 【问题描述】:我正在使用 concurrent.futures 来实现多处理。我收到一个 queue.Full 错误,这很奇怪,因为我只分配了 10 个作业。
A_list = [np.random.rand(2000, 2000) for i in range(10)]
with ProcessPoolExecutor() as pool:
pool.map(np.linalg.svd, A_list)
错误:
Exception in thread Thread-9:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 921, in _bootstrap_inner
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 869, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/concurrent/futures/process.py", line 251, in _queue_management_worker
shutdown_worker()
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/concurrent/futures/process.py", line 209, in shutdown_worker
call_queue.put_nowait(None)
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 131, in put_nowait
return self.put(obj, False)
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 82, in put
raise Full
queue.Full
【问题讨论】:
如果你使用一个小得多的数组,你会得到同样的错误吗? 我在较小的阵列上没有得到错误。我能走的最大的是~200x200。 只有当Pool
因工作进程崩溃而中断时才会运行失败的对 shutdown_worker
的调用 - 所以您需要追查的真正问题是为什么会发生这种情况。
【参考方案1】:
简答 我相信管道尺寸限制是根本原因。除了将数据分解成更小的块并迭代处理它们之外,您对此无能为力。这意味着您可能需要找到一种新算法,该算法一次可以处理 2000x2000 数组的一小部分,才能找到奇异值组合。
详情 让我们马上明白一件事:你正在处理大量信息。仅仅因为您只使用十个项目并不意味着它是微不足道的。这些项目中的每一个都是一个 2000x2000 数组,其中包含 4,000,000 个浮点数,每个浮点数通常为 64 位,因此您看到每个数组大约 244MB,以及在Numpy's ndarrays 中标记的其他数据。
ProcessPoolExecutor 通过启动一个单独的线程来管理工作进程来工作。管理线程使用multiprocesing.Queue 将作业传递给工作人员,称为_call_queue
。这些multiprocessing.Queue
s 实际上只是pipes 的精美包装,而您试图传递给工作人员的ndarray 可能太大而无法正确处理管道。
阅读Python Issue 8426 表明,即使您可以为您的操作系统查找一些标称管道大小限制,也很难准确地确定您的管道有多大。有太多变数使它变得简单。即使将事物从队列中拉出的顺序也会在底层管道中引发竞争条件,从而触发奇怪的错误。
我怀疑您的某个工作人员正在从其_call_queue
中获取不完整或损坏的对象,因为该队列的管道中充满了您的巨型对象。该工作人员以不干净的方式死亡,工作队列管理器检测到此故障,因此它放弃工作并告诉剩余的工作人员退出。但它通过将poison pills 传递给_call_queue
来做到这一点,_call_queue
仍然充满了你的巨型 ndarrays。这就是为什么您得到完整队列异常的原因 - 您的数据填满了队列,然后管理线程尝试使用同一个队列将控制消息传递给其他工作人员。
我认为这是一个典型的例子,说明了在程序中不同实体之间混合数据和控制流的潜在危险。您的大数据不仅阻止了工作人员接收更多数据,还阻止了经理与工作人员的控制通信,因为他们使用相同的路径。
我无法重现你的失败,所以我不能确定这一切都是正确的。但是,您可以使此代码与 200x200 数组 (~2.5MB) 一起工作,这一事实似乎支持了这一理论。标称管道大小限制似乎以 KB 或最多几 MB 为单位,具体取决于操作系统和体系结构。如此大量的数据可以通过管道这一事实并不令人惊讶,尤其是当您考虑到如果消费者持续接收数据时,并非所有 2.5MB 都需要立即真正放入管道中。它为您可以通过管道连续获取的数据量提出了一个合理的上限。
【讨论】:
【参考方案2】:我最近在调试一个通过管道发送各种 GB 数据的 python3.6 程序时偶然发现了这一点。这是我发现的(希望它可以节省其他人的时间!)。
就像skrrgwasme 所说,如果队列管理器在发送毒丸时无法获取信号量,则raises 出现队列已满错误。 信号量的acquire call 是非阻塞的,它会导致管理器失败(由于数据和控制流共享相同的队列,它无法发送“控制”命令)。请注意,上面的链接是指 python 3.6.0
现在我想知道为什么我的队列管理器会发送毒丸。一定有其他的失败! 显然发生了一些异常(在其他一些子进程中?在父进程中?),队列管理器试图清理并关闭所有子进程。在这一点上,我有兴趣找到这个根本原因。
调试根本原因
我最初尝试记录子进程中的所有异常,但显然那里没有发生显式错误。 来自issue 3895:
请注意,当 unpickle 结果失败时,multiprocessing.Pool 也会被破坏。
似乎 py36 中的多处理模块已损坏,因为它无法正确捕获和处理序列化错误。
不幸的是,由于时间限制,我没有设法自己复制和验证问题,而是更愿意跳到行动点和更好的编程实践(不要通过管道发送所有数据 :)。这里有几个想法:
-
尝试腌制应该通过管道运行的数据。由于我的数据(数百 GB)的巨大性质和时间限制,我无法找到哪些记录是不可序列化的。
将调试器放入 python3.6 并打印原始异常。
行动要点
尽可能改造您的程序以减少通过管道发送的数据。
阅读issue 3895 后,问题似乎与酸洗错误有关。另一种选择(和良好的编程实践)可能是使用不同的方式传输数据。例如,可以让子进程写入文件并返回父进程的路径(这可能只是一个小字符串,可能是几个字节)。
等待以后的 python 版本。显然,在issue 3895 的上下文中,这已在 python 版本标签 v3.7.0b3 上得到修复。 Full 异常将是 shutdown_worker 内的 handled。在撰写本文时,Python 的当前维护版本是 3.6.5
【讨论】:
以上是关于多处理队列已满的主要内容,如果未能解决你的问题,请参考以下文章