spark 启动job的流程分析
Posted Brenda
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark 启动job的流程分析相关的知识,希望对你有一定的参考价值。
从WordCount開始分析
编写一个样例程序
编写一个从HDFS中读取并计算wordcount的样例程序:
packageorg.apache.spark.examples
importorg.apache.spark.SparkContext
importorg.apache.spark.SparkContext._
objectWordCount{
defmain(args : Array[String]) {
valsc = newSparkContext(args(0),"wordcount by hdfs",
System.getenv("SPARK_HOME"),SparkContext.jarOfClass(this.getClass()))
//从hadoophdfs的根路径下得到一个文件
valfile =sc.textFile("/hadoop-test.txt")
valcounts =file.flatMap(line=> line.split(" "))
.map(word => (word,1)).reduceByKey(_+ _)
counts.saveAsTextFile("/newtest.txt")
}
}
生成SparkContext实例
在上面样例中。要运行map/reduce操作。首先须要一个SparkContext。因此看看SparkContext的实例生成
defthis(
master: String,
appName: String,
sparkHome: String = null,
jars: Seq[String] = Nil,
environment: Map[String, String]= Map(),
preferredNodeLocationData:Map[String, Set[SplitInfo]] = Map()) =
{
this(SparkContext.updatedConf(newSparkConf(), master, appName, sparkHome, jars, environment),
preferredNodeLocationData)
}
编写WordCount样例时使用了上面列出的构造函数,后面两个environment与preferredNodeLocationData传入为默认值。
调用updatedConf的单例函数,生成或更新当前的SparkConf实例。
调用SparkContext的默认构造函数。
1.生成并启动监控的Jettyui,SparkUI.
2.生成TaskScheduler实例,并启动。
此函数会依据不同的mastername生成不同的TaskScheduler实例。,yarn-cluster为YarnClusterScheduler。
主要用来启动/停止task,监控task的执行状态。
private[spark]vartaskScheduler= SparkContext.createTaskScheduler(this,master,appName)
taskScheduler.start()
3.生成DAGScheduler实例,并启动。
@volatileprivate[spark]vardagScheduler= newDAGScheduler(taskScheduler)
dagScheduler.start()
在scheduler进行start操作后,通过调用postStartHook来把SparkContext加入到appmaster中。
生成WorkerRunnable线程。通过nmclient启动worker相应的container。此container线程CoarseGrainedExecutorBackend的实例,此实例通过Executor实例来载入相关的task。
SparkContext.textFile生成RDD
此方法用来生成RDD的实例,通常读取文本文件的方式通过textFile来进行,并其调用hadoopFile来运行。
通过hadoopFile得到一个HadoopRDD<K,V>的实例后,通过.map得到V的值。并生成RDD返回。
deftextFile(path: String, minSplits: Int = defaultMinSplits):RDD[String] = {
hadoopFile(path,classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minSplits).map(pair=> pair._2.toString)
}
终于通过hadoopFile函数生成一个HadoopRDD实例。
defhadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <:InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
//A Hadoopconfiguration can be about 10 KB, which is pretty big, so broadcastit.
valconfBroadcast= broadcast(newSerializableWritable(hadoopConfiguration))
valsetInputPathsFunc= (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf,path)
newHadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minSplits)
}
RDD函数的抽象运行
reduceByKey须要执行shuffle的reduce。也就是须要多个map中的数据集合到同样的reduce中执行,生成相关的DAG任务
valfile =sc.textFile("/hadoop-test.txt")
valcounts =file.flatMap(line=> line.split(" "))
.map(word => (word,1)).reduceByKey(_+ _)
counts.saveAsTextFile("/newtest.txt")
在以上代码中,textFile,flatMap,map,reduceByKey都是spark中RDD的transformation,
而saveAsTextFile才是RDD中进行运行操作的action.
下面引用http://my-oschina-net/hanzhankang/blog/200275的相关说明:
详细可參见:http://spark.apache.org/docs/0.9.0/scala-programming-guide.html。
1,transformation是得到一个新的RDD,方式非常多。比方从数据源生成一个新的RDD,从RDD生成一个新的RDD
2,action是得到一个值,或者一个结果(直接将RDDcache到内存中)
全部的transformation都是採用的懒策略。就是假设仅仅是将transformation提交是不会运行计算的。计算仅仅有在action被提交的时候才被触发。
transformation操作:
map(func):对调用map的RDD数据集中的每一个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集
filter(func): 对调用filter的RDD数据集中的每一个元素都使用func,然后返回一个包括使func为true的元素构成的RDD
flatMap(func):和map差点儿相同。可是flatMap生成的是多个结果
mapPartitions(func):和map非常像,可是map是每一个element。而mapPartitions是每一个partition
mapPartitionsWithSplit(func):和mapPartitions非常像,可是func作用的是当中一个split上,所以func中应该有index
sample(withReplacement,faction,seed):抽样
union(otherDataset):返回一个新的dataset,包括源dataset和给定dataset的元素的集合
distinct([numTasks]):返回一个新的dataset,这个dataset含有的是源dataset中的distinct的element
groupByKey(numTasks):返回(K,Seq[V])。也就是hadoop中reduce函数接受的key-valuelist
reduceByKey(func,[numTasks]):就是用一个给定的reducefunc再作用在groupByKey产生的(K,Seq[V]),比方求和。求平均数
sortByKey([ascending],[numTasks]):依照key来进行排序,是升序还是降序。ascending是boolean类型
join(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W)。返回的是(K,(V,W))的dataset,numTasks为并发的任务数
cogroup(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks为并发的任务数
cartesian(otherDataset):笛卡尔积就是m*n,大家懂的
action操作:
reduce(func):说白了就是聚集,可是传入的函数是两个參数输入返回一个值。这个函数必须是满足交换律和结合律的
collect():一般在filter或者足够小的结果的时候。再用collect封装返回一个数组
count():返回的是dataset中的element的个数
first():返回的是dataset中的第一个元素
take(n):返回前n个elements。这个士driverprogram返回的
takeSample(withReplacement。num,seed):抽样返回一个dataset中的num个元素,随机种子seed
saveAsTextFile(path):把dataset写到一个textfile中,或者hdfs。或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录。然后写到file中
saveAsSequenceFile(path):仅仅能用在key-value对上。然后生成SequenceFile写到本地或者hadoop文件系统
countByKey():返回的是key相应的个数的一个map,作用于一个RDD
foreach(func):对dataset中的每一个元素都使用func
RDD的action中提交Job
在运行RDD的saveAsTextFile时调用SparkContext.runJob方法
saveAsTextFile方法,-->saveAsHadoopFile,终于调用SparkContext.runJob方法
defsaveAsTextFile(path: String) {
this.map(x=> (NullWritable.get(), newText(x.toString)))
.saveAsHadoopFile[TextOutputFormat[NullWritable,Text]](path)
}
......下面一行代码就是在saveASTextFile函数嵌套调用中终于调用的函数,调用SparkContext.runJob
self.context.runJob(self,writeToFile _)
SparkContext.runJob的定义:
defrunJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T])=> U): Array[U] = {
runJob(rdd, func, 0until rdd.partitions.size,false)
}
SparkContext的终于运行runJob函数定义
defrunJob[T, U: ClassTag](
rdd: RDD[T],//此处是详细的RDD实例值
func: (TaskContext, Iterator[T])=> U,//详细的运行的action的逻辑,如reduceByKey
partitions:Seq[Int],//分区数组,一个数值从0到partitions.size-1
allowLocal: Boolean,//能否够在本地运行
//result的处理逻辑,每个Task的处理
resultHandler: (Int, U) =>Unit) {
val以上是关于spark 启动job的流程分析的主要内容,如果未能解决你的问题,请参考以下文章