SparkStreaming --- Window Functions
Posted by Shall潇
一、Window Functions
package window
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/*
 * The window length and the slide interval of a window operation must both be
 * integer multiples of the batch interval.
 * */
object WindowDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Windows")
    val streamingContext = new StreamingContext(conf, Seconds(2))
    streamingContext.checkpoint("checkpoint")

    val kafkaParams = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.XXX.100:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      // (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup4")
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("sparkKafka"), kafkaParams)
    )

    // window operations
    // window: plain window of 6 seconds, or sliding window of length 8s with a 4s slide
    // val wind1: DStream[(String, Int)] = kafkaStream.flatMap(_.value().split("\\s+"))
    //   .map(x => (x, 1))
    //   .window(Seconds(6))                    // plain window
    //   .window(Seconds(8), Seconds(4))        // sliding window

    // countByValueAndWindow: count how often each value occurs within the window
    // val wind1: DStream[(String, Long)] = kafkaStream.flatMap(_.value().split("\\s+"))
    //   .countByValueAndWindow(Seconds(8), Seconds(4))

    // reduceByWindow - 1: reduce every element of the window into one string
    // val wind1 = kafkaStream.flatMap(_.value().split("\\s+"))
    // wind1.reduceByWindow((x, y) => x + "%" + y, Seconds(8), Seconds(4))

    // reduceByWindow - 2: sum all counts of the window into a single pair
    // val wind1 = kafkaStream.flatMap(_.value().split("\\s+")).map(x => (x, 1))
    //   .reduceByWindow((x, y) => ("count:", x._2 + y._2), Seconds(8), Seconds(4))

    // reduceByKeyAndWindow: word count over the window
    // val wind1 = kafkaStream.flatMap(_.value().split("\\s+")).map(x => (x, 1))
    //   .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(8), Seconds(4))

    // reduceByKeyAndWindow with an inverse function: the first function adds values
    // entering the window, the second removes values leaving it (requires checkpointing)
    val wind1 = kafkaStream.flatMap(_.value().split("\\s+")).map(x => (x, 1))
      .reduceByKeyAndWindow(
        (x: Int, y: Int) => { println("one"); x + y },
        (x: Int, y: Int) => { println("two"); x - y },
        Seconds(8), Seconds(4)
      )

    wind1.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
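The constraint from the comment at the top of the file is the one to keep in mind: with a 2-second batch interval, the 8-second window and 4-second slide are legal because both are integer multiples of 2. Note also that the reduceByKeyAndWindow variant with an inverse function only works when checkpointing is enabled, which is why streamingContext.checkpoint("checkpoint") is set. Below is a minimal sketch of the same window arithmetic without the Kafka dependency; the localhost:9999 socket source (fed with nc -lk 9999) is an assumption for illustration, not part of the original setup.

package window

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal window sketch: batch interval 2s, window 8s (4 batches), slide 4s (2 batches).
object WindowSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WindowSketch")
    val ssc = new StreamingContext(conf, Seconds(2))     // batch interval: 2s

    // hypothetical local source; run `nc -lk 9999` and type words to feed it
    val lines = ssc.socketTextStream("localhost", 9999)

    val counts = lines.flatMap(_.split("\\s+"))
      .map((_, 1))
      // window length 8s and slide 4s are both multiples of the 2s batch interval
      .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(8), Seconds(4))

    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Every 4 seconds this prints the word counts of the previous 8 seconds, so each batch of input contributes to two consecutive outputs.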
二、transform
package window
import java.text.SimpleDateFormat

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object WindowDemo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Windows")
    val streamingContext = new StreamingContext(conf, Seconds(2))
    streamingContext.checkpoint("checkpoint")

    val kafkaParams = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.XXX.100:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      // (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup4")
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("sparkKafka"), kafkaParams)
    )

    // transform --- reshape the data at the RDD level; the second parameter is the time of the batch
    val trans: DStream[((String, String), Int)] = kafkaStream.transform((rdd, timestamp) => {
      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val time: String = sdf.format(timestamp.milliseconds)
      // tag every word with the time of its batch, then count per (word, time) pair
      val value = rdd.flatMap(x => x.value().split("\\s+").map(word => ((word, time), 1))).reduceByKey(_ + _)
      value
    })

    // the same transform without the batch time, kept only for comparison (its result is not used)
    kafkaStream.transform((x, y) =>
      x.flatMap(_.value().split("\\s+").map(word => (word, 1)))
    )

    trans.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
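Besides stamping each batch with its time, transform is the usual way to mix plain RDD operations into a streaming job, because the function sees every batch as an ordinary RDD. The sketch below joins each batch against a static blacklist RDD, something the DStream API cannot express directly; the socket source and the blacklist contents are assumptions for illustration, not part of the original Kafka example.

package window

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal transform sketch: filter each batch against a static RDD built once on the driver.
object TransformSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("TransformSketch")
    val ssc = new StreamingContext(conf, Seconds(2))

    // hypothetical static reference data
    val blacklist = ssc.sparkContext.parallelize(Seq("spam", "ad")).map((_, true))

    // hypothetical local source; run `nc -lk 9999` to feed it
    val words = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split("\\s+"))
      .map((_, 1))

    // transform exposes each batch as an RDD, so RDD-only operations such as joins are available
    val cleaned = words.transform(rdd =>
      rdd.leftOuterJoin(blacklist)
        .filter { case (_, (_, flagged)) => flagged.isEmpty }   // keep words not on the blacklist
        .map { case (word, (count, _)) => (word, count) }
        .reduceByKey(_ + _)
    )

    cleaned.print()
    ssc.start()
    ssc.awaitTermination()
  }
}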