SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(10)
Posted 鸿乃江边鸟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(10)相关的知识,希望对你有一定的参考价值。
背景
本文基于 SPARK 3.3.0
从一个unit test来探究SPARK Codegen的逻辑,
test("SortAggregate should be included in WholeStageCodegen")
val df = spark.range(10).agg(max(col("id")), avg(col("id")))
withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true")
val plan = df.queryExecution.executedPlan
assert(plan.exists(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortAggregateExec]))
assert(df.collect() === Array(Row(9, 4.5)))
该sql形成的执行计划第二部分的全代码生成部分如下:
WholeStageCodegen
*(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
InputAdapter
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]
分析
第二阶段wholeStageCodegen
第二阶段的代码生成涉及到SortAggregateExec和ShuffleExchangeExec以及InputAdapter的produce和consume方法,这里一一来分析:
第二阶段wholeStageCodegen数据流如下:
WholeStageCodegenExec SortAggregateExec(Final) InputAdapter ShuffleExchangeExec
====================================================================================
-> execute()
|
doExecute() ---------> inputRDDs() -----------------> inputRDDs() -------> execute()
| |
doCodeGen() doExecute()
| |
+-----------------> produce() ShuffledRowRDD
|
doProduce()
|
doProduceWithoutKeys() -------> produce()
|
doProduce()
|
doConsume() <------------------- consume()
|
doConsumeWithoutKeys()
|并不是doConsumeWithoutKeys调用consume,而是由doProduceWithoutKeys调用
doConsume() <-------- consume()
SortAggregateExec(Final) 的inputRDDs()
- val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
调用的是子类的inputRDDS
,也就是SortAggregateExec
的inputRDDS
方法,最终调用到InputAdaptor
的inputRDD
方法:
,也就是调用的是override def inputRDD: RDD[InternalRow] = child.execute()
ShuffleExchangeExec
的execute
方法:protected override def doExecute(): RDD[InternalRow] =
// Returns the same ShuffleRowRDD if this plan is used by multiple plans.
if (cachedShuffleRDD == null)
cachedShuffleRDD = new ShuffledRowRDD(shuffleDependency, readMetrics)
cachedShuffleRDD
```
这样整个链路就串联起来了。
以上是关于SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(10)的主要内容,如果未能解决你的问题,请参考以下文章
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起