Different CPU core allocation for launching executors in Spark 1.3.x vs. Spark 2.x




***The executor-to-worker allocation strategy here uses spreadOut as the example***

Key code in version 1.3.x:

for (app <- waitingApps if app.coresLeft > 0) { // handle apps whose resources are not fully allocated yet
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canUse(app, _)).sortBy(_.coresFree).reverse // sort usable workers by coresFree in descending order
        val numUsable = usableWorkers.length // number of usable workers, e.g. 5
        val assigned = new Array[Int](numUsable) // cores assigned per candidate worker, one array slot per worker, initialized to 0
        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) // cores still to assign = min(total free cores across usable workers, the app's remaining unassigned cores)
        var pos = 0
        while (toAssign > 0) {
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) { // round-robin over all usable workers: does this worker still have free cores beyond what we have already assigned it?
            toAssign -= 1
            assigned(pos) += 1 // give the worker at index pos one more core
          }
          pos = (pos + 1) % numUsable // round-robin to the next worker with resources
        }
        // Now that we've decided how many cores to give on each node, let's actually give them
        for (pos <- 0 until numUsable) {
          if (assigned(pos) > 0) { // if the value in the assigned array is > 0, launch an executor on the worker at that index
            val exec = app.addExecutor(usableWorkers(pos), assigned(pos)) // update the executor bookkeeping in the app
            launchExecutor(usableWorkers(pos), exec)  // tell the worker to launch the executor
            app.state = ApplicationState.RUNNING
          }
        }
      }

The code above clearly shows that under the spread-out policy, each pass hands a worker exactly one core, so the --executor-cores option set in spark-submit does not necessarily take effect.
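The allocation loop can be reproduced as a small standalone sketch (the object name `RoundRobin13` is hypothetical; `coresFree` stands in for the usable workers' free-core counts):

```scala
// Minimal model of the 1.3.x round-robin: each pass hands out exactly
// one core per worker with capacity left, regardless of --executor-cores.
object RoundRobin13 {
  def assign(coresLeft: Int, coresFree: Array[Int]): Array[Int] = {
    val numUsable = coresFree.length
    val assigned  = new Array[Int](numUsable)
    var toAssign  = math.min(coresLeft, coresFree.sum)
    var pos = 0
    while (toAssign > 0) {
      if (coresFree(pos) - assigned(pos) > 0) { // worker still has a free core
        toAssign -= 1
        assigned(pos) += 1
      }
      pos = (pos + 1) % numUsable // round-robin to the next worker
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    // 5 cores requested, three workers with 4 free cores each:
    // cores are handed out one at a time, landing as 2/2/1.
    println(RoundRobin13.assign(5, Array(4, 4, 4)).mkString(",")) // prints 2,2,1
  }
}
```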

Spark 2.x corrects this: it does read the value of --executor-cores, and only if that value is unset does it fall back to the 1.3.x one-core-at-a-time behavior. The code is as follows:

 private def scheduleExecutorsOnWorkers(
      app: ApplicationInfo,
      usableWorkers: Array[WorkerInfo],
      spreadOutApps: Boolean): Array[Int] = {
    val coresPerExecutor = app.desc.coresPerExecutor
    val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
    val oneExecutorPerWorker = coresPerExecutor.isEmpty
    val memoryPerExecutor = app.desc.memoryPerExecutorMB
    val numUsable = usableWorkers.length
    val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
    val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
    var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

    /** Return whether the specified worker can launch an executor for this app. */
    def canLaunchExecutor(pos: Int): Boolean = {
      val keepScheduling = coresToAssign >= minCoresPerExecutor
      val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

      // If we allow multiple executors per worker, then we can always launch new executors.
      // Otherwise, if there is already an executor on this worker, just give it more cores.
      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
      if (launchingNewExecutor) {
        val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
        val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
        val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
        keepScheduling && enoughCores && enoughMemory && underLimit
      } else {
        // We're adding cores to an existing executor, so no need
        // to check memory and executor limits
        keepScheduling && enoughCores
      }
    }

    // Keep launching executors until no more workers can accommodate any
    // more executors, or if we have reached this application's limits
    var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunchExecutor(pos)) {
          coresToAssign -= minCoresPerExecutor
          assignedCores(pos) += minCoresPerExecutor

          // If we are launching one executor per worker, then every iteration assigns 1 core
          // to the executor. Otherwise, every iteration assigns cores to a new executor.
          if (oneExecutorPerWorker) {
            assignedExecutors(pos) = 1
          } else {
            assignedExecutors(pos) += 1
          }

          // Spreading out an application means spreading out its executors across as
          // many workers as possible. If we are not spreading out, then we should keep
          // scheduling executors on this worker until we use all of its resources.
          // Otherwise, just move on to the next worker.
          if (spreadOutApps) {
            keepScheduling = false
          }
        }
      }
      freeWorkers = freeWorkers.filter(canLaunchExecutor)
    }
    assignedCores
  }
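The difference is easiest to see by exercising the 2.x logic in a simplified standalone model (the names `Schedule2x` and `schedule` are hypothetical; each worker is modeled as a `(coresFree, memoryFree)` pair). With 6 cores left and two 8-core workers under spreadOut, leaving `--executor-cores` unset spreads cores one at a time (3/3), while `--executor-cores 4` allocates in whole 4-core chunks (4/0):

```scala
// Simplified model of 2.x scheduleExecutorsOnWorkers: cores are handed out
// in chunks of coresPerExecutor (default 1 when --executor-cores is unset).
object Schedule2x {
  def schedule(coresLeft: Int,
               coresPerExecutor: Option[Int],
               memoryPerExecutor: Int,
               workers: Array[(Int, Int)], // (coresFree, memoryFree) per worker
               spreadOut: Boolean): Array[Int] = {
    val minCores          = coresPerExecutor.getOrElse(1)
    val oneExecPerWorker  = coresPerExecutor.isEmpty
    val n                 = workers.length
    val assignedCores     = new Array[Int](n)
    val assignedExecs     = new Array[Int](n)
    var coresToAssign     = math.min(coresLeft, workers.map(_._1).sum)

    def canLaunch(pos: Int): Boolean = {
      val keep        = coresToAssign >= minCores
      val enoughCores = workers(pos)._1 - assignedCores(pos) >= minCores
      if (!oneExecPerWorker || assignedExecs(pos) == 0) { // launching a new executor
        val enoughMem =
          workers(pos)._2 - assignedExecs(pos) * memoryPerExecutor >= memoryPerExecutor
        keep && enoughCores && enoughMem
      } else keep && enoughCores // adding cores to the existing executor
    }

    var free = (0 until n).filter(canLaunch)
    while (free.nonEmpty) {
      free.foreach { pos =>
        var keep = true
        while (keep && canLaunch(pos)) {
          coresToAssign      -= minCores
          assignedCores(pos) += minCores
          if (oneExecPerWorker) assignedExecs(pos) = 1 else assignedExecs(pos) += 1
          if (spreadOut) keep = false // move on to the next worker
        }
      }
      free = free.filter(canLaunch)
    }
    assignedCores
  }

  def main(args: Array[String]): Unit = {
    val ws = Array((8, 4096), (8, 4096))
    println(schedule(6, None,    1024, ws, spreadOut = true).mkString(",")) // prints 3,3
    println(schedule(6, Some(4), 1024, ws, spreadOut = true).mkString(",")) // prints 4,0
  }
}
```

Note the second call: with 4-core executors, the 2 leftover cores cannot form a full executor, so they stay unassigned rather than being spread one at a time.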

 

