(Custom Edition) Lesson 17: How Spark Streaming Dynamically Allocates Resources and Controls Its Consumption Rate


Contents of this lesson:

    1. Dynamic resource allocation in Spark Streaming

    2. Dynamic control of the consumption rate in Spark Streaming

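Why does it need to be dynamic?
a) By default Spark is coarse-grained: resources are allocated up front, before any computation runs. A Spark Streaming workload has peaks and troughs that need different amounts of resources, so sizing everything for the peak wastes a large share of those resources the rest of the time.

b) Spark Streaming runs continuously, so resource consumption and management also have to be taken into account.
Adjusting resources dynamically in Spark Streaming faces a challenge:
Spark Streaming runs one batch per Batch Duration. One batch may need a lot of resources while the next needs far fewer, and an adjustment may still be in progress when the batch it was meant for has already finished. In that case the adjustment interval itself has to be tuned.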


Dynamic resource allocation in Spark Streaming
1. Dynamic resource allocation is disabled by default in SparkContext, but it can be enabled manually through SparkConf.

// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
}

_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    Some(new ExecutorAllocationManager(this, listenerBus, _conf))
  } else {
    None
  }
_executorAllocationManager.foreach(_.start())

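Set the spark.dynamicAllocation.enabled parameter to true. As the warning in the code above shows, if a fixed number of executors is also configured, dynamic allocation is turned off again.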
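The enable check in the SparkContext excerpt can be tried out without a cluster. The sketch below is a hypothetical pure-Scala model: a `Map[String, String]` stands in for SparkConf, and the second condition (no fixed `spark.executor.instances`) is an assumption based on the warning message above and on the Spark 1.6-era `Utils.isDynamicAllocationEnabled`; later Spark versions relaxed this rule.

```scala
object DynamicAllocationCheck {
  // Hypothetical stand-in for SparkConf: a plain key -> value map.
  type Conf = Map[String, String]

  private def getBoolean(conf: Conf, key: String, default: Boolean): Boolean =
    conf.get(key).map(_.trim.toBoolean).getOrElse(default)

  private def getInt(conf: Conf, key: String, default: Int): Int =
    conf.get(key).map(_.trim.toInt).getOrElse(default)

  // Assumed to mirror Spark 1.6's rule: the flag must be on AND
  // no fixed executor count may be requested.
  def isDynamicAllocationEnabled(conf: Conf): Boolean =
    getBoolean(conf, "spark.dynamicAllocation.enabled", default = false) &&
      getInt(conf, "spark.executor.instances", 0) == 0

  // The warning SparkContext logs when the flag is on but was overridden.
  def warning(conf: Conf): Option[String] =
    if (!isDynamicAllocationEnabled(conf) &&
        getBoolean(conf, "spark.dynamicAllocation.enabled", default = false))
      Some("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
    else None
}
```

In a real job you would set the key on SparkConf or pass `--conf spark.dynamicAllocation.enabled=true` to spark-submit; in this generation of Spark, dynamic allocation also expects the external shuffle service (`spark.shuffle.service.enabled=true`).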

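Here an ExecutorAllocationManager is instantiated to allocate resources dynamically. Internally it runs a timer that keeps checking the state of the executors: a scheduled thread pool calls schedule() at a fixed interval, and schedule() performs the actual dynamic allocation.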

/**
 * Register for scheduler callbacks to decide when to add and remove executors, and start
 * the scheduling task.
 */
def start(): Unit = {
  listenerBus.addListener(listener)

  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule() // dynamically adjust the number of allocated executors
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
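Why wrap schedule() in try/catch? With a JDK ScheduledExecutorService, a periodic task that throws is cancelled and its later runs are suppressed, so one bad tick would stop allocation for good. The self-contained sketch below reproduces the pattern of start() with plain java.util.concurrent classes; `PeriodicScheduleSketch` and its deliberately failing `schedule()` are hypothetical.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.ControlThrowable

object PeriodicScheduleSketch {
  val runs = new AtomicInteger(0)
  val done = new CountDownLatch(3)

  // Hypothetical schedule(): throws on its first call to show that the
  // try/catch keeps the periodic task alive afterwards.
  private def schedule(): Unit = {
    val n = runs.incrementAndGet()
    done.countDown()
    if (n == 1) throw new IllegalStateException("simulated failure")
  }

  // Returns true if schedule() ran at least 3 times within 5 seconds.
  def runDemo(intervalMillis: Long): Boolean = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      override def run(): Unit =
        try schedule()
        catch {
          case ct: ControlThrowable => throw ct // let Scala control flow propagate
          case t: Throwable =>
            println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
        }
    }
    executor.scheduleAtFixedRate(task, 0, intervalMillis, TimeUnit.MILLISECONDS)
    val finished = done.await(5, TimeUnit.SECONDS)
    executor.shutdownNow()
    finished
  }
}
```

Without the catch, the first exception would end the schedule after a single run and the latch would never reach zero.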

private def schedule(): Unit = synchronized {
  val now = clock.getTimeMillis

  updateAndSyncNumExecutorsTarget(now) // update the target number of executors

  // Remove executors whose idle deadline has passed
  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false
      removeExecutor(executorId)
    }
    !expired
  }
}
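The `removeTimes.retain` loop removes every executor whose removal deadline has passed and keeps the rest. A minimal stand-in, assuming a plain mutable map from executor id to deadline (ms) and a hypothetical `removeExecutor` that only records the id:

```scala
import scala.collection.mutable

object IdleExecutorReaper {
  // Hypothetical stand-ins for the manager's state.
  val removeTimes = mutable.HashMap[String, Long]() // executor id -> deadline (ms)
  val removed = mutable.ArrayBuffer[String]()       // ids passed to removeExecutor
  var initializing = true

  private def removeExecutor(id: String): Unit = removed += id

  def removeExpired(now: Long): Unit = synchronized {
    // Collect first, then mutate, so the map is not modified while iterating.
    val expired = removeTimes.collect { case (id, deadline) if now >= deadline => id }.toList
    expired.foreach { id =>
      initializing = false
      removeExecutor(id)
      removeTimes -= id
    }
  }
}
```

Collecting the expired ids first avoids relying on `retain`, which Scala 2.13 deprecates in favour of `filterInPlace`.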
/**
 * Updates our target number of executors and syncs the result with the cluster manager.
 *
 * Check to see whether our existing allocation and the requests we've made previously exceed our
 * current needs. If so, truncate our target and let the cluster manager know so that it can
 * cancel pending requests that are unneeded.
 *
 * If not, and the add time has expired, see if we can request new executors and refresh the add
 * time.
 *
 * @return the delta in the target number of executors.
 */
private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
  val maxNeeded = maxNumExecutorsNeeded

  if (initializing) {
    // Do not change our target while we are still initializing,
    // Otherwise the first job may have to ramp up unnecessarily
    0
  } else if (maxNeeded < numExecutorsTarget) {
    // The target number exceeds the number we actually need, so stop adding new
    // executors and inform the cluster manager to cancel the extra pending requests
    val oldNumExecutorsTarget = numExecutorsTarget
    numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
    numExecutorsToAdd = 1

    // If the new target has not changed, avoid sending a message to the cluster manager
    if (numExecutorsTarget < oldNumExecutorsTarget) {
      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
      logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
        s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
    }
    numExecutorsTarget - oldNumExecutorsTarget
  } else if (addTime != NOT_SET && now >= addTime) {
    val delta = addExecutors(maxNeeded)
    logDebug(s"Starting timer to add more executors (to " +
      s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
    addTime += sustainedSchedulerBacklogTimeoutS * 1000
    delta
  } else {
    0
  }
}
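The three branches of updateAndSyncNumExecutorsTarget (hold while initializing, shrink when over target, grow when the backlog timer fires) can be modelled as a pure function over a small state record. This is a simplified sketch, not Spark's code: `AllocState`, `TargetUpdater`, the executor limits and the 1 s backlog timeout are hypothetical, and the grow branch folds in, as I understand it, the doubling that Spark's `addExecutors` (not shown above) applies to `numExecutorsToAdd`.

```scala
// Hypothetical snapshot of the allocation manager's state.
final case class AllocState(
    target: Int,           // numExecutorsTarget
    toAdd: Int,            // numExecutorsToAdd
    addTime: Option[Long], // None plays the role of NOT_SET
    initializing: Boolean)

object TargetUpdater {
  // Hypothetical limits, not Spark defaults.
  val minExecutors = 1
  val maxExecutors = 10
  val sustainedBacklogTimeoutMs = 1000L

  /** Returns the new state and the change in the target. */
  def update(s: AllocState, maxNeeded: Int, now: Long): (AllocState, Int) =
    if (s.initializing) (s, 0) // hold the target until the first job settles
    else if (maxNeeded < s.target) {
      // Over-provisioned: shrink, but never below the minimum; reset ramp-up.
      val newTarget = math.max(maxNeeded, minExecutors)
      (s.copy(target = newTarget, toAdd = 1), newTarget - s.target)
    } else s.addTime match {
      case Some(t) if now >= t =>
        // Backlog persisted: grow by toAdd, capped by need and the maximum.
        val newTarget = math.min(math.min(s.target + s.toAdd, maxNeeded), maxExecutors)
        val delta = newTarget - s.target
        // Double the step if the full step was granted, else start over at 1.
        val nextToAdd = if (delta == s.toAdd) s.toAdd * 2 else 1
        (s.copy(target = newTarget, toAdd = nextToAdd,
                addTime = Some(t + sustainedBacklogTimeoutMs)), delta)
      case _ => (s, 0)
    }
}
```

Returning the delta rather than mutating fields keeps the decision logic testable; the real method additionally notifies the cluster manager when the target drops.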

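Dynamic control of the consumption rate:
Spark Streaming provides an elastic mechanism (backpressure) that compares the rate at which data flows in with the rate at which it is processed, that is, whether processing can keep up. If it cannot, Spark Streaming automatically throttles the rate at which data is ingested. This is enabled by setting the spark.streaming.backpressure.enabled parameter to true.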
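To make the feedback loop concrete, here is a simplified proportional-integral rate controller, loosely modelled on the idea behind Spark's PID-based estimator (`PIDRateEstimator`, as I understand it the default when backpressure is on). It is a sketch, not Spark's implementation: `RateControlSketch`, the gains and the 100 records/s floor are assumptions.

```scala
object RateControlSketch {
  // Hypothetical gains and floor; not claimed to match Spark's defaults.
  val proportional = 1.0
  val integral = 0.2
  val minRate = 100.0

  /** Next ingestion rate in records/second, from the last batch's statistics. */
  def nextRate(latestRate: Double,
               numRecords: Long,
               processingDelayMs: Long,
               schedulingDelayMs: Long,
               batchIntervalMs: Long): Double = {
    // How fast the last batch was actually processed (records/second).
    val processingRate = numRecords.toDouble / processingDelayMs * 1000
    // Positive when data was let in faster than it could be processed.
    val error = latestRate - processingRate
    // Backlog from earlier batches (scheduling delay), expressed as a rate.
    val historicalError = schedulingDelayMs.toDouble * processingRate / batchIntervalMs
    math.max(minRate, latestRate - proportional * error - integral * historicalError)
  }
}
```

With these gains, a batch processed at 800 records/s after admitting 1000 records/s pulls the rate down to 800, and any scheduling delay lowers it further so the backlog can drain.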


Notes:

Source: DT_大数据梦工厂 (DT Big Data Dream Factory), Spark release customization series



This article comes from the "DT_Spark大数据梦工厂" blog; please keep this attribution: http://18610086859.blog.51cto.com/11484530/1787605
