Python多处理安全地写入文件

Posted

技术标签:

【中文标题】Python多处理安全地写入文件【英文标题】:Python multiprocessing safely writing to a file 【发布时间】:2012-11-06 22:34:37 【问题描述】:

我正在尝试解决一个涉及大量子问题的大数值问题,并且我正在使用 Python 的多处理模块(特别是 Pool.map)将不同的独立子问题拆分到不同的核心上。每个子问题都涉及计算大量子子问题,如果尚未由任何进程计算,我试图通过将它们存储到文件中来有效地记忆这些结果,否则跳过计算并从文件中读取结果。

我遇到了文件的并发问题:不同的进程有时会检查是否已经计算了子子问题(通过查找将存储结果的文件),看到它没有,运行计算,然后尝试将结果同时写入同一个文件。如何避免写这样的冲突?

【问题讨论】:

查看文档中使用multiprocessing.Lock 同步多个进程的示例。 您可以只有一个进程写入结果,队列作为输入,可以由其他工作进程提供。我相信让所有工作进程只读是安全的。 我应该提到,为了让事情变得更复杂,我在一个集群上同时运行多个不同的主要问题,每个问题都将结果写入同一个网络上的子子问题文件系统。因此,我可以从完全在不同机器上运行的进程中获得冲突(所以我认为使用 multiprocessing.Lock 之类的解决方案不会起作用)。 如果您的网络文件系统支持文件锁定,您可以使用操作系统特定的文件创建方法来独占创建文件并对其保持独占锁定,直到结果准备好,然后关闭它。任何未能“赢得”创建竞赛的进程都会尝试打开它并重试(有延迟),直到能够打开它,然后他们才能读取结果。 您实际上是在对数据库服务器进行编程。您是否考虑过使用现有的? 【参考方案1】:

@GP89 ​​提到了一个很好的解决方案。使用队列将写入任务发送到对文件具有唯一写入权限的专用进程。所有其他工作人员都具有只读访问权限。这将消除碰撞。这是一个使用 apply_async 的示例,但它也适用于 map:

import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'

def worker(arg, q):
    '''stupidly simulates long running process'''
    start = time.clock()
    s = 'this is a test'
    txt = s
    for i in range(200000):
        txt += s 
    done = time.clock() - start
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''

    with open(fn, 'w') as f:
        while 1:
            m = q.get()
            if m == 'kill':
                f.write('killed')
                break
            f.write(str(m) + '\n')
            f.flush()

def main():
    #must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()    
    pool = mp.Pool(mp.cpu_count() + 2)

    #put listener to work first
    watcher = pool.apply_async(listener, (q,))

    #fire off workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs: 
        job.get()

    #now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

if __name__ == "__main__":
   main()

【讨论】:

嗨,迈克,感谢您的回答。我认为这适用于我所说的问题,但我不太确定它是否能解决问题的 cmets 中概述的全部问题,特别是我如何在联网的多台机器上运行几个主要程序文件系统,所有这些都可能具有将尝试写入同一文件的进程。 (FWIW,我不久前以一种 hacky 的方式解决了我的个人问题,但如果其他人有类似的问题,我会发表评论。) 我真的很想多次投票。这对我有很多帮助。今天再来一次。 我必须在pool.close() 下方添加一个pool.join()。否则,我的工作人员会在侦听器之前完成,并且该过程将停止。 当消费者人数大大超过并导致内存问题时怎么办?你将如何实现多个消费者都写入同一个文件? 设置进程数时为什么要mp.cpu_count() + 2【参考方案2】:

在我看来,您需要使用Manager 将结果临时保存到列表中,然后将列表中的结果写入文件。另外,使用starmap 传递您要处理的对象和托管列表。第一步是构建要传递给starmap的参数,其中包括托管列表。

from multiprocessing import Manager
from multiprocessing import Pool  
import pandas as pd

def worker(row, param):
    # do something here and then append it to row
    x = param**2
    row.append(x)

if __name__ == '__main__':
    pool_parameter = [] # list of objects to process
    with Manager() as mgr:
        row = mgr.list([])

        # build list of parameters to send to starmap
        for param in pool_parameter:
            params.append([row,param])

        with Pool() as p:
            p.starmap(worker, params)

从这一点开始,您需要决定如何处理列表。如果您有大量 RAM 和庞大的数据集,请随意使用 pandas 进行连接。然后,您可以非常轻松地将文件保存为 csv 或 pickle。

        df = pd.concat(row, ignore_index=True)

        df.to_pickle('data.pickle')
        df.to_csv('data.csv')

【讨论】:

我能得到一些反馈,说明为什么这被否决了吗?我看到接受的答案要好得多。我只是想学习。 这里的参数是什么?我看不到它在任何地方被初始化。另外,mgr.list([]),它会是一个空列表吗?您正在将元组行和参数附加到参数,参数包含要处理的对象,行包含什么? 它可能会被否决,因为在您的代码中,所有进程输出都存储在内存中,这并不能解决问题。 OP 询问在处理时将每个进程输出写入文件。这里要解决的主要问题是避免碰撞,例如多个进程试图同时访问该文件。

以上是关于Python多处理安全地写入文件的主要内容,如果未能解决你的问题,请参考以下文章

如何使Python多处理池工作以写入相同的日志文件

Python:使用多处理池时使用队列写入单个文件

如何在python中使用多处理将df的内容写入csv文件

Python多处理脚本部分输出

python日志打印和写入并发简易版本实现

如何使用 PHP 安全地将 JSON 数据写入文件