Spark(14)——spark streaming 监控方案
Posted Java学习基地Java毕设定制
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark(14)——spark streaming 监控方案相关的知识,希望对你有一定的参考价值。
spark streaming 监控方案
SparkStreaming监控从spark2.2.0版本开始支持,目前不支持2.1.0
调研背景介绍
业务反应sparkstreaming任务数据处理存在堆积情况,但是仍然会不断从kafka拉取数据,针对这种情况调研,给出解决的方案
问题现象描述
如果批处理时间始终大于批处理间隔或者队列等待延时持续增加,表明系统无法在批处理间隔内处理完毕消息,通过抽查sparkstreaming任务,发现部分任务开启了背压参数,部分任务未开启。
开启的任务参数如下:
appId=application_1535616282827_0003
kafka连接方式为receiver方式
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=5000
spark.streaming.kafka.maxRatePerPartition=2000
spark反压介绍
spark streaming任务在处理不断流入的数据是通过每间隔一段时间即(batch interval),将这段时间内流入的数据积累为一个batch,然后对这个batch的数据作为DAG的输入RDD提交新的job运行。
当一个batch处理所需的时间大于批周期(batch interval)时,意味着数据处理速度跟不上数据的接收速度,此时在数据接收端即Receiver(zk方式receiver运行在driver上,direct方式运行在executor)就会累积数据,因为数据时存储在spark中是由blockmanager管理的,存储方式采用MEMORY_ONLY模式就会导致OOM,采用MEMORY_AND_DISK多余的数据溢出到磁盘会增加数据读写时间,增加延迟。为了解决这一问题,spark在1.5版本后加入反压机制(back-pressure),根据前一批次数据的处理情况,动态、自动的调整后续数据的摄入量。
反压参数解释
spark.streaming.backpressure.enabled true 是否开启反压。
spark.streaming.backpressure.initialRate 5000 反压机制时初始化的摄入量,该参数只对receiver模式起作用,并不适用于direct模式
spark.streaming.kafka.maxRatePerPartition 2000 每次作业中每个,Kafka分区最多读取的记录条数。可以防止第一个批次流量出现过载情况,也可以防止批次处理出现数据流量过载情况
建议所有streaming任务都开启反压机制
sparkstreaming任务监控
界面监控
在spark任务界面上提供了‘Streaming’选项卡,可以显示有关receiver和已完成的batch,队列延迟等情况。
主要指标:
1.Processing Time 处理每批数据的时间
2.Scheduling Delay 批次等待处理的延迟时间
程序监控
如何通过编程的方式获取某一个任务的执行情况等,下面通过解释sparkstreaming wordcount的方式解释几个重要指标的采集。
wc程序代码:
def main(args: Array[String]): Unit = {
// uncomment these 2 lines if you want to see only your logs
// Logger.getLogger("org").setLevel(Level.OFF)
// Logger.getLogger("akka").setLevel(Level.OFF)
ApplicationProperties.parse(args.toList)
val sparkConf = new SparkConf().setMaster(ApplicationProperties.sparkMaster).setAppName(ApplicationProperties.appName)
val ssc = new StreamingContext(sparkConf, Seconds(ApplicationProperties.batchInterval))
//添加监控
ssc.addStreamingListener(new SparkMonitoringListener)
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream(ApplicationProperties.socketStreamHost, ApplicationProperties.socketStreamPort)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
}
通过在程序中设置listener实现对应用的监控,监控类实现如下:
case class SparkMonitoringListener() extends StreamingListener {
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
println(">>> Batch started...records in batch = " + batchStarted.batchInfo.numRecords)
}
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
val start = .batchInfo.processingStartTime.get
val end = batchCompleted.batchInfo.processingEndTime.get
val batchTime = batchCompleted.batchInfo.batchTime
val numRecords = batchCompleted.batchInfo.numRecords
println("batch finished", start, end, end-start, batchTime.toString(), numRecords)
}
}
通过继承StreamingListener,实现内部onBatchStarted,onBatchCompleted等方法收集适合自己业务的相关参数或告警设置。StreamingListener API可以参考:https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener
API监控
以上是关于Spark(14)——spark streaming 监控方案的主要内容,如果未能解决你的问题,请参考以下文章
Apache Spark 2.2.0 中文文档 - Structured Streaming 编程指南 | ApacheCN
Spark Streaming性能优化: 如何在生产环境下应对流数据峰值巨变