Spark源码笔记

Posted 拱头

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark源码笔记相关的知识,希望对你有一定的参考价值。

以下内容源于spark1.4.1以上版本

一.Spark可执行文件笔记:
1.spark-shell:
1.1 bin/spark-class 启动了org.apache.spark.launcher.Main,这个Main函数用来启动Master,Worker以及SparkSubmit等,start-master.sh/start-slaves.sh/spark-submit都调用了spark-class。
1.2 bin/spark-shell启动了org.apache.spark.repl.Main
1.3 sbin/start-master.sh 启动了org.apache.spark.deploy.master.Master
1.4 sbin/start-slaves.sh 启动了start-slave.sh 再启动org.apache.spark.deploy.worker.Worker
1.5 bin/load-spark-env.sh 启动了 spark-env.sh
1.6 bin/spark-submit 启动了org.apache.spark.deploy.SparkSubmit

二.Spark框架记录:
1.rpc包:
1.1 AkkaRpcEnv:
1.1.1 这个类是RpcEnv 的主要实现,这个实现完成了很多通信的任务,代码中涉及RpcEnv,都基本用的是这个类
1.1.2 主构造器:private[spark] class AkkaRpcEnv private[akka] (val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int) extends RpcEnv(conf) with Logging
1.1.3 从主构造器中看出,存在 val actorSystem,这个变量用来构造其他actor,AkkaRpcEnv在SparkEnv中创建。
1.1.4 "override def setupEndpoint(name: String, endpoint: RpcEndpoint)"这个方法就是用来创建子actor的,SparkEnv中注册的监控都是调用这个方法
1.1.5 在使用setupEndpoint 方法创建子actor的时候还实现了特征ActorLogReceive,如下:
"lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging....)"
这个特质中有这样一段代码:
"override def receive: Actor.Receive = new Actor.Receive
private val _receiveWithLogging = receiveWithLogging
......"
所以当调用actorRef!的时候,是先进入receive方法,再进入receiveWithLogging方法的(这里用到偏函数的知识)。
1.1.6 如果receiveWithLogging方法接收到AkkaMessage的话,再将方法传递给processMessage,在processMessage中有一行代码"val pf: PartialFunction[Any, Unit] =",这段代码代码将RpcEndpoint返回成一个偏函数,然后接着用代码"pf.applyOrElse[Any, Unit](message, message =>"执行,也就是调用RpcEndpoint的receiveAndReply或者receive方法执行RpcEndpointRef间传递的讯息
1.1.7

1.2 AkkaRpcEndpointRef:
1.2.1 这个类是RpcEndpointRef 的主要实现
1.2.2 主构造器:private[akka] class AkkaRpcEndpointRef(
@transient defaultAddress: RpcAddress,
@transient _actorRef: => ActorRef,
@transient conf: SparkConf,
@transient initInConstructor: Boolean = true)
extends RpcEndpointRef(conf) with Logging
1.2.3 主构造器包含变量 _actorRef ,这个变量是一个actor,AkkaRpcEndpointRef主要使用这个actor进行节点通讯。
1.2.4 另外是一个 RpcEndpoint 和 一个 RpcEndpointRef 是一一对应的关系。
1.2.5 如果receiveWithLogging方法接收到的信息是AkkaMessage,那么最后执行的是RpcEndpoint的receive或receiveAndReply方法。

1.3 RpcEndpoint 的实现类众多(HeartbeatReceiver/CoarseGrainedExecutorBackend等)
1.3.1 需要通信的RpcEndpoint都有一个与之对应的RpcEndpointRef,而需要通讯的RpcEndpoint需要通过RpcEnv注册自己,调用的方法就RpcEnv的setupEndpoint
1.3.2 RpcEndpoint和与之对应的RpcEndpointRef应该是在同一个节点上运行的,这样RpcEndpoint和RpcEndpointRef才能实现相互调用,其中通信部分交给RpcEndpointRef,具体的工作交给RpcEndpoint

2.scheduler 包
2.1 DAGScheduler类:
2.1.1 主构造器:private[spark]
class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging
2.1.2 private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging
2.1.3 类最后一行代码 eventProcessLoop.start(),其中val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)//这个类定义如上,eventProcessLoop内含一个Thread类(在EventLoop中),也就是说DAGScheduler在第一次构造的时候就会启动eventProcessLoop。eventProcessLoop中有一个val eventQueue,这个队列存放了所有来自DAGScheduler的事件,并且在eventProcessLoop自身的线程中不断的处理eventQueue中的事件。
2.1.4 虽然DAGScheduler的事件由线程EventLoop响应,但是EventLoop调用DAGSchedulerEventProcessLoop的onReceive方法,而onReceive方法又调用DAGScheduler的方法,所以DAGScheduler的事件最后还是由DAGScheduler
2.1.5 一个Job在DAGScheduler被切分成多个Stage,调用handleJobSubmitted->newResultStage->getParentStagesAndId->getParentStages方法,在getParentStages方法的作用主要就是切分一个Job。
2.1.6 newResultStage方法用来产生finalStage,newShuffleMapStage方法用来产生ShuffleMapStage,一个Job被提交的时候,首先会产生finalStage,然后再根据finalStage的父stage递归产生ShuffleMapStage,递归产生ShuffleMapStage的地方入口为getParentStages方法的parents += getShuffleMapStage(shufDep, jobId)这一行代码。
2.1.7 DAGSchedulerEventProcessLoop是1.4.1源码里使用的,对应1.1.0源码里的 DAGSchedulerEventProcessActor,前者使用的是线程处理队列里的事件,后者采用Actor实线,现在看起来两者之间没有什么区别。而且DAG中处理事件不需要通信,所以有没有使用actor都一样。
2.1.8 Stage提交,首先调用submitStage方法,从finalStage开始提交,通过val missing = getMissingParentStages(stage).sortBy(_.id) 这行代码检查父Stage是否可用,如果不可用,递归提交父Stage
2.1.9 taskSet提交,submitStage->submitMissingTasks->taskScheduler.submitTasks
2.1.10 DAGScheduler和DAGSchedulerEventProcessLoop都执行在Master中.
2.1.11 Tasks提交过程,handleJobSubmitted->submitStage(这一步是递归提交的,多个相互之间没有依赖关系的Stage可以同时运行,这个每个Stage会放进一个rootPool,roolPool根据设定的规则采用FIFO或者FAIR策略调度Stage)->submitMissingTasks->taskScheduler.submitTasks

2.2 Stage类:
2.2.1 主构造器:private[spark] abstract class Stage(
val id: Int,
val rdd: RDD[_],
val numTasks: Int,
val parents: List[Stage],
val jobId: Int,
val callSite: CallSite)
extends Logging
2.2.2 注意主构造器:Stage会记录下自身的父Stage和对应的jobId

2.3 TaskSchedulerImpl类:
TaskScheduler的实例类,TaskScheduler在start()的时候,会把backend一起启动.
2.3.1 主构造器:private[spark] class TaskSchedulerImpl(
val sc: SparkContext,
val maxTaskFailures: Int,
isLocal: Boolean = false)
extends TaskScheduler with Logging
2.3.2 def initialize(backend: SchedulerBackend) 初始化方法,在初始化的过程中创建调度池,这个调度池默认FIFO,手动可设置FAIR模式。使用FAIR模式,需要配置fairscheduler.xml,这个调度池用来管理Stage的执行。
2.3.3 TaskSchedulerImpl主要用来和DAGScheduler交互
2.3.4 override def submitTasks(taskSet: TaskSet) 这个方法供DAGScheduler调用,在DAGScheduler将Stage拆分成数据集之后,用这个方法提交数据集,submitTasks的最后调用backend.reviveOffers(),用来和底层资源调度系统进行交互
2.3.5 在 submitTasks方法中有代码val manager = createTaskSetManager(taskSet, maxTaskFailures) 可以看出,一个TaskSetManager对应一个TaskSet而且TaskSetManager不是在Executor里创建的,是在Driver中创建,在后续的代码中也没有看见TaskSetManager有传给Executor,所以所有的TaskSetManager其实都是运行在Driver上的。
2.3.6 var backend: SchedulerBackend,backend在TaskSchedulerImpl中负责和Executor进行通讯,backend有多个实现,主要是为了和底层资源调度系统进行交互(如Mesos/YARN和Local)。
2.3.7 def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]],这个方法供backend调用,用来在提交TaskSet之后给TaskSet调度资源
2.3.8 private def resourceOfferSingleTaskSet 这个方法就是给每个Task分配对应执行的Executor

2.3 TaskSetManage类:
2.3.1 主构造器:private[spark] class TaskSetManager(
sched: TaskSchedulerImpl,
val taskSet: TaskSet,
val maxTaskFailures: Int,
clock: Clock = new SystemClock())
extends Schedulable with Logging
2.3.2 所有TaskSetManage都在Master中执行,不是在Executor中执行。
2.3.3 一个TaskSet对应一个TaskSetManage,也可以说一个Stage对应一个TaskSetManage。
2.3.4 TaskManage决定每个Task在哪个Executor上执行,然后将这个信息告诉Backend类,由Backend类去启动相应的Excutor中执行Task。
2.3.5 注意了注意了,在执行一个Application的时候,是先启动Executor再启动Task。TaskSetManager在适配任务时,会根据资源的情况和任务的数据来源,尽量选择最佳的Locality进行匹配。通常我们讲到数据本地性时,
与之相关联的原始 数据在很多情况下都来自HDFS。值得注意的是,与直接基于Hadoop YARN进行任务调度的系统相比(如MapReduce on YARN),Spark的应用程序由于首先需要申请资源运行Executor,而后再调度任务,所以,
如果第一步在申请自愿运行Executor时没有任务数据本地性的信息,那么除非应用程序申请的计算资源足够多,从而保证了在每个节点上都有Executor,否则,在第二部调度任务时,是可能无法做到数据本地处理的
(因为数据所在节点上没有运行Executor)。这种情况在大集群环境,应用程序处理的数据集较小,不需要申请很多资源时,显然是很容易发生的。还好在YARN相关模式下,
是可以在为Spark应用创建资源申请时就额外传递数据文件的位置信息的,其他模式却暂时没有实现相关功能。

2.4 LocalBackend类:
2.4.1 在使用本地模式的启动Spark的时候,使用这个类作为SchedulerBackend和ExecutorBackend

2.5 CoarseGrainedSchedulerBackend类:
2.5.1 主构造器:class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
extends ExecutorAllocationClient with SchedulerBackend with Logging
2.5.2 YARN/Mesos/Standalone这三种模式运行在Application端的backend都继承了这个类。
2.5.3 override def reviveOffers() driverEndpoint.send(ReviveOffers) 这个方法是SchedulerBackend中的申请系统资源的方法,调用的driverEndpoint是DriverEndpoint的实例
2.5.4 class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) 这个是CoarseGrainedSchedulerBackend的内部类
2.5.4.1 CoarseGrainedSchedulerBackend调用driverEndpoint.send(ReviveOffers)的后,实际调用的是DriverEndpoint的receive->case ReviveOffers->makeOffers->launchTasks 方法,通过launchTasks方法将Task传递给CoarseGrainedExecutorBackend去具体执行。

2.6 CoarseMesosSchedulerBackend类:
2.6.1 以Mesos模式启动后在Application端启动的backend

2.7 SparkDeploySchedulerBackend类:
2.7.1 以Standalone模式启动后在Driver端启动的backend
2.7.2 在这个类的start()方法中,有如下代码:val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
可以看出,这个类在启动的过程中发送指令启动CoarseGrainedExecutorBackend,由于CoarseGrainedExecutorBackend是由指令启动,所以CoarseGrainedExecutorBackend的使用main函数启动 2.7.3 启动CoarseGrainedExecutorBackend的过程,SparkDeploySchedulerBackend类(command->appDesc->client.start())-> AppClient类(start->onStart->registerWithMaster->tryRegisterAllMasters->masterRef.send(RegisterApplication(appDescription, self)))-> Master类(case RegisterApplication -> registerApplication) 通过这个过程向Master注册和启动AppClient类->schedule()->startExecutorsOnWorkers()启动Executor 2.7.4 在standalone模式下,Executor注册过程: requestExecutors(或requestTotalExecutors)->doRequestTotalExecutors->c.requestTotalExecutors(requestedTotal)->endpoint.askWithRetry[Boolean](RequestExecutors(appId, requestedTotal))

2.8 YarnSchedulerBackend类:
YarnClientSchedulerBackend
YarnClusterSchedulerBackend
2.8.1 以YARN对应的两种模式启动后在Application端启动的backend

2.9 LiveListenerBus类:
2.9.1 主构造器:private[spark] class LiveListenerBus
extends AsynchronousListenerBus[SparkListener, SparkListenerEvent]("SparkListenerBus")
with SparkListenerBus
2.9.2 AsynchronousListenerBus中的start启动lisenerBus
2.9.3 AsynchronousListenerBus中的run看到是一个循环,所以启动后ListenerBus会一直运作,将listenerBus中的eventQueue逐个通过postToAll推送给listenerBus中的全部listen,所以listenerBus中的listen只是一个普通的处理类,listenerBus启动之后会源源不断的将所有event传给bus中的所有listener进行处理。

3 executor包:
3.1 Executor类:
3.1.1 主构造器:private[spark] class Executor(
executorId: String,
executorHostname: String,
env: SparkEnv,
userClassPath: Seq[URL] = Nil,
isLocal: Boolean = false)
extends Logging
3.1.2 def launchTask(context: ExecutorBackend,taskId: Long,attemptNumber: Int,taskName: String,serializedTask: ByteBuffer): Unit (CoarseGrainedExecutorBackend/MesosExecutorBackend/LocalEndpoint)调用,用来将Task放进Executor的线程池中启动。
3.1.3 Executor在启动一个Task后做以下任务:设置使用内存,告知Driver更改Task状态,执行Task

3.2 CoarseGrainedExecutorBackend类:
3.2.1 在使用standalone和YARN启动的时候,Worker上启动的Executor就是这个类的实例,在这个类在启动的时候,Driver会向Worker发送一条指令:org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url akka.tcp://sparkDriver@192.168.1.120:57205/user/CoarseGrainedScheduler --executor-id 0 --hostname 192.168.1.131 --cores 1 --app-id app-20151113110458-0276 --worker-url akka.tcp://sparkWorker@192.168.1.131:45200/user/Worker,就是用这条指令来启动CoarseGrainedExecutorBackend的main函数。

3.3 MesosExecutorBackend类:
这个类是Mesos模式的ExecutorBackend。

4 storage包
4.1 BlockManager类:
4.1.1 主构造器:private[spark] class BlockManager(
executorId: String,
rpcEnv: RpcEnv,
val master: BlockManagerMaster,
defaultSerializer: Serializer,
maxMemory: Long,
val conf: SparkConf,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
blockTransferService: BlockTransferService,//这个属性用来提供block在节点之间转移的服务
securityManager: SecurityManager,
numUsableCores: Int)
extends BlockDataManager with Logging
4.1.2 在每个节点都会运行一个BlockManager类用来管理Application中使用到的block,不管是driver还是executor
4.1.3 这个类在使用之前需要执行initialize
4.1.4 一个BlockManager包装了一个BlockManagerMaster,一个BlockManagerMaster包装了一个BlockManagerMasterEndpoint,用来确保主从节点之间的通讯。还包装了一个BlockManagerMasterEndpoint用来保证主从之间传递RDD间的控制信息。同时在master的BlockManagerMasterEndpoint中会有一个非空的HashMap[BlockManagerId, BlockManagerInfo],用来保存注册到master的Executor上的BlockManager的信息
4.1.5 BlockManager构造的时候有一段代码"val diskBlockManager = new DiskBlockManager(this, conf)",也就是说会创建一个磁盘管理,管理物理存储。

4.2 BlockManagerMaster类:
4.2.1 主构造器:class BlockManagerMaster(
var driverEndpoint: RpcEndpointRef,
conf: SparkConf,
isDriver: Boolean)
extends Logging
4.2.2 一个BlockManager中,会有一个BlockManagerMaster

4.3 BlockManagerMasterEndpoint类:
4.3.1 主构造器:class BlockManagerMasterEndpoint(
override val rpcEnv: RpcEnv,
val isLocal: Boolean,
conf: SparkConf,
listenerBus: LiveListenerBus)
extends ThreadSafeRpcEndpoint with Logging
4.3.2 BlockManagerMasterEndpoint负责BlockManagerMasterEndpoint间的通讯。主要通讯内容如下:
4.3.2.1 "case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint)"Executor端创建BlockManager后向Driver发送请求进行注册。
4.3.2.2 "case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize)"更新数据块的元信息。
4.3.2.3 "case GetPeers(blockManagerId)"请求获得其他BlockManager的ID。
4.3.2.4 "case GetLocations(blockId)"获取数据块所在BlockManager的ID。
4.3.2.5 "case RemoveExecutor(execId)"删除已经死亡的Executor上的BlockManager。
4.3.3 driver的BlockManagerMasterEndpoint中会有一个非空的HashMap[BlockManagerId, BlockManagerInfo],用来保存注册到driver的Executor上的BlockManager的信息

4.4 BlockManagerInfo类:
4.4.1 保存了BlockManager的主要信息

4.5 BlockManagerSlaveEndpoint类:
4.5.1 主构造器:class BlockManagerSlaveEndpoint(
override val rpcEnv: RpcEnv,
blockManager: BlockManager,
mapOutputTracker: MapOutputTracker)
extends RpcEndpoint with Logging
4.5.2 主要用来接收从master端传递到executor端对block或者rdd的控制信息

4.6 DiskBlockManager类:
4.6.1 此类管理物理存储

5.util包:
5.1 MetadataCleaner类
5.1.1 这是一个定时类,用来清理元数据。
5.1.2 主构造器:private[spark] class MetadataCleaner(
cleanerType: MetadataCleanerType.MetadataCleanerType,
cleanupFunc: (Long) => Unit,
conf: SparkConf)
extends Logging
5.1.3 从主构造器中看出,这个类可以自定义清理的方法直接启动。清理间隔可以根据"spark.cleaner.ttl"配置

6.deploy包
6.1 AppClient类
6.1.1 这个类负责Applications与集群之间的通讯,在Executor启动期间起了比较大的作用.
6.1.2 这个类代表了Spark中的一个Application,在backend启动的时候一起启动.

6.2 Master类:
6.2.1 Master启动Executor的过程:
AppClient类开始 tryRegisterAllMasters->masterRef.send(RegisterApplication(appDescription, self))进入到Master类
Master类: case RegisterApplication->schedule()//在这里给正在等待的Driver分配可以运行的Worker,有的模式,例如Yarn-Client是将Driver运行在Worker上的,所以需要这一步->
startExecutorsOnWorkers()//这里过滤调内存不足,没有可用core的Worker选出可用的Worker->allocateWorkerResourceToExecutors()//给每个Executor分配core然后启动Executor->launchExecutor(worker, exec)
6.2.2 scheduleExecutorsOnWorkers 计算可用的可用的Worker需要分配多少个cores来启动Executor,只有分配了cores的数目大于0的Worker才会启动Executor.返回的是每个Worker分配cores的数目
在这个方法里会用到spark.deploy.spreadOut 这个参数,设置这个参数为false,Spark就会使用尽可能少的Worker启动Executor否则会使用尽可能多的Worker启动Executor

*通讯类
*.1 BlockManagerMasterEndpoint 在存储管理中,负责主从节点之间的信息传递。
*.2 BlockManagerSlaveEndpoint 在存储中负责接收从master端传递到executor端对block或者rdd的控制信息


三.整体模块
1.job调度:
1.1 调度阶段的拆分:private def getParentStages(rdd: RDD[_], jobId: Int) 根据RDD生成有依赖关系的stages,根据层次遍历,遍历RDD的依赖关系
1.2 调度阶段的提交:smt
1.3 任务集的提交:调用TaskScheduler的submitTasks
1.4 作业监控:DAGScheduler暴露一系列回调函数给TaskScheduler调用用来监控TaskSet的执行情况。包括:executorLost,executorHeartbeatReceived,executorAdded
1.5 任务结果获取:TaskResultGetter类的enqueueSuccessfulTask返回TaskSet的结果。

2.local模式下RPC/AKKA的注册顺序,使用AkkaRpcEnv注册的都带有RpcEndpointRef(内部包含子Actor),注册就是创建子Actor,是可以通讯的:
2.1 MapOutputTrackerMaster
2.2 BlockManagerMaster
2.3 OutputCommitCoordinator
2.4 HeartbeatReceiver
2.5 ExecutorEndpoint//接收来之Driver的信息
2.6 LocalBackendEndpoint

任务分配:



存储系统:




四.类总结:

SparkSubmit//这个类是Spark Application的入口,application首先调用这个类

SparkConf(配置文件spark-defaults.conf和SparkConf可以配置的属性是一样的):
1.SparkConf创建后,如果Application中使用了Set设置参数,则使用Application的,否则使用spark-shell或spark-submit设置的参数,最后才使用spark-defaults.conf中的参数
2.在使用SparkConf创建SparkContext的时候,SparkContext一旦创建,SparkContext使用的就是SparkConf的副本,再改变SparkConf的属性也不会影响到SparkContext。所以SparkContext不支持在运行中改变属性。


SparkContext:
1DAGScheduler和TaskScheduler都是在SparkContext中创建的。
2.SparkDeploySchedulerBackend extends CoarseGrainedSchedulerBackend ,SparkContext就是通过SparkDeploySchedulerBackend启动CoarseGrainedSchedulerBackend。
3._jobProgressListener = new JobProgressListener(_conf) //job监控,应该仅仅是用来监控,就算job有问题也不会回复,在UI中使用
4._env = createSparkEnv(_conf, isLocal, listenerBus) //_env 在创建的过程中包含了akka,初始化运行环境
5._dagScheduler = new DAGScheduler(this) //创建DAGScheduler,DAGScheduler在创建的时候启动
6._taskScheduler.start() //启动taskScheduler,并且在启动taskScheduler的时候将backend一起启动
7.metricsSystem.start() //启动监控系统
8 createTaskScheduler方法,会创建DAGScheduler和TaskSchedulerImpl的实例,并且在创建过程中调用scheduler.initialize(backend),这就导致backend在SparkEnv创建DAGScheduler和TaskSchedulerImp的时候启动了。
9 setupAndStartListenerBus()启动ListenerBus

HttpFileServer类:
1.此类用来提供节点间的网络服务

SparkEnv:
1.这个类在SparkContext中被创建。
2.这个类在在执行过程中创建了ActorSystem for Akka,245行,在这里将actor的akka重新包装了一下,包装成AkkaRpcEnv
3."spark.shuffle.blockTransferService"默认配置"netty",325行
4."val shuffleMemoryManager = new ShuffleMemoryManager(conf)" //每个Executor节点会生一个shuffleMemoryManager用来管理自身的内存,供task使用,使用完释放
5.val blockManagerMaster = new BlockManagerMaster //创建blockManagerMaster
6."val broadcastManager = new BroadcastManager" 创建broadcast管理
7."val cacheManager = new CacheManager" 创建cache管理
8.注册actor顺序:MapOutputTracker/BlockManagerMaster/OutputCommitCoordinator/HeartbeatReceiver

TaskSchedulerImpl:
说明:
1.在start的时候,客户端会注册Master。
def说明:
1.提交任务集:override def submitTasks(taskSet: TaskSet),在这个过程中会创建TaskSetManager
2.def initialize(backend: SchedulerBackend) 初始化方法,在初始化的过程中创建调度池,这个调度池默认FIFO,手动可设置FAIR模式。使用FAIR模式,需要配置fairscheduler.xml

TaskSetManage:
def说明:
1.resourceOffer(execId: String,host: String,maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] 根据TaskScheduler所提供的单个Resource资源以及任务的host/executor/locality本地性的要求返回一个合适的任务。
2.def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit //更新任务的状态为成功,并将任务移除出正在运行的列表,并通过回调函数通知DAGScheduler.
3.def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) //更新任务状态为失败
4.def handleTaskGettingResult(tid: Long): Unit //更新任务状态为getting result

TaskResultGetter //这个类用来处理TaskSet的返回结果
//def说明:
enqueueSuccessfulTask //返回TaskSet的结果

存储管理:

//存储层框架:
abstract class BlockStore(val blockManager: BlockManager)//存储框架的基类

class MemoryStore(blockManager: BlockManager, maxMemory: Long)//内存存储管理(包含在BlockManager中)

class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)//磁盘存储管理(包含在BlockManager中)

通讯类整理:

比较迷惑不知用途的类:
OutputCommitCoordinator

问题:
1.考虑线程的通信问题,重点了解一下netty(netty和nio一样,都是实现了tcp/ip协议)。
2.如果要修改代码,还要考虑block的分布问题。
3.

以上是关于Spark源码笔记的主要内容,如果未能解决你的问题,请参考以下文章

《Apache Spark源码剖析》学习笔记之Spark作业提交

Spark发行版笔记13:Spark Streaming源码解读之Driver容错安全性

Spark发行版笔记9:Spark Streaming源码解读之Receiver生成全生命周期彻底研究和思考

Spark笔记整理:spark单机安装部署分布式集群与HA安装部署+spark源码编译

Spark发行版笔记10:Spark Streaming源码解读之流数据不断接收和全生命周期彻底研究和思考

Spark1.4源码走读笔记之模式匹配