使用 Python 处理不适合内存的文件
Posted
技术标签:
【中文标题】使用 Python 处理不适合内存的文件【英文标题】:Process files that don't fit in memory with Python 【发布时间】:2019-10-23 13:01:12 【问题描述】:我有一个包含数百万 XML 文件的大 tar 文件(总共 700GB)。这些 XML 文件有很多垃圾数据,我尝试解析它们,获取我需要的详细信息并将它们存储在 CSV 中。
我的第一步是将 tar 文件拆分为更小的(每个约 1-1.5GB)文件。现在,我需要浏览所有 tar 文件,阅读它们,获取信息并将其存储在 2 个不同的 CSV 文件中。
我的代码:
import tarfile
import csv
import glob
from multiprocessing import Process
import xml.etree.ElementTree as ET
def main(index, tar_file):
tar = tarfile.open(tar_file)
file1 = open('file1_' + str(index) + '.csv', "w")
file2 = open('file2_' + str(index) + '.csv', "w")
writer1 = csv.writer(file1, delimiter=',')
writer2 = csv.writer(file2, delimiter=',')
for member in tar:
if member.isreg() and member.name.endswith('.xml'): # regular xml file
with closing(tar.extractfile(member)) as xmlfile:
root = ET.parse(xmlfile).getroot()
if <statement>:
#get the data I want from root
writer1.writerow(<some data>)
if <statement>:
#get the data I want from root
writer2.writerow(<some data>)
workFile.close()
peerFile.close()
tar.close()
if __name__ == '__main__':
files = [f for f in glob.glob("data/*.tar", recursive=True)]
procs = []
for index, f in enumerate(files):
proc = Process(target=main, args=(index, f,))
procs.append(proc)
proc.start()
for proc in procs:
proc.join()
我是这样做的,所以我不在内存中保留任何内容,而是逐行写入文件。但是,在运行上述代码一段时间后,我的笔记本电脑刚刚关闭。我想,代码中有一部分会填满内存。我如何处理这种情况而不需要立即阅读所有内容?
【问题讨论】:
如果内存是问题,我希望oom killer 会杀死你的进程。您的笔记本电脑关机这一事实表明存在不同的问题,并且您的系统(硬件或软件)可能无法正确处理该问题。 将 700GB 的文件拆分为 1-1.5GB 的小文件后,您仍然需要处理大约 500 个文件。您正在为每个文件启动一个新进程,因此大约有 500 个进程。在达到机器上的计算核心数(减 1)后,您将看不到任何效率提升。您正在将这么多进程塞进您的机器中,这可能是关机的原因。更好的解决方案是使用concurrent.futures
模块中的 ProcessPoolExecutor
。
【参考方案1】:
目前还不清楚您的笔记本电脑关闭的原因。这可能是“内存不足”和“文件描述符不足”的错误组合(你产生了很多进程,每个进程都打开 3 个文件,是吗?)并且可能是你的操作系统中的错误或某些硬件故障。
无论哪种方式,您都可以尝试通过减少衍生进程的数量来避免它。首先,为每个文件生成一个进程没有任何好处。经验法则是:永远不要产生超过,比如说 [3 x 核心数] 并行函数(当你执行纯粹的 CPU 密集型任务时,通常只有 [核心数] 就足够了,但你确实有少量 i/o以及)。
所以不是
files = [f for f in glob.glob("data/*.tar", recursive=True)]
procs = []
for index, f in enumerate(files):
proc = Process(target=main, args=(index, f,))
procs.append(proc)
proc.start()
for proc in procs:
proc.join()
试试这个
from multiprocessing import Pool, cpu_count
pool = Pool(2*cpu_count()) # or 3, do some empirical testing
files = [f for f in glob.glob("data/*.tar", recursive=True)]
procs = []
for index, f in enumerate(files):
pool.apply_async(main, (index, f,))
pool.close()
pool.join()
在此处阅读有关池的更多信息:https://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers
如果你使用的是 Python3.x 你也可以试试执行器:https://docs.python.org/3/library/concurrent.futures.html
【讨论】:
以上是关于使用 Python 处理不适合内存的文件的主要内容,如果未能解决你的问题,请参考以下文章