在 python 中高效处理约 5000 万条记录文件

Posted

技术标签:

【中文标题】在 python 中高效处理约 5000 万条记录文件【英文标题】:Efficiently processing ~50 million record file in python 【发布时间】:2016-06-26 21:20:44 【问题描述】:

我们有一个包含大约 4600 万条 CSV 格式记录的文件。每条记录大约有 18 个字段,其中一个是 64 字节的 ID。我们有另一个文件,其中包含大约 167K 的唯一 ID。与 ID 对应的记录需要被拉出。因此,我们编写了一个 python 程序,它将 167K ID 读取到一个数组中,并处理 4600 万条记录文件,检查 ID 是否存在于每条记录中。这是代码的sn-p:

import csv
...
csvReadHandler = csv.reader(inputFile, delimiter=chr(1))
csvWriteHandler = csv.writer(outputFile, delimiter=chr(1), lineterminator='\n')
for fieldAry in csvReadHandler:
    lineCounts['orig'] += 1
    if fieldAry[CUSTOMER_ID] not in idArray:
        csvWriteHandler.writerow(fieldAry)
        lineCounts['mod'] += 1

在一小部分数据上测试了程序,这里是处理时间:

lines: 117929 process time: 236.388447046 sec
lines: 145390 process time: 277.075321913 sec

我们已经在昨晚 3:00am EST 开始在 4600 万条记录文件(大约 13GB 大小)上运行程序,现在大约是 10am EST,它仍在处理中!

问题:

    是否有更好的方法来处理这些记录以缩短处理时间? python 是正确的选择吗? awk 或其他工具会有所帮助吗? 我猜测在以下语句中查找 167K 数组的 64 字节 ID 是罪魁祸首: if fieldAry[CUSTOMER_ID] not in idArray:

有更好的选择吗?

谢谢!

更新:这是在带有 EBS 附加卷的 EC2 实例上处理的。

【问题讨论】:

idArray的类型是什么?确保它是一些支持有效成员资格测试的数据结构,例如set 可能会比普通的 [] 列表快很多。 如何使用数据库管理系统... 1) “将 167K ID 读入一个数组” ...如果我理解正确,您的意思是列表?尝试使用 set(); 2) 为 ineCounts['orig'] 和 lineCounts['mod'] 创建 tmp 变量,在循环后将它们分配回去; 3)不确定csv.reader是如何实现的,如果yield(for row in it)产生IO操作,也许你可以先把CSV读入内存.. 这个很棘手,我已经回答了一个类似的问题,在这种情况下,用户需要检查大量数据集的时间戳(每行的开头)。我会问你同样的问题,是否有可能在数据库替代方案中获取 4500 万条记录? @FrerichRaabe idArray 是一个包含 64 字节 ID 的数组,大小约为 167K。我们将根据数组检查集合。 -谢谢 【参考方案1】:

稍微加快速度的最简单方法是将行处理与一些分布式解决方案并行化。最简单的方法是使用 multiprocessing.Pool。您应该这样做(未检查语法):

from multiprocessing import Pool

p = Pool(processes=4)
p.map(process_row, csvReadHandler)

尽管如此,python 并不是进行这种批处理的最佳语言(主要是因为写入磁盘非常慢)。最好将所有磁盘写入管理(缓冲、排队等)留给 linux 内核,这样使用 bash 解决方案会更好。最有效的方法是将输入文件拆分成块,然后简单地执行反向 grep 来过滤 id。

for file in $list_of_splitted_files; then
  cat $file | grep -v (id1|id2|id3|...) > $file.out
done;

如果你之后需要简单地合并:

for file in $(ls *.out); then
  cat $file >> final_results.csv
done

注意事项:

不知道对所有 id 执行一次 grep 是否更多/更少 比遍历所有 id 并执行单个 id 更有效 grep。 在编写并行解决方案时,尝试读取/写入不同的 文件以最小化 I/O 瓶颈(所有线程试图写入 同一个文件)所有语言。 在代码中为每个处理部分设置计时器。这样你就会看到哪个部分浪费了更多的时间。我真的推荐这个,因为我有一个类似的程序要写,我认为处理部分是瓶颈(类似于你与 ids 向量的比较),但实际上它是 I/O 正在拖动停止所有执行。

【讨论】:

这最多会增加 4 倍,但无论如何,整个事情都应该是 I/O 绑定的。 感谢您的反馈。还没有机会尝试,因为我们使用的是 EC2 的 T2 实例,并且它是 CPU 信用有限的。我们正计划分拆 c4 或 m4,您的建议可能对我们有所帮助。【参考方案2】:

应该必须使用set 而不是list之前 for 循环做:

idArray = set(idArray)

csvReadHandler = csv.reader(inputFile, delimiter=chr(1))
csvWriteHandler = csv.writer(outputFile, delimiter=chr(1), lineterminator='\n')
for fieldAry in csvReadHandler:
    lineCounts['orig'] += 1
    if fieldAry[CUSTOMER_ID] not in idArray:
        csvWriteHandler.writerow(fieldAry)
        lineCounts['mod'] += 1

并看到令人难以置信的加速;您使用了 不必要的处理时间,只是因为您选择了错误的数据结构。


in 运算符与 set 具有 O(1) 时间复杂度,而 list 具有 O(n) 时间复杂度。这可能听起来“没什么大不了”,但实际上这是您脚本中的瓶颈。尽管set 对于那个O 会有更高的常数。因此,您的代码在单个 in 操作上使用的时间比必要的多 30000 倍。如果在最佳版本中需要 30 秒,那么现在您仅在一行上就花费 10 天。

请参阅以下测试:我生成 100 万个 ID,并将 10000 个放在另一个列表中 - to_remove。然后我像你一样做一个for循环,为每条记录做in操作:

import random
import timeit

all_ids = [random.randint(1, 2**63) for i in range(1000000)]
to_remove = all_ids[:10000]
random.shuffle(to_remove)
random.shuffle(all_ids)


def test_set():
    to_remove_set = set(to_remove)
    for i in all_ids:
        if i in to_remove_set:
            pass

def test_list():
    for i in all_ids:
        if i in to_remove:
            pass


print('starting')
print('testing list', timeit.timeit(test_list, number=1))
print('testing set', timeit.timeit(test_set, number=1))

结果:

testing list 227.91903045598883
testing set 0.14897623099386692

set 版本耗时 149 毫秒; list 版本需要 228 秒。现在这些数字很小:在你的情况下,你有 5000 万条输入记录,而我的 100 万条;因此,您需要将 testing set 时间乘以 50:使用您的数据集大约需要 7.5 秒。

另一方面,列表版本需要将该时间乘以 50 * 17 - 不仅输入记录多 50 倍,要匹配的记录也多 17 倍。因此我们得到 227 * 50 * 17 = 192950。

因此,您的算法需要 2.2 天的时间来完成通过使用正确的数据结构可以在 7.5 秒内完成的事情。当然,这并不意味着您可以在 7.5 秒内扫描整个 50 GB 文档,但也可能不会超过 2.2 天。所以我们改变了:

             2 days                           2.2 days 
 |reading and writing the files||------- doing id in list ------|

类似

             2 days            7.5 seconds (doing id in set)
 |reading and writing the files||

【讨论】:

在同一组数据上尝试了“set”而不是数组。真的没有帮助。以下是处理时间:行:117929 处理时间:249.074661016 秒和行:145390 处理时间:309.015148878 秒 - 谢谢 “真的没有帮助吗?”发生了什么? "set" 确实提高了查找时间。我们的样本数据有问题,因此之前报告的数据是错误的。我们还实现了 DJB 的 cdb 查找,看起来也不错。另外发现问题的原因是我们的 EC2 实例上的默认“CPU Credit”限制。它只是加剧了整个延迟问题。刚刚完成了三个数据集的运行,一个使用“set”查找,另外两个使用 cdb。 “设置”查找存在问题,我们正在检查数据。感谢大家的反馈! 是的,我也考虑了 CPU 积分。请注意,如果您将其加载到 RDBMS 中,则 DELETE 不一定需要重写 13 GB,它只是将一些行标记为死。【参考方案3】:

免责声明: 不要在不解释原因的情况下投反对票,因为 OP 没有包括他的整个代码库或硬件/基础设施设计。但如果我在代码或逻辑中犯了严重错误,请解释它们并相应地投反对票。

让我们从定义您将遇到的瓶颈开始(有些明显,有些不明显)。

硬盘驱动器 - 它们速度慢,不会缓存大量数据 多次重新读取相同的数据 内存,你不能存储一个 13GB 的文件,或者你可以存储并且这是一个选项?

要解决这些问题,您可以走多条路线。

    一种明显有益的方法是将大数据读入数据库(例如 postgresql 或 mariadb)。但我认为目前这根本不可能。

关于 CSV 阅读器,它们很好,但可能效率不高。 由于您无论如何都要通读这两个文件,因此我会考虑以下内容:

    逐行读取 13GB 文件并将ID 保存在字典中,而不检查键/值是否存在。 (为什么?因为检查值是否存在比仅仅覆盖它要慢,而且字典还有一个额外的好处,即键是唯一的,因此重复项将被清除) 添加它到set(),正如许多其他人所描述的那样。

    然后逐行读取较小的文件并检查您的dictset 是否包含ID

dict() vs set() vs list()

下面是set()list()dict()这三种数据类型的比较:使用的代码:test.py

(11.719028949737549, 999950, 'Using dict() for inserts')
(1.8462610244750977, 'dict() time spent looking through the data')

(11.793760061264038, 999961, 'Using set() for inserts')
(7.019757986068726, 'set() time spent looking through the data')

(15min+, 'Using list()')  # Honestly, I never let it run to it's finish.. It's to slow.

如您所见,dictset 稍快,而list() 完全落后(参见Antti 的原因说明)。我应该指出我的数据有点歪斜,因为它是速度差异的快速而肮脏的演示,但总体思路应该仍然存在。

使用 dict() 作为解决方案

因此,如果您无法访问源数据的数据库版本,并且需要使用 Python,请使用以下内容:

delimiter = b'\x01'
big_source = 
with open('13_gig_source.log', 'rb') as fh:
    for line in fh:
        csv = line.split(delimiter)
        big_source[csv[4]] = None # 5:th column, change to match CUSTOMER_ID

output = open('result.log', 'wb')
with open('smaller_file.log', 'rb') as fh:
    for line in fh:
        csv = line.split(delimiter)
        if csv[4] in big_source:
            output.write(csv[4] + b'\n')
output.close()

由于我不知道数据存在于哪一列,我没有优化split()。 例如,如果它是您尝试获取的最后一列,请改为使用line.rsplit(delimiter, 1)[-1]。或者如果是 3:d 列,请执行 line.split(delimiter, 3)[2],因为它将中止在 split() 函数中查找 delimiter 的下一个位置的过程。

使用 linux 工具

是的,某些工具可能更适合此操作,例如 awk,因为它是用 C 语言编写的执行特定任务的特定工具。即使 Python 是基于 C 的,它仍然在 C 代码之上有很多抽象层,并且在很大程度上比为特定任务编写的对应 C 工具要慢。

现在我没有数据来测试这个,我也不是 PRO With Nix-CommandsPWN-C 简称。所以我会让别人给你举个例子,但我发现了这个:

merge two csv files according to matching rows and add new columns in linux

这可能会有所帮助。

【讨论】:

感谢您的反馈!【参考方案4】:

我认为最好使用数据库来解决这个问题 首先创建一个数据库,如 mysql 或其他任何东西 然后将文件中的数据写入2个表 最后使用一个简单的 sql 查询来选择行 类似于: select * from table1 where id in(select ids from table2)

【讨论】:

以上是关于在 python 中高效处理约 5000 万条记录文件的主要内容,如果未能解决你的问题,请参考以下文章

Laravel 迁移 - 为 5000 万条数据添加新列

Mysql 性能调优问题

使用 .NET listview 高效搜索数十万条记录

在 iOS 5 上实现快速高效的核心数据导入

两个大文本文件的高效文件比较

我可以将搜索引擎(solr搜索或lucene搜索)集成到Maximo中吗?