大数据(8y)Spark3.0内核
Posted 小基基o_O
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据(8y)Spark3.0内核相关的知识,希望对你有一定的参考价值。
文章目录
1、Spark On YARN 部署模式的运行机制
- 任务提交后,启动Driver;
- Driver向集群管理器注册应用程序;
- 集群管理器根据此任务的配置文件分配Executor并启动;
- Driver开始执行
main
函数,当执行到行动算子时开始反向推算,根据宽依赖进行Stage的划分,随后每一个Stage对应一个Taskset,Taskset中有多个Task,查找可用资源Executor进行调度; - 根据本地化原则,Task会被分发到指定的Executor去执行,在任务执行的过程中,Executor也会不断与Driver进行通信,报告任务运行情况。
1.1、Spark On YARN Cluster
spark-submit \\
--master yarn \\
--deploy-mode cluster \\
--class org.apache.spark.examples.SparkPi \\
$SPARK_HOME/examples/jars/spark-examples_2.12-3.0.0.jar \\
999
提交Spark任务,并配置
--master yarn
和--deploy-mode cluster
,然后查看Java进程
1.2、Spark On YARN Client
spark-submit \\
--master yarn \\
--class org.apache.spark.examples.SparkPi \\
$SPARK_HOME/examples/jars/spark-examples_2.12-3.0.0.jar \\
999
提交Spark任务,并配置
--master yarn
,然后查看Java进程
2、任务调度机制
- Job是以行动算子为界
- Stage是Job的子集,以RDD宽依赖为界
- Task是Stage的子集, 分 区 数 = T a s k 数 量 分区数=Task数量 分区数=Task数量
-
Stage级调度(DAGScheduler)
DAGScheduler负责Stage级的调度,主要是将Job切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler调度 -
Task级调度(TaskScheduler)
TaskScheduler负责Task级的调度,将TaskSet按照指定的调度策略分发到Executor上执行
调度过程中SchedulerBackend负责提供可用资源
源码解析简化图
3、Shuffle机制
在划分stage时
最后一个stage为ResultStage
前面的stage为ShuffleMapStage
,其结束伴随shuffle
sort-based shuffle流程简化图
1、写入内存缓冲
2、缓冲满后,溢写到临时文件
3、所有临时文件合并成一个数据文件,并创建一个索引文件
4、一个MapTask生成一个数据文件和一个索引文件
5、sort-based shuffle生成文件总数:MapTask数量 x 2
6、ReduceTask根据索引文件从数据文件读取数据
4、内存管理
堆内(On-heap)和堆外(Off-heap)内存
Executor 的内存管理建立在JVM的内存管理之上
Spark引入了堆外内存,使之可以直接在工作节点的系统内存中开辟空间来使用
内存空间分配
动态占用机制
5、附录
5.1、相关单词
en | 🔉 | cn |
---|---|---|
RPC | Remote Procedure Call | 远程过程调用 |
spill | spɪl | v. (使)溢出;n. 溢出液 |
eviction | ɪˈvɪkʃn | n. 逐出;收回 |
5.2、源码截取
查看源码需要先导入依赖并下载对应源码
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.12</artifactId>
<version>3.0.0</version>
</dependency>
5.2.1、SparkOnYARNCluster
spark-submit --master yarn --deploy-mode cluster …
-----------------------------------
第一部分: 客户端和YARN通信
org.apache.spark.deploy.SparkSubmit.main
-- submit.doSubmit(args)
--super.doSubmit(args)
// 解析spark-submit后传入的参数,加载spark默认的参数
-- val appArgs = parseArguments(args)
--submit(appArgs, uninitLog)
--doRunMain()
//运行spark-submit 提交的主类 中的main方法
--runMain(args, uninitLog)
// 如果deployMode == CLIENT,此时childMainClass=自己提交的全类名
// 如果deployMode == CLUSTER,是YARN集群 childMainClass=org.apache.spark.deploy.yarn.YarnClusterApplication
--val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
//创建childMainClass的Class类型
--mainClass = Utils.classForName(childMainClass)
--val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
//如果是cluster 模式,此时运行下一行
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
new JavaMainApplication(mainClass)
}
-- app.start(childArgs.toArray, sparkConf)
// YarnClusterApplication.start() Client是可以和yarn进行通信的一个客户端
--new Client(new ClientArguments(args), conf, null).run()
--this.appId = submitApplication()
// 提交应用程序到YARN上,获取YARN的返回值
-- val newApp = yarnClient.createApplication()
//确定app保存临时数据的作业目录,通常是在hdfs中的/tmp目录中生成一个子目录
-- val appStagingBaseDir = sparkConf.get(STAGING_DIR)
//确保YARN有足够的资源运行当前app的 AM
// 如果集群资源不足,此时会阻塞,一直阻塞到超时,会FAILD
-- verifyClusterResources(newAppResponse)
-- val containerContext = createContainerLaunchContext(newAppResponse)
// 确定AM的主类名
// AM是YARN提供的一个接口
// 任何的应用程序如果希望提交APP到YARN,实现AM的接口
// MR写的app,MR实现AM
// spark写的App,spark实现AM
--val amClass =
if (isClusterMode) {
//集群模式AM的全类名是org.apache.spark.deploy.yarn.ApplicationMaster
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
-- val appContext = createApplicationSubmissionContext(newApp, containerContext)
//提交运行AM
-- yarnClient.submitApplication(appContext)
----------------------------------------
第二部分 AM启动
ApplicationMaster.main
--master.run()
--if (isClusterMode) {
//如果是Cluster模式就在AM中运行Driver
--runDriver()
} else {
runExecutorLauncher()
}
// 启动一个线程,运行用户自己编写的app应用程序的main方法
--userClassThread = startUserApplication()
//等待用户自己编写的app的main方法运行结束后,获取用户自己创建的SparkContext
--val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
// 向RM回报,AM已经启动成功了
--registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
// 创建一个可以向RM申请资源的对象,申请资源启动Executor
-- createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
//allocator: YarnAllocator 作用就是向RM申请Container,决定获取到COntainer后,使用Container干什么
--allocator = client.createAllocator()
--allocator.allocateResources()
//发送申请请求
-- val allocateResponse = amClient.allocate(progressIndicator)
//从YARN的响应中获取已经分配的Container
--val allocatedContainers = allocateResponse.getAllocatedContainers()
//处理申请到的Container
-- handleAllocatedContainers(allocatedContainers.asScala)
//在Container中运行进程
-- runAllocatedContainers(containersToUse)
//每个Container都会创建一个ExecutorRunnable,交给线程池运行
--new ExecutorRunnable.run
-- nmClient = NMClient.createNMClient()
nmClient.init(conf)
nmClient.start()
//准备容器中的启动进程
startContainer()
--org.apache.spark.executor.YarnCoarseGrainedExecutorBackend 容器中启动的进程
--val commands = prepareCommand()
5.2.2、SparkOnYARNClient
spark-submit --master yarn …
-----------------------------------
第一部分: 客户端和YARN通信
org.apache.spark.deploy.SparkSubmit.main
-- submit.doSubmit(args)
--super.doSubmit(args)
// 解析spark-submit后传入的参数,加载spark默认的参数
-- val appArgs = parseArguments(args)
--submit(appArgs, uninitLog)
--doRunMain()
//运行spark-submit 提交的主类 中的main方法
--runMain(args, uninitLog)
// 如果deployMode == CLIENT,此时childMainClass=自己提交的全类名
// 如果deployMode == CLUSTER,是YARN集群 childMainClass=org.apache.spark.deploy.yarn.YarnClusterApplication
--val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
//创建childMainClass的Class类型
--mainClass = Utils.classForName(childMainClass)
--val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
//如果是client 模式,此时运行下一行
new JavaMainApplication(mainClass)
}
//JavaMainApplication.start
-- app.start(childArgs.toArray, sparkConf)
//获取用户编写的 app类的main
-- val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
//执行main方法,Driver启动
--mainMethod.invoke(null, args)
------------------------------------------------------------------------
第二部分: WordCount.main
SparkContext sc=new SparkContext
SparkContext的重要组件:
private var _taskScheduler: TaskScheduler = _ //负责Task的调度
private var _dagScheduler: DAGScheduler = _ // Job中DAG Stage的切分
private var _env: SparkEnv = _ // RpcEnv(通信环境) BlockManager(存数据)
--------------
_taskScheduler = ts
_taskScheduler.start()
//YarnClientSchedulerBackend.start()
--backend.start()
-- client = new Client(args, conf, sc.env.rpcEnv)
//提交应用程序到YARN
-- bindToYarn(client.submitApplication(), None)
//参考cluster模式
--......
--在容器中运行的AM的全类名: org.apache.spark.deploy.yarn.ExecutorLauncher
-----------------------------
第二部分:启动AM
org.apache.spark.deploy.yarn.ExecutorLauncher.main
//ExecutorLauncher 是对AM的封装,也是AM的实现
ApplicationMaster.main(args)
-- --master.run()
if (isClusterMode) {
--runDriver()
} else {
//client模式运行
runExecutorLauncher() //申请Container启动Executor
}
----------------------------------------------------------------------------
// YarnClusterApplication.start() Client是可以和yarn进行通信的一个客户端
--new Client(new ClientArguments(args), conf, null).run()
--this.appId = submitApplication()
// 提交应用程序到YARN上,获取YARN的返回值
-- val newApp = yarnClient.createApplication()
//确定app保存临时数据的作业目录,通常是在hdfs中的/tmp目录中生成一个子目录
-- val appStagingBaseDir = sparkConf.get(STAGING_DIR)
//确保YARN有足够的资源运行当前app的 AM
// 如果集群资源不足,此时会阻塞,一直阻塞到超时,会FAILD
-- verifyClusterResources(newAppResponse)
-- val containerContext = createContainerLaunchContext(newAppResponse)
// 确定AM的主类名
// AM是YARN提供的一个接口
// 任何的应用程序如果希望提交APP到YARN,实现AM的接口
// MR写的app,MR实现AM
// spark写的App,spark实现AM
--val amClass =
if (isClusterMode) {
//集群模式AM的全类名是org.apache.spark.deploy.yarn.ApplicationMaster
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
-- val appContext = createApplicationSubmissionContext(newApp, containerContext)
//提交运行AM
-- yarnClient.submitApplication(appContext)
5.2.3、Executor启动之反向注册
org.apache.spark.executor.YarnCoarseGrainedExecutorBackend 本身就是一个通信端点
//RpcEndpointRef:某个通信端点的引用
var driver: Option[RpcEndpointRef] = None
------------------
第一部分: YarnCoarseGrainedExecutorBackend进程启动
--YarnCoarseGrainedExecutorBackend.main
-- CoarseGrainedExecutorBackend.run(backendArgs, createFn)
//向Driver请求spark的配置信息
--val executorConf = new SparkConf
// 当前进程在网络中有一个通信的别名称为Executor
-- env.rpcEnv.setupEndpoint("Executor",
backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
//阻塞,一直运行,除非Driver发送停止命令
--env.rpcEnv.awaitTermination()
constructor -> onStart -> receive* -> onStop
--------------------------------------
第二部分: YarnCoarseGrainedExecutorBackend进程向Driver发送注册请求
onStart
// ref:RpcEndpointRef 代表Driver的通信端点引用(电话号)
// 向Driver发RegisterExecutor消息,请求答复一个Boolean的值
-- ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
extractAttributes, _resources, resourceProfile.id))
}(ThreadUtils.sameThread).onComplete {
case Success(_) =>
//成功,自己给自己发 RegisteredExecutor消息
self.send(RegisteredExecutor)
//失败,进程就退出
case Failure(e) =>
exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
-------------------------------------
第三部分: Driver处理注册请求
DriverEndPoint.receiveAndReply
--case RegisterExecutor
// 判断是否已经注册过了,如果注册过了,回复失败;判断是否拉黑,拉黑,注册失败
// 否则注册,注册完成后,回复true
context.reply(true)
----------------------------------------
第四部分: YarnCoarseGrainedExecutorBackend 注册成功后
YarnCoarseGrainedExecutorBackend.receive
-- case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
// Spark的计算者,维护了一个线程池,用来计算各种Task
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
resources = _resources)
// 给Driver发LaunchedExecutor消息
driver.get.send(LaunchedExecutor(executorId))
-----------------------------------------------------
第五部分: Driver处理LaunchedExecutor请求
DriverEndPoint.receive
-- case LaunchedExecutor(executorId) =>
executorDataMap.get(executorId).foreach { data =>
data.freeCores = data.totalCores
}
//Driver向Executor发送工作任务(Task)
makeOffers(executorId)
//从调度池中,根据TaskSet的优先级,调度其中的Task。
// 按照轮流发送的原则将Task调度到哦所有的Executor,保证负载均衡。
//调度应该发往次Executor的Task
--scheduler.resourceOffers(workOffers)
-- launchTasks(taskDescs)
以上是关于大数据(8y)Spark3.0内核的主要内容,如果未能解决你的问题,请参考以下文章
将运行时 7.3LTS(Spark3.0.1) 升级到 9.1LTS(Spark3.1.2) 后创建 PySpark 数据帧 Databricks 时,json 文件中的重复列会引发错误