文末送书 | Spark Streaming 性能调优
Posted 过往记忆大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了文末送书 | Spark Streaming 性能调优相关的知识,希望对你有一定的参考价值。
01合理的批处理时间(batchDuration)
关于Spark Streaming的批处理时间设置是非常重要的,Spark Streaming在不断接收数据的同时,需要处理数据的时间,所以如果设置过段的批处理时间,会造成数据堆积,即未完成的batch数据越来越多,从而发生阻塞。
另外值得注意的是,batchDuration本身也不能设置为小于500ms,这会导致Spark Streaming进行频繁地提交作业,造成额外的开销,减少整个系统的吞吐量;相反如果将batchDuration时间设置得过长,又会影响整个系统的吞吐量。
如何设置一个合理的批处理时间,需要根据应用本身、集群资源情况,以及关注和监控Spark Streaming系统的运行情况来调整,重点关注监控界面中的Total Delay,如图7.1所示。
图7.1 Spark UI中全局延迟
02合理的Kafka拉取量(maxRatePerPartition参数设置)
对于数据源是Kafka的Spark Streaming应用,在Kafka数据频率过高的情况下,调整这个参数是非常必要的。我们可以改变spark.streaming.kafka.maxRatePerPartition参数的值来进行上限调整,默认是无上限的,即Kafka有多少数据,Spark Streaming就会一次性全拉出,但是上节提到的批处理时间是一定的,不可能动态变化,如果持续数据频率过高,同样会造成数据堆积、阻塞的现象。
所以需要结合batchDuration设置的值,调整spark.streaming.kafka.maxRatePerPatition参数,注意该参数配置的是Kafka每个partition拉取的上限,数据总量还需乘以所有的partition数量,调整两个参数maxRatePerPartition和batchDuration使得数据的拉取和处理能够平衡,尽可能地增加整个系统的吞吐量,可以观察监控界面中的Input Rate和Processing Time,如图7.2所示。
图7.2 Spark UI中输入速率和平均处理时间
03缓存反复使用的Dstream(RDD)
Spark中的RDD和SparkStreaming中的Dstream如果被反复使用,最好利用cache()函数将该数据流缓存起来,防止过度地调度资源造成的网络开销。可以参考并观察Scheduling Delay参数,如图7.3所示。
图7.3 SparkUI中调度延迟
04其他一些优化策略
除了以上针对Spark Streaming和Kafka这个特殊场景方面的优化外,对于前面提到的一些常规优化,也可以通过下面几点来完成。
设置合理的GC方式:使用--conf "spark.executor.extraJavaOptions=-XX:+UseConc MarkSweepGC"来配置垃圾回收机制。
设置合理的parallelism:在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前面讲过Spark中的partition和Kafka中的Partition是一一对应的,一般默认设置为Kafka中Partition的数量。
设置合理的CPU资源数:CPU的core数量,每个Executor可以占用一个或多个core,观察CPU使用率(Linux命令top)来了解计算资源的使用情况。例如,很常见的一种浪费是一个Executor占用了多个core,但是总的CPU使用率却不高(因为一个Executor并不会一直充分利用多核的能力),这个时候可以考虑让单个Executor占用更少的core,同时Worker下面增加更多的Executor;或者从另一个角度,增加单个节点的worker数量,当然这需要修改Spark集群的配置,从而增加CPU利用率。值得注意是,这里的优化有一个平衡,Executor的数量需要考虑其他计算资源的配置,Executor的数量和每个Executor分到的内存大小成反比,如果每个Executor的内存过小,容易产生内存溢出(out of memory)的问题。
高性能的算子:所谓高性能算子也要看具体的场景,通常建议使用reduceByKey/aggregateByKey来代替groupByKey。而存在数据库连接、资源加载创建等需求时,我们可以使用带partition的操作,这样在每一个分区进行一次操作即可,因为分区是物理同机器的,并不存在这些资源序列化的问题,从而大大减少了这部分操作的开销。例如,可以用mapPartitions、foreachPartitions操作来代替map、foreach操作。另外在进行coalesce操作时,因为会进行重组分区操作,所以最好进行必要的数据过滤filter操作。
Kryo优化序列化性能:7.1节已经详细介绍了这部分内容,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等)。
05结果
通过以上种种调整和优化,最终我们想要达到的目的便是,整个流式处理系统保持稳定,即Spark Streaming消费Kafka数据的速率赶上爬虫向Kafka生产数据的速率,使得Kafka中的数据尽可能快地被处理掉,减少积压,才能保证实时性,如图7.4所示。
图7.4 Spark Streaming和Kafka稳定运行监控图
当然不同的应用场景会有不同的图形,这是本文词频统计优化稳定后的监控图,我们可以看到在Processing Time柱形图中有一条Stable的虚线,而大多数Batch都能够在这一虚线下处理完毕,说明整体Spark Streaming是运行稳定的。
对于项目中具体的性能调优,有以下几个点需要注意:
一个DStream流只关联单一接收器,如果需要并行多个接收器来读取数据,那么需要创建多个DStream流。一个接收器至少需要运行在一个Executor上,甚至更多,我们需要保证在接收器槽占用了部分核后,还能有足够的核来处理接收到的数据。例如在设置spark.cores.max时需要将接收器的占用考虑进来,同时注意在分配Executor给接收器时,采用的是轮循的方式(round robin fashion)。
当接收器从数据源接收到数据时,会创建数据块,在每个微秒级的数据块间隔(blockInterval milliseconds)中都会有一个新的数据块生成。在每个批处理间隔内(batchInterval)数据块的数量N=batchInterval/blockInterval。这些数据块会由当前执行器(Executor)的数据块管理器(BlockManager)分发到其他执行器的数据块管理器。之后在Driver节点上运行的输入网络追踪器(Network Input Tracker)会通知数据块所在位置,以期进一步处理。
RDD是基于Driver节点上每个批处理间隔产生的数据块(blocks)而创建的,这些数据块是RDD的分支(partitions),每个分支是Spark中的一个任务(task)。如果blockInterval == batchInterval,那么意味着创建了单一分支,并且可能直接在本地处理。
数据块上的映射(map)任务在执行器(一个接收块,另一个复制块)中处理,该执行器不考虑块间隔,除非出现非本地调度。拥有更大的块间隔(blockInterval)意味着更大的数据块,如果将spark.locality.wait设置一个更大的值,那么更有可能在本地节点处理数据块。我们需要在两个参数间(blockInterval和spark.locality.wait)做一个折中,确保越大的数据块更可能在本地被处理。
除了依赖于batchInterval和blockInterval,我们可以直接通过inputDstream. repartition(n)来确定分支的数量。这个操作会重新打乱(reshuffles)RDD中的数据,随机的分配给n个分支。当然打乱(shuffle)过程会造成一定的开销,但是会有更高的并行度。RDD的处理是由驱动程序的jobscheduler作为作业安排的。在给定的时间点上,只有一个作业是活动的。因此,如果一个作业正在执行,那么其他作业将排队。
如果我们有两个Dstreams,那么将形成两个RDDs,并将创建两个作业,每个作业(job)都被安排为一个接着一个地执行。为了避免这种情况,可以联合两个Dstreams(union)。这将确保为Dstreams的两个RDD形成单一的unionRDD。而这个unionRDD会被视为一个作业,但是RDDs的分区不会受到影响。
如果批处理时间大于batchinterval,那么很明显,接收方的内存将逐渐被填满,并最终抛出异常(很可能是BlockNotFoundException)。目前没有办法暂停接收,那么可以利用SparkConf配置项中的spark.streaming.receiver.maxRate来控制接收器的速率。
本文摘编自《Spark Streaming 实时流式大数据处理实战 》,经出版方授权发布。
以上是关于文末送书 | Spark Streaming 性能调优的主要内容,如果未能解决你的问题,请参考以下文章