Python:使用多处理池时使用队列写入单个文件

Posted

技术标签:

【中文标题】Python:使用多处理池时使用队列写入单个文件【英文标题】:Python: Writing to a single file with queue while using multiprocessing Pool 【发布时间】:2014-12-23 04:07:29 【问题描述】:

我有数十万个文本文件,我想以各种方式对其进行解析。我想将输出保存到单个文件而不会出现同步问题。我一直在使用多处理池来执行此操作以节省时间,但我不知道如何组合池和队列。

以下代码将保存文件名以及文件中连续“x”的最大数量。但是,我希望所有进程都将结果保存到同一个文件中,而不是像我的示例中那样保存到不同的文件中。对此的任何帮助将不胜感激。

import multiprocessing

with open('infilenamess.txt') as f:
    filenames = f.read().splitlines()

def mp_worker(filename):
 with open(filename, 'r') as f:
      text=f.read()
      m=re.findall("x+", text)
      count=len(max(m, key=len))
      outfile=open(filename+'_results.txt', 'a')
      outfile.write(str(filename)+'|'+str(count)+'\n')
      outfile.close()

def mp_handler():
    p = multiprocessing.Pool(32)
    p.map(mp_worker, filenames)

if __name__ == '__main__':
    mp_handler()

【问题讨论】:

【参考方案1】:

多处理池为您实现了一个队列。只需使用将工作人员返回值返回给调用者的池方法。 imap 运行良好:

import multiprocessing 
import re

def mp_worker(filename):
    with open(filename) as f:
        text = f.read()
    m = re.findall("x+", text)
    count = len(max(m, key=len))
    return filename, count

def mp_handler():
    p = multiprocessing.Pool(32)
    with open('infilenamess.txt') as f:
        filenames = [line for line in (l.strip() for l in f) if line]
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, filenames):
            # (filename, count) tuples from worker
            f.write('%s: %d\n' % result)

if __name__=='__main__':
    mp_handler()

【讨论】:

那么,我一次遍历一个结果并在它们进入时将它们写入文件?这是否意味着新的工作人员在每个“结果”都被写入之前不会启动,或者一次运行 32 个,但会等待写入?另外,您能解释一下为什么将我的 f.read().splitlines() 替换为 [line for line in (l.strip() for l in f) if line]? 32 个进程在后台运行,并在将结果传递回父进程时在“块”中获取更多文件名。结果会立即传回,因此父级并行执行其工作。逐行读取文件比读取整个文件并稍后拆分它更有效......这就是列表的用途。 太棒了!网上有很多关于这个的答案,但没有一个这么简单。向你致敬! 非常感谢!这对我来说并不完全清楚,何时何地创建池以及何时/何地将其排空到文件中。我的整个代码是这样的: Parallel(n_jobs=num_cores)(delayed(my_function)(entry) for entry in contents) 它需要一行并返回 30 行。我是否必须将所有进程的结果存储到一个列表中并写入文件?如果是,它会很慢,因为当我们最终将结果存储到列表中时,它会增长并且过程变得如此缓慢! 如果返回结果的顺序没有映射,也可以使用imap_unordered【参考方案2】:

我接受了已接受的答案并对其进行了简化,以了解其工作原理。我把它贴在这里以防它帮助别人。

import multiprocessing

def mp_worker(number):
    number += 1
    return number

def mp_handler():
    p = multiprocessing.Pool(32)
    numbers = list(range(1000))
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, numbers):
            f.write('%d\n' % result)

if __name__=='__main__':
    mp_handler()

【讨论】:

非常感谢!这对我来说并不完全清楚,何时何地创建池以及何时/何地将其排入文件。我的整个代码是这样的: Parallel(n_jobs=num_cores)(delayed(my_function)(entry) for entry in contents) 它需要一行并返回 30 行。我是否必须将所有进程的结果存储到一个列表中并写入文件?如果是,它会很慢,因为当我们最终将结果存储到列表中时,它会增长并且过程变得如此缓慢!【参考方案3】:

这是我使用多处理管理器对象的方法。这种方法的好处是,当处理退出管理器并在 run_multi() 函数中使用块时,文件写入器队列会自动关闭,从而使代码非常易于阅读,并且您无需尝试停止侦听队列。

from functools import partial
from multiprocessing import Manager, Pool, Queue
from random import randint
import time

def run_multi():
    input = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    with Manager() as manager:
        pool = Pool()  # By default pool will size depending on cores available
        message_queue = manager.Queue()  # Queue for sending messages to file writer listener
        pool.apply_async(file_writer, (message_queue, ))  # Start file listener ahead of doing the work
        pool.map(partial(worker, message_queue=message_queue), input)  # Partial function allows us to use map to divide workload

def worker(input: int, message_queue: Queue):
    message_queue.put(input * 10)
    time.sleep(randint(1, 5))  # Simulate hard work

def file_writer(message_queue: Queue):
    with open("demo.txt", "a") as report:
        while True:
            report.write(f"Value is: message_queue.get()\n")

if __name__ == "__main__":
    run_multi()

【讨论】:

以上是关于Python:使用多处理池时使用队列写入单个文件的主要内容,如果未能解决你的问题,请参考以下文章

如何结合python多处理和管道技术?

练习题,使用多线程编写一个简单的文本处理工具

在 python 中填充队列和管理多处理

如何在python中使用多处理将df的内容写入csv文件

Python 多处理写入 csv 数据以获取大量文件

在将输出写入文件时使用多处理显示来自子进程的实时输出