Spark SQL Source Code Analysis (2): TreeNode


The object CurrentOrigin gives TreeNodes a place to look up context, for example which line of code is currently being parsed.

object CurrentOrigin mainly holds a private val value = new ThreadLocal[Origin](). At the moment CurrentOrigin is only used by the parser: it is consulted while visiting each node, recording the line and column of the node currently being parsed.

Also, since value is a ThreadLocal, we can tell that in Spark SQL each SQL statement is parsed in its own thread (different statements, different threads).
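
For reference, a lightly abridged sketch of the object as it appears in the Spark 2.x source:

case class Origin(
    line: Option[Int] = None,          // line of the SQL text being parsed
    startPosition: Option[Int] = None) // column (start position) within that line

object CurrentOrigin {
  private val value = new ThreadLocal[Origin]() {
    override def initialValue: Origin = Origin()
  }

  def get: Origin = value.get()
  def set(o: Origin): Unit = value.set(o)
  def reset(): Unit = value.set(Origin())

  // Called by the parser while visiting a node
  def setPosition(line: Int, start: Int): Unit = {
    value.set(value.get.copy(line = Some(line), startPosition = Some(start)))
  }

  // Run f with origin o installed, then restore the default
  def withOrigin[A](o: Origin)(f: => A): A = {
    set(o)
    val ret = try f finally { reset() }
    ret
  }
}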

children returns this node's Seq of children; the children are immutable. There are three cases:
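
The three cases correspond to leaf, unary and binary nodes. A minimal self-contained sketch (Plan, Leaf, Unary and Binary are illustrative stand-ins for Spark's LeafNode / UnaryNode / BinaryNode traits):

sealed trait Plan { def children: Seq[Plan] }

case class Leaf(name: String) extends Plan {
  override def children: Seq[Plan] = Nil              // case 1: no children
}
case class Unary(child: Plan) extends Plan {
  override def children: Seq[Plan] = child :: Nil     // case 2: exactly one child
}
case class Binary(left: Plan, right: Plan) extends Plan {
  override def children: Seq[Plan] = Seq(left, right) // case 3: two children
}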

find does a pre-order traversal and returns the first TreeNode that satisfies the predicate f (for example, the first node of a particular type); a small runnable sketch of find together with foreach/foreachUp follows below.

foreach applies the function f recursively to this node and its children.

The difference between the two: foreach applies f to the parent first and then to the children, whereas foreachUp applies f to the children first and then to the parent.
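
The two orders are easiest to see on a toy tree. A minimal runnable sketch (Node is a stand-in, not the real TreeNode, but the three method bodies mirror the real ones):

case class Node(name: String, children: Seq[Node] = Nil) {

  def find(f: Node => Boolean): Option[Node] =
    if (f(this)) Some(this) // pre-order: test the parent first
    else children.foldLeft(Option.empty[Node])((found, c) => found.orElse(c.find(f)))

  def foreach(f: Node => Unit): Unit = {
    f(this)                          // parent first...
    children.foreach(_.foreach(f))   // ...then each child, recursively
  }

  def foreachUp(f: Node => Unit): Unit = {
    children.foreach(_.foreachUp(f)) // children first, recursively...
    f(this)                          // ...then the parent
  }
}

object TraversalDemo extends App {
  val tree = Node("root", Seq(Node("a", Seq(Node("a1"))), Node("b")))

  tree.foreach(n => print(n.name + " "))     // prints: root a a1 b
  println()
  tree.foreachUp(n => print(n.name + " "))   // prints: a1 a b root
  println()
  println(tree.find(_.name.startsWith("a"))) // first match in pre-order: the node named "a"
}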

map calls foreach, and the function applied inside foreach is ret += f(_); it finally returns a Seq built by applying f to every node during the foreach traversal and appending each result to ret. Here f has type BaseType => A.

flatMap works on the same principle as map, except that f becomes BaseType => TraversableOnce[A].
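
Inside TreeNode the two bodies are only a few lines. A sketch modeled on the Spark 2.x source (BaseType is the node's self type):

def map[A](f: BaseType => A): Seq[A] = {
  val ret = new collection.mutable.ArrayBuffer[A]()
  foreach(ret += f(_))   // visit every node in pre-order and keep f(node)
  ret
}

def flatMap[A](f: BaseType => TraversableOnce[A]): Seq[A] = {
  val ret = new collection.mutable.ArrayBuffer[A]()
  foreach(ret ++= f(_))  // same traversal, but each result is flattened into ret
  ret
}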

collect relies on PartialFunction#lift, which turns a partial function into a total function that returns an Option. collect applies pf to every node at which pf is defined (i.e. where pf.lift(node) is not None), appends the results to ret = new collection.mutable.ArrayBuffer[B], and returns them as a Seq.
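
A quick plain-Scala illustration of PartialFunction#lift (not Spark code):

object LiftDemo extends App {
  val pf: PartialFunction[Int, String] = { case n if n % 2 == 0 => s"even: $n" }
  val lifted: Int => Option[String] = pf.lift

  assert(lifted(4) == Some("even: 4")) // pf is defined at 4
  assert(lifted(3) == None)            // pf is not defined at 3: an Option instead of a MatchError
}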

collectLeaves returns all leaf nodes of the tree as a Seq.

def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B] — note that there may be no node at which pf is defined, so the returned Option may be None.
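
Sketches of the collect family, close to the Spark 2.x source (all member methods of TreeNode):

def collect[B](pf: PartialFunction[BaseType, B]): Seq[B] = {
  val ret = new collection.mutable.ArrayBuffer[B]()
  val lifted = pf.lift                          // partial function -> BaseType => Option[B]
  foreach(node => lifted(node).foreach(ret.+=)) // keep only nodes where pf is defined
  ret
}

def collectLeaves(): Seq[BaseType] = {
  collect { case p if p.children.isEmpty => p } // leaves are nodes without children
}

def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B] = {
  val lifted = pf.lift
  lifted(this).orElse {                         // try this node first (pre-order)...
    children.foldLeft(Option.empty[B]) { (l, r) => l.orElse(r.collectFirst(pf)) }
  }                                             // ...then the children, left to right
}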

mapProductIterator is equivalent to productIterator.map(f).toArray: it applies f to every element of productIterator and returns the results as an array.

Note: TreeNode does not implement the Product methods itself; they are provided by its concrete (case class) subclasses.
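
A sketch of mapProductIterator, close to the Spark 2.x source (productArity and productElement come from Product, which the concrete case-class nodes provide):

import scala.reflect.ClassTag

protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {
  val arr = Array.ofDim[B](productArity)  // one slot per constructor argument
  var i = 0
  while (i < arr.length) {
    arr(i) = f(productElement(i))         // apply f to each argument in order
    i += 1
  }
  arr
}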

withNewChildren returns a copy of this node with the given new children substituted for the old ones. The method pattern-matches on every productElement and replaces children according to the element's type and a set of rules.

transform simply delegates to transformDown; its rule parameter has type PartialFunction[BaseType, BaseType].
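
A sketch of transform/transformDown modeled on the Spark 2.x source (origin, fastEquals and mapChildren are other TreeNode members):

def transform(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  transformDown(rule)
}

def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  // Apply the rule to this node first (identity when the rule is not defined here)
  val afterRule = CurrentOrigin.withOrigin(origin) {
    rule.applyOrElse(this, identity[BaseType])
  }
  if (this fastEquals afterRule) {
    // Node unchanged: keep it and only recurse into the children
    mapChildren(_.transformDown(rule))
  } else {
    // Node rewritten: recurse into the children of the new node
    afterRule.mapChildren(_.transformDown(rule))
  }
}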

mapChildren returns a copy of this node with f applied to all of its children (non-recursively; the recursion is usually done by the caller). Internally it calls mapProductIterator and pattern-matches on every productElement(i), converting those that match according to a set of rules; the core matching is sketched below:
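
A simplified sketch of mapChildren modeled on the Spark 2.x source (the real method also matches Option, Map and Stream constructor arguments):

def mapChildren(f: BaseType => BaseType): BaseType = {
  if (children.nonEmpty) {
    var changed = false
    val newArgs = mapProductIterator {
      // A constructor argument that is itself a child plan: apply f to it
      case arg: TreeNode[_] if containsChild(arg) =>
        val newChild = f(arg.asInstanceOf[BaseType])
        if (!(newChild fastEquals arg)) { changed = true; newChild } else arg
      // A sequence of arguments: apply f to the elements that are children
      case args: Traversable[_] => args.map {
        case arg: TreeNode[_] if containsChild(arg) =>
          val newChild = f(arg.asInstanceOf[BaseType])
          if (!(newChild fastEquals arg)) { changed = true; newChild } else arg
        case other => other
      }
      // Anything else (literals, non-child expressions, ...) is kept as-is
      case nonChild: AnyRef => nonChild
      case null => null
    }
    if (changed) makeCopy(newArgs) else this // only copy the node if something changed
  } else {
    this                                     // no children: return the node directly
  }
}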

Everything above applies to nodes that have children; a node without children is returned directly.

makeCopy uses reflection (on the node's constructor) to build a copy of the node.

nodeName returns the name of this type of TreeNode, defaulting to the class name; note that the Exec suffix of physical operators is stripped (the $ in Exec$ is just the regex end-of-string anchor).

innerChildren: all the nodes that should be shown as an inner nested tree of this node; for example, it can be used to represent sub-queries.

allChildren is simply (children ++ innerChildren).toSet[TreeNode[_]].

apply(number) is mainly used for interactive debugging: it returns the node of this tree at the given number, which can be read off the output of numberedTreeString.
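
A hedged usage example (plan is hypothetical and stands for whatever analyzed plan is being inspected):

println(plan.numberedTreeString) // prints the tree with a number in front of every node
val node = plan(2)               // fetch the node that was printed with number 2
println(node.treeString)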


Spark Source Code Analysis: SparkContext



SparkContext: generally speaking, submitting and running a user-developed Spark application is impossible without SparkContext, and before an application is formally submitted the SparkContext must first be initialized. SparkContext hides network communication, distributed deployment, message passing, the storage system, the compute engine, the metrics system, file services, the Web UI and so on; application developers only need the APIs that SparkContext exposes. The two most important things SparkContext sets up are the DAGScheduler and the TaskScheduler.

Initializing the SparkContext and creating the DAGScheduler/TaskScheduler

SparkContext member variables: the SparkConf, the event-log directory, the SparkEnv (which holds the services of the running Spark instance, including the serializer, the RpcEnv, the block manager and so on; Spark can locate the SparkEnv through a global variable, so all threads see the same SparkEnv), the SparkUI, the heartbeat receiver, the DAG/TaskScheduler, etc.:

 private var _conf: SparkConf = _
  private var _eventLogDir: Option[URI] = None
  private var _eventLogCodec: Option[String] = None
  private var _listenerBus: LiveListenerBus = _
  private var _env: SparkEnv = _
  private var _statusTracker: SparkStatusTracker = _
  private var _progressBar: Option[ConsoleProgressBar] = None
  private var _ui: Option[SparkUI] = None
  private var _hadoopConfiguration: Configuration = _
  private var _executorMemory: Int = _
  private var _schedulerBackend: SchedulerBackend = _
  private var _taskScheduler: TaskScheduler = _
  private var _heartbeatReceiver: RpcEndpointRef = _
  @volatile private var _dagScheduler: DAGScheduler = _
  private var _applicationId: String = _
  private var _applicationAttemptId: Option[String] = None
  private var _eventLogger: Option[EventLoggingListener] = None
  private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
  private var _cleaner: Option[ContextCleaner] = None
  private var _listenerBusStarted: Boolean = false
  private var _jars: Seq[String] = _
  private var _files: Seq[String] = _
  private var _shutdownHookRef: AnyRef = _
  private var _statusStore: AppStatusStore = _

Initializing the context:

  1. Validate the SparkConf

    _conf = config.clone()
    _conf.validateSettings()

    if (!_conf.contains("spark.master")) {
      throw new SparkException("A master URL must be set in your configuration")
    }

    if (!_conf.contains("spark.app.name")) {
      throw new SparkException("An application name must be set in your configuration")
    }

    // log out spark.app.name in the Spark driver logs
    logInfo(s"Submitted application: $appName")

    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
    if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
      throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
    }
    ...
  2. Set default parameters

    _jars = Utils.getUserJars(_conf)
    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
      .toSeq.flatten
    _eventLogDir =
      if (isEventLogEnabled) {
        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
          .stripSuffix("/")
        Some(Utils.resolveURI(unresolvedDir))
      } else {
        None
      }
    _statusTracker = new SparkStatusTracker(this, _statusStore)
    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
    ...
  3. Create the SparkEnv
     SparkEnv is Spark's execution environment (cache, map output tracker, and so on).
     The driver's and the executors' environments are both built through the create method:

    private[spark] def createDriverEnv(
        conf: SparkConf,
        isLocal: Boolean,
        listenerBus: LiveListenerBus,
        numCores: Int,
        mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
      val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
      val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
      val port = conf.get("spark.driver.port").toInt
      val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
        Some(CryptoStreamUtils.createKey(conf))
      } else {
        None
      }
      create(
        conf,
        SparkContext.DRIVER_IDENTIFIER,
        bindAddress,
        advertiseAddress,
        Option(port),
        isLocal,
        numCores,
        ioEncryptionKey,
        listenerBus = listenerBus,
        mockOutputCommitCoordinator = mockOutputCommitCoordinator)
    }

The create method builds the execution environment, which includes:

    private def create(
        conf: SparkConf,
        executorId: String,
        bindAddress: String,
        advertiseAddress: String,
        port: Option[Int],
        isLocal: Boolean,
        numUsableCores: Int,
        ioEncryptionKey: Option[Array[Byte]],
        listenerBus: LiveListenerBus = null,
        mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
      // Security manager
      val securityManager = new SecurityManager(conf, ioEncryptionKey)
      // RPC environment
      val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
        securityManager, numUsableCores, !isDriver)
      // Serializer manager
      val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
      // Broadcast manager
      val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
      // Shuffle manager
      val shortShuffleMgrNames = Map(
        "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
        "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
      val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
      val shuffleMgrClass =
        shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
      val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
      // Memory manager: the unified memory model by default, the legacy static model as an option
      val memoryManager: MemoryManager =
        if (useLegacyMemoryManager) {
          new StaticMemoryManager(conf, numUsableCores)
        } else {
          UnifiedMemoryManager(conf, numUsableCores)
        }
      // Block manager
      val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
        serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
        blockTransferService, securityManager, numUsableCores)
      // The Spark metrics system periodically polls metrics out to the configured sinks
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
      ...
      // Build and return the SparkEnv instance
      val envInstance = new SparkEnv(
        executorId,
        rpcEnv,
        serializer,
        closureSerializer,
        serializerManager,
        mapOutputTracker,
        shuffleManager,
        broadcastManager,
        blockManager,
        securityManager,
        metricsSystem,
        memoryManager,
        outputCommitCoordinator,
        conf)
      ...
      envInstance
    }
  4. Most importantly: create the TaskScheduler and the DAGScheduler

    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    // start the TaskScheduler once the DAG/TaskScheduler have been created
    _taskScheduler.start()

Creating the TaskScheduler:
Only standalone mode is discussed here.
createTaskScheduler builds a TaskSchedulerImpl and hands its control to a StandaloneSchedulerBackend. The backend packs appName, maxCores, executorMemory and other information into an ApplicationDescription case class and creates a StandaloneAppClient. Inside the client, tryRegisterAllMasters registers the application with every Master; once registration with one Master succeeds, the remaining registration attempts are cancelled.

  private def createTaskScheduler(
      sc: SparkContext,
      master: String,
      deployMode: String): (SchedulerBackend, TaskScheduler) = {
    ...
    master match {
      case SPARK_REGEX(sparkUrl) =>
        // 1. Create the TaskSchedulerImpl, the object that actually implements TaskScheduler
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        // 2. Wrap the TaskSchedulerImpl in a StandaloneSchedulerBackend, which drives it
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)
      ...
    }
  }

StandaloneSchedulerBackend:
The StandaloneSchedulerBackend receives the SparkContext, reads the SparkConf, and is started through its start method:

  override def start() {
    ...
    // 3. Pack the application's information into an ApplicationDescription
    val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    // 4. Create the StandaloneAppClient, passing in the appDesc
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    // Wait until registration with a Master has succeeded
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }

StandaloneAppClient:
The StandaloneAppClient takes the rpcEnv, the URLs of all Masters and the application description, and sends the registration to the cluster manager (the Master, in standalone mode):

  override def onStart(): Unit = {
    try {
      registerWithMaster(1)
    } catch {
      case e: Exception =>
        logWarning("Failed to connect to master", e)
        markDisconnected()
        stop()
    }
  }

  private def tryRegisterAllMasters(): Array[JFuture[_]] = {
    for (masterAddress <- masterRpcAddresses) yield {
      registerMasterThreadPool.submit(new Runnable {
        override def run(): Unit = try {
          if (registered.get) {
            // Return as soon as one Master has already accepted the registration
            return
          }
          logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
          // 5. Send the RegisterApplication message to every Master over RPC
          val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          masterRef.send(RegisterApplication(appDescription, self))
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      })
    }
  }

Finally, initialize builds the task scheduling pools (FIFO/FAIR):

  private def createTaskScheduler ... = {
    ...
    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      (backend, scheduler)
    ...
  }

  def initialize(backend: SchedulerBackend) {
    this.backend = backend
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
            s"$schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }

DAGScheduler:
The most important part of the DAGScheduler is the DAGSchedulerEventProcessLoop, which receives the various scheduling events and communicates with the other components:

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)
    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)
    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)
    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()
    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)
    ...
  }

  // If an exception occurs, cancel all jobs and stop the SparkContext
  override def onError(e: Throwable): Unit = {
    logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
    try {
      dagScheduler.doCancelAllJobs()
    } catch {
      case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    }
    dagScheduler.sc.stopInNewThread()
  }
}

Running jobs through SparkContext

  1. When an action is triggered, Spark calls SparkContext's runJob:

  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
  2. The job is then handed over to the DAGScheduler

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    // Checkpoint this RDD (and, recursively, its parents) once the job has finished
    rdd.doCheckpoint()
  }

  ...

  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    ...
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ...
  }

Submitting the job: submitJob

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  ...
  // The JobWaiter waits for the job to finish
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  // Post the JobSubmitted event to the eventProcessLoop, which dispatches it
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}

// The JobSubmitted event is matched in DAGSchedulerEventProcessLoop,
// which then calls dagScheduler.handleJobSubmitted
...
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  ...
}
