spark shuffle partitions 和 partition by tag 如何相互配合

Posted

技术标签:

【中文标题】spark shuffle partitions 和 partition by tag 如何相互配合【英文标题】:How spark shuffle partitions and partition by tag along with each other 【发布时间】:2021-03-25 16:25:08 【问题描述】:

我正在从 HDFS 读取一组 10,000 个 10 TB 累积大小的 parquet 文件,并使用以下代码以分区方式将其写回 HDFS

spark.read.orc("HDFS_LOC").repartition(col("x")).write.partitionBy("x").orc("HDFS_LOC_1")

我正在使用

spark.sql.shuffle.partitions=8000

我看到 spark 已将 5000 个不同的 "x" 分区写入 HDFS(HDFS_LOC_1) 。在整个过程中如何使用“8000”的随机分区。我看到在“x”的所有分区中只有 15,000 个文件被写入。这是否意味着spark尝试在“X”的每个分区创建8000个文件,并且在写入期间发现没有足够的数据在每个分区写入8000个文件并最终写入更少的文件?你能帮我理解一下吗?

【问题讨论】:

Spark 从 HDFS 读取数据时,生成的 DF 中创建的默认分区数取决于用于读取该文件的 Hadoop 输入格式创建的输入拆分。 【参考方案1】:

spark.sql.shuffle.partitions=8000 设置将设置 Spark 程序的默认 shuffle 分区号。如果您在设置此选项后尝试执行联接或聚合,您将看到此数字生效(您可以通过df.rdd.getNumPartitions() 确认)。更多信息请参考here。

但在您的情况下,您将此设置与 repartition(col("x")partitionBy("x") 一起使用。因此,如果不首先使用连接或聚合转换,您的程序将不会受到此设置的影响。 repartitionpartitionBy 的区别在于,第一个会将内存中的数据进行分区,创建cardinality("x") 个分区,而第二个会将大致相同数量的分区写入HDFS。为什么大约?好吧,因为有更多因素决定了输出文件的确切数量。请查看以下资源以更好地了解此主题:

Difference between df.repartition and DataFrameWriter partitionBy? pyspark: Efficiently have partitionBy write to same number of total partitions as original table

因此,当使用按列重新分区 repartition(*cols)partitionBy(*cols) 时,首先要考虑的是列(或列组合)具有的唯一值(基数)的数量。

话虽如此,如果您想确保创建 8000 个分区(即输出文件),请使用 repartition(partitionsNum, col("x")) 其中 partitionsNum == 8000 在您的情况下,然后调用 write.orc("HDFS_LOC_1")。否则,如果您想保持分区数接近 x 的基数,只需调用 partitionBy("x") 到您的原始 df,然后调用 write.orc("HDFS_LOC_1") 将数据存储到 HDFS。这将使用您的分区数据创建cardinality(x) 文件夹。

【讨论】:

很好的解释!谢谢

以上是关于spark shuffle partitions 和 partition by tag 如何相互配合的主要内容,如果未能解决你的问题,请参考以下文章

spark.sql.shuffle.partitions 的 200 个默认分区难题

Spark shuffle 机制

spark shuffle partitions 和 partition by tag 如何相互配合

[Spark]What's the difference between spark.sql.shuffle.partitions and spark.default.parallelism?

Spark--Shuffle

Spark shuffle-write 和 shuffle-read 中对数据倾斜情况的处理