为啥队列大小保持为零?
Posted
技术标签:
【中文标题】为啥队列大小保持为零?【英文标题】:Why does the queue size remain zero?为什么队列大小保持为零? 【发布时间】:2021-11-11 21:13:53 【问题描述】:我正在使用多重处理来处理我的记录。
queue = Queue()
def produce(i, item):
data = process(i, item)
queue.put(data)
def process(i, item):
data = do_processing(i, item)
return data
if __name__ == '__main__':
records = load_records()
with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
print('produce items')
for i, item in ennumerate(records.items()):
executor.submit(produce, i, item)
print('queue size:'.format(queue.qsize()))
while not queue.empty():
save(queue.get())
在这里,我将记录从生产中放入队列中,因为该步骤非常耗时。处理完记录后,我保存它们。由于消耗步骤不费时,我不费心在单独的线程中运行它。
在我执行代码之后,队列仍然是空的。这是怎么回事?
【问题讨论】:
大概程序在任何线程将任何东西放入队列之前就结束了。 但是我正在运行带有上下文的执行程序,即with
。 with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor
应该等到所有内容都处理完毕。这是我的理解。
可能相关:***.com/questions/68751929/…
您使用的是多处理而不是多线程。每个进程都会创建自己的 queue() 副本。您需要使用显式共享的数据结构。或者,只需直接调用 process(),并利用各种 map 调用将返回所有结果这一事实。
@FrankYellin 是否有共享数据结构,相当于我可以换出的队列?
【参考方案1】:
def process(item):
data = do_processing(item)
return data
queue = Queue() # not a multiprocessing queue
with multiprocessing.Pool(processes=os.cpu_count()) as pool:
for result in pool.imap(process, records):
queue.put(result)
while not queue.empty():
save(queue.get())
【讨论】:
感谢您提供此代码 sn-p,它可能会提供一些有限的即时帮助。 proper explanation 将通过展示为什么这是解决问题的好方法,并使其对有其他类似问题的未来读者更有用,从而大大提高其长期价值。请编辑您的答案以添加一些解释,包括您所做的假设。 @martineau:我不想重复上面已经写过的所有内容。我以为 OP 知道发生了什么。 探索:上面的代码将对记录的每个元素调用进程。它完全按照原始代码所做的工作。你需要额外的东西吗? @Exploring。我故意不使用 Multprocessing.Queue()。它是昂贵的。相反,主线程上只有一个队列。当每个结果返回时,它被主线程放在一个队列中。 我的意思是 OP 不是唯一的观众(假设他们什么都懂)。人们也不应该在挑选你的问题下通过一堆 cmets 来理解代码。【参考方案2】:我认为这就是如何做你想做的事。正如comment 中提到的,每个进程都在自己的内存空间中运行,因此不能简单地共享队列等全局变量,也不能将其作为参数传递给每个进程。
使用ProcessPoolExecutor
时,您可以有效地完成所需的操作——共享队列——通过定义将在每个进程开始时调用的初始化函数,该函数将为该进程创建一个全局变量并将队列作为it 的参数。
这里有一些与您的代码非常相似并且实际上可以运行的东西,说明了我的意思:
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Queue
import os
MAX_RECORDS = 10
def load_records():
return dict.fromkeys(range(MAX_RECORDS), 0)
def do_processing(item):
return item
def init_queue(queue):
globals()['queue'] = queue # Makes queue a global in each process.
def produce(i, item):
data = process(i, item)
queue.put(data)
def process(i, item):
data = do_processing(item)
return data
if __name__ == '__main__':
records = load_records()
queue = Queue()
with ProcessPoolExecutor(max_workers=os.cpu_count(),
initializer=init_queue, initargs=(queue,)) as executor:
print('producing items')
for i, item in enumerate(records.items()):
future = executor.submit(produce, i, item)
print('done producing items')
print('queue size: '.format(queue.qsize()))
while not queue.empty():
print(queue.get())
输出:
producing items
done producing items
queue size: 10
(0, 0)
(1, 0)
(2, 0)
(3, 0)
(4, 0)
(5, 0)
(6, 0)
(7, 0)
(8, 0)
(9, 0)
【讨论】:
感谢您添加详细说明。但是,此代码与我提交的代码完全相同,尽管此版本更具可读性。我错过了什么吗? 这可能会导致死锁,队列已满,阻止子进程加入执行程序的__exit__
。 ***.com/questions/31665328/…
@Exploring:是的。在我的版本中,实际上有一个Queue
在进程之间共享。在你的每个过程中都有一个不同的过程,包括主过程。
@martineau - 在我的解决方案中,它没有创建多个队列。我正在使用multiprocessing.Queue()
。所以你的评论是不正确的。
@Exploring:不,你错了,因为你不了解 Python 中的多处理是如何工作的。每次重新导入脚本时都会执行queue = multiprocessing.Queue()
,因此每个脚本中都会有一个不同的模块级全局变量。【参考方案3】:
使用multiprocessing.Queue()
处理多处理。
queue = multiprocessing.Queue()
def produce(item):
data = process(item)
queue.put(data)
def process(item):
data = do_processing(item)
return data
if __name__ == '__main__':
records = load_records()
with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
print('produce items')
for item in records.items():
executor.submit(produce, item)
print('queue size:'.format(queue.qsize()))
while not queue.empty():
save(queue.get())
【讨论】:
以上是关于为啥队列大小保持为零?的主要内容,如果未能解决你的问题,请参考以下文章