Spark Source Code Analysis, Part 2 -- The SparkContext Initialization Process

Posted by johnny666888


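Creating or Reusing an Existing Session

  Spark 2.0 introduced the concept of SparkSession. The code to create a session, or to reuse one that already exists, is as follows: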

val spark = SparkSession
  .builder
  .appName("SparkTC")
  .getOrCreate()

  First, the builder pattern is used to create a SparkSession or reuse an existing one. The code of org.apache.spark.sql.SparkSession.Builder#getOrCreate is as follows:

  

def getOrCreate(): SparkSession = synchronized {
  assertOnDriver() // Note: a SparkSession can only be created and accessed on the driver
  // Get the session from current thread's active session.
  // activeThreadSession is an InheritableThreadLocal (a subclass of ThreadLocal).
  // Its value is per-thread, so reading it needs no additional lock.
  var session = activeThreadSession.get()
  // If the session is non-null and its SparkContext has NOT been stopped, reuse it
  if ((session ne null) && !session.sparkContext.isStopped) {
    options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
    if (options.nonEmpty) {
      logWarning("Using an existing SparkSession; some configuration may not take effect.")
    }
    return session
  }

  // Lock on the SparkSession object so the session is not initialized twice
  SparkSession.synchronized {
    // If the current thread does not have an active session, get it from the global session.
    // If the default session exists and its SparkContext has NOT been stopped, reuse it as well
    session = defaultSession.get()
    if ((session ne null) && !session.sparkContext.isStopped) {
      options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
      if (options.nonEmpty) {
        logWarning("Using an existing SparkSession; some configuration may not take effect.")
      }
      return session
    }

    // Create the session
    // By default userSuppliedContext is empty, i.e. no SparkContext was supplied by the user
    val sparkContext = userSuppliedContext.getOrElse {
      val sparkConf = new SparkConf()
      options.foreach { case (k, v) => sparkConf.set(k, v) }

      // set a random app name if not given.
      if (!sparkConf.contains("spark.app.name")) {
        sparkConf.setAppName(java.util.UUID.randomUUID().toString)
      }

      SparkContext.getOrCreate(sparkConf)
      // Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions.
    }

    // Initialize extensions if the user has defined a configurator class.
    val extensionConfOption = sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
    if (extensionConfOption.isDefined) {
      val extensionConfClassName = extensionConfOption.get
      try {
        val extensionConfClass = Utils.classForName(extensionConfClassName)
        val extensionConf = extensionConfClass.newInstance()
          .asInstanceOf[SparkSessionExtensions => Unit]
        extensionConf(extensions)
      } catch {
        // Ignore the error if we cannot find the class or when the class has the wrong type.
        case e @ (_: ClassCastException |
                  _: ClassNotFoundException |
                  _: NoClassDefFoundError) =>
          logWarning(s"Cannot use $extensionConfClassName to configure session extensions.", e)
      }
    }

    // Instantiate the SparkSession, passing in the SparkContext obtained above
    session = new SparkSession(sparkContext, None, None, extensions)
    options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
    // Set the default session
    setDefaultSession(session)
    // Set the active session
    setActiveSession(session)

    // Register a successfully instantiated context to the singleton. This should be at the
    // end of the class definition so that the singleton is updated only if there is no
    // exception in the construction of the instance.
    // Add a Spark listener that resets the default session when the application ends
    sparkContext.addSparkListener(new SparkListener {
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
        defaultSession.set(null)
      }
    })
  }

  return session
}
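The lookup order in the method above (the thread's active session first, then the global default, then a new session) can be illustrated with a small model in plain Scala. This is only a sketch under stated assumptions: `FakeSession` and `SessionRegistry` are hypothetical names that stand in for SparkSession and its companion object, and extensions, options, and the listener are left out.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for SparkSession; it only tracks whether it was stopped.
final class FakeSession(val name: String) {
  @volatile var stopped: Boolean = false
}

// Hypothetical registry that mimics the two-level lookup described above.
object SessionRegistry {
  // Per-thread active session, inherited by child threads.
  private val activeThreadSession = new InheritableThreadLocal[FakeSession]
  // Process-wide default session.
  private val defaultSession = new AtomicReference[FakeSession]()

  private def usable(s: FakeSession): Boolean = (s ne null) && !s.stopped

  def getOrCreate(name: String): FakeSession = {
    // 1. Prefer the current thread's active session if it is still running.
    val active = activeThreadSession.get()
    if (usable(active)) active
    else SessionRegistry.synchronized {
      // 2. Otherwise fall back to the global default, under a lock.
      val global = defaultSession.get()
      if (usable(global)) global
      else {
        // 3. Nothing reusable: create a session and register it in both places.
        val created = new FakeSession(name)
        defaultSession.set(created)
        activeThreadSession.set(created)
        created
      }
    }
  }
}
```

In this model a second call returns the same instance until the session is marked as stopped, after which a fresh one is created.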

  The org.apache.spark.SparkContext#getOrCreate method is as follows:

def getOrCreate(config: SparkConf): SparkContext = {
  // Synchronize to ensure that multiple create requests don't trigger an exception
  // from assertNoOtherContextIsRunning within setActiveContext
  // A plain Object instance is used as the lock
  SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
    // activeContext is an AtomicReference, so set and update operations on it are atomic
    if (activeContext.get() == null) {
      // Only one SparkContext is active at a time; it is shared by all sessions
      setActiveContext(new SparkContext(config), allowMultipleContexts = false)
    } else {
      if (config.getAll.nonEmpty) {
        logWarning("Using an existing SparkContext; some configuration may not take effect.")
      }
    }
    activeContext.get()
  }
}
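The same create-once pattern (a dedicated lock object guarding an AtomicReference) can be tried out without Spark. The sketch below is an illustration, not Spark's code: `FakeContext` and `ContextHolder` are hypothetical names, and a `Map` stands in for SparkConf.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for SparkContext that just remembers its settings.
final class FakeContext(val settings: Map[String, String])

object ContextHolder {
  // Dedicated lock object, so that only one creator runs at a time.
  private val constructorLock = new Object
  private val activeContext = new AtomicReference[FakeContext]()

  def getOrCreate(settings: Map[String, String]): FakeContext =
    constructorLock.synchronized {
      if (activeContext.get() == null) {
        // First caller wins: its settings are the ones that take effect.
        activeContext.set(new FakeContext(settings))
      } else if (settings.nonEmpty) {
        println("Using an existing context; some configuration may not take effect.")
      }
      activeContext.get()
    }
}
```

A later caller that passes different settings receives the existing instance, which is why the warning about configuration not taking effect exists.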

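SparkContext Initialization

  SparkContext represents the connection to a Spark cluster. It can be used to create RDDs, accumulators, and broadcast variables on that cluster. Only one SparkContext may be active per JVM: the active SparkContext must be stopped by calling stop() before a new one is created.
  When the constructor is called, the class's member fields are initialized first, and then the initialization logic runs. That logic is wrapped in a try-catch block that sits directly in the class body, so it executes as part of the constructor. For background, see my earlier article on standalone code blocks in Scala classes (scala class中孤立代码块揭秘).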
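As a minimal illustration of that language rule: every statement written directly in a Scala class body belongs to the primary constructor, including a try-catch block. The `Component` class below is hypothetical and only mirrors the shape of the SparkContext constructor (initialize, and on failure log and rethrow).

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical class: the try-catch below is not inside any method,
// so it runs every time `new Component(...)` is evaluated.
class Component(name: String, log: ArrayBuffer[String]) {
  private var ready = false

  try {
    if (name.isEmpty) throw new IllegalArgumentException("name must be set")
    log += s"initialized $name"
    ready = true
  } catch {
    case NonFatal(e) =>
      // Record the failure, then let the constructor fail.
      log += s"init failed: ${e.getMessage}"
      throw e
  }

  def isReady: Boolean = ready
}
```

Constructing `new Component("", log)` therefore throws from the constructor itself, after the failure has been recorded.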

  The standalone block in SparkContext is as follows:

try {
  // 1. Initialize the configuration
  _conf = config.clone()
  _conf.validateSettings()

  if (!_conf.contains("spark.master")) {
    throw new SparkException("A master URL must be set in your configuration")
  }
  if (!_conf.contains("spark.app.name")) {
    throw new SparkException("An application name must be set in your configuration")
  }

  // log out spark.app.name in the Spark driver logs
  logInfo(s"Submitted application: $appName")

  // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
  if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
    throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
      "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
  }

  if (_conf.getBoolean("spark.logConf", false)) {
    logInfo("Spark configuration:\n" + _conf.toDebugString)
  }

  // Set Spark driver host and port system properties. This explicitly sets the configuration
  // instead of relying on the default value of the config constant.
  _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
  _conf.setIfMissing("spark.driver.port", "0")

  _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

  _jars = Utils.getUserJars(_conf)
  _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
    .toSeq.flatten
  // 2. Resolve the event log directory and the compression codec
  _eventLogDir =
    if (isEventLogEnabled) {
      val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
        .stripSuffix("/")
      Some(Utils.resolveURI(unresolvedDir))
    } else {
      None
    }

  _eventLogCodec = {
    val compress = _conf.getBoolean("spark.eventLog.compress", false)
    if (compress && isEventLogEnabled) {
      Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
    } else {
      None
    }
  }
  // 3. LiveListenerBus asynchronously delivers each SparkListenerEvent to the registered SparkListeners
  _listenerBus = new LiveListenerBus(_conf)

  // Initialize the app status store and listener before SparkEnv is created so that it gets
  // all events.
  // 4. Provide the application with an in-memory key-value store
  _statusStore = AppStatusStore.createLiveStore(conf)
  // 5. Register the AppStatusListener with the LiveListenerBus
  listenerBus.addToStatusQueue(_statusStore.listener.get)

  // Create the Spark execution environment (cache, map output tracker, etc)
  // 6. Create the driver-side SparkEnv
  // SparkEnv holds all runtime objects of a running Spark instance (master or worker),
  // including the serializer, RpcEnv, block manager, map output tracker, and so on.
  // Spark currently locates SparkEnv through a global variable, so all threads see the same SparkEnv.
  // Once the SparkContext has been created, it can be accessed through SparkEnv.get.
  _env = createSparkEnv(_conf, isLocal, listenerBus)
  SparkEnv.set(_env)

  // If running the REPL, register the repl's output dir with the file server.
  _conf.getOption("spark.repl.class.outputDir").foreach { path =>
    val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
    _conf.set("spark.repl.class.uri", replUri)
  }
  // 7. Low-level API for monitoring and reporting the status of Spark jobs and stages
  _statusTracker = new SparkStatusTracker(this, _statusStore)

  // 8. Console progress bar
  _progressBar =
    if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
      Some(new ConsoleProgressBar(this))
    } else {
      None
    }

  // 9. Spark UI, implemented on top of Jetty
  _ui =
    if (conf.getBoolean("spark.ui.enabled", true)) {
      Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
        startTime))
    } else {
      // For tests, do not enable the UI
      None
    }
  // Bind the UI before starting the task scheduler to communicate
  // the bound port to the cluster manager properly
  _ui.foreach(_.bind())

  // 10. Create the Hadoop configuration
  _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

  // 11. Add each JAR given through the constructor
  if (jars != null) {
    jars.foreach(addJar)
  }

  if (files != null) {
    files.foreach(addFile)
  }
  // 12. Compute the executor memory
  _executorMemory = _conf.getOption("spark.executor.memory")
    .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
    .orElse(Option(System.getenv("SPARK_MEM"))
    .map(warnSparkMem))
    .map(Utils.memoryStringToMb)
    .getOrElse(1024)

  // Convert java options to env vars as a work around
  // since we can't set env vars directly in sbt.
  for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
    value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
    executorEnvs(envKey) = value
  }
  Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
    executorEnvs("SPARK_PREPEND_CLASSES") = v
  }
  // The Mesos scheduler backend relies on this environment variable to set executor memory.
  // TODO: Set this only in the Mesos scheduler.
  executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
  executorEnvs ++= _conf.getExecutorEnv
  executorEnvs("SPARK_USER") = sparkUser

  // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
  // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
  // 13. Create the HeartbeatReceiver endpoint
  _heartbeatReceiver = env.rpcEnv.setupEndpoint(
    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

  // Create and start the scheduler
  // 14. Create the task scheduler and the scheduler backend
  val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  _schedulerBackend = sched
  _taskScheduler = ts
  // 15. Create the DAGScheduler instance
  _dagScheduler = new DAGScheduler(this)
  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  // 16. Start the task scheduler
  _taskScheduler.start()

  // 17. Get the application ID from the task scheduler
  _applicationId = _taskScheduler.applicationId()
  // 18. Get the application attempt ID from the task scheduler
  _applicationAttemptId = taskScheduler.applicationAttemptId()
  _conf.set("spark.app.id", _applicationId)
  if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
    System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
  }
  // 19. Set the application ID on the UI
  _ui.foreach(_.setAppId(_applicationId))
  // 20. Initialize the block manager
  _env.blockManager.initialize(_applicationId)

  // The metrics system for Driver need to be set spark.app.id to app ID.
  // So it should start after we get app ID from the task scheduler and set spark.app.id.
  // 21. Start the metrics system
  _env.metricsSystem.start()
  // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
  // 22. Attach the metrics system's servlet handlers to the UI
  _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

  // 23. Initialize the event logging listener
  _eventLogger =
    if (isEventLogEnabled) {
      val logger =
        new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
          _conf, _hadoopConfiguration)
      logger.start()
      listenerBus.addToEventLogQueue(logger)
      Some(logger)
    } else {
      None
    }

  // Optionally scale number of executors dynamically based on workload. Exposed for testing.
  // 24. If dynamic executor allocation is enabled, instantiate and start the ExecutorAllocationManager
  val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
  _executorAllocationManager =
    if (dynamicAllocationEnabled) {
      schedulerBackend match {
        case b: ExecutorAllocationClient =>
          Some(new ExecutorAllocationManager(
            schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
            _env.blockManager.master))
        case _ =>
          None
      }
    } else {
      None
    }
  _executorAllocationManager.foreach(_.start())

  // 25. Initialize and start the ContextCleaner
  _cleaner =
    if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
      Some(new ContextCleaner(this))
    } else {
      None
    }
  _cleaner.foreach(_.start())
  // 26. Set up and start the listener bus
  setupAndStartListenerBus()
  // 27. The task scheduler is ready; post the environment update event
  postEnvironmentUpdate()
  // 28. Post the application start event
  postApplicationStart()

  // Post init
  // 29. Wait until the scheduler backend is ready
  _taskScheduler.postStartHook()
  // 30. Register the DAGScheduler metrics source
  _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
  // 31. Register the BlockManager metrics source
  _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
  // 32. Register the ExecutorAllocationManager metrics source
  _executorAllocationManager.foreach { e =>
    _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
  }

  // Make sure the context is stopped if the user forgets about it. This avoids leaving
  // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
  // is killed, though.
  logDebug("Adding shutdown hook") // force eager creation of logger
  // 33. Register a shutdown hook that calls stop() on this SparkContext when the JVM exits
  _shutdownHookRef = ShutdownHookManager.addShutdownHook(
    ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
    logInfo("Invoking stop() from shutdown hook")
    try {
      stop()
    } catch {
      case e: Throwable =>
        logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
    }
  }
} catch {
  case NonFatal(e) =>
    logError("Error initializing SparkContext.", e)
    try {
      stop()
    } catch {
      case NonFatal(inner) =>
        logError("Error stopping SparkContext after init error.", inner)
    } finally {
      throw e
    }
}
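Step 12 above resolves the executor memory through a fallback chain: the `spark.executor.memory` setting first, then the `SPARK_EXECUTOR_MEMORY` environment variable, then `SPARK_MEM`, and finally a default of 1024 MB. The sketch below reproduces only that chain. `ExecutorMemory`, `toMb`, and the use of plain `Map`s in place of SparkConf and the process environment are assumptions made for illustration; `toMb` is a deliberately tiny parser and not Spark's `Utils.memoryStringToMb`.

```scala
object ExecutorMemory {
  // Hypothetical, simplified parser: it only understands the "m" and "g" suffixes.
  // Spark's own parser accepts more formats.
  def toMb(s: String): Int = {
    val v = s.trim.toLowerCase
    if (v.endsWith("g")) v.dropRight(1).toInt * 1024
    else if (v.endsWith("m")) v.dropRight(1).toInt
    else throw new IllegalArgumentException(s"unsupported memory string: $s")
  }

  // Same precedence as step 12: config value, then two environment variables, then 1024.
  def resolve(conf: Map[String, String], env: Map[String, String]): Int =
    conf.get("spark.executor.memory")
      .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
      .orElse(env.get("SPARK_MEM"))
      .map(toMb)
      .getOrElse(1024)
}
```

Because `orElse` takes its argument by name, a later source is consulted only when every earlier one is absent.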

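  As the code above shows, initializing SparkContext is quite involved and touches many Spark components, including the asynchronous event bus LiveListenerBus, SparkEnv, SparkUI, DAGScheduler, the metrics system, EventLoggingListener, TaskScheduler, ExecutorAllocationManager, ContextCleaner, and others. Treat this article as an overview for now; later articles will analyze some of these components in more depth.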
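To make the idea of an asynchronous event bus more concrete, here is a toy version in plain Scala: events are placed on a queue by the caller and handed to listeners on a separate dispatcher thread. It is a sketch of the general technique only. `MiniListenerBus`, `Event`, `AppStart`, `Shutdown`, and `Listener` are hypothetical names, and the real LiveListenerBus (with its multiple queues and metrics) is considerably more elaborate.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue}

// Hypothetical event and listener types, not Spark's.
sealed trait Event
final case class AppStart(name: String) extends Event
case object Shutdown extends Event

trait Listener { def onEvent(e: Event): Unit }

final class MiniListenerBus {
  private val queue = new LinkedBlockingQueue[Event]()
  private val listeners = new CopyOnWriteArrayList[Listener]()

  // Dispatcher thread: takes events off the queue and forwards them to every listener.
  private val dispatcher = new Thread(new Runnable {
    override def run(): Unit = {
      var running = true
      while (running) {
        queue.take() match {
          case Shutdown => running = false
          case e =>
            val it = listeners.iterator()
            while (it.hasNext) it.next().onEvent(e)
        }
      }
    }
  }, "mini-listener-bus")
  dispatcher.setDaemon(true)

  def addListener(l: Listener): Unit = { listeners.add(l); () }
  def start(): Unit = dispatcher.start()
  // post() only enqueues, so the caller never waits for the listeners.
  def post(e: Event): Unit = queue.put(e)
  // stop() enqueues a marker and waits until everything before it was delivered.
  def stop(): Unit = { queue.put(Shutdown); dispatcher.join() }
}
```

Listeners may be registered before the bus is started, which matches the order in the constructor above, where listeners are added in steps 5 and 23 and the bus is only started in step 26.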
