Python 多处理写入 csv 数据以获取大量文件

Posted

技术标签:

【中文标题】Python 多处理写入 csv 数据以获取大量文件【英文标题】:Python Multiprocessing write to csv data for huge volume files 【发布时间】:2021-11-16 23:09:14 【问题描述】:

我正在尝试使用多处理程序进行计算并将其写入另一个 txt 文件。我在输出 txt 文件中出现计数不匹配。每次执行我都会得到不同的输出计数。

我是 python 新手,请有人帮忙。

import pandas as pd
import multiprocessing as mp

source = "\\share\usr\data.txt"
target = "\\share\usr\data_masked.txt"

Chunk = 10000

def process_calc(df):
    ''' 
        get source df do calc and return newdf
        ...
    '''
 return(newdf)        
  
def calc_frame(df):
    output_df = process_calc(df)
    output_df.to_csv(target,index=None,sep='|',mode='a',header=False)

if __name__ == '__main__':
    reader= pd.read_table(source,sep='|',chunksize = chunk,encoding='ANSI')
    pool = mp.Pool(mp.cpu_count())
    jobs = []
    
    for each_df in reader:
        process = mp.Process(target=calc_frame,args=(each_df)
        jobs.append(process)
        process.start()
    
    for j in jobs:
        j.join()

【问题讨论】:

问题可能只是多个进程同时写入同一个文件。最好先写入单个文件,然后cat 将所有文件放在一起。 您发布的代码有很多编译和语义错误。这根本不可能执行。 @Booboo:我为编译和语义错误道歉,因为我是在记事本中起草的。您的解决方案很简单,并且可以按预期完美运行。非常感谢!!! 【参考方案1】:

您在发布的源代码中存在几个问题,这些问题甚至会阻止它编译,更不用说运行了。我试图纠正这些问题,以解决您的主要问题。但是请彻底检查下面的代码,以确保更正有意义。

首先,Process 构造函数的 args 参数应指定为 tuple。您指定了args=(each_df),但(each_df) 不是 tuple,它是一个简单的括号表达式;您需要 (each_df,) 来生成 tuple (该语句也缺少右括号)。

除了没有针对同时尝试追加到同一个文件的多个进程做出规定之外,您还有一个问题是,您无法确定进程完成的顺序,因此您无法真正控制执行顺序数据框将附加到 csv 文件中。

解决方案是使用带有imap method 的处理池。传递给此方法的 iterable 只是 reader,它在迭代时返回下一个要处理的数据帧。 imap 的返回值是一个 iterable,它在迭代时将返回来自calc_frame 的下一个返回值按任务提交顺序,即与数据帧的顺序相同被提交。因此,当返回这些新的、修改过的数据帧时,主进程可以简单地将它们一一附加到输出文件中:

import pandas as pd
import multiprocessing as mp

source = r"\\share\usr\data.txt"
target = r"\\share\usr\data_masked.txt"

Chunk = 10000

def process_calc(df):
    ''' 
        get source df do calc and return newdf
        ...
    '''
    return(newdf)

def calc_frame(df):
    output_df = process_calc(df)
    return output_df

if __name__ == '__main__':
    with mp.Pool() as pool:
        reader = pd.read_table(source, sep='|', chunksize=Chunk, encoding='ANSI')
        for output_df in pool.imap(process_calc, reader):
            output_df.to_csv(target, index=None, sep='|', mode='a', header=False)

【讨论】:

以上是关于Python 多处理写入 csv 数据以获取大量文件的主要内容,如果未能解决你的问题,请参考以下文章

Python实现对csv的批量处理并保存

如何在python中使用多处理将df的内容写入csv文件

django生成CSV文件

Python遥感图像处理应用篇(十七):GDAL 将归一化处理csv数据转化为多波段遥感影像

Python遥感图像处理应用篇(十七):GDAL 将归一化处理csv数据转化为多波段遥感影像

Python:以追加方式写入数据到 csv 文件中