spark 启动job的流程分析

Posted Brenda

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark 启动job的流程分析相关的知识,希望对你有一定的参考价值。

WordCount開始分析

编写一个样例程序

编写一个从HDFS中读取并计算wordcount的样例程序:

packageorg.apache.spark.examples


importorg.apache.spark.SparkContext

importorg.apache.spark.SparkContext._


objectWordCount{

defmain(args : Array[String]) {

valsc = newSparkContext(args(0),"wordcount by hdfs",

System.getenv("SPARK_HOME"),SparkContext.jarOfClass(this.getClass()))

//hadoophdfs的根路径下得到一个文件

valfile =sc.textFile("/hadoop-test.txt")

valcounts =file.flatMap(line=> line.split(" "))

.map(word => (word,1)).reduceByKey(_+ _)

counts.saveAsTextFile("/newtest.txt")

}


}


生成SparkContext实例

在上面样例中。要运行map/reduce操作。首先须要一个SparkContext。因此看看SparkContext的实例生成

defthis(

master: String,

appName: String,

sparkHome: String = null,

jars: Seq[String] = Nil,

environment: Map[String, String]= Map(),

preferredNodeLocationData:Map[String, Set[SplitInfo]] = Map()) =

{

this(SparkContext.updatedConf(newSparkConf(), master, appName, sparkHome, jars, environment),

preferredNodeLocationData)

}


编写WordCount样例时使用了上面列出的构造函数,后面两个environmentpreferredNodeLocationData传入为默认值。

调用updatedConf的单例函数,生成或更新当前的SparkConf实例。

调用SparkContext的默认构造函数。

1.生成并启动监控的Jettyui,SparkUI.

2.生成TaskScheduler实例,并启动。

此函数会依据不同的mastername生成不同的TaskScheduler实例。,yarn-clusterYarnClusterScheduler

主要用来启动/停止task,监控task的执行状态。

private[spark]vartaskScheduler= SparkContext.createTaskScheduler(this,master,appName)

taskScheduler.start()

3.生成DAGScheduler实例,并启动。


@volatileprivate[spark]vardagScheduler= newDAGScheduler(taskScheduler)

dagScheduler.start()


scheduler进行start操作后,通过调用postStartHook来把SparkContext加入到appmaster中。

生成WorkerRunnable线程。通过nmclient启动worker相应的container。此container线程CoarseGrainedExecutorBackend的实例,此实例通过Executor实例来载入相关的task

SparkContext.textFile生成RDD

此方法用来生成RDD的实例,通常读取文本文件的方式通过textFile来进行,并其调用hadoopFile来运行。

通过hadoopFile得到一个HadoopRDD<K,V>的实例后,通过.map得到V的值。并生成RDD返回。

deftextFile(path: String, minSplits: Int = defaultMinSplits):RDD[String] = {

hadoopFile(path,classOf[TextInputFormat], classOf[LongWritable], classOf[Text],

minSplits).map(pair=> pair._2.toString)

}

终于通过hadoopFile函数生成一个HadoopRDD实例。

defhadoopFile[K, V](

path: String,

inputFormatClass: Class[_ <:InputFormat[K, V]],

keyClass: Class[K],

valueClass: Class[V],

minSplits: Int = defaultMinSplits

): RDD[(K, V)] = {

//A Hadoopconfiguration can be about 10 KB, which is pretty big, so broadcastit.

valconfBroadcast= broadcast(newSerializableWritable(hadoopConfiguration))

valsetInputPathsFunc= (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf,path)

newHadoopRDD(

this,

confBroadcast,

Some(setInputPathsFunc),

inputFormatClass,

keyClass,

valueClass,

minSplits)

}


RDD函数的抽象运行

reduceByKey须要执行shufflereduce。也就是须要多个map中的数据集合到同样的reduce中执行,生成相关的DAG任务

valfile =sc.textFile("/hadoop-test.txt")

valcounts =file.flatMap(line=> line.split(" "))

.map(word => (word,1)).reduceByKey(_+ _)

counts.saveAsTextFile("/newtest.txt")


在以上代码中,textFile,flatMap,map,reduceByKeysparkRDDtransformation,

saveAsTextFile才是RDD中进行运行操作的action.

下面引用http://my-oschina-net/hanzhankang/blog/200275的相关说明:

详细可參见:http://spark.apache.org/docs/0.9.0/scala-programming-guide.html

1transformation是得到一个新的RDD,方式非常多。比方从数据源生成一个新的RDD,从RDD生成一个新的RDD

2action是得到一个值,或者一个结果(直接将RDDcache到内存中)

全部的transformation都是採用的懒策略。就是假设仅仅是将transformation提交是不会运行计算的。计算仅仅有在action被提交的时候才被触发。


transformation操作:

map(func):对调用mapRDD数据集中的每一个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集

filter(func): 对调用filterRDD数据集中的每一个元素都使用func,然后返回一个包括使functrue的元素构成的RDD

flatMap(func):map差点儿相同。可是flatMap生成的是多个结果

mapPartitions(func):map非常像,可是map是每一个element。而mapPartitions是每一个partition

mapPartitionsWithSplit(func):mapPartitions非常像,可是func作用的是当中一个split上,所以func中应该有index

sample(withReplacement,faction,seed):抽样

union(otherDataset):返回一个新的dataset,包括源dataset和给定dataset的元素的集合

distinct([numTasks]):返回一个新的dataset,这个dataset含有的是源dataset中的distinctelement

groupByKey(numTasks):返回(K,Seq[V])。也就是hadoopreduce函数接受的key-valuelist

reduceByKey(func,[numTasks]):就是用一个给定的reducefunc再作用在groupByKey产生的(K,Seq[V]),比方求和。求平均数

sortByKey([ascending],[numTasks]):依照key来进行排序,是升序还是降序。ascendingboolean类型

join(otherDataset,[numTasks]):当有两个KVdataset(K,V)(K,W)。返回的是(K,(V,W))dataset,numTasks为并发的任务数

cogroup(otherDataset,[numTasks]):当有两个KVdataset(K,V)(K,W),返回的是(K,Seq[V],Seq[W])dataset,numTasks为并发的任务数

cartesian(otherDataset):笛卡尔积就是m*n,大家懂的


action操作:

reduce(func):说白了就是聚集,可是传入的函数是两个參数输入返回一个值。这个函数必须是满足交换律和结合律的

collect():一般在filter或者足够小的结果的时候。再用collect封装返回一个数组

count():返回的是dataset中的element的个数

first():返回的是dataset中的第一个元素

take(n):返回前nelements。这个士driverprogram返回的

takeSample(withReplacementnumseed):抽样返回一个dataset中的num个元素,随机种子seed

saveAsTextFilepath):把dataset写到一个textfile中,或者hdfs。或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录。然后写到file

saveAsSequenceFile(path):仅仅能用在key-value对上。然后生成SequenceFile写到本地或者hadoop文件系统

countByKey():返回的是key相应的个数的一个map,作用于一个RDD

foreach(func):dataset中的每一个元素都使用func


RDDaction中提交Job

在运行RDDsaveAsTextFile时调用SparkContext.runJob方法

saveAsTextFile方法,-->saveAsHadoopFile,终于调用SparkContext.runJob方法

defsaveAsTextFile(path: String) {

this.map(x=> (NullWritable.get(), newText(x.toString)))

.saveAsHadoopFile[TextOutputFormat[NullWritable,Text]](path)

}

......下面一行代码就是在saveASTextFile函数嵌套调用中终于调用的函数,调用SparkContext.runJob

self.context.runJob(self,writeToFile _)

SparkContext.runJob的定义:

defrunJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T])=> U): Array[U] = {

runJob(rdd, func, 0until rdd.partitions.size,false)

}

SparkContext的终于运行runJob函数定义

defrunJob[T, U: ClassTag](

rdd: RDD[T],//此处是详细的RDD实例值

func: (TaskContext, Iterator[T])=> U,//详细的运行的action的逻辑,reduceByKey

partitions:Seq[Int],//分区数组,一个数值从0partitions.size-1

allowLocal: Boolean,//能否够在本地运行

//result的处理逻辑,每个Task的处理

resultHandler: (Int, U) =>Unit) {

val以上是关于spark 启动job的流程分析的主要内容,如果未能解决你的问题,请参考以下文章

源码分析XXL-JOB的执行器的注册流程

Spark源码分析之SparkSubmit的流程

Spark 启动 | 从启动脚本分析 Master 的启动流程

Spark启动流程(Standalone)-分析

sona:Spark on Angel任务启动流程分析

sona:Spark on Angel任务启动流程分析