Spark Source Code Analysis, Part 2 -- The SparkContext Initialization Process
Creating or Reusing an Existing Session
Since Spark 2.0 the SparkSession abstraction has been available. The code to create a session, or reuse an existing one, looks like this:
val spark = SparkSession
  .builder
  .appName("SparkTC")
  .getOrCreate()
First, the builder pattern is used to create a SparkSession or return an existing one. The code of org.apache.spark.sql.SparkSession.Builder#getOrCreate is as follows:
def getOrCreate(): SparkSession = synchronized {
  assertOnDriver() // Note: a SparkSession can only be created and accessed on the driver
  // Get the session from current thread's active session.
  // activeThreadSession is an InheritableThreadLocal (a subclass of ThreadLocal);
  // because the data lives in a ThreadLocal, no extra locking is needed here.
  var session = activeThreadSession.get()
  // If the session is not null and its SparkContext has not been stopped, it can be reused.
  if ((session ne null) && !session.sparkContext.isStopped) {
    options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
    if (options.nonEmpty) {
      logWarning("Using an existing SparkSession; some configuration may not take effect.")
    }
    return session
  }

  // Lock on the SparkSession object to prevent initializing the session twice.
  SparkSession.synchronized {
    // If the current thread does not have an active session, get it from the global session.
    // If a default session exists and its SparkContext has not been stopped, it can also be reused.
    session = defaultSession.get()
    if ((session ne null) && !session.sparkContext.isStopped) {
      options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
      if (options.nonEmpty) {
        logWarning("Using an existing SparkSession; some configuration may not take effect.")
      }
      return session
    }

    // Create the session.
    val sparkContext = userSuppliedContext.getOrElse { // normally userSuppliedContext is empty
      val sparkConf = new SparkConf()
      options.foreach { case (k, v) => sparkConf.set(k, v) }

      // set a random app name if not given.
      if (!sparkConf.contains("spark.app.name")) {
        sparkConf.setAppName(java.util.UUID.randomUUID().toString)
      }

      SparkContext.getOrCreate(sparkConf)
      // Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions.
    }

    // Initialize extensions if the user has defined a configurator class.
    val extensionConfOption = sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
    if (extensionConfOption.isDefined) {
      val extensionConfClassName = extensionConfOption.get
      try {
        val extensionConfClass = Utils.classForName(extensionConfClassName)
        val extensionConf = extensionConfClass.newInstance()
          .asInstanceOf[SparkSessionExtensions => Unit]
        extensionConf(extensions)
      } catch {
        // Ignore the error if we cannot find the class or when the class has the wrong type.
        case e @ (_: ClassCastException |
            _: ClassNotFoundException |
            _: NoClassDefFoundError) =>
          logWarning(s"Cannot use $extensionConfClassName to configure session extensions.", e)
      }
    }

    // Initialize the SparkSession and hand it the SparkContext obtained above.
    session = new SparkSession(sparkContext, None, None, extensions)
    options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
    // Set the default session
    setDefaultSession(session)
    // Set the active session
    setActiveSession(session)

    // Register a successfully instantiated context to the singleton. This should be at the
    // end of the class definition so that the singleton is updated only if there is no
    // exception in the construction of the instance.
    // Register a Spark listener that resets the default session when the application ends.
    sparkContext.addSparkListener(new SparkListener {
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
        defaultSession.set(null)
      }
    })
  }

  return session
}
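To illustrate the reuse behavior this method implements, here is a minimal sketch of my own (not from the Spark source; the option key spark.sql.shuffle.partitions is just an arbitrary example): the second getOrCreate() returns the session created by the first call, copies the new option into its conf, and logs the "Using an existing SparkSession" warning.

import org.apache.spark.sql.SparkSession

object SessionReuseDemo {
  def main(args: Array[String]): Unit = {
    // First call: no active or default session exists yet, so a new SparkSession
    // (and its SparkContext) is created and registered as default and active.
    val first = SparkSession.builder
      .master("local[2]")
      .appName("SessionReuseDemo")
      .getOrCreate()

    // Second call: the thread-local active session is found and returned;
    // the extra option is applied to its conf and a warning is logged.
    val second = SparkSession.builder
      .config("spark.sql.shuffle.partitions", "8")
      .getOrCreate()

    println(first eq second)                                   // true: same instance
    println(second.conf.get("spark.sql.shuffle.partitions"))   // 8

    first.stop()
  }
}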
The org.apache.spark.SparkContext#getOrCreate method is as follows:
def getOrCreate(config: SparkConf): SparkContext = {
  // Synchronize to ensure that multiple create requests don't trigger an exception
  // from assertNoOtherContextIsRunning within setActiveContext
  // (the companion object is used as the lock)
  SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
    // activeContext is an AtomicReference, so its set/update operations are atomic
    if (activeContext.get() == null) {
      // a session has only one SparkContext
      setActiveContext(new SparkContext(config), allowMultipleContexts = false)
    } else {
      if (config.getAll.nonEmpty) {
        logWarning("Using an existing SparkContext; some configuration may not take effect.")
      }
    }
    activeContext.get()
  }
}
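The same reuse pattern holds one level down. A small sketch of my own (not from the post's source) of SparkContext.getOrCreate: the second call only logs a warning about the differing configuration and returns the already-active context.

import org.apache.spark.{SparkConf, SparkContext}

object ContextReuseDemo {
  def main(args: Array[String]): Unit = {
    val sc1 = SparkContext.getOrCreate(
      new SparkConf().setMaster("local[2]").setAppName("ContextReuseDemo"))

    // activeContext is already set, so this returns sc1; the new app name
    // is not applied, only a warning is logged.
    val sc2 = SparkContext.getOrCreate(
      new SparkConf().setMaster("local[2]").setAppName("ignored"))

    println(sc1 eq sc2)  // true
    sc1.stop()
  }
}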
SparkContext Initialization
SparkContext represents a connection to a Spark cluster; it is used to create RDDs, accumulators, and broadcast variables on that cluster. Only one SparkContext can be active per JVM, so the active SparkContext must be stopped with stop() before a new one is created.
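A minimal usage sketch (assuming a local master; my own example, not taken from the post's source) of the three things listed above, an RDD, an accumulator, and a broadcast variable, followed by the mandatory stop():

import org.apache.spark.{SparkConf, SparkContext}

object SparkContextBasics {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("SparkContextBasics"))

    val rdd = sc.parallelize(1 to 10)        // RDD
    val sum = sc.longAccumulator("sum")      // accumulator
    val factor = sc.broadcast(3)             // broadcast variable

    rdd.foreach(x => sum.add(x * factor.value))
    println(sum.value)                       // 165 = (1 + ... + 10) * 3

    // Only one active SparkContext per JVM: stop it before creating another.
    sc.stop()
  }
}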
Once the constructor is invoked, the class's member variables are initialized first, and then the initialization proper begins. It is wrapped in a try/catch block that runs as part of the constructor body; for the underlying mechanism, see my earlier article on standalone code blocks in Scala classes (scala class中孤立代码块揭秘). A tiny illustration of that mechanism follows.
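For readers who have not seen that post, here is a standalone toy example (not Spark code): statements written directly in a Scala class body are compiled into the primary constructor, so the try block below runs as soon as an instance is created, exactly like SparkContext's big try/catch initialization block.

class InitLikeSparkContext(name: String) {
  // Fields are initialized first ...
  private var ready = false

  // ... then this "standalone" block runs as part of the primary constructor.
  try {
    if (name.isEmpty) throw new IllegalArgumentException("name must be set")
    ready = true
  } catch {
    case e: Exception =>
      println(s"init failed: ${e.getMessage}")
      throw e
  }
}

// new InitLikeSparkContext("demo") executes the try block immediately.

Back in SparkContext, the standalone block looks like this: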
try {
  // 1. Initialize the configuration
  _conf = config.clone()
  _conf.validateSettings()

  if (!_conf.contains("spark.master")) {
    throw new SparkException("A master URL must be set in your configuration")
  }
  if (!_conf.contains("spark.app.name")) {
    throw new SparkException("An application name must be set in your configuration")
  }

  // log out spark.app.name in the Spark driver logs
  logInfo(s"Submitted application: $appName")

  // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
  if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
    throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
      "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
  }

  if (_conf.getBoolean("spark.logConf", false)) {
    logInfo("Spark configuration:\n" + _conf.toDebugString)
  }

  // Set Spark driver host and port system properties. This explicitly sets the configuration
  // instead of relying on the default value of the config constant.
  _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
  _conf.setIfMissing("spark.driver.port", "0")

  _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

  _jars = Utils.getUserJars(_conf)
  _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
    .toSeq.flatten

  // 2. Initialize the event log directory and set the compression codec
  _eventLogDir =
    if (isEventLogEnabled) {
      val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
        .stripSuffix("/")
      Some(Utils.resolveURI(unresolvedDir))
    } else {
      None
    }

  _eventLogCodec = {
    val compress = _conf.getBoolean("spark.eventLog.compress", false)
    if (compress && isEventLogEnabled) {
      Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
    } else {
      None
    }
  }

  // 3. LiveListenerBus asynchronously delivers SparkListenerEvents to the registered SparkListeners
  _listenerBus = new LiveListenerBus(_conf)

  // Initialize the app status store and listener before SparkEnv is created so that it gets
  // all events.
  // 4. Provide an in-memory KV store for the application status
  _statusStore = AppStatusStore.createLiveStore(conf)
  // 5. Register the AppStatusListener with the LiveListenerBus
  listenerBus.addToStatusQueue(_statusStore.listener.get)

  // Create the Spark execution environment (cache, map output tracker, etc)
  // 6. Create the driver-side SparkEnv.
  // It holds all runtime objects of a Spark instance (master or worker): the serializer,
  // RpcEnv, block manager, map output tracker, and so on. Spark locates the SparkEnv through
  // a global variable, so all threads can access the same SparkEnv; after the SparkContext
  // is created, it can be accessed via SparkEnv.get.
  _env = createSparkEnv(_conf, isLocal, listenerBus)
  SparkEnv.set(_env)

  // If running the REPL, register the repl's output dir with the file server.
  _conf.getOption("spark.repl.class.outputDir").foreach { path =>
    val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
    _conf.set("spark.repl.class.uri", replUri)
  }

  // 7. Low-level API for monitoring and reporting the status of Spark jobs and stages
  _statusTracker = new SparkStatusTracker(this, _statusStore)

  // 8. Console progress bar
  _progressBar =
    if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
      Some(new ConsoleProgressBar(this))
    } else {
      None
    }

  // 9. Spark UI, implemented with Jetty
  _ui =
    if (conf.getBoolean("spark.ui.enabled", true)) {
      Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
        startTime))
    } else {
      // For tests, do not enable the UI
      None
    }
  // Bind the UI before starting the task scheduler to communicate
  // the bound port to the cluster manager properly
  _ui.foreach(_.bind())

  // 10. Create the Hadoop configuration
  _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

  // 11. Add each JAR given through the constructor
  if (jars != null) {
    jars.foreach(addJar)
  }

  if (files != null) {
    files.foreach(addFile)
  }

  // 12. Compute the executor memory
  _executorMemory = _conf.getOption("spark.executor.memory")
    .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
    .orElse(Option(System.getenv("SPARK_MEM"))
      .map(warnSparkMem))
    .map(Utils.memoryStringToMb)
    .getOrElse(1024)

  // Convert java options to env vars as a work around
  // since we can't set env vars directly in sbt.
  for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
        value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey))) } {
    executorEnvs(envKey) = value
  }
  Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
    executorEnvs("SPARK_PREPEND_CLASSES") = v
  }
  // The Mesos scheduler backend relies on this environment variable to set executor memory.
  // TODO: Set this only in the Mesos scheduler.
  executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
  executorEnvs ++= _conf.getExecutorEnv
  executorEnvs("SPARK_USER") = sparkUser

  // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
  // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
  // 13. Create the HeartbeatReceiver endpoint
  _heartbeatReceiver = env.rpcEnv.setupEndpoint(
    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

  // Create and start the scheduler
  // 14. Create the task scheduler and the scheduler backend
  val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  _schedulerBackend = sched
  _taskScheduler = ts
  // 15. Create the DAGScheduler instance
  _dagScheduler = new DAGScheduler(this)
  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  // 16. Start the task scheduler
  _taskScheduler.start()

  // 17. Get the application ID from the task scheduler
  _applicationId = _taskScheduler.applicationId()
  // 18. Get the application attempt ID from the task scheduler
  _applicationAttemptId = taskScheduler.applicationAttemptId()
  _conf.set("spark.app.id", _applicationId)
  if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
    System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
  }
  // 19. Set the application ID on the UI
  _ui.foreach(_.setAppId(_applicationId))
  // 20. Initialize the block manager
  _env.blockManager.initialize(_applicationId)

  // The metrics system for Driver need to be set spark.app.id to app ID.
  // So it should start after we get app ID from the task scheduler and set spark.app.id.
  // 21. Start the metrics system
  _env.metricsSystem.start()
  // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
  // 22. Attach the metrics system's servlet handlers to the UI
  _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

  // 23. Initialize the event logger listener
  _eventLogger =
    if (isEventLogEnabled) {
      val logger =
        new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
          _conf, _hadoopConfiguration)
      logger.start()
      listenerBus.addToEventLogQueue(logger)
      Some(logger)
    } else {
      None
    }

  // Optionally scale number of executors dynamically based on workload. Exposed for testing.
  // 24. If dynamic allocation is enabled, instantiate the ExecutorAllocationManager and start it
  val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
  _executorAllocationManager =
    if (dynamicAllocationEnabled) {
      schedulerBackend match {
        case b: ExecutorAllocationClient =>
          Some(new ExecutorAllocationManager(
            schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
            _env.blockManager.master))
        case _ =>
          None
      }
    } else {
      None
    }
  _executorAllocationManager.foreach(_.start())

  // 25. Initialize the ContextCleaner and start it
  _cleaner =
    if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
      Some(new ContextCleaner(this))
    } else {
      None
    }
  _cleaner.foreach(_.start())

  // 26. Set up and start the listener bus
  setupAndStartListenerBus()
  // 27. The task scheduler is ready; post an environment-updated event
  postEnvironmentUpdate()
  // 28. Post the application-start event
  postApplicationStart()

  // Post init
  // 29. Wait until the task scheduler backend is ready
  _taskScheduler.postStartHook()
  // 30. Register the DAGScheduler metrics source
  _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
  // 31. Register the BlockManager metrics source
  _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
  // 32. Register the ExecutorAllocationManager metrics source
  _executorAllocationManager.foreach { e =>
    _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
  }

  // Make sure the context is stopped if the user forgets about it. This avoids leaving
  // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
  // is killed, though.
  logDebug("Adding shutdown hook") // force eager creation of logger
  // 33. Register a shutdown hook: the callback to run when the SparkContext is shut down
  _shutdownHookRef = ShutdownHookManager.addShutdownHook(
    ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
    logInfo("Invoking stop() from shutdown hook")
    try {
      stop()
    } catch {
      case e: Throwable =>
        logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
    }
  }
} catch {
  case NonFatal(e) =>
    logError("Error initializing SparkContext.", e)
    try {
      stop()
    } catch {
      case NonFatal(inner) =>
        logError("Error stopping SparkContext after init error.", inner)
    } finally {
      throw e
    }
}
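To make a couple of these steps concrete, here is a hedged sketch of my own (not from the Spark source; the event-log path and values are placeholders) that turns on some of the features the block above reads from the conf -- event logging, the console progress bar, dynamic allocation -- and registers a custom SparkListener, which the LiveListenerBus created in step 3 will deliver events to:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerStageCompleted}

object InitFeaturesDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("InitFeaturesDemo")
      // Steps 2 and 23: event log directory + EventLoggingListener
      // (placeholder path; the directory must already exist)
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "file:///tmp/spark-events")
      // Step 8: console progress bar
      .set("spark.ui.showConsoleProgress", "true")
      // Step 24: dynamic allocation; only effective on a real cluster with the
      // external shuffle service, shown here purely for illustration
      .set("spark.dynamicAllocation.enabled", "true")

    val sc = new SparkContext(conf)

    // Events are delivered asynchronously by the LiveListenerBus (step 3).
    sc.addSparkListener(new SparkListener {
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
        println(s"stage ${stageCompleted.stageInfo.stageId} completed")
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit =
        println(s"application ended at ${applicationEnd.time}")
    })

    sc.parallelize(1 to 100).count()
    sc.stop()
  }
}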
As the code above shows, SparkContext initialization is quite involved and touches many Spark components, including the asynchronous event bus LiveListenerBus, SparkEnv, SparkUI, DAGScheduler, the metrics system, EventLoggingListener, TaskScheduler, ExecutorAllocationManager, ContextCleaner, and so on. Treat this post as an overview for now; later posts will examine several of these components in more depth.