使用 Python 处理不适合内存的文件

Posted

技术标签:

【中文标题】使用 Python 处理不适合内存的文件【英文标题】:Process files that don't fit in memory with Python 【发布时间】:2019-10-23 13:01:12 【问题描述】:

我有一个包含数百万 XML 文件的大 tar 文件(总共 700GB)。这些 XML 文件有很多垃圾数据,我尝试解析它们,获取我需要的详细信息并将它们存储在 CSV 中。

我的第一步是将 tar 文件拆分为更小的(每个约 1-1.5GB)文件。现在,我需要浏览所有 tar 文件,阅读它们,获取信息并将其存储在 2 个不同的 CSV 文件中。

我的代码:

import tarfile
import csv  
import glob 
from multiprocessing import Process
import xml.etree.ElementTree as ET

def main(index, tar_file):

    tar = tarfile.open(tar_file)

    file1 = open('file1_' + str(index) + '.csv', "w")
    file2 = open('file2_' + str(index) + '.csv', "w")

    writer1 = csv.writer(file1, delimiter=',')
    writer2 = csv.writer(file2, delimiter=',')

    for member in tar:
        if member.isreg() and member.name.endswith('.xml'): # regular xml file
            with closing(tar.extractfile(member)) as xmlfile:
                root = ET.parse(xmlfile).getroot()
                if <statement>:
                    #get the data I want from root
                    writer1.writerow(<some data>)

                if <statement>:   
                    #get the data I want from root      
                    writer2.writerow(<some data>)
    workFile.close()
    peerFile.close()  
    tar.close()               

if __name__ == '__main__':

    files = [f for f in glob.glob("data/*.tar", recursive=True)]  
    procs = []
    for index, f in enumerate(files):
        proc = Process(target=main, args=(index, f,))
        procs.append(proc)
        proc.start()

    for proc in procs:
        proc.join()

我是这样做的,所以我不在内存中保留任何内容,而是逐行写入文件。但是,在运行上述代码一段时间后,我的笔记本电脑刚刚关闭。我想,代码中有一部分会填满内存。我如何处理这种情况而不需要立即阅读所有内容?

【问题讨论】:

如果内存是问题,我希望oom killer 会杀死你的进程。您的笔记本电脑关机这一事实表明存在不同的问题,并且您的系统(硬件或软件)可能无法正确处理该问题。 将 700GB 的文件拆分为 1-1.5GB 的小文件后,您仍然需要处理大约 500 个文件。您正在为每个文件启动一个新进程,因此大约有 500 个进程。在达到机器上的计算核心数(减 1)后,您将看不到任何效率提升。您正在将这么多进程塞进您的机器中,这可能是关机的原因。更好的解决方案是使用 concurrent.futures 模块中的 ProcessPoolExecutor 【参考方案1】:

目前还不清楚您的笔记本电脑关闭的原因。这可能是“内存不足”和“文件描述符不足”的错误组合(你产生了很多进程,每个进程都打开 3 个文件,是吗?)并且可能是你的操作系统中的错误或某些硬件故障。

无论哪种方式,您都可以尝试通过减少衍生进程的数量来避免它。首先,为每个文件生成一个进程没有任何好处。经验法则是:永远不要产生超过,比如说 [3 x 核心数] 并行函数(当你执行纯粹的 CPU 密集型任务时,通常只有 [核心数] 就足够了,但你确实有少量 i/o以及)。

所以不是

files = [f for f in glob.glob("data/*.tar", recursive=True)]  
procs = []
for index, f in enumerate(files):
    proc = Process(target=main, args=(index, f,))
    procs.append(proc)
    proc.start()

for proc in procs:
    proc.join()

试试这个

from multiprocessing import Pool, cpu_count
pool = Pool(2*cpu_count())  # or 3, do some empirical testing
files = [f for f in glob.glob("data/*.tar", recursive=True)]  
procs = []
for index, f in enumerate(files):
    pool.apply_async(main, (index, f,))

pool.close()
pool.join()

在此处阅读有关池的更多信息:https://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

如果你使用的是 Python3.x 你也可以试试执行器:https://docs.python.org/3/library/concurrent.futures.html

【讨论】:

以上是关于使用 Python 处理不适合内存的文件的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark 如何处理不适合内存的数据?

Python 适合大数据量的处理吗

优化Scala代码以读取不适合内存的大文件的有效方法

Python的文件上传

Hadoop之HDFS入门实战

Hadoop之HDFS入门实战