SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(10)

Posted 鸿乃江边鸟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(10)相关的知识,希望对你有一定的参考价值。

背景

本文基于 SPARK 3.3.0
从一个unit test来探究SPARK Codegen的逻辑,

  test("SortAggregate should be included in WholeStageCodegen") 
    val df = spark.range(10).agg(max(col("id")), avg(col("id")))
    withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") 
      val plan = df.queryExecution.executedPlan
      assert(plan.exists(p =>
        p.isInstanceOf[WholeStageCodegenExec] &&
          p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortAggregateExec]))
      assert(df.collect() === Array(Row(9, 4.5)))
    
  

该sql形成的执行计划第二部分的全代码生成部分如下:

WholeStageCodegen
*(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
   InputAdapter
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]

分析

第二阶段wholeStageCodegen

第二阶段的代码生成涉及到SortAggregateExec和ShuffleExchangeExec以及InputAdapter的produce和consume方法,这里一一来分析:
第二阶段wholeStageCodegen数据流如下:

 WholeStageCodegenExec      SortAggregateExec(Final)      InputAdapter       ShuffleExchangeExec        
  ====================================================================================
 
  -> execute()
      |
   doExecute() --------->   inputRDDs() -----------------> inputRDDs() -------> execute()
      |                                                                            |
   doCodeGen()                                                                  doExecute()     
      |                                                                            |
      +----------------->   produce()                                           ShuffledRowRDD
                              |
                           doProduce() 
                              |
                           doProduceWithoutKeys() -------> produce()
                                                              |
                                                          doProduce()
                                                              |
                           doConsume() <------------------- consume()
                              |
                           doConsumeWithoutKeys()
                              |并不是doConsumeWithoutKeys调用consume,而是由doProduceWithoutKeys调用
   doConsume()  <--------  consume()

SortAggregateExec(Final) 的inputRDDs()

  • val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
    调用的是子类的inputRDDS,也就是SortAggregateExecinputRDDS方法,最终调用到InputAdaptorinputRDD方法:
      override def inputRDD: RDD[InternalRow] = child.execute()
    
    ,也就是调用的是ShuffleExchangeExecexecute方法:
      protected override def doExecute(): RDD[InternalRow] = 
    
// Returns the same ShuffleRowRDD if this plan is used by multiple plans.
if (cachedShuffleRDD == null) 
  cachedShuffleRDD = new ShuffledRowRDD(shuffleDependency, readMetrics)

cachedShuffleRDD
 ```
 
 这样整个链路就串联起来了。

以上是关于SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(10)的主要内容,如果未能解决你的问题,请参考以下文章

SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起

SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起

SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起

SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起

SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起

SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起