Master HA彻底解密(DT大数据梦工厂)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Master HA彻底解密(DT大数据梦工厂)相关的知识,希望对你有一定的参考价值。

内容:

1、Master HA解析;

2、Master HA的四种方式;

3、Master HA的内部工作机制;

4、Master HA的源码解密;

本讲主要源码角度分析Master HA,因为在生产环境必然要做的

==========Master HA解析============

Spark是Master-Slave的结构

技术分享

现在业界是1个Master Active,2个以上standby

如果有HA的话,切换active的时候,会在上次运行的基础上继续运行

Drvier提交程序、申请资源,是跟Master交互

ZOOKEEPER工作采用的是leader的机制,它对外提供服务,其它都是follower

ZOOKEEPER保留了集群的信息:Worker、Driver、Application的信息,会被持久化到Zookeeper,切换的时候只会影响新Job的提交。

因为每个Job运行之前,只要跟集群申请到资源之后,和Master没关系了,之后就是Driver和executor的交互,在27讲说过。所以只会影响新Job提交,而不会影响现有Job的运行。

=>总结:

1、生产环境一般采用zookeeper做HA,且建议为3台Master,Zookeeper会自动化管理Masters的切换;

2、采用Zookeeper做HA的时候,Zookeeper会保存整个Spark集群运行时候的元数据:所有的Workers、Drivers、Applications、Executors等;

3、Zookeeper在遇到当前Active级别的Master出现故障的时候,会从standby Master中选取出一台做为Active Master,但是要注意:被选举后到成为真正的Active Master之间需要从Zookeeper获取集群当前运行状态的元数据信息并进行恢复;

4、在Master切换的过程中,所有的已经在运行的程序皆正常运行!因为Spark Application在运行前就已经通过Cluster Manager获得了计算资源,所以在运行时Job本身的调度和处理和Master是没有任何关系的;

5、在Master切换过程中唯一的影响是,是不能提交新的Job:一方面不能提交新的应用程序给集群,因为只有Active Master才能接收新的程序的提交请求,另外一方面,已经运行的程序中也不能够因为Action操作触发新的Job的提交请求;

经验之谈:yarn的模式比standalone模式性能低30%左右

==========Master HA的四种方式============

1、MasterHA四大方式分别是:ZOOKEEPER、FILESYSTEM、CUSTOM(自定义)、NONE(不做HA,下载Spark直接使用);

2、需要说明的是:

1)ZOOKEEPER是自动管理Master;

2)FILESYSTEM的方式在Master出现故障后需要手动重新启动机器,机器启动后会立即成为Active级别的Master来对外提供服务(接受应用程序提交的请求,接受新的Job运行的请求);

3)CUSTOM的方式允许用户自定义MasterHA的实现,这对于高级用户特别有用;

4)NONE,这是默认情况,当我们下载安装了Spark集群后,就是采用这种方式,该方式不会持久化集群的数据,Master启动后立即管理集群;

Master中onStart

val serializer = new JavaSerializer(conf)
val (persistenceEngine_leaderElectionAgent_) = RECOVERY_MODE match {
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(confserializer)
    (zkFactory.createPersistenceEngine()zkFactory.createLeaderElectionAgent(this))//数据持久化引擎和leader选举
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(confserializer)
    (fsFactory.createPersistenceEngine()fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf]classOf[Serializer])
      .newInstance(confserializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine()factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine()new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_

4、persistEngine中有一个至关重要的方法persist来实现数据持久化,readPersistedData来回复集群中的元数据;

/**
 * Returns the persisted data sorted by their respective ids (which implies that they‘re
 * sorted by time of creation).
 */
final def readPersistedData(
    rpcEnv: RpcEnv): (Seq[ApplicationInfo]Seq[DriverInfo]Seq[WorkerInfo]) = {
  rpcEnv.deserialize { () =>
    (read[ApplicationInfo]("app_")read[DriverInfo]("driver_")read[WorkerInfo]("worker_"))
  }
}

5、FILESYSTEM和NONE均是采用MonarchyLeaderAgent的方式来完成leader的选举,其实际实现是直接将传入的Master作为leader

def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
  new MonarchyLeaderAgent(master)
}

/** Single-node implementation of LeaderElectionAgent -- we‘re initially and always the leader. */
private[spark] class MonarchyLeaderAgent(val masterInstance: LeaderElectable)
  extends LeaderElectionAgent {
  masterInstance.electedLeader()
}

6、NONE根本不需要持久化,为什么写了BlockHolePersistenceEngine,里面啥都没实现?代码结构统一,且易扩展;

private[master] class BlackHolePersistenceEngine extends PersistenceEngine {

  override def persist(name: Stringobj: Object): Unit = {}

  override def unpersist(name: String): Unit = {}

  override def read[T: ClassTag](name: String): Seq[T] = Nil

}

==========Master HA的内部工作机制(主要Zookeeper)============

1、Zookeeper自动从Standby Master里面选取出作为Leader的Master;

2、使用ZookeeprPersistEngine去读取集群的状态数据Workers、Drivers、Applications、Executors 等信息;

3、判断元数据信息是否有空的内容;

4、把通过Zookeeper持久化引擎获得的Workers、Drivers、Applications、Executors 等信息重新注册到Master的内存中缓存起来;

5、验证获得的信息和当前正在运行的集群的状态的一致性;

6、将Applications和Workers的状态标识为UNKOWN,然后会向Application中的Driver以及Worker发送现在是Leader的standby模式的Master的地址信息;

7、当Drivers以及Workers收到新的Master地址信息后,会响应改信息;

8、Master接收到来自Drviers和Workers的响应信息后,会使用一个关键的方法completeRecovery,对没有响应的Applications(Drivers)、Workers(Executors)进行处理,Master的state会变成RecoveryState.ALIVE ,从而可以开始对外服务

private def completeRecovery() {
  // Ensure "only-once" recovery semantics using a short synchronization period.
  if (state != RecoveryState.RECOVERING) { return }
  state = RecoveryState.COMPLETING_RECOVERY

  // Kill off any workers and apps that didn‘t respond to us.
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

  // Reschedule drivers which were not claimed by any workers
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      removeDriver(d.idDriverState.ERRORNone)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }

  state = RecoveryState.ALIVE
  schedule()
  logInfo("Recovery complete - resuming operations!")
}

d.desc.supervise此种方式在Drvier失败后重启Drvier

9、(关键一步)此时Master调用自己的scheduler方法对正在等待的Applications和Drviers进行资源调度!!!

技术分享


本文出自 “一枝花傲寒” 博客,谢绝转载!

以上是关于Master HA彻底解密(DT大数据梦工厂)的主要内容,如果未能解决你的问题,请参考以下文章

彻底解密WordCount运行原理(DT大数据梦工厂)

Spark on Yarn彻底解密(DT大数据梦工厂)

Spark高级排序彻底解密(DT大数据梦工厂)

Checkpoint彻底解密:Checkpoint的运行原理和源码实现彻底详解(DT大数据梦工厂)

Spark Sort-Based Shuffle内幕彻底解密(DT大数据梦工厂)

CacheManager彻底解密:CacheManager运行原理流程图和源码详解(DT大数据梦工厂)