Spark + Parquet + Snappy:spark shuffle 数据后整体压缩率下降

Posted

技术标签:

【中文标题】Spark + Parquet + Snappy:spark shuffle 数据后整体压缩率下降【英文标题】:Spark + Parquet + Snappy: Overall compression ratio loses after spark shuffles data 【发布时间】:2018-07-28 14:17:54 【问题描述】:

社区!

请帮助我了解如何使用 Spark 获得更好的压缩比?

让我描述一下案例:

    我有数据集,我们称它为 HDFS 上的 product,它是使用 Sqoop ImportTool as-parquet-file 使用编解码器 snappy 导入的。作为导入的结果,我有 100 个文件,总 46 GB du,文件大小不同(最小 11MB,最大 1.5GB,平均 ~ 500MB)。 84 列

    的记录总数略多于 80 亿

    我也在使用 snappy 使用 Spark 进行简单的读取/重新分区/写入,结果我得到:

~100 GB 具有相同文件数、相同编解码器、相同计数和相同列的输出大小。

代码sn-p:

val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")

productDF
.repartition(100)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "snappy")
.parquet("/processed/product/20180215/04-37/read_repartition_write/general")
    使用 parquet-tools 我查看了来自摄取和处理的随机文件,它们如下所示:

摄取:

creator:                        parquet-mr version 1.5.0-cdh5.11.1 (build $buildNumber) 
extra:                          parquet.avro.schema = "type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"

and almost all columns looks like
AVAILABLE: OPTIONAL INT64 R:0 D:1

row group 1:                    RC:3640100 TS:36454739 OFFSET:4 

AVAILABLE:                       INT64 SNAPPY DO:0 FPO:172743 SZ:370515/466690/1.26 VC:3640100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 126518400000, max: 1577692800000, num_nulls: 2541633]

已处理:

creator:                        parquet-mr version 1.5.0-cdh5.12.0 (build $buildNumber) 
extra:                          org.apache.spark.sql.parquet.row.metadata = "type":"struct","fields"

AVAILABLE:                      OPTIONAL INT64 R:0 D:1
...

row group 1:                    RC:6660100 TS:243047789 OFFSET:4 

AVAILABLE:                       INT64 SNAPPY DO:0 FPO:4122795 SZ:4283114/4690840/1.10 VC:6660100 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE ST:[min: -2209136400000, max: 10413820800000, num_nulls: 4444993]

另一方面,如果不重新分区或使用合并 - 大小仍接近摄取数据大小。

    接下来,我做了以下事情:

    读取数据集并用

    写回
    productDF
      .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
      .option("compression", "none")
      .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithoutshuffle")
    

    读取数据集,重新分区并用

    写回
    productDF
      .repartition(500)
      .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
      .option("compression", "none")
      .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithshuffle")
    

结果:80 GB 没有,283 GB 使用相同的输出文件重新分区

80GB parquet 元示例:

AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:456753 SZ:1452623/1452623/1.00 VC:11000100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -1735747200000, max: 2524550400000, num_nulls: 7929352]

283 GB parquet 元示例:

AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:2800387 SZ:2593838/2593838/1.00 VC:3510100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -2209136400000, max: 10413820800000, num_nulls: 2244255]

看起来,即使没有未压缩的数据,镶木地板本身(带有编码?)也大大减少了数据的大小。如何 ? :)

我尝试读取未压缩的 80GB,重新分区并写回 - 我有 283GB

我的第一个问题是为什么我在 spark 重新分区/洗牌后变得更大了?

第二个是如何在 spark 中高效地打乱数据以利于 parquet 编码/压缩(如果有的话)?

一般来说,我不希望我的数据大小在 Spark 处理后增长,即使我没有进行任何更改。

另外,我没有找到,是否有任何 可配置的压缩率 用于 snappy,例如-1 ... -9?据我所知,gzip 有这个,但是在 Spark/Parquet 编写器中控制这个速率的方法是什么?

感谢您的帮助!

谢谢!

【问题讨论】:

Why are Spark Parquet files for an aggregate larger than the original? 感谢@user8371915!现在我明白为什么大小不同了,我尝试通过我的数据集的一些(幸运地找到)列进行重新分区,结果我一直得到 80Gb 而不是 250Gb。但是第二个问题,解决这类问题的常用步骤是什么?我试图调查DataFrameStatFunctions,但我不够强大,无法发现它们有用。有人可以建议如何处理数据组织问题吗? 在我的特定数据集案例中,Sqoop 导入结果非常小~50Gb 的压缩数据,我假设因为 Sqoop 导入的分区已排序主键范围,例如第一个分区的 ids 从 1 到 100000,分区内的数据彼此更接近,并且使用 parquet 和 snappy 具有更好的编码/压缩比。 我试图找到一种方法来使用DataFrame API 归档相同的数据组织,但发现范围分区器将从***.com/questions/30995699/… 中讨论的2.3.0 开始可用。将尝试降低到 RDD 级别并实现自定义范围分区器以测试数据分布。 【参考方案1】:

当您在数据帧上调用 repartition(n) 时,您正在执行循环分区。在重新分区之前存在的任何数据局部性都消失了,熵增加了。因此,运行长度和字典编码器以及压缩编解码器实际上并没有太多可使用的地方。

所以当你重新分区时,你需要使用repartition (n, col) 版本。给它一个可以保留数据局部性的好列。

此外,由于您可能正在针对下游作业优化您的 sqooped 表,您可以sortWithinPartition 以加快扫描速度。

df.repartition(100, $"userId").sortWithinPartitions("userId").write.parquet(...)

【讨论】:

" 消失了,熵上升了" 这是什么意思?【参考方案2】:

这更多地与 parquet 压缩数据的方式有关。简而言之,如果您将 1000 行写入单个文件。考虑一个带有字符串的列,parquet 使用字典编码来存储它们。

如果1000个字符串都不同,那么就需要使用更大的字典编码(1000个key map)——通常称为大熵 如果所有 1000 个字符串都相同,那么您需要使用更小的字典(只有一个键映射)——通常称为更小的熵

由于更大的字典会导致更多的数据(更多的熵情况),因此会导致磁盘上的大小更大。

【讨论】:

以上是关于Spark + Parquet + Snappy:spark shuffle 数据后整体压缩率下降的主要内容,如果未能解决你的问题,请参考以下文章

Spark - 如何将 Bz2 文件解压缩为 parquet 文件

Spark SQL - gzip vs snappy vs lzo 压缩格式之间的区别

使用 Python 编写 Parquet 文件的方法?

设置 parquet snappy 输出文件大小是 hive?

Parquet vs ORC vs ORC with Snappy

Flink 实战系列Flink 同步 Kafka 数据到 HDFS parquet 格式存储 snappy 压缩