小记--------spark-job触发流程源码分析

Posted yzqyxq

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了小记--------spark-job触发流程源码分析相关的知识,希望对你有一定的参考价值。

 job是串行执行的, 执行完上一个才执行下一个
 
eg:Wordcount案例
val lines = sc.textFile("本地URL or HDFS URL")//详解见代码1
val words = lines.flatMap(line => line.split(" "))//也会返回一个MapPartitionsRDD
val pairs = words.map(word => (word , 1))//同样也是返回一个MapPartitionsRDD
val counts = pairs.reduceByKey(_+_)//详解见代码2
counts.foreach(count => printLn(count._1 + ":" + count._2))//见代码4

 

源码位置:
SparkContext类:spark-core_2.11-2.1.0-sources.jar > org.apache.spark.SparkContext.scala
代码1
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
/**
*首先,hadoopFile()方法的调用,会创建一个HadoopRDD,其中的元素,其实是(key,value) pair RDD . key 是hdfs或文本文件的每一行的offset, value 就是文本行
*然后对HadoopRDD调用map()方法,会剔除key,只保留value,然后会获得一个MapPartitionsRDD,MapPartitionsRDD内部的元素,其实就是一行一行的文本行
*/
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
 
//因为RDD.scala类中是没有ReduceByKey方法的,因此它会调用ReduceByKey方法时,会触发scala的隐式转换;此时就会在作用域内,寻找隐式转换,会在RDD中找到rddToPairRDDFunctions()隐式转换,然后再去PairRDDFunctions类里面调用ReduceByKey方法
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
  (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
  new PairRDDFunctions(rdd)//代码详见代码3
}
 
代码3
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
 
 
代码4
//通过foreach方法进行runjob的多次重载到本RunJob方法
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD‘s recursive dependencies:
" + rdd.toDebugString)
  }
// 调用SparkContext,之前初始化时创建的DAGScheduler的Runjob方法
// 会把当前执行action操作的RDD传到DAGScheduler的runjob方法中
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

 

 
 
 
 

以上是关于小记--------spark-job触发流程源码分析的主要内容,如果未能解决你的问题,请参考以下文章

如何在 spark-2.1.1-bin-hadoop2.7 的 bin 文件夹外运行 spark-jobs

设计- Kafka Producer 可以写成 Spark-job 吗?

Node程序debug小记

android源码解析(二十六)-->截屏事件流程

spark-job提交原理和资源配置

XXL-JOB分布式任务调度框架-源码分析-任务调度执行流程及实现原理