Spark Miscellany -- Why Reuse Exchange and Subquery
Posted by 鸿乃江边鸟
Background
This article is based on Spark 3.3.0.
In the Spark code base you will sometimes come across the exchangeReuseEnabled and subqueryReuseEnabled configuration options. What do they actually do? Let's work it out by walking through the Spark source code.
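Both flags are exposed through SQLConf; assuming an active SparkSession named spark, they can be toggled like this (keys taken from Spark's SQLConf, both default to true):

// assuming an active SparkSession named `spark`; both keys default to true in 3.3.0
spark.conf.set("spark.sql.exchange.reuse", "true")          // SQLConf.exchangeReuseEnabled
spark.conf.set("spark.sql.execution.reuseSubquery", "true") // SQLConf.subqueryReuseEnabled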
Analysis
exchangeReuseEnabled
In PlanDynamicPruningFilters we can see the following:
val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
  plan.exists {
    case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
      left.sameResult(sparkPlan)
    case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
      right.sameResult(sparkPlan)
    case _ => false
  }

if (canReuseExchange) {
  val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, sparkPlan)
  val mode = broadcastMode(buildKeys, executedPlan.output)
  // plan a broadcast exchange of the build side of the join
  val exchange = BroadcastExchangeExec(mode, executedPlan)
  val name = s"dynamicpruning#${exprId.id}"
  // place the broadcast adaptor for reusing the broadcast results on the probe side
  val broadcastValues =
    SubqueryBroadcastExec(name, broadcastKeyIndex, buildKeys, exchange)
  DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
}
Dynamic partition pruning takes this branch only when exchange reuse is enabled and the physical plan already contains a BroadcastHashJoinExec whose build side produces the same result as sparkPlan (that is the case where reuse pays off most). Note that nothing special happens here yet: the code merely wraps the build side in a BroadcastExchangeExec, so at this point it is not at all obvious where the Exchange reuse actually kicks in.
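For intuition, here is a hypothetical query shape that hits this branch (the names fact, dim, dt and active are made up for illustration):

// hypothetical schema: `fact` is partitioned by `dt`, `dim` is small enough to broadcast
spark.sql(
  """
    |SELECT f.* FROM fact f JOIN dim d ON f.dt = d.dt
    |WHERE d.active = true
  """.stripMargin)
// DPP plants a DynamicPruningExpression(InSubqueryExec(...)) on fact's partition
// column `dt`; since the join already broadcasts the filtered `dim` side, that same
// broadcast can serve the pruning subquery instead of `dim` being scanned again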
The reason is the rule ReuseExchangeAndSubquery: there, if an identical Exchange already exists elsewhere in the plan, the duplicate is replaced with a reference to the existing one. That is how the BroadcastExchangeExec inside the DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId)) above ends up being reused.
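At its core the rule is a lookup table keyed by the canonicalized plan; a simplified, paraphrased sketch of the exchange half of ReuseExchangeAndSubquery (the real rule also handles subqueries and uses a pruning-aware traversal):

import scala.collection.mutable

// inside the rule: keep the first Exchange seen for each canonical form, and replace
// any later, semantically identical Exchange with a ReusedExchangeExec pointing at it
val exchanges = mutable.Map.empty[SparkPlan, Exchange]

plan.transformUp {
  case exchange: Exchange if conf.exchangeReuseEnabled =>
    val cached = exchanges.getOrElseUpdate(exchange.canonicalized, exchange)
    if (cached ne exchange) ReusedExchangeExec(exchange.output, cached) else exchange
}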
But why does reusing a BroadcastExchangeExec reduce the amount of work Spark does?
Again take DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId)) as the example:
In InSubqueryExec, broadcastValues is of type SubqueryBroadcastExec, whose computation logic looks like this:
@transient
private lazy val relationFuture: Future[Array[InternalRow]] = {
  // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
  val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  Future {
    // This will run in another thread. Set the execution id so that we can connect these jobs
    // with the correct execution.
    SQLExecution.withExecutionId(session, executionId) {
      val beforeCollect = System.nanoTime()
      val broadcastRelation = child.executeBroadcast[HashedRelation]().value
      // ... (rest of the method elided)
Note that relationFuture is declared as a lazy val, so no matter how many times it is referenced it is initialized only once, which reduces the driver-side workload. The variable is first forced in:
protected override def doPrepare(): Unit = {
  relationFuture
}
doPrepare in turn is called from executeQuery, on the driver side when the RDD is being built:
protected final def executeQuery[T](query: => T): T = {
  RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
    prepare()
    waitForSubqueries()
    query
  }
}
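For context, prepare() makes sure doPrepare runs exactly once per node, children first; roughly (paraphrased from SparkPlan, exact details may differ between versions):

final def prepare(): Unit = {
  // prepare the children first, since doPrepare() of a node may depend on them
  children.foreach(_.prepare())
  synchronized {
    if (!prepared) {
      prepareSubqueries()
      doPrepare()
      prepared = true
    }
  }
}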
And in val broadcastRelation = child.executeBroadcast[HashedRelation]().value, child is of type BroadcastExchangeExec, which is what actually triggers the broadcast.
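The whole reuse story leans on Scala's lazy val semantics; a minimal, self-contained demonstration:

object LazyValDemo extends App {
  lazy val expensive: Int = {
    println("computed") // the body runs exactly once
    42
  }
  println(expensive) // prints "computed" and then 42
  println(expensive) // prints 42 only; the body is not re-evaluated
}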
subqueryReuseEnabled
Take InjectRuntimeFilter as an example: the rule ultimately produces the following logical plan:
val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideExp)),
  ListQuery(aggregate, childOutputs = aggregate.output))
Filter(filter, filterApplicationSidePlan)
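As a side note, this semi-join-reduction flavour of InjectRuntimeFilter sits behind a config that defaults to false in Spark 3.3.0 (key taken from SQLConf):

// defaults to false in Spark 3.3.0; enables the InSubquery-based runtime filter above
spark.conf.set("spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled", "true")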
The InSubquery is eventually turned into a physical plan by the PlanSubqueries rule (InjectRuntimeFilter itself does not produce an InSubqueryExec; this is just for illustration):
val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, query)
InSubqueryExec(expr, SubqueryExec(s"subquery#${exprId.id}", executedPlan), exprId)
For InSubqueryExec:
def updateResult(): Unit = {
  val rows = plan.executeCollect()
  result = if (plan.output.length > 1) {
    rows.asInstanceOf[Array[Any]]
  } else {
    rows.map(_.get(0, child.dataType))
  }
  if (shouldBroadcast) {
    resultBroadcast = plan.session.sparkContext.broadcast(result)
  }
}
This updateResult method is likewise called from executeQuery (again on the driver side). In plan.executeCollect(), plan is of type BaseSubqueryExec, and its implementing classes declare relationFuture as follows (taking SubqueryExec, the class constructed above, as the example; SubqueryBroadcastExec is analogous):
@transient
private lazy val relationFuture: Future[Array[InternalRow]] = ...
which is again a lazy val, so it too is initialized only once. Reusing BaseSubqueryExec nodes in ReuseExchangeAndSubquery therefore saves the duplicated subquery computation on the driver side.
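To observe the effect end to end, a hedged sketch against a hypothetical table t: the identical scalar subquery appears twice, and with subquery reuse enabled the second occurrence shows up as a reused subquery in the explain output:

// hypothetical table `t`: the identical scalar subquery appears twice in the predicate
spark.sql(
  """
    |SELECT * FROM t
    |WHERE a > (SELECT avg(a) FROM t)
    |  AND b > (SELECT avg(a) FROM t)
  """.stripMargin).explain()
// with spark.sql.execution.reuseSubquery=true, the explain output marks the second
// occurrence as a ReusedSubquery, i.e. the aggregation over `t` is executed only once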