Spark Streaming: Dynamic Resource Allocation and Dynamic Consumption-Rate Control Explained

Posted by snail_gesture

Why is dynamic behavior needed?
a) Spark is coarse-grained by default: resources are allocated up front and only then does computation run. A Spark Streaming application, however, has peak and off-peak periods with very different resource requirements; sizing the allocation for the peak wastes a large amount of resources the rest of the time.
b) A Spark Streaming application runs continuously, so resource consumption and resource management over its whole lifetime also have to be taken into account.
Dynamic resource adjustment for Spark Streaming faces a challenge:
Spark Streaming runs in units of the Batch Duration. One batch may need a great deal of resources while the next needs far fewer, and by the time an adjustment has been carried out, the batch that triggered it has already finished. The adjustment interval therefore has to be chosen with this latency in mind.

Dynamic Resource Allocation in Spark Streaming
1. Dynamic resource allocation is not enabled by default when a SparkContext is created, but it can be switched on manually through SparkConf (a minimal configuration sketch follows the excerpt below).

// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
if (!dynamicAllocationEnabled &&
    // was dynamic allocation explicitly requested in the configuration?
    _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
}


_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    Some(new ExecutorAllocationManager(this, listenerBus, _conf))
  } else {
    None
  }
_executorAllocationManager.foreach(_.start())
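
What "configuring it manually in SparkConf" looks like in practice is sketched below. The application name, executor bounds and idle timeout are illustrative values, not taken from the original article; the external shuffle service also has to be enabled so that executors can be removed safely.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal sketch: turn on executor dynamic allocation before the context is created.
val conf = new SparkConf()
  .setAppName("DynamicAllocationDemo")                          // illustrative name
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")                 // required by dynamic allocation
  .set("spark.dynamicAllocation.minExecutors", "2")             // illustrative bounds
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")

// Note: do not also set a fixed executor count (e.g. --num-executors); as the check
// quoted above shows, Spark then logs a warning and dynamic allocation stays disabled.
val ssc = new StreamingContext(conf, Seconds(5))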
2.  ExecutorAllocationManager: a timer periodically scans the state of the executors and of the stages that are currently running, and based on that decides whether executors need to be added or removed.
3.  The schedule() method in ExecutorAllocationManager is the routine this timer triggers periodically to carry out the resource adjustment.
/**
 * This is called at a fixed interval to regulate the number of pending executor requests
 * and number of executors running.
 *
 * First, adjust our requested executors based on the add time and our current needs.
 * Then, if the remove time for an existing executor has expired, kill the executor.
 *
 * This is factored out into its own method for testing.
 */
private def schedule(): Unit = synchronized {
  val now = clock.getTimeMillis

  updateAndSyncNumExecutorsTarget(now)

  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false
      removeExecutor(executorId)
    }
    !expired
  }
}

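The heart of schedule() is the removeTimes map, which records for each executor the time at which it becomes eligible for removal. The retain call keeps only the entries that have not yet expired and removes the executor behind every entry that has. A tiny standalone sketch of that idiom, with made-up executor ids and timestamps:

import scala.collection.mutable

// executorId -> expiry timestamp in ms; the entries here are invented for illustration
val removeTimes = mutable.HashMap("exec-1" -> 1000L, "exec-2" -> 5000L)
val now = 2000L

removeTimes.retain { case (executorId, expireTime) =>
  val expired = now >= expireTime
  if (expired) println(s"would remove $executorId")  // stands in for removeExecutor(executorId)
  !expired                                           // keep only the non-expired entries
}

println(removeTimes)  // Map(exec-2 -> 5000)
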
4.  In start(), ExecutorAllocationManager registers its listener and hands schedule() to a thread pool, where a timer keeps invoking it at a fixed rate.
/**
 * Register for scheduler callbacks to decide when to add and remove executors, and start
 * the scheduling task.
 */
def start(): Unit = {
  listenerBus.addListener(listener)

  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule()
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
  // intervalMillis is the fixed period at which the timer fires
  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
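
As a self-contained illustration of the same pattern (the 100 ms interval and the printed message are purely illustrative, not values from the Spark source), a single-threaded scheduled executor driving a periodic task looks like this:

import java.util.concurrent.{Executors, TimeUnit}

object FixedRateSchedulerDemo extends App {
  val executor = Executors.newSingleThreadScheduledExecutor()
  val intervalMillis = 100L   // illustrative interval

  val scheduleTask = new Runnable {
    override def run(): Unit =
      try {
        println(s"schedule() tick at ${System.currentTimeMillis()}")  // stands in for schedule()
      } catch {
        case t: Throwable => t.printStackTrace()  // never let an exception kill the timer thread
      }
  }

  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
  Thread.sleep(1000)   // let the timer fire a few times
  executor.shutdown()
}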

Dynamic control of the consumption rate:
Spark Streaming provides an elasticity mechanism that tracks the relationship between the rate at which data flows in and the rate at which it is processed, i.e. whether the application can keep up with the incoming data. If it cannot, Spark Streaming automatically and dynamically throttles the rate at which data is allowed to flow in; this backpressure mechanism is switched on with the spark.streaming.backpressure.enabled parameter.
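
A minimal sketch of turning backpressure on follows; the rate limits are illustrative values, not figures from the original article. The maxRate / maxRatePerPartition settings are optional hard ceilings that the backpressure-controlled rate will never exceed.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: let the receiving rate adapt to the observed processing rate.
val conf = new SparkConf()
  .setAppName("BackpressureDemo")                                 // illustrative name
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "1000")        // first-batch cap, records/sec per receiver
  .set("spark.streaming.receiver.maxRate", "10000")               // ceiling for receiver-based sources
  .set("spark.streaming.kafka.maxRatePerPartition", "2000")       // ceiling for the direct Kafka API

val ssc = new StreamingContext(conf, Seconds(5))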
