Spark 启动 | Worker 启动流程详解
Posted 大数据记事本
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 启动 | Worker 启动流程详解相关的知识,希望对你有一定的参考价值。
一、往期回顾
java org.apache.spark.deploy.worker.Worker
启动 Worker 实际执行的就是伴生对象的 main 方法。
Worker 和 Master 都是 Endpoint 的子类,所以启动流程很相似,即创建 RpcEnv 对象,然后创建 Worker 对象并执行 onStart 方法开启其生命周期。
由于 Master 和 Worker 采用的是主从架构,所以 Worker 启动后最重要的事就是向 Master 注册,注册成功后定期发送心跳消息告诉 Master 自己活着。
二、Worker 启动流程图解
Worker 伴生对象的 main 方法:
def main(argStrings: Array[String]) {
Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
exitOnUncaughtException = false))
Utils.initDaemon(log)
//第一步:解析参数
//创建conf对象,读取spark默认的配置参数
val conf = new SparkConf
//解析程序运行参数,读取以spark. 开头的环境变量
val args = new WorkerArguments(argStrings, conf)
//TODO 第二步:启动 Worker 的 RPC 服务端
val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir, conf = conf)
val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
val sparkWorkerInstances = scala.sys.env.getOrElse("SPARK_WORKER_INSTANCES", "1").toInt
require(externalShuffleServiceEnabled == false || sparkWorkerInstances <= 1,
"Starting multiple workers on one host is failed because we may launch no more than one " +
"external shuffle service on each host, please set spark.shuffle.service.enabled to " +
"false or set SPARK_WORKER_INSTANCES to 1 to resolve the conflict.")
rpcEnv.awaitTermination()
}
该方法主要做了两件事:
解析参数:包括配置文件、程序运行参数及环境变量
启动 Worker 的 RPC 服务端
这里启动 Worker 的 RPC 服务端调用的是 startRpcEnvAndEndpoint 方法,逻辑如下:
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf): RpcEnv = {
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
//初始化SecurityManager
val securityMgr = new SecurityManager(conf)
//TODO 创建RpcEnv对象
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
//获取可能存在的多个 Master 的地址
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
//TODO 创建EndPoint对象并启动,用于网络通信
// 做两件事:
// 1.创建Worker对象,调用onStart 方法开启生命周期
// 2.Worker 对象向 rpcEnv 中的 Dispatcher 对象注册
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
rpcEnv
}
该方法主要包括三个步骤:
第一步:创建 RpcEnv 对象
第二步:创建 Worker 对象并调用 onStart 方法开启生命周期
第三步:将 Worker 对象注册到 RpcEnv 管理的 Dispatcher 对象中
其中,第一步和第三步在分析 Master 启动流程时已经进行了详细说明,这里不再赘述。重点看 Worker 对象的初始化及其 onStart 方法
注意:由于高可用的机制,一个 Worker 可能对应多个 Master,其中只有一个状态是 ALIVE,其它状态都是 STANDBY
Worker 定义:
private[deploy] class Worker(
override val rpcEnv: RpcEnv,
webUiPort: Int,
cores: Int,//总核数
memory: Int,//总的可用内存
masterRpcAddresses: Array[RpcAddress],//Master地址,可能存在多个
endpointName: String,
workDirPath: String = null,//工作目录路径
val conf: SparkConf,
val securityMgr: SecurityManager,
externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)
extends ThreadSafeRpcEndpoint with Logging {
//设置主机名和端口号
private val host = rpcEnv.address.host
private val port = rpcEnv.address.port
//主机名和端口号合法性验证
Utils.checkHost(host)
assert (port > 0)
//进行发送消息调度的线程池
private val forwordMessageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
//也是一个线程池,完成清理工作的线程池。清理任务执行过程中的jar包,文件等,每隔30分钟执行一次
private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))
//心跳间隔时间,用 60/4 = 15秒
private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
private val INITIAL_REGISTRATION_RETRIES = 6
private val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10
private val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500
private val REGISTRATION_RETRY_FUZZ_MULTIPLIER = {
val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits)
randomNumberGenerator.nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND
}
private val INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(10 *
REGISTRATION_RETRY_FUZZ_MULTIPLIER))
private val PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(60
* REGISTRATION_RETRY_FUZZ_MULTIPLIER))
private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
//cleanup定时任务的周期:30分钟
private val CLEANUP_INTERVAL_MILLIS =
conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
//应用文件夹/数据的ttl,ttl过期时间为7天,过期后将被清理
private val APP_DATA_RETENTION_SECONDS =
conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
//是否清理 executor 存在的非 shuffer 产生的文件,默认为true
private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
conf.getBoolean("spark.storage.cleanupFilesAfterExecutorExit", true)
private val testing: Boolean = sys.props.contains("spark.testing")
//Master节点的RpcEndpointRef代理对象
private var master: Option[RpcEndpointRef] = None
private val preferConfiguredMasterAddress =
conf.getBoolean("spark.worker.preferConfiguredMasterAddress", false)
private var masterAddressToConnect: Option[RpcAddress] = None
private var activeMasterUrl: String = ""
private[worker] var activeMasterWebUiUrl : String = ""
private var workerWebUiUrl: String = ""
private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString
//标识该worker节点是否注册成功
private var registered = false
private var connected = false
//生成workerID
private val workerId = generateWorkerId()
private val sparkHome =
if (testing) {
assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
new File(sys.props("spark.test.home"))
} else {
new File(sys.env.get("SPARK_HOME").getOrElse("."))
}
//工作目录
var workDir: File = null
//当前Worker上完成的Executor的集合
val finishedExecutors = new LinkedHashMap[String, ExecutorRunner]
//当前Worker上的Driver集合
val drivers = new HashMap[String, DriverRunner]
//当前Worker上executor集合
val executors = new HashMap[String, ExecutorRunner]
//当前Worker上完成的Driver集合
val finishedDrivers = new LinkedHashMap[String, DriverRunner]
//当前Worker上Application的工作文件夹
val appDirectories = new HashMap[String, Seq[String]]
//当前Worker上已完成的Application
val finishedApps = new HashSet[String]
val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors",
WorkerWebUI.DEFAULT_RETAINED_EXECUTORS)
val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers",
WorkerWebUI.DEFAULT_RETAINED_DRIVERS)
//TODO 启动shuffle服务
private val shuffleService = if (externalShuffleServiceSupplier != null) {
externalShuffleServiceSupplier.get()
} else {
//默认实现
new ExternalShuffleService(conf, securityMgr)
}
private val publicAddress = {
val envVar = conf.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
}
private var webUi: WorkerWebUI = null
private var connectionAttemptCount = 0
//性能监控的组件
private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
private val workerSource = new WorkerSource(this)
val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)
private var registerMasterFutures: Array[JFuture[_]] = null
private var registrationRetryTimer: Option[JScheduledFuture[_]] = None
//向Master注册的线程池,由于向Master注册是同步阻塞的,所以线程个数和Master的个数是一致的
private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
"worker-register-master-threadpool",
masterRpcAddresses.length // Make sure we can register with all masters at the same time
)
//用于记录资源
//已使用cpu个数
var coresUsed = 0
//已使用内存大小
var memoryUsed = 0
...
}
可以看到,Worker 中定义了多个属性。重点看以下几个:
cores:cup总个数
memory:可用内存
workDirPath:工作目录路径
forwordMessageScheduler:进行发送消息调度的线程池
cleanupThreadExecutor:完成清理工作的线程池。清理任务执行过程中上传的 jar 包、文件等,清理周期由参数 spark.worker.cleanup.interval 设置,默认每隔 30 分钟清理一次
HEARTBEAT_MILLIS:心跳的间隔时间。计算逻辑是 spark.worker.timeout / 4,即 60 /4 = 15 秒
APP_DATA_RETENTION_SECONDS:应用文件夹/数据的过期时间。由参数 spark.worker.cleanup.appDataTtl 配置,默认为 7 天
master:ALIVE 状态的 Master 的代理对象
registered:标识该 Worker 是否注册成功
drivers:当前 Worker 上的 Driver 集合
executors:当前 Worker 上的 Executor 集合
shuffleService:shuffle 服务对象。后面会进行单独分析
coreUsed:已使用的 CPU 个数
memoryUsed:已使用的内存大小
由于 Worker 是真正执行任务的,所以它会记录自身的可用资源,以便向 Master 报告
override def onStart() {
//验证该Worker未注册
assert(!registered)
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
host, port, cores, Utils.megabytesToString(memory)))
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
logInfo("Spark home: " + sparkHome)
//TODO 第一步:创建当前Worker的工作目录
createWorkDir()
//TODO 第二步:启动shuffle服务
startExternalShuffleService()
//TODO 第三步:启动WebUI,绑定端口
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
//TODO 第四步:向Master注册
registerWithMaster()
//TODO 第五步:注册指标监控系统并启动
metricsSystem.registerSource(workerSource)
metricsSystem.start()
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
onStart 方法主要分为五步:
第一步:创建当前 Worker 的工作目录。也就是在 SPARK_HOME 目录下创建一个 work 目录
第二步:启动 shuffle 服务
第三步:启动 WebUI,绑定端口
第四步:向 Master 注册
第五步:注册指标监控系统并启动
这里重点分析如何向 Master 进行注册,至于如何启动 shuffle 服务,后面单独分析
向 Master 注册调用了 registerWithMaster 方法:
private def registerWithMaster() {
registrationRetryTimer match {
case None =>
//如果还未注册
registered = false
//TODO 开启注册,多次重试直到注册成功
registerMasterFutures = tryRegisterAllMasters()
connectionAttemptCount = 0
//如果没有注册成功,就启动一个定时任务,每隔一段时间([5,15]之间的随机整数)重新注册一次
registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ReregisterWithMaster))
}
},
//[5,15]之间的随机整数
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
TimeUnit.SECONDS))
case Some(_) =>
logInfo("Not spawning another attempt to register with the master, since there is an" +
" attempt scheduled already.")
}
}
如果从未注册过,则启动注册;
如果没有注册成功,则启动一个定时任务,每隔一段时间给自己发送一个 ReregisterWithMaster 类型的消息
启动注册:tryRegisterAllMasters
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
//向每个Master提交注册信息
masterRpcAddresses.map { masterAddress =>
//往线程池中提交了一个注册的任务
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = {
try {
logInfo("Connecting to master " + masterAddress + "...")
//获取到MasterEndPointRef代理对象
val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
//发送RegisterWorker注册消息给Master
sendRegisterMessageToMaster(masterEndpoint)
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
}
})
}
}
Master 接收注册消息的处理逻辑如下:
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
//如果是STANDBY状态的Master
if (state == RecoveryState.STANDBY) {
//什么都不做,给Worker返回一个StandBy
workerRef.send(MasterInStandby)
//判断该Worker是否注册过
} else if (idToWorker.contains(id)) {
//如果注册过了,就返回一个RegisterWorkerFailed消息
workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
//如果没有注册过,则完成注册
} else {
//初始化一个WorkerInfo对象,存储了Worker的信息
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerWebUiUrl)
//执行Worker的注册,将Worker信息放入各种数据结构中
//如果注册成功
if (registerWorker(worker)) {
//把Worker信息写到zk集群
persistenceEngine.addWorker(worker)
//给Worker发送RegisteredWorker注册成功的消息
workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
//Worker加入工作,当Worker加入时,可用资源必定增加,那么就将等待执行的Driver,Executor等启动起来执行任务
schedule()
} else {
val workerAddress = worker.endpoint.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress))
}
}
-
如果 Master 的状态为 STANDBY,则给 Worker 返回一个 MasterInStandby 消息,Worker 接收该类型消息什么都不会做 -
如果 Master 状态为 ALIVE,但该 Worker 已经注册过,就返回一个 RegisterWorkerFailed 类型的消息,Worker 接收该类型消息后,如果 Worker 已经注册过,则什么都不做,如果没有注册过则记录错误日志 -
如果 Master 状态为 ALIVE,且该 Worker 没有注册过,则完成 Worker 注册:
-
第一步:将 Worker 信息封装成一个 WorkerInfo 对象 -
第二步:进行 Worker 注册,即将 Worker 信息保存到 Master 的各种数据结构中 -
第三步:如果注册成功,则将 Worker 信息写到 zk 集群 -
第四步:给 Worker 发送 RegisteredWorker 类型的消息 -
第五步:调度资源,让等待资源的 Driver 和 Executor 执行任务
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
msg match {
//接收到注册成功的消息
case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
if (preferConfiguredMasterAddress) {
logInfo("Successfully registered with master " + masterAddress.toSparkURL)
} else {
logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
}
//标记注册成功
registered = true
//Worker会同时给两个Master发送注册信息,如果Master节点发生变更,那么这里就会更新Master的信息
changeMaster(masterRef, masterWebUiUrl, masterAddress)
//Worker注册成功后,启动定时任务,每隔15秒执行一次,给自己发送一个SendHeartbeat消息
forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(SendHeartbeat)
}
}, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
//如果配置了定期清理工作目录,则启动定时任务,延时30分钟后,每隔30分钟执行一次
if (CLEANUP_ENABLED) {
logInfo(
s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(WorkDirCleanup)
}
}, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
}
val execs = executors.values.map { e =>
new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
}
//给Master发送WorkerLatestState类型的消息
masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
...
}
}
如果接收到注册成功的消息,会执行以下步骤:
第一步:更新 Worker 的 registered 变量,标记注册成功
第二步:更新 Master 信息。Worker会同时给两个Master发送注册信息,如果Master节点发生变更,那么这里就会更新Master的信息
第三步:启动心跳定时任务,每隔 15 秒给自己发送一个 SendHeartbeat 消息,自身接收到该消息,会给 Master 发送 Heartbeat 类型的消息
第四步:如果开启了定期清理工作目录,则启动定时任务,延时30分钟后,每隔30分钟执行一次清理任务
第五步:将所有 Executor 封装成 ExecutorDescription 对象,然后向 Master 发送 WorkerLatestState 类型的消息
上面向 Master 发送了两种类型的消息:Heartbeat 和 WorkerLatestState ,下面看 Master 是如何处理的
Heartbeat :
case Heartbeat(workerId, worker) =>
//根据WorkerID获取Worker信息
idToWorker.get(workerId) match {
//如果获取到了则更新最后一次心跳的时间
case Some(workerInfo) =>
workerInfo.lastHeartbeat = System.currentTimeMillis()
//如果没有获取到,说明还没有注册 则给Worker发送ReconnectWorker消息
case None =>
if (workers.map(_.id).contains(workerId)) {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" Asking it to re-register.")
worker.send(ReconnectWorker(masterUrl))
} else {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" This worker was never registered, so ignoring the heartbeat.")
}
}
如果是 Heartbeat 类型的消息,Master 会判断该 Worker 是否注册:
如果注册了,会更新最后一个心跳的时间
如果没有注册,会给 Worker 返回一个 ReconnectWorker 类型的消息。Worker 处理该消息的逻辑就是调用上面的 registerWithMaster 方法进行注册
WorkerLatestState :
case WorkerLatestState(workerId, executors, driverIds) =>
//根据workerId获取Worker信息
idToWorker.get(workerId) match {
//如果获取到了指定的 Worker 信息
case Some(worker) =>
//遍历所有的Executor信息,看Master保存的Worker信息中是否包含该Executor信息,
// 如果不包含,则给Worker发送KillExecutor消息,让其关闭该Executor
for (exec <- executors) {
val executorMatches = worker.executors.exists {
case (_, e) => e.application.id == exec.appId && e.id == exec.execId
}
if (!executorMatches) {
// master doesn't recognize this executor. So just tell worker to kill it.
worker.endpoint.send(KillExecutor(masterUrl, exec.appId, exec.execId))
}
}
//遍历所有的Driver信息,看Master保存的Worker信息中是否包含该Driver信息,
// 如果不包含,则给Worker发送KillDriver消息,让其关闭该Driver
for (driverId <- driverIds) {
val driverMatches = worker.drivers.exists { case (id, _) => id == driverId }
if (!driverMatches) {
// master doesn't recognize this driver. So just tell worker to kill it.
worker.endpoint.send(KillDriver(driverId))
}
}
//如果没有获取到则记录日志
case None =>
logWarning("Worker state from unknown worker: " + workerId)
}
该分支的作用是检查当前 Worker 上的 Executor 和 Driver 是否和 Master 端记录的一致,如果不一致则关闭 Executor 和 Driver
注意:从这里也可以看出,在 Master 端维护的 WorkerInfo 信息中,包含了该 Worker 上运行的所有 Executor 和 Driver 信息
最后看一下如果 Worker 注册不成功,如何重新注册。调用的是
Worker.reregisterWithMaster 方法
private def reregisterWithMaster(): Unit = {
Utils.tryOrExit {
//更新尝试注册的次数
connectionAttemptCount += 1
//如果已经注册成功,取消最后一次重新注册的操作
if (registered) {
cancelLastRegistrationRetry()
//如果重试次数小于阈值(默认16)
} else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
master match {
case Some(masterRef) =>
//如果Worker中保存了Master信息,但是注册失败了,说明失去了和Master的连接,所以代理对象不可用,需要重新创建代理对象
if (registerMasterFutures != null) {
registerMasterFutures.foreach(_.cancel(true))
}
//重新获取Master代理对象
val masterAddress =
if (preferConfiguredMasterAddress) masterAddressToConnect.get else masterRef.address
//重新向线程池添加注册Worker的任务
registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = {
try {
logInfo("Connecting to master " + masterAddress + "...")
val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
//给Master发送RegisterWorker消息
sendRegisterMessageToMaster(masterEndpoint)
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
}
}))
case None =>
if (registerMasterFutures != null) {
registerMasterFutures.foreach(_.cancel(true))
}
// We are retrying the initial registration
//尝试进行注册
registerMasterFutures = tryRegisterAllMasters()
}
// We have exceeded the initial registration retry threshold
// All retries from now on should use a higher interval
//如果达到了初始的重试次数阈值(默认为6),那么应该加大重试间隔
if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
//取消注册重试计时
registrationRetryTimer.foreach(_.cancel(true))
//重新设置注册的重试计时,重试间隔增大到[30,90]之间的一个随机数,假设为m
//那么延时m秒后,每隔m秒重试一次
registrationRetryTimer = Some(
forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(ReregisterWithMaster)
}
}, PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
TimeUnit.SECONDS))
}
} else {
//如果超过重试次数阈值,则记录错误日志
logError("All masters are unresponsive! Giving up.")
System.exit(1)
}
}
}
该方法的逻辑是:
第一步:更新重试注册的计数器
第二步:如果已经注册成功了,那么取消最后一次的重新注册操作
第三步:如果超过了注册重试的总次数(默认16次),则记录错误日志
第四步:如果没有超过注册重试的总次数,判断是否有 Master 代理信息,如果没有,且没有注册失败的信息,则尝试进行初始注册
第五步:如果有 Master 代理信息,但是注册失败了,说明失去了和 Master 的连接,那么需要重新获取 Master 的代理对象。然后使用重新获取的代理对象,向 Master 发送注册消息(这里是将注册的任务重新添加到管理注册线程的线程池里面)
第六步:如果注册重试的次数达到了初始的重试次数阈值(默认6次),则重新设置注册的重试计时,增大重试的间隔时间,这个时间是 [30,90] 之前的一个随机整数,假设为m,单位是秒。然后启动定时任务,在延时 m 秒之后,每个 m 秒重新注册一次,直到达到总的重试次数。
以上是关于Spark 启动 | Worker 启动流程详解的主要内容,如果未能解决你的问题,请参考以下文章