源码详解 | SparkSQL底层解析原理

Posted 数据仓库与Python大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码详解 | SparkSQL底层解析原理相关的知识,希望对你有一定的参考价值。



导读:静下心来读源码,给想要了解spark sql底层解析原理的读者朋友!


【本文大纲】

1、执行计划回顾

2、遍历过程概述

3、遍历过程详解

4、思考小问题


执行计划回顾

Spark  sql执行计划的生成过程:


  1. 接收 sql 语句,初步解析成 logical plan

  2. 分析上步生成的 logical plan,生成验证后的 logical plan

  3. 对分析过后的 logical plan,进行优化

  4. 对优化过后的 logical plan,生成 physical plan

  5. 根据 physical plan,生成 rdd 的程序,并且提交运行


SELECT A,B FROM TESTDATA2 WHERE A>2


结合上图,写测试用例,每一步生成的执行计划如下:

源码详解 | SparkSQL底层解析原理


Spark sql解析会生成四种plan:

Parsed Logical Plan, Analyzed Logical Plan, Optimized Logical Plan, Physical Plan

上面这四种plan,无论是 LogicalPlan 还是 PhysicalPlan,都是通过树的形式表示。每一步都是对树进行操作,生成新的树。在这个过程中,对树的遍历非常重要。


遍历过程概述

最常用到的有  后序遍历 和 前序遍历  两种


后序遍历

TreeNode 中的 transformUp方法以及AnalysisHelper 中的 resolveOperatorsUp方法 等

这两个方法类似,以TreeNode 中的 transformUp为例:

def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType = { // 先遍历子节点,得到叶子节点 val afterRuleOnChildren = mapChildren(_.transformUp(rule))  //对节点执行规则 val newNode = if (this fastEquals afterRuleOnChildren) { CurrentOrigin.withOrigin(origin) { //这里用到了PartialFunction的applyOrElse方法,用来避免undefined的情况发生。如果当前节点应用rule没有匹配的话,则返回默认的当前节点本身 rule.applyOrElse(this, identity[BaseType]) } } else { CurrentOrigin.withOrigin(origin) { rule.applyOrElse(afterRuleOnChildren, identity[BaseType]) } } // If the transform function replaces this node with a new one, carry over the tags. newNode.copyTagsFrom(this) newNode}


递归逻辑:

  • 递归结束条件:如果是子节点,那么使用该规则执行该节点,并且返回执行规则后的节点

  • 递归继续条件:如果有子节点,那么先根据遍历子节点的结果,生成新节点。最后在使用该规则执行新节点


前序遍历

TreeNode 中的 transformDown方法以及AnalysisHelper 中的 resolveOperatorsDown方法 等

TreeNode 中的 transformDown为例:

def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = { // 对当前节点,调用rule函数。 val afterRule = CurrentOrigin.withOrigin(origin) { // 这里rule函数有可能会生成新的节点,新节点的子节点可能不一样 rule.applyOrElse(this, identity[BaseType]) }
// Check if unchanged and then possibly return old copy to avoid gc churn. //再遍历子节点 if (this fastEquals afterRule) {// 如果当前节点没有变化,则继续遍历它的子节点 mapChildren(_.transformDown(rule)) } else { // 如果当前节点发生改变,需要对改变后的节点进行遍历 afterRule.copyTagsFrom(this) afterRule.mapChildren(_.transformDown(rule)) }}


递归逻辑:

  • 递归结束条件:如果是叶子节点,那么使用规则对该节点操作,并且返回操作后的节点。

  • 递归继续条件:如果不是叶子节点,那么先使用该规则对该节点操作。对操作后的该节点,继续遍历其子节点,用子节点的返回结果,来构建成新的节点。


遍历中的通用方法

上面几种方法中,都用到了TreeNode中的mapChildren、mapProductIterator方法


mapChildren

mapChildren 会依次调用函数对子节点操作,根据返回的结果生成一个新的节点。


def mapChildren(f: BaseType => BaseType): BaseType = {//如果不是叶子节点,那么会执行mapChildren(f, forceCopy = false)方法,遍历构造函数的参数。如果参数是子节点,那么递归遍历if (containsChild.nonEmpty) { mapChildren(f, forceCopy = false) } else {//如果是叶子节点,则返回自身节点  this }}

private def mapChildren( f: BaseType => BaseType, forceCopy: Boolean): BaseType = { var changed = false
def mapChild(child: Any): Any = child match { case arg: TreeNode[_] if containsChild(arg) => val newChild = f(arg.asInstanceOf[BaseType]) if (forceCopy || !(newChild fastEquals arg)) { changed = true newChild } else { arg } case tuple @ (arg1: TreeNode[_], arg2: TreeNode[_]) => val newChild1 = if (containsChild(arg1)) { f(arg1.asInstanceOf[BaseType]) } else { arg1.asInstanceOf[BaseType] }
val newChild2 = if (containsChild(arg2)) { f(arg2.asInstanceOf[BaseType]) } else { arg2.asInstanceOf[BaseType] }
if (forceCopy || !(newChild1 fastEquals arg1) || !(newChild2 fastEquals arg2)) { changed = true (newChild1, newChild2) } else { tuple } case other => other }
// 调用了mapProductIterator方法,遍历构造函数的参数,返回新的构造参数 val newArgs = mapProductIterator {// 如果参数是TreeNode子类,并且是当前节点的子节点 case arg: TreeNode[_] if containsChild(arg) =>// 递归调用函数遍历 这里的f可能是 transformUp or transformDown val newChild = f(arg.asInstanceOf[BaseType])// 如果子节点发生变化了,则更改changed标识 if (forceCopy || !(newChild fastEquals arg)) { changed = true newChild } else { arg } case Some(arg: TreeNode[_]) if containsChild(arg) => val newChild = f(arg.asInstanceOf[BaseType]) if (forceCopy || !(newChild fastEquals arg)) { changed = true Some(newChild) } else { Some(arg) } case m: Map[_, _] => m.mapValues { case arg: TreeNode[_] if containsChild(arg) => val newChild = f(arg.asInstanceOf[BaseType]) if (forceCopy || !(newChild fastEquals arg)) { changed = true newChild } else { arg } case other => other }.view.force // `mapValues` is lazy and we need to force it to materialize case d: DataType => d // Avoid unpacking Structs case args: Stream[_] => args.map(mapChild).force // Force materialization on stream case args: Iterable[_] => args.map(mapChild) case nonChild: AnyRef => nonChild case null => null } // 如果子节点发生变化,则利用新的构造参数,实例化新的节点 if (forceCopy || changed) makeCopy(newArgs, forceCopy) else this}


mapProductIterator

TreeNode 继承了 Product 接口,TreeNode 的子类实现了 Product 接口,所以支持访问构造方法的参数。TreeNode 的 mapProductIterator 方法,接收一个函数用来遍历当前节点的构造参数


这里有一个知识点(ClassTag用法):https://dzone.com/articles/scala-classtag-a-simple-use-case

//ClassTag用法 def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {// protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = { val arr = Array.ofDim[B](productArity) var i = 0 while (i < arr.length) { arr(i) = f(productElement(i)) i += 1 } arr}



遍历过程详解

下面以Parsed Logical Plan --> Analyzed Logical Plan的过程中 ,某个规则为例,详细跟踪一下这两种遍历方式。


分析一下当前的Parsed Logical Plan

当前sql

SELECT A,B FROM TESTDATA2 WHERE A>2


生成的Parsed Logical Plan:

== Parsed Logical Plan =='Project ['A, 'B]+- 'Filter ('A > 2) +- 'UnresolvedRelation [TESTDATA2]


上面执行计划涉及到 三个类(Project、Filter、UnresolvedRelation):


case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)

两个参数:

  • Project-projectList: Seq['A, 'B]

  • Project-child(LogicalPlan):

  • 'Filter ('A > 2)

  •    +- 'UnresolvedRelation [TESTDATA2]

Project有一个子节点Filter


case class Filter(condition: Expression, child: LogicalPlan)

两个参数:

  • Filter-condition:('A > 2)

  • Filter--child(LogicalPlan): 

      'UnresolvedRelation [TESTDATA2]

Filter有一个子节点UnresolvedRelation


case class UnresolvedRelation( multipartIdentifier: Seq[String])

一个参数:

  • UnresolvedRelation-multipartIdentifier: Seq[TESTDATA2]

UnresolvedRelation无子节点


Project、Filter、UnresolvedRelation与 LogicalPlan、 TreeNode的继承关系如下

源码详解 | SparkSQL底层解析原理


Project、Filter、UnresolvedRelation本身是Logical Plan 、TreeNode。


后序遍历(AnalysisHelper.resolveOperatorsUp)

Parsed Logical Plan 需要 通过Analyzer类中的一系列rule 转换生成Analyzed Logical Plan。

下图是Analyzer类中rule,会提前初始化在batches里:

源码详解 | SparkSQL底层解析原理


这里的rule通过apply方法遍历Parsed Logical Plan 的每个节点,依据定好的规则生成Analyzed Logical Plan,以  ResolveHints.ResolveJoinStrategyHint为例:

源码详解 | SparkSQL底层解析原理


主要通过AnalysisHelper 中的 resolveOperatorsUp(后序遍历的)方法:


// 入参为rule,偏函数def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = { if (!analyzed) { AnalysisHelper.allowInvokingTransformsInAnalyzer { // 1、先遍历子节点,得到叶子节点 val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))  //2、为节点执行规则 if (self fastEquals afterRuleOnChildren) { CurrentOrigin.withOrigin(origin) { // 如果遍历后当前节点没有发生变化,对当前的plan执行rule规则 rule.applyOrElse(self, identity[LogicalPlan]) } } else { CurrentOrigin.withOrigin(origin) { // 如果遍历后 当前 节点发 生了变化,则新负值的afterRuleOnChildren执行rule规则 rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan]) } } } } else { self }}


当前的节点为Project,执行Project 的 resolveOperatorsUp 方法,该方法会先遍历Project的子节点。


第一层遍历:

执行Project 的 mapChildren方法

源码详解 | SparkSQL底层解析原理


第二层遍历:

执行Filter 的 mapChildren方法

源码详解 | SparkSQL底层解析原理


第三层遍历:

执行UnresolvedRelation 的 mapChildren方法

源码详解 | SparkSQL底层解析原理


由 于 UnresolvedRelation为子节点,返回节点本 身,为UnresolvedRelation执行rule。

为UnresolvedRelation节点执行ResolveJoinStrategyHint的apply方法:


// 该规则主要是针对Hint节点起作用 ,目前是UnresolvedRelation节点def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp { case h: UnresolvedHint if STRATEGY_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => if (h.parameters.isEmpty) { // If there is no table alias specified, apply the hint on the entire subtree. ResolvedHint(h.child, createHintInfo(h.name)) } else { // Otherwise, find within the subtree query plans to apply the hint. val relationNamesInHint = h.parameters.map { case tableName: String => UnresolvedAttribute.parseAttributeName(tableName) case tableId: UnresolvedAttribute => tableId.nameParts case unsupported => throw new AnalysisException("Join strategy hint parameter " + s"should be an identifier or string but was $unsupported (${unsupported.getClass}") }.toSet val relationsInHintWithMatch = new mutable.HashSet[Seq[String]] val applied = applyJoinStrategyHint( h.child, relationNamesInHint, relationsInHintWithMatch, h.name)
// Filters unmatched relation identifiers in the hint val unmatchedIdents = relationNamesInHint -- relationsInHintWithMatch hintErrorHandler.hintRelationsNotFound(h.name, h.parameters, unmatchedIdents) applied }}


这个规则主要是 对Hint节点起作用,但目前是UnresolvedRelation节点,不能匹配的上。因此通过

 rule.applyOrElse(self, identity[LogicalPlan])


之后,返回UnresolvedRelation本身。

UnresolvedRelation返回后,就会接着先后为Filter-->Project执行ResolveJoinStrategyHint规则,最后返回Project本身。

到此,整个ResolveJoinStrategyHint对Logical plan的 遍历及执行规则的 过 程 就结束了。 


前序遍历(AnalysisHelper.resolveOperatorsDown)

Analyzer 中的 ExtractWindowExpressions规则

源码详解 | SparkSQL底层解析原理


主要通过AnalysisHelper 中的 resolveOperatorsDown方法:

/** Similar to [[resolveOperatorsUp]], but does it top-down. */def resolveOperatorsDown(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = { if (!analyzed) { AnalysisHelper.allowInvokingTransformsInAnalyzer {  val afterRule = CurrentOrigin.withOrigin(origin) { // 1、为当前节点执行规则 rule.applyOrElse(self, identity[LogicalPlan]) }
// 2、对执行完规则后的新节点遍历迭代 if (self fastEquals afterRule) { //如果执行完规则后的节点没有变化(即规则没有起到作用),则对节点遍历迭代 mapChildren(_.resolveOperatorsDown(rule)) } else { //如果执行完规则后的节点发生变化,则对新节点遍历迭代 afterRule.mapChildren(_.resolveOperatorsDown(rule)) } } } else { self }}


先为Project节点执行ExtractWindowExpressions.apply方法:

源码详解 | SparkSQL底层解析原理


Project节点模式匹配case p: LogicalPlan if !p.childrenResolved => p 返回Project节点本身


第一层遍历:

源码详解 | SparkSQL底层解析原理


得到Project的子节点Filter,执行Filter.resolveOperatorsDown方法,先对Filter节点执行ExtractWindowExpressions.apply方法,跑一遍规则,最后由于sql没有用到window相关函数,返回Filter节点本身,开始对Filter节点进行遍历


第二层遍历:

源码详解 | SparkSQL底层解析原理


得到Filter的子节点UnresolvedRelation,执行UnresolvedRelation.resolveOperatorsDown方法,先对UnresolvedRelation节点执行ExtractWindowExpressions.apply方法,跑一遍规则,返回UnresolvedRelation节点本身,开始对UnresolvedRelation节点进行遍历


第三层遍历:


UnresolvedRelation没有子节点,在mapChildren方法被返回。

最终 返回Project节点,ExtractWindowExpressions执行完成。


思考

什么rule适合用后序遍历?什么rule适合前序遍历?

当我们自己开发规则时,该怎么选呢?




在文末分享、点赞、在看,给个三连呗~~


【推荐阅读】

往期推荐


欢迎加入 大数据数仓中台交流群,跟同行零距离交流。如想进群,请加v:iom1128,备注:数仓,审核通过自主入群。


入群请联系小助手:iom1128『紫霞仙子』

 

关注不迷路~ 各种福利、资源定期分享!


在成为最厉害最厉害最厉害的道路上

很高兴认识你

以上是关于源码详解 | SparkSQL底层解析原理的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL底层执行流程详解

Spark SQL架构工作原理及流程解析

Spring源码解读---底层核心原理解析

Spring源码解读---底层核心原理解析

Spark SQL底层执行流程详解

Spark SQL底层执行流程详解