SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起
Posted 鸿乃江边鸟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起相关的知识,希望对你有一定的参考价值。
背景
本文基于 SPARK 3.3.0
从一个unit test来探究SPARK Codegen的逻辑,
test("SortAggregate should be included in WholeStageCodegen")
val df = spark.range(10).agg(max(col("id")), avg(col("id")))
withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true")
val plan = df.queryExecution.executedPlan
assert(plan.exists(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortAggregateExec]))
assert(df.collect() === Array(Row(9, 4.5)))
分析
执行计划的真实面目
Spark的全代码流程,网上和代码中都有提及,如下:
WholeStageCodegen Plan A FakeInput Plan B
=========================================================================
-> execute()
|
doExecute() ---------> inputRDDs() -------> inputRDDs() ------> execute()
|
+-----------------> produce()
|
doProduce() -------> produce()
|
doProduce()
|
doConsume() <--------- consume()
|
doConsume() <-------- consume()
整体逻辑都知道,道理都懂,但是里面涉及到了好多细节,就拿以上的例子来说,会生成如下的执行计划:
*(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]
+- *(1) SortAggregate(key=[], functions=[partial_max(id#0L), partial_avg(id#0L)], output=[max#12L, sum#13, count#14L])
+- *(1) Range (0, 10, step=1, splits=2)
但是实际在缕代码的时候,你就会发现这个全代码的逻辑根本就缕不通,那是因为CollapseCodegenStages规则会加WholeStageCodegenExec和InputAdapter物理计划,加了以后的计划为:
WholeStageCodegen
*(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
InputAdapter
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]
WholeStageCodegen
+- *(1) SortAggregate(key=[], functions=[partial_max(id#0L), partial_avg(id#0L)], output=[max#12L, sum#13, count#14L])
+- *(1) Range (0, 10, step=1, splits=2)
注意:类似有*(1)
,*(2)
这种符号的,表明是有全代码生成的,而为什么在物理计划的时候没有显示 WholeStageCodegenExec 和 InputAdapter 计划是因为该两个计划重写了generateTreeString方法:
WholeStageCodegenExec
重写为如下:
override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
indent: Int = 0): Unit =
child.generateTreeString(
depth,
lastChildren,
append,
verbose,
if (printNodeId) "* " else s"*($codegenStageId) ",
false,
maxFields,
printNodeId,
indent)
这里的*($codegenStageId)
就是上面所说的*(1)
,*(2)
,而这里的数字1和2代表者不同的两个代码生成阶段,因为Exchange
不支持代码生成,所以被隔离成了两个代码生成。而*($codegenStageId)
作为子计划的前缀传递到了下游。
InputAdapter
重写为如下:
override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
indent: Int = 0): Unit =
child.generateTreeString(
depth,
lastChildren,
append,
verbose,
prefix = "",
addSuffix = false,
maxFields,
printNodeId,
indent)
看到这里的prefix = “”,所以单纯从执行计划看是没有任何迹象能表明存在着InputAdapter
计划.
所以说,我们最后应该看到的数据流应为:
第一阶段wholeStageCodegen:
WholeStageCodegenExec SortAggregateExec(Partial) RangeExec
=========================================================================
-> execute()
|
doExecute() ---------> inputRDDs() -----------------> inputRDDs()
|
doCodeGen()
|
+-----------------> produce()
|
doProduce()
|
doProduceWithoutKeys() -------> produce()
|
doProduce()
|
doConsume()<------------------- consume()
|
doConsumeWithoutKeys()
|并不是doConsumeWithoutKeys调用consume,而是由doProduceWithoutKeys调用
doConsume() <-------- consume()
第二阶段wholeStageCodegen:
WholeStageCodegenExec SortAggregateExec(Final) InputAdapter ShuffleExchangeExec
====================================================================================
-> execute()
|
doExecute() ---------> inputRDDs() -----------------> inputRDDs() -------> execute()
| |
doCodeGen() doExecute()
| |
+-----------------> produce() ShuffledRowRDD
|
doProduce()
|
doProduceWithoutKeys() -------> produce()
|
doProduce()
|
doConsume() <------------------- consume()
|
doConsumeWithoutKeys()
|并不是doConsumeWithoutKeys调用consume,而是由doProduceWithoutKeys调用
doConsume() <-------- consume()
以上是关于SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起的主要内容,如果未能解决你的问题,请参考以下文章
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起