Driver在Cluster模式下的启动两种不同的资源调度方式源码彻底解析资源调度内幕总结(DT大数据梦工厂)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Driver在Cluster模式下的启动两种不同的资源调度方式源码彻底解析资源调度内幕总结(DT大数据梦工厂)相关的知识,希望对你有一定的参考价值。

内容:

1、分配 Driver(Cluster);

2、为Application分配资源;

3、两种不同的资源分配方式彻底解密;

4、Spark资源分配的思考;

Spark最最重要的,这个内容每个IMF成员必须掌握,后面的性能优化全部跟这个有关。

==========任务调度与资源调度的区别============

1、任务调度是通过DAGScheduler、TaskScheduler、SchedulerBackend等进行的作业调度;

2、资源调度是指应用程序如何获取资源;

3、任务调度是在资源调度的基础上进行的,没有资源调度,任务调度就无从谈起,就成为了无源之水、无本之木

4、Spark资源调度算法的方法是:schedule()

==========资源调度内幕天机解密============

1、因为Master负责资源管理和调度,所以资源调度的方法Schedule位于Master.scala类中,当注册程序或者资源发生改变的时候,都会导致schedule的调用,例如注册程序的时候:

case RegisterApplication(descriptiondriver) => {
  // TODO Prevent repeated registrations from some driver
  if (state == RecoveryState.STANDBY{
    // ignore, don‘t send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(descriptiondriver)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.idself))
    schedule()
  }
}

2、schedule()调用时机:每次有新的应用程序或者集群资源状态发生改变的时候(包括Executor增加或者减少、Worker增加或者减少等);

3、当前Master必须是Alive的状态才能进行资源的调度,如果不是Alive的状态,会直接返回,也就是说,Standby Master不会进行Application的资源调用;

4、使用Random.shuffle把Master中保留的集群中所有Worker的信息随机打乱,其算法内部是循环随机交换所有Worker在Master缓存数据结构中的位置;

5、接下来要判断所有Worker中哪些Worker是ALIVE级别的Worker,ALIVE才能参与资源的分配工作;

6、当Spark submit指定Driver在Cluster模式的情况下,此时driver会加入waitingDrivers等待列表中,在每个Driver的DrvierInfo中的driverDescription中有要启动Driver时候对Worker的内存及Cores的要求等内容(这个Driver如果设置了supervise,则drvier挂掉之后可以自动重启);

private[deploy] case class DriverDescription(
    jarUrl: String,
    mem: Int,
    cores: Int,
    supervise: Boolean,
    command: Command) {

  override def toStringString s"DriverDescription (${command.mainClass})"
}

7、然后在符合资源要求的基础上用随机打乱的一个Worker来启动Driver,Master发指令给远程的Worker让远程的Worker启动driver,然后driver的state就编程RUNNING了;

private def launchDriver(worker: WorkerInfodriver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker Some(worker)
  worker.endpoint.send(LaunchDriver(driver.iddriver.desc))//Master发指令给Worker启动对应的driver
  driver.state = DriverState.RUNNING
}

8、先启动Drvier才会发生后续的一切的资源调度的模式;

/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) { return }
  // Drivers take strict precedence over executors
  val shuffledWorkers = Random.shuffle(workers// Randomization helps balance drivers
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
    for (driver <- waitingDrivers) {
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(workerdriver)
        waitingDrivers -= driver
      }
    }
  }
  startExecutorsOnWorkers()
}

8、spark默认为应用程序启动Executor的方式是采用FIFO(先进先出,排队)的方式,也就是说所有提交的应用程序都是放在调度的等待队列中的,先进先出,只有满足了前面应用程序的资源分配基础上,才能够满足下一个应用程序资源的分配;

9、为应用程序具体分配Executor之前要判断应用程序是否还需要分配cores,如果不需要,则不会为应用程序分配Executor;

10、具体分配Executor之前要对要求Worker必须是ALIVE状态且必须满足Application对每个Executor的内存和cores 的要求,并且在此基础上进行排序,把cores多的放在前面

在FIFO情况下默认是spreadOutApps来让应用程序尽可能多的运行在所有的node上

11、为应用程序分配Executors有两种方式,第一种方式是尽可能在集群的所有Worker上分配Executor,这种方式往往会带来潜在的更好的数据本地性;

/**
 * Schedule and launch executors on workers
 */
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // Filter out workers that don‘t have enough resources to launch an executor
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    val assignedCores = scheduleExecutorsOnWorkers(appusableWorkersspreadOutApps)

    // Now that we‘ve decided how many cores to allocate on each worker, let‘s allocate them
    for (pos <- until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        appassignedCores(pos)coresPerExecutorusableWorkers(pos))
    }
  }
}

12、具体在集群上分配cores的时候,会尽可能满足我们的要求;

13、如果是每个Worker下面只能够为当前的应用程序分配一个Executor的话,每次只分配一个Core!

var coresToAssign = math.min(app.coresLeftusableWorkers.map(_.coresFree).sum)

// If we are launching one executor per worker, then every iteration assigns 1 core
// to the executor. Otherwise, every iteration assigns cores to a new executor.
if (oneExecutorPerWorker) {
  assignedExecutors(pos) = 1
else {
  assignedExecutors(pos) += 1
}

假设4个Worker,spreadOut时候,会一轮一轮为executor分配core的,一个一个,循环分配,直到资源耗尽

14、然后就是分配了,准备好具体要为当前应用程序分配的Executor信息后,具体Master要通过远程通信发指令给Worker来具体启动ExecutorBackEnd进程;

// Now that we‘ve decided how many cores to allocate on each worker, let‘s allocate them
for (pos <- until usableWorkers.length if assignedCores(pos) > 0) {
  allocateWorkerResourceToExecutors(
    appassignedCores(pos)coresPerExecutorusableWorkers(pos))
}

/**
 * Allocate a worker‘s resources to one or more executors.
 * @param app the info of the application which the executors belong to
 * @param assignedCores number of cores on this worker for this application
 * @param coresPerExecutor number of cores per executor
 * @param worker the worker info
 */
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- to numExecutors) {
    val exec = app.addExecutor(workercoresToAssign)
    launchExecutor(workerexec)
    app.state = ApplicationState.RUNNING
  }
}

15、紧接着给我们应用程序的Driver发送一个ExecutorAdded的信息

private def launchExecutor(worker: WorkerInfoexec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.idexec.idexec.application.descexec.coresexec.memory))
  exec.application.driver.send(
    ExecutorAdded(exec.idworker.idworker.hostPortexec.coresexec.memory))
}

王家林老师名片:

中国Spark第一人

新浪微博:http://weibo.com/ilovepains

微信公众号:DT_Spark

博客:http://blog.sina.com.cn/ilovepains

手机:18610086859

QQ:1740415547

邮箱:[email protected]


本文出自 “一枝花傲寒” 博客,谢绝转载!

以上是关于Driver在Cluster模式下的启动两种不同的资源调度方式源码彻底解析资源调度内幕总结(DT大数据梦工厂)的主要内容,如果未能解决你的问题,请参考以下文章

Yarn模式下的监控界面介绍

.Spark基于Standalone提交任务两种方式

Spark的运行模式--Yarn-Cluster

Spark on yarn的两种模式 yarn-cluster 和 yarn-client

ZooKeeper源码学习笔记--Cluster模式下的ZooKeeper

Spark on Yarn 流程