在同一个 Spark 作业中设置每个 shuffle 的 shuffle 分区数
Posted
技术标签:
【中文标题】在同一个 Spark 作业中设置每个 shuffle 的 shuffle 分区数【英文标题】:Setting number of shuffle partitions per shuffle in the same Spark job 【发布时间】:2021-03-04 13:19:47 【问题描述】:有没有办法,在 same Spark 应用程序甚至同一个作业中,为每个 shuffle 指定不同数量的 shuffle 分区,而不是适用于所有 shuffle 分区的全局数量?
换句话说,可以
spark.sql.shuffle.partitions
为每个涉及洗牌的 DataFrame 转换动态设置不同的值?
这适用于作业是大型 DAG 的场景,一些 shuffle 输出可能很小,而另一些则非常大。
谢谢!
【问题讨论】:
尝试在每次 shuffle 操作后手动执行df.repartition(n_partition)
?
感谢您的评论。 repartition() 不会创建额外的改组吗?我的重点是性能。
不确定,我在想查询计划器可能会解释你想在 shuffle 之后更改分区数......也许你可以检查查询计划的样子,如果你把 .repartition
这是一个很好的观点——从我在这里看到的来看,它没有优化:jaceklaskowski.gitbooks.io/mastering-spark-sql/content/… 但现在我注意到 repartition() 在改组之前被调用了。因此,如果在改组之后,优化器仍有可能启动。
我认为实现您所要求的唯一方法是事先手动打乱数据帧。这样,Spark 将检测到数据帧已经对齐,并且不会再进行 shuffle。例如,您将df1.join(df2, "id")
重写为df1.repartition(10, df1("id")).join(df2.repartition(10, df2("id")), "id")
。在这两种情况下,数据只被洗牌一次,唯一的区别是结果数据帧中的分区数。前者将带来spark.sql.shuffle.partitions
分区,后者 - 10 个。
【参考方案1】:
当然可以。
在 JOIN 或 Aggregation 之前发出命令sqlContext.setConf("spark.sql.shuffle.partitions", "nnn")
。但对查询的广播哈希联接方面没有影响。
试试看。
【讨论】:
以上是关于在同一个 Spark 作业中设置每个 shuffle 的 shuffle 分区数的主要内容,如果未能解决你的问题,请参考以下文章
Spark学习之路 SparkCore的调优之Shuffle调优