069 在SparkStreaming的窗口分析
Posted juncaoit
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了069 在SparkStreaming的窗口分析相关的知识,希望对你有一定的参考价值。
一:说明
1.图例说明
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
2.对比说明
DStream:
batchInterval: 批次产生间隔时间
Window DStream:
windowInterval: 窗口间隔时间, 必须是父DStream的batchInterval的倍数(k >= 1, 整数)
slideInterval:窗口滑动间隔时间, 必须是父DStream的batchInterval的倍数(k >= 1, 整数)
3.API
使用CTRL+F3,可以参考这篇文档的快捷键:https://blog.csdn.net/qq_36901488/article/details/80704245
二:程序
1.程序
1 package com.window.it 2 import org.apache.spark.{SparkConf, SparkContext} 3 import org.apache.spark.storage.StorageLevel 4 import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext} 5 import org.apache.spark.streaming.dstream.DStream 6 import org.apache.spark.streaming.kafka.KafkaUtils 7 8 object ReduceWindow { 9 def main(args: Array[String]): Unit = { 10 val conf = new SparkConf() 11 .setAppName("StreamingWindowOfKafka") 12 .setMaster("local[*]") 13 val sc = SparkContext.getOrCreate(conf) 14 val ssc = new StreamingContext(sc, Seconds(5)) 15 // 当调用updateStateByKey函数API的时候,必须给定checkpoint dir 16 // 路径对应的文件夹不能存在 17 ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/452512") 18 19 val kafkaParams = Map( 20 "group.id" -> "streaming-kafka-78912151", 21 "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka", 22 "auto.offset.reset" -> "smallest" 23 ) 24 val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于1 25 val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder]( 26 ssc, // 给定SparkStreaming上下文 27 kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接 28 topics, // 给定读取对应topic的名称以及读取数据的线程数量 29 StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别 30 ).map(_._2) 31 32 val resultWordCount = dstream 33 .filter(line => line.nonEmpty) 34 .flatMap(line => line.split(" ").map((_, 1))) 35 .reduceByKeyAndWindow( 36 (a: Int, b: Int) => a + b, 37 Seconds(15), // 窗口大小 38 Seconds(10) // 滑动大小 39 ) 40 resultWordCount.print() // 这个也是打印数据 41 42 // 启动开始处理 43 ssc.start() 44 ssc.awaitTermination() // 等等结束,监控一个线程的中断操作 45 } 46 }
2.效果
这里主要看的是页面的DAG。
以上是关于069 在SparkStreaming的窗口分析的主要内容,如果未能解决你的问题,请参考以下文章