Master HA彻底解密(DT大数据梦工厂)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Master HA彻底解密(DT大数据梦工厂)相关的知识,希望对你有一定的参考价值。
内容:
1、Master HA解析;
2、Master HA的四种方式;
3、Master HA的内部工作机制;
4、Master HA的源码解密;
本讲主要源码角度分析Master HA,因为在生产环境必然要做的
==========Master HA解析============
Spark是Master-Slave的结构
现在业界是1个Master Active,2个以上standby
如果有HA的话,切换active的时候,会在上次运行的基础上继续运行
Drvier提交程序、申请资源,是跟Master交互
ZOOKEEPER工作采用的是leader的机制,它对外提供服务,其它都是follower
ZOOKEEPER保留了集群的信息:Worker、Driver、Application的信息,会被持久化到Zookeeper,切换的时候只会影响新Job的提交。
因为每个Job运行之前,只要跟集群申请到资源之后,和Master没关系了,之后就是Driver和executor的交互,在27讲说过。所以只会影响新Job提交,而不会影响现有Job的运行。
=>总结:
1、生产环境一般采用zookeeper做HA,且建议为3台Master,Zookeeper会自动化管理Masters的切换;
2、采用Zookeeper做HA的时候,Zookeeper会保存整个Spark集群运行时候的元数据:所有的Workers、Drivers、Applications、Executors等;
3、Zookeeper在遇到当前Active级别的Master出现故障的时候,会从standby Master中选取出一台做为Active Master,但是要注意:被选举后到成为真正的Active Master之间需要从Zookeeper获取集群当前运行状态的元数据信息并进行恢复;
4、在Master切换的过程中,所有的已经在运行的程序皆正常运行!因为Spark Application在运行前就已经通过Cluster Manager获得了计算资源,所以在运行时Job本身的调度和处理和Master是没有任何关系的;
5、在Master切换过程中唯一的影响是,是不能提交新的Job:一方面不能提交新的应用程序给集群,因为只有Active Master才能接收新的程序的提交请求,另外一方面,已经运行的程序中也不能够因为Action操作触发新的Job的提交请求;
经验之谈:yarn的模式比standalone模式性能低30%左右
==========Master HA的四种方式============
1、MasterHA四大方式分别是:ZOOKEEPER、FILESYSTEM、CUSTOM(自定义)、NONE(不做HA,下载Spark直接使用);
2、需要说明的是:
1)ZOOKEEPER是自动管理Master;
2)FILESYSTEM的方式在Master出现故障后需要手动重新启动机器,机器启动后会立即成为Active级别的Master来对外提供服务(接受应用程序提交的请求,接受新的Job运行的请求);
3)CUSTOM的方式允许用户自定义MasterHA的实现,这对于高级用户特别有用;
4)NONE,这是默认情况,当我们下载安装了Spark集群后,就是采用这种方式,该方式不会持久化集群的数据,Master启动后立即管理集群;
Master中onStart
val serializer = new JavaSerializer(conf)
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))//数据持久化引擎和leader选举
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
4、persistEngine中有一个至关重要的方法persist来实现数据持久化,readPersistedData来回复集群中的元数据;
/**
* Returns the persisted data sorted by their respective ids (which implies that they‘re
* sorted by time of creation).
*/
final def readPersistedData(
rpcEnv: RpcEnv): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
rpcEnv.deserialize { () =>
(read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
}
}
5、FILESYSTEM和NONE均是采用MonarchyLeaderAgent的方式来完成leader的选举,其实际实现是直接将传入的Master作为leader
def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
new MonarchyLeaderAgent(master)
}
/** Single-node implementation of LeaderElectionAgent -- we‘re initially and always the leader. */
private[spark] class MonarchyLeaderAgent(val masterInstance: LeaderElectable)
extends LeaderElectionAgent {
masterInstance.electedLeader()
}
6、NONE根本不需要持久化,为什么写了BlockHolePersistenceEngine,里面啥都没实现?代码结构统一,且易扩展;
private[master] class BlackHolePersistenceEngine extends PersistenceEngine {
override def persist(name: String, obj: Object): Unit = {}
override def unpersist(name: String): Unit = {}
override def read[T: ClassTag](name: String): Seq[T] = Nil
}
==========Master HA的内部工作机制(主要Zookeeper)============
1、Zookeeper自动从Standby Master里面选取出作为Leader的Master;
2、使用ZookeeprPersistEngine去读取集群的状态数据Workers、Drivers、Applications、Executors 等信息;
3、判断元数据信息是否有空的内容;
4、把通过Zookeeper持久化引擎获得的Workers、Drivers、Applications、Executors 等信息重新注册到Master的内存中缓存起来;
5、验证获得的信息和当前正在运行的集群的状态的一致性;
6、将Applications和Workers的状态标识为UNKOWN,然后会向Application中的Driver以及Worker发送现在是Leader的standby模式的Master的地址信息;
7、当Drivers以及Workers收到新的Master地址信息后,会响应改信息;
8、Master接收到来自Drviers和Workers的响应信息后,会使用一个关键的方法completeRecovery,对没有响应的Applications(Drivers)、Workers(Executors)进行处理,Master的state会变成RecoveryState.ALIVE ,从而可以开始对外服务
private def completeRecovery() {
// Ensure "only-once" recovery semantics using a short synchronization period.
if (state != RecoveryState.RECOVERING) { return }
state = RecoveryState.COMPLETING_RECOVERY
// Kill off any workers and apps that didn‘t respond to us.
workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
// Reschedule drivers which were not claimed by any workers
drivers.filter(_.worker.isEmpty).foreach { d =>
logWarning(s"Driver ${d.id} was not found after master recovery")
if (d.desc.supervise) {
logWarning(s"Re-launching ${d.id}")
relaunchDriver(d)
} else {
removeDriver(d.id, DriverState.ERROR, None)
logWarning(s"Did not re-launch ${d.id} because it was not supervised")
}
}
state = RecoveryState.ALIVE
schedule()
logInfo("Recovery complete - resuming operations!")
}
d.desc.supervise此种方式在Drvier失败后重启Drvier
9、(关键一步)此时Master调用自己的scheduler方法对正在等待的Applications和Drviers进行资源调度!!!
本文出自 “一枝花傲寒” 博客,谢绝转载!
以上是关于Master HA彻底解密(DT大数据梦工厂)的主要内容,如果未能解决你的问题,请参考以下文章
Checkpoint彻底解密:Checkpoint的运行原理和源码实现彻底详解(DT大数据梦工厂)