如何有效地打开 30gb 的文件并处理其中的片段而不减慢速度?

Posted

技术标签:

【中文标题】如何有效地打开 30gb 的文件并处理其中的片段而不减慢速度?【英文标题】:How can I efficiently open 30gb of file and process pieces of it without slowing down? 【发布时间】:2019-09-12 03:06:42 【问题描述】:

我有一些大文件(超过 30gb),其中包含一些我需要对其进行计算的信息,例如求平均值。我提到的部分是文件的切片,我知道每个切片的开始行号和后续行数。

所以我有一个字典,其中键作为起始行号,值作为后续行的计数,我使用这个字典循环遍历文件并获取切片。对于每个切片,我创建一个表,进行一些转换和平均,创建一个新表并将其转换为字典。我使用 islice 进行切片,使用 pandas 数据框从每个切片创建表。

然而,随着时间的推移,过程变得越来越慢,甚至切片的大小也差不多。 前 1k 个切片 - 在 1 小时内处理 第二个 1k 切片 - 在 4 小时内处理 第三个 1k 切片 - 在 8 小时内处理 第二个 1k 切片 - 在 17 小时内处理 我正在等待几天才能完成这些过程。

现在我在 Windows 10 机器上执行此操作,1tb SSD,32 GB 内存。以前我还尝试在具有 250gb SSD 和 8gb 内存 + 8gb 虚拟内存的 Linux 机器(ubuntu 18.4)上。两者的结果大致相同。

我在 Windows 中注意到的是,17% 的 CPU 和 11% 的内存正在使用,但磁盘使用率为 100%。我不完全了解磁盘使用的含义以及如何改进它。

作为代码的一部分,我还在 Linux 上工作时将数据导入 mongodb,我想这可能是因为 mongodb 中的索引。但是当我打印处理时间和导入时间时,我注意到几乎所有时间都花在处理上,导入需要几秒钟。 另外为了争取时间,我现在正在更强大的 Windows 机器上进行处理部分,并将文档编写为 txt 文件。我希望在磁盘上写入会稍微减慢进程,但 txt 文件大小不超过 600kb。

下面是一段代码,我是如何读取文件的:

with open(infile) as inp:
    for i in range(0,len(seg_ids)): 
        inp.seek(0)
        segment_slice = islice(inp,list(seg_ids.keys())[i], (list(seg_ids.keys())[i]+list(seg_ids.values())[i]+1)) 
        segment = list(segment_slice)

        for _, line in enumerate(segment[1:]):
            #create dataframe and perform calculations

所以我想了解是否有办法改善处理时间。我想我的代码从每个切片的开头读取整个文件,并且通过文件读取结束的时间越来越长。

作为说明,由于时间限制,我从必须首先处理的最重要的切片开始。所以剩下的将是文件上的更多随机切片。所以解决方案应该适用于随机切片,如果有的话(我希望)。

我没有编写脚本的经验,所以如果我问了一个愚蠢的问题,请原谅我,但我真的找不到任何答案。

【问题讨论】:

一种方法是存储一个额外的索引文件,其中包含文件中每千行左右的字节位置,然后使用索引更快地找到合适的起始行。但这需要一些编码,实际上对于这个网站来说太宽泛了。 如果您忽略索引,为什么要调用enumeratefor line in segment[1:]:。还有很多看似不必要的列表实例化,您几乎可以肯定地按顺序读取文件,而不是每次都从头开始并让slice 反复阅读。 (仅此一项就变成了 O(n^2) 算法中可能的 O(n) 算法。) 您好 E.Ergin,欢迎来到 SO。为什么不生成mcve?这 2 个 for 循环很有可能是罪魁祸首。 【参考方案1】:

这里的问题是您从文件的开头逐行多次重读一个巨大的未索引文件。难怪这需要 HOURS。

您的代码中的每个islice 都从文件的最开头开始——每次——并在文件到达感兴趣数据的开头之前读取并丢弃文件中的所有行。这是非常缓慢和低效的。

解决方案是为该文件创建一个穷人的索引,然后为每个切片读取更小的卡盘。

让我们创建一个测试文件:

from pathlib import Path

p=Path('/tmp/file')

with open(p, 'w') as f:
    for i in range(1024*1024*500):
        f.write(f'i+1\n')
        
print(f'p is p.stat().st_size/(1024**3):.2f GB')        

这会创建一个大约 4.78 GB 的文件。没有 30 GB 大,但如果您不考虑周到,大到可以慢下来。

现在尝试使用 Unix 实用程序 wc 逐行读取整个文件以计算总行数(wc 通常是计算行数的最快方法):

$ time wc -l /tmp/file
 524288000 file

real    0m3.088s
user    0m2.452s
sys 0m0.636s

与Python3逐行读取文件并打印总数的速度对比:

$ time python3 -c 'with open("file","r") as f: print(sum(1 for l in f))'
524288000

real    0m53.880s
user    0m52.849s
sys 0m0.940s

Python 逐行读取文件的速度几乎比wc 慢 18 倍。

现在做进一步的比较。查看 Unix 实用程序 tail 打印文件最后 n 行的速度:

$ time tail -n 3 file 
524287998
524287999
524288000

real    0m0.007s
user    0m0.003s
sys 0m0.004s

tail 实用程序比 wc 快 445 倍(比 Python 快大约 8,000 倍),因为它使用了窗口索引缓冲区,所以它可以到达文件的最后三行。即,tail 在文件末尾读取一些字节数,然后从它读取的缓冲区中获取最后的n 行。

可以对您的应用程序使用相同的tail 方法。

看看这张照片:

您使用的方法相当于读取该机架上的每个磁带,以查找仅位于中间两个磁带上的数据 - 并一遍又一遍地进行......

在 1950 年代(照片时代),每个磁带都被粗略地索引为它所包含的内容。计算机会要求机架中的特定磁带——而不是机架中的所有磁带。

您的问题的解决方案(在监督中)是建立一个类似磁带的索引方案:

    运行一次 30 GB 文件,并通过块的起始行号创建子块索引。将每个子块大致视为一个磁带(除了它可以轻松运行到最后......) 不要在每次读取之前使用int.seek(0),而是寻找包含感兴趣行号的块(就像tail 所做的那样),然后使用islice 偏移调整到该块的起始行号所在的位置与文件的开头有关。 与他们在 50 年代和 60 年代必须做的相比,你有一个巨大的优势:你只需要计算起始块,因为你可以访问整个剩余的文件。 1950 年的磁带索引要求磁带 x,y,z,... 读取大于一盘磁带所能容纳的数据。您只需找到包含感兴趣的起始行号的x。 顺便说一句,由于this type 的每个 IBM 磁带大约有 3 MB,因此您的 30 GB 文件将超过 1000 万个这些磁带...

正确实现(这并不是很难做到)它将读取性能提高 100 倍或更多。

通过行偏移为文本文件构建一个有用的索引可能就像这样简单:

def index_file(p, delimiter=b'\n', block_size=1024*1024):
    index=0:0
    total_lines, cnt=(0,0)
    with open(p, 'rb') as f:
        while buf:=f.raw.read(block_size):
            cnt=buf.count(delimiter)
            idx=buf.rfind(delimiter)
            key=cnt+total_lines
            index[key]=f.tell()-(len(buf)-idx)+len(delimiter)
            total_lines+=cnt 
    
    return index

# this index is created in about 4.9 seconds on my computer...
# with a 4.8 GB file, there are ~4,800 index entries

构造一个索引,将起始行号(在该块中)与文件开头的字节偏移量相关联:

>>> idx=index_file(p)
>>> idx
0: 0, 165668: 1048571, 315465: 2097150, 465261: 3145722,
...
 524179347: 5130682368, 524284204: 5131730938, 524288000: 5131768898

那么,如果您想访问lines[524179347:524179500],则无需读取 4.5 GB 即可到达;你可以做f.seek(5130682368) 并立即开始阅读。

【讨论】:

【参考方案2】:

使用 pandas 或 dask 并注意 read_csv() 的选项。主要是:chunck_size、nrows、skirows、usecols、engine(使用C)、low_memory、memory_map

[https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html][1]

【讨论】:

【参考方案3】:

我想到了几件事。

首先,如果您将数据放入 pandas DataFrame 中,则有一个用于导入大数据的“chunksize”参数。它允许您处理/转储您需要/不需要的内容,同时证明df.describe 等信息将为您提供摘要统计信息。

另外,我听说了关于 dask 的好消息。它是一个可扩展的平台,通过并行、多核、多机处理,几乎与使用 pandas 和 numpy 一样简单,所需的资源管理非常少。

【讨论】:

我也会选择 Dask 选项。它可以处理大型数据集,并且很容易从 Pandas 切换。

以上是关于如何有效地打开 30gb 的文件并处理其中的片段而不减慢速度?的主要内容,如果未能解决你的问题,请参考以下文章

如何在hadoop map reduce作业中有效地缓存大文件?

如何有效地将压缩的 json 数据推送到 azure 事件中心并在 azure 流分析中处理?

如何有效地读取和写入太大而无法放入内存的文件?

批处理文件返回文件夹大小并删除内容(如果大于50gb)

读取带有双引号和单引号变量的 CSV 文件

使用 Python 有效地查找部分字符串匹配 --> 从 5 GB 文件中的值列表开始的值