Spark Streaming
1. Course Objectives
- 1. Master the underlying principles and architecture of Spark Streaming
- 2. Master how DStreams work
- 3. Master the integration of Spark Streaming with Flume
- 4. Master the integration of Spark Streaming with Kafka
2. Spark Streaming
- Spark Streaming is a scalable, high-throughput, fault-tolerant stream-processing engine.
3. Spark Streaming Features
- 1. Ease of use
  - Streaming programs are written in much the same way as offline batch jobs.
  - Programs can be written in Scala, Java, or Python.
- 2. Fault tolerance
  - Data can be processed with exactly-once semantics.
- 3. Integration with the Spark ecosystem
4. How Spark Streaming Works
- Spark Streaming is a micro-batch stream-processing engine built on Spark. Its basic principle is to divide the input data into batches at a fixed time interval and process each batch as a small Spark job; when the batch interval is shortened to the order of seconds, it can be used to process real-time data streams.
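As a minimal sketch of this idea (the application name, master URL and 1-second interval below are only illustrative; complete, runnable examples follow in section 7), every streaming program starts from a StreamingContext created with the chosen batch interval:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("MicroBatchSketch").setMaster("local[2]")
//each 1-second slice of the input stream becomes one batch (internally an RDD) and is processed as a small Spark job
val ssc = new StreamingContext(conf, Seconds(1))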
5. DStream
- A Discretized Stream (DStream) is the basic abstraction of Spark Streaming. It represents a continuous stream of data, either the input stream itself or the result stream produced by applying Spark operators, and is internally represented as a sequence of RDDs.
6. DStream Operations
- Transformations
  - Similar to the transformation operators on RDDs: applying one produces a new DStream and is evaluated lazily.
- Output operations
  - Similar to RDD actions: they trigger execution of the job and produce the result data. A short sketch contrasting the two kinds of operations follows this list.
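A minimal sketch of the contrast, assuming lines is an existing DStream[String] (for example obtained from ssc.socketTextStream as in section 7):

//transformation: builds a new DStream lazily, nothing is executed yet
val upper = lines.map(_.toUpperCase)

//output operation: prints the first elements of every batch and triggers the computation
upper.print()

//output operation: gives full access to each batch as an RDD
upper.foreachRDD { rdd =>
  println(s"records in this batch: ${rdd.count()}")
}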
7. DStream Hands-On Examples
The corresponding Maven dependency must be added:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
- 1. Receive socket data with Spark Streaming and implement word count (WordCount)

package cn.itcast.stream

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

//todo: use Spark Streaming to receive socket data and implement word count
object SparkStreamingSocket {
  def main(args: Array[String]): Unit = {
    //1. Create SparkConf; the master must be local[N] with N > 1: one thread receives the data, another processes it
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingSocket").setMaster("local[2]")
    //2. Create SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    //3. Create StreamingContext; it needs the SparkContext and the batch interval
    val ssc = new StreamingContext(sc, Seconds(5))
    //4. Receive socket data
    val stream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.200.100", 9999)
    //5. Split each line into words
    val words: DStream[String] = stream.flatMap(_.split(" "))
    //6. Map each word to (word, 1)
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    //7. Sum the counts for each word
    val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    //8. Print the results
    result.print()

    //9. Start the streaming computation
    ssc.start()
    //block until the computation is terminated
    ssc.awaitTermination()
  }
}
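To try this example, something has to be writing lines to the socket before the job starts. On the host used above (192.168.200.100) a simple option is a tool such as netcat, e.g. running nc -lk 9999 and typing space-separated words into it. The host and port are just the values hard-coded in the example; adjust them to your own environment.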
- 2. Receive socket data with Spark Streaming and accumulate word counts across batches

package cn.itcast.stream

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

//todo: use Spark Streaming to receive socket data and accumulate word counts across all batches
object SparkStreamingSocketTotal {

  //currentValues: all the 1s for a word in the current batch, e.g. (hadoop,1) (hadoop,1) (hadoop,1)
  //historyValues: the total count of the word accumulated over all previous batches, e.g. (hadoop,100)
  def updateFunc(currentValues: Seq[Int], historyValues: Option[Int]): Option[Int] = {
    val newValue: Int = currentValues.sum + historyValues.getOrElse(0)
    Some(newValue)
  }

  def main(args: Array[String]): Unit = {
    //1. Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingSocketTotal").setMaster("local[2]")
    //2. Create SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    //3. Create StreamingContext
    val ssc = new StreamingContext(sc, Seconds(5))
    //set the checkpoint directory; updateStateByKey needs it to store the state
    ssc.checkpoint("./ck")
    //4. Receive socket data
    val stream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.200.100", 9999)
    //5. Split each line into words
    val words: DStream[String] = stream.flatMap(_.split(" "))
    //6. Map each word to (word, 1)
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    //7. Accumulate the counts for each word across batches
    val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)

    //8. Print the results
    result.print()

    //9. Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}
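updateStateByKey touches the state of every key on every batch. On Spark 1.6 and later, mapWithState is a commonly used alternative that only processes keys that actually appear in the current batch and, by default, only emits the pairs updated in that batch. A minimal sketch, reusing the wordAndOne stream from the example above (and, like updateStateByKey, requiring a checkpoint directory), might look like this:

import org.apache.spark.streaming.{State, StateSpec}

//one: the count of the word in the current batch; state: the running total kept by Spark
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)   //persist the new total for this word
  (word, sum)         //emit the updated (word, total) pair
}

val totals = wordAndOne.mapWithState(StateSpec.function(mappingFunc))
totals.print()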
- 3. Implement word count with the window operation reduceByKeyAndWindow

package cn.itcast.stream

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

//todo: use the Spark Streaming window operation reduceByKeyAndWindow to implement word count
object SparkStreamingSocketWindow {

  def main(args: Array[String]): Unit = {
    //1. Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingSocketWindow").setMaster("local[2]")
    //2. Create SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    //3. Create StreamingContext
    val ssc = new StreamingContext(sc, Seconds(5))
    //set the checkpoint directory
    ssc.checkpoint("./ck")
    //4. Receive socket data
    val stream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.200.100", 9999)
    //5. Split each line into words
    val words: DStream[String] = stream.flatMap(_.split(" "))
    //6. Map each word to (word, 1)
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    //7. Sum the counts for each word within the window
    //reduceByKeyAndWindow takes three arguments:
    //reduceFunc: the reduce function
    //windowDuration: the window length (10s here)
    //slideDuration: the slide interval (5s here), i.e. how often the window computation is triggered
    //both durations must be multiples of the batch interval (5s)
    val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(10), Seconds(5))

    //8. Print the results
    result.print()

    //9. Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}
- 4. Use a window operation to find the hottest words within a period of time

package cn.itcast.stream

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

//todo: use a Spark Streaming window operation to find the hottest (most frequent) words within a unit of time
object SparkStreamingSocketWindowHotWords {

  def main(args: Array[String]): Unit = {
    //1. Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingSocketWindowHotWords").setMaster("local[2]")
    //2. Create SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    //3. Create StreamingContext
    val ssc = new StreamingContext(sc, Seconds(5))
    //set the checkpoint directory
    ssc.checkpoint("./ck")
    //4. Receive socket data
    val stream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.200.100", 9999)
    //5. Split each line into words
    val words: DStream[String] = stream.flatMap(_.split(" "))
    //6. Map each word to (word, 1)
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    //7. Sum the counts for each word within the window
    //reduceByKeyAndWindow takes three arguments:
    //reduceFunc: the reduce function
    //windowDuration: the window length (10s)
    //slideDuration: the slide interval (5s), i.e. how often the window computation is triggered
    val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(10), Seconds(5))

    //8. Sort by word count in descending order
    val sortedDstream: DStream[(String, Int)] = result.transform(rdd => {
      //sort the RDD by count, descending
      val sortedRDD: RDD[(String, Int)] = rdd.sortBy(_._2, false)
      //take the top 3 words
      val sortHotWords: Array[(String, Int)] = sortedRDD.take(3)
      //print the top 3 on the driver
      sortHotWords.foreach(x => println(x))
      sortedRDD
    })
    //9. Print the sorted results
    sortedDstream.print()

    //10. Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}
8. Integrating Spark Streaming with Flume
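The two variants below rely on the Flume connector, which is a separate module from spark-streaming itself. Assuming the same Spark version used earlier, the Maven coordinates would be:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.0.2</version>
</dependency>

For the poll mode, the Flume agent additionally needs the custom Spark sink (org.apache.spark.streaming.flume.sink.SparkSink, shipped in the spark-streaming-flume-sink artifact) on its classpath, since Spark pulls the events from that sink.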
- Poll mode (Spark pulls data from the Flume agent)

package cn.itcast.dstream.flume

import java.net.InetSocketAddress

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

//todo: use Spark Streaming to consume Flume data and implement word count (poll / pull mode)
object SparkStreamingFlume_Poll {
  def main(args: Array[String]): Unit = {
    //1. Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingFlume_Poll").setMaster("local[2]")
    //2. Create SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    //3. Create StreamingContext
    val ssc = new StreamingContext(sc, Seconds(5))
    //define a collection of Flume addresses so that data can be pulled from several Flume agents at once
    val address = Seq(new InetSocketAddress("192.168.200.100", 9999), new InetSocketAddress("192.168.200.101", 9999))

    //4. Pull data from Flume
    val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK_SER_2)
    //5. Extract the event body from each Flume event {"header": xxxxx, "body": xxxxxx}
    val lineDstream: DStream[String] = stream.map(x => new String(x.event.getBody.array()))
    //6. Split each line and map each word to (word, 1)
    val wordAndOne: DStream[(String, Int)] = lineDstream.flatMap(_.split(" ")).map((_, 1))
    //7. Sum the counts for each word
    val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    //8. Print the results
    result.print()

    //start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}
- Push mode (Flume pushes data to Spark)

package cn.itcast.dstream.flume

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

//todo: use Spark Streaming to consume Flume data and implement word count (push mode)
object SparkStreamingFlume_Push {

  def main(args: Array[String]): Unit = {
    //1. Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingFlume_Push").setMaster("local[2]")
    //2. Create SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    //3. Create StreamingContext
    val ssc = new StreamingContext(sc, Seconds(5))
    //4. Receive the data pushed by Flume; the host and port must match the Flume avro sink configuration
    val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(ssc, "192.168.11.123", 9999)
    //5. Extract the event body from each Flume event {"header": xxxxx, "body": xxxxxx}
    val lineDstream: DStream[String] = stream.map(x => new String(x.event.getBody.array()))
    //6. Split each line and map each word to (word, 1)
    val wordAndOne: DStream[(String, Int)] = lineDstream.flatMap(_.split(" ")).map((_, 1))
    //7. Sum the counts for each word
    val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    //8. Print the results
    result.print()

    //start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}
9. Integrating Spark Streaming with Kafka
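The examples below use KafkaUtils from the Kafka 0.8 integration module (the StringDecoder-based createStream/createDirectStream APIs). Assuming the same Spark version used earlier, the Maven coordinates would be:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.0.2</version>
</dependency>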
- KafkaUtils.createStream (receiver-based, Kafka high-level API; offsets are stored in ZooKeeper)

package cn.itcast.kafka

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

import scala.collection.immutable

//todo: integrate Spark Streaming with Kafka using receivers (high-level API)
object SparkStreamingKafkaReceiver {
  def main(args: Array[String]): Unit = {
    //1. Create SparkConf
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkStreamingKafkaReceiver")
      .setMaster("local[4]") //the number of threads must be greater than the number of receivers
      .set("spark.streaming.receiver.writeAheadLog.enable", "true") //enable the write-ahead log so that received data can be recovered
    //2. Create SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    //3. Create StreamingContext
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("./spark_receiver")
    //4. ZooKeeper quorum
    val zkQuorum = "node1:2181,node2:2181,node3:2181"
    //5. Consumer group id
    val groupId = "spark_receiver"
    //6. Topic map: the value is not the number of partitions of the topic, but the number of consumer threads used per receiver
    val topics = Map("spark_01" -> 2)
    //7. Use KafkaUtils.createStream to receive the topic data
    //(String, String): the first String is the message key, the second is the message value
    //create several receivers so the topic is consumed in parallel
    val dstreamSeq: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
      val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
      stream
    })

    //union the streams from all receivers through the StreamingContext
    val totalDstream: DStream[(String, String)] = ssc.union(dstreamSeq)

    //8. Extract the message values
    val topicData: DStream[String] = totalDstream.map(_._2)
    //9. Split each line and map each word to (word, 1)
    val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_, 1))
    //10. Sum the counts for each word
    val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    //11. Print the results
    result.print()

    //12. Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}
- KafkaUtils.createDirectStream (direct approach, Kafka low-level API; offsets are tracked by the client application)

package cn.itcast.kafka

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

//todo: integrate Spark Streaming with Kafka using the direct approach (low-level API)
object SparkStreamingKafkaDirect {
  def main(args: Array[String]): Unit = {
    //1. Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingKafkaDirect").setMaster("local[2]")
    //2. Create SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    //3. Create StreamingContext
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("./spark_direct") //the checkpoint stores the topic offsets
    //4. Kafka parameters
    val kafkaParams = Map("metadata.broker.list" -> "node1:9092,node2:9092,node3:9092", "group.id" -> "spark_direct")
    //5. Topics
    val topics = Set("spark_01")
    //6. Create the direct stream
    val dstream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    //7. Extract the message values
    val data: DStream[String] = dstream.map(_._2)
    //8. Split each line, map each word to (word, 1), and sum the counts for each word
    val result: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    //9. Print the results
    result.print()

    //10. Start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}
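Because the direct approach leaves offset tracking to the application (here the checkpoint directory takes care of it), it can be useful to read the offsets consumed in each batch, for example to store them in ZooKeeper or a database yourself. A minimal sketch, applied to the dstream created above and following the pattern from the Spark documentation:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

var offsetRanges = Array.empty[OffsetRange]

dstream.transform { rdd =>
  //only the RDDs produced directly by createDirectStream carry offset information
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  //print (or persist) the offset range consumed for every partition in this batch
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}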