小记--------spark资源调度机制源码分析-----Schedule
Posted yzqyxq
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了小记--------spark资源调度机制源码分析-----Schedule相关的知识,希望对你有一定的参考价值。
Master类位置所在:spark-core_2.11-2.1.0.jar的org.apache.spark.deploy.master下的Master类
/** * driver调度机制原理代码分析Schedule the currently available resources among waiting apps. This method will be called * every time a new app joins or resource availability changes. */ private def schedule(): Unit = { //首先判断,master撞他提不是ALIVE的话, 就直接返回 //也就是说 standby master是不会进行application等资源的调度的 if (state != RecoveryState.ALIVE) { return } // Rondom.shuffle的原理,就是对传入的集合的元素进行随机的打乱 // 取出了workers中的所有值钱注册上来的worker,进行过滤, 必须死活状态为ALIVE的worker // 对状态为ALIVE的worker, 调用Random的shuffle方法进行随机的打乱 val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) // 获取到当前可用worker的个数 val numWorkersAlive = shuffledAliveWorkers.size var curPos = 0 // 只有用yarn-cluster模式提交的时候,才会注册driver,因为standalone和yarn-client模式,都会在本地直接启动driver,而不会来注册driver,更不会让master调度driver了 // 首先会遍历waitingDrivers 这个ArrayBuffer for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers // We assign workers to each waiting driver in a round-robin fashion. For each driver, we // start from the last worker that was assigned a driver, and continue onwards until we have // explored all alive workers. var launched = false var numWorkersVisited = 0 // while的条件,numWorkersVisited小于numWorkersAlive,就是说只要还有活着的worker没有被遍历到,那么就继续遍历。而且,当前这个driver还没有被启动,也就是launched为false while (numWorkersVisited < numWorkersAlive && !launched) { val worker = shuffledAliveWorkers(curPos) numWorkersVisited += 1 // 如果当前这个worker的空闲内存量大于等于driver需要的内存 // 并且worker的空闲CPU数量大于等于driver需要的CPU数量 if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { //那么启动driver launchDriver(worker, driver)//详细代码见代码1 // 并且将driver从waitingDrivers队列中移除 waitingDrivers -= driver launched = true } //然后将指针指向下一个worker curPos = (curPos + 1) % numWorkersAlive } } startExecutorsOnWorkers() } 代码1 /** *在某一个worker上启动driver */ private def launchDriver(worker: WorkerInfo, driver: DriverInfo) { logInfo("Launching driver " + driver.id + " on worker " + worker.id) // 将driver加入worker内存的缓存结构中 // 将worker内使用的内存和CPU数量,都加上driver需要的内存和CPU数量 worker.addDriver(driver) // 同时把worker也加入到driver内部的缓存结构中 进行一个互相引用,互相能找到对方 driver.worker = Some(worker) // 然后调用worker的线程,给它发送LaunchDriver消息,让worker来启动driver worker.endpoint.send(LaunchDriver(driver.id, driver.desc)) // 然后将driver的状态设置为RUNNING driver.state = DriverState.RUNNING } //executor调度原理及源码分析 /** * Schedule and launch executors on workers */ private def startExecutorsOnWorkers(): Unit = { // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app // in the queue, then the second app, etc. // 首先遍历出waitingApps中的ApplicationInfo,并且过滤出还有需要调用的core的Application for (app <- waitingApps if app.coresLeft > 0) { val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor // Filter out workers that don‘t have enough resources to launch an executor // 从workers中,过滤出状态为ALIVE的,再次过滤可以被Application使用的worker,然后按照剩余CPU数量倒序排序 val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree >= coresPerExecutor.getOrElse(1)) .sortBy(_.coresFree).reverse val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)//详细代码见:代码2 // Now that we‘ve decided how many cores to allocate on each worker, let‘s allocate them // 给每个worker分配完Application要求的CPU core之后 , 遍历每个worker // 并且判断之前给这个worker分配到了CPU core for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { //在Application内部缓存中添加executor allocateWorkerResourceToExecutors( app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))//详细代码见:代码3 } } } 代码2 private def scheduleExecutorsOnWorkers( app: ApplicationInfo, usableWorkers: Array[WorkerInfo], spreadOutApps: Boolean): Array[Int] = { val coresPerExecutor = app.desc.coresPerExecutor // 每个executor上最小的cores数量,默认为1 val minCoresPerExecutor = coresPerExecutor.getOrElse(1) val oneExecutorPerWorker = coresPerExecutor.isEmpty val memoryPerExecutor = app.desc.memoryPerExecutorMB // 可用的worker数量 val numUsable = usableWorkers.length val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker // 取Application需要的core是和集群workers的空闲cores和的最小值 var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) /** Return whether the specified worker can launch an executor for this app. */ // 是否可以在一个worker上分配Executor def canLaunchExecutor(pos: Int): Boolean = { val keepScheduling = coresToAssign >= minCoresPerExecutor val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor // If we allow multiple executors per worker, then we can always launch new executors. // Otherwise, if there is already an executor on this worker, just give it more cores. val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0 // 检查worker的空闲core和内存是否够用 if (launchingNewExecutor) { val assignedMemory = assignedExecutors(pos) * memoryPerExecutor val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit keepScheduling && enoughCores && enoughMemory && underLimit } else { // We‘re adding cores to an existing executor, so no need // to check memory and executor limits // 不检查memory和executor的限制 keepScheduling && enoughCores } } // Keep launching executors until no more workers can accommodate any // more executors, or if we have reached this application‘s limits var freeWorkers = (0 until numUsable).filter(canLaunchExecutor) while (freeWorkers.nonEmpty) { freeWorkers.foreach { pos => var keepScheduling = true while (keepScheduling && canLaunchExecutor(pos)) { // 要分配的cores coresToAssign -= minCoresPerExecutor // 已分配的cores assignedCores(pos) += minCoresPerExecutor // If we are launching one executor per worker, then every iteration assigns 1 core // to the executor. Otherwise, every iteration assigns cores to a new executor. // 一个worker只启动一个Executor if (oneExecutorPerWorker) { assignedExecutors(pos) = 1 } else { assignedExecutors(pos) += 1 } // Spreading out an application means spreading out its executors across as // many workers as possible. If we are not spreading out, then we should keep // scheduling executors on this worker until we use all of its resources. // Otherwise, just move on to the next worker. // 如果没有开启spreadOut算法,就一直在一个worker上分配,直到不能在分配为止, // 这个算法的意思就是,每个Application,都尽可能分配到尽量少的worker上,比如总过有10个worker,每个有10个CPU core // 而我们的Application总共需要20个core, 那么其实 就只用到两个worker if (spreadOutApps) { keepScheduling = false } } } freeWorkers = freeWorkers.filter(canLaunchExecutor) } assignedCores } 代码3 //分配CPUcore的算法 private def allocateWorkerResourceToExecutors( app: ApplicationInfo, assignedCores: Int, coresPerExecutor: Option[Int], worker: WorkerInfo): Unit = { // If the number of cores per executor is specified, we divide the cores assigned // to this worker evenly among the executors with no remainder. // Otherwise, we launch a single executor that grabs all the assignedCores on this worker. // 需要的cores / 每个executor的cores 得到真正需要启动的executor数量 val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1) val coresToAssign = coresPerExecutor.getOrElse(assignedCores) for (i <- 1 to numExecutors) { // 首先在Application内部缓存结构中,添加executors // 并且创建ExecutorDesc对象,其中封装了,给这个executor分配多少个CPU core // spark 1.3.0版本的executor启动的内部机制 // 在spark-submit脚本中, 可以指定要多少个executor,每个executor多少个CPU,多少内存,那么基于我们的机制,实际上,最后,executor的实际数量, 以及每个executor的CPU, 可能与配置是不一样的。 // 因为我们这里是基于总的CPU来分配的,就是说,比如要求3个executor,每个executor要3个CPU,那么 // 当我们有9个worker,每个worker有1个CPU时,那么其实总共是要分配9个CPU core, 其实根据这种算法,会给每个worker分配一个CPU core, 然后给每个worker启动一个executor // 最后总过会启动9个executor ,每个executor有一个CPUcore // application 需要记录executor val exec = app.addExecutor(worker, coresToAssign) // 接着在worker上启动executor launchExecutor(worker, exec)// 详细代码见:代码4 // 然后将Application的状态修改为RUNNING app.state = ApplicationState.RUNNING } } 代码4 private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) // 将executor加入worker内部缓存中 worker.addExecutor(exec) // 向worker发送LaunchExecutor消息 worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)) // 向executor对应的application的driver,发送executorAdded消息 exec.application.driver.send( ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)) }
以上是关于小记--------spark资源调度机制源码分析-----Schedule的主要内容,如果未能解决你的问题,请参考以下文章
小记--------spark的Master的Application注册机制源码分析及Master的注册机制原理分析