Spark 中的 Rebalance 操作以及与Repartition操作的区别
Posted 鸿乃江边鸟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 中的 Rebalance 操作以及与Repartition操作的区别相关的知识,希望对你有一定的参考价值。
背景
本文基本spark 3.2.1
在Partitioning Hints Types中有提到Rebalance操作以及Repartition操作,而且他们都可以做数据的重分区,他们之间有什么区别呢?
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
分析
- Rebalance
参考对应的SPARK-35725,其目的是为了在AQE阶段,根据spark.sql.adaptive.advisoryPartitionSizeInBytes
进行分区的重新分区,防止数据倾斜。再加上SPARK-35786,就可以根据hint进行重分区。
具体看看怎么实现的,OptimizeSkewInRebalancePartitions代码如下:
override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
Seq(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)
...
override def apply(plan: SparkPlan): SparkPlan =
if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED))
return plan
plan match
case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>
tryOptimizeSkewedPartitions(stage)
case _ => plan
只有开启了 spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled
了的情况下,才可以进行分区的expand,而且还得shuffle的来源还得是REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL的情况下才能适用该规则.
tryOptimizeSkewedPartitions
的具体实现可以看代码,该代码的注释很清楚:
* We use ADVISORY_PARTITION_SIZE_IN_BYTES size to decide if a partition should be optimized.
* Let's say we have 3 maps with 3 shuffle partitions, and assuming r1 has data skew issue.
* the map side looks like:
* m0:[b0, b1, b2], m1:[b0, b1, b2], m2:[b0, b1, b2]
* and the reduce side looks like:
* (without this rule) r1[m0-b1, m1-b1, m2-b1]
* / \\
* r0:[m0-b0, m1-b0, m2-b0], r1-0:[m0-b1], r1-1:[m1-b1], r1-2:[m2-b1], r2[m0-b2, m1-b2, m2-b2]
*
* Note that, this rule is only applied with the SparkPlan whose top-level node is
* ShuffleQueryStageExec.
我们分析一下REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL来源:
这是在ResolveHints规则中进行转换的:
private def createRebalance(hint: UnresolvedHint): LogicalPlan =
hint.parameters match
case partitionExprs @ Seq(_*) =>
val invalidParams = partitionExprs.filter(!_.isInstanceOf[UnresolvedAttribute])
if (invalidParams.nonEmpty)
val hintName = hint.name.toUpperCase(Locale.ROOT)
throw QueryCompilationErrors.invalidHintParameterError(hintName, invalidParams)
RebalancePartitions(partitionExprs.map(_.asInstanceOf[Expression]), hint.child)
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
_.containsPattern(UNRESOLVED_HINT), ruleId)
case hint @ UnresolvedHint(hintName, _, _) => hintName.toUpperCase(Locale.ROOT) match
case "REPARTITION" =>
createRepartition(shuffle = true, hint)
case "COALESCE" =>
createRepartition(shuffle = false, hint)
case "REPARTITION_BY_RANGE" =>
createRepartitionByRange(hint)
case "REBALANCE" if conf.adaptiveExecutionEnabled =>
createRebalance(hint)
case _ => hint
可见只有在AQE开启的情况下 该Rebalance的hint才生效,生成对应的RebalancePartitions逻辑计划,而该逻辑计划会在BasicOperators规则中,转换成ShuffleEchangeExec物理计划:
case r: logical.RebalancePartitions =>
val shuffleOrigin = if (r.partitionExpressions.isEmpty)
REBALANCE_PARTITIONS_BY_NONE
else
REBALANCE_PARTITIONS_BY_COL
exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child), shuffleOrigin) :: Nil
因为只有shuffle操作的时候,AQE阶段才会应用到OptimizeSkewInRebalancePartitions
规则,这样才会在shuffle read阶段根据shuffle write阶段的数据进行优化。
注意:
其中 OptimizeShuffleWithLocalRead 不适用 shuffleOrigin为REBALANCE_PARTITIONS_BY_COL的,要不然在动态分区存在小文件的问题,具体见该处讨论
- Repartition
相对于Rebalance,该hint只是根据指定的固定的分区数据或者列进行分区,这个时候每个分区的大小并不能控制,只能说是平均分配或者说是按照列进行hash分区(这种情况存在文件大小不一的情况)
具体的分析,可以参考Rebalance的分析。
注意一点的是在SPARK-35650之后,Repartition操作也是在AQE阶段进行优化,而在SPARK-35725 之后,如果是单纯的REPARTITION hint 也是可以达到Rebalace hint的效果,因为在此处把shuffleOrigin从REPARTITION_BY_NONE改成了REBALANCE_PARTITIONS_BY_NONE了,所以也能使用于OptimizeSkewInRebalancePartitions规则。
结论
一般在reparition用到的地方都可以Rebalance来替换,而且Rebalance有更好的文件大小的控制能力,更多的信息可以查看对应的 spark-jira
以上是关于Spark 中的 Rebalance 操作以及与Repartition操作的区别的主要内容,如果未能解决你的问题,请参考以下文章
8.FLINK Transformation基本操作合并和连接拆分和选择rebalance重平衡分区其他分区操作API