在队列为空之前调用join时的Python 3多处理队列死锁
Posted
技术标签:
【中文标题】在队列为空之前调用join时的Python 3多处理队列死锁【英文标题】:Python 3 Multiprocessing queue deadlock when calling join before the queue is empty 【发布时间】:2015-10-18 08:39:41 【问题描述】:我对python 3 中multiprocessing
模块中的队列有疑问
这是他们在programming guidelines中所说的:
请记住,已将项目放入队列的进程将在之前等待 终止,直到所有缓冲的项目都被“feeder”线程提供给 底层管道。 (子进程可以调用 Queue.cancel_join_thread 队列的方法来避免这种行为。)
这意味着每当您使用队列时,您需要确保所有 已放入队列的项目最终将在 过程加入。否则,您无法确定具有 放入队列中的项目将终止。还要记住非守护进程 进程将自动加入。
一个会死锁的例子如下:
从多处理导入进程,队列 定义 f(q): q.put('X' * 1000000) 如果 __name__ == '__main__': 队列 = 队列() p = 进程(目标 = f,参数 =(队列,)) p.start() p.join() # 这个死锁 obj = queue.get()这里的解决方法是交换最后两行(或简单地删除 p.join() 行)。
显然,queue.get()
不应该在 join()
之后调用。
但是有一些使用队列的示例,其中 get
在 join
之后被调用,例如:
import multiprocessing as mp
import random
import string
# define a example function
def rand_string(length, output):
""" Generates a random string of numbers, lower- and uppercase chars. """
rand_str = ''.join(random.choice(
string.ascii_lowercase
+ string.ascii_uppercase
+ string.digits)
for i in range(length))
output.put(rand_str)
if __name__ == "__main__":
# Define an output queue
output = mp.Queue()
# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, output))
for x in range(2)]
# Run processes
for p in processes:
p.start()
# Exit the completed processes
for p in processes:
p.join()
# Get process results from the output queue
results = [output.get() for p in processes]
print(results)
我已经运行了这个程序并且它可以工作(也作为 *** 问题Python 3 - Multiprocessing - Queue.get() does not respond 的解决方案发布)。
有人可以帮我理解这里的死锁规则吗?
【问题讨论】:
【参考方案1】:多处理中允许数据在进程之间传输的队列实现依赖于标准的操作系统管道。
操作系统管道不是无限长的,因此在put()
操作期间,排队数据的进程可能会在操作系统中被阻塞,直到其他进程使用get()
从队列中检索数据。
对于少量数据,例如您的示例中的数据,主进程可以join()
所有衍生的子进程然后拾取数据。这通常效果很好,但无法扩展,并且不清楚何时会中断。
但它肯定会因大量数据而崩溃。子进程将被阻塞在put()
等待主进程用get()
从队列中删除一些数据,但主进程被阻塞在join()
等待子进程完成。这会导致死锁。
这是一个用户拥有this exact issue 的示例。我在那里的答案中发布了一些代码,帮助他解决了他的问题。
【讨论】:
我理解你的操作系统管道解释,但我在这里有一个问题:如果我在进程完成执行之前调用 q.get(),说它是一个 HTTP Longpoll Get 请求,q.get( ) 等待响应/超时?该过程是否有可能在我的 q.get() 之后的 mp.join() 处返回,而我错过了该返回值? 这取决于你如何设置 q.get() 块和超时参数。【参考方案2】:在从共享队列中获得所有消息之前,不要在进程对象上调用join()
。
我使用以下解决方法允许进程在处理所有结果之前退出:
results = []
while True:
try:
result = resultQueue.get(False, 0.01)
results.append(result)
except queue.Empty:
pass
allExited = True
for t in processes:
if t.exitcode is None:
allExited = False
break
if allExited & resultQueue.empty():
break
它可以缩短,但我把它留得更长,以便新手更清楚。
这里的resultQueue
是与multiprocess.Process
对象共享的multiprocess.Queue
。在这段代码之后,您将获得 result
数组,其中包含队列中的所有消息。
问题是接收消息的队列管道的输入缓冲区可能已满,导致写入器无限阻塞,直到有足够的空间接收下一条消息。所以你有三种方法可以避免阻塞:
增加multiprocessing.connection.BUFFER
的大小(不太好)
减小消息大小或其数量(不太好)
当消息到来时立即从队列中获取消息(好方法)
【讨论】:
以上是关于在队列为空之前调用join时的Python 3多处理队列死锁的主要内容,如果未能解决你的问题,请参考以下文章