Python多处理队列非常慢

Posted

技术标签:

【中文标题】Python多处理队列非常慢【英文标题】:Python multiprocessing queue very slow 【发布时间】:2021-11-11 22:17:48 【问题描述】:

我正在使用多处理队列来处理我的记录。

queue = multiprocessing.Queue()

def produce(i, item):
    data = process(i, item)
    queue.put(data)

def process(item):
    data = do_processing(item)
    return data

if __name__ == '__main__':
    records = load_records()

    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        print('produce items')
        for i, item in ennumerate(records.items()):
            executor.submit(produce, i, item)

    print('queue size:'.format(queue.qsize()))
    while not queue.empty():
        save(queue.get())

队列最初工作得很快。但随着队列的增长,变得非常缓慢。

根据其他 SO 答案,我也尝试使用 multiprocessing.Manager().Queue(),但没有成功。

欢迎指点。

【问题讨论】:

您需要在提交时将项目从队列中拉出,这样队列就不会变得太大。您可以使用线程来执行此操作,或者在这种情况下我会说使用multiprocessing.Pool.imap(它可以为您提交任务并将结果检索到线程)。恕我直言,concurrent.futures 没有比multiprocessing.Pool 更好的产品了。 @Aaron 如果我使用multiprocessing.Pool.imap 如何获得i enumerate 返回一个迭代器,您可以调用 map 或 imap... @Aaron 你能详细说明一下答案吗?真的很感激。 我还应该指出,如果您的任务需要一段时间才能运行,并且您设法在完成处理之前清除队列,则使用 while not queue.empty() 可能会丢失结果。通常最好确切地知道你应该从队列中get 有多少项目,或者从工作人员那里发送某种哨兵以表明不会有更多数据到来。 【参考方案1】:

在您的示例中,在所有作业都提交之前,您永远不会从队列中拉出数据,这可能会花费大量时间,从而导致相当多的缓冲区被填满。

Python 的 multiprocessing.Pool.imap 使用仅保存迭代器的线程向工作人员发送输入(阅读迭代器/生成器的效率),而另一个线程在完成时收集输出,并将它们呈现给输出迭代器因此它们可以在创建时被使用,因此您永远不必缓冲大量项目(只要您读取它们的速度比生成它们的速度快)。

from multiprocessing import Pool
from time import sleep

def produce(args):
    i, item = args #unpack tuple returned by enumerate
    data = process(i, item)
    return data

def process(i, item): #added "i" missing from original post
    data = do_processing(i, item) #do something with "i"?
    return data

if __name__ == '__main__':
    records = load_records()

    with Pool() as pool:
        print('produce items')
        for result in pool.imap(produce, enumerate(records.items())):
            save(result)

【讨论】:

我无法直接保存结果,因为它们都在写入同一个文件。 @Exploring 在此示例中,save 仅在主进程中被调用,因此对文件没有重叠访问。这里没有冲突。仅仅因为它在with 上下文中并不意味着它是并行发生的。只有在produce 函数上maping 的过程是并行发生的。 所以pool.imap(produce, enumerate(records.items())) 将阻塞直到所有进程完成。这种理解正确吗? @Exploring 在for 循环中迭代结果将阻塞,直到“下一个”结果可用,所以是的,所有任务都将完成。

以上是关于Python多处理队列非常慢的主要内容,如果未能解决你的问题,请参考以下文章

一个非常简单的多线程并行 URL 获取(无队列)

泡菜转储的多处理队列问题

在 python 中填充队列和管理多处理

为啥 Python 多处理队列会弄乱字典?

python python多处理队列

等待队列填充python多处理的最佳方法