Spark Source Code Analysis: SparkContext

Posted by Icedzzz


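SparkContext: In general, submitting and running a user-developed Spark application depends on SparkContext. Before an application can be submitted, SparkContext must be initialized. SparkContext hides network communication, distributed deployment, messaging, the storage system, the compute engine, the metrics system, the file service, the Web UI, and so on, so application developers only need the API that SparkContext exposes. The two most important components inside SparkContext are DAGScheduler and TaskScheduler.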

Initializing SparkContext and creating the DAGScheduler/TaskScheduler

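SparkContext member variables: SparkConf, the event log directory, SparkEnv (which holds the environment objects of the running Spark instance, including the serializer, RpcEnv, block manager, and so on; Spark locates SparkEnv through a global variable, so all threads can access the same SparkEnv), SparkUI, the heartbeat receiver, the DAGScheduler/TaskScheduler, and others.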

  private var _conf: SparkConf = _
  private var _eventLogDir: Option[URI] = None
  private var _eventLogCodec: Option[String] = None
  private var _listenerBus: LiveListenerBus = _
  private var _env: SparkEnv = _
  private var _statusTracker: SparkStatusTracker = _
  private var _progressBar: Option[ConsoleProgressBar] = None
  private var _ui: Option[SparkUI] = None
  private var _hadoopConfiguration: Configuration = _
  private var _executorMemory: Int = _
  private var _schedulerBackend: SchedulerBackend = _
  private var _taskScheduler: TaskScheduler = _
  private var _heartbeatReceiver: RpcEndpointRef = _
  @volatile private var _dagScheduler: DAGScheduler = _
  private var _applicationId: String = _
  private var _applicationAttemptId: Option[String] = None
  private var _eventLogger: Option[EventLoggingListener] = None
  private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
  private var _cleaner: Option[ContextCleaner] = None
  private var _listenerBusStarted: Boolean = false
  private var _jars: Seq[String] = _
  private var _files: Seq[String] = _
  private var _shutdownHookRef: AnyRef = _
  private var _statusStore: AppStatusStore = _

Initializing the context:

  1. Validate SparkConf
    _conf = config.clone()
    _conf.validateSettings()

    if (!_conf.contains("spark.master")) {
      throw new SparkException("A master URL must be set in your configuration")
    }
    if (!_conf.contains("spark.app.name")) {
      throw new SparkException("An application name must be set in your configuration")
    }

    // log out spark.app.name in the Spark driver logs
    logInfo(s"Submitted application: $appName")

    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
    if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
      throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
    }
    .....
  2. Set default parameters
    _jars = Utils.getUserJars(_conf)
    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
      .toSeq.flatten
    _eventLogDir =
      if (isEventLogEnabled) {
        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
          .stripSuffix("/")
        Some(Utils.resolveURI(unresolvedDir))
      } else {
        None
      }
    _statusTracker = new SparkStatusTracker(this, _statusStore)
    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
    ...
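  3. Create the SparkEnv
    SparkEnv: the Spark execution environment (cache, map output tracker, etc.)
    The Env for both the driver and the executors is created by calling the create method: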
private[spark] def createDriverEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus,
      numCores: Int,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
    val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
    val port = conf.get("spark.driver.port").toInt
    val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
      Some(CryptoStreamUtils.createKey(conf))
    } else {
      None
    }
    create(
      conf,
      SparkContext.DRIVER_IDENTIFIER,
      bindAddress,
      advertiseAddress,
      Option(port),
      isLocal,
      numCores,
      ioEncryptionKey,
      listenerBus = listenerBus,
      mockOutputCommitCoordinator = mockOutputCommitCoordinator
    )
  }

The execution environment built by the create method includes:

 private def create(
       conf: SparkConf,
      executorId: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Option[Int],
      isLocal: Boolean,
      numUsableCores: Int,
      ioEncryptionKey: Option[Array[Byte]],
      listenerBus: LiveListenerBus = null,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    // Security manager
    val securityManager = new SecurityManager(conf, ioEncryptionKey)
    // RPC environment
    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
      securityManager, numUsableCores, !isDriver)
    // Serializer manager
    val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
    // Broadcast manager
    val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
    // Shuffle manager
    val shortShuffleMgrNames = Map(
      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass =
      shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
    // Memory manager: the unified memory model by default, the static (legacy) model optionally
    val memoryManager: MemoryManager =
      if (useLegacyMemoryManager) {
        new StaticMemoryManager(conf, numUsableCores)
      } else {
        UnifiedMemoryManager(conf, numUsableCores)
      }
    // Block manager
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)
    // Spark metrics system: periodically polls metric sources and pushes the data to sinks
    MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    // Return the SparkEnv instance
   val envInstance = new SparkEnv(
      executorId,
      rpcEnv,
      serializer,
      closureSerializer,
      serializerManager,
      mapOutputTracker,
      shuffleManager,
      broadcastManager,
      blockManager,
      securityManager,
      metricsSystem,
      memoryManager,
      outputCommitCoordinator,
      conf)
  4. Important: create the TaskScheduler and DAGScheduler
   val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
   _schedulerBackend = sched
   _taskScheduler = ts
   _dagScheduler = new DAGScheduler(this)
   _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
   // Start the TaskScheduler once the DAGScheduler and TaskScheduler are initialized
   _taskScheduler.start()
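
The validation in step 1 can be tried out without a cluster. The following is a minimal sketch in plain Scala, not the Spark implementation: `MiniConf` and `validate` are hypothetical names, the configuration is modeled as a simple `Map`, and the key `spark.submit.deployMode` is assumed to be where the deploy mode is read from.

```scala
// Simplified model of the checks at the top of SparkContext initialization.
// MiniConf and validate are hypothetical names, not Spark APIs.
object ConfValidationSketch {
  final case class MiniConf(settings: Map[String, String]) {
    def contains(key: String): Boolean = settings.contains(key)
    def get(key: String, default: String): String = settings.getOrElse(key, default)
  }

  // Returns Left(message) for the first failed check, Right(conf) otherwise.
  def validate(conf: MiniConf): Either[String, MiniConf] = {
    if (!conf.contains("spark.master")) {
      Left("A master URL must be set in your configuration")
    } else if (!conf.contains("spark.app.name")) {
      Left("An application name must be set in your configuration")
    } else if (conf.get("spark.master", "") == "yarn" &&
        // Assumption: the deploy mode lives under this key and defaults to "client".
        conf.get("spark.submit.deployMode", "client") == "cluster" &&
        !conf.contains("spark.yarn.app.id")) {
      Left("Detected yarn cluster mode, but isn't running on a cluster.")
    } else {
      Right(conf)
    }
  }
}
```

Unlike the excerpt above, the sketch returns an `Either` instead of throwing a `SparkException`, which makes the individual checks easy to exercise one by one.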

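Creating the TaskScheduler:
Only standalone mode is discussed here:
A TaskSchedulerImpl is created and handed to a StandaloneSchedulerBackend, which is controlled by the TaskSchedulerImpl. StandaloneSchedulerBackend packs appName, maxCores, executorMemory and related information into an ApplicationDescription (a case class) and creates an AppClient. Internally, the AppClient calls tryRegisterAllMasters to register with every Master; once it has connected to one Master successfully, all remaining scheduled registration attempts are cancelled.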

  private def createTaskScheduler(
      sc: SparkContext,
      master: String,
      deployMode: String): (SchedulerBackend, TaskScheduler) = {
    master match {
      // ... cases for the other master URL forms are omitted here
      case SPARK_REGEX(sparkUrl) =>
        // 1. Create TaskSchedulerImpl, the concrete TaskScheduler implementation
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        // 2. StandaloneSchedulerBackend wraps TaskSchedulerImpl and is controlled by it
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)
    }
  }
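
To make the `SPARK_REGEX` branch concrete, here is a small sketch of how a master string can be dispatched with regex extractors and how a comma-separated standalone URL expands into one URL per master. The regexes and the `Mode` types are assumptions for illustration and are not copied from the Spark source.

```scala
object MasterUrlSketch {
  // Patterns modeled on common master URL forms; treat the exact regexes
  // as assumptions rather than a copy of the Spark source.
  private val SparkUrl = """spark://(.*)""".r
  private val LocalN = """local\[([0-9]+|\*)\]""".r

  // Hypothetical result types describing what the master string selected.
  sealed trait Mode
  case object Local extends Mode
  final case class LocalThreads(threads: String) extends Mode
  final case class Standalone(masterUrls: List[String]) extends Mode
  final case class Other(master: String) extends Mode

  def parse(master: String): Mode = master match {
    case "local" => Local
    case LocalN(threads) => LocalThreads(threads)
    // "spark://h1:7077,h2:7077" becomes one spark:// URL per master
    case SparkUrl(hosts) => Standalone(hosts.split(",").map("spark://" + _).toList)
    case other => Other(other)
  }
}
```

For example, `MasterUrlSketch.parse("spark://h1:7077,h2:7077")` yields a `Standalone` value holding `spark://h1:7077` and `spark://h2:7077`, which is the same expansion that `sparkUrl.split(",").map("spark://" + _)` performs above.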

StandaloneSchedulerBackend:
StandaloneSchedulerBackend receives the SparkContext, reads the SparkConf settings from it, and does its work in the start method:

  override def start() {
    // 3. Pack the application information into appDesc
    val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    // 4. Create the AppClient and pass appDesc to it
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    // Wait until registration with a Master has succeeded
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }

StandaloneAppClient
StandaloneAppClient takes the rpcEnv, the URLs of all Masters, and the application description, and sends the application information to the cluster manager (the Master in standalone mode):

    override def onStart(): Unit = {
      try {
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }

    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              // Return as soon as any Master has accepted the application registration
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            // 5. Send the RegisterApplication message to every Master over RPC
            val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }
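
The "register with every Master, stop once one answers" pattern can be sketched with plain JVM concurrency primitives. This is a simplified model under stated assumptions: `registerWithAny` and `tryConnect` are hypothetical names, `tryConnect` stands in for the RPC round trip, and retries and timeouts of the real client are left out apart from a fixed wait.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.util.control.NonFatal

object RegistrationSketch {
  // tryConnect is a hypothetical stand-in for sending a registration message:
  // it returns true when the given master accepts the application.
  def registerWithAny(masters: Seq[String], tryConnect: String => Boolean): Option[String] = {
    val registered = new AtomicBoolean(false)
    val winner = new AtomicReference[Option[String]](None)
    val done = new CountDownLatch(masters.size)
    val pool = Executors.newFixedThreadPool(math.max(1, masters.size))
    try {
      masters.foreach { master =>
        pool.submit(new Runnable {
          override def run(): Unit = try {
            // Skip the attempt if another master has already accepted us;
            // compareAndSet guarantees that only one master is recorded.
            if (!registered.get && tryConnect(master) &&
                registered.compareAndSet(false, true)) {
              winner.set(Some(master))
            }
          } catch {
            case NonFatal(_) => () // a master that fails is simply skipped
          } finally {
            done.countDown()
          }
        })
      }
      // Wait for all attempts to finish (bounded, so a hung master cannot block forever).
      done.await(5, TimeUnit.SECONDS)
      winner.get
    } finally {
      pool.shutdownNow()
    }
  }
}
```

The `AtomicBoolean` plays the role of the `registered` flag in the excerpt above: every task checks it first, so later attempts become no-ops once one Master has responded.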

Finally, initialize creates the builder for the task scheduling pool (FIFO/FAIR):

  private def createTaskScheduler
  ....
 case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      (backend, scheduler)

  def initialize(backend: SchedulerBackend) {
    this.backend = backend
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }
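
As an illustration of what choosing a scheduling mode means, the sketch below parses a mode name and applies a FIFO ordering to a few task sets. It is a simplified model: `TaskSet`, `parseMode` and `fifoOrder` are hypothetical names, and the FIFO rule used here (earlier job first, then earlier stage) is an assumption about the intended behavior, not a copy of Spark's scheduling algorithm. FAIR ordering is not modeled.

```scala
import java.util.Locale

object SchedulingModeSketch {
  sealed trait Mode
  case object FIFO extends Mode
  case object FAIR extends Mode

  // Hypothetical stand-in for a schedulable set of tasks.
  final case class TaskSet(jobId: Int, stageId: Int)

  // Mirrors the shape of the match in initialize: unknown modes are rejected.
  def parseMode(name: String): Mode = name.toUpperCase(Locale.ROOT) match {
    case "FIFO" => FIFO
    case "FAIR" => FAIR
    case other => throw new IllegalArgumentException(s"Unsupported scheduling mode: $other")
  }

  // Assumed FIFO rule: earlier job first, then earlier stage within the same job.
  val fifoOrdering: Ordering[TaskSet] = Ordering.by((t: TaskSet) => (t.jobId, t.stageId))

  def fifoOrder(taskSets: Seq[TaskSet]): Seq[TaskSet] = taskSets.sorted(fifoOrdering)
}
```

With this ordering, a task set from job 1 always runs ahead of one from job 2, regardless of the order in which they were queued.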

DAGScheduler:
The most important part of DAGScheduler is DAGSchedulerEventProcessLoop, which receives the various events and handles communication with the other components.

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)
    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)
    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)
    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()
    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)
	...
	...
   
  }
  // If an error occurs, cancel all jobs
  override def onError(e: Throwable): Unit = {
    logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
    try {
      dagScheduler.doCancelAllJobs()
    } catch {
      case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    }
    dagScheduler.sc.stopInNewThread()
  }
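
The event loop idea can be reduced to a queue, a single consumer thread, and a pattern match. The following sketch is a simplified model with hypothetical names (`MiniEventLoop`, the `Stop` sentinel, the `log` buffer); unlike the excerpt above, its `onError` only records the failure and keeps processing instead of cancelling jobs and stopping the context.

```scala
import java.util.concurrent.LinkedBlockingDeque
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object EventLoopSketch {
  sealed trait Event
  final case class JobSubmitted(jobId: Int) extends Event
  final case class JobCancelled(jobId: Int, reason: String) extends Event
  case object Stop extends Event // hypothetical sentinel that ends the loop

  final class MiniEventLoop {
    private val queue = new LinkedBlockingDeque[Event]()
    // Written only by the loop thread; read it after stopAndJoin().
    val log: ArrayBuffer[String] = ArrayBuffer.empty[String]

    private val thread = new Thread("mini-event-loop") {
      override def run(): Unit = {
        var running = true
        while (running) {
          queue.take() match {
            case Stop => running = false
            case event =>
              try onReceive(event)
              catch { case NonFatal(e) => onError(e) }
          }
        }
      }
    }

    // One handler per event type, chosen by pattern matching.
    private def onReceive(event: Event): Unit = event match {
      case JobSubmitted(id) => log += s"submitted $id"
      case JobCancelled(id, reason) =>
        if (reason.isEmpty) throw new IllegalArgumentException("missing reason")
        log += s"cancelled $id: $reason"
      case Stop => ()
    }

    private def onError(e: Throwable): Unit = log += s"error: ${e.getMessage}"

    def start(): Unit = thread.start()
    def post(event: Event): Unit = queue.put(event)
    def stopAndJoin(): Unit = { post(Stop); thread.join() }
  }
}
```

Because every event passes through one thread, the handlers need no locking among themselves, which is one reason a scheduler may funnel all state changes through such a loop.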

Running a job with SparkContext

  1. When an action is triggered, Spark calls SparkContext's runJob:
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
  2. Submit the job to the DAGScheduler
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    // Checkpoint the current RDD once the job has finished; this recurses into the parent RDDs
    rdd.doCheckpoint()
  }
  ....
  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
   
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
 	....
    }
  }
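
The relationship between collect, runJob and the resultHandler can be modeled in a few lines of plain Scala. This is a sketch under stated assumptions: `MiniRdd`, `runJobWithHandler`, `runJob` and `collect` below are hypothetical stand-ins that run partitions sequentially in one thread, with no scheduler involved.

```scala
import scala.reflect.ClassTag

object RunJobSketch {
  // Hypothetical stand-in for an RDD: each inner Seq is one partition.
  final case class MiniRdd[T](partitions: Vector[Seq[T]])

  // Runs func on every partition and passes each result to resultHandler
  // together with the partition index.
  def runJobWithHandler[T, U](
      rdd: MiniRdd[T],
      func: Iterator[T] => U,
      resultHandler: (Int, U) => Unit): Unit = {
    rdd.partitions.zipWithIndex.foreach { case (part, index) =>
      resultHandler(index, func(part.iterator))
    }
  }

  // Array-returning variant: the handler stores each result at its partition index.
  def runJob[T, U: ClassTag](rdd: MiniRdd[T], func: Iterator[T] => U): Array[U] = {
    val results = new Array[U](rdd.partitions.size)
    runJobWithHandler[T, U](rdd, func, (index: Int, res: U) => { results(index) = res })
    results
  }

  // collect: one array per partition, concatenated in partition order.
  def collect[T: ClassTag](rdd: MiniRdd[T]): Array[T] = {
    val results = runJob(rdd, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
}
```

Storing results by partition index is what keeps the output ordered even when, in a real cluster, partitions finish in arbitrary order.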

Submitting the job: submitJob

def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // JobWaiter waits for the job to finish
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    // Post the event to eventProcessLoop, which pattern-matches on it
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }
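
A waiter of this kind only needs a counter and a promise. The sketch below is a simplified model, not Spark's JobWaiter: `MiniJobWaiter` is a hypothetical name, and it completes a `Future[Unit]` once every task has reported its result.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}

object JobWaiterSketch {
  // Forwards each task result to resultHandler and completes a future
  // once all totalTasks tasks have reported.
  final class MiniJobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
    private val finishedTasks = new AtomicInteger(0)
    private val jobPromise = Promise[Unit]()
    // A job with zero tasks is already finished.
    if (totalTasks == 0) jobPromise.success(())

    def completionFuture: Future[Unit] = jobPromise.future

    def taskSucceeded(index: Int, result: T): Unit = {
      // The handler may not be thread safe, so calls to it are serialized.
      synchronized { resultHandler(index, result) }
      if (finishedTasks.incrementAndGet() == totalTasks) {
        jobPromise.trySuccess(())
      }
    }

    def jobFailed(error: Exception): Unit = {
      jobPromise.tryFailure(error)
      ()
    }
  }
}
```

A caller that blocks on `completionFuture` behaves like the synchronous runJob, while a caller that attaches a callback gets asynchronous job submission from the same object.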

// The event matches the JobSubmitted case in DAGSchedulerEventProcessLoop,
// which calls dagScheduler.handleJobSubmitted
...
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func,
