深入理解spark-taskScheduler,schedulerBackend源码分析
Posted yankang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解spark-taskScheduler,schedulerBackend源码分析相关的知识,希望对你有一定的参考价值。
上次分析了dagshceduler是如何将任务拆分成job,stage,task的,但是拆分后的仅仅是一个逻辑结果,保存为一个resultstage对象,并没执行;
而将任务正在执行的是spark的taskscheduler模块和shcedulerbackend模块,
taskcheduler模块负责task的调度,schedulerbackend负责task的自愿申请,这两个结合比价紧密,实现也是在一起实现的;
点开sparkcontext的内部属性,可以看到taskscheduler的的对象(org.apache.spark.scheduler.TaskScheduler)是一个trait(Scala的叫法,简单的理解为类似于java的interface),这是因为task的提交方式有多种,可以是yarn-client模式,也可以是yarn-cluster模型,这取决于提交spark提交时候设置的参数master。
master设置不同,最终实现的也不同,当是yarn-client模式的时候,task实现方式则是yarnscheduler。
同样的schedulerbackend也是一个trait,具体的实现也是根据spark.master来决定,如果是yarn-client模式,实现则是yarnclientschedulerbackend。
具体看一下代码实现:
SparkContext#createTaskScheduler
sparkcontext中调用createtaskscheduler,根据master来决定生成的实际类型,taskscheduler,schedulerbackend
val (sched , ts) = SparkContext.createTaskScheduler(this, master) // 这里的master是"spark.master"参数的值,String类型 _schedulerBackend = sched//生成 schedulerBackend _taskScheduler = ts//生成 taskScheduler _taskScheduler .start()
进入到createtaskscheduler方法中,具体实现根据事master的模型,有yarn-client,yarn-cluster,local等;
我们只看yarn-client模式(平常用的比较多的时候,yarn-client模式的时候,driver在客户端,那么输出的日志也会在本地可以查看,yarn-cluster模式下driver是在资源管理器下的,首先日志不太方便查看),可以看到内部实现是根据match case来实现匹配的。yarn-clent模式下,schedulerbackend实现org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend,taskscheduler 实现org.apache.spark.scheduler.cluster.YarnScheduler;
case "yarn-client" => val scheduler = try { val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler") val cons = clazz.getConstructor(classOf[SparkContext]) cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl] } catch { case e: Exception => { throw new SparkException("YARN mode not available ?", e) } } val backend = try { val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend") val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext]) cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend] } catch { case e: Exception => { throw new SparkException("YARN mode not available ?", e) } } scheduler.initialize(backend) (backend, scheduler)
在根据master获得了实际调度类型之后,并没有马上返回,
以上是关于深入理解spark-taskScheduler,schedulerBackend源码分析的主要内容,如果未能解决你的问题,请参考以下文章
SparkContext的初始化(叔篇)——TaskScheduler的启动
TaskScheduler内幕天机:Spark shell案例,TaskScheduler和SchedulerBackendFIFO与FAIRTask运行时本地性算法详解