SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起

Posted 鸿乃江边鸟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起相关的知识,希望对你有一定的参考价值。

背景

本文基于 SPARK 3.3.0
从一个unit test来探究SPARK Codegen的逻辑,

  test("SortAggregate should be included in WholeStageCodegen") 
    val df = spark.range(10).agg(max(col("id")), avg(col("id")))
    withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") 
      val plan = df.queryExecution.executedPlan
      assert(plan.exists(p =>
        p.isInstanceOf[WholeStageCodegenExec] &&
          p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortAggregateExec]))
      assert(df.collect() === Array(Row(9, 4.5)))
    
  

分析

执行计划的真实面目

Spark的全代码流程,网上和代码中都有提及,如下:

  WholeStageCodegen       Plan A               FakeInput        Plan B
  =========================================================================
 
  -> execute()
      |
   doExecute() --------->   inputRDDs() -------> inputRDDs() ------> execute()
      |
      +----------------->   produce()
                              |
                           doProduce()  -------> produce()
                                                    |
                                                 doProduce()
                                                    |
                          doConsume() <--------- consume()
                              |
   doConsume()  <--------  consume()

整体逻辑都知道,道理都懂,但是里面涉及到了好多细节,就拿以上的例子来说,会生成如下的执行计划:

*(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]
   +- *(1) SortAggregate(key=[], functions=[partial_max(id#0L), partial_avg(id#0L)], output=[max#12L, sum#13, count#14L])
      +- *(1) Range (0, 10, step=1, splits=2)

但是实际在缕代码的时候,你就会发现这个全代码的逻辑根本就缕不通,那是因为CollapseCodegenStages规则会加WholeStageCodegenExecInputAdapter物理计划,加了以后的计划为:

WholeStageCodegen
*(2) SortAggregate(key=[], functions=[max(id#0L), avg(id#0L)], output=[max(id)#5L, avg(id)#6])
  InputAdapter
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13]
    WholeStageCodegen
   +- *(1) SortAggregate(key=[], functions=[partial_max(id#0L), partial_avg(id#0L)], output=[max#12L, sum#13, count#14L])
      +- *(1) Range (0, 10, step=1, splits=2)

注意:类似有*(1),*(2)这种符号的,表明是有全代码生成的,而为什么在物理计划的时候没有显示 WholeStageCodegenExecInputAdapter 计划是因为该两个计划重写了generateTreeString方法:
WholeStageCodegenExec 重写为如下:

override def generateTreeString(
      depth: Int,
      lastChildren: Seq[Boolean],
      append: String => Unit,
      verbose: Boolean,
      prefix: String = "",
      addSuffix: Boolean = false,
      maxFields: Int,
      printNodeId: Boolean,
      indent: Int = 0): Unit = 
    child.generateTreeString(
      depth,
      lastChildren,
      append,
      verbose,
      if (printNodeId) "* " else s"*($codegenStageId) ",
      false,
      maxFields,
      printNodeId,
      indent)
  

这里的*($codegenStageId) 就是上面所说的*(1),*(2),而这里的数字1和2代表者不同的两个代码生成阶段,因为Exchange不支持代码生成,所以被隔离成了两个代码生成。而*($codegenStageId) 作为子计划的前缀传递到了下游。
InputAdapter 重写为如下:

 override def generateTreeString(
      depth: Int,
      lastChildren: Seq[Boolean],
      append: String => Unit,
      verbose: Boolean,
      prefix: String = "",
      addSuffix: Boolean = false,
      maxFields: Int,
      printNodeId: Boolean,
      indent: Int = 0): Unit = 
    child.generateTreeString(
      depth,
      lastChildren,
      append,
      verbose,
      prefix = "",
      addSuffix = false,
      maxFields,
      printNodeId,
      indent)

看到这里的prefix = “”,所以单纯从执行计划看是没有任何迹象能表明存在着InputAdapter计划.

所以说,我们最后应该看到的数据流应为:
第一阶段wholeStageCodegen:

 WholeStageCodegenExec      SortAggregateExec(Partial)     RangeExec        
  =========================================================================
 
  -> execute()
      |
   doExecute() --------->   inputRDDs() -----------------> inputRDDs() 
      |
   doCodeGen()
      |
      +----------------->   produce()
                              |
                           doProduce() 
                              |
                           doProduceWithoutKeys() -------> produce()
                                                              |
                                                          doProduce()
                                                              |
                           doConsume()<------------------- consume()
                              |
                           doConsumeWithoutKeys()
                              |并不是doConsumeWithoutKeys调用consume,而是由doProduceWithoutKeys调用
   doConsume()  <--------  consume()

第二阶段wholeStageCodegen:

 WholeStageCodegenExec      SortAggregateExec(Final)      InputAdapter       ShuffleExchangeExec        
  ====================================================================================
 
  -> execute()
      |
   doExecute() --------->   inputRDDs() -----------------> inputRDDs() -------> execute()
      |                                                                            |
   doCodeGen()                                                                  doExecute()     
      |                                                                            |
      +----------------->   produce()                                           ShuffledRowRDD
                              |
                           doProduce() 
                              |
                           doProduceWithoutKeys() -------> produce()
                                                              |
                                                          doProduce()
                                                              |
                           doConsume() <------------------- consume()
                              |
                           doConsumeWithoutKeys()
                              |并不是doConsumeWithoutKeys调用consume,而是由doProduceWithoutKeys调用
   doConsume()  <--------  consume()

以上是关于SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起的主要内容,如果未能解决你的问题,请参考以下文章

SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起

SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起

SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起

SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起

SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起

SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起