Spark:排序和分区数据的最有效方法,以拼花形式写入

Posted

技术标签:

【中文标题】Spark:排序和分区数据的最有效方法,以拼花形式写入【英文标题】:Spark: Most efficient way to sort and partition data to be written as parquet 【发布时间】:2017-07-20 20:43:44 【问题描述】:

我的数据原则上是一个表,除了其他“数据”之外,它还包含一个列ID 和一个列GROUP_ID

在第一步中,我将 CSV 读入 Spark,进行一些处理以准备第二步的数据,并将数据写入 parquet。 第二步做了很多groupBy('GROUP_ID')Window.partitionBy('GROUP_ID').orderBy('ID')

现在的目标是——为了避免在第二步中进行洗牌——在第一步中有效地加载数据,因为这是一个一次性计时器。

问题第 1 部分: AFAIK,Spark 在从 parquet 加载时保留分区(这实际上是任何“优化写入考虑”的基础) - 正确吗?

我想出了三种可能性:

df.orderBy('ID').write.partitionBy('TRIP_ID').parquet('/path/to/parquet') df.orderBy('ID').repartition(n, 'TRIP_ID').write.parquet('/path/to/parquet') df.repartition(n, 'TRIP_ID').sortWithinPartitions('ID').write.parquet('/path/to/parquet')

我会设置 n 以使各个 parquet 文件约为 100MB。

问题第 2 部分:三个选项在目标方面产生“相同”/相似的结果是否正确(避免在第二步中改组)?如果不是,有什么区别?哪个“更好”?

问题第 3 部分:三个选项中哪一个在第 1 步中表现更好?

感谢您分享您的知识!


编辑 2017-07-24

在进行了一些测试(写入和读取 parquet)之后,Spark 在第二步中似乎无法默认恢复 partitionByorderBy 信息。分区数(从df.rdd.getNumPartitions() 获得似乎取决于核心数和/或spark.default.parallelism(如果设置),而不是拼花分区数。所以回答问题 1 将是错误,问题 2 和 3 将无关紧要。

所以事实证明真正的问题是:有没有办法告诉 Spark,数据已经按列 X 分区并按列 排序是

【问题讨论】:

如果您使用df.repartition(n, 'TRIP_ID').write.partitionBy('TRIP_ID').parquet('/path/to/parquet'),它将创建 n 个镶木地板文件。如果现在您的 spark.default.parallelismspark.sql.shuffle.partitions 等于 n 它将避免一个随机播放阶段,但是您使用此数据集。如果您在 TRIP_ID 上有过滤子句,则谓词下推将只读取该文件。 看看这个讲座youtu.be/99fYi2mopbs你可能会得到一些有见地的提示 【参考方案1】:

您可能会对 Spark 中的分桶支持感兴趣。

在此处查看详细信息 https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-bucketing.html

large.write
  .bucketBy(4, "id")
  .sortBy("id")
  .mode(SaveMode.Overwrite)
  .saveAsTable(bucketedTableName)

注意 Spark 2.4 添加了对bucket pruning 的支持(如partition pruning

您正在查看的更直接的功能是 Hive 的分桶排序表 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-BucketedSortedTables 这在 Spark 中尚不可用(请参阅下面的 PS 部分)

还要注意,Spark 不会自动加载排序信息,但由于数据已经排序.. 对它的排序操作实际上会快得多,因为不需要做太多工作 - 例如一次传递数据只是为了确认它已经排序。

PS。 Spark 和 Hive 分桶略有不同。 这是为在 Hive 中创建的分桶表在 Spark 中提供兼容性的总括票 - https://issues.apache.org/jira/browse/SPARK-19256

【讨论】:

【参考方案2】:

据我所知,没有办法从 parquet 读取数据并告诉 Spark 它已经被某些表达式分区并排序。

简而言之,HDFS 等上的一个文件对于一个 Spark 分区来说太大了。即使您使用 Parquet 属性(例如 parquet.split.files=falseparquet.task.side.metadata=true 等)将整个文件读入一个分区,与仅进行一次随机播放相比,成本也会更高。

【讨论】:

【参考方案3】:

试试 bucketBy。此外,分区发现也有帮助。

【讨论】:

以上是关于Spark:排序和分区数据的最有效方法,以拼花形式写入的主要内容,如果未能解决你的问题,请参考以下文章

spark 函数

Spark高效的groupby操作-重新分区?

使用 pyarrow 从分区拼花数据集中读取特定分区

使用 SparkListener 的 Spark 输出拼花大小

在 AzureML 上保存分区拼花

从分区拼花文件中读取 DataFrame