Spark:排序和分区数据的最有效方法,以拼花形式写入
Posted
技术标签:
【中文标题】Spark:排序和分区数据的最有效方法,以拼花形式写入【英文标题】:Spark: Most efficient way to sort and partition data to be written as parquet 【发布时间】:2017-07-20 20:43:44 【问题描述】:我的数据原则上是一个表,除了其他“数据”之外,它还包含一个列ID
和一个列GROUP_ID
。
在第一步中,我将 CSV 读入 Spark,进行一些处理以准备第二步的数据,并将数据写入 parquet。
第二步做了很多groupBy('GROUP_ID')
和Window.partitionBy('GROUP_ID').orderBy('ID')
。
现在的目标是——为了避免在第二步中进行洗牌——在第一步中有效地加载数据,因为这是一个一次性计时器。
问题第 1 部分: AFAIK,Spark 在从 parquet 加载时保留分区(这实际上是任何“优化写入考虑”的基础) - 正确吗?
我想出了三种可能性:
df.orderBy('ID').write.partitionBy('TRIP_ID').parquet('/path/to/parquet')
df.orderBy('ID').repartition(n, 'TRIP_ID').write.parquet('/path/to/parquet')
df.repartition(n, 'TRIP_ID').sortWithinPartitions('ID').write.parquet('/path/to/parquet')
我会设置 n
以使各个 parquet 文件约为 100MB。
问题第 2 部分:三个选项在目标方面产生“相同”/相似的结果是否正确(避免在第二步中改组)?如果不是,有什么区别?哪个“更好”?
问题第 3 部分:三个选项中哪一个在第 1 步中表现更好?
感谢您分享您的知识!
编辑 2017-07-24
在进行了一些测试(写入和读取 parquet)之后,Spark 在第二步中似乎无法默认恢复 partitionBy
和 orderBy
信息。分区数(从df.rdd.getNumPartitions()
获得似乎取决于核心数和/或spark.default.parallelism
(如果设置),而不是拼花分区数。所以回答问题 1 将是错误,问题 2 和 3 将无关紧要。
所以事实证明真正的问题是:有没有办法告诉 Spark,数据已经按列 X 分区并按列 排序是?
【问题讨论】:
如果您使用df.repartition(n, 'TRIP_ID').write.partitionBy('TRIP_ID').parquet('/path/to/parquet')
,它将创建 n 个镶木地板文件。如果现在您的 spark.default.parallelism
和 spark.sql.shuffle.partitions
等于 n 它将避免一个随机播放阶段,但是您使用此数据集。如果您在 TRIP_ID
上有过滤子句,则谓词下推将只读取该文件。
看看这个讲座youtu.be/99fYi2mopbs你可能会得到一些有见地的提示
【参考方案1】:
您可能会对 Spark 中的分桶支持感兴趣。
在此处查看详细信息 https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-bucketing.html
large.write
.bucketBy(4, "id")
.sortBy("id")
.mode(SaveMode.Overwrite)
.saveAsTable(bucketedTableName)
注意 Spark 2.4 添加了对bucket pruning
的支持(如partition pruning
)
您正在查看的更直接的功能是 Hive 的分桶排序表 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-BucketedSortedTables 这在 Spark 中尚不可用(请参阅下面的 PS 部分)
还要注意,Spark 不会自动加载排序信息,但由于数据已经排序.. 对它的排序操作实际上会快得多,因为不需要做太多工作 - 例如一次传递数据只是为了确认它已经排序。
PS。 Spark 和 Hive 分桶略有不同。 这是为在 Hive 中创建的分桶表在 Spark 中提供兼容性的总括票 - https://issues.apache.org/jira/browse/SPARK-19256
【讨论】:
【参考方案2】:据我所知,没有办法从 parquet 读取数据并告诉 Spark 它已经被某些表达式分区并排序。
简而言之,HDFS 等上的一个文件对于一个 Spark 分区来说太大了。即使您使用 Parquet 属性(例如 parquet.split.files=false
、parquet.task.side.metadata=true
等)将整个文件读入一个分区,与仅进行一次随机播放相比,成本也会更高。
【讨论】:
【参考方案3】:试试 bucketBy。此外,分区发现也有帮助。
【讨论】:
以上是关于Spark:排序和分区数据的最有效方法,以拼花形式写入的主要内容,如果未能解决你的问题,请参考以下文章