Spark Runtime(DriverMassterWorkerExecutor)内幕解密(DT大数据梦工厂)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Runtime(DriverMassterWorkerExecutor)内幕解密(DT大数据梦工厂)相关的知识,希望对你有一定的参考价值。

内容:

1、再论Spark集群部署;

2、Job提交解密;

3、Job的生成和接受;

4、Task的运行;

5、再论Shuffle;

从一个作业视角,透过Master、Drvier、Executor来透视Spark Runtime

==========再论Spark集群部署============

官网中关于集群的部署:

技术分享

默认情况下,每个Worker下有一个Executor,会最大化的使用内存和CPU。

Master发指令给Worker来分配资源,不关心Worker能不能分配到这个资源,他发给多少资源就是多少资源。

1、从Spark Runtime的角度讲,由五大核心对象:Master、Worker、Executor、Drvier、CoarseGrainedExecutorBackend;

2、Spark在做分布式集群系统设计的时候,最大化功能独立、模块化封装具体独立的对象、强内聚松耦合;

技术分享

3、当Drvier中的SparkContext初始化的时候会提交程序给Master,Master如果接受该程序在Spark中运行的话,就会为当前的程序分配AppID,同时会分配具体的计算资源,需要特别注意的是,Master是根据当前提交程序的配置信息来给集群中的Worker发指令分配具体的计算资源,但是,Master发出指令后并不关心具体的资源是否已经分配,转过来说Master是发指令后就记录了分配的资源,以后客户端再次提交其它的程序程序的话,就不能够使用该资源了

弊端:可能导致其它要提交的程序无法提交到本来要分配到的计算资源,没法运行;最重要的优势:在Spark分布式系统功能在弱耦合的基础上最快的运行系统(否则如果Master要等到资源最终分配成功后才通知Driver的话,就会造成Driver阻塞,不能够最大化并行计算资源的使用率);

因为Spark默认情况下,程序是排队执行的。

需要补充说明的是,Spark在默认情况下由于集群中一般都只有一个Application在运行,所以Master分配资源策略的弊端就没有那么明显了。

==========Job提交过程源码解密============

先运行一个程序,看日志

1、一个非常重要的技巧,通过在spark-shell中运行一个Job来了解Job提交的过程,然后再用源码验证这个过程;

scala> sc.textFile("/library/dataforSortedShuffle").flatMap(_.split(" ")).map(word=>(word,1)).reduceByKey(_+_).saveAsTextFile("/library/dataoutput2")

日志信息:

技术分享

技术分享

技术分享

技术分享

技术分享

技术分享

技术分享

2、Spark中所有的Action都会至少一个Job

上面日志显示,在saveAsTextFile的时候,会触发一个Job。

3、SparkContext在实例化的时候会构造SparkDeploySchedulerBackend、DAGScheduler、TaskSchedulerImpl、MapOutputTrackerMaster等对象,其中SparkDeploySchedulerBackend负责集群计算资源的管理和调度,DAGScheduler负责高层调度(例如Job中Stage划分、数据本地性等内容),TaskSchedulerImpl负责具体Stage内部的底层调度(例如:具体每个Task的调度、Task的容错等),MapOutputTrackerMaster负责Shuffle中数据输出和读取的管理;

4、TaskSchedulerImpl内部的调度(就是整段日志的一部分):

技术分享

==========Task的运行解密============

1、Task是运行在Executor中,而Executor又是位于CoarseGrainedExecutorBackend中的,且CoarseGrainedExecutorBackend和Executor是一一对应的;

这里进程就有:

技术分享

2、当CoarseGrainedExecutorBackend接收到TaskSetManager发过来的LaunchTask消息后,会反序列化Task Description,然后使用CoarseGrainedExecutorBackend中唯一的executor来执行任务

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor(hostname) =>
    logInfo("Successfully registered with driver")
    executor new Executor(executorIdhostnameenvuserClassPathisLocal = false)

  case RegisterExecutorFailed(message) =>
    logError("Slave registration failed: " + message)
    System.exit(1)

  case LaunchTask(data) =>
    if (executor == null) {
      logError("Received LaunchTask command but executor was null")
      System.exit(1)
    } else {
      val taskDesc = ser.deserialize[TaskDescription](data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      executor.launchTask(thistaskId = taskDesc.taskIdattemptNumber = taskDesc.attemptNumber,
        taskDesc.nametaskDesc.serializedTask)
    }

集群:

private[spark] object CoarseGrainedClusterMessages {

  case object RetrieveSparkProps extends CoarseGrainedClusterMessage

  // Driver to executors
  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage

  case class KillTask(taskId: Long, executor: StringinterruptThread: Boolean)
    extends CoarseGrainedClusterMessage

  sealed trait RegisterExecutorResponse

  case class RegisteredExecutor(hostname: Stringextends CoarseGrainedClusterMessage
    with RegisterExecutorResponse

  case class RegisterExecutorFailed(message: Stringextends CoarseGrainedClusterMessage
    with RegisterExecutorResponse

补充说明:LaunchTask是case class

executor中看到

// Start worker thread pool
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
private val executorSource new ExecutorSource(threadPoolexecutorId)

线程池啊!!!

作业:

Job提交和执行的过程总结一下。

王家林老师名片:

中国Spark第一人

新浪微博:http://weibo.com/ilovepains

微信公众号:DT_Spark

博客:http://blog.sina.com.cn/ilovepains

手机:18610086859

QQ:1740415547

邮箱:[email protected]


本文出自 “一枝花傲寒” 博客,谢绝转载!

以上是关于Spark Runtime(DriverMassterWorkerExecutor)内幕解密(DT大数据梦工厂)的主要内容,如果未能解决你的问题,请参考以下文章

结合Spark讲一下Flink的runtime

SPARK最新特性Runtime Filtering(运行时过滤)以及与动态分区裁剪的区别

Pytorch基础教程33spark或dl模型部署(MLFlow/ONNX/Runtime/tensorflow serving)

org.antlr.v4.runtime.atn.ATN; Could not deserialize ATN with UUID

不支持的文字类型类 scala.runtime.BoxedUnit

Java代码如何向Spark注册无参数UDF