Spark SQL Source Code Analysis (2): TreeNode
The object CurrentOrigin gives TreeNodes a place to look up context, such as which line of code is currently being parsed. CurrentOrigin mainly wraps a private val value = new ThreadLocal[Origin](). At the moment it is only used in the parser, where it is consulted while visiting every node to record the line and column of the node currently being parsed.
Also, since value is a ThreadLocal, we can tell that in Spark SQL each SQL statement is parsed on its own thread (different SQL statements, different threads).
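A minimal standalone sketch of this mechanism (simplified from, not identical to, the Catalyst source; Origin here only keeps line and startPosition):

case class Origin(line: Option[Int] = None, startPosition: Option[Int] = None)

object CurrentOrigin {
  // One Origin per thread, so concurrent parses do not interfere with each other
  private val value = new ThreadLocal[Origin]() {
    override def initialValue: Origin = Origin()
  }

  def get: Origin = value.get()
  def set(o: Origin): Unit = value.set(o)
  def reset(): Unit = value.set(Origin())

  // Run f with o as the current origin, restoring the previous origin afterwards
  def withOrigin[A](o: Origin)(f: => A): A = {
    val previous = get
    set(o)
    try f finally set(previous)
  }
}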
Returns this node's children as a Seq; the children are immutable. There are three cases (typically: leaf nodes with no children, unary nodes with one child, binary nodes with two children).
find: pre-order traversal that returns the first TreeNode satisfying the predicate f (for example, a node of a certain type).
foreach: applies the function f recursively to this node and its children.
foreachUp: unlike foreach, which applies f to the parent before the children, foreachUp applies f to the children first and then to the parent, as the following sketch shows.
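To make the traversal orders concrete, here is a tiny self-contained tree class (a toy stand-in, not Catalyst's TreeNode) with a pre-order find/foreach and a post-order foreachUp:

case class Node(name: String, children: Seq[Node] = Nil) {
  // Pre-order: apply f to this node first, then recurse into the children
  def foreach(f: Node => Unit): Unit = {
    f(this)
    children.foreach(_.foreach(f))
  }
  // Post-order: recurse into the children first, then apply f to this node
  def foreachUp(f: Node => Unit): Unit = {
    children.foreach(_.foreachUp(f))
    f(this)
  }
  // Pre-order search for the first node satisfying the predicate
  def find(p: Node => Boolean): Option[Node] =
    if (p(this)) Some(this)
    else children.foldLeft(Option.empty[Node])((found, c) => found.orElse(c.find(p)))
}

val tree = Node("root", Seq(Node("a"), Node("b", Seq(Node("c")))))
tree.foreach(n => print(n.name + " "))   // prints: root a b c
println()
tree.foreachUp(n => print(n.name + " ")) // prints: a c b root
println()
println(tree.find(_.name == "b"))        // Some(Node(b, ...))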
map: calls foreach with the function ret += f(_), and returns a Seq containing the result of applying f to every node (in foreach order). f itself has type BaseType => A.
flatMap: same idea as map, except that f has type BaseType => TraversableOnce[A].
collect: PartialFunction#lift converts a partial function into a function that returns an Option. pf is applied to every node at which it is defined (i.e. where pf.lift(node) does not return None); the results are appended to ret = new collection.mutable.ArrayBuffer[B] and returned as a Seq.
collectLeaves: returns all leaf nodes of the tree as a Seq.
def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B] : note that since no node may match pf, the returned Option can be None.
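PartialFunction#lift is plain Scala, so its behavior can be checked in isolation; this is how collect and collectFirst decide whether a node "matches" the partial function:

val pf: PartialFunction[Int, String] = {
  case n if n > 0 => s"positive: $n"
}

// lift turns the partial function into a total function that returns an Option
val lifted: Int => Option[String] = pf.lift

println(lifted(3))   // Some(positive: 3)
println(lifted(-1))  // None -- such a node is simply skipped by collect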
mapProductIterator: equivalent to productIterator.map(f).toArray, i.e. f is applied to every element of productIterator and the results are collected into an array.
Note: TreeNode itself does not implement the Product members; they are all provided by its subclasses.
withNewChildren: returns a copy of this node with the children replaced by the new children. The method pattern-matches each productElement and substitutes it according to the node type and a fixed set of rules.
transform: simply calls transformDown with the given rule, whose type is:
rule: PartialFunction[BaseType, BaseType]
mapChildren: returns a copy of this node after applying f to all of its children (non-recursively; the recursion is normally done by the caller). Internally it calls mapProductIterator and pattern-matches each productElement(i); elements that match one of the cases are converted according to a fixed rule, and this per-element matching and conversion is the core of the method.
All of the above applies to nodes that have children; a node with no children is returned as-is. A simplified sketch of the transformDown recursion built on top of this is shown below.
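Reusing the toy Node class from the earlier sketch (again, not the real Catalyst implementation), transformDown can be sketched as: apply the rule at the current node first, then recurse into the children, rebuilding the node only when a child actually changed:

def transformDown(node: Node, rule: PartialFunction[Node, Node]): Node = {
  // Apply the rule at this node first (pre-order); fall back to the node itself
  val afterRule = rule.applyOrElse(node, identity[Node])
  // Then transform the children and rebuild the node only if something changed
  val newChildren = afterRule.children.map(transformDown(_, rule))
  if (newChildren == afterRule.children) afterRule
  else afterRule.copy(children = newChildren)
}

val renamed = transformDown(tree, { case n if n.name == "b" => n.copy(name = "B") })
// the "b" node is renamed to "B"; all untouched nodes are reused as-is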
makeCopy: creates a copy of this node via reflection.
nodeName: returns the name of this TreeNode type, which defaults to the simple class name; note that for physical operators the trailing Exec is stripped (the Exec$ regex in the source), e.g. ProjectExec becomes Project.
innerChildren: all of the nodes that should be shown as an inner nested tree of this node; for example, it can be used to show sub-queries.
(children ++ innerChildren).toSet[TreeNode[_]]
apply / p: mainly used for interactive debugging; returns the node of the tree at the given index, where the number can be read off numberedTreeString. Both ultimately call getNodeNumbered.
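For instance, from a spark-shell (where spark is the active SparkSession), the analyzed logical plan of a DataFrame is a TreeNode, so these debugging helpers can be tried roughly like this; the exact tree shape and numbering depend on the Spark version:

val df = spark.range(10).selectExpr("id * 2 AS doubled").filter("doubled > 5")
val plan = df.queryExecution.analyzed

// Print the tree with a number in front of every node
println(plan.numberedTreeString)

// Fetch the node that was printed with number 1 (interactive debugging helper)
val sub = plan.p(1)
println(sub.getClass.getSimpleName)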
Spark Source Code Analysis: SparkContext
SparkContext: generally speaking, submitting and running a user-written Spark application is impossible without SparkContext, so it has to be initialized before the application is formally submitted. SparkContext hides network communication, distributed deployment, messaging, the storage system, the compute engine, the metrics system, the file service, the Web UI and so on, so that application developers only need the APIs SparkContext exposes. Its two most important responsibilities, however, are creating the DAGScheduler and the TaskScheduler.
Initializing SparkContext and creating the DAG/TaskScheduler
SparkContext member variables: SparkConf, the event-log directory, SparkEnv (holds the running Spark environment, including the serializer, the RpcEnv, the block manager, etc.; Spark can find SparkEnv through a global variable, so every thread sees the same SparkEnv), SparkUI, the heartbeat receiver, the DAG/TaskScheduler and so on. A minimal driver that exercises this constructor is sketched right after the field list.
private var _conf: SparkConf = _
private var _eventLogDir: Option[URI] = None
private var _eventLogCodec: Option[String] = None
private var _listenerBus: LiveListenerBus = _
private var _env: SparkEnv = _
private var _statusTracker: SparkStatusTracker = _
private var _progressBar: Option[ConsoleProgressBar] = None
private var _ui: Option[SparkUI] = None
private var _hadoopConfiguration: Configuration = _
private var _executorMemory: Int = _
private var _schedulerBackend: SchedulerBackend = _
private var _taskScheduler: TaskScheduler = _
private var _heartbeatReceiver: RpcEndpointRef = _
@volatile private var _dagScheduler: DAGScheduler = _
private var _applicationId: String = _
private var _applicationAttemptId: Option[String] = None
private var _eventLogger: Option[EventLoggingListener] = None
private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
private var _cleaner: Option[ContextCleaner] = None
private var _listenerBusStarted: Boolean = false
private var _jars: Seq[String] = _
private var _files: Seq[String] = _
private var _shutdownHookRef: AnyRef = _
private var _statusStore: AppStatusStore = _
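All of these fields are filled in while the constructor body runs. For reference, a minimal driver that triggers the whole initialization described below (standard public API; the local master is purely for illustration):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("sparkcontext-demo")   // required: spark.app.name
  .setMaster("local[2]")             // required: spark.master
val sc = new SparkContext(conf)      // runs the initialization steps described below
// ... submit jobs through sc ...
sc.stop()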
Initializing the context:
- Check and validate the SparkConf
_conf = config.clone()
_conf.validateSettings()
if (!_conf.contains("spark.master"))
throw new SparkException("A master URL must be set in your configuration")
if (!_conf.contains("spark.app.name"))
throw new SparkException("An application name must be set in your configuration")
// log out spark.app.name in the Spark driver logs
logInfo(s"Submitted application: $appName")
// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id"))
throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
"Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
.....
- Set default parameters
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
.toSeq.flatten
_eventLogDir =
  if (isEventLogEnabled) {
    val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
      .stripSuffix("/")
    Some(Utils.resolveURI(unresolvedDir))
  } else {
    None
  }
_statusTracker = new SparkStatusTracker(this, _statusStore)
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
...
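The configuration keys read here are ordinary SparkConf entries; as an illustration (the paths and file names below are made up), they could be set on the driver like this:

val conf = new SparkConf()
  .setAppName("demo")
  .setMaster("local[2]")
  .set("spark.eventLog.enabled", "true")                  // makes isEventLogEnabled true
  .set("spark.eventLog.dir", "hdfs:///spark-history")     // picked up into _eventLogDir
  .set("spark.files", "/tmp/lookup.csv,/tmp/rules.json")  // split on "," into _files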
- Create the SparkEnv
SparkEnv is the Spark execution environment (caching, the map output tracker, and so on).
Both the driver's and the executors' environments are built by calling the create method:
private[spark] def createDriverEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv =
val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
val port = conf.get("spark.driver.port").toInt
val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED))
Some(CryptoStreamUtils.createKey(conf))
else
None
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
bindAddress,
advertiseAddress,
Option(port),
isLocal,
numCores,
ioEncryptionKey,
listenerBus = listenerBus,
mockOutputCommitCoordinator = mockOutputCommitCoordinator
)
The create method builds an execution environment that includes:
private def create(
conf: SparkConf,
executorId: String,
bindAddress: String,
advertiseAddress: String,
port: Option[Int],
isLocal: Boolean,
numUsableCores: Int,
ioEncryptionKey: Option[Array[Byte]],
listenerBus: LiveListenerBus = null,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv =
//Security manager
val securityManager = new SecurityManager(conf, ioEncryptionKey)
//RPC environment
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
securityManager, numUsableCores, !isDriver)
//Serializer manager
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
//Broadcast manager
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
//Shuffle manager
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
//Memory manager: the unified memory manager by default, with the legacy static memory manager as an option
val memoryManager: MemoryManager =
if (useLegacyMemoryManager)
new StaticMemoryManager(conf, numUsableCores)
else
UnifiedMemoryManager(conf, numUsableCores)
//Block manager
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
blockTransferService, securityManager, numUsableCores)
// The Spark metrics system periodically polls metric data to the sinks/sources
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
//Build and return the SparkEnv object
val envInstance = new SparkEnv(
executorId,
rpcEnv,
serializer,
closureSerializer,
serializerManager,
mapOutputTracker,
shuffleManager,
broadcastManager,
blockManager,
securityManager,
metricsSystem,
memoryManager,
outputCommitCoordinator,
conf)
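Once built, the SparkEnv is registered globally (SparkEnv.set), which is why, as noted above, every thread in the same JVM sees the same instance; for example, user code running on the driver can reach it roughly like this:

import org.apache.spark.SparkEnv

val env = SparkEnv.get                    // the per-JVM SparkEnv created above
println(env.executorId)                   // "driver" on the driver side
println(env.conf.get("spark.app.name"))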
- Important: create the TaskScheduler and DAGScheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
//Start the TaskScheduler once the DAG/TaskScheduler have been created
_taskScheduler.start()
Creating the TaskScheduler:
Only standalone mode is discussed here.
createTaskScheduler creates a TaskSchedulerImpl together with a StandaloneSchedulerBackend that is placed under the TaskSchedulerImpl's control. The StandaloneSchedulerBackend packages appName, maxCores, executorMemory and other information into an ApplicationDescription (a case class) and creates an AppClient; internally the AppClient calls tryRegisterAllMasters to register with every Master, and once we successfully connect to one Master all remaining scheduling/registration work is cancelled.
private def createTaskScheduler(
sc: SparkContext,
master: String,
deployMode: String): (SchedulerBackend, TaskScheduler) =
case SPARK_REGEX(sparkUrl) =>
//1. Create TaskSchedulerImpl, the object that actually implements TaskScheduler
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
//2. Create StandaloneSchedulerBackend, which wraps the TaskSchedulerImpl and is controlled by it
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
StandaloneSchedulerBackend:
Internally the StandaloneSchedulerBackend takes the SparkContext, reads its SparkConf, and its start method is invoked:
override def start()
//3. Package the application information into an ApplicationDescription (appDesc)
val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
//4. Create the AppClient, passing in appDesc
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
//Wait for registration with a Master to succeed
waitForRegistration()
launcherBackend.setState(SparkAppHandle.State.RUNNING)
StandaloneAppClient
StandaloneAppClient takes the rpcEnv, the URLs of all Masters and the application information, and sends the registration to the cluster manager (the Master, in standalone mode):
override def onStart(): Unit =
try
registerWithMaster(1)
catch
case e: Exception =>
logWarning("Failed to connect to master", e)
markDisconnected()
stop()
private def tryRegisterAllMasters(): Array[JFuture[_]] =
for (masterAddress <- masterRpcAddresses) yield
registerMasterThreadPool.submit(new Runnable
override def run(): Unit = try
if (registered.get)
//As soon as one Master has acknowledged the application's registration, return
return
logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
//5. Send the RegisterApplication message to every Master over RPC
val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
masterRef.send(RegisterApplication(appDescription, self))
catch
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
)
Finally, initialize builds the task scheduling pools (FIFO/FAIR):
private def createTaskScheduler
....
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
def initialize(backend: SchedulerBackend)
this.backend = backend
schedulableBuilder =
schedulingMode match
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
s"$schedulingMode")
schedulableBuilder.buildPools()
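Which SchedulableBuilder gets created is controlled by a plain configuration key, spark.scheduler.mode (FIFO is the default); for example:

val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")
  .setMaster("local[4]")
  .set("spark.scheduler.mode", "FAIR")   // initialize() will pick FairSchedulableBuilder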
DAGScheduler:
The most important piece of DAGScheduler is the DAGSchedulerEventProcessLoop, which receives the various events and is responsible for communicating with the other components.
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
case StageCancelled(stageId, reason) =>
dagScheduler.handleStageCancellation(stageId, reason)
case JobCancelled(jobId, reason) =>
dagScheduler.handleJobCancellation(jobId, reason)
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()
case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)
...
...
//If an error occurs, cancel all jobs
override def onError(e: Throwable): Unit =
logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
try
dagScheduler.doCancelAllJobs()
catch
case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
dagScheduler.sc.stopInNewThread()
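DAGSchedulerEventProcessLoop extends the generic EventLoop, whose essence is an event queue drained by a single daemon thread. A simplified sketch of that pattern (a toy version, not the actual Spark EventLoop class):

import java.util.concurrent.LinkedBlockingDeque

abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped) {
          val event = eventQueue.take()  // block until an event is posted
          try onReceive(event) catch { case t: Throwable => onError(t) }
        }
      } catch {
        case _: InterruptedException => // interrupted by stop(), just exit
      }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit   // like doOnReceive above
  protected def onError(e: Throwable): Unit
}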
Running a job with SparkContext
- When an action is triggered, Spark calls SparkContext's runJob:
def collect(): Array[T] = withScope
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
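Reusing the sc from the earlier snippet, any ordinary action goes through this path, for example:

val data = sc.parallelize(1 to 10, numSlices = 4)
val doubled = data.map(_ * 2)
// collect() is an action: it calls sc.runJob, which hands the job to the DAGScheduler
val result: Array[Int] = doubled.collect()
println(result.mkString(", "))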
- The job is handed over to the DAGScheduler
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit =
if (stopped.get())
throw new IllegalStateException("SparkContext has been shutdown")
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false))
logInfo("RDD's recursive dependencies:\\n" + rdd.toDebugString)
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
//Checkpoint this RDD after the job using it has completed (recursing into its parents)
rdd.doCheckpoint()
....
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit =
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
....
Submitting the job: submitJob
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] =
//The JobWaiter waits for the job to finish
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
//Post the JobSubmitted event to the eventProcessLoop, which dispatches on the event type
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
//The DAGSchedulerEventProcessLoop matches the JobSubmitted event
//and calls dagScheduler.handleJobSubmitted
...
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)