逐行处理非常大 (>20GB) 的文本文件
Posted
技术标签:
【中文标题】逐行处理非常大 (>20GB) 的文本文件【英文标题】:Process very large (>20GB) text file line by line 【发布时间】:2013-05-16 04:28:11 【问题描述】:我有许多非常大的文本文件需要处理,最大的大约 60GB。
每行在七个字段中有 54 个字符,我想从前三个字段中的每个字段中删除最后三个字符 - 这应该会减少大约 20% 的文件大小。
我是 Python 的新手,我有一个代码可以以每小时 3.4 GB 的速度完成我想做的事情,但要成为一项有价值的练习,我确实需要至少 10 GB/小时 - 有没有加快速度的方法?这段代码并没有接近挑战我的处理器,所以我猜测它受到内部硬盘驱动器读写速度的限制?
def ProcessLargeTextFile():
r = open("filepath", "r")
w = open("filepath", "w")
l = r.readline()
while l:
x = l.split(' ')[0]
y = l.split(' ')[1]
z = l.split(' ')[2]
w.write(l.replace(x,x[:-3]).replace(y,y[:-3]).replace(z,z[:-3]))
l = r.readline()
r.close()
w.close()
任何帮助将不胜感激。我在 Windows 7 上使用 IDLE Python GUI 并拥有 16GB 内存 - 也许不同的操作系统会更高效?
编辑:这是要处理的文件的摘录。
70700.642014 31207.277115 -0.054123 -1585 255 255 255
70512.301468 31227.990799 -0.255600 -1655 155 158 158
70515.727097 31223.828659 -0.066727 -1734 191 187 180
70566.756699 31217.065598 -0.205673 -1727 254 255 255
70566.695938 31218.030807 -0.047928 -1689 249 251 249
70536.117874 31227.837662 -0.033096 -1548 251 252 252
70536.773270 31212.970322 -0.115891 -1434 155 158 163
70533.530777 31215.270828 -0.154770 -1550 148 152 156
70533.555923 31215.341599 -0.138809 -1480 150 154 158
【问题讨论】:
如果您使用 Python 2.7 编写,您可以尝试在 PyPy 上运行。即时编译器可以为您的字段改组提供性能加速,但我不确定如果文件系统是瓶颈,这将有多大帮助。 你能给我们一个文件的小sn-p吗? 【参考方案1】:您可以尝试先保存拆分结果,而不是每次需要字段时都保存。可能这会加快速度。
您也可以尝试不在 gui 中运行它。在cmd中运行。
【讨论】:
【参考方案2】:使用for l in r:
读取文件以从缓冲中受益。
【讨论】:
【参考方案3】:这样写代码更习惯
def ProcessLargeTextFile():
with open("filepath", "r") as r, open("outfilepath", "w") as w:
for line in r:
x, y, z = line.split(' ')[:3]
w.write(line.replace(x,x[:-3]).replace(y,y[:-3]).replace(z,z[:-3]))
这里的主要节省是只执行一次split
,但如果 CPU 没有被征税,这可能几乎没有什么区别
它可能有助于一次保存几千行并将它们写入一次以减少硬盘驱动器的抖动。一百万行只有 54MB 的 RAM!
def ProcessLargeTextFile():
bunchsize = 1000000 # Experiment with different sizes
bunch = []
with open("filepath", "r") as r, open("outfilepath", "w") as w:
for line in r:
x, y, z = line.split(' ')[:3]
bunch.append(line.replace(x,x[:-3]).replace(y,y[:-3]).replace(z,z[:-3]))
if len(bunch) == bunchsize:
w.writelines(bunch)
bunch = []
w.writelines(bunch)
@Janne 建议,另一种生成线条的方法
def ProcessLargeTextFile():
bunchsize = 1000000 # Experiment with different sizes
bunch = []
with open("filepath", "r") as r, open("outfilepath", "w") as w:
for line in r:
x, y, z, rest = line.split(' ', 3)
bunch.append(' '.join((x[:-3], y[:-3], z[:-3], rest)))
if len(bunch) == bunchsize:
w.writelines(bunch)
bunch = []
w.writelines(bunch)
【讨论】:
如果行的大小恒定,您可以尝试以更大的块读取/写入文件... @rootfor
不应该在那个(和其他)情况下做缓冲吗?
@glglgl -- 好吧,它可以同时对数千行进行替换操作......(不确定哪种方式最快 - 也许是正则表达式?)跨度>
@root,每行替换不同。无论如何,OP 似乎不受 CPU 限制
如果我理解要求,您可以使用write(x[:-3]+' '+y[:-3]+' '+z[:-3]+'\n')
代替replace
链。【参考方案4】:
那些看起来像非常大的文件......为什么它们这么大?你在每行做什么处理?为什么不使用带有一些 map reduce 调用(如果合适)或简单的数据操作的数据库?数据库的意义在于抽象处理和管理无法全部放入内存的大量数据。
您可以使用sqlite3 开始尝试这个想法,它只使用平面文件作为数据库。如果你觉得这个想法有用,那么升级到更强大、更通用的东西,比如postgresql。
创建数据库
conn = sqlite3.connect('pts.db')
c = conn.cursor()
创建一个表
c.execute('''CREATE TABLE ptsdata (filename, line, x, y, z''')
然后使用上述算法之一,通过调用插入数据库中的所有线和点
c.execute("INSERT INTO ptsdata VALUES (filename, lineNumber, x, y, z)")
现在你如何使用它取决于你想做什么。例如通过查询来处理文件中的所有点
c.execute("SELECT lineNumber, x, y, z FROM ptsdata WHERE filename=file.txt ORDER BY lineNumber ASC")
并从此查询中一次获取n
行
c.fetchmany(size=n)
我确信某处有更好的 sql 语句包装器,但你明白了。
【讨论】:
谢谢 Chris,这些文件是点云信息的 .PTS 文件。每行代表笛卡尔坐标中空间中的一个不同点,这是我们从供应商处获取数据的格式以及我们的软件需要的格式。 在 3D 空间中?数据顺序重要吗?您的软件如何使用这些数据? @ChrisRaastad:Tom_b 是否请求帮助重构正在使用的系统或改进提供的代码?【参考方案5】:您的代码相当不习惯,并且进行的函数调用比需要的要多得多。一个更简单的版本是:
ProcessLargeTextFile():
with open("filepath") as r, open("output") as w:
for line in r:
fields = line.split(' ')
fields[0:2] = [fields[0][:-3],
fields[1][:-3],
fields[2][:-3]]
w.write(' '.join(fields))
而且我不知道有一个现代文件系统比 Windows慢。既然您似乎将这些庞大的数据文件用作数据库,那么您是否考虑过使用真正的数据库?
最后,如果您只是对减小文件大小感兴趣,您是否考虑过压缩/压缩文件?
【讨论】:
【参考方案6】:ProcessLargeTextFile():
r = open("filepath", "r")
w = open("filepath", "w")
l = r.readline()
while l:
正如已经建议的那样,您可能希望使用 for 循环来使其更加优化。
x = l.split(' ')[0]
y = l.split(' ')[1]
z = l.split(' ')[2]
您在这里执行了 3 次拆分操作,具体取决于每行的大小,这将对性能产生不利影响。您应该拆分一次并将 x,y,z 分配给返回的数组中的条目。
w.write(l.replace(x,x[:-3]).replace(y,y[:-3]).replace(z,z[:-3]))
您正在阅读的每一行,都在立即写入文件,这是非常 I/O 密集型的。您应该考虑将输出缓冲到内存并定期推送到磁盘。像这样的:
BUFFER_SIZE_LINES = 1024 # Maximum number of lines to buffer in memory
def ProcessLargeTextFile():
r = open("filepath", "r")
w = open("filepath", "w")
buf = ""
bufLines = 0
for lineIn in r:
x, y, z = lineIn.split(' ')[:3]
lineOut = lineIn.replace(x,x[:-3]).replace(y,y[:-3]).replace(z,z[:-3])
bufLines+=1
if bufLines >= BUFFER_SIZE:
# Flush buffer to disk
w.write(buf)
buf = ""
bufLines=1
buf += lineOut + "\n"
# Flush remaining buffer to disk
w.write(buf)
buf.close()
r.close()
w.close()
您可以调整 BUFFER_SIZE 以确定内存使用和速度之间的最佳平衡。
【讨论】:
【参考方案7】:测量!你得到了一些有用的提示,如何改进你的 python 代码,我同意他们的观点。但你应该首先弄清楚,你真正的问题是什么。我找到瓶颈的第一步是:
从您的代码中删除任何处理。只需读写数据并测量速度。如果只是读写文件太慢,那不是你的代码问题。 如果只是读写已经很慢,尝试使用多个磁盘。你在同时阅读和写作。在同一张光盘上?如果是,请尝试使用不同的光盘并重试。 一些异步 io 库(Twisted?)也可能有帮助。如果您找出了确切的问题,请再次询问该问题的优化。
【讨论】:
【参考方案8】:由于您似乎不受 CPU 限制,而是受 I/O 限制,您是否尝试过对 open
的第三个参数进行一些变化?
确实,这第三个参数可用于给出用于文件操作的缓冲区大小!
从文件中读取时,只需写入open( "filepath", "r", 16777216 )
将使用 16 MB 缓冲区。一定有帮助。
对输出文件使用相同的文件,其余部分与相同的文件进行测量/比较。
注意:这与其他人建议的优化相同,但您可以在这里免费获得它,无需更改代码,无需自己缓冲。
【讨论】:
【参考方案9】:既然你只提到节省空间是一个好处,那么你有什么理由不能只存储压缩后的文件吗?这应该可以节省 70% 甚至更多的数据。如果随机访问仍然很重要,或者考虑让 NTFS 压缩文件。完成上述任一操作后,您将大大节省 I/O 时间。
更重要的是,您仅获得 3.4GB/小时的数据在哪里?这在 USBv1 速度附近有所下降。
【讨论】:
【参考方案10】:我将添加这个答案来解释 为什么 缓冲是有意义的,并提供另一种解决方案
你的表现非常糟糕。这篇文章Is it possible to speed-up python IO? 表明,读取 10 GB 大约需要 3 分钟。顺序写入速度相同。所以你错过了 30 倍,你的性能目标仍然比应该可能的慢 10 倍。
几乎可以肯定,这种差异在于磁盘正在执行的磁头寻道次数。磁头寻道需要几毫秒。一次查找对应于几兆字节的顺序读写。非常昂贵。同一磁盘上的复制操作需要在输入和输出之间进行搜索。如前所述,减少寻道的一种方法是以这样一种方式进行缓冲,即在写入磁盘之前读取许多兆字节,反之亦然。如果你能说服 python io 系统这样做,那就太好了。否则,您可以读取行并将其处理为字符串数组,然后在大约 50 mb 的输出准备好后写入。这个大小意味着一个搜索将导致对数据传输本身的
另一种完全消除输入和输出文件之间寻道的非常简单的方法是使用具有两个物理磁盘和每个物理磁盘完全独立的 io 通道的机器。从一个输入。输出到其他。如果您要进行大量的大文件转换,最好有一台具有此功能的机器。
【讨论】:
【参考方案11】:这是加载任何大小的文本文件而不会导致内存问题的代码。它支持千兆字节大小的文件。它可以在任何类型的机器上顺利运行,您只需要根据您的系统 RAM 配置 CHUNK_SIZE。 CHUNK_SIZE越大,一次读取的数据越多
https://gist.github.com/iyvinjose/e6c1cb2821abd5f01fd1b9065cbc759d
下载文件 data_loading_utils.py 并将其导入您的代码中
用法
import data_loading_utils.py.py
file_name = 'file_name.ext'
CHUNK_SIZE = 1000000
def process_lines(line, eof, file_name):
# check if end of file reached
if not eof:
# process data, data is one single line of the file
else:
# end of file reached
data_loading_utils.read_lines_from_file_as_data_chunks(file_name, chunk_size=CHUNK_SIZE, callback=process_lines)
process_lines 方法是回调函数。它将为所有行调用,参数行一次代表文件的一行。
您可以根据您的机器硬件配置配置变量CHUNK_SIZE。
【讨论】:
我正在尝试使用您的代码,但出现NameError: name 'self' is not defined.
的错误在这种情况下,self
指的是什么对象?谢谢!
@horcle_buzz。对提出的错误表示歉意。我已经更新了代码。请检查
@IyvinJose 很棒的教程! - 有帮助!以上是关于逐行处理非常大 (>20GB) 的文本文件的主要内容,如果未能解决你的问题,请参考以下文章