Spark 启动 | 从启动脚本分析 Master 的启动流程

Posted 大数据记事本

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 启动 | 从启动脚本分析 Master 的启动流程相关的知识,希望对你有一定的参考价值。

一、往期回顾

     在《 》一篇中,分析过 Spark 从 2.x 版本开始,RPC 框架完全采用 Netty 实现。其中三个重要的概念包括:RpcEnv、Endpoint、EndpointRef。而 Master 和 Worker 均是 Endpoint 的子类,所以 Master 由 RpcEnv 管理,且生命周期遵循 Endpoint 的生命周期。
二、启动脚本分析
     在启动 Spark 时,执行的是 /sbin/start-all.sh 脚本,其内部分别执行了 /sbin/start-master.sh 启动一个 Master 节点,执行了 /sbin/start-slaves.sh 启动多个 Worker 节点
# Start Master"${SPARK_HOME}/sbin"/start-master.sh
# Start Workers"${SPARK_HOME}/sbin"/start-slaves.sh

/sbin/start-master.sh

内部执行了 /sbin/spark-daemon.sh

"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \ --host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \ $ORIGINAL_ARGS

/sbin/start-slaves.sh

内部多次执行了 /sbin/start-slave.sh 来启动多个 Worker 节点

/sbin/start-slave.sh

内部同样执行了 /sbin/spark-daemon.sh

"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \ --webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"

/sbin/spark-daemon.sh

内部执行了 /bin/spark-class 脚本

case "$mode" in (class) execute_command nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class "$command" "$@" ;;

/bin/spark-class

最终在 /bin/spark-class 脚本中,调用了 org.apache.spark.launcher.Main 类的 main 方法

build_command() { "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@" printf "%d\0" $?}

Main.main 方法会屏蔽平台的差异性,然后分别启动 master 和 worker

master启动:java org.apache.spark.deploy.master.Masterworker启动:java org.apache.spark.deploy.worker.Worker
三、Master 启动流程图解

四、Master 启动流程分析

Master 类的伴生对象提供了 main 方法,来启动 Master,代码如下

private[deploy] object Master extends Logging { //RpcEnv名称 val SYSTEM_NAME = "sparkMaster" //Master对应的Endpoint名称 val ENDPOINT_NAME = "Master"
/** * TODO Master启动入口 * @param argStrings */ def main(argStrings: Array[String]) {
Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler( exitOnUncaughtException = false)) Utils.initDaemon(log)
//TODO 第一步:解析参数 //创建conf对象,读取spark默认的配置参数 val conf = new SparkConf
//解析程序参数,读取以spark. 开头的环境变量 val args = new MasterArguments(argStrings, conf)
/** * 综上,spark配置的来源有三个: * 1.环境变量 * 2.程序运行参数 * 3.配置文件 */
//TODO 第二步:启动RPC服务端 val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
rpcEnv.awaitTermination()}

mian 方法中主要做了两件事:

  • 解析参数

  • 启动 RPC 服务端

这里的参数有三个来源:

  • 环境变量(如 SPARK_MASTER_HOST、SPARK_MASTER_IP 等)

  • 程序运行参数(如 --ip、--host 等)

  • 配置文件(spark-defaults.conf)

    启动 RPC 服务端调用的是 startRpcEnvAndEndpoint 方法,这个方法主要分为四步:
  • 创建 RpcEnv 对象

  • 创建 Endpoint 对象,启动 Endpoint 生命周期,即调用其 onStart 方法

  • 将 Endpoint 对象注册到消息分发器 Dispatcher

  • 调用 EndpointRef 代理对象的 askSync 方法,同步等待返回。这里会给自己发送一个 BoundPortsRequest 类型的消息,由 Master 的 receiveAndReply 方法处理

def startRpcEnvAndEndpoint( host: String, port: Int, webUiPort: Int, conf: SparkConf): (RpcEnv, Int, Option[Int]) = { //初始化SecurityManager val securityMgr = new SecurityManager(conf)
//TODO 创建RpcEnv对象,类似akka中的ActorSystem。其中SYSTEM_NAME是其名称 val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
//TODO 创建EndPoint对象并启动 //做两件事: //1.创建Master对象 //2.rpcEnv.setupEndpoint 就会调用 EndPoint 的 onStart() 方法 val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
//调用EndPoint对象的askSync方法,同步等待返回 val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest) (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)}

下面依次分析这四个步骤:

1.创建 RpcEnv 对象

调用的是 RpcEnv 的 create 方法:

def create( name: String, bindAddress: String, advertiseAddress: String, port: Int, conf: SparkConf, securityManager: SecurityManager, numUsableCores: Int, clientMode: Boolean): RpcEnv = { //TODO 第一步:初始化 RpcEnvConfig 配置对象 val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager, numUsableCores, clientMode) //TODO 第二步:创建NettyRpcEnvFactory工厂对象,通过工厂对象创建RpcEnv实例 new NettyRpcEnvFactory().create(config)}
  • 第一步:初始化 RpcEnvConfig 配置对象

  • 第二步:创建 NettyRpcEnvFactory 工厂对象,通过工厂对象的 create 方法创建RpcEnv实例。create 方法如下:

def create(config: RpcEnvConfig): RpcEnv = { //TODO 第一步:获取配置对象 val sparkConf = config.conf
//TODO 第二步:获取一个Java序列化器,从这里可以看出来,spark的序列化默认采用的是java的序列化 val javaSerializerInstance = new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance] //TODO 第三步:创建一个NettyRpcEnv对象 val nettyEnv = new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress, config.securityManager, config.numUsableCores) //clientMode默认为false,走这个分支 if (!config.clientMode) { val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort => //启动 NettyRpcEnv nettyEnv.startServer(config.bindAddress, actualPort) (nettyEnv, nettyEnv.address.port) } try { //绑定端口,启动服务 Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1 } catch { case NonFatal(e) => nettyEnv.shutdown() throw e } } nettyEnv}
  • 获取 SparkConf 配置对象

  • 获取 Java 序列化器。从这里可以看出,Spark 序列化默认采用的是 Java 的序列化

  • 创建 NettyRpcEnv 实例

  • 如果 clientMode 为 false,(默认为 false)绑定端口,启动 NettyRpcEnv 服务

这里重点看一下 NettyRpcEnv 的定义:
private[netty] class NettyRpcEnv( val conf: SparkConf,//配置对象 javaSerializerInstance: JavaSerializerInstance,//java序列化器 host: String,//主机名 securityManager: SecurityManager, numUsableCores: Int//可用的核数,默认为0 ) extends RpcEnv(conf) with Logging {
//配置对象 private[netty] val transportConf = SparkTransportConf.fromSparkConf( conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"), "rpc", conf.getInt("spark.rpc.io.threads", numUsableCores))
//事件分发器。调度:哪些请求由哪些EndPoint处理 private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
//文件服务,提交任务时的jar包,文件都是这个组件管理的 private val streamManager = new NettyStreamManager(this)
//NettyRpcEnv的上下文对象,里面包含了NettyRpcEnv的各种功能 private val transportContext = new TransportContext(transportConf, new NettyRpcHandler(dispatcher, this, streamManager))
//如果将来当前节点需要和其它的 RpcServer(TransportServer)进行通信,就需要获取TransportClient对象, //这里的clientFactory工厂对象用来构建TransportClient实例对象 private val clientFactory = transportContext.createClientFactory(createClientBootstraps()) //超时任务检查的线程池 val timeoutScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout") //TODO 客户端连接线程池 private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool( "netty-rpc-connection", conf.getInt("spark.rpc.connect.threads", 64)) //标识Master的状态,启动后为false private val stopped = new AtomicBoolean(false) //邮箱集合,用于记录远端Endpoint地址及对应的Outbox对象 private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]() //当前服务端地址 @Nullable override lazy val address: RpcAddress = { if (server != null) RpcAddress(host, server.getPort()) else null } ...}

其中几个重要的变量含义如下:

  • dispatcher:消息分发器。对应接收到的消息,如果是发送给自己管理的 Endpoint ,那么把这个消息放入该 Endpoint 的 Inbox 收件箱;如果是发送给远端的 RpcEnc 管理的 Endpoint,则将该消息放入该 Endpoint 的 Outbox 发件箱。Dispatcher 内部定义了 EndpointData 类,用于维护 Endpoint、EndpointRef 及 Inbox 的对应关系,并通过变量 endpoints 和 endpointRefs 管理

private class EndpointData( val name: String,//Endpoint名称 val endpoint: RpcEndpoint,//Endpoint对象 val ref: NettyRpcEndpointRef//EndpointRef 代理对象 ) { val inbox = new Inbox(ref, endpoint)//Endpoint对象的收件箱 }//key为Endpoint名称,value是EndpointData对象private val endpoints: ConcurrentMap[String, EndpointData] = new ConcurrentHashMap[String, EndpointData]//维护 Endpoint 和 EndpointRef 的对应关系private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
    注意:从这里可以看出,对于当前 RpcEnv 管理的 Endpoint,其 Inbox 信息是由 Dispatcher 消息分发器管理的;而对于远端 RpcEnv 管理的 Endpoint,其 Outbox 是由 RpcEnv 管理的
  • streamManager:文件服务。提交任务时的jar包、文件都是这个组件管理的

transportContext:NettyRpcEnv的上下文对象。里面包含了NettyRpcEnv的各种功能

  • clientFactory:如果将来当前节点需要和其它的 RpcServer(TransportServer)进行通信,就需要获取 TransportClient 对象,这里的 clientFactory 工厂对象用来构建 TransportClient 实例对象

  • clientConnectionExecutor:客户端连接的线程池。如果配置了 spark.rpc.connect.threads 参数,线程数采用配置值,如果没有配置采用默认值 64

2.创建 Endpoint 对象,开始其生命周期

这里就是创建 Master 对象

private[deploy] class Master( override val rpcEnv: RpcEnv,//RpcEnv对象 address: RpcAddress,//Endpoint地址,里面封装了host和port webUiPort: Int,//WebUI端口号 val securityMgr: SecurityManager,//安全管理对象 val conf: SparkConf//配置对象 ) extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
//发送消息的线程,作用就是给自己发消息,启动各种定时任务 private val forwardMessageThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
//加载的 hadoop 配置 private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) //TODO Woker连接的超时时间,woker的心跳时间为15秒,默认4次没有接收到心跳则认为该worker宕机了 private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000 //执行完成的app在webui里面保存的数量,默认为200 private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200) //保存执行的driver数量,默认为200 private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200) //断开的worker信息保存数量,默认为15 private val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15) //worker恢复模式,默认为NONE private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE") //executor最大重试次数,默认为10次 private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.maxExecutorRetries", 10) //Master中存储Worker信息的容器 val workers = new HashSet[WorkerInfo] //存储appID 对应 Application信息的容器 val idToApp = new HashMap[String, ApplicationInfo] //等待执行的Application private val waitingApps = new ArrayBuffer[ApplicationInfo] //所有Application信息 val apps = new HashSet[ApplicationInfo]
//workerID对应的worker节点信息 private val idToWorker = new HashMap[String, WorkerInfo]
//RPC 中注册的 ip 与对应的 worker 信息的对应关系 private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]
//Application的Driver所在节点的EndPointRef代理对象和App信息的对应关系 private val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo]
//RPC 中注册的 ip 与对应的 app private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
//完成的Application private val completedApps = new ArrayBuffer[ApplicationInfo]
//Application 计数器 private var nextAppNumber = 0
//Driver信息容器 private val drivers = new HashSet[DriverInfo]
//完成的Driver private val completedDrivers = new ArrayBuffer[DriverInfo]
// Drivers currently spooled for scheduling //等待执行的Driver private val waitingDrivers = new ArrayBuffer[DriverInfo]
//Driver计数器 private var nextDriverNumber = 0
//host检查 Utils.checkHost(address.host)
//master注册metrics服务 private val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf, securityMgr)
//Application注册Metrics服务 private val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf, securityMgr)
//用来收集性能相关的信息,Master实例就是MasterSource的成员 private val masterSource = new MasterSource(this) //WebUI对象,在onStart方法中会初始化 private var webUi: MasterWebUI = null //所有的节点初始化都是 STANDBY 状态,只有经过选举了才会改为 ALIVE 状态 private var state = RecoveryState.STANDBY
//持久化引擎,基于zk实现,实现对zk的增删改查 private var persistenceEngine: PersistenceEngine = _
//帮忙执行 Spark 选举的代理对象,使用zk的客户端框架Curator来实现,调用LeaderLatch.start()方法的时候就执行了选举 //当选举结束,且成为leader后,就回调:isLeader() 方法,如果选举失败,就回调 notLeader() 方法 private var leaderElectionAgent: LeaderElectionAgent = _ ...}

Master 中定义了各种变量,重点看其中几个:

  • WORKER_TIMEOUT_MS:worker连接的超时时间,默认60秒。也就是说,如果超过 60 秒 没有收到 worker 发送的心跳消息,就认为该 worker 下线了。可以通过参数 spark.worker.timeout 设置。worker 的心跳间隔也是由这个变量决定的,即 WORKER_TIMEOUT_MS/4 得到的就是心跳间隔,默认 15 秒

  • RECOVERY_MODE:恢复模式默认为 NONE。但 spark-defaults.conf 配置文件中一般会配置 spark.deploy.recoverMode = ZOOKEEPER,然后在 spark-env.sh 中,设置变量 RECOVER_MODE = spark.deploy.recoverMode = ZOOKEEPER

  • persistenceEngine:持久化引擎。基于 zookeepe r实现,实现对 zookeeper 的增删改查

  • leaderElectionAgent:选举代理对象。属于 HA 范畴,当配置了多个 Master 时,只有一个可以选举成为 Leader,状态为 ALIVE,其余的状态都是 STANDBY。其使用 zk的客户端框架 Curator 来实现,调用LeaderLatch.start()方法的时候就执行了选举。当选举结束,且成为leader后,就回调:isLeader() 方法,如果选举失败,就回调 notLeader() 方法

onStart:开启 Master 生命周期

主要做了三件事:

  1. 启动WebUI

  2. 启动一个定时任务,每隔 60 秒检查一次是否有 worker 失去联系超过 60 秒

  3. 创建一个 Leader 选举的代理对象,并完成选举

①启动WebUI

//WebUI初始化webUi = new MasterWebUI(this, webUiPort)//绑定WebUI地址和端口号(默认8080),真正启动webUi.bind()

②启动定时任务

checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { /** * ref:发送消息给别人 * self:发送消息给自己。Master本身就是一个EndPoint实例 * 这里给自己发送一个CheckForWorkerTimeOut类型的消息,receive方法收到后执行对应的逻辑 */ self.send(CheckForWorkerTimeOut) } //WORKER_TIMEOUT_MS 60秒}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

    这里的 scheduleAtFixedRate 就是启动一个定时任务,每隔 60 秒给自己发送一个 CheckForWorkerTimeOut 类型的消息,用来检查是否有 worker 心跳超时。receive 方法接收到这个消息后,会调用 timeOutDeadWorks 方法进行处理

private def timeOutDeadWorkers() { //获取当前时间 val currentTime = System.currentTimeMillis() //查找心跳超时的workers节点,上一次心跳的时间 < 当前时间 - 心跳超时时间 val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray //对所有心跳超时的worker节点执行下线 for (worker <- toRemove) { if (worker.state != WorkerState.DEAD) { logWarning("Removing %s because we got no heartbeat in %d seconds".format( worker.id, WORKER_TIMEOUT_MS / 1000)) //移除worker removeWorker(worker, s"Not receiving heartbeat for ${WORKER_TIMEOUT_MS / 1000} seconds") } else { if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) { workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it } } }}

    该方法首先会查找心跳超时的 worker:最后一次心跳的时间 < 当前时间 - 心跳超时时间

    如果心跳超时,则调用 removeWorker 方法移除该 worker 节点信息

private def removeWorker(worker: WorkerInfo, msg: String) { logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port) //第一步:更新worker节点状态为DEAD worker.setState(WorkerState.DEAD) //第二步:将worker信息从变量中移除 idToWorker -= worker.id addressToWorker -= worker.endpoint.address //第三步:如果待移除的worker上面运行了一些executor,那么要将这些executor转移到别的worker运行 for (exec <- worker.executors.values) { logInfo("Telling app of lost executor: " + exec.id) exec.application.driver.send(ExecutorUpdated( exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true)) exec.state = ExecutorState.LOST exec.application.removeExecutor(exec) } //第四步:如果待移除的worker上面运行了一些driver,那么要将这些driver转移到别的worker运行 for (driver <- worker.drivers.values) { if (driver.desc.supervise) { logInfo(s"Re-launching ${driver.id}") //重新启动driver relaunchDriver(driver) } else { logInfo(s"Not re-launching ${driver.id} because it was not supervised") removeDriver(driver.id, DriverState.ERROR, None) } } logInfo(s"Telling app of lost worker: " + worker.id) apps.filterNot(completedApps.contains(_)).foreach { app => app.driver.send(WorkerRemoved(worker.id, worker.host, msg)) } //第五步:移除zk上的节点信息 persistenceEngine.removeWorker(worker)}

移除 worker 信息主要分为以下几步:

  • 第一步:更新 worker 节点的状态为 DEAD

  • 第二步:将 worker 信息从变量中移除。主要包括 idToWorker 和 addressToWorker

  • 第三步:如果待移除的 worker上面运行了一些 executor,那么要将这些 executor 转移到别的 worker 运行

  • 第四步:如果待移除的 worker上面运行了一些 driver,那么要将这些 driver 转移到别的 worker 运行

  • 第五步:移除 zk 上存储的 worker 信息

③创建选举代理对象,完成选举

val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match { //如果是ZOOKEEPER模式,将persistenceEngine_和leaderElectionAgent_两个对象创建出来 case "ZOOKEEPER" => logInfo("Persisting recovery state to ZooKeeper") //初始化zk的工厂对象 val zkFactory = new ZooKeeperRecoveryModeFactory(conf, serializer) ( //TODO 1.初始化基于zk的持久化引擎 zkFactory.createPersistenceEngine(), //TODO 2.初始化基于zk的Leader选举代理 zkFactory.createLeaderElectionAgent(this)) case "FILESYSTEM" => val fsFactory = new FileSystemRecoveryModeFactory(conf, serializer) (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this)) case "CUSTOM" => val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory")) val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer]) .newInstance(conf, serializer) .asInstanceOf[StandaloneRecoveryModeFactory] (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this)) case _ => (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))}

    这里根据 RECOVERY_MODE 恢复模式,执行不同的逻辑。我们以 ZOOKEEPER 模式为例:首先会初始化 zk 的工厂对象,然后初始化基于 zk 的持久化引擎和 Leader 选举代理

    zkFactory.createPersistenceEngine(),内部创建了 ZooKeeperPersistenceEngine 实例

private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer: Serializer) extends PersistenceEngine with Logging {
private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status" //TODO 获取了一个zk的客户端,用于zk的管理,这里使用的就是zk的客户端框架Curator private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
//创建zk的 /spark/master_status znode节点 SparkCuratorUtil.mkdir(zk, WORKING_DIR) ...}

    这里主要是使用 Curator 框架获取了一个 zk 的客户端,并在 zk 上 创建了 /spark/master_status 节点

    zkFactory.createLeaderElectionAgent(this)),内部初始化了 ZooKeeperLeaderElectionAgent

private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable, conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging { //用于选举的节点路径 /spark/leader_election val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election" //zk客户端对象 private var zk: CuratorFramework = _ private var leaderLatch: LeaderLatch = _ //选举状态,初始为没有Leader private var status = LeadershipStatus.NOT_LEADER
start()
private def start() { logInfo("Starting ZooKeeper LeaderElection agent") //初始化zk的客户端 zk = SparkCuratorUtil.newClient(conf) //封装一个LeaderLatch leaderLatch = new LeaderLatch(zk, WORKING_DIR) //绑定当前的this对象为监听器,当leader选举成功,会调用该监听器的isLeader()方法 leaderLatch.addListener(this) //启动选举 leaderLatch.start() } ...}

初始化 ZooKeeperLeaderElectionAgent 对象的过程中,会调用其 start 方法,该方法的作用是:

  • 初始化 zk 客户端

  • 初始化 LeaderLatch 对象

  • 绑定当前的 this 对象为监听器,当 leader 选举成功,会调用该监听器的 isLeader 方法

  • 开启 Leader 选举

    对于 Leader 的选举流程这里先不做具体分析,我们直接看选举成功后,isLeader 方法的逻辑

override def isLeader() { synchronized { //确认是否产生了Leader if (!leaderLatch.hasLeadership) { return }
logInfo("We have gained leadership") //TODO 修改Leader状态 updateLeadershipStatus(true) }}

    首先会确认是否产生了 Leader,如果确实产生了,调用 updateLeadershipStatus 方法:

private def updateLeadershipStatus(isLeader: Boolean) { //如果成为了Leader,且之前是没有Leader的 if (isLeader && status == LeadershipStatus.NOT_LEADER) { //更新当前Master节点的状态为Leader status = LeadershipStatus.LEADER //给Master发送消息:当前的代理已经帮你竞选成为 Active Leader 了 masterInstance.electedLeader()
//如果成为Leader被撤销了 } else if (!isLeader && status == LeadershipStatus.LEADER) { status = LeadershipStatus.NOT_LEADER masterInstance.revokedLeadership() }}
传入的参数 isLeader 为 true,所以只看成为 Leader 的分支
①更新当前 Master 的状态为 Leader
②调用 Master.electedLeader 方法,其作用就是给自己发送一个 ElectedLeader 类型的消息。receive 方法处理的逻辑如下:
case ElectedLeader => //选举成功后,根据zk中是否有app,driver,worker等信息更新自己的状态 val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {    //如果这些信息都没有,说明是刚启动的选举,更新状态为ALIVE RecoveryState.ALIVE } else { //如果是恢复,即当前的alive master死掉了,standby master顶替上来。更新状态为RECOVERING RecoveryState.RECOVERING } logInfo("I have been elected leader! New state: " + state) //如果是恢复状态 if (state == RecoveryState.RECOVERING) { //启动恢复 beginRecovery(storedApps, storedDrivers, storedWorkers) //延时任务调度,延时60秒 ,向自己发送CompleteRecovery消息 recoveryCompletionTask = forwardMessageThread.schedule(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(CompleteRecovery) } }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS) }

这里首先会尝试从 zk 中查看是否有 Application、Driver、Worker 等信息

  • 如果没有,说明是刚启动时的选举,直接更新 Master 状态为 ALIVE

  • 如果有,说明是恢复状态,即当前的 alive master 死掉了,standby master顶替上来,更新状态为RECOVERING。然后会启动恢复,并延时 60 秒向自己发送 CompleteRecovery 类型的消息

如何启动恢复:beginRecovery

    主要是三类角色的恢复:

  • Application 的恢复

  • Driver 的恢复

  • Worker 的恢复

private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo], storedWorkers: Seq[WorkerInfo]) { //TODO 1.Application的恢复 //遍历所有的Application信息 for (app <- storedApps) { logInfo("Trying to recover app: " + app.id) try { //在切换的Master上注册Application registerApplication(app) //更新应用状态为UNKNOW app.state = ApplicationState.UNKNOWN //给应用的Driver节点发送MasterChanged消息 app.driver.send(MasterChanged(self, masterWebUiUrl)) } catch { case e: Exception => logInfo("App " + app.id + " had exception on reconnect") } } //TODO 2.Driver的恢复 for (driver <- storedDrivers) { // Here we just read in the list of drivers. Any drivers associated with now-lost workers // will be re-launched when we detect that the worker is missing. drivers += driver } //TODO 3.Worker的恢复 for (worker <- storedWorkers) { logInfo("Trying to recover worker: " + worker.id) try { registerWorker(worker) worker.state = WorkerState.UNKNOWN worker.endpoint.send(MasterChanged(self, masterWebUiUrl)) } catch { case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect") } }}
     receive 方法如何处理 CompleteRecovery 类型的消息:调用 completeRecovery 方法
private def completeRecovery() { //验证 Master 状态,不为RECOVERING直接返回 if (state != RecoveryState.RECOVERING) { return } //更新状态为COMPLETING_RECOVERY state = RecoveryState.COMPLETING_RECOVERY
//将UNKNOWN状态的Worker和Application移除 workers.filter(_.state == WorkerState.UNKNOWN).foreach( removeWorker(_, "Not responding for recovery")) apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
//将WAITING状态的Application更新为RUNNING状态 apps.filter(_.state == ApplicationState.WAITING).foreach(_.state = ApplicationState.RUNNING)
//给未分配Worker的Driver重新分配Worker drivers.filter(_.worker.isEmpty).foreach { d => logWarning(s"Driver ${d.id} was not found after master recovery") if (d.desc.supervise) { logWarning(s"Re-launching ${d.id}") relaunchDriver(d) } else { removeDriver(d.id, DriverState.ERROR, None) logWarning(s"Did not re-launch ${d.id} because it was not supervised") } } //更新 Master 状态为 ALIVE state = RecoveryState.ALIVE //调度资源 schedule() logInfo("Recovery complete - resuming operations!")}

该方法的逻辑是:

  • 验证 Master 状态

  • 更新 Master 状态为COMPLETING_RECOVERY

  • 移除 UNKNOWN 状态的 Worker 和 Application

  • 将 WAITING 状态的 Application 更新为 RUNNING 状态

  • 给未分配 Worker 的 Driver 重新分配 Worker

  • 更新 Master 状态为 ALIVE

  • 调度资源

    这里 schedule 方法的作用是:给等待的应用安排可用的资源。每当有新应用加入或者资源可用性发生改变时都会被调用,后续再进行详细分析。

3.将 Endpoint 对象注册到消息分发器 Dispatcher
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))

    这里调用的是 NettyRpcEnv.setupEndpoint 方法,内部调用了 Dispatcher.registerEndpoint 方法

//注册 Endpoint 对象,即将 Endpoint 对象信息添加到各个变量中// 并获取对应的 EndpointRef 代理对象def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = { val addr = RpcEndpointAddress(nettyEnv.address, name) val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv) synchronized { if (stopped) { throw new IllegalStateException("RpcEnv has been stopped") } if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) { throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name") } //获取EndpointData对象 val data = endpoints.get(name) endpointRefs.put(data.endpoint, data.ref) receivers.offer(data) // for the OnStart message } endpointRef}
4.调用 EndpointRef 代理对象的 askSync 方法,同步等待返回
val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
    这里就是给自己发送了一个 BoundPortsRequest 类型的消息。处理逻辑也比较简单:封装了一个 BoundPortsResponse 对象,包含 Master 端口,WebUI 端口,restServer 端口三个属性
case BoundPortsRequest => context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))


以上是关于Spark 启动 | 从启动脚本分析 Master 的启动流程的主要内容,如果未能解决你的问题,请参考以下文章

Spark学习之路 (十五)SparkCore的源码解读启动脚本[转]

Spark Standalone如何通过start-all.sh启动集群

Spark启动流程(Standalone)-分析

spark源码阅读 启动代码阅读

spark submit

Spark 启动 | Worker 启动流程详解