如何高效快速地解析大量行格式json文件
Posted
技术标签:
【中文标题】如何高效快速地解析大量行格式json文件【英文标题】:How to parse large amounts of line format json files efficiently and quickly 【发布时间】:2020-10-10 01:10:07 【问题描述】:情况:
我有来自社交媒体网站的大量数据,包含 5 个月,以数分钟的行格式 json 文件分隔。 See here for explanation
所有数据都位于具有近 400 GB 内存和 32 核、64 线程 CPU 的服务器上。我一直在使用 python 的多处理模块来利用这一点,但是,我基本上必须将 29 个进程用于解析,其余的用于提取、写入和读取/删除。性能似乎并没有比我目前的配置好多少。
我创建了一个包含 5 个主要进程的程序,这些进程扩展了 python 多处理模块。每一步都有一个流程:
extracting
:提取包含 1 天 1 分钟 json.bz2 文件的 tar 文件。
reading
:直接从bz2文件中读取,发送json dicts到解析队列
完成 tar 文件后向删除模块发送消息。
deleting
: 删除旧的 tar 文件以免占用服务器上的空间
parsing
: 解析 json 字典中的关键字以进行分类、展平、放置
进入数据帧,并发送到写入队列。
write
:根据类别合并写入队列中的数据帧。当主
数据帧达到一定大小,写入输出目录并丢弃。
问题:
获取数据的时间太长了。如果我按现在的方式运行程序而不停止,则需要 20 多周的时间来解析我真正想要的数据并将其分成 4 个大类,然后我将它们放入数据帧并写入 hdf5 文件。
我尝试过的:
我尝试获取 json,将其逐行放入字典中,如下所示:
with open('file') as f:
for line in f:
json.loads(line)
# send data to queue for parse processes
我还尝试转换为另一种数据类型以减少解析过程的一些开销:
with open('file') as f:
for line in f:
json.loads(line)
# categorize and convert to flattened dataframes and send
# to parse process
我也尝试过其他方法,例如使用 pandas Series 或 Lists 代替 python dicts,但这似乎也无济于事。
当前主要解析过程的伪代码:
while True:
while parse queue empty: wait 1 second
item = parse_queue.get
if parse_queue.get is None:
break
if item.get(keyword) is not None:
# flatten, convert to dataframe, send to write
# I do this for 4 different cases of items.
我也尝试过发送 pandas Series 或 Dataframes 而不是 python dicts。这似乎没有帮助。我已经尝试过在手前压平字典,这并没有改变任何东西。
访问 python dict 是 O(1)。读取过程显然足够快,可以以每秒 2-3k dicts 的速度创建 python dicts 并将其发送到解析队列(在过滤掉不需要的垃圾 json 之后)。那么为什么这个解析需要这么长时间呢?解析的时间大约是读取文件的 10-100 倍。
我认为会有所帮助的方法:
我相信我的问题只能通过直接从文件流中解析来解决,但是当这些 json 对象采用这样的文件格式时,我该如何对它们进行文件流处理呢?
我的主要问题:
如何使这个解析过程更快?我需要直接从文件流中读取吗?如果是这样,我该怎么做?
解决方案::
当您通过multiprocessing.Queue
发送大量商品时,它必须在发送时反序列化,然后消费者必须重新解析该商品。这大大减慢了您的处理时间。要么将大量的小对象组合成一个可以通过队列一次发送的大对象,要么考虑组合两个进程(例如,读取和解析 json),这样会快得多。
通过组合我创建的read
和parse
类解决了我之前遇到的问题后,我发现我也遇到了巨大的写入瓶颈。我通过将包含已解析 json 对象的整个文件连接到一个列表中,并通过队列将该列表发送到 write
进程来解决此问题。
关键要点:队列有很多开销并且需要大量处理时间到.get
,所以找到一种方法来最小化通过队列发送的项目数量,方法是结合以下进程:不必要地抽象或将许多对象组合成一个大对象。
【问题讨论】:
检查我的answer,可能编写诸如自定义“解析器”之类的方法将是您的解决方案。 这些队列,是带线程的进程内队列还是进程间队列?提取 - 不要让它走得太远。理想情况下,提取的文件在读取时仍在文件缓存中。由于复制现有数据,逐行附加到数据帧可能会变得昂贵。最后将列表放入数据框可能会更好。 好像你json.loads(line)
然后发送到另一个进程,这意味着它在下一个进程中再次被序列化和解析。 json.loads 过滤和处理可能应该在一个进程中。你有一个队列,你必须轮询和睡觉?使用有等待的队列。您是否消耗了所有 400 GB 内存?限制进程以避免颠簸。您有多个硬盘驱动器或 SSD 吗?尝试在它们之间传播 I/O - 例如,可能将 tar 提取到不同的驱动器。
是的,我一次只将一天的 tar 文件提取到服务器上的 NVME 驱动器。我相信每个 tar 文件都是一小时。所以我提取了 24 个 tar 文件,里面有 60 个文件夹,还有 60 个 .json.bz2 文件。读取过程可以非常快速地运行。我会在阅读后直接尝试解析。我不消耗所有 400 GB 的内存,我最多消耗 20-30。它似乎更占用 CPU,当我让它使用那么多内存时,我得到的回报非常递减。数据帧的实际合并只需要很少的时间(这是我在程序花费很长时间时的第一个想法)
嘿@tdelany 非常感谢。整整一周,我一直在用头撞墙。我对其进行了更改,以便在现场完成解析并且将其修复得很好。现在速度非常快。
【参考方案1】:
如果您只想更快地解析 json,您可以查看 ujson (https://pypi.org/project/ujson/)
而不是
import json
parsed = json.loads(line)
你会写
import ujson
parsed = ujson.loads(line)
但是,如果您非常了解 json 文件,您可能会尝试使用自定义正则表达式或自定义解析器来提取要提取的信息,但这些解决方案可能会在某些情况下中断,除非它们写得非常好,因为它们可能依赖 json 行来获得比 json 标准要求的更多的细节。
除此之外,我建议您分析您的代码以找出究竟是什么让您慢了下来。
可能值得这样绑定:
让一个进程读取一个文件,读取 n 行(例如 50)
将这 50 行的解析 (ujson.loads()
) 委托给工作池。
多处理有相当多的开销。如果你委派了非常小的任务/元素(比如只有一行),那么多处理开销可能会大于你通过分配工作负载获得的开销)
【讨论】:
以上是关于如何高效快速地解析大量行格式json文件的主要内容,如果未能解决你的问题,请参考以下文章