Spark 启动 | 从启动脚本分析 Master 的启动流程
Posted 大数据记事本
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 启动 | 从启动脚本分析 Master 的启动流程相关的知识,希望对你有一定的参考价值。
一、往期回顾
# Start Master
"${SPARK_HOME}/sbin"/start-master.sh
# Start Workers
"${SPARK_HOME}/sbin"/start-slaves.sh
/sbin/start-master.sh
内部执行了 /sbin/spark-daemon.sh
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
--host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
$ORIGINAL_ARGS
/sbin/start-slaves.sh
内部多次执行了 /sbin/start-slave.sh 来启动多个 Worker 节点
/sbin/start-slave.sh
内部同样执行了 /sbin/spark-daemon.sh
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \
--webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"
/sbin/spark-daemon.sh
内部执行了 /bin/spark-class 脚本
case "$mode" in
(class)
execute_command nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class "$command" "$@"
;;
/bin/spark-class
最终在 /bin/spark-class 脚本中,调用了 org.apache.spark.launcher.Main 类的 main 方法
build_command() {
"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf "%d\0" $?
}
Main.main 方法会屏蔽平台的差异性,然后分别启动 master 和 worker
master启动:java org.apache.spark.deploy.master.Master
worker启动:java org.apache.spark.deploy.worker.Worker
Master 类的伴生对象提供了 main 方法,来启动 Master,代码如下
private[deploy] object Master extends Logging {
//RpcEnv名称
val SYSTEM_NAME = "sparkMaster"
//Master对应的Endpoint名称
val ENDPOINT_NAME = "Master"
/**
* TODO Master启动入口
* @param argStrings
*/
def main(argStrings: Array[String]) {
Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
exitOnUncaughtException = false))
Utils.initDaemon(log)
//TODO 第一步:解析参数
//创建conf对象,读取spark默认的配置参数
val conf = new SparkConf
//解析程序参数,读取以spark. 开头的环境变量
val args = new MasterArguments(argStrings, conf)
/**
* 综上,spark配置的来源有三个:
* 1.环境变量
* 2.程序运行参数
* 3.配置文件
*/
//TODO 第二步:启动RPC服务端
val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
rpcEnv.awaitTermination()
}
mian 方法中主要做了两件事:
解析参数
启动 RPC 服务端
这里的参数有三个来源:
环境变量(如 SPARK_MASTER_HOST、SPARK_MASTER_IP 等)
程序运行参数(如 --ip、--host 等)
配置文件(spark-defaults.conf)
创建 RpcEnv 对象
创建 Endpoint 对象,启动 Endpoint 生命周期,即调用其 onStart 方法
将 Endpoint 对象注册到消息分发器 Dispatcher
调用 EndpointRef 代理对象的 askSync 方法,同步等待返回。这里会给自己发送一个 BoundPortsRequest 类型的消息,由 Master 的 receiveAndReply 方法处理
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
//初始化SecurityManager
val securityMgr = new SecurityManager(conf)
//TODO 创建RpcEnv对象,类似akka中的ActorSystem。其中SYSTEM_NAME是其名称
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
//TODO 创建EndPoint对象并启动
//做两件事:
//1.创建Master对象
//2.rpcEnv.setupEndpoint 就会调用 EndPoint 的 onStart() 方法
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
//调用EndPoint对象的askSync方法,同步等待返回
val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
下面依次分析这四个步骤:
1.创建 RpcEnv 对象
调用的是 RpcEnv 的 create 方法:
def create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
numUsableCores: Int,
clientMode: Boolean): RpcEnv = {
第一步:初始化 RpcEnvConfig 配置对象
val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
clientMode)
第二步:创建NettyRpcEnvFactory工厂对象,通过工厂对象创建RpcEnv实例
new NettyRpcEnvFactory().create(config)
}
第一步:初始化 RpcEnvConfig 配置对象
第二步:创建 NettyRpcEnvFactory 工厂对象,通过工厂对象的 create 方法创建RpcEnv实例。create 方法如下:
def create(config: RpcEnvConfig): RpcEnv = {
//TODO 第一步:获取配置对象
val sparkConf = config.conf
//TODO 第二步:获取一个Java序列化器,从这里可以看出来,spark的序列化默认采用的是java的序列化
val javaSerializerInstance =
new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
//TODO 第三步:创建一个NettyRpcEnv对象
val nettyEnv =
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
config.securityManager, config.numUsableCores)
//clientMode默认为false,走这个分支
if (!config.clientMode) {
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
//启动 NettyRpcEnv
nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
try {
//绑定端口,启动服务
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
} catch {
case NonFatal(e) =>
nettyEnv.shutdown()
throw e
}
}
nettyEnv
}
获取 SparkConf 配置对象
获取 Java 序列化器。从这里可以看出,Spark 序列化默认采用的是 Java 的序列化
创建 NettyRpcEnv 实例
如果 clientMode 为 false,(默认为 false)绑定端口,启动 NettyRpcEnv 服务
private[netty] class NettyRpcEnv(
val conf: SparkConf,//配置对象
javaSerializerInstance: JavaSerializerInstance,//java序列化器
host: String,//主机名
securityManager: SecurityManager,
numUsableCores: Int//可用的核数,默认为0
) extends RpcEnv(conf) with Logging {
//配置对象
private[netty] val transportConf = SparkTransportConf.fromSparkConf(
conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
"rpc",
conf.getInt("spark.rpc.io.threads", numUsableCores))
//事件分发器。调度:哪些请求由哪些EndPoint处理
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
//文件服务,提交任务时的jar包,文件都是这个组件管理的
private val streamManager = new NettyStreamManager(this)
//NettyRpcEnv的上下文对象,里面包含了NettyRpcEnv的各种功能
private val transportContext = new TransportContext(transportConf,
new NettyRpcHandler(dispatcher, this, streamManager))
//如果将来当前节点需要和其它的 RpcServer(TransportServer)进行通信,就需要获取TransportClient对象,
//这里的clientFactory工厂对象用来构建TransportClient实例对象
private val clientFactory = transportContext.createClientFactory(createClientBootstraps())
//超时任务检查的线程池
val timeoutScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout")
//TODO 客户端连接线程池
private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
"netty-rpc-connection",
conf.getInt("spark.rpc.connect.threads", 64))
//标识Master的状态,启动后为false
private val stopped = new AtomicBoolean(false)
//邮箱集合,用于记录远端Endpoint地址及对应的Outbox对象
private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
//当前服务端地址
override lazy val address: RpcAddress = {
if (server != null) RpcAddress(host, server.getPort()) else null
}
...
}
其中几个重要的变量含义如下:
dispatcher:消息分发器。对应接收到的消息,如果是发送给自己管理的 Endpoint ,那么把这个消息放入该 Endpoint 的 Inbox 收件箱;如果是发送给远端的 RpcEnc 管理的 Endpoint,则将该消息放入该 Endpoint 的 Outbox 发件箱。Dispatcher 内部定义了 EndpointData 类,用于维护 Endpoint、EndpointRef 及 Inbox 的对应关系,并通过变量 endpoints 和 endpointRefs 管理
private class EndpointData(
val name: String,//Endpoint名称
val endpoint: RpcEndpoint,//Endpoint对象
val ref: NettyRpcEndpointRef//EndpointRef 代理对象
) {
val inbox = new Inbox(ref, endpoint)//Endpoint对象的收件箱
}
//key为Endpoint名称,value是EndpointData对象
private val endpoints: ConcurrentMap[String, EndpointData] =
new ConcurrentHashMap[String, EndpointData]
//维护 Endpoint 和 EndpointRef 的对应关系
private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] =
new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
streamManager:文件服务。提交任务时的jar包、文件都是这个组件管理的
transportContext:NettyRpcEnv的上下文对象。里面包含了NettyRpcEnv的各种功能
clientFactory:如果将来当前节点需要和其它的 RpcServer(TransportServer)进行通信,就需要获取 TransportClient 对象,这里的 clientFactory 工厂对象用来构建 TransportClient 实例对象
clientConnectionExecutor:客户端连接的线程池。如果配置了 spark.rpc.connect.threads 参数,线程数采用配置值,如果没有配置采用默认值 64
2.创建 Endpoint 对象,开始其生命周期
这里就是创建 Master 对象
private[deploy] class Master(
override val rpcEnv: RpcEnv,//RpcEnv对象
address: RpcAddress,//Endpoint地址,里面封装了host和port
webUiPort: Int,//WebUI端口号
val securityMgr: SecurityManager,//安全管理对象
val conf: SparkConf//配置对象
)
extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
//发送消息的线程,作用就是给自己发消息,启动各种定时任务
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
//加载的 hadoop 配置
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
//TODO Woker连接的超时时间,woker的心跳时间为15秒,默认4次没有接收到心跳则认为该worker宕机了
private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000
//执行完成的app在webui里面保存的数量,默认为200
private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
//保存执行的driver数量,默认为200
private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
//断开的worker信息保存数量,默认为15
private val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
//worker恢复模式,默认为NONE
private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
//executor最大重试次数,默认为10次
private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.maxExecutorRetries", 10)
//Master中存储Worker信息的容器
val workers = new HashSet[WorkerInfo]
//存储appID 对应 Application信息的容器
val idToApp = new HashMap[String, ApplicationInfo]
//等待执行的Application
private val waitingApps = new ArrayBuffer[ApplicationInfo]
//所有Application信息
val apps = new HashSet[ApplicationInfo]
//workerID对应的worker节点信息
private val idToWorker = new HashMap[String, WorkerInfo]
//RPC 中注册的 ip 与对应的 worker 信息的对应关系
private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]
//Application的Driver所在节点的EndPointRef代理对象和App信息的对应关系
private val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo]
//RPC 中注册的 ip 与对应的 app
private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
//完成的Application
private val completedApps = new ArrayBuffer[ApplicationInfo]
//Application 计数器
private var nextAppNumber = 0
//Driver信息容器
private val drivers = new HashSet[DriverInfo]
//完成的Driver
private val completedDrivers = new ArrayBuffer[DriverInfo]
// Drivers currently spooled for scheduling
//等待执行的Driver
private val waitingDrivers = new ArrayBuffer[DriverInfo]
//Driver计数器
private var nextDriverNumber = 0
//host检查
Utils.checkHost(address.host)
//master注册metrics服务
private val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf, securityMgr)
//Application注册Metrics服务
private val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf,
securityMgr)
//用来收集性能相关的信息,Master实例就是MasterSource的成员
private val masterSource = new MasterSource(this)
//WebUI对象,在onStart方法中会初始化
private var webUi: MasterWebUI = null
//所有的节点初始化都是 STANDBY 状态,只有经过选举了才会改为 ALIVE 状态
private var state = RecoveryState.STANDBY
//持久化引擎,基于zk实现,实现对zk的增删改查
private var persistenceEngine: PersistenceEngine = _
//帮忙执行 Spark 选举的代理对象,使用zk的客户端框架Curator来实现,调用LeaderLatch.start()方法的时候就执行了选举
//当选举结束,且成为leader后,就回调:isLeader() 方法,如果选举失败,就回调 notLeader() 方法
private var leaderElectionAgent: LeaderElectionAgent = _
...
}
Master 中定义了各种变量,重点看其中几个:
WORKER_TIMEOUT_MS:worker连接的超时时间,默认60秒。也就是说,如果超过 60 秒 没有收到 worker 发送的心跳消息,就认为该 worker 下线了。可以通过参数 spark.worker.timeout 设置。worker 的心跳间隔也是由这个变量决定的,即 WORKER_TIMEOUT_MS/4 得到的就是心跳间隔,默认 15 秒
RECOVERY_MODE:恢复模式,默认为 NONE。但 spark-defaults.conf 配置文件中一般会配置 spark.deploy.recoverMode = ZOOKEEPER,然后在 spark-env.sh 中,设置变量 RECOVER_MODE = spark.deploy.recoverMode = ZOOKEEPER
persistenceEngine:持久化引擎。基于 zookeepe r实现,实现对 zookeeper 的增删改查
leaderElectionAgent:选举代理对象。属于 HA 范畴,当配置了多个 Master 时,只有一个可以选举成为 Leader,状态为 ALIVE,其余的状态都是 STANDBY。其使用 zk的客户端框架 Curator 来实现,调用LeaderLatch.start()方法的时候就执行了选举。当选举结束,且成为leader后,就回调:isLeader() 方法,如果选举失败,就回调 notLeader() 方法
主要做了三件事:
启动WebUI
启动一个定时任务,每隔 60 秒检查一次是否有 worker 失去联系超过 60 秒
创建一个 Leader 选举的代理对象,并完成选举
①启动WebUI
//WebUI初始化
webUi = new MasterWebUI(this, webUiPort)
//绑定WebUI地址和端口号(默认8080),真正启动
webUi.bind()
②启动定时任务
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
/**
* ref:发送消息给别人
* self:发送消息给自己。Master本身就是一个EndPoint实例
* 这里给自己发送一个CheckForWorkerTimeOut类型的消息,receive方法收到后执行对应的逻辑
*/
self.send(CheckForWorkerTimeOut)
}
//WORKER_TIMEOUT_MS 60秒
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
这里的 scheduleAtFixedRate 就是启动一个定时任务,每隔 60 秒给自己发送一个 CheckForWorkerTimeOut 类型的消息,用来检查是否有 worker 心跳超时。receive 方法接收到这个消息后,会调用 timeOutDeadWorks 方法进行处理
private def timeOutDeadWorkers() {
//获取当前时间
val currentTime = System.currentTimeMillis()
//查找心跳超时的workers节点,上一次心跳的时间 < 当前时间 - 心跳超时时间
val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
//对所有心跳超时的worker节点执行下线
for (worker <- toRemove) {
if (worker.state != WorkerState.DEAD) {
logWarning("Removing %s because we got no heartbeat in %d seconds".format(
worker.id, WORKER_TIMEOUT_MS / 1000))
//移除worker
removeWorker(worker, s"Not receiving heartbeat for ${WORKER_TIMEOUT_MS / 1000} seconds")
} else {
if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
}
}
}
}
该方法首先会查找心跳超时的 worker:最后一次心跳的时间 < 当前时间 - 心跳超时时间
如果心跳超时,则调用 removeWorker 方法移除该 worker 节点信息
private def removeWorker(worker: WorkerInfo, msg: String) {
logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
//第一步:更新worker节点状态为DEAD
worker.setState(WorkerState.DEAD)
//第二步:将worker信息从变量中移除
idToWorker -= worker.id
addressToWorker -= worker.endpoint.address
//第三步:如果待移除的worker上面运行了一些executor,那么要将这些executor转移到别的worker运行
for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
exec.application.driver.send(ExecutorUpdated(
exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
exec.state = ExecutorState.LOST
exec.application.removeExecutor(exec)
}
//第四步:如果待移除的worker上面运行了一些driver,那么要将这些driver转移到别的worker运行
for (driver <- worker.drivers.values) {
if (driver.desc.supervise) {
logInfo(s"Re-launching ${driver.id}")
//重新启动driver
relaunchDriver(driver)
} else {
logInfo(s"Not re-launching ${driver.id} because it was not supervised")
removeDriver(driver.id, DriverState.ERROR, None)
}
}
logInfo(s"Telling app of lost worker: " + worker.id)
apps.filterNot(completedApps.contains(_)).foreach { app =>
app.driver.send(WorkerRemoved(worker.id, worker.host, msg))
}
//第五步:移除zk上的节点信息
persistenceEngine.removeWorker(worker)
}
移除 worker 信息主要分为以下几步:
第一步:更新 worker 节点的状态为 DEAD
第二步:将 worker 信息从变量中移除。主要包括 idToWorker 和 addressToWorker
第三步:如果待移除的 worker上面运行了一些 executor,那么要将这些 executor 转移到别的 worker 运行
第四步:如果待移除的 worker上面运行了一些 driver,那么要将这些 driver 转移到别的 worker 运行
第五步:移除 zk 上存储的 worker 信息
③创建选举代理对象,完成选举
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
//如果是ZOOKEEPER模式,将persistenceEngine_和leaderElectionAgent_两个对象创建出来
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
//初始化zk的工厂对象
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
(
//TODO 1.初始化基于zk的持久化引擎
zkFactory.createPersistenceEngine(),
//TODO 2.初始化基于zk的Leader选举代理
zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
zkFactory.createPersistenceEngine(),内部创建了 ZooKeeperPersistenceEngine 实例
private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer: Serializer)
extends PersistenceEngine
with Logging {
private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
//TODO 获取了一个zk的客户端,用于zk的管理,这里使用的就是zk的客户端框架Curator
private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
//创建zk的 /spark/master_status znode节点
SparkCuratorUtil.mkdir(zk, WORKING_DIR)
...
}
这里主要是使用 Curator 框架获取了一个 zk 的客户端,并在 zk 上 创建了 /spark/master_status 节点
zkFactory.createLeaderElectionAgent(this)),内部初始化了 ZooKeeperLeaderElectionAgent
private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable,
conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {
//用于选举的节点路径 /spark/leader_election
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
//zk客户端对象
private var zk: CuratorFramework = _
private var leaderLatch: LeaderLatch = _
//选举状态,初始为没有Leader
private var status = LeadershipStatus.NOT_LEADER
start()
private def start() {
logInfo("Starting ZooKeeper LeaderElection agent")
//初始化zk的客户端
zk = SparkCuratorUtil.newClient(conf)
//封装一个LeaderLatch
leaderLatch = new LeaderLatch(zk, WORKING_DIR)
//绑定当前的this对象为监听器,当leader选举成功,会调用该监听器的isLeader()方法
leaderLatch.addListener(this)
//启动选举
leaderLatch.start()
}
...
}
初始化 ZooKeeperLeaderElectionAgent 对象的过程中,会调用其 start 方法,该方法的作用是:
初始化 zk 客户端
初始化 LeaderLatch 对象
绑定当前的 this 对象为监听器,当 leader 选举成功,会调用该监听器的 isLeader 方法
开启 Leader 选举
对于 Leader 的选举流程这里先不做具体分析,我们直接看选举成功后,isLeader 方法的逻辑
override def isLeader() {
synchronized {
//确认是否产生了Leader
if (!leaderLatch.hasLeadership) {
return
}
logInfo("We have gained leadership")
//TODO 修改Leader状态
updateLeadershipStatus(true)
}
}
首先会确认是否产生了 Leader,如果确实产生了,调用 updateLeadershipStatus 方法:
private def updateLeadershipStatus(isLeader: Boolean) {
//如果成为了Leader,且之前是没有Leader的
if (isLeader && status == LeadershipStatus.NOT_LEADER) {
//更新当前Master节点的状态为Leader
status = LeadershipStatus.LEADER
//给Master发送消息:当前的代理已经帮你竞选成为 Active Leader 了
masterInstance.electedLeader()
//如果成为Leader被撤销了
} else if (!isLeader && status == LeadershipStatus.LEADER) {
status = LeadershipStatus.NOT_LEADER
masterInstance.revokedLeadership()
}
}
case ElectedLeader =>
//选举成功后,根据zk中是否有app,driver,worker等信息更新自己的状态
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
//如果这些信息都没有,说明是刚启动的选举,更新状态为ALIVE
RecoveryState.ALIVE
} else {
//如果是恢复,即当前的alive master死掉了,standby master顶替上来。更新状态为RECOVERING
RecoveryState.RECOVERING
}
logInfo("I have been elected leader! New state: " + state)
//如果是恢复状态
if (state == RecoveryState.RECOVERING) {
//启动恢复
beginRecovery(storedApps, storedDrivers, storedWorkers)
//延时任务调度,延时60秒 ,向自己发送CompleteRecovery消息
recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CompleteRecovery)
}
}, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
}
这里首先会尝试从 zk 中查看是否有 Application、Driver、Worker 等信息
如果没有,说明是刚启动时的选举,直接更新 Master 状态为 ALIVE
如果有,说明是恢复状态,即当前的 alive master 死掉了,standby master顶替上来,更新状态为RECOVERING。然后会启动恢复,并延时 60 秒向自己发送 CompleteRecovery 类型的消息
主要是三类角色的恢复:
Application 的恢复
Driver 的恢复
Worker 的恢复
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
storedWorkers: Seq[WorkerInfo]) {
//TODO 1.Application的恢复
//遍历所有的Application信息
for (app <- storedApps) {
logInfo("Trying to recover app: " + app.id)
try {
//在切换的Master上注册Application
registerApplication(app)
//更新应用状态为UNKNOW
app.state = ApplicationState.UNKNOWN
//给应用的Driver节点发送MasterChanged消息
app.driver.send(MasterChanged(self, masterWebUiUrl))
} catch {
case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
}
}
//TODO 2.Driver的恢复
for (driver <- storedDrivers) {
// Here we just read in the list of drivers. Any drivers associated with now-lost workers
// will be re-launched when we detect that the worker is missing.
drivers += driver
}
//TODO 3.Worker的恢复
for (worker <- storedWorkers) {
logInfo("Trying to recover worker: " + worker.id)
try {
registerWorker(worker)
worker.state = WorkerState.UNKNOWN
worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
} catch {
case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
}
}
}
private def completeRecovery() {
//验证 Master 状态,不为RECOVERING直接返回
if (state != RecoveryState.RECOVERING) {
return
}
//更新状态为COMPLETING_RECOVERY
state = RecoveryState.COMPLETING_RECOVERY
//将UNKNOWN状态的Worker和Application移除
workers.filter(_.state == WorkerState.UNKNOWN).foreach(
removeWorker(_, "Not responding for recovery"))
apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
//将WAITING状态的Application更新为RUNNING状态
apps.filter(_.state == ApplicationState.WAITING).foreach(_.state = ApplicationState.RUNNING)
//给未分配Worker的Driver重新分配Worker
drivers.filter(_.worker.isEmpty).foreach { d =>
logWarning(s"Driver ${d.id} was not found after master recovery")
if (d.desc.supervise) {
logWarning(s"Re-launching ${d.id}")
relaunchDriver(d)
} else {
removeDriver(d.id, DriverState.ERROR, None)
logWarning(s"Did not re-launch ${d.id} because it was not supervised")
}
}
//更新 Master 状态为 ALIVE
state = RecoveryState.ALIVE
//调度资源
schedule()
logInfo("Recovery complete - resuming operations!")
}
该方法的逻辑是:
验证 Master 状态
更新 Master 状态为COMPLETING_RECOVERY
移除 UNKNOWN 状态的 Worker 和 Application
将 WAITING 状态的 Application 更新为 RUNNING 状态
给未分配 Worker 的 Driver 重新分配 Worker
更新 Master 状态为 ALIVE
调度资源
这里 schedule 方法的作用是:给等待的应用安排可用的资源。每当有新应用加入或者资源可用性发生改变时都会被调用,后续再进行详细分析。
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
这里调用的是 NettyRpcEnv.setupEndpoint 方法,内部调用了 Dispatcher.registerEndpoint 方法
//注册 Endpoint 对象,即将 Endpoint 对象信息添加到各个变量中
// 并获取对应的 EndpointRef 代理对象
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
}
if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
}
//获取EndpointData对象
val data = endpoints.get(name)
endpointRefs.put(data.endpoint, data.ref)
receivers.offer(data) // for the OnStart message
}
endpointRef
}
val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
case BoundPortsRequest =>
context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
以上是关于Spark 启动 | 从启动脚本分析 Master 的启动流程的主要内容,如果未能解决你的问题,请参考以下文章
Spark学习之路 (十五)SparkCore的源码解读启动脚本[转]