Spark(14)——spark streaming 监控方案

Posted Java学习基地Java毕设定制

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark(14)——spark streaming 监控方案相关的知识,希望对你有一定的参考价值。

spark streaming 监控方案

SparkStreaming监控从spark2.2.0版本开始支持,目前不支持2.1.0

调研背景介绍

业务反应sparkstreaming任务数据处理存在堆积情况,但是仍然会不断从kafka拉取数据,针对这种情况调研,给出解决的方案

问题现象描述

如果批处理时间始终大于批处理间隔或者队列等待延时持续增加,表明系统无法在批处理间隔内处理完毕消息,通过抽查sparkstreaming任务,发现部分任务开启了背压参数,部分任务未开启。
开启的任务参数如下:

appId=application_1535616282827_0003kafka连接方式为receiver方式spark.streaming.backpressure.enabled=truespark.streaming.backpressure.initialRate=5000spark.streaming.kafka.maxRatePerPartition=2000

spark反压介绍

spark streaming任务在处理不断流入的数据是通过每间隔一段时间即(batch interval),将这段时间内流入的数据积累为一个batch,然后对这个batch的数据作为DAG的输入RDD提交新的job运行。
当一个batch处理所需的时间大于批周期(batch interval)时,意味着数据处理速度跟不上数据的接收速度,此时在数据接收端即Receiver(zk方式receiver运行在driver上,direct方式运行在executor)就会累积数据,因为数据时存储在spark中是由blockmanager管理的,存储方式采用MEMORY_ONLY模式就会导致OOM,采用MEMORY_AND_DISK多余的数据溢出到磁盘会增加数据读写时间,增加延迟。为了解决这一问题,spark在1.5版本后加入反压机制(back-pressure),根据前一批次数据的处理情况,动态、自动的调整后续数据的摄入量。

反压参数解释

spark.streaming.backpressure.enabled  true 是否开启反压。spark.streaming.backpressure.initialRate 5000 反压机制时初始化的摄入量,该参数只对receiver模式起作用,并不适用于direct模式spark.streaming.kafka.maxRatePerPartition 2000 每次作业中每个,Kafka分区最多读取的记录条数。可以防止第一个批次流量出现过载情况,也可以防止批次处理出现数据流量过载情况

建议所有streaming任务都开启反压机制

sparkstreaming任务监控

界面监控

在spark任务界面上提供了‘Streaming’选项卡,可以显示有关receiver和已完成的batch,队列延迟等情况。

主要指标:

1.Processing Time 处理每批数据的时间2.Scheduling Delay 批次等待处理的延迟时间


程序监控

如何通过编程的方式获取某一个任务的执行情况等,下面通过解释sparkstreaming wordcount的方式解释几个重要指标的采集。

wc程序代码:

 def main(args: Array[String]): Unit = {
// uncomment these 2 lines if you want to see only your logs // Logger.getLogger("org").setLevel(Level.OFF) // Logger.getLogger("akka").setLevel(Level.OFF)
ApplicationProperties.parse(args.toList)
val sparkConf = new SparkConf().setMaster(ApplicationProperties.sparkMaster).setAppName(ApplicationProperties.appName)
val ssc = new StreamingContext(sparkConf, Seconds(ApplicationProperties.batchInterval)) //添加监控 ssc.addStreamingListener(new SparkMonitoringListener)
// Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream(ApplicationProperties.socketStreamHost, ApplicationProperties.socketStreamPort)
// Split each line into words val words = lines.flatMap(_.split(" "))
// Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print()
ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate }

通过在程序中设置listener实现对应用的监控,监控类实现如下:

case class SparkMonitoringListener() extends StreamingListener {
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = { println(">>> Batch started...records in batch = " + batchStarted.batchInfo.numRecords) }
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = { val start = .batchInfo.processingStartTime.get val end = batchCompleted.batchInfo.processingEndTime.get val batchTime = batchCompleted.batchInfo.batchTime val numRecords = batchCompleted.batchInfo.numRecords println("batch finished", start, end, end-start, batchTime.toString(), numRecords) }}

通过继承StreamingListener,实现内部onBatchStarted,onBatchCompleted等方法收集适合自己业务的相关参数或告警设置。StreamingListener API可以参考:https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener

API监控


以上是关于Spark(14)——spark streaming 监控方案的主要内容,如果未能解决你的问题,请参考以下文章

sparkstreaming+kafka

Apache Spark 2.2.0 中文文档 - Structured Streaming 编程指南 | ApacheCN

Spark Streaming性能优化: 如何在生产环境下应对流数据峰值巨变

Spark(14)——spark streaming 监控方案

Spark深入学习 -14Spark应用经验与程序调优

Spark -14:spark Hadoop 高可用模式下读写hdfs