2 -- Master Active/Standby Failover: Mechanism Analysis and Source Code Walkthrough


Mechanism Analysis
In Spark's native standalone mode you can actually configure two Masters to guard against a single point of failure, because standalone mode supports Master active/standby failover: when the Active Master node goes down, a Standby Master node can take over and become the new Active Master.
Master failover can be based on one of two mechanisms: a filesystem-based one and a ZooKeeper-based one. With the filesystem-based mechanism, after the Active Master dies we have to switch over to the Standby Master manually; with the ZooKeeper-based mechanism, the switchover happens automatically.
So the failover mechanism discussed here really refers to the operations the Standby Master performs when it takes over after the Active Master has died.
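For reference, here is a minimal sketch of the configuration keys involved. These are the standard keys from the Spark documentation; the ZooKeeper URL and directories below are made-up placeholder values, and in a real deployment these properties are normally passed to the Master JVM (for example via SPARK_DAEMON_JAVA_OPTS) rather than set in application code.

import org.apache.spark.SparkConf

// Illustrative only: selecting the Master recovery mode and its backing store.
val conf = new SparkConf()
  .set("spark.deploy.recoveryMode", "ZOOKEEPER")                   // ZOOKEEPER / FILESYSTEM / CUSTOM / NONE (default)
  .set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181,zk3:2181") // ZooKeeper ensemble (ZOOKEEPER mode)
  .set("spark.deploy.zookeeper.dir", "/spark")                     // znode directory for recovery state (ZOOKEEPER mode)
  .set("spark.deploy.recoveryDirectory", "/var/spark/recovery")    // local directory for recovery state (FILESYSTEM mode)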
Flow description (Standby Master taking over):
1. Use the persistence engine to read the persisted storedApps, storedDrivers, and storedWorkers. The available engines are FileSystemPersistenceEngine and ZooKeeperPersistenceEngine.
2. If any of storedApps, storedDrivers, or storedWorkers is non-empty, continue with recovery.
3. Re-register the persisted Application, Driver, and Worker information into the Master's internal in-memory caches.
4. Set the state of every Application and Worker to UNKNOWN, then send the Standby Master's address to each Application's Driver and to each Worker.
5. On receiving the new Master's address, Drivers and Workers send a response message back to the new Master (see the sketch after this list).
6. As the responses from Drivers and Workers come in, the Master uses completeRecovery() to deal with the Drivers and Workers that never responded, filtering their information out.
7. Finally the Master calls schedule() to schedule the Drivers and Applications that are waiting for resources, for example launching a Driver on some Worker, or launching an Application's Executors on Workers.
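For completeness, here is what happens on the other side of steps 4 and 5. The sketch below is abridged from the Worker and standalone application-client code of the 1.6/2.x sources; the message names MasterChanged, WorkerSchedulerStateResponse, and MasterChangeAcknowledged are the real ones, but the bodies are shortened and field lists may differ slightly between versions, so treat it as an outline rather than the exact implementation.

// Worker side (org.apache.spark.deploy.worker.Worker): switch to the new Master and
// report back which executors and drivers this worker is still running.
case MasterChanged(masterRef, masterWebUiUrl) =>
  logInfo("Master has changed, new master is at " + masterRef.address)
  changeMaster(masterRef, masterWebUiUrl)
  val execs = executors.values.map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
  masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))

// Application/Driver side (the standalone AppClient endpoint): simply acknowledge the new Master.
case MasterChanged(masterRef, masterWebUiUrl) =>
  logInfo("Master has changed, new master is at " + masterRef.address)
  master = Some(masterRef)
  masterRef.send(MasterChangeAcknowledged(appId))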
Source code walkthrough:
 
1. Creating the persistence engine
Entry point package: org.apache.spark.deploy.master
The persistence engine is created in the onStart() method (preStart() before Spark 1.6). The spark.deploy.recoveryMode configuration parameter determines which persistence engine is used; the default is NONE.
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  // ZooKeeper-based persistence engine
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  // Filesystem-based persistence engine
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  // Custom (user-provided) persistence engine
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
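Both factories hand the Master an implementation of the PersistenceEngine contract. Abridged and slightly simplified from the 1.6/2.x sources (the real readPersistedData additionally wraps the reads in rpcEnv.deserialize), the contract looks roughly like this: a concrete engine only needs to implement persist/unpersist/read, and the add*/remove*/readPersistedData helpers are built on top of them. readPersistedData(rpcEnv) is what the ElectedLeader handler in the next section relies on.

import scala.reflect.ClassTag

// Simplified sketch of the PersistenceEngine contract (not the verbatim source).
abstract class PersistenceEngine {
  // The three primitives each concrete engine (ZooKeeper, filesystem, custom) must provide.
  def persist(name: String, obj: Object): Unit
  def unpersist(name: String): Unit
  def read[T: ClassTag](prefix: String): Seq[T]

  // Convenience helpers used by the Master, all expressed via the primitives above.
  final def addApplication(app: ApplicationInfo): Unit = persist("app_" + app.id, app)
  final def removeApplication(app: ApplicationInfo): Unit = unpersist("app_" + app.id)
  final def addWorker(worker: WorkerInfo): Unit = persist("worker_" + worker.id, worker)
  final def removeWorker(worker: WorkerInfo): Unit = unpersist("worker_" + worker.id)
  final def addDriver(driver: DriverInfo): Unit = persist("driver_" + driver.id, driver)
  final def removeDriver(driver: DriverInfo): Unit = unpersist("driver_" + driver.id)

  // Used by the ElectedLeader handler below to restore everything at once.
  final def readPersistedData(rpcEnv: RpcEnv): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) =
    (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
}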
 
2. Handling the persisted data
override def receive: PartialFunction[Any, Unit] = {
  case ElectedLeader => {
    // Read the persisted apps, drivers, and workers, and check whether they are all empty
    val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
    state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
      RecoveryState.ALIVE
    } else {
      RecoveryState.RECOVERING
    }
    logInfo("I have been elected leader! New state: " + state)
    if (state == RecoveryState.RECOVERING) {
      // Re-register the persisted information and send the new Master's address to Drivers and Workers
      beginRecovery(storedApps, storedDrivers, storedWorkers)
      // After WORKER_TIMEOUT_MS, deal with the Drivers and Workers that have not responded
      recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(CompleteRecovery)
        }
      }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
    }
  }
  // ... other message handlers omitted
}
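The CompleteRecovery message scheduled above is only the timeout path. As the responses from step 5 arrive, the Master also updates its caches and can finish recovery early. A simplified sketch of those handlers (from the same Master.scala receive block, abridged; exact fields vary by version), together with the canCompleteRecovery check:

// A Driver acknowledged the new Master: its application is alive again.
case MasterChangeAcknowledged(appId) =>
  idToApp.get(appId).foreach { app =>
    logInfo("Application has been re-registered: " + appId)
    app.state = ApplicationState.WAITING
  }
  if (canCompleteRecovery) { completeRecovery() }

// A Worker reported its executors and drivers: mark it ALIVE and re-attach them.
case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
  idToWorker.get(workerId).foreach { worker =>
    logInfo("Worker has been re-registered: " + workerId)
    worker.state = WorkerState.ALIVE
    // (re-link the reported executors and drivers to this worker; details omitted)
  }
  if (canCompleteRecovery) { completeRecovery() }

// Recovery can finish as soon as nothing is left in the UNKNOWN state.
private def canCompleteRecovery =
  workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
  apps.count(_.state == ApplicationState.UNKNOWN) == 0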
 
3. Re-registering the persisted Application, Driver, and Worker information into the Master's in-memory caches
The registration details, together with the Master state changes involved, will be covered in the next post.
 
/**
 * Re-register the persisted Application, Driver, and Worker information
 * into the Master's internal in-memory caches.
 * @param storedApps
 * @param storedDrivers
 * @param storedWorkers
 */
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]) {
  for (app <- storedApps) {
    logInfo("Trying to recover app: " + app.id)
    try {
      registerApplication(app)
      // Set the Application's state to UNKNOWN
      app.state = ApplicationState.UNKNOWN
      // Send the new (formerly Standby) Master's address to the Application's Driver
      app.driver.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
    }
  }

  for (driver <- storedDrivers) {
    // Here we just read in the list of drivers. Any driver associated with a now-lost worker
    // will be relaunched later, when we detect that the worker is missing.
    drivers += driver
  }

  for (worker <- storedWorkers) {
    logInfo("Trying to recover worker: " + worker.id)
    try {
      registerWorker(worker)
      // Set the Worker's state to UNKNOWN
      worker.state = WorkerState.UNKNOWN
      // Send the new (formerly Standby) Master's address to the Worker
      worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
    }
  }
}
 
4. completeRecovery(): handling the Drivers and Workers that did not respond
After the Master has received the responses from Drivers and Workers, completeRecovery() deals with the Drivers and Workers that never sent a response and filters their information out.
What it does: it selects the Applications and Workers whose state is still UNKNOWN and calls finishApplication or removeWorker on each of them, cleaning up Applications and Workers that have probably failed or died.
The cleanup has three parts: 1) remove them from the in-memory caches; 2) remove them from the memory of related components; 3) remove them from persistent storage.
private def completeRecovery() {
  // Ensure "only-once" recovery semantics using a short synchronization period.
  // If the state is not RECOVERING, do nothing; otherwise move on to COMPLETING_RECOVERY
  if (state != RecoveryState.RECOVERING) { return }
  state = RecoveryState.COMPLETING_RECOVERY

  // Kill off any workers and apps that didn't respond to us.
  // Filter out unresponsive workers --> removeWorker
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
  // Filter out unresponsive applications --> finishApplication
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

  // Reschedule drivers which were not claimed by any workers
  // Handle drivers that have lost the worker they used to run on (that worker never re-registered).
  // What happens next depends on whether the supervise flag was set
  // (drivers submitted with spark-submit --supervise in cluster mode)
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    if (d.desc.supervise) {
      // supervise is enabled: relaunch the driver
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      // supervise is not enabled: simply remove the driver
      // (in practice, cluster-mode drivers are often submitted with supervise enabled)
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }

  state = RecoveryState.ALIVE
  // Call the Master's schedule() method to schedule the Drivers and Applications waiting for resources
  schedule()
  logInfo("Recovery complete - resuming operations!")
}
 
4.1 removeWorker
private def removeWorker(worker: WorkerInfo) {
  logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
  // First set the worker's state to DEAD
  worker.setState(WorkerState.DEAD)
  // 1. Remove it from the in-memory caches
  idToWorker -= worker.id
  addressToWorker -= worker.endpoint.address
  // 2. Remove it from the memory of related components --> its executors and drivers
  //    (the supervise mechanism comes into play here as well)
  for (exec <- worker.executors.values) {
    logInfo("Telling app of lost executor: " + exec.id)
    exec.application.driver.send(ExecutorUpdated(
      exec.id, ExecutorState.LOST, Some("worker lost"), None))
    exec.application.removeExecutor(exec)
  }
  for (driver <- worker.drivers.values) {
    if (driver.desc.supervise) {
      logInfo(s"Re-launching ${driver.id}")
      relaunchDriver(driver)
    } else {
      logInfo(s"Not re-launching ${driver.id} because it was not supervised")
      removeDriver(driver.id, DriverState.ERROR, None)
    }
  }
  // 3. Remove it from persistent storage
  persistenceEngine.removeWorker(worker)
}
4.2 finishApplication
private def finishApplication(app: ApplicationInfo) {
  removeApplication(app, ApplicationState.FINISHED)
}

// Remove an Application. This happens in two situations:
// 1. normal removal of a finished application;
// 2. an executor keeps failing and being retried, so the application it belongs to
//    is eventually treated as failed as well.
def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
  if (apps.contains(app)) {
    logInfo("Removing app " + app.id)
    // Remove everything related to this Application from the in-memory caches;
    // the freed resources will be reassigned later by schedule()
    apps -= app
    idToApp -= app.id
    endpointToApp -= app.driver
    addressToApp -= app.driver.address
    if (completedApps.size >= RETAINED_APPLICATIONS) {
      val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
      completedApps.take(toRemove).foreach( a => {
        Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
        applicationMetricsSystem.removeSource(a.appSource)
      })
      // trimStart() drops the given number of oldest entries from the front of the buffer,
      // keeping the completed-application history within RETAINED_APPLICATIONS
      // (see the small illustration after this method)
      completedApps.trimStart(toRemove)
    }
    completedApps += app // Remember it in our history
    waitingApps -= app

    // If application events are logged, use them to rebuild the UI
    asyncRebuildSparkUI(app)

    for (exec <- app.executors.values) {
      killExecutor(exec)
    }
    app.markFinished(state)
    if (state != ApplicationState.FINISHED) {
      app.driver.send(ApplicationRemoved(state.toString))
    }
    persistenceEngine.removeApplication(app)
    schedule()

    // Tell all workers that the application has finished, so they can clean up any app state.
    workers.foreach { w =>
      w.endpoint.send(ApplicationFinished(app.id))
    }
  }
}
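As a side note on the trimStart() call above, here is a tiny self-contained illustration (toy strings instead of ApplicationInfo objects, written against the Scala 2.11/2.12 collections that Spark of this era uses) of how the completed-application history is pruned:

import scala.collection.mutable.ArrayBuffer

object CompletedAppsPruningSketch {
  def main(args: Array[String]): Unit = {
    val retainedApplications = 10
    // Pretend the history buffer has already reached its limit.
    val completedApps = ArrayBuffer.tabulate(10)(i => s"app-$i")
    if (completedApps.size >= retainedApplications) {
      val toRemove = math.max(retainedApplications / 10, 1)
      completedApps.trimStart(toRemove)   // drop the oldest entries from the front
    }
    completedApps += "app-10"             // remember the newly finished application
    println(completedApps.mkString(", ")) // app-1, app-2, ..., app-10
  }
}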
5. What follows is the schedule() method, which allocates resources to the Drivers and Applications that are waiting to be scheduled and then launches them. I will not go into it here; it will be covered later together with resource scheduling.
