处理大文件的最快方法?

Posted

技术标签:

【中文标题】处理大文件的最快方法?【英文标题】:Fastest way to process a large file? 【发布时间】:2015-07-29 10:09:12 【问题描述】:

我有多个 3 GB 制表符分隔文件。每个文件中有 2000 万行。所有行都必须独立处理,任何两行之间没有关系。我的问题是,什么会更快?

    逐行阅读?

    with open() as infile:
        for line in infile:
    

    将文件分块读入内存并进行处理,比如一次 250 MB?

处理不是很复杂,我只是将column1中的值抓取到List1,column2到List2等。可能需要将一些列值添加在一起。

我在具有 30GB 内存的 linux 机器上使用 python 2.7。 ASCII 文本。

有什么方法可以并行加速?现在我使用的是前一种方法,过程很慢。使用任何CSVReader 模块会有所帮助吗? 我不必在 python 中做,欢迎任何其他语言或数据库使用想法。

【问题讨论】:

您的代码是受 I/O 限制还是受 CPU 限制?换句话说,处理是否比读取花费更多时间?如果是这样,您可能可以通过多处理来加速它;否则,您的后台进程将把所有时间都花在等待下一次读取上,而您将得不到任何好处。 同时,for line in infile: 已经在 io 模块代码(在 Python 3.1+ 中)或在下面的 C stdio 内部(在 Python 2.x 中)进行了不错的缓冲,所以除非你正在使用Python 3.0,应该没问题。但是如果你想强制它使用更大的缓冲区,你总是可以循环遍历,比如infile.readlines(65536),然后循环遍历每个块中的行。 另外,这是 2.x 还是 3.x,如果是 3.x 是哪个 3.x 版本,你在什么平台上,以及这是否是 ASCII 文本,这可能会有很大的不同或者确实需要解码的东西,所以请添加该信息。 @abarnert 充其量是“体面的”。如果他/她有足够的内存并且不关心 3GB 命中,他/她可以执行for line in infile.readlines():,这将比文件对象本身更快地迭代 @Vincenzzzochi 实际上,我个人在使用 Python 处理“大数据”方面有很多经验,如果您正确设计解决方案,它的表现会非常好;再次取决于问题的性质 CPU Bound vs. I/O Bound 或两者兼而有之。 Python 并没有真的那么慢:) 【参考方案1】:

听起来您的代码受 I/O 限制。这意味着多处理无济于事——如果您花费 90% 的时间从磁盘读取数据,那么在下一次读取时等待额外的 7 个进程也无济于事。

而且,虽然使用 CSV 读取模块(无论是 stdlib 的 csv 还是类似 NumPy 或 Pandas 的东西)可能是一个简单的好主意,但它不太可能对性能产生太大影响。

不过,值得检查您是否真的 受 I/O 限制,而不仅仅是猜测。运行您的程序并查看您的 CPU 使用率是接近 0% 还是接近 100% 或核心。执行 Amadan 在评论中建议的操作,并仅使用 pass 运行您的程序进行处理,看看是否会减少 5% 或 70% 的时间。您甚至可能想尝试与os.openos.read(1024*1024) 上的循环进行比较,看看是否更快。


自从您使用 Python 2.x 以来,Python 依靠 C stdio 库来猜测一次要缓冲多少,因此可能值得强制它缓冲更多。最简单的方法是对一些大的bufsize 使用readlines(bufsize)。 (您可以尝试不同的数字并测量它们以查看峰值在哪里。根据我的经验,通常 64K-8MB 的任何东西都差不多,但取决于您的系统,可能会有所不同——尤其是如果您正在阅读关闭具有高吞吐量但可怕的延迟的网络文件系统,淹没了实际物理驱动器的吞吐量与延迟以及操作系统所做的缓存。)

所以,例如:

bufsize = 65536
with open(path) as infile: 
    while True:
        lines = infile.readlines(bufsize)
        if not lines:
            break
        for line in lines:
            process(line)

同时,假设您在 64 位系统上,您可能想尝试使用 mmap 而不是首先读取文件。这当然不是保证会更好,但它可能会更好,具体取决于您的系统。例如:

with open(path) as infile:
    m = mmap.mmap(infile, 0, access=mmap.ACCESS_READ)

Python mmap 是一种奇怪的对象——它同时像 strfile,因此您可以手动迭代扫描换行符,或者您可以调用 @ 987654335@ 就好像它是一个文件一样。与将文件作为行迭代或批处理 readlines 相比,这两者都需要从 Python 进行更多的处理(因为 C 中的循环现在在纯 Python 中......尽管也许你可以使用 re 或使用一个简单的 Cython 扩展?)……但操作系统的 I/O 优势知道您正在使用映射做什么,这可能会淹没 CPU 的劣势。

不幸的是,Python 没有公开 madvise 调用,您可以使用它来调整事物以尝试在 C 中优化它(例如,显式设置 MADV_SEQUENTIAL 而不是让内核猜测,或强制透明巨大页)——但您实际上可以在libc 之外使用ctypes 函数。

【讨论】:

我在 linux 机器上有 30 GB 的内存。执行 readlines() 将整个文件放入内存有什么问题吗? @Reise45:这取决于您所说的“问题”是什么意思。它应该工作; 3GB 文件上的readlines 应占用 4GB 以下,如果您还将所有行预处理为内存中的值列表,则不应超过 12GB,因此您仍在舒适的范围内。但这意味着您必须预先完成所有读取操作,因此操作系统无法帮助您流水线化您的 I/O 等待和 CPU 工作;您将时间浪费在 malloc 和缓存错误上;等等。如果有一些好处(例如,它可以让你使用 NumPy 来加速一个缓慢的处理循环),那可能是值得的,但如果没有,为什么要这样做? @Reise45:同时,如果你有很多这样的文件,并且每个文件都需要 25 分钟,比如你现在正在做的事情,那就试着用另一种方式做一个看看如果它在 15 分钟内完成,或者您必须在一小时后取消它;这会告诉你比你通过猜测得到的更多的信息。 我正在使用缓冲区大小进行阅读,但是当 mem% 达到 100% 时,脚本仍然会被终止。我该如何防止呢?我需要修复读取数据的数据结构吗? @Reise45 如果您正在逐步构建一个太大而无法放入 30GB RAM 的数据结构,那么是的,这就是您的问题。在不了解您的代码的情况下,很难说出更具体的内容。【参考方案2】:

我知道这个问题很老了;但我想做类似的事情,我创建了一个简单的框架,可以帮助你并行读取和处理一个大文件。留下我尝试过的答案。

这是代码,我最后举个例子

def chunkify_file(fname, size=1024*1024*1000, skiplines=-1):
    """
    function to divide a large text file into chunks each having size ~= size so that the chunks are line aligned

    Params : 
        fname : path to the file to be chunked
        size : size of each chink is ~> this
        skiplines : number of lines in the begining to skip, -1 means don't skip any lines
    Returns : 
        start and end position of chunks in Bytes
    """
    chunks = []
    fileEnd = os.path.getsize(fname)
    with open(fname, "rb") as f:
        if(skiplines > 0):
            for i in range(skiplines):
                f.readline()

        chunkEnd = f.tell()
        count = 0
        while True:
            chunkStart = chunkEnd
            f.seek(f.tell() + size, os.SEEK_SET)
            f.readline()  # make this chunk line aligned
            chunkEnd = f.tell()
            chunks.append((chunkStart, chunkEnd - chunkStart, fname))
            count+=1

            if chunkEnd > fileEnd:
                break
    return chunks

def parallel_apply_line_by_line_chunk(chunk_data):
    """
    function to apply a function to each line in a chunk

    Params :
        chunk_data : the data for this chunk 
    Returns :
        list of the non-None results for this chunk
    """
    chunk_start, chunk_size, file_path, func_apply = chunk_data[:4]
    func_args = chunk_data[4:]

    t1 = time.time()
    chunk_res = []
    with open(file_path, "rb") as f:
        f.seek(chunk_start)
        cont = f.read(chunk_size).decode(encoding='utf-8')
        lines = cont.splitlines()

        for i,line in enumerate(lines):
            ret = func_apply(line, *func_args)
            if(ret != None):
                chunk_res.append(ret)
    return chunk_res

def parallel_apply_line_by_line(input_file_path, chunk_size_factor, num_procs, skiplines, func_apply, func_args, fout=None):
    """
    function to apply a supplied function line by line in parallel

    Params :
        input_file_path : path to input file
        chunk_size_factor : size of 1 chunk in MB
        num_procs : number of parallel processes to spawn, max used is num of available cores - 1
        skiplines : number of top lines to skip while processing
        func_apply : a function which expects a line and outputs None for lines we don't want processed
        func_args : arguments to function func_apply
        fout : do we want to output the processed lines to a file
    Returns :
        list of the non-None results obtained be processing each line
    """
    num_parallel = min(num_procs, psutil.cpu_count()) - 1

    jobs = chunkify_file(input_file_path, 1024 * 1024 * chunk_size_factor, skiplines)

    jobs = [list(x) + [func_apply] + func_args for x in jobs]

    print("Starting the parallel pool for  jobs ".format(len(jobs)))

    lines_counter = 0

    pool = mp.Pool(num_parallel, maxtasksperchild=1000)  # maxtaskperchild - if not supplied some weird happend and memory blows as the processes keep on lingering

    outputs = []
    for i in range(0, len(jobs), num_parallel):
        print("Chunk start = ", i)
        t1 = time.time()
        chunk_outputs = pool.map(parallel_apply_line_by_line_chunk, jobs[i : i + num_parallel])

        for i, subl in enumerate(chunk_outputs):
            for x in subl:
                if(fout != None):
                    print(x, file=fout)
                else:
                    outputs.append(x)
                lines_counter += 1
        del(chunk_outputs)
        gc.collect()
        print("All Done in time ", time.time() - t1)

    print("Total lines we have = ".format(lines_counter))

    pool.close()
    pool.terminate()
    return outputs

例如,我有一个文件,我想在其中计算每行中的单词数,那么每行的处理如下所示

def count_words_line(line):
    return len(line.strip().split())

然后像这样调用函数:

parallel_apply_line_by_line(input_file_path, 100, 8, 0, count_words_line, [], fout=None)

使用它,我在大小约为 20GB 的示例文件上逐行读取的速度提高了约 8 倍,在该示例文件中,我对每一行进行了一些中等复杂的处理。

【讨论】:

这种方法不会给您留下一个潜在的情况,即一行在 100 字节的块处中断并且其他行被计为不同的行?当您将文件分成字节块时,您永远不知道当前行将在哪里中断以满足该空间要求 有一个readline() 来寻找指向行尾的文件指针,以便获得行对齐的块 如果您将文件作为二进制文件读取,那么块是否重要?如果你做'rb'并不会否定\ n。如果是这样的话,你还需要担心文件的大块被截断吗?

以上是关于处理大文件的最快方法?的主要内容,如果未能解决你的问题,请参考以下文章

在本地复制大文件的最快方法

从多个文件中读取大数据并在python中聚合数据的最快方法是啥?

java读大文件最快性能

在文本文件中求和整数的最快方法

在 python 或 spark 中获取大数据缺失值的最快方法是啥?

请教Java处理大批量的数据