Spark源码分析之SparkSubmit的流程

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark源码分析之SparkSubmit的流程相关的知识,希望对你有一定的参考价值。

参考技术A

本文主要对SparkSubmit的任务提交流程源码进行分析。 Spark源码版本为2.3.1。

首先阅读一下启动脚本,看看首先加载的是哪个类,我们看一下 spark-submit 启动脚本中的具体内容。

可以看到这里加载的类是org.apache.spark.deploy.SparkSubmit,并且把启动相关的参数也带过去了。下面我们跟一下源码看看整个流程是如何运作的...

SparkSubmit的main方法如下

这里我们由于我们是提交作业,所有会走上面的submit(appArgs, uninitLog)方法

可以看到submit方法首先会准备任务提交的环境,调用了prepareSubmitEnvironment,该方法会返回四元组,该方法中会调用doPrepareSubmitEnvironment,这里我们重点注意 childMainClass类具体是什么 ,因为这里涉及到后面启动我们主类的过程。

以下是doPrepareSubmitEnvironment方法的源码...

可以看到该方法首先是解析相关的参数,如jar包,mainClass的全限定名,系统配置,校验一些参数,等等,之后的关键点就是根据我们 deploy-mode 参数来判断是如何运行我们的mainClass,这里主要是通过childMainClass这个参数来决定下一步首先启动哪个类。

childMainClass根据部署模型有不同的值:

之后该方法会把准备好的四元组返回,我们接着看之前的submit方法

可以看到这里最终会调用doRunMain()方法去进行下一步。

doRunMain的实现如下...

doRunMain方法中会判断是否需要一个代理用户,然后无论需不需要都会执行runMain方法,我们接下来看看runMain方法是如何实现的。

这里我们只假设以集群模式启动,首先会加载类,将我们的childMainClass加载为字节码对象mainClass ,然后将mainClass 映射成SparkApplication对象,因为我们以集群模式启动,那么上一步返回四元组中的childMainClass的参数为ClientApp的全限定名,而这里会调用app实例的start方法因此,这里最终调用的是ClientApp的start方法。

ClientApp的start方法如下...

可以看到这里和之前我们的master启动流程有些相似。
可以参考我上一篇文章 Spark源码分析之Master的启动流程 对这一流程加深理解。

首先是准备rpcEnv环境,之后通过master的地址获取masterEndpoints端点相关信息,因为这里运行start方法时会将之前配置的相关参数都传进来,之后就会通过rpcEnv注册相关clientEndPoint端点信息,同时需要注意,这里会把masterEndpoints端点信息也作为构造ClientEndpoint端点的参数,也就是说这个ClientEndpoint会和masterEndpoints通信。

而在我上一篇文章中说过,只要是setupEndpoint方法被调用,一定会调用相关端点的的onStart方法,而这会调用clientEndPoint的onStart方法。

ClientEndPoint类中的onStart方法会匹配launch事件。源码如下

onStart中匹配我们的launch的过程,这个过程是启动driverWrapper的过程,可以看到上面源码中封装了mainClass ,该参数对应DriverWrapper类的全限定名,之后将mainClass封装到command中,然后封装到driverDescription中,向Master申请启动Driver。

这个过程会向Mster发送消息,是通过rpcEnv来实现发射消息的,而这里就涉及到outbox信箱,会调用postToOutbox方法,向outbox信箱中添加消息,然后通过TransportClient的send或sendRpc方法发送消息。发件箱以及发送过程是在同一个线程中进行。

而细心的同学会注意到这里调用的方法名为SendToMasterAndForwardReply,见名之意,发送消息到master并且期待回应。

下面是rpcEnv来实现向远端发送消息的一个调用流程,最终会通过netty中的TransportClient来写出。

之后,Master端会触发receiveAndReply函数,匹配RequestSubmitDriver样例类,完成模式匹配执行后续流程。

可以看到这里首先将Driver信息封装成DriverInfo,然后添加待调度列表waitingDrivers中,然后调用通用的schedule函数。

由于waitingDrivers不为空,则会走LaunchDriver的流程,当前的application申请资源,这时会向worker发送消息,触发Worker的receive方法。

Worker的receive方法中,当Worker遇到LaunchDriver指令时,创建并启动一个DriverRunner,DriverRunner启动一个线程,异步的处理Driver启动工作。这里说启动的Driver就是刚才说的org.apache.spark.deploy.worker.DriverWrapper

可以看到上面在DriverRunner中是开辟线程异步的处理Driver启动工作,不会阻塞主进程的执行,而prepareAndRunDriver方法中最终调用 runDriver..

runDriver中主要先做了一些初始化工作,接着就开始启动driver了。

上述Driver启动工作主要分为以下几步:

下面我们直接看DriverWrapper的实现

DriverWrapper,会创建了一个RpcEndpoint与RpcEnv,RpcEndpoint为WorkerWatcher,主要目的为监控Worker节点是否正常,如果出现异常就直接退出,然后当前的ClassLoader加载userJar,同时执行userMainClass,在执行用户的main方法后关闭workerWatcher。

以上就是SparkSubmit的流程,下一篇我会对SparkContext的源码进行解析。

欢迎关注...

第三篇:Spark SQL Catalyst源码分析之Analyzer

/** Spark SQL源码分析系列文章*/

    前面几篇文章讲解了Spark SQL的核心执行流程和Spark SQL的Catalyst框架的Sql Parser是怎样接受用户输入sql,经过解析生成Unresolved Logical Plan的。我们记得Spark SQL的执行流程中另一个核心的组件式Analyzer,本文将会介绍Analyzer在Spark SQL里起到了什么作用。

    Analyzer位于Catalyst的analysis package下,主要职责是将Sql Parser 未能Resolved的Logical Plan 给Resolved掉。

    技术分享

一、Analyzer构造

    Analyzer会使用Catalog和FunctionRegistry将UnresolvedAttribute和UnresolvedRelation转换为catalyst里全类型的对象。

    Analyzer里面有fixedPoint对象,一个Seq[Batch].

 

 

[java] view plain copy
 
  1. class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)  
  2.   extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {  
  3.   
  4.   // TODO: pass this in as a parameter.  
  5.   val fixedPoint = FixedPoint(100)  
  6.   
  7.   val batches: Seq[Batch] = Seq(  
  8.     Batch("MultiInstanceRelations", Once,  
  9.       NewRelationInstances),  
  10.     Batch("CaseInsensitiveAttributeReferences", Once,  
  11.       (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),  
  12.     Batch("Resolution", fixedPoint,  
  13.       ResolveReferences ::  
  14.       ResolveRelations ::  
  15.       NewRelationInstances ::  
  16.       ImplicitGenerate ::  
  17.       StarExpansion ::  
  18.       ResolveFunctions ::  
  19.       GlobalAggregates ::  
  20.       typeCoercionRules :_*),  
  21.     Batch("AnalysisOperators", fixedPoint,  
  22.       EliminateAnalysisOperators)  
  23.   )  

    Analyzer里的一些对象解释:

 

    FixedPoint:相当于迭代次数的上限。

 

[java] view plain copy
 
  1. /** A strategy that runs until fix point or maxIterations times, whichever comes first. */  
  2. case class FixedPoint(maxIterations: Int) extends Strategy  

 

    Batch: 批次,这个对象是由一系列Rule组成的,采用一个策略(策略其实是迭代几次的别名吧,eg:Once)

 

[java] view plain copy
 
  1. /** A batch of rules. */,  
  2. protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)  

   Rule:理解为一种规则,这种规则会应用到Logical Plan 从而将UnResolved 转变为Resolved

 

 

[java] view plain copy
 
  1. abstract class Rule[TreeType <: TreeNode[_]] extends Logging {  
  2.   
  3.   /** Name for this rule, automatically inferred based on class name. */  
  4.   val ruleName: String = {  
  5.     val className = getClass.getName  
  6.     if (className endsWith "$") className.dropRight(1) else className  
  7.   }  
  8.   
  9.   def apply(plan: TreeType): TreeType  
  10. }  


   Strategy:最大的执行次数,如果执行次数在最大迭代次数之前就达到了fix point,策略就会停止,不再应用了。

 

 

[java] view plain copy
 
  1. /** 
  2.  * An execution strategy for rules that indicates the maximum number of executions. If the 
  3.  * execution reaches fix point (i.e. converge) before maxIterations, it will stop. 
  4.  */  
  5. abstract class Strategy { def maxIterations: Int }  


   Analyzer解析主要是根据这些Batch里面定义的策略和Rule来对Unresolved的逻辑计划进行解析的。

 

   这里Analyzer类本身并没有定义执行的方法,而是要从它的父类RuleExecutor[LogicalPlan]寻找,Analyzer也实现了HiveTypeCosercion,这个类是参考Hive的类型自动兼容转换的原理。如图:

    技术分享

    RuleExecutor:执行Rule的执行环境,它会将包含了一系列的Rule的Batch进行执行,这个过程都是串行的。

    具体的执行方法定义在apply里:

    可以看到这里是一个while循环,每个batch下的rules都对当前的plan进行作用,这个过程是迭代的,直到达到Fix Point或者最大迭代次数。

 

[java] view plain copy
 
  1. def apply(plan: TreeType): TreeType = {  
  2.    var curPlan = plan  
  3.   
  4.    batches.foreach { batch =>  
  5.      val batchStartPlan = curPlan  
  6.      var iteration = 1  
  7.      var lastPlan = curPlan  
  8.      var continue = true  
  9.   
  10.      // Run until fix point (or the max number of iterations as specified in the strategy.  
  11.      while (continue) {  
  12.        curPlan = batch.rules.foldLeft(curPlan) {  
  13.          case (plan, rule) =>  
  14.            val result = rule(plan) //这里将调用各个不同Rule的apply方法,将UnResolved Relations,Attrubute和Function进行Resolve  
  15.            if (!result.fastEquals(plan)) {  
  16.              logger.trace(  
  17.                s"""  
  18.                  |=== Applying Rule ${rule.ruleName} ===  
  19.                  |${sideBySide(plan.treeString, result.treeString).mkString("\n")}  
  20.                """.stripMargin)  
  21.            }  
  22.   
  23.            result //返回作用后的result plan  
  24.        }  
  25.        iteration += 1  
  26.        if (iteration > batch.strategy.maxIterations) { //如果迭代次数已经大于该策略的最大迭代次数,就停止循环  
  27.          logger.info(s"Max iterations ($iteration) reached for batch ${batch.name}")  
  28.          continue = false  
  29.        }  
  30.   
  31.        if (curPlan.fastEquals(lastPlan)) { //如果在多次迭代中不再变化,因为plan有个unique id,就停止循环。  
  32.          logger.trace(s"Fixed point reached for batch ${batch.name} after $iteration iterations.")  
  33.          continue = false  
  34.        }  
  35.        lastPlan = curPlan  
  36.      }  
  37.   
  38.      if (!batchStartPlan.fastEquals(curPlan)) {  
  39.        logger.debug(  
  40.          s"""  
  41.          |=== Result of Batch ${batch.name} ===  
  42.          |${sideBySide(plan.treeString, curPlan.treeString).mkString("\n")}  
  43.        """.stripMargin)  
  44.      } else {  
  45.        logger.trace(s"Batch ${batch.name} has no effect.")  
  46.      }  
  47.    }  
  48.   
  49.    curPlan //返回Resolved的Logical Plan  
  50.  }  

二、Rules介绍

    目前Spark SQL 1.0.0的Rule都定义在了Analyzer.scala的内部类。
    在batches里面定义了4个Batch。
    MultiInstanceRelations、CaseInsensitiveAttributeReferences、Resolution、AnalysisOperators 四个。
    这4个Batch是将不同的Rule进行归类,每种类别采用不同的策略来进行Resolve。
    技术分享

2.1、MultiInstanceRelation 

如果一个实例在Logical Plan里出现了多次,则会应用NewRelationInstances这儿Rule
[java] view plain copy
 
  1. Batch("MultiInstanceRelations", Once,  
  2.      NewRelationInstances)  
[java] view plain copy
 
  1. trait MultiInstanceRelation {  
  2.   def newInstance: this.type  
  3. }  
[java] view plain copy
 
  1. object NewRelationInstances extends Rule[LogicalPlan] {   
  2.   def apply(plan: LogicalPlan): LogicalPlan = {  
  3.     val localRelations = plan collect { case l: MultiInstanceRelation => l} //将logical plan应用partial function得到所有MultiInstanceRelation的plan的集合   
  4.     val multiAppearance = localRelations  
  5.       .groupBy(identity[MultiInstanceRelation]) //group by操作  
  6.       .filter { case (_, ls) => ls.size > 1 } //如果只取size大于1的进行后续操作  
  7.       .map(_._1)  
  8.       .toSet  
  9.   
  10.     //更新plan,使得每个实例的expId是唯一的。  
  11.     plan transform {  
  12.       case l: MultiInstanceRelation if multiAppearance contains l => l.newInstance  
  13.     }  
  14.   }  
  15. }  

2.2、LowercaseAttributeReferences

同样是partital function,对当前plan应用,将所有匹配的如UnresolvedRelation的别名alise转换为小写,将Subquery的别名也转换为小写。
总结:这是一个使属性名大小写不敏感的Rule,因为它将所有属性都to lower case了。
[java] view plain copy
 
  1. object LowercaseAttributeReferences extends Rule[LogicalPlan] {  
  2.   def apply(plan: LogicalPlan): LogicalPlan = plan transform {  
  3.     case UnresolvedRelation(databaseName, name, alias) =>  
  4.       UnresolvedRelation(databaseName, name, alias.map(_.toLowerCase))  
  5.     case Subquery(alias, child) => Subquery(alias.toLowerCase, child)  
  6.     case q: LogicalPlan => q transformExpressions {  
  7.       case s: Star => s.copy(table = s.table.map(_.toLowerCase))  
  8.       case UnresolvedAttribute(name) => UnresolvedAttribute(name.toLowerCase)  
  9.       case Alias(c, name) => Alias(c, name.toLowerCase)()  
  10.       case GetField(c, name) => GetField(c, name.toLowerCase)  
  11.     }  
  12.   }  
  13. }  

2.3、ResolveReferences

将Sql parser解析出来的UnresolvedAttribute全部都转为对应的实际的catalyst.expressions.AttributeReference AttributeReferences
这里调用了logical plan 的resolve方法,将属性转为NamedExepression。
[java] view plain copy
 
  1. object ResolveReferences extends Rule[LogicalPlan] {  
  2.   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {  
  3.     case q: LogicalPlan if q.childrenResolved =>  
  4.       logger.trace(s"Attempting to resolve ${q.simpleString}")  
  5.       q transformExpressions {  
  6.         case u @ UnresolvedAttribute(name) =>  
  7.           // Leave unchanged if resolution fails.  Hopefully will be resolved next round.  
  8.           val result = q.resolve(name).getOrElse(u)//转化为NamedExpression  
  9.           logger.debug(s"Resolving $u to $result")  
  10.           result  
  11.       }  
  12.   }  
  13. }  

2.4、 ResolveRelations

这个比较好理解,还记得前面Sql parser吗,比如select * from src,这个src表parse后就是一个UnresolvedRelation节点。
这一步ResolveRelations调用了catalog这个对象。Catalog对象里面维护了一个tableName,Logical Plan的HashMap结果。
通过这个Catalog目录来寻找当前表的结构,从而从中解析出这个表的字段,如UnResolvedRelations 会得到一个tableWithQualifiers。(即表和字段) 
这也解释了为什么流程图那,我会画一个catalog在上面,因为它是Analyzer工作时需要的meta data。
[java] view plain copy
 
  1. object ResolveRelations extends Rule[LogicalPlan] {  
  2.     def apply(plan: LogicalPlan): LogicalPlan = plan transform {  
  3.       case UnresolvedRelation(databaseName, name, alias) =>  
  4.         catalog.lookupRelation(databaseName, name, alias)  
  5.     }  
  6.   }  

2.5、ImplicitGenerate

如果在select语句里只有一个表达式,而且这个表达式是一个Generator(Generator是一个1条记录生成到N条记录的映射)
当在解析逻辑计划时,遇到Project节点的时候,就可以将它转换为Generate类(Generate类是将输入流应用一个函数,从而生成一个新的流)。
[java] view plain copy
 
  1. object ImplicitGenerate extends Rule[LogicalPlan] {  
  2.   def apply(plan: LogicalPlan): LogicalPlan = plan transform {  
  3.     case Project(Seq(Alias(g: Generator, _)), child) =>  
  4.       Generate(g, join = false, outer = false, None, child)  
  5.   }  
  6. }  


2.6 StarExpansion

在Project操作符里,如果是*符号,即select * 语句,可以将所有的references都展开,即将select * 中的*展开成实际的字段。
[java] view plain copy
 
  1.   object StarExpansion extends Rule[LogicalPlan] {  
  2.     def apply(plan: LogicalPlan): LogicalPlan = plan transform {  
  3.       // Wait until children are resolved  
  4.       case p: LogicalPlan if !p.childrenResolved => p  
  5.       // If the projection list contains Stars, expand it.  
  6.       case p @ Project(projectList, child) if containsStar(projectList) =>   
  7.         Project(  
  8.           projectList.flatMap {  
  9.             case s: Star => s.expand(child.output) //展开,将输入的Attributeexpand(input: Seq[Attribute]) 转化为Seq[NamedExpression]  
  10.             case o => o :: Nil  
  11.           },  
  12.           child)  
  13.       case t: ScriptTransformation if containsStar(t.input) =>  
  14.         t.copy(  
  15.           input = t.input.flatMap {  
  16.             case s: Star => s.expand(t.child.output)  
  17.             case o => o :: Nil  
  18.           }  
  19.         )  
  20.       // If the aggregate function argument contains Stars, expand it.  
  21.       case a: Aggregate if containsStar(a.aggregateExpressions) =>  
  22.         a.copy(  
  23.           aggregateExpressions = a.aggregateExpressions.flatMap {  
  24.             case s: Star => s.expand(a.child.output)  
  25.             case o => o :: Nil  
  26.           }  
  27.         )  
  28.     }  
  29.     /** 
  30.      * Returns true if `exprs` contains a [[Star]]. 
  31.      */  
  32.     protected def containsStar(exprs: Seq[Expression]): Boolean =  
  33.       exprs.collect { case _: Star => true }.nonEmpty  
  34.   }  
  35. }  

2.7 ResolveFunctions

这个和ResolveReferences差不多,这里主要是对udf进行resolve。
将这些UDF都在FunctionRegistry里进行查找。
[java] view plain copy
 
  1. object ResolveFunctions extends Rule[LogicalPlan] {  
  2.   def apply(plan: LogicalPlan): LogicalPlan = plan transform {  
  3.     case q: LogicalPlan =>  
  4.       q transformExpressions {  
  5.         case u @ UnresolvedFunction(name, children) if u.childrenResolved =>  
  6.           registry.lookupFunction(name, children) //看是否注册了当前udf  
  7.       }  
  8.   }  
  9. }  

2.8 GlobalAggregates

全局的聚合,如果遇到了Project就返回一个Aggregate.
[java] view plain copy
 
  1. object GlobalAggregates extends Rule[LogicalPlan] {  
  2.   def apply(plan: LogicalPlan): LogicalPlan = plan transform {  
  3.     case Project(projectList, child) if containsAggregates(projectList) =>  
  4.       Aggregate(Nil, projectList, child)  
  5.   }  
  6.   
  7.   def containsAggregates(exprs: Seq[Expression]): Boolean = {  
  8.     exprs.foreach(_.foreach {  
  9.       case agg: AggregateExpression => return true  
  10.       case _ =>  
  11.     })  
  12.     false  
  13.   }  
  14. }  

2.9 typeCoercionRules

这个是Hive里的兼容SQL语法,比如将String和Int互相转换,不需要显示的调用cast xxx  as yyy了。如StringToIntegerCasts。
[java] view plain copy
 
  1. val typeCoercionRules =  
  2.   PropagateTypes ::  
  3.   ConvertNaNs ::  
  4.   WidenTypes ::  
  5.   PromoteStrings ::  
  6.   BooleanComparisons ::  
  7.   BooleanCasts ::  
  8.   StringToIntegralCasts ::  
  9.   FunctionArgumentConversion ::  
  10.   CastNulls ::  
  11.   Nil  

2.10 EliminateAnalysisOperators

将分析的操作符移除,这里仅支持2种,一种是Subquery需要移除,一种是LowerCaseSchema。这些节点都会从Logical Plan里移除。
 
[java] view plain copy
 
  1. object EliminateAnalysisOperators extends Rule[LogicalPlan] {  
  2.   def apply(plan: LogicalPlan): LogicalPlan = plan transform {  
  3.     case Subquery(_, child) => child //遇到Subquery,不反悔本身,返回它的Child,即删除了该元素  
  4.     case LowerCaseSchema(child) => child  
  5.   }  
  6. }  

三、实践

  补充昨天DEBUG的一个例子,这个例子证实了如何将上面的规则应用到Unresolved Logical Plan:
  当传递sql语句的时候,的确调用了ResolveReferences将mobile解析成NamedExpression。
  可以对照这看执行流程,左边是Unresolved Logical Plan,右边是Resoveld Logical Plan。
  先是执行了Batch Resolution,eg: 调用ResovelRalation这个RUle来使 Unresovled Relation 转化为 SparkLogicalPlan并通过Catalog找到了其对于的字段属性。
  然后执行了Batch Analysis Operator。eg:调用EliminateAnalysisOperators来将SubQuery给remove掉了。
  可能格式显示的不太好,可以向右边拖动下滚动轴看下结果。 :) 
  
[java] view plain copy
 
  1. val exec = sqlContext.sql("select mobile as mb, sid as id, mobile*2 multi2mobile, count(1) times from (select * from temp_shengli_mobile)a where pfrom_id=0.0 group by mobile, sid,  mobile*2")  
  2. 14/07/21 18:23:32 DEBUG SparkILoop$SparkILoopInterpreter: Invoking: public static java.lang.String $line47.$eval.$print()  
  3. 14/07/21 18:23:33 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations  
  4. 14/07/21 18:23:33 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences  
  5. 14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving ‘pfrom_id to pfrom_id#5  
  6. 14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving ‘mobile to mobile#2  
  7. 14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving ‘sid to sid#1  
  8. 14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving ‘mobile to mobile#2  
  9. 14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving ‘mobile to mobile#2  
  10. 14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving ‘sid to sid#1  
  11. 14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving ‘mobile to mobile#2  
  12. 14/07/21 18:23:33 DEBUG Analyzer:   
  13. === Result of Batch Resolution ===  
  14. !Aggregate [‘mobile,‘sid,(‘mobile * 2) AS c2#27], [‘mobile AS mb#23,‘sid AS id#24,(‘mobile * 2) AS multi2mobile#25,COUNT(1) AS times#26L]   Aggregate [mobile#2,sid#1,(CAST(mobile#2, DoubleType) * CAST(2, DoubleType)) AS c2#27], [mobile#2 AS mb#23,sid#1 AS id#24,(CAST(mobile#2, DoubleType) * CAST(2, DoubleType)) AS multi2mobile#25,COUNT(1) AS times#26L]  
  15. ! Filter (‘pfrom_id = 0.0)                                                                                                                   Filter (CAST(pfrom_id#5, DoubleType) = 0.0)  
  16.    Subquery a                                                                                                                                 Subquery a  
  17. !   Project [*]                                                                                                                                Project [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12]  
  18. !    UnresolvedRelation None, temp_shengli_mobile, None                                                                                         Subquery temp_shengli_mobile  
  19. !                                                                                                                                                SparkLogicalPlan (ExistingRdd [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174)  
  20.           
  21. 14/07/21 18:23:33 DEBUG Analyzer:   
  22. === Result of Batch AnalysisOperators ===  
  23. !Aggregate [‘mobile,‘sid,(‘mobile * 2) AS c2#27], [‘mobile AS mb#23,‘sid AS id#24,(‘mobile * 2) AS multi2mobile#25,COUNT(1) AS times#26L]   Aggregate [mobile#2,sid#1,(CAST(mobile#2, DoubleType) * CAST(2, DoubleType)) AS c2#27], [mobile#2 AS mb#23,sid#1 AS id#24,(CAST(mobile#2, DoubleType) * CAST(2, DoubleType)) AS multi2mobile#25,COUNT(1) AS times#26L]  
  24. ! Filter (‘pfrom_id = 0.0)                                                                                                                   Filter (CAST(pfrom_id#5, DoubleType) = 0.0)  
  25. !  Subquery a                                                                                                                                 Project [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12]  
  26. !   Project [*]                                                                                                                                SparkLogicalPlan (ExistingRdd [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174)  
  27. !    UnresolvedRelation None, temp_shengli_mobile, None                                                                                       
  28.           

四、总结

    本文从源代码角度分析了Analyzer在对Sql Parser解析出的UnResolve Logical Plan 进行analyze的过程中,所执行的流程。
    流程是实例化一个SimpleAnalyzer,定义一些Batch,然后遍历这些Batch在RuleExecutor的环境下,执行Batch里面的Rules,每个Rule会对Unresolved Logical Plan进行Resolve,有些可能不能一次解析出,需要多次迭代,直到达到max迭代次数或者达到fix point。这里Rule里比较常用的就是ResolveReferences、ResolveRelations、StarExpansion、GlobalAggregates、typeCoercionRules和EliminateAnalysisOperators。
 
 
——EOF——

原创文章,转载请注明:

转载自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文链接地址:http://blog.csdn.net/oopsoom/article/details/38025185

注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

技术分享

转自:http://blog.csdn.net/oopsoom/article/details/38025185





以上是关于Spark源码分析之SparkSubmit的流程的主要内容,如果未能解决你的问题,请参考以下文章

Spark学习之路 (十六)SparkCore的源码解读spark-submit提交脚本

第二篇:Spark SQL Catalyst源码分析之SqlParser

Spark源码分析之Sort-Based Shuffle读写流程

第三篇:Spark SQL Catalyst源码分析之Analyzer

Spark源码分析之四:Stage提交

02 Spark架构与运行流程