读取大文本文件(约 20m 行),将函数应用于行,写入新文本文件

Posted

技术标签:

【中文标题】读取大文本文件(约 20m 行),将函数应用于行,写入新文本文件【英文标题】:Read in large text file (~20m rows), apply function to rows, write to new text file 【发布时间】:2022-01-10 03:22:39 【问题描述】:

我有一个非常大的文本文件和一个函数,它可以对每一行执行我希望它执行的操作。但是,逐行阅读并应用该功能时,大约需要三个小时。我想知道是否没有办法通过分块或多处理来加快速度。

我的代码如下所示:

with open('f.txt', 'r') as f:
    function(f,w)

函数接收大文本文件和空文本文件并应用函数并写入空文件。

我试过了:

def multiprocess(f,w):    
    cores = multiprocessing.cpu_count()

    with Pool(cores) as p:
        pieces = p.map(function,f,w)
    
    f.close()
    w.close()

multiprocess(f,w)

但是当我这样做时,我得到一个类型为“io.TextWrapper”和“int”的 TypeError

【问题讨论】:

输入文件能否完全放入您机器的内存中? (根据您的操作系统文件统计信息)在上面的第二个 sn-p 中,似乎所有内核都在尝试同时访问输入和输出文件(或句柄),对吗? 如果我理解上面的代码是如何工作的,那么是的,他们都会尝试同时访问输入和输出文件。 多处理是否可以大大加快您的处理速度取决于您正在调用的处理每一行的实际函数。如果它的 CPU 密集程度足够高,那么多处理可以在智能完成的情况下实现性能提升。还有其他因素需要考虑,例如您拥有多少内存以及哪种类型的驱动器(硬盘与固态硬盘)可能会使一种策略比另一种策略更具性能。以您提供的信息不充分,无法真正回答这个问题。 直到今天,我正在开发的系统对我来说还是一个黑盒子。从那以后我发现它有一个 Intel Xeon CPU E5-2673 v4 @ 2.30 GHz 2.29 GHz 处理器、16 个内核和 64 GB RAM。该函数正在读取每一行并确定该行是否已意外拆分为多行并将虚线连接起来。虚线是不结束的行,随后以引号开头,每一行都应该如此。这够了吗? 【参考方案1】:

即使您可以成功地将打开的文件对象作为参数fw (我认为您在任何操作系统上都不能)成功地传递给池中的子操作系统进程,也尝试同时读取和写入文件至少可以说是个坏主意。

一般情况下,我建议使用 Process 类而不是 Pool,假设输出最终结果需要保持与输入 20m 行文件相同的顺序。

https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process

最慢的解决方案,但最有效的 RAM 使用率

逐行执行和处理文件的初始解决方案

为了最大速度,但大多数 RAM 消耗

通过f.readlines()将整个文件作为列表读取到RAM中,如果您的整个数据集可以放入内存中,舒适 计算内核数(例如 8 个内核) 将列表平均分成 8 个列表 将每个列表传递给要由 Process 实例执行的函数(此时您的 RAM 使用量将进一步增加一倍,这是最大速度的折衷),但您应该在 del 原始大列表之后释放一些内存 每个进程逐行处理其整个块,并将其写入自己的输出文件(out_file1.txt、out_file2.txt 等) 让您的操作系统将您的输出文件串联成一个大的输出文件。如果您运行的是 UNIX 系统,则可以使用 subprocess.run('cat out_file* > big_output.txt'),或用于 windows 的等效 Windows 命令。

对于速度和 RAM 之间的中间权衡,但最复杂的是,我们将不得不使用 Queue

https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue

计算出变量cores(比如8)中的核心数 初始化8个队列,8个进程,将每个Queue传递给每个进程。此时每个进程都应该打开自己的输出文件(outfile1.txt、outfile2.txt 等) 每个进程应轮询(并阻塞)10_000 行的块,对其进行处理,并将它们按顺序写入各自的输出文件中 在父进程的循环中,从输入的 20m 行文件中读取 10_000 * 8 行 将其拆分为多个列表(10K 块)以推送到各自的进程队列 当您完成 20m 行退出循环时,将一个特殊值传递到每个进程队列中,以表示输入数据结束 当每个进程在自己的队列中检测到特殊的数据结束值时,每个进程都应关闭其输出文件并退出 让您的操作系统将您的输出文件串联成一个大的输出文件。如果您运行的是 UNIX 系统,则可以使用 subprocess.run('cat out_file* > big_output.txt'),或用于 windows 的等效 Windows 命令。

复杂?好吧,这通常是速度、RAM、复杂性之间的权衡。同样对于 20m 行的任务,需要确保数据处理尽可能优化 - 内联尽可能多的函数,避免大量数学运算,尽可能在子进程中使用 Pandas / numpy,等等。

【讨论】:

【参考方案2】:

使用 in 进行迭代不是这种方式,但您可以一次调用多行,您只需将一个或多个相加即可读取多行,这样做程序会读取得更快。

看看这个sn-p。

# Python code to
# demonstrate readlines()
 
L = ["Geeks\n", "for\n", "Geeks\n"]
 
# writing to file
file1 = open('myfile.txt', 'w')
file1.writelines(L)
file1.close()
 
# Using readlines()
file1 = open('myfile.txt', 'r')
Lines = file1.readlines()
 
count = 0
# Strips the newline character
for line in Lines:
    count += 1
    print("Line: ".format(count, line.strip()))

我来自:https://www.geeksforgeeks.org/read-a-file-line-by-line-in-python/。

【讨论】:

那么您的意思是,没有比逐行执行更好的方法了吗? 你可以一次调用多行,你只需要将一个或多个相加就可以读取多行,这样做程序读取速度会更快。

以上是关于读取大文本文件(约 20m 行),将函数应用于行,写入新文本文件的主要内容,如果未能解决你的问题,请参考以下文章

如何根据不同 R 生态系统中的另一个向量重写代码,将函数应用于行子集?

在python中逐行读取一个大的压缩文本文件

分块读取大文本文件

本地存储

本地存储

python大文件处理