3--Master注册机制源码分析和状态改变机制源码分析

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了3--Master注册机制源码分析和状态改变机制源码分析相关的知识,希望对你有一定的参考价值。

这部分直接看源码会比较直观!!!

[注] 本篇是对第二篇中提到的Master状态改变以及注册机制的深入剖析。
 
Master注册机制的原理图如下。其实就是将Application信息、Driver信息和所有Worker信息加入Master的内存缓存（缓存队列）中。
（原文此处为Master注册机制原理图，图片已缺失）
 
1. Application的注册
case RegisterApplication(description, driver) => {
  // Registration request for a new application.
  // If this master is STANDBY (not the active master), the request is ignored and no
  // reply is sent.
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    // Build the master-side application record from the submitted description.
    val app = createApplication(description, driver)
    // Add the app to the in-memory registries and the waiting queue.
    // NOTE(review): registerApplication() returns early, registering nothing, when an app
    // from the same driver address is already known; this handler still persists `app`
    // and replies with its new ID in that case.
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // Persist the application (presumably so it can be recovered after master failover).
    persistenceEngine.addApplication(app)
    // Reply to the *driver* (not the master) with the new app ID and this master's ref.
    // NOTE(review): in Spark the receiver is the AppClient endpoint created by
    // SparkDeploySchedulerBackend — confirm for your Spark version.
    driver.send(RegisteredApplication(app.id, self))
    // Start resource scheduling.
    schedule()
  }
}
 
1.1 createApplication()：通过接收到的ApplicationDescription信息创建Application对象
/**
 * Builds the master-side ApplicationInfo record for a newly submitted application.
 *
 * The current wall-clock time is read once. It is used both as the start timestamp and,
 * as a Date, as the argument to newApplicationId().
 *
 * @param desc   the application description sent with the registration request
 * @param driver RPC endpoint of the application's driver
 * @return a new ApplicationInfo built with the master's `defaultCores`
 */
private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
    ApplicationInfo = {
  val submittedAtMs = System.currentTimeMillis()
  val submittedAt = new Date(submittedAtMs)
  new ApplicationInfo(
    submittedAtMs, newApplicationId(submittedAt), desc, submittedAt, driver, defaultCores)
}
 
1.2 registerApplication()注册Application对象
/**
 * Adds an application to the master's in-memory bookkeeping.
 *
 * Also used to re-register applications read back from persisted state. If an
 * application whose driver has the same RPC address is already registered, this only
 * logs and leaves every registry untouched. Otherwise it:
 *  - registers the app's metrics source,
 *  - indexes the app in `apps`, `idToApp`, `endpointToApp` and `addressToApp`,
 *  - appends it to `waitingApps`, the queue of apps waiting to be scheduled.
 *
 * @param app the ApplicationInfo to register
 */
private def registerApplication(app: ApplicationInfo): Unit = {
  val driverAddress = app.driver.address
  if (addressToApp.contains(driverAddress)) {
    logInfo("Attempted to re-register application at same address: " + driverAddress)
  } else {
    applicationMetricsSystem.registerSource(app.appSource)
    // Index the app under every key the master looks it up by.
    apps += app
    idToApp(app.id) = app
    endpointToApp(app.driver) = app
    addressToApp(driverAddress) = app
    // Queue the app for scheduling.
    waitingApps += app
  }
}
 
2. Worker的注册（原理与Application的注册相似）
case RegisterWorker(
    id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => {
  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
    workerHost, workerPort, cores, Utils.megabytesToString(memory)))
  // A STANDBY master does not accept workers; it tells the worker it is in standby.
  if (state == RecoveryState.STANDBY) {
    context.reply(MasterInStandby)
  } else if (idToWorker.contains(id)) {
    // A worker ID that is already registered is rejected.
    context.reply(RegisterWorkerFailed("Duplicate worker ID"))
  } else {
    // Wrap the worker's details in a WorkerInfo record.
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      workerRef, workerWebUiUrl)
    // registerWorker() returns false when a worker that is not in UNKNOWN state is
    // already registered at the same RPC address.
    if (registerWorker(worker)) {
      // Persist the newly registered worker.
      persistenceEngine.addWorker(worker)
      // Acknowledge to the *worker* (not the master): send this master's ref and web UI URL.
      context.reply(RegisteredWorker(self, masterWebUiUrl))
      // Start resource scheduling.
      schedule()
    } else {
      // Registration refused: log it and reply to the worker with the reason.
      val workerAddress = worker.endpoint.address
      logWarning("Worker registration failed. Attempted to re-register worker at same " +
        "address: " + workerAddress)
      context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
        + workerAddress))
    }
  }
}
 
2.1 registerWorker()注册worker的相关信息
/**
 * Records a newly connected worker in the master's in-memory registries.
 *
 * Stale DEAD entries for the same host and port are first purged from `workers`.
 * If another worker is already registered at the same RPC address, registration is
 * refused unless that old entry is in UNKNOWN state. An UNKNOWN entry means the worker
 * restarted during recovery, so the old entry is removed via removeWorker() and the new
 * worker is accepted.
 *
 * @param worker the worker to register
 * @return true if the worker was added to `workers`, `idToWorker` and `addressToWorker`;
 *         false if its address is already taken by a non-UNKNOWN worker
 */
private def registerWorker(worker: WorkerInfo): Boolean = {
  // There may be one or more refs to dead workers on this same node (with different IDs);
  // drop them all at once.
  val deadOnSameNode = workers.filter { w =>
    w.host == worker.host && w.port == worker.port && w.state == WorkerState.DEAD
  }
  workers --= deadOnSameNode

  val workerAddress = worker.endpoint.address
  val accepted = addressToWorker.get(workerAddress) match {
    case Some(oldWorker) if oldWorker.state == WorkerState.UNKNOWN =>
      // A worker registering from UNKNOWN implies that the worker was restarted during
      // recovery. The old worker must thus be dead, so remove it and accept the new one.
      removeWorker(oldWorker)
      true
    case Some(_) =>
      logInfo("Attempted to re-register worker at same address: " + workerAddress)
      false
    case None =>
      true
  }

  if (accepted) {
    workers += worker
    idToWorker(worker.id) = worker
    addressToWorker(workerAddress) = worker
  }
  accepted
}
 
2.2 removeWorker
/**
 * Removes a worker from the master and cleans up everything that was running on it.
 *
 * Steps:
 *  - The worker is marked DEAD and dropped from `idToWorker` and `addressToWorker`.
 *    It is left in `workers` as a DEAD entry; registerWorker() purges such entries when a
 *    new worker registers on the same host and port.
 *  - Every executor on the worker is reported to its application's driver as LOST and
 *    removed from that application.
 *  - Drivers on the worker are relaunched via relaunchDriver() if they were submitted with
 *    `supervise`. Otherwise they are removed with state ERROR.
 *  - Finally the worker is removed from the persistence engine.
 *
 * @param worker the worker to remove
 */
private def removeWorker(worker: WorkerInfo): Unit = {
  logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
  worker.setState(WorkerState.DEAD)
  // 1. Drop the worker from the in-memory lookup tables.
  idToWorker -= worker.id
  addressToWorker -= worker.endpoint.address
  // 2. Tell each affected application that its executor on this worker is gone.
  for (exec <- worker.executors.values) {
    logInfo("Telling app of lost executor: " + exec.id)
    exec.application.driver.send(ExecutorUpdated(
      exec.id, ExecutorState.LOST, Some("worker lost"), None))
    exec.application.removeExecutor(exec)
  }
  // 3. Supervised drivers are relaunched; the rest are removed as ERROR.
  for (driver <- worker.drivers.values) {
    if (driver.desc.supervise) {
      logInfo(s"Re-launching ${driver.id}")
      relaunchDriver(driver)
    } else {
      logInfo(s"Not re-launching ${driver.id} because it was not supervised")
      removeDriver(driver.id, DriverState.ERROR, None)
    }
  }
  // 4. Mirror persistenceEngine.addWorker() from the RegisterWorker path. Without this,
  //    the removed worker would remain in persisted state (and could presumably be
  //    restored on master recovery).
  persistenceEngine.removeWorker(worker)
}
 
3. Driver的注册（源码也与上面相似）
/**
 * Handles a request to submit a new driver to this master.
 * Replies with SubmitDriverResponse: failure when this master is not ALIVE,
 * success (with the new driver ID) otherwise.
 */
case RequestSubmitDriver(description) => {
  // Only an ALIVE master accepts driver submissions.
  if (state != RecoveryState.ALIVE) {
    val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
      "Can only accept driver submissions in ALIVE state."
    context.reply(SubmitDriverResponse(self, false, None, msg))
  } else {
    logInfo("Driver submitted " + description.command.mainClass)
    // Create the driver record from the DriverDescription.
    val driver = createDriver(description)
    // Persist the driver.
    persistenceEngine.addDriver(driver)
    // Queue the driver for scheduling.
    waitingDrivers += driver
    // Track it in the collection of all known drivers.
    drivers.add(driver)
    // Start resource scheduling.
    schedule()

    // TODO: It might be good to instead have the submission client poll the master to determine
    // the current status of the driver. For now it's simply "fire and forget".
    context.reply(SubmitDriverResponse(self, true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}"))
  }
}
 
3.1 参数DriverDescription
/**
 * Immutable description of a driver submitted to the standalone master.
 *
 * @param jarUrl    location (URL) of the application jar, not just its file name
 * @param mem       memory requested for the driver (unit not shown here; presumably MB)
 * @param cores     number of CPU cores requested for the driver
 * @param supervise whether the master relaunches the driver when its worker is lost
 *                  (see removeWorker) instead of removing it
 * @param command   the command used to launch the driver; its `mainClass` is what
 *                  `toString` shows
 */
private[deploy] case class DriverDescription(
    jarUrl: String,
    mem: Int,
    cores: Int,
    supervise: Boolean,
    command: Command) {

  override def toString: String = "DriverDescription (" + command.mainClass + ")"
}
 
3.2 Driver状态改变
/**
 * Handles a driver state report.
 * Only ERROR, FINISHED, KILLED and FAILED are accepted; each removes the driver.
 * Any other state raises an exception.
 */
case DriverStateChanged(driverId, state, exception) => {
  state match {
    // ERROR, FINISHED, KILLED and FAILED all cause the driver to be removed.
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    // Any other state is unexpected for this message.
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
}
 
3.3 Driver移除
/**
 * Removes a driver from the master's bookkeeping and records its final state.
 *
 * When `driverId` is known, the driver is:
 *  - moved from `drivers` to `completedDrivers`. Once `completedDrivers` holds
 *    RETAINED_DRIVERS entries, the oldest RETAINED_DRIVERS / 10 (at least one) are
 *    dropped first.
 *  - removed from the persistence engine.
 *  - stamped with `finalState` and `exception`.
 *  - detached from its worker, if it has one.
 * A scheduling pass then runs. An unknown `driverId` is only logged.
 *
 * @param driverId   ID of the driver to remove
 * @param finalState state to record on the driver
 * @param exception  failure cause to record, if any
 */
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]): Unit = {
  drivers.find(_.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      drivers -= driver
      // Keep completedDrivers bounded: trimStart drops the oldest (earliest appended) entries.
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      completedDrivers += driver
      // Remove (not add) the driver from persisted state.
      persistenceEngine.removeDriver(driver)
      driver.state = finalState
      driver.exception = exception
      // `driver.worker` is optional (presumably empty while the driver is still waiting).
      driver.worker.foreach(w => w.removeDriver(driver))
      // Start resource scheduling.
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
 
4. Executor状态的改变
/**
 * Handles an ExecutorStateChanged report for executor `execId` of application `appId`.
 *
 * The handler records the executor's new state and forwards the update to the
 * application's driver. Once the executor has finished, it is detached from its app and
 * worker. Abnormal exits count against the application's retry limit. When that limit
 * is reached and no executor of the app is still RUNNING, the application is removed as
 * FAILED.
 */
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
  // Look up the app, then the executor inside that app's executor map.
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    // Known executor of a known application.
    case Some(exec) => {
      // Record the new state, keeping the old one for the sanity check below.
      val appInfo = idToApp(appId)
      val oldState = exec.state
      exec.state = state

      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        // An executor reached RUNNING, so reset the app's retry count.
        appInfo.resetRetryCount()
      }

      // Forward the state change to the application's driver.
      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))

      // Cleanup once the executor has finished.
      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // If an application has already finished, preserve its
        // state to display its information properly on the UI
        if (!appInfo.isFinished) {
          appInfo.removeExecutor(exec)
        }
        // Always detach the executor from the worker that hosted it.
        exec.worker.removeExecutor(exec)

        // Only an exit status of exactly Some(0) is a normal exit; None counts as abnormal.
        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        if (!normalExit) {
          // Count this failure. While still below ApplicationState.MAX_NUM_RETRY, keep
          // scheduling. (MAX_NUM_RETRY's value is defined outside this excerpt.)
          if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
            schedule()
          } else {
            // Retry limit reached: fail the whole application, but only if none of its
            // executors is still RUNNING.
            val execs = appInfo.executors.values
            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
              logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                s"${appInfo.retryCount} times; removing it")
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
        // NOTE(review): on a normal exit, or when the app is kept because an executor is
        // still RUNNING, schedule() is not called from this handler.
      }
    }
    // Unknown app or executor: just log it.
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
}

以上是关于3--Master注册机制源码分析和状态改变机制源码分析的主要内容,如果未能解决你的问题,请参考以下文章

小记--------spark的Master的Application注册机制源码分析及Master的注册机制原理分析

Master的注册机制和状态改变管理解密

Zookeeper-watcher机制源码分析

01.dubbo源码解析--注册中心(缓存机制)

SpringCloud技术专题「Eureka源码分析」从源码层面让你认识Eureka工作流程和运作机制(上)

SpringCloud技术专题「Eureka源码分析」从源码层面让你认识Eureka工作流程和运作机制(下)