Spark启动流程(Standalone)- master源码

Posted hyunbar

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark启动流程(Standalone)- master源码相关的知识,希望对你有一定的参考价值。

Master源码

 1 package org.apache.spark.deploy.master
 2 //伴生类
 3 private[deploy] class Master(
 4                                 override val rpcEnv: RpcEnv,
 5                                 address: RpcAddress,
 6                                 webUiPort: Int,
 7                                 val securityMgr: SecurityManager,
 8                                 val conf: SparkConf)
 9     extends ThreadSafeRpcEndpoint with Logging with LeaderElectable
10 {
11 ...
12 }
13 //伴生对象
14 private[deploy] object Master extends Logging{
15     val SYSTEM_NAME = "sparkMaster"
16     val ENDPOINT_NAME = "Master"
17     
18     // 启动 Master 的入口函数
19     def main(argStrings: Array[String]) {
20         Utils.initDaemon(log)
21         val conf = new SparkConf
22         // 构建用于参数解析的实例   
23         //--host hadoop201 --port 7077 --webui-port 8080
24         val args = new MasterArguments(argStrings, conf)
25         // 启动 RPC 通信环境和 MasterEndPoint(通信终端)
26        //<<1>>
27         val (rpcEnv, _, _): (RpcEnv, Int, Option[Int]) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
28         rpcEnv.awaitTermination()
29     }
30    ...
31 }

 

<< 1 >>、启动Mater返回一个三元组

 1     /**
 2       * Start the Master and return a three tuple of:
 3       * 启动 Master, 并返回一个三元组
 4       * (1) The Master RpcEnv
 5       * (2) The web UI bound port
 6       * (3) The REST server bound port, if any
 7       */
 8     def startRpcEnvAndEndpoint(
 9                                   host: String,
10                                   port: Int,
11                                   webUiPort: Int,
12                                   conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
13         val securityMgr = new SecurityManager(conf)
14         // 创建 Master 端的 RpcEnv 环境, 并启动 RpcEnv
15         // 参数: sparkMaster hadoop201 7077 conf securityMgr
16         // 返回值  的实际类型是: NettyRpcEnv
17         //<< 1.1 >>
18         val rpcEnv: RpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
19         // 创建 Master对象, 该对象就是一个 RpcEndpoint, 在 RpcEnv 中注册这个 RpcEndpoint
20         // 返回该 RpcEndpoint 的引用, 使用该引用来接收信息和发送信息
21         //<< 1.2 >>
22         val masterEndpoint: RpcEndpointRef = rpcEnv.setupEndpoint(ENDPOINT_NAME,
23             new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
24         // 向 Master 的通信终端发法请求,获取 BoundPortsResponse 对象
25         // BoundPortsResponse 是一个样例类包含三个属性: rpcEndpointPort webUIPort restPort
26         val portsResponse: BoundPortsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
27         (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
28     }

 

<< 1.1 >> RpcEnv的创建

 1 def create(
 2               name: String,
 3               bindAddress: String,
 4               advertiseAddress: String,
 5               port: Int,
 6               conf: SparkConf,
 7               securityManager: SecurityManager,
 8               clientMode: Boolean): RpcEnv = {
 9     // 保存 RpcEnv 的配置信息
10     val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
11         clientMode)
12     // 创建 NettyRpcEvn
13     //<< 1.1.1 >>
14     new NettyRpcEnvFactory().create(config)
15 }

 

真正的创建是调用NettyRpcEnvFactory 的 create 方法创建

 

创建NettyRpcEnv的时候,会创建消息分发器,收件箱和存储远程地址与发件箱的Map

 

RpcEnv.scala

 

<< 1.1.1 >> NettyRpcEnvFactory ( NettyRpcEnv .scala)

 1 private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
 2     /*
 3     创建 NettyRpcEnv, 并且启动为后台程序
 4      */
 5     def create(config: RpcEnvConfig): RpcEnv = {
 6         val sparkConf: SparkConf = config.conf
 7         // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
 8         // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
 9         // 用于 Rpc传输对象时的序列化
10         val javaSerializerInstance: JavaSerializerInstance = new JavaSerializer(sparkConf)
11             .newInstance()
12             .asInstanceOf[JavaSerializerInstance]
13         // 实例化 NettyRpcEnv
14         val nettyEnv = new NettyRpcEnv(
15             sparkConf,
16             javaSerializerInstance,
17             config.advertiseAddress,
18             config.securityManager)
19         if (!config.clientMode) {
20             // 定义 NettyRpcEnv 的启动函数
21             val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
22                 nettyEnv.startServer(config.bindAddress, actualPort)
23                 (nettyEnv, nettyEnv.address.port)
24             }
25             try {
26                 // 启动 NettyRpcEnv
27                 Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
28             } catch {
29                 case NonFatal(e) =>
30                     nettyEnv.shutdown()
31                     throw e
32             }
33         }
34         nettyEnv
35     }
36 }

 

<< 1.2 >> Master伴生类(Master 端的 RpcEndpoint 启动)

 

Master是一个RpcEndpoint.

 

他的生命周期方法是: constructor -> onStart -> receive* -> onStop

 

onStart 主要代码片段

 

 1  // 创建 WebUI 服务器
 2  webUi = new MasterWebUI(this, webUiPort)
 3 
 4 
 5 // 按照固定的频率去启动线程来检查 Worker 是否超时. 其实就是给自己发信息: CheckForWorkerTimeOut
 6 // 默认是每分钟检查一次.
 7  checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
 8     override def run(): Unit = Utils.tryLogNonFatalError {
 9 // 在 receive 方法中对 CheckForWorkerTimeOut 进行处理
10        //<< 1.2.1 >>
11         self.send(CheckForWorkerTimeOut)
12     }
13 }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
14 
15 
16 private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000

 

<< 1.2.1 >> 检查并移除超时的worker

 

 1    /** Check for, and remove, any timed-out workers */
 2     private def timeOutDeadWorkers() {
 3         // Copy the workers into an array so we don‘t modify the hashset while iterating through it
 4         val currentTime = System.currentTimeMillis()
 5         //  把超时的 Worker 从 workers 中移除
 6 //过滤出来要移除的worker:(上次心跳时间 小于 当前时间 减去 超时时间 )即为超时
 7         val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
 8         for (worker <- toRemove) {
 9             // 如果 worker 的状态不是 DEAD
10             if (worker.state != WorkerState.DEAD) {
11                 logWarning("Removing %s because we got no heartbeat in %d seconds".format(
12                     worker.id, WORKER_TIMEOUT_MS / 1000))
13                 removeWorker(worker) //
14             } else {
15                 if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
16                     workers -= worker // we‘ve seen this DEAD worker in the UI, etc. for long enough; cull it
17                 }
18             }
19         }
20     }

以上是关于Spark启动流程(Standalone)- master源码的主要内容,如果未能解决你的问题,请参考以下文章

大数据:Spark Standalone 集群调度如何创建分配Executors的资源

spark job提交执行流程

spark job提交执行流程

spark job提交执行流程

spark job提交执行流程

Spark 学习笔记之 Standalone与Yarn启动和运行时间测试