如何在 Spark 2 中解压 LZ4 JSON
Posted
技术标签:
【中文标题】如何在 Spark 2 中解压 LZ4 JSON【英文标题】:How to decompress LZ4 JSON in Spark 2 【发布时间】:2017-04-05 15:01:02 【问题描述】:我已经从https://censys.io/
下载了一个xxxx.json.lz4
文件,但是当我尝试使用以下行读取文件时,我没有得到任何数据输出/计数为 0。
metadata_lz4 = spark.read.json("s3n://file.json.lz4")
虽然手动解压可以正常工作并且可以导入Spark,但它没有返回任何结果。
我也试过了
val metadata_lz4_2 = spark.sparkContext.newAPIHadoopFile("s3n://file.json.lz4", classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
这也不会返回任何结果。
我有多个 100GBs
文件,所以我非常希望不必手动解压缩每个文件。
有什么想法吗?
【问题讨论】:
although decompressing manually works fine
你能告诉我如何手动执行此操作吗?
【参考方案1】:
据此open issue spark LZ4 减压器使用与标准 LZ4 减压器不同的规格。
因此,在 apache-spark 解决此问题之前,您将无法使用 spark LZ4 来解压缩标准 LZ4 压缩文件。
我不认为我们的 Lz4Codec 实现实际上使用了 FRAME 规范 (http://cyan4973.github.io/lz4/lz4_Frame_format.html) 创建基于文本的文件时。它似乎是作为编解码器添加的 用于块压缩格式,例如 SequenceFiles/HFiles/etc.,但不面向来自的文本文件 它的外观,或者是在没有 FRAME 的时候引入的 LZ4的规格。
因此,从根本上说,我们不能与 lz4 互操作 实用程序。 区别非常类似于 GPLExtras 的 LzoCodec 与。 LzopCodec,前者只是数据压缩算法,但 后者是一种可与 lzop CLI 实用程序互操作的实际框架格式。
为了使自己具有互操作性,我们需要引入一个新框架 包装编解码器,例如 LZ4FrameCodec,用户可以在 他们想要解压缩或压缩由 lz4/lz4cat CLI 实用程序。
【讨论】:
【参考方案2】:我通过这种方式实现了在 Pyspark 中解析 lz4 压缩:
import lz4.frame
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").getOrCreate()
sc = spark.sparkContext
list_paths = ['/my/file.json.lz4', '/my/beautiful/file.json.lz4']
rdd = sc.binaryFiles(','.join(list_paths))
df = rdd.map(lambda x: lz4.frame.decompress(x[1])).map(lambda x: str(x)).map(lambda x: (x, )).toDF()
这对于非复杂对象通常已经足够了。但是如果你是压缩的 JSON
解析有嵌套结构,那么在调用函数F.from_json()
之前需要对解析后的文件进行额外的清理:
schema = spark.read.json("/my/uncompressed_file.json").schema
df = df.select(F.regexp_replace(F.regexp_replace(F.regexp_replace(F.regexp_replace(F.regexp_replace("_1", "None", "null"), "False", "false"), "True", "true"), "b'", ""), "'", "").alias("json_notation"))
result_df = df.select(F.from_json("json_notation", schema)
其中/my/uncompressed_file.json
是您之前解压缩的/my/file.json.lz4
(除非您想手动提供架构,如果不太复杂的话,无论如何它都可以工作)
【讨论】:
以上是关于如何在 Spark 2 中解压 LZ4 JSON的主要内容,如果未能解决你的问题,请参考以下文章
net.jpounz.lz4 使用火花流从 kafka 读取时出现异常
如何使用 Odin 解压、修改、打包和刷写 `system.img.ext4` 文件?
SBT 测试错误:java.lang.NoSuchMethodError:net.jpountz.lz4.LZ4BlockInputStream