为啥聚合的 Spark Parquet 文件比原始文件大?

Posted

技术标签:

【中文标题】为啥聚合的 Spark Parquet 文件比原始文件大?【英文标题】:Why are Spark Parquet files for an aggregate larger than the original?为什么聚合的 Spark Parquet 文件比原始文件大? 【发布时间】:2016-07-01 21:12:42 【问题描述】:

我正在尝试创建一个聚合文件供最终用户使用,以避免让他们处理具有更大文件的多个源。为此,我: A) 遍历所有源文件夹,删除最常请求的 12 个字段,在这些结果位于同一位置的新位置旋转出 parquet 文件。 B) 我尝试回顾在步骤 A 中创建的文件,并通过按 12 个字段分组来重新聚合它们,以将其减少为每个唯一组合的摘要行。

我发现步骤 A 以 5:1 的比例减少了有效负载(大约 250 gigs 变成了 48.5 gigs)。但是,步骤 B 并没有进一步减少,而是比步骤 A 增加了 50%。但是,我的计数匹配。

这是使用 Spark 1.5.2 我的代码修改为仅用 field1...field12 替换字段名称以使其更具可读性,下面是我记录的结果。

虽然我不一定期望另一个 5:1 减少,但我不知道我在为具有相同架构的较少行增加存储方面做错了什么。谁能帮我理解我做错了什么?

谢谢!

//for each eventName found in separate source folders, do the following:
//spit out one row with key fields from the original dataset for quicker availability to clients 
//results in a 5:1 reduction in size
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table"
sqlContext.sql(sqlCommand).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/")
//results in over 700 files with a total of  16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed 

//after all events are processed, aggregate the results
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12"
//Use a wildcard to search all sub-folders created above
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results")
sqlContext.sql(sqlStatement).coalesce(20).saveAsParquetFile("<a new aws folder>" + dt + "/")
//This results in  3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed

//The parquet schemas created (both tables match):
 |-- field1: string (nullable = true) (10 characters)
 |-- field2: string (nullable = true) (15 characters)
 |-- field3: string (nullable = true) (50 characters max)
 |-- field4: string (nullable = true) (10 characters)
 |-- field5: string (nullable = true) (10 characters)
 |-- field6: string (nullable = true) (10 characters)
 |-- field7: string (nullable = true) (16 characters)
 |-- field8: string (nullable = true) (10 characters)
 |-- field9 string (nullable = true)  (15 characters)
 |-- field10: string (nullable = true)(20 characters)
 |-- field11: string (nullable = true)(14 characters)
 |-- field12: string (nullable = true)(14 characters)
 |-- rCount: long (nullable = true)   
 |-- dt: string (nullable = true)

【问题讨论】:

【参考方案1】:

一般来说,Parquet 等列式存储格式对数据分布(数据组织)和各个列的基数非常敏感。数据越有条理,基数越低,存储效率越高。

作为您应用的聚合,必须对数据进行洗牌。当您检查执行计划时,您会看到它正在使用哈希分区器。这意味着聚合后分布的效率可能低于原始数据的分布。同时sum 可以减少行数但增加rCount 列的基数。

您可以尝试不同的工具来解决这个问题,但并非所有工具都在 Spark 1.5.2 中可用:

按具有低基数的列(由于完全洗牌而相当昂贵)或sortWithinPartitions 对完整数据集进行排序。 使用DataFrameWriterpartitionBy 方法使用低基数列对数据进行分区。 使用DataFrameWriter (Spark 2.0.0+) 的bucketBysortBy 方法,通过分桶和本地排序改进数据分布。

【讨论】:

bucketBy 似乎无法与 Spark 2.0.0 中的 DataFrameWriter 一起使用 但是为什么基于低基数的排序有助于压缩呢?如果我理解正确,排序有助于运行长度编码,而相对较低的基数列是使用字典编码压缩的——那么为什么排序很重要?除非...将这些值组合在一起有助于推断基数较低,如果它们被分布,则可能不会发生这种推断(这是一个问题,而不是一个陈述)

以上是关于为啥聚合的 Spark Parquet 文件比原始文件大?的主要内容,如果未能解决你的问题,请参考以下文章

为啥 Apache Spark 会读取嵌套结构中不必要的 Parquet 列?

为啥 Spark 不能自动检测 Parquet 文件中的新字段?

为啥 spark.read.parquet() 运行 2 个作业?

为啥 spark 中的 sample 和减去方法会给出这样的结果

Spark + Parquet + Snappy:spark shuffle 数据后整体压缩率下降

有没有办法优化 spark sql 代码?