在python中批处理非常大的文本文件

Posted

技术标签:

【中文标题】在python中批处理非常大的文本文件【英文标题】:Batching very large text file in python 【发布时间】:2020-06-15 18:10:15 【问题描述】:

我正在尝试将一个非常大的文本文件(大约 150 GB)批处理成几个较小的文本文件(大约 10 GB)。

我的一般流程是:

# iterate over file one line at a time
# accumulate batch as string 
--> # given a certain count that correlates to the size of my current accumulated batch and when that size is met: (this is where I am unsure)
        # write to file

# accumulate size count

我有一个粗略的指标来计算 when 到批处理(当所需的批处理大小时),但我不太清楚我应该如何计算给定批处理写入磁盘的频率。例如,如果我的批处理大小为 10 GB,我假设我需要迭代写入而不是将整个 10 GB 批处理保存在内存中。我显然不想写太多,因为这可能非常昂贵。

您是否有任何粗略的计算或技巧,您喜欢用来确定何时写入磁盘以执行诸如此类的任务,例如大小与内存还是什么?

【问题讨论】:

为什么要缓冲?你不能一次写一行吗? 写每一行不是很贵吗? mmapit 让操作系统自己解决。 您可以使用更大的buffering 参数打开二进制文件,例如4 兆。然后逐行读/写。您将通过操作系统磁盘缓存获得良好的性能。使用 DIRECTIO 可以更快,但使用 python 就不是那么容易了。 @sbabbi 我从来没有用过mmap,你能详细说明一下吗? 【参考方案1】:

假设您的大文件是简单的非结构化文本,即这对于像 JSON 这样的结构化文本没有好处,这是读取每一行的替代方法:读取输入文件的大二进制位,直到您的块大小,然后读取几行, 关闭当前输出文件并继续下一个。

我使用与我的代码相同的块大小调整的@tdelaney 代码将其与逐行进行了比较 - 该代码需要 250 秒才能将 12GiB 输入文件拆分为 6x2GiB 块,而这需要大约 50 秒,因此速度可能快五倍,并且看起来它在我的 SSD 上运行 >200MiB/s 读取和写入的 I/O 绑定,其中逐行运行 40-50MiB/s 读取和写入。

我关闭了缓冲,因为没有太多意义。咬合大小和缓冲设置可以调整以提高性能,还没有尝试任何其他设置,因为对我来说它似乎是 I/O 绑定的。

import time

outfile_template = "outfile-.txt"
infile_name = "large.text"
chunksize = 2_000_000_000
MEB = 2**20   # mebibyte
bitesize = 4_000_000 # the size of the reads (and writes) working up to chunksize

count = 0

starttime = time.perf_counter()

infile = open(infile_name, "rb", buffering=0)
outfile = open(outfile_template.format(count), "wb", buffering=0)

while True:
    byteswritten = 0
    while byteswritten < chunksize:
        bite = infile.read(bitesize)
        # check for EOF
        if not bite:
            break
        outfile.write(bite)
        byteswritten += len(bite)
    # check for EOF
    if not bite:
        break
    for i in range(2):
        l = infile.readline()
        # check for EOF
        if not l:
            break
        outfile.write(l)
    # check for EOF
    if not l:
        break
    outfile.close()
    count += 1
    print( count )
    outfile = open(outfile_template.format(count), "wb", buffering=0)

outfile.close()
infile.close()

endtime = time.perf_counter()

elapsed = endtime-starttime

print( f"Elapsed= elapsed" )

注意我没有详尽地测试过它不会丢失数据,尽管没有证据表明它确实会丢失任何东西,你应该自己验证。

通过检查何时在块的末尾以查看还有多少数据要读取来增加一些鲁棒性可能很有用,因此您最终不会得到最后一个输出文件的长度为 0(或比 bitize 短) )

HTH 谷仓

【讨论】:

【参考方案2】:

我使用稍微修改过的版本来解析 250GB json,我选择需要多少个较小的文件 number_of_slices 然后我找到对文件进行切片的位置(我总是寻找行尾)。最后我用file.seekfile.read(chunk)切片文件

import os
import mmap


FULL_PATH_TO_FILE = 'full_path_to_a_big_file'
OUTPUT_PATH = 'full_path_to_a_output_dir' # where sliced files will be generated


def next_newline_finder(mmapf):
    def nl_find(mmapf):
        while 1:
            current = hex(mmapf.read_byte())
            if hex(ord('\n')) == current:  # or whatever line-end symbol
                return(mmapf.tell())
    return nl_find(mmapf)


# find positions where to slice a file
file_info = os.stat(FULL_PATH_TO_FILE)
file_size = file_info.st_size
positions_for_file_slice = [0]
number_of_slices = 15  # say u want slice the big file to 15 smaller files
size_per_slice = file_size//number_of_slices

with open(FULL_PATH_TO_FILE, "r+b") as f:
    mmapf = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    slice_counter = 1
    while slice_counter < number_of_slices:
        pos = size_per_slice*slice_counter
        mmapf.seek(pos)
        newline_pos = next_newline_finder(mmapf)
        positions_for_file_slice.append(newline_pos)
        slice_counter += 1

# create ranges for found positions (from, to)
positions_for_file_slice = [(pos, positions_for_file_slice[i+1]) if i < (len(positions_for_file_slice)-1) else (
    positions_for_file_slice[i], file_size) for i, pos in enumerate(positions_for_file_slice)]


# do actual slice of a file
with open(FULL_PATH_TO_FILE, "rb") as f:
    for i, position_pair in enumerate(positions_for_file_slice):
        read_from, read_to = position_pair
        f.seek(read_from)
        chunk = f.read(read_to-read_from)
        with open(os.path.join(OUTPUT_PATH, f'dummyfilei.json'), 'wb') as chunk_file:
            chunk_file.write(chunk)

【讨论】:

【参考方案3】:

这是一个逐行写入的示例。它以二进制模式打开以避免行解码步骤,该步骤需要一些时间,但可能会扭曲字符计数。例如,utf-8 编码可能会在磁盘上为单个 python 字符使用多个字节。

4 Meg 是对缓冲的猜测。这个想法是让操作系统一次读取更多文件,减少查找时间。这是否有效或最好使用的数字是有争议的 - 并且对于不同的操作系统会有所不同。我发现 4 meg 很重要……但那是几年前的事了,情况正在发生变化。

outfile_template = "outfile-.txt"
infile_name = "infile.txt"
chunksize = 10_000_000_000
MEB = 2**20   # mebibyte

count = 0
byteswritten = 0
infile = open(infile_name, "rb", buffering=4*MEB)
outfile = open(outfile_template.format(count), "wb", buffering=4*MEB)

try:
    for line in infile:
        if byteswritten > chunksize:
            outfile.close()
            byteswritten = 0
            count += 1
            outfile = open(outfile_template.format(count), "wb", buffering=4*MEB)
        outfile.write(line)
        byteswritten += len(line)
finally:
    infile.close()
    outfile.close()

【讨论】:

以上是关于在python中批处理非常大的文本文件的主要内容,如果未能解决你的问题,请参考以下文章

逐行处理非常大 (>20GB) 的文本文件

试图在python中读取一个非常大的文本列表[重复]

在python中逐行读取一个大的压缩文本文件

Python3读取大文件的方法

有效地读取 R 中的一个非常大的文本文件 [重复]

如果前缀是非常大的文件中的某个字符串,则在括号之间获取/匹配文本[关闭]