spark sql解析过程中对tree的遍历(源码详解)
Posted 小萝卜算子
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark sql解析过程中对tree的遍历(源码详解)相关的知识,希望对你有一定的参考价值。
静下心来读源码,给想要了解spark sql底层解析原理的小伙伴们!
【本文大纲】
1、执行计划回顾
2、遍历过程概述
3、遍历过程详解
4、思考小问题
执行计划回顾
Spark sql执行计划的生成过程:
-
接收 sql 语句,初步解析成 logical plan
-
分析上步生成的 logical plan,生成验证后的 logical plan
-
对分析过后的 logical plan,进行优化
-
对优化过后的 logical plan,生成 physical plan
-
根据 physical plan,生成 rdd 的程序,并且提交运行
SELECT A,B FROM TESTDATA2 WHERE A>2
结合上图,写测试用例,每一步生成的执行计划如下:
Spark sql解析会生成四种plan:
Parsed Logical Plan, Analyzed Logical Plan, Optimized Logical Plan, Physical Plan
上面这四种plan,无论是 LogicalPlan 还是 PhysicalPlan,都是通过树的形式表示。每一步都是对树进行操作,生成新的树。在这个过程中,对树的遍历非常重要。
遍历过程概述
最常用到的有 后序遍历 和 前序遍历 两种
后序遍历
TreeNode 中的 transformUp方法以及AnalysisHelper 中的 resolveOperatorsUp方法 等
这两个方法类似,以TreeNode 中的 transformUp为例:
def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType =
// 先遍历子节点,得到叶子节点
val afterRuleOnChildren = mapChildren(_.transformUp(rule))
//对节点执行规则
val newNode = if (this fastEquals afterRuleOnChildren)
CurrentOrigin.withOrigin(origin)
//这里用到了PartialFunction的applyOrElse方法,用来避免undefined的情况发生。如果当前节点应用rule没有匹配的话,则返回默认的当前节点本身
rule.applyOrElse(this, identity[BaseType])
else
CurrentOrigin.withOrigin(origin)
rule.applyOrElse(afterRuleOnChildren, identity[BaseType])
// If the transform function replaces this node with a new one, carry over the tags.
newNode.copyTagsFrom(this)
newNode
递归逻辑:
-
递归结束条件:如果是子节点,那么使用该规则执行该节点,并且返回执行规则后的节点
-
递归继续条件:如果有子节点,那么先根据遍历子节点的结果,生成新节点。最后在使用该规则执行新节点
前序遍历
TreeNode 中的 transformDown方法以及AnalysisHelper 中的 resolveOperatorsDown方法 等
TreeNode 中的 transformDown为例:
def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType =
// 对当前节点,调用rule函数。
val afterRule = CurrentOrigin.withOrigin(origin)
// 这里rule函数有可能会生成新的节点,新节点的子节点可能不一样
rule.applyOrElse(this, identity[BaseType])
// Check if unchanged and then possibly return old copy to avoid gc churn.
//再遍历子节点
if (this fastEquals afterRule)
// 如果当前节点没有变化,则继续遍历它的子节点
mapChildren(_.transformDown(rule))
else
// 如果当前节点发生改变,需要对改变后的节点进行遍历
afterRule.copyTagsFrom(this)
afterRule.mapChildren(_.transformDown(rule))
递归逻辑:
-
递归结束条件:如果是叶子节点,那么使用规则对该节点操作,并且返回操作后的节点。
-
递归继续条件:如果不是叶子节点,那么先使用该规则对该节点操作。对操作后的该节点,继续遍历其子节点,用子节点的返回结果,来构建成新的节点。
遍历中的通用方法
上面几种方法中,都用到了TreeNode中的mapChildren、mapProductIterator方法
mapChildren
mapChildren 会依次调用函数对子节点操作,根据返回的结果生成一个新的节点。
def mapChildren(f: BaseType => BaseType): BaseType =
//如果不是叶子节点,那么会执行mapChildren(f, forceCopy = false)方法,遍历构造函数的参数。如果参数是子节点,那么递归遍历
if (containsChild.nonEmpty)
mapChildren(f, forceCopy = false)
else
//如果是叶子节点,则返回自身节点
this
private def mapChildren(
f: BaseType => BaseType,
forceCopy: Boolean): BaseType =
var changed = false
def mapChild(child: Any): Any = child match
case arg: TreeNode[_] if containsChild(arg) =>
val newChild = f(arg.asInstanceOf[BaseType])
if (forceCopy || !(newChild fastEquals arg))
changed = true
newChild
else
arg
case tuple @ (arg1: TreeNode[_], arg2: TreeNode[_]) =>
val newChild1 = if (containsChild(arg1))
f(arg1.asInstanceOf[BaseType])
else
arg1.asInstanceOf[BaseType]
val newChild2 = if (containsChild(arg2))
f(arg2.asInstanceOf[BaseType])
else
arg2.asInstanceOf[BaseType]
if (forceCopy || !(newChild1 fastEquals arg1) || !(newChild2 fastEquals arg2))
changed = true
(newChild1, newChild2)
else
tuple
case other => other
// 调用了mapProductIterator方法,遍历构造函数的参数,返回新的构造参数
val newArgs = mapProductIterator
// 如果参数是TreeNode子类,并且是当前节点的子节点
case arg: TreeNode[_] if containsChild(arg) =>
// 递归调用函数遍历 这里的f可能是 transformUp or transformDown
val newChild = f(arg.asInstanceOf[BaseType])
// 如果子节点发生变化了,则更改changed标识
if (forceCopy || !(newChild fastEquals arg))
changed = true
newChild
else
arg
case Some(arg: TreeNode[_]) if containsChild(arg) =>
val newChild = f(arg.asInstanceOf[BaseType])
if (forceCopy || !(newChild fastEquals arg))
changed = true
Some(newChild)
else
Some(arg)
case m: Map[_, _] => m.mapValues
case arg: TreeNode[_] if containsChild(arg) =>
val newChild = f(arg.asInstanceOf[BaseType])
if (forceCopy || !(newChild fastEquals arg))
changed = true
newChild
else
arg
case other => other
.view.force // `mapValues` is lazy and we need to force it to materialize
case d: DataType => d // Avoid unpacking Structs
case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
case args: Iterable[_] => args.map(mapChild)
case nonChild: AnyRef => nonChild
case null => null
// 如果子节点发生变化,则利用新的构造参数,实例化新的节点
if (forceCopy || changed) makeCopy(newArgs, forceCopy) else this
mapProductIterator
TreeNode 继承了 Product 接口,TreeNode 的子类实现了 Product 接口,所以支持访问构造方法的参数。TreeNode 的 mapProductIterator 方法,接收一个函数用来遍历当前节点的构造参数
这里有一个知识点(ClassTag用法):https://dzone.com/articles/scala-classtag-a-simple-use-case
//ClassTag用法
def mapProductIterator[B: ClassTag](f: Any => B): Array[B] =
// protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] =
val arr = Array.ofDim[B](productArity)
var i = 0
while (i < arr.length)
arr(i) = f(productElement(i))
i += 1
arr
遍历过程详解
下面以Parsed Logical Plan --> Analyzed Logical Plan的过程中 ,某个规则为例,详细跟踪一下这两种遍历方式。
分析一下当前的Parsed Logical Plan
当前sql
SELECT A,B FROM TESTDATA2 WHERE A>2
生成的Parsed Logical Plan:
== Parsed Logical Plan ==
'Project ['A, 'B]
+- 'Filter ('A > 2)
+- 'UnresolvedRelation [TESTDATA2]
上面执行计划涉及到 三个类(Project、Filter、UnresolvedRelation):
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
两个参数:
-
Project-projectList: Seq['A, 'B]
-
Project-child(LogicalPlan):
-
'Filter ('A > 2)
-
+- 'UnresolvedRelation [TESTDATA2]
Project有一个子节点Filter
case class Filter(condition: Expression, child: LogicalPlan)
两个参数:
-
Filter-condition:('A > 2)
-
Filter--child(LogicalPlan):
'UnresolvedRelation [TESTDATA2]
Filter有一个子节点UnresolvedRelation
case class UnresolvedRelation( multipartIdentifier: Seq[String])
一个参数:
-
UnresolvedRelation-multipartIdentifier: Seq[TESTDATA2]
UnresolvedRelation无子节点
Project、Filter、UnresolvedRelation与 LogicalPlan、 TreeNode的继承关系如下:
Project、Filter、UnresolvedRelation本身是Logical Plan 、TreeNode。
后序遍历(AnalysisHelper.resolveOperatorsUp)
Parsed Logical Plan 需要 通过Analyzer类中的一系列rule 转换生成Analyzed Logical Plan。
下图是Analyzer类中rule,会提前初始化在batches里:
这里的rule通过apply方法遍历Parsed Logical Plan 的每个节点,依据定好的规则生成Analyzed Logical Plan,以 ResolveHints.ResolveJoinStrategyHint为例:
主要通过AnalysisHelper 中的 resolveOperatorsUp(后序遍历的)方法:
// 入参为rule,偏函数
def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan =
if (!analyzed)
AnalysisHelper.allowInvokingTransformsInAnalyzer
// 1、先遍历子节点,得到叶子节点
val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
//2、为节点执行规则
if (self fastEquals afterRuleOnChildren)
CurrentOrigin.withOrigin(origin)
// 如果遍历后当前节点没有发生变化,对当前的plan执行rule规则
rule.applyOrElse(self, identity[LogicalPlan])
else
CurrentOrigin.withOrigin(origin)
// 如果遍历后 当前 节点发 生了变化,则新负值的afterRuleOnChildren执行rule规则
rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
else
self
当前的节点为Project,执行Project 的 resolveOperatorsUp 方法,该方法会先遍历Project的子节点。
第一层遍历:
执行Project 的 mapChildren方法
第二层遍历:
执行Filter 的 mapChildren方法
第三层遍历:
执行UnresolvedRelation 的 mapChildren方法
由 于 UnresolvedRelation为子节点,返回节点本 身,为UnresolvedRelation执行rule。
为UnresolvedRelation节点执行ResolveJoinStrategyHint的apply方法:
// 该规则主要是针对Hint节点起作用 ,目前是UnresolvedRelation节点
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp
case h: UnresolvedHint if STRATEGY_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
if (h.parameters.isEmpty)
// If there is no table alias specified, apply the hint on the entire subtree.
ResolvedHint(h.child, createHintInfo(h.name))
else
// Otherwise, find within the subtree query plans to apply the hint.
val relationNamesInHint = h.parameters.map
case tableName: String => UnresolvedAttribute.parseAttributeName(tableName)
case tableId: UnresolvedAttribute => tableId.nameParts
case unsupported => throw new AnalysisException("Join strategy hint parameter " +
s"should be an identifier or string but was $unsupported ($unsupported.getClass")
.toSet
val relationsInHintWithMatch = new mutable.HashSet[Seq[String]]
val applied = applyJoinStrategyHint(
h.child, relationNamesInHint, relationsInHintWithMatch, h.name)
// Filters unmatched relation identifiers in the hint
val unmatchedIdents = relationNamesInHint -- relationsInHintWithMatch
hintErrorHandler.hintRelationsNotFound(h.name, h.parameters, unmatchedIdents)
applied
这个规则主要是 对Hint节点起作用,但目前是UnresolvedRelation节点,不能匹配的上。因此通过
rule.applyOrElse(self, identity[LogicalPlan])
之后,返回UnresolvedRelation本身。
UnresolvedRelation返回后,就会接着先后为Filter-->Project执行ResolveJoinStrategyHint规则,最后返回Project本身。
到此,整个ResolveJoinStrategyHint对Logical plan的 遍历及执行规则的 过 程 就结束了。
前序遍历(AnalysisHelper.resolveOperatorsDown)
Analyzer 中的 ExtractWindowExpressions规则
主要通过AnalysisHelper 中的 resolveOperatorsDown方法:
/** Similar to [[resolveOperatorsUp]], but does it top-down. */
def resolveOperatorsDown(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan =
if (!analyzed)
AnalysisHelper.allowInvokingTransformsInAnalyzer
val afterRule = CurrentOrigin.withOrigin(origin)
// 1、为当前节点执行规则
rule.applyOrElse(self, identity[LogicalPlan])
// 2、对执行完规则后的新节点遍历迭代
if (self fastEquals afterRule)
//如果执行完规则后的节点没有变化(即规则没有起到作用),则对节点遍历迭代
mapChildren(_.resolveOperatorsDown(rule))
else
//如果执行完规则后的节点发生变化,则对新节点遍历迭代
afterRule.mapChildren(_.resolveOperatorsDown(rule))
else
self
先为Project节点执行ExtractWindowExpressions.apply方法:
Project节点模式匹配case p: LogicalPlan if !p.childrenResolved => p 返回Project节点本身
第一层遍历:
得到Project的子节点Filter,执行Filter.resolveOperatorsDown方法,先对Filter节点执行ExtractWindowExpressions.apply方法,跑一遍规则,最后由于sql没有用到window相关函数,返回Filter节点本身,开始对Filter节点进行遍历
第二层遍历:
得到Filter的子节点UnresolvedRelation,执行UnresolvedRelation.resolveOperatorsDown方法,先对UnresolvedRelation节点执行ExtractWindowExpressions.apply方法,跑一遍规则,返回UnresolvedRelation节点本身,开始对UnresolvedRelation节点进行遍历
第三层遍历:
UnresolvedRelation没有子节点,在mapChildren方法被返回。
最终 返回Project节点,ExtractWindowExpressions执行完成。
思考
什么rule适合用后序遍历?什么rule适合前序遍历?
当我们自己开发规则时,该怎么选呢?
推荐阅读--
sparksql源码系列 | 最全的logical plan优化规则整理(spark2.3)
Sparksql源码系列 | 读源码必须掌握的scala基础语法
澄清 | snappy压缩到底支持不支持split? 为啥?
以上是关于spark sql解析过程中对tree的遍历(源码详解)的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL functions.scala 源码解析String functions (基于 Spark 3.3.0)