spark shuffle partitions 和 partition by tag 如何相互配合
Posted
技术标签:
【中文标题】spark shuffle partitions 和 partition by tag 如何相互配合【英文标题】:How spark shuffle partitions and partition by tag along with each other 【发布时间】:2021-03-25 16:25:08 【问题描述】:我正在从 HDFS 读取一组 10,000 个 10 TB 累积大小的 parquet 文件,并使用以下代码以分区方式将其写回 HDFS
spark.read.orc("HDFS_LOC").repartition(col("x")).write.partitionBy("x").orc("HDFS_LOC_1")
我正在使用
spark.sql.shuffle.partitions=8000
我看到 spark 已将 5000 个不同的 "x" 分区写入 HDFS(HDFS_LOC_1) 。在整个过程中如何使用“8000”的随机分区。我看到在“x”的所有分区中只有 15,000 个文件被写入。这是否意味着spark尝试在“X”的每个分区创建8000个文件,并且在写入期间发现没有足够的数据在每个分区写入8000个文件并最终写入更少的文件?你能帮我理解一下吗?
【问题讨论】:
Spark 从 HDFS 读取数据时,生成的 DF 中创建的默认分区数取决于用于读取该文件的 Hadoop 输入格式创建的输入拆分。 【参考方案1】:spark.sql.shuffle.partitions=8000
设置将设置 Spark 程序的默认 shuffle 分区号。如果您在设置此选项后尝试执行联接或聚合,您将看到此数字生效(您可以通过df.rdd.getNumPartitions()
确认)。更多信息请参考here。
但在您的情况下,您将此设置与 repartition(col("x")
和 partitionBy("x")
一起使用。因此,如果不首先使用连接或聚合转换,您的程序将不会受到此设置的影响。 repartition
和partitionBy
的区别在于,第一个会将内存中的数据进行分区,创建cardinality("x")
个分区,而第二个会将大致相同数量的分区写入HDFS。为什么大约?好吧,因为有更多因素决定了输出文件的确切数量。请查看以下资源以更好地了解此主题:
因此,当使用按列重新分区 repartition(*cols)
或 partitionBy(*cols)
时,首先要考虑的是列(或列组合)具有的唯一值(基数)的数量。
话虽如此,如果您想确保创建 8000 个分区(即输出文件),请使用 repartition(partitionsNum, col("x"))
其中 partitionsNum == 8000 在您的情况下,然后调用 write.orc("HDFS_LOC_1")
。否则,如果您想保持分区数接近 x 的基数,只需调用 partitionBy("x")
到您的原始 df,然后调用 write.orc("HDFS_LOC_1")
将数据存储到 HDFS。这将使用您的分区数据创建cardinality(x)
文件夹。
【讨论】:
很好的解释!谢谢以上是关于spark shuffle partitions 和 partition by tag 如何相互配合的主要内容,如果未能解决你的问题,请参考以下文章
spark.sql.shuffle.partitions 的 200 个默认分区难题
spark shuffle partitions 和 partition by tag 如何相互配合
[Spark]What's the difference between spark.sql.shuffle.partitions and spark.default.parallelism?