SPARK Miscellany -- Why Reuse Exchange and Subquery

Posted by 鸿乃江边鸟


Background

This article is based on Spark 3.3.0.
In the Spark code base we sometimes come across the exchangeReuseEnabled and subqueryReuseEnabled configuration accessors. What are they for? Let's analyze them against the Spark source code.
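
For orientation, these two accessors read flags from SQLConf, and both default to true. A quick sketch of toggling them (the exact keys below are written from memory of SQLConf and are worth double-checking; spark is assumed to be an existing SparkSession):

    // conf.exchangeReuseEnabled reads spark.sql.exchange.reuse (default: true)
    spark.conf.set("spark.sql.exchange.reuse", "true")
    // conf.subqueryReuseEnabled reads spark.sql.execution.reuseSubquery (default: true)
    spark.conf.set("spark.sql.execution.reuseSubquery", "true")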

Analysis

exchangeReuseEnabled

In PlanDynamicPruningFilters we can see:

        val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
          plan.exists {
            case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
              left.sameResult(sparkPlan)
            case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
              right.sameResult(sparkPlan)
            case _ => false
          }

        if (canReuseExchange) {
          val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, sparkPlan)
          val mode = broadcastMode(buildKeys, executedPlan.output)
          // plan a broadcast exchange of the build side of the join
          val exchange = BroadcastExchangeExec(mode, executedPlan)
          val name = s"dynamicpruning#${exprId.id}"
          // place the broadcast adaptor for reusing the broadcast results on the probe side
          val broadcastValues =
            SubqueryBroadcastExec(name, broadcastKeyIndex, buildKeys, exchange)
          DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
        }

Dynamic partition pruning goes down this path only when Exchange reuse is enabled and the plan contains a BroadcastHashJoinExec whose build side produces the same result as the plan to be broadcast (only then is the maximum benefit obtained). Note that nothing much happens here: the rule merely builds a BroadcastExchangeExec, and at this point there is no visible sign of where the Exchange reuse takes effect.
The reason lies in the rule ReuseExchangeAndSubquery: it walks the physical plan and, whenever it encounters an Exchange identical to one it has already seen, substitutes the existing one. That is how the BroadcastExchangeExec referenced by the DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId)) above ends up being reused.
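
To make the idea concrete, here is a minimal, self-contained sketch of that dedup-by-identical-plan mechanism. Everything here (Plan, Scan, Exchange, ReusedExchange, Join, ReuseExchangeSketch) is invented for illustration; the real rule compares plans by their canonicalized form rather than by case-class equality:

    import scala.collection.mutable

    // Toy plan nodes; case-class equality stands in for Spark's
    // canonicalized-plan comparison (sameResult).
    sealed trait Plan
    case class Scan(table: String) extends Plan
    case class Exchange(child: Plan) extends Plan
    case class ReusedExchange(original: Exchange) extends Plan
    case class Join(left: Plan, right: Plan) extends Plan

    object ReuseExchangeSketch {
      def apply(plan: Plan): Plan = {
        // Remembers the first occurrence of each distinct Exchange.
        val seen = mutable.Map.empty[Exchange, Exchange]
        def rewrite(p: Plan): Plan = p match {
          case Exchange(child) =>
            val candidate = Exchange(rewrite(child))
            seen.get(candidate) match {
              case Some(first) => ReusedExchange(first) // identical exchange seen before: reuse it
              case None        => seen(candidate) = candidate; candidate
            }
          case Join(l, r) => Join(rewrite(l), rewrite(r))
          case leaf       => leaf
        }
        rewrite(plan)
      }
    }

For example, ReuseExchangeSketch(Join(Exchange(Scan("t")), Exchange(Scan("t")))) returns Join(Exchange(Scan("t")), ReusedExchange(...)): the second, identical exchange collapses into a reference to the first.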
But why does reusing a BroadcastExchangeExec reduce Spark's computation?
Again taking
DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
as the example:
the broadcastValues of InSubqueryExec is of type SubqueryBroadcastExec, and the computation logic in SubqueryBroadcastExec is:

  @transient
  private lazy val relationFuture: Future[Array[InternalRow]] = {
    // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    Future {
      // This will run in another thread. Set the execution id so that we can connect these jobs
      // with the correct execution.
      SQLExecution.withExecutionId(session, executionId) {
        val beforeCollect = System.nanoTime()

        val broadcastRelation = child.executeBroadcast[HashedRelation]().value

Note that this relationFuture field is marked lazy val, so however many times it is referenced it is initialized only once, which cuts down the work done on the driver (see the small demonstration after the next snippet). relationFuture is first touched in:

  protected override def doPrepare(): Unit = {
    relationFuture
  }
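
As a standalone (non-Spark) illustration of the lazy val behavior this relies on: the initializer body runs at most once, on first access.

    object LazyValDemo {
      lazy val relation: Array[Int] = {
        println("expensive computation, runs only once")
        Array(1, 2, 3)
      }

      def main(args: Array[String]): Unit = {
        relation // first access: prints the message and builds the array
        relation // second access: returns the cached value, prints nothing
      }
    }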

And doPrepare is invoked from executeQuery, on the driver side, when the plan is turned into an RDD, as follows:

  protected final def executeQuery[T](query: => T): T = {
    RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
      prepare()
      waitForSubqueries()
      query
    }
  }

In val broadcastRelation = child.executeBroadcast[HashedRelation]().value, the child is of type BroadcastExchangeExec, so this is what ultimately triggers the broadcast. Because the reuse rule makes every duplicate point at the same BroadcastExchangeExec instance, that broadcast work is performed only once.
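
To observe exchange reuse from the outside, here is an illustrative experiment (the view and column names are made up, and the exact plan shape depends on settings such as AQE and the broadcast threshold):

    // Force a sort-merge join so that both join sides need the same
    // hash-partitioning shuffle over the same data.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    spark.range(100).toDF("id").createOrReplaceTempView("t")

    // Self-join on the join key: the two shuffles are identical, so with
    // exchange reuse enabled the printed plan typically shows a
    // ReusedExchange node instead of a second Exchange.
    spark.sql("SELECT * FROM t a JOIN t b ON a.id = b.id").explain()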

subqueryReuseEnabled

Take InjectRuntimeFilter as an example. That rule eventually produces the following logical plan:

    val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideExp)),
      ListQuery(aggregate, childOutputs = aggregate.output))
    Filter(filter, filterApplicationSidePlan)

The InSubquery then goes through the PlanSubqueries rule to become a physical plan (InjectRuntimeFilter itself does not actually produce an InSubqueryExec; we use it here only as an example):

        val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, query)
        InSubqueryExec(expr, SubqueryExec(s"subquery#${exprId.id}", executedPlan), exprId)

For InSubqueryExec:

  def updateResult(): Unit = {
    val rows = plan.executeCollect()
    result = if (plan.output.length > 1) {
      rows.asInstanceOf[Array[Any]]
    } else {
      rows.map(_.get(0, child.dataType))
    }
    if (shouldBroadcast) {
      resultBroadcast = plan.session.sparkContext.broadcast(result)
    }
  }

This updateResult method is likewise called inside executeQuery (again, on the driver side). In *plan.executeCollect()*, plan is of type BaseSubqueryExec, and in the implementations of that type the relationFuture field is declared as follows (taking SubqueryBroadcastExec as an example):

private lazy val relationFuture

It too is a lazy val and is initialized only once, so when ReuseExchangeAndSubquery reuses a BaseSubqueryExec, the subquery is computed on the driver once rather than once per occurrence.
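
As a quick, illustrative way to see this (reusing the hypothetical temp view t from the sketch above), write the same scalar subquery twice:

    // With subquery reuse on, the physical plan typically shows a
    // ReusedSubquery for the second occurrence of the identical subquery,
    // so the aggregation is collected on the driver only once.
    spark.sql("""
      SELECT * FROM t
      WHERE id > (SELECT avg(id) FROM t)
         OR id < (SELECT avg(id) FROM t)
    """).explain()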
