Spark系列Master中的资源调度

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark系列Master中的资源调度相关的知识,希望对你有一定的参考价值。

资源调度

技术分享

说明:

Application的调度算法有两种,分别为spreadOutApps和非spreadOutApps

spreadOutApps

  • 在spark-submit脚本中,可以指定要多少个executor,executor需要多少个cpu及多少内存,基于该机制,最后executor的实际数量,以及每个executor的cpu可能与配置是不一样的。
  • 因为spreadOutApps调度算法的总是基于总CPU总和来分配,比如要求3个executor每个要3个CPU,如果有9个worker每个有1个CPU,因为总共要分配9个core,所以每个worker分配一个core然后每个worker启动一个executor
  • 最后启动9个executor每个executor1个cput core

非spreadOutApps

  • 每个application都尽可能分配到尽量少的worker上,比如总共有10个worker,每个有10个core app总共要分配20个core,那么其实只会分配到两个worker上,每个worker都占满10个core.

 

Schdule方法源码分析

/**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  private def schedule() {
    // 判断master状态,不为ALIVE时直接返回
    if (state != RecoveryState.ALIVE) { return }
 
    // First schedule drivers, they take strict precedence over applications
10      // Randomization helps balance drivers
11      // 获取状态为ALIVE的worker,并且随机打乱
12      val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
13      // 可用worker数量
14      val numWorkersAlive = shuffledAliveWorkers.size
15      var curPos = 0
16   
17      // diriver调度过程(yarn-client模式下)
18      for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
19        // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
20        // start from the last worker that was assigned a driver, and continue onwards until we have
21        // explored all alive workers.
22        var launched = false
23        var numWorkersVisited = 0
24        // 判读还有可用的worker且Driver还未启动
25        while (numWorkersVisited < numWorkersAlive && !launched) {
26          val worker = shuffledAliveWorkers(curPos)
27          numWorkersVisited += 1
28          // 判断当前worker空闲内存是否大于等于driver需要的内存,且Worker空闲的core数量大于等于dirver需要的core的数量
29          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
30            // 启动driver
31            launchDriver(worker, driver)
32            waitingDrivers -= driver
33            launched = true
34          }
35          curPos = (curPos + 1) % numWorkersAlive
36        }
37      }
38   
39      // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
40      // in the queue, then the second app, etc.
41      // spreadOutApps调度方式
42      if (spreadOutApps) {
43        // Try to spread out each app among all the nodes, until it has all its cores
44        // 遍历需要调度的app(Application),且该app中的core还需要调度
45        for (app <- waitingApps if app.coresLeft > 0) {
46          val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
47            .filter(canUse(app, _)).sortBy(_.coresFree).reverse
48          // 可用worker的数量
49          val numUsable = usableWorkers.length
50          // 存放app 需要分配core的结果
51          val assigned = new Array[Int](numUsable) // Number of cores to give on each node
52          // 获取Application剩余需要分配的cpu数量与worker总共可用cpu数量中的最小值
53          var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
54          var pos = 0
55          while (toAssign > 0) {
56            // 如果worker空闲的cpu数量大于已经分配出去的cpu数量,那么woker还可继续分配cpu
57            if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
58              // 还需分配core的总数量减1
59              toAssign -= 1
60              // 在已分配app core结果集中加1
61              assigned(pos) += 1
62            }
63            pos = (pos + 1) % numUsable
64          }
65          // Now that we‘ve decided how many cores to give on each node, let‘s actually give them
66          for (pos <- 0 until numUsable) {
67            if (assigned(pos) > 0) {
68              // 根据WorkerInfo和所需的core构建ExecutorDesc
69              val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
70              // 启动Executor
71              launchExecutor(usableWorkers(pos), exec)
72              app.state = ApplicationState.RUNNING
73            }
74          }
75        }
76      } 
77      // 非spreadOutApps调度方式
78      else {
79        // Pack each app into as few nodes as possible until 

以上是关于Spark系列Master中的资源调度的主要内容,如果未能解决你的问题,请参考以下文章

spark中资源调度任务调度

Spark资源调度

Spark schedule资源调度分配详解

Spark 资源调度 与 任务调度

Spark资源调度和任务调度

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法