Shandong University Software Engineering Application and Practice: Spark (13) Code Analysis
2021SC@SDUSC
1. Executing the physical plan
After analysis, optimization, and the lazy conversion of the logical plan into a physical plan, the physical plan is ultimately run by calling SparkPlan's execute method. Take execution.Project as an example; its execute method is shown in the following code.
Project and its execute method:
@DeveloperApi
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
  override def output = projectList.map(_.toAttribute)

  @transient lazy val buildProjection = newMutableProjection(projectList, child.output)

  def execute() = child.execute().mapPartitions { iter =>
    val resuableProjection = buildProjection()  // one projection instance per partition
    iter.map(resuableProjection)
  }
}
Project's execute method proceeds in two steps:
1. Call child's execute method, ensuring that the input data to be projected has already been produced.
2. Call SparkPlan's newMutableProjection to perform the projection (a minimal sketch of this per-partition pattern follows this list).
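As a minimal, self-contained sketch of that per-partition pattern (the names PerPartitionDemo and makeProjection and the Array[Any] row representation are illustrative assumptions, not Spark's API):

// Analogue of newMutableProjection: a factory, so each partition/task
// builds its own projection instead of sharing one across threads.
object PerPartitionDemo extends App {
  type Row = Array[Any]

  def makeProjection(): Row => Row = {
    val buffer = new Array[Any](1)            // output row, reused for every input row
    (input: Row) => { buffer(0) = input(0); buffer }
  }

  val partitions = Seq(Iterator[Row](Array("a", 1), Array("b", 2)))
  partitions.foreach { iter =>                // like child.execute().mapPartitions { iter => ... }
    val reusable = makeProjection()           // built exactly once per partition
    iter.map(reusable).foreach(r => println(r.mkString(",")))
  }
}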
The implementation of SparkPlan's newMutableProjection method:
protected def newMutableProjection(
    expressions: Seq[Expression],
    inputSchema: Seq[Attribute]): () => MutableProjection = {
  log.debug(
    s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
  if (codegenEnabled) {
    GenerateMutableProjection(expressions, inputSchema)
  } else {
    () => new InterpretedMutableProjection(expressions, inputSchema)
  }
}
By default (when code generation is disabled), newMutableProjection uses InterpretedMutableProjection to perform the projection; its implementation is shown below. In its auxiliary constructor, BindReferences.bindReference once again uses the transform method to bind references in the expressions, for example replacing List(name#1) with List(input[1]). The final projection is carried out by InterpretedMutableProjection's apply method. The implementation of BindReferences.bindReference is listed after it.
The InterpretedMutableProjection implementation in Projection.scala:
case class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
    this(expressions.map(BindReferences.bindReference(_, inputSchema)))

  private[this] val exprArray = expressions.toArray
  private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.size)

  def currentValue: Row = mutableRow

  override def target(row: MutableRow): MutableProjection = {
    mutableRow = row  // redirect output into a caller-supplied row
    this
  }

  override def apply(input: Row): Row = {
    var i = 0
    while (i < exprArray.length) {
      mutableRow(i) = exprArray(i).eval(input)  // evaluate each expression against the input row
      i += 1
    }
    mutableRow
  }
}
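The target method above lets a caller redirect the projection's output into a row the caller already owns, instead of the projection's internal row. A toy illustration of that contract, with plain arrays standing in for MutableRow (ToyProjection and all other names here are invented for the example, not Spark classes):

object TargetDemo extends App {
  class ToyProjection(exprs: Array[Array[Any] => Any]) {
    private var out = new Array[Any](exprs.length)  // default output buffer
    def target(row: Array[Any]): this.type = { out = row; this }  // like target(row): returns this
    def apply(input: Array[Any]): Array[Any] = {
      var i = 0
      while (i < exprs.length) { out(i) = exprs(i)(input); i += 1 }
      out
    }
  }

  val proj = new ToyProjection(Array(row => row(0)))
  val myBuffer = new Array[Any](1)
  proj.target(myBuffer)(Array[Any]("Alice", 15))
  println(myBuffer(0))  // prints Alice: the result landed in the caller's buffer
}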
The implementation of BindReferences.bindReference:
object BindReferences extends Logging {
  def bindReference[A <: Expression](
      expression: A,
      input: Seq[Attribute],
      allowFailures: Boolean = false): A = {
    expression.transform { case a: AttributeReference =>
      attachTree(a, "Binding attribute") {
        val ordinal = input.indexWhere(_.exprId == a.exprId)
        if (ordinal == -1) {
          if (allowFailures) {
            a
          } else {
            sys.error(s"Couldn't find $a in ${input.mkString("[", ",", "]")}")
          }
        } else {
          BoundReference(ordinal, a.dataType, a.nullable)
        }
      }
    }.asInstanceOf[A] // Kind of a hack, but safe. TODO: Tighten return type when possible.
  }
}
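To make the binding step concrete, here is a toy expression tree with a recursive bind pass that plays the role of expression.transform replacing AttributeReference with BoundReference. Every name below (Expr, Attr, Bound, GreaterOrEqual, ...) is illustrative, not a Catalyst class:

sealed trait Expr { def eval(row: Array[Any]): Any }
case class Attr(name: String) extends Expr {           // named attribute, e.g. age#0
  def eval(row: Array[Any]) = sys.error(s"unbound attribute $name")
}
case class Bound(ordinal: Int) extends Expr {           // like BoundReference: input[ordinal]
  def eval(row: Array[Any]) = row(ordinal)
}
case class GreaterOrEqual(left: Expr, right: Expr) extends Expr {
  def eval(row: Array[Any]) =
    left.eval(row).asInstanceOf[Int] >= right.eval(row).asInstanceOf[Int]
}
case class Literal(v: Any) extends Expr { def eval(row: Array[Any]) = v }

object BindDemo extends App {
  val schema = Seq("name", "age")                       // attribute resolved to its ordinal
  def bind(e: Expr): Expr = e match {                   // stands in for expression.transform
    case Attr(n)              => Bound(schema.indexWhere(_ == n))
    case GreaterOrEqual(l, r) => GreaterOrEqual(bind(l), bind(r))
    case other                => other
  }
  val bound = bind(GreaterOrEqual(Attr("age"), Literal(13)))
  println(bound)                                        // GreaterOrEqual(Bound(1),Literal(13))
  println(bound.eval(Array[Any]("Alice", 15)))          // true
}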
Next, take execution.Filter as an example; its execute method is shown below.
Filter's execute method proceeds in two steps:
1) Call child's execute method, ensuring that the input data to be filtered has already been produced.
2) Call SparkPlan's newPredicate to perform the filtering; the implementation of newPredicate is shown after the Filter code.
Filter and its execute method:
@DeveloperApi
case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
  override def output = child.output

  @transient lazy val conditionEvaluator = newPredicate(condition, child.output)

  def execute() = child.execute().mapPartitions { iter =>
    iter.filter(conditionEvaluator)
  }
}
By default, newPredicate uses InterpretedPredicate to perform the filtering; its implementation is shown below. Here BindReferences.bindReference converts the condition ((age#0 >= 13) && (age#0 <= 19)) into ((input[0] >= 13) && (input[0] <= 19)). The final filtering is carried out by InterpretedPredicate's second apply method.
SparkPlan's newPredicate method:
protected def newPredicate(
    expression: Expression, inputSchema: Seq[Attribute]): (Row) => Boolean = {
  if (codegenEnabled) {
    GeneratePredicate(expression, inputSchema)
  } else {
    InterpretedPredicate(expression, inputSchema)
  }
}
The implementation of InterpretedPredicate:
object InterpretedPredicate {
  def apply(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) =
    apply(BindReferences.bindReference(expression, inputSchema))

  def apply(expression: Expression): (Row => Boolean) =
    (r: Row) => expression.eval(r).asInstanceOf[Boolean]
}
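A compact sketch of how these pieces behave at runtime: the bound expression's eval is wrapped into a Row => Boolean, which mapPartitions then hands to iter.filter. The function names and the Array[Any] row representation are assumptions made for the example:

object PredicateDemo extends App {
  // Toy stand-in for a bound expression: (input[0] >= 13) && (input[0] <= 19).
  // In Spark, expression.eval(r) plays this role.
  val boundExpression: Array[Any] => Any =
    row => { val age = row(0).asInstanceOf[Int]; age >= 13 && age <= 19 }

  // Analogue of InterpretedPredicate's second apply: wrap eval into Row => Boolean.
  val predicate: Array[Any] => Boolean =
    r => boundExpression(r).asInstanceOf[Boolean]

  val partition = Iterator(Array[Any](11), Array[Any](15), Array[Any](21))
  partition.filter(predicate).foreach(r => println(r(0)))  // prints only 15
}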
Both execution.Project and execution.Filter have a child, but not every SparkPlan subclass does. For example, execution.PhysicalRDD has no child, because it typically sits at the very bottom of the physical plan as a leaf node; its implementation is as follows.
case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
  override def execute() = rdd
}
Built on this SparkPlan execute mechanism, the transformations of the lower-level (child) SparkPlans are guaranteed to run before the transformation of the current SparkPlan, and the SQL statement is thereby executed to completion.
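A toy model of that bottom-up chaining, with Iterator standing in for RDD[Row] and invented node classes (Source, FilterNode, ProjectNode) in place of Spark's operators:

sealed trait Plan { def execute(): Iterator[Array[Any]] }

case class Source(rows: Seq[Array[Any]]) extends Plan {          // like PhysicalRDD: a leaf
  def execute() = rows.iterator
}
case class FilterNode(p: Array[Any] => Boolean, child: Plan) extends Plan {
  def execute() = child.execute().filter(p)                      // the child always runs first
}
case class ProjectNode(f: Array[Any] => Array[Any], child: Plan) extends Plan {
  def execute() = child.execute().map(f)
}

object PlanDemo extends App {
  val plan = ProjectNode(
    row => Array(row(0)),                                        // keep the name column only
    FilterNode(row => row(1).asInstanceOf[Int] >= 13,
      Source(Seq(Array[Any]("Alice", 15), Array[Any]("Bob", 9)))))
  plan.execute().foreach(r => println(r.mkString(",")))          // prints Alice
}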