Spark 启动 | Worker 启动流程详解

Posted 大数据记事本

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 启动 | Worker 启动流程详解相关的知识,希望对你有一定的参考价值。

一、往期回顾

    从上一篇对 Spark 启动脚本的分析中可以知道,Worker 类的全限定名称如下:
java org.apache.spark.deploy.worker.Worker

    启动 Worker 实际执行的就是伴生对象的 main 方法。

    Worker 和 Master 都是 Endpoint 的子类,所以启动流程很相似,即创建 RpcEnv 对象,然后创建 Worker 对象并执行 onStart 方法开启其生命周期。

    由于 Master 和 Worker 采用的是主从架构,所以 Worker 启动后最重要的事就是向 Master 注册,注册成功后定期发送心跳消息告诉 Master 自己活着。

二、Worker 启动流程图解

三、Worker 启动流程源码详解

Worker 伴生对象的 main 方法:

def main(argStrings: Array[String]) { Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler( exitOnUncaughtException = false)) Utils.initDaemon(log) //第一步:解析参数 //创建conf对象,读取spark默认的配置参数 val conf = new SparkConf
//解析程序运行参数,读取以spark. 开头的环境变量 val args = new WorkerArguments(argStrings, conf)
//TODO 第二步:启动 Worker 的 RPC 服务端 val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores, args.memory, args.masters, args.workDir, conf = conf)
val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED) val sparkWorkerInstances = scala.sys.env.getOrElse("SPARK_WORKER_INSTANCES", "1").toInt require(externalShuffleServiceEnabled == false || sparkWorkerInstances <= 1, "Starting multiple workers on one host is failed because we may launch no more than one " + "external shuffle service on each host, please set spark.shuffle.service.enabled to " + "false or set SPARK_WORKER_INSTANCES to 1 to resolve the conflict.") rpcEnv.awaitTermination()}

该方法主要做了两件事:

  • 解析参数:包括配置文件、程序运行参数及环境变量

  • 启动 Worker 的 RPC 服务端

    这里启动 Worker 的 RPC 服务端调用的是 startRpcEnvAndEndpoint 方法,逻辑如下:

def startRpcEnvAndEndpoint( host: String, port: Int, webUiPort: Int, cores: Int, memory: Int, masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None, conf: SparkConf = new SparkConf): RpcEnv = {
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("") //初始化SecurityManager val securityMgr = new SecurityManager(conf)
//TODO 创建RpcEnv对象 val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
//获取可能存在的多个 Master 的地址 val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
//TODO 创建EndPoint对象并启动,用于网络通信 // 做两件事: // 1.创建Worker对象,调用onStart 方法开启生命周期 // 2.Worker 对象向 rpcEnv 中的 Dispatcher 对象注册 rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory, masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr)) rpcEnv}

该方法主要包括三个步骤:

  • 第一步:创建 RpcEnv 对象

  • 第二步:创建 Worker 对象并调用 onStart 方法开启生命周期

  • 第三步:将 Worker 对象注册到 RpcEnv 管理的 Dispatcher 对象中

    其中,第一步和第三步在分析 Master 启动流程时已经进行了详细说明,这里不再赘述。重点看 Worker 对象的初始化及其 onStart 方法

注意:由于高可用的机制,一个 Worker 可能对应多个 Master,其中只有一个状态是 ALIVE,其它状态都是 STANDBY

Worker 定义:

private[deploy] class Worker( override val rpcEnv: RpcEnv, webUiPort: Int, cores: Int,//总核数 memory: Int,//总的可用内存 masterRpcAddresses: Array[RpcAddress],//Master地址,可能存在多个 endpointName: String, workDirPath: String = null,//工作目录路径 val conf: SparkConf, val securityMgr: SecurityManager, externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null) extends ThreadSafeRpcEndpoint with Logging {
//设置主机名和端口号 private val host = rpcEnv.address.host private val port = rpcEnv.address.port
//主机名和端口号合法性验证 Utils.checkHost(host) assert (port > 0)
//进行发送消息调度的线程池 private val forwordMessageScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
//也是一个线程池,完成清理工作的线程池。清理任务执行过程中的jar包,文件等,每隔30分钟执行一次 private val cleanupThreadExecutor = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))
//心跳间隔时间,用 60/4 = 15秒 private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
private val INITIAL_REGISTRATION_RETRIES = 6 private val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10 private val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500 private val REGISTRATION_RETRY_FUZZ_MULTIPLIER = { val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits) randomNumberGenerator.nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND } private val INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(10 * REGISTRATION_RETRY_FUZZ_MULTIPLIER)) private val PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(60 * REGISTRATION_RETRY_FUZZ_MULTIPLIER))
private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
//cleanup定时任务的周期:30分钟 private val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
//应用文件夹/数据的ttl,ttl过期时间为7天,过期后将被清理 private val APP_DATA_RETENTION_SECONDS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
//是否清理 executor 存在的非 shuffer 产生的文件,默认为true private val CLEANUP_NON_SHUFFLE_FILES_ENABLED = conf.getBoolean("spark.storage.cleanupFilesAfterExecutorExit", true)
private val testing: Boolean = sys.props.contains("spark.testing")
//Master节点的RpcEndpointRef代理对象 private var master: Option[RpcEndpointRef] = None
private val preferConfiguredMasterAddress = conf.getBoolean("spark.worker.preferConfiguredMasterAddress", false)
private var masterAddressToConnect: Option[RpcAddress] = None private var activeMasterUrl: String = "" private[worker] var activeMasterWebUiUrl : String = "" private var workerWebUiUrl: String = "" private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString //标识该worker节点是否注册成功 private var registered = false private var connected = false //生成workerID private val workerId = generateWorkerId() private val sparkHome = if (testing) { assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!") new File(sys.props("spark.test.home")) } else { new File(sys.env.get("SPARK_HOME").getOrElse(".")) }
//工作目录 var workDir: File = null
//当前Worker上完成的Executor的集合 val finishedExecutors = new LinkedHashMap[String, ExecutorRunner]
//当前Worker上的Driver集合 val drivers = new HashMap[String, DriverRunner]
//当前Worker上executor集合 val executors = new HashMap[String, ExecutorRunner]
//当前Worker上完成的Driver集合 val finishedDrivers = new LinkedHashMap[String, DriverRunner]
//当前Worker上Application的工作文件夹 val appDirectories = new HashMap[String, Seq[String]]
//当前Worker上已完成的Application val finishedApps = new HashSet[String]
val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors", WorkerWebUI.DEFAULT_RETAINED_EXECUTORS) val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers", WorkerWebUI.DEFAULT_RETAINED_DRIVERS) //TODO 启动shuffle服务 private val shuffleService = if (externalShuffleServiceSupplier != null) { externalShuffleServiceSupplier.get() } else { //默认实现 new ExternalShuffleService(conf, securityMgr) }
private val publicAddress = { val envVar = conf.getenv("SPARK_PUBLIC_DNS") if (envVar != null) envVar else host } private var webUi: WorkerWebUI = null
private var connectionAttemptCount = 0
//性能监控的组件 private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr) private val workerSource = new WorkerSource(this)
val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)
private var registerMasterFutures: Array[JFuture[_]] = null private var registrationRetryTimer: Option[JScheduledFuture[_]] = None
//向Master注册的线程池,由于向Master注册是同步阻塞的,所以线程个数和Master的个数是一致的 private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool( "worker-register-master-threadpool", masterRpcAddresses.length // Make sure we can register with all masters at the same time )
//用于记录资源 //已使用cpu个数 var coresUsed = 0 //已使用内存大小 var memoryUsed = 0 ...}

可以看到,Worker 中定义了多个属性。重点看以下几个:

  • cores:cup总个数

  • memory:可用内存

  • workDirPath:工作目录路径

  • forwordMessageScheduler:进行发送消息调度的线程池

  • cleanupThreadExecutor:完成清理工作的线程池。清理任务执行过程中上传的 jar 包、文件等,清理周期由参数 spark.worker.cleanup.interval 设置,默认每隔 30 分钟清理一次

  • HEARTBEAT_MILLIS:心跳的间隔时间。计算逻辑是 spark.worker.timeout / 4,即 60 /4 = 15 秒

  • APP_DATA_RETENTION_SECONDS:应用文件夹/数据的过期时间。由参数 spark.worker.cleanup.appDataTtl 配置,默认为 7 天

  • master:ALIVE 状态的 Master 的代理对象

  • registered:标识该 Worker 是否注册成功

  • drivers:当前 Worker 上的 Driver 集合

  • executors:当前 Worker 上的 Executor 集合

  • shuffleService:shuffle 服务对象。后面会进行单独分析

  • coreUsed:已使用的 CPU 个数

  • memoryUsed:已使用的内存大小

    由于 Worker 是真正执行任务的,所以它会记录自身的可用资源,以便向 Master 报告

开启 Worker 生命周期:onStart 方法
override def onStart() { //验证该Worker未注册 assert(!registered) logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format( host, port, cores, Utils.megabytesToString(memory))) logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") logInfo("Spark home: " + sparkHome)
//TODO 第一步:创建当前Worker的工作目录 createWorkDir()
//TODO 第二步:启动shuffle服务 startExternalShuffleService()
//TODO 第三步:启动WebUI,绑定端口 webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
//TODO 第四步:向Master注册 registerWithMaster()
//TODO 第五步:注册指标监控系统并启动 metricsSystem.registerSource(workerSource) metricsSystem.start() metricsSystem.getServletHandlers.foreach(webUi.attachHandler)}

onStart 方法主要分为五步:

  • 第一步:创建当前 Worker 的工作目录。也就是在 SPARK_HOME 目录下创建一个 work 目录

  • 第二步:启动 shuffle 服务

  • 第三步:启动 WebUI,绑定端口

  • 第四步:向 Master 注册

  • 第五步:注册指标监控系统并启动

    这里重点分析如何向 Master 进行注册,至于如何启动 shuffle 服务,后面单独分析

向 Master 注册调用了 registerWithMaster 方法:

private def registerWithMaster() {
registrationRetryTimer match { case None => //如果还未注册 registered = false //TODO 开启注册,多次重试直到注册成功 registerMasterFutures = tryRegisterAllMasters() connectionAttemptCount = 0
//如果没有注册成功,就启动一个定时任务,每隔一段时间([5,15]之间的随机整数)重新注册一次 registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate( new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { Option(self).foreach(_.send(ReregisterWithMaster)) } }, //[5,15]之间的随机整数 INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS, INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS, TimeUnit.SECONDS)) case Some(_) => logInfo("Not spawning another attempt to register with the master, since there is an" + " attempt scheduled already.") }}
  • 如果从未注册过,则启动注册;

  • 如果没有注册成功,则启动一个定时任务,每隔一段时间给自己发送一个 ReregisterWithMaster 类型的消息

启动注册:tryRegisterAllMasters

private def tryRegisterAllMasters(): Array[JFuture[_]] = { //向每个Master提交注册信息 masterRpcAddresses.map { masterAddress => //往线程池中提交了一个注册的任务 registerMasterThreadPool.submit(new Runnable { override def run(): Unit = { try { logInfo("Connecting to master " + masterAddress + "...")
//获取到MasterEndPointRef代理对象 val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
//发送RegisterWorker注册消息给Master sendRegisterMessageToMaster(masterEndpoint) } catch { case ie: InterruptedException => // Cancelled case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e) } } }) }}

Master 接收注册消息的处理逻辑如下:

case RegisterWorker(id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) => logInfo("Registering worker %s:%d with %d cores, %s RAM".format( workerHost, workerPort, cores, Utils.megabytesToString(memory)))
//如果是STANDBY状态的Master if (state == RecoveryState.STANDBY) {
//什么都不做,给Worker返回一个StandBy workerRef.send(MasterInStandby)
//判断该Worker是否注册过 } else if (idToWorker.contains(id)) {
//如果注册过了,就返回一个RegisterWorkerFailed消息 workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
//如果没有注册过,则完成注册 } else {
//初始化一个WorkerInfo对象,存储了Worker的信息 val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, workerRef, workerWebUiUrl)
//执行Worker的注册,将Worker信息放入各种数据结构中 //如果注册成功 if (registerWorker(worker)) {
//把Worker信息写到zk集群 persistenceEngine.addWorker(worker)
//给Worker发送RegisteredWorker注册成功的消息 workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
//Worker加入工作,当Worker加入时,可用资源必定增加,那么就将等待执行的Driver,Executor等启动起来执行任务 schedule() } else { val workerAddress = worker.endpoint.address logWarning("Worker registration failed. Attempted to re-register worker at same " + "address: " + workerAddress) workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress)) } }
逻辑如下:
  • 如果 Master 的状态为 STANDBY,则给 Worker 返回一个 MasterInStandby 消息,Worker 接收该类型消息什么都不会做
  • 如果 Master 状态为 ALIVE,但该 Worker 已经注册过,就返回一个 RegisterWorkerFailed 类型的消息,Worker 接收该类型消息后,如果 Worker 已经注册过,则什么都不做,如果没有注册过则记录错误日志
  • 如果 Master 状态为 ALIVE,且该 Worker 没有注册过,则完成 Worker 注册:
    • 第一步:将 Worker 信息封装成一个 WorkerInfo 对象
    • 第二步:进行 Worker 注册,即将 Worker 信息保存到 Master 的各种数据结构中
    • 第三步:如果注册成功,则将 Worker 信息写到 zk 集群
    • 第四步:给 Worker 发送 RegisteredWorker 类型的消息
    • 第五步:调度资源,让等待资源的 Driver 和 Executor 执行任务
Worker 接收 RegisteredWorker 类型消息的处理方法是:handleRegisterResponse
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized { msg match {
//接收到注册成功的消息 case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) => if (preferConfiguredMasterAddress) { logInfo("Successfully registered with master " + masterAddress.toSparkURL) } else { logInfo("Successfully registered with master " + masterRef.address.toSparkURL) }
//标记注册成功 registered = true
//Worker会同时给两个Master发送注册信息,如果Master节点发生变更,那么这里就会更新Master的信息 changeMaster(masterRef, masterWebUiUrl, masterAddress)
//Worker注册成功后,启动定时任务,每隔15秒执行一次,给自己发送一个SendHeartbeat消息 forwordMessageScheduler.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(SendHeartbeat) } }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS) //如果配置了定期清理工作目录,则启动定时任务,延时30分钟后,每隔30分钟执行一次 if (CLEANUP_ENABLED) { logInfo( s"Worker cleanup enabled; old application directories will be deleted in: $workDir") forwordMessageScheduler.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(WorkDirCleanup) } }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS) }
val execs = executors.values.map { e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state) } //给Master发送WorkerLatestState类型的消息 masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
... }}

如果接收到注册成功的消息,会执行以下步骤:

  • 第一步:更新 Worker 的 registered 变量,标记注册成功

  • 第二步:更新 Master 信息。Worker会同时给两个Master发送注册信息,如果Master节点发生变更,那么这里就会更新Master的信息

  • 第三步:启动心跳定时任务,每隔 15 秒给自己发送一个 SendHeartbeat 消息,自身接收到该消息,会给 Master 发送 Heartbeat 类型的消息

  • 第四步:如果开启了定期清理工作目录,则启动定时任务,延时30分钟后,每隔30分钟执行一次清理任务

  • 第五步:将所有 Executor 封装成 ExecutorDescription 对象,然后向 Master 发送 WorkerLatestState 类型的消息

    上面向 Master 发送了两种类型的消息:Heartbeat 和 WorkerLatestState ,下面看 Master 是如何处理的

Heartbeat :

case Heartbeat(workerId, worker) =>
//根据WorkerID获取Worker信息 idToWorker.get(workerId) match {
//如果获取到了则更新最后一次心跳的时间 case Some(workerInfo) => workerInfo.lastHeartbeat = System.currentTimeMillis()
//如果没有获取到,说明还没有注册 则给Worker发送ReconnectWorker消息 case None => if (workers.map(_.id).contains(workerId)) { logWarning(s"Got heartbeat from unregistered worker $workerId." + " Asking it to re-register.") worker.send(ReconnectWorker(masterUrl)) } else { logWarning(s"Got heartbeat from unregistered worker $workerId." + " This worker was never registered, so ignoring the heartbeat.") } }

如果是 Heartbeat 类型的消息,Master 会判断该 Worker 是否注册:

  • 如果注册了,会更新最后一个心跳的时间

  • 如果没有注册,会给 Worker 返回一个 ReconnectWorker 类型的消息。Worker 处理该消息的逻辑就是调用上面的 registerWithMaster 方法进行注册

WorkerLatestState :

case WorkerLatestState(workerId, executors, driverIds) => //根据workerId获取Worker信息 idToWorker.get(workerId) match { //如果获取到了指定的 Worker 信息 case Some(worker) => //遍历所有的Executor信息,看Master保存的Worker信息中是否包含该Executor信息, // 如果不包含,则给Worker发送KillExecutor消息,让其关闭该Executor for (exec <- executors) { val executorMatches = worker.executors.exists { case (_, e) => e.application.id == exec.appId && e.id == exec.execId } if (!executorMatches) { // master doesn't recognize this executor. So just tell worker to kill it. worker.endpoint.send(KillExecutor(masterUrl, exec.appId, exec.execId)) } } //遍历所有的Driver信息,看Master保存的Worker信息中是否包含该Driver信息, // 如果不包含,则给Worker发送KillDriver消息,让其关闭该Driver for (driverId <- driverIds) { val driverMatches = worker.drivers.exists { case (id, _) => id == driverId } if (!driverMatches) { // master doesn't recognize this driver. So just tell worker to kill it. worker.endpoint.send(KillDriver(driverId)) } } //如果没有获取到则记录日志 case None => logWarning("Worker state from unknown worker: " + workerId) }

    该分支的作用是检查当前 Worker 上的 Executor 和 Driver 是否和 Master 端记录的一致,如果不一致则关闭 Executor 和 Driver

注意:从这里也可以看出,在 Master 端维护的 WorkerInfo 信息中,包含了该 Worker 上运行的所有 Executor 和 Driver 信息

最后看一下如果 Worker 注册不成功,如何重新注册。调用的是

Worker.reregisterWithMaster 方法

private def reregisterWithMaster(): Unit = { Utils.tryOrExit { //更新尝试注册的次数 connectionAttemptCount += 1 //如果已经注册成功,取消最后一次重新注册的操作 if (registered) { cancelLastRegistrationRetry() //如果重试次数小于阈值(默认16) } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) { logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
master match { case Some(masterRef) => //如果Worker中保存了Master信息,但是注册失败了,说明失去了和Master的连接,所以代理对象不可用,需要重新创建代理对象 if (registerMasterFutures != null) { registerMasterFutures.foreach(_.cancel(true)) } //重新获取Master代理对象 val masterAddress = if (preferConfiguredMasterAddress) masterAddressToConnect.get else masterRef.address //重新向线程池添加注册Worker的任务 registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable { override def run(): Unit = { try { logInfo("Connecting to master " + masterAddress + "...") val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) //给Master发送RegisterWorker消息 sendRegisterMessageToMaster(masterEndpoint) } catch { case ie: InterruptedException => // Cancelled case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e) } } })) case None => if (registerMasterFutures != null) { registerMasterFutures.foreach(_.cancel(true)) } // We are retrying the initial registration //尝试进行注册 registerMasterFutures = tryRegisterAllMasters() } // We have exceeded the initial registration retry threshold // All retries from now on should use a higher interval //如果达到了初始的重试次数阈值(默认为6),那么应该加大重试间隔 if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) { //取消注册重试计时 registrationRetryTimer.foreach(_.cancel(true)) //重新设置注册的重试计时,重试间隔增大到[30,90]之间的一个随机数,假设为m //那么延时m秒后,每隔m秒重试一次 registrationRetryTimer = Some( forwordMessageScheduler.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(ReregisterWithMaster) } }, PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS, PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS, TimeUnit.SECONDS)) } } else { //如果超过重试次数阈值,则记录错误日志 logError("All masters are unresponsive! Giving up.") System.exit(1) } }}

该方法的逻辑是:

  • 第一步:更新重试注册的计数器

  • 第二步:如果已经注册成功了,那么取消最后一次的重新注册操作

  • 第三步:如果超过了注册重试的总次数(默认16次),则记录错误日志

  • 第四步:如果没有超过注册重试的总次数,判断是否有 Master 代理信息,如果没有,且没有注册失败的信息,则尝试进行初始注册

  • 第五步:如果有 Master 代理信息,但是注册失败了,说明失去了和 Master 的连接,那么需要重新获取 Master 的代理对象。然后使用重新获取的代理对象,向 Master 发送注册消息(这里是将注册的任务重新添加到管理注册线程的线程池里面)

  • 第六步:如果注册重试的次数达到了初始的重试次数阈值(默认6次),则重新设置注册的重试计时,增大重试的间隔时间,这个时间是 [30,90] 之前的一个随机整数,假设为m,单位是秒。然后启动定时任务,在延时 m 秒之后,每个 m 秒重新注册一次,直到达到总的重试次数。

以上是关于Spark 启动 | Worker 启动流程详解的主要内容,如果未能解决你的问题,请参考以下文章

Spark启动流程(Standalone)-分析

Spark——Spark执行流程

Spark 启动 | 从启动脚本分析 Master 的启动流程

Spark Web UI 监控详解

Spark2.0源码学习-5.Worker启动

spark job提交执行流程