为啥队列大小保持为零?

Posted

技术标签:

【中文标题】为啥队列大小保持为零?【英文标题】:Why does the queue size remain zero?为什么队列大小保持为零? 【发布时间】:2021-11-11 21:13:53 【问题描述】:

我正在使用多重处理来处理我的记录。

queue = Queue()

def produce(i, item):
    data = process(i, item)
    queue.put(data)

def process(i, item):
    data = do_processing(i, item)
    return data

if __name__ == '__main__':
    records = load_records()

    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        print('produce items')
        for i, item in ennumerate(records.items()):
            executor.submit(produce, i, item)

    print('queue size:'.format(queue.qsize()))
    while not queue.empty():
        save(queue.get())

在这里,我将记录从生产中放入队列中,因为该步骤非常耗时。处理完记录后,我保存它们。由于消耗步骤不费时,我不费心在单独的线程中运行它。

在我执行代码之后,队列仍然是空的。这是怎么回事?

【问题讨论】:

大概程序在任何线程将任何东西放入队列之前就结束了。 但是我正在运行带有上下文的执行程序,即withwith ProcessPoolExecutor(max_workers=os.cpu_count()) as executor 应该等到所有内容都处理完毕。这是我的理解。 可能相关:***.com/questions/68751929/… 您使用的是多处理而不是多线程。每个进程都会创建自己的 queue() 副本。您需要使用显式共享的数据结构。或者,只需直接调用 process(),并利用各种 map 调用将返回所有结果这一事实。 @FrankYellin 是否有共享数据结构,相当于我可以换出的队列? 【参考方案1】:
def process(item):
    data = do_processing(item)
    return data

queue = Queue() # not a multiprocessing queue

with multiprocessing.Pool(processes=os.cpu_count()) as pool:
    for result in pool.imap(process, records):
        queue.put(result)

while not queue.empty():
    save(queue.get())

【讨论】:

感谢您提供此代码 sn-p,它可能会提供一些有限的即时帮助。 proper explanation 将通过展示为什么这是解决问题的好方法,并使其对有其他类似问题的未来读者更有用,从而大大提高其长期价值。请编辑您的答案以添加一些解释,包括您所做的假设。 @martineau:我不想重复上面已经写过的所有内容。我以为 OP 知道发生了什么。 探索:上面的代码将对记录的每个元素调用进程。它完全按照原始代码所做的工作。你需要额外的东西吗? @Exploring。我故意不使用 Multprocessing.Queue()。它是昂贵的。相反,主线程上只有一个队列。当每个结果返回时,它被主线程放在一个队列中。 我的意思是 OP 不是唯一的观众(假设他们什么都懂)。人们也不应该在挑选你的问题下通过一堆 cmets 来理解代码。【参考方案2】:

我认为这就是如何做你想做的事。正如comment 中提到的,每个进程都在自己的内存空间中运行,因此不能简单地共享队列等全局变量,也不能将其作为参数传递给每个进程。

使用ProcessPoolExecutor 时,您可以有效地完成所需的操作——共享队列——通过定义将在每个进程开始时调用的初始化函数,该函数将为该进程创建一个全局变量并将队列作为it 的参数。

这里有一些与您的代码非常相似并且实际上可以运行的东西,说明了我的意思:

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Queue
import os


MAX_RECORDS = 10

def load_records():
    return dict.fromkeys(range(MAX_RECORDS), 0)

def do_processing(item):
    return item

def init_queue(queue):
    globals()['queue'] = queue  # Makes queue a global in each process.

def produce(i, item):
    data = process(i, item)
    queue.put(data)

def process(i, item):
    data = do_processing(item)
    return data


if __name__ == '__main__':
    records = load_records()

    queue = Queue()
    with ProcessPoolExecutor(max_workers=os.cpu_count(),
                             initializer=init_queue, initargs=(queue,)) as executor:
        print('producing items')
        for i, item in enumerate(records.items()):
            future = executor.submit(produce, i, item)
        print('done producing items')

    print('queue size: '.format(queue.qsize()))
    while not queue.empty():
        print(queue.get())

输出:

producing items
done producing items
queue size: 10
(0, 0)
(1, 0)
(2, 0)
(3, 0)
(4, 0)
(5, 0)
(6, 0)
(7, 0)
(8, 0)
(9, 0)

【讨论】:

感谢您添加详细说明。但是,此代码与我提交的代码完全相同,尽管此版本更具可读性。我错过了什么吗? 这可能会导致死锁,队列已满,阻止子进程加入执行程序的__exit__。 ***.com/questions/31665328/… @Exploring:是的。在我的版本中,实际上有一个Queue 在进程之间共享。在你的每个过程中都有一个不同的过程,包括主过程。 @martineau - 在我的解决方案中,它没有创建多个队列。我正在使用multiprocessing.Queue()。所以你的评论是不正确的。 @Exploring:不,你错了,因为你不了解 Python 中的多处理是如何工作的。每次重新导入脚本时都会执行queue = multiprocessing.Queue(),因此每个脚本中都会有一个不同的模块级全局变量。【参考方案3】:

使用multiprocessing.Queue() 处理多处理。

queue = multiprocessing.Queue()

def produce(item):
    data = process(item)
    queue.put(data)

def process(item):
    data = do_processing(item)
    return data

if __name__ == '__main__':
    records = load_records()

    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        print('produce items')
        for item in records.items():
            executor.submit(produce, item)

    print('queue size:'.format(queue.qsize()))
    while not queue.empty():
        save(queue.get())

【讨论】:

以上是关于为啥队列大小保持为零?的主要内容,如果未能解决你的问题,请参考以下文章

Keras嵌入层:将零填充值保持为零

我使用 coreData 保存数据,但对象保持为零

CSS图像父P标签高度保持为零[关闭]

OpenCV——改变图像大小

为啥锚定到 ApplicationWindow 的项目的大小为零?

为啥即使我们将指针分配给NULL,指针的指向对象的大小也不为零?