文末送书 | Spark Streaming 性能调优

Posted 过往记忆大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了文末送书 | Spark Streaming 性能调优相关的知识,希望对你有一定的参考价值。

01合理的批处理时间(batchDuration)


关于Spark Streaming的批处理时间设置是非常重要的Spark Streaming在不断接数据的同时需要处理数据的时间所以如果设置过段的批处理时间会造成数据堆积即未完成的batch数据越来越多从而发生阻塞

另外值得注意的是batchDuration本身也不能设置小于500ms,这会导致Spark Streaming进行频繁提交作业造成额外的开销减少整个系统的吞吐量;相反如果将batchDuration时间设置得过长,又会影响整个系统的吞吐量。

如何设置一个合理的批处理时间需要根据应用本身集群资源情况以及关注和监控Spark Streaming系统的运行情况来调整重点关注监控界面中的Total Delay,如图7.1所示。

7.1  Spark UI全局延迟


02合理的Kafka拉取量(maxRatePerPartition参数设置)


对于数据源是KafkaSpark Streaming应用Kafka数据频率过高的情况下调整这个参数是非常必要的。我们可以改变spark.streaming.kafka.maxRatePerPartition参数的值来进行上限调整,默认是无上限的,即Kafka有多少数据,Spark Streaming就会一次性全拉出但是上节提到的批处理时间是一定的不可能动态变化如果持续数据频率过高同样会造成数据堆积、阻塞的现象。

所以需要结合batchDuration设置的值,调整spark.streaming.kafka.maxRatePerPatition参数,注意该参数配置的是Kafka每个partition拉取的上限,数据总量还需乘以所有的partition数量,调整两个参数maxRatePerPartitionbatchDuration使得数据的拉取和处理能够平衡尽可能增加整个系统的吞吐量可以观察监控界面中的Input RateProcessing Time如图7.2所示。

文末送书 | Spark Streaming 性能调优

7.2  Spark UI输入速率和平均处理时间


03缓存反复使用的Dstream(RDD)


Spark中的RDDSparkStreaming中的Dstream如果被反复使用,最好利用cache()函数将该数据流缓存起来,防止过度地调度资源造成的网络开销。可以参考并观察Scheduling Delay参数,如图7.3所示。

 

文末送书 | Spark Streaming 性能调优

7.3  SparkUI中调度延迟


04其他一些优化策略


除了以上针对Spark StreamingKafka这个特殊场景方面的优化外,对于前提到的一些常规优化也可以通过下面几点来完成。

  • 设置合理的GC方式使用--conf "spark.executor.extraJavaOptions=-XX:+UseConc MarkSweepGC"配置垃圾回收机制。

  • 设置合理的parallelism:在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前面讲过Spark中的partitionKafka中的Partition是一一对应的,一般默认设置为KafkaPartition的数量。

  • 设置合理的CPU资源数:CPU的core数量,每个Executor可以占用一个或多个core,观察CPU使用率(Linux命令top)来了解计算资源的使用情况。例如,很常见的一种浪费是一个Executor占用了多个core,但是总的CPU使用率却不高(因为一个Executor并不会一直充分利用多核的能力),这个时候可以考虑让单个Executor占用更少的core,同时Worker下面增加更多的Executor;或者从另一个角度,增加单个节点的worker数量,当然这需要修改Spark集群的配置,从而增加CPU利用率。值得注意是,这里的优化有一个平衡,Executor的数量需要考虑其他计算资源的配置,Executor的数量和每个Executor分到的内存大小成反比,如果每个Executor的内存过小,容易产生内存溢出(out of memory)的问题。

  • 高性能的算子:所谓高性能算子也要看具体的场景通常建议使用reduceByKey/aggregateByKey来代替groupByKey而存在数据库连接资源加载创建等需求时我们可以使用带partition的操作这样在每一个分区进行一次操作即可,因为分区是物理同机器的并不存在这些资源序列化的问题从而大大减少这部分操作的开销例如可以用mapPartitionsforeachPartitions操作来代替map、foreach操作。另外在进行coalesce操作时,因为会进行重组分区操作,所以最好进行必要的数据过滤filter操作。

  • Kryo优化序列化性能:7.1已经详细介绍了这部分内容,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等)。

文末送书 | Spark Streaming 性能调优


05结果


通过以上种种调整优化,最终我们想要达到的目的便是,个流式处理系统保持稳定,即Spark Streaming消费Kafka数据的速率赶上爬虫向Kafka生产数据的速率使得Kafka中的数据尽可能快被处理掉,减少积压,才能保证实时性,如图7.4所示。

 

7.4  Spark StreamingKafka稳定运行监控图

当然不同的应用场景会有不同的图形,这是本文词频统计优化稳定后的监控图,我们可以看到在Processing Time柱形图中有一条Stable的虚线,而大多数Batch都能够在这一虚线下处理完毕,说明整体Spark Streaming是运行稳定的。

对于项目中具体的性能调优,有以下几个点需要注意:

  • 一个DStream流只关联单一接收器,如果需要并行多个接收器来读取数据,那么需要创建多个DStream一个接收器至少需要运行在一个Executor上,甚至更多我们需要保证在接收器占用了部分后,还能有足够的核来处理接收到的数据。例如在设置spark.cores.max需要将接收器的占用考虑进来同时注意在分配Executor给接收器时,采用的是轮循的方式(round robin fashion)。

  • 当接收器从数据源接收到数据时,会创建数据块每个微秒级的数据块间隔(blockInterval milliseconds)中都会有一个新的数据块生成。在每个批处理间隔内(batchInterval数据块的数量N=batchInterval/blockInterval。这些数据块会当前执行器Executor的数据块管理器(BlockManager分发到他执行器的数据块管理器之后Driver点上运行的输入网络追踪器(Network Input Tracker通知数据块所在位置,以期进一步处理

  • RDD是基于Driver节点上每个批处理间隔产生的数据块(blocks创建的,这些数据块RDD的分支(partitions每个分支是Spark中的一个任务(task如果blockInterval == batchInterval那么意味着创建了单一分支,并且可能直接在本地处理

  • 数据块上的映射map任务在执行器(一个接收块,另一个复制块)中处理,该执行器不考虑块间隔,除非出现非本地调度。拥有更大的块间隔(blockInterval)意味着更大的数据块,如果将spark.locality.wait设置一个更大的值,那么更有可能在本地点处理数据块。我们需要在两个参数间(blockIntervalspark.locality.wait)做一个折中,确保越大的数据块更可能在本地被处理

  • 除了依赖于batchIntervalblockInterval我们可以直接通过inputDstream. repartition(n)来确定分支的数量。这个操作会重新打乱(reshufflesRDD中的数据,随机的分配n个分支。当然打乱(shuffle过程会造成一定的开销,但是会有更高的并行度。RDD的处理是由驱动程序的jobscheduler作为作业安排的。在给定的时间点上,只有一个作业是活动的。因此,如果一个作业正在执行,那么其他作业将排队。

  • 如果我们有两个Dstreams,那么将形成两个RDDs,并将创建两个作业,每个作业(job)都被安排为一个接着一个地执行。为了避免这种情况,可以联合两个Dstreams(union)。这将确保为Dstreams的两个RDD形成单一unionRDD。而这个unionRDD会被视为一个作业,但是RDDs的分区不会受到影响。

  • 如果批处理时间大于batchinterval,那么很明显,接收方的内存将逐渐被填满,并最终抛出异常(很可能是BlockNotFoundException)。目前没有办法暂停接收,那么可以利用SparkConf配置项中的spark.streaming.receiver.maxRate来控制接收器的速率。

本文摘编自《Spark Streaming 实时流式大数据处理实战 》,经出版方授权发布。