Shandong University Software Engineering Application and Practice: Spark (13) Code Analysis

Posted by Cruel.p


2021SC@SDUSC

1. Executing the physical plan

After analysis, optimization, and conversion of the logical plan into a physical plan, execution remains lazy until SparkPlan's execute method is finally invoked. Taking execution.Project as an example, its execute method is shown below.

Project and its execute method:


@DeveloperApi
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
  override def output = projectList.map(_.toAttribute)

  @transient lazy val buildProjection = newMutableProjection(projectList, child.output)

  def execute() = child.execute().mapPartitions { iter =>
    val resuableProjection = buildProjection()
    iter.map(resuableProjection)
  }
}

Project's execute method proceeds in two steps:

1. Call the child's execute method, ensuring the input to be projected has already been processed.

2. Call SparkPlan's newMutableProjection to handle the projection. The implementation of newMutableProjection:

SparkPlan's newMutableProjection method:


protected def newMutableProjection(
    expressions: Seq[Expression],
    inputSchema: Seq[Attribute]): () => MutableProjection = {
  log.debug(
    s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen: $codegenEnabled")
  if (codegenEnabled) {
    GenerateMutableProjection(expressions, inputSchema)
  } else {
    () => new InterpretedMutableProjection(expressions, inputSchema)
  }
}
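Note that newMutableProjection returns a `() => MutableProjection` factory rather than a projection directly, so each partition can build its own mutable instance. A minimal sketch of the same pattern, with purely illustrative names (nothing below is Spark's API):

```scala
// Sketch: a factory that defers construction so each caller (one per partition)
// gets its own reusable mutable buffer, mirroring the () => MutableProjection
// return type. All names here are illustrative.
class ReusableBuffer {
  private val buf = new Array[Any](2)
  def fill(values: Seq[Any]): Array[Any] = {
    var i = 0
    while (i < values.length) { buf(i) = values(i); i += 1 }
    buf // the same underlying array is returned on every call
  }
}

def newBufferFactory(): () => ReusableBuffer = () => new ReusableBuffer

val factory = newBufferFactory()
val bufA = factory() // one instance per "partition"
val bufB = factory()
val isDistinct = !(bufA eq bufB) // partitions do not share a buffer
```

Deferring construction this way keeps the per-row object churn inside a partition at zero (the buffer is overwritten, not reallocated) while still isolating partitions from each other.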


By default, newMutableProjection uses InterpretedMutableProjection to perform the projection; its implementation is shown below. BindReferences.bindReference again uses the transform method to bind references within an expression, for example replacing List(name#1) with List(input[1]). The final projection is carried out by InterpretedMutableProjection's apply method. The implementation of BindReferences.bindReference follows further below.

The InterpretedMutableProjection implementation in Projection.scala:


case class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
    this(expressions.map(BindReferences.bindReference(_, inputSchema)))

  private[this] val exprArray = expressions.toArray
  private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.size)
  def currentValue: Row = mutableRow

  override def target(row: MutableRow): MutableProjection = {
    mutableRow = row
    this
  }

  override def apply(input: Row): Row = {
    var i = 0
    while (i < exprArray.length) {
      mutableRow(i) = exprArray(i).eval(input)
      i += 1
    }
    mutableRow
  }
}
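The core idea above, evaluating each expression and writing the result into a single reused output row, can be sketched without Spark by treating an expression as a plain `Row => Any` function (the names below are illustrative, not Spark's):

```scala
// Toy stand-ins: a Row is just a Seq[Any], an "expression" is a function
// that evaluates against a row (like Catalyst's Expression.eval).
type Row = Seq[Any]
type Expr = Row => Any

// Mimics InterpretedMutableProjection: one output buffer, overwritten per row.
class ToyMutableProjection(exprs: Seq[Expr]) extends (Row => Seq[Any]) {
  private val exprArray = exprs.toArray
  private val mutableRow = new Array[Any](exprArray.length)
  def apply(input: Row): Seq[Any] = {
    var i = 0
    while (i < exprArray.length) {
      mutableRow(i) = exprArray(i)(input)
      i += 1
    }
    mutableRow.toSeq // copied out for this example; Spark returns the mutable row itself
  }
}

// Project (name, age + 1) from rows of (name, age).
val proj = new ToyMutableProjection(Seq(r => r(0), r => r(1).asInstanceOf[Int] + 1))
val projected = Seq(Seq("alice", 13), Seq("bob", 19)).map(proj)
```

The real implementation returns the mutable row itself rather than a copy, which is why the projection instance must not be shared across partitions.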

The implementation of BindReferences.bindReference:


object BindReferences extends Logging {

  def bindReference[A <: Expression](
      expression: A,
      input: Seq[Attribute],
      allowFailures: Boolean = false): A = {
    expression.transform { case a: AttributeReference =>
      attachTree(a, "Binding attribute") {
        val ordinal = input.indexWhere(_.exprId == a.exprId)
        if (ordinal == -1) {
          if (allowFailures) {
            a
          } else {
            sys.error(s"Couldn't find $a in ${input.mkString("[", ",", "]")}")
          }
        } else {
          BoundReference(ordinal, a.dataType, a.nullable)
        }
      }
    }.asInstanceOf[A] // Kind of a hack, but safe. TODO: Tighten return type when possible.
  }
}
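The substitution bindReference performs, e.g. rewriting name#1 into input[1], amounts to replacing a named attribute with its ordinal in the input schema. A minimal sketch with toy expression types (hypothetical, not Spark's):

```scala
// Toy expression tree: named references get rewritten to positional ones,
// mirroring bindReference's use of indexWhere on exprId.
sealed trait ToyExpr
case class AttrRef(name: String) extends ToyExpr   // like AttributeReference
case class Bound(ordinal: Int) extends ToyExpr     // like BoundReference
case class GreaterEq(left: ToyExpr, lit: Int) extends ToyExpr

def bind(e: ToyExpr, schema: Seq[String]): ToyExpr = e match {
  case AttrRef(n) =>
    val ordinal = schema.indexWhere(_ == n)
    if (ordinal == -1) sys.error(s"Couldn't find $n in $schema") else Bound(ordinal)
  case GreaterEq(l, v) => GreaterEq(bind(l, schema), v)
  case other => other
}

// age is the second column, so age >= 13 becomes input[1] >= 13.
val bound = bind(GreaterEq(AttrRef("age"), 13), Seq("name", "age"))
```

After binding, evaluation never needs to search the schema again: each reference is a direct array index into the row.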

Next consider execution.Filter; its execute method is shown below and proceeds in two steps:
1) Call the child's execute method, ensuring the input to be filtered has already been processed.
2) Call SparkPlan's newPredicate to handle the filtering; its implementation is also shown below.

Filter and its execute method:


@DeveloperApi
case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
  override def output = child.output

  @transient lazy val conditionEvaluator = newPredicate(condition, child.output)

  def execute() = child.execute().mapPartitions { iter =>
    iter.filter(conditionEvaluator)
  }
}


By default, newPredicate uses InterpretedPredicate to perform the filtering; its implementation is shown below. Here BindReferences.bindReference converts ((age#0 >= 13) && (age#0 <= 19)) into ((input[0] >= 13) && (input[0] <= 19)). The final filtering is carried out by InterpretedPredicate's second apply method.

SparkPlan's newPredicate method:


protected def newPredicate(
    expression: Expression, inputSchema: Seq[Attribute]): (Row) => Boolean = {
  if (codegenEnabled) {
    GeneratePredicate(expression, inputSchema)
  } else {
    InterpretedPredicate(expression, inputSchema)
  }
}

The implementation of InterpretedPredicate:


object InterpretedPredicate {
  def apply(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) =
    apply(BindReferences.bindReference(expression, inputSchema))

  def apply(expression: Expression): (Row => Boolean) =
    (r: Row) => expression.eval(r).asInstanceOf[Boolean]
}
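Putting binding and evaluation together, the interpreted path boils down to turning an expression into a `Row => Boolean` closure. A self-contained sketch of that shape, with hypothetical names rather than Spark's types:

```scala
// Toy version of the newPredicate/InterpretedPredicate pipeline:
// bind an attribute name to an ordinal once, then close over it
// as a Row => Boolean evaluated per row.
def toyNewPredicate(
    attr: String, lo: Int, hi: Int, schema: Seq[String]): Seq[Any] => Boolean = {
  val ordinal = schema.indexWhere(_ == attr) // the "bindReference" step
  require(ordinal >= 0, s"Couldn't find $attr in $schema")
  (r: Seq[Any]) => {                         // the "apply" step, run per row
    val v = r(ordinal).asInstanceOf[Int]
    v >= lo && v <= hi
  }
}

// Filter teenagers, like the (age#0 >= 13) && (age#0 <= 19) example above.
val isTeen = toyNewPredicate("age", 13, 19, Seq("name", "age"))
val teens = Seq(Seq("alice", 12), Seq("bob", 15), Seq("carol", 19)).filter(isTeen)
```

The schema lookup happens once when the predicate is built; each row only pays for an array access and two comparisons, which is exactly why binding precedes evaluation.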

Both execution.Project and execution.Filter have a child, but not every SparkPlan subclass does. For example, execution.PhysicalRDD has no child, because it typically serves as the bottom-most node of the plan tree. Its implementation:

case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
  override def execute() = rdd
}


Built on this SparkPlan execute hierarchy, the lower-level (child) SparkPlans are guaranteed to perform their transformations first, followed by the current SparkPlan's own transformation, ultimately completing the execution of the SQL statement.
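This child-first execution order can be sketched as a tiny plan tree over iterators (toy classes, not Spark's):

```scala
// Toy SparkPlan hierarchy: execute() always invokes the child first,
// so transformations compose bottom-up, from leaf to root.
sealed trait ToyPlan { def execute(): Iterator[Int] }

case class ToyPhysicalRDD(data: Seq[Int]) extends ToyPlan { // leaf, no child
  def execute() = data.iterator
}
case class ToyFilter(pred: Int => Boolean, child: ToyPlan) extends ToyPlan {
  def execute() = child.execute().filter(pred)              // child runs first
}
case class ToyProject(f: Int => Int, child: ToyPlan) extends ToyPlan {
  def execute() = child.execute().map(f)
}

// Roughly: SELECT x * 10 FROM t WHERE x >= 2, as Project(Filter(PhysicalRDD)).
val plan = ToyProject(_ * 10, ToyFilter(_ >= 2, ToyPhysicalRDD(Seq(1, 2, 3))))
val result = plan.execute().toSeq
```

Because everything is iterator-based, the tree is also lazy end to end: no row moves until the root's iterator is consumed, matching the lazy execution described at the start of this section.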
