Streaming的单词统计

Posted xjqi

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Streaming的单词统计相关的知识,希望对你有一定的参考价值。

 1 package com.bawei.stream
 2 
 3 import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
 4 import org.apache.spark.streaming.{Seconds, StreamingContext}
 5 import org.apache.spark.{SparkConf, SparkContext}
 6 
 7 object StreamWC {
 8 
 9   def main(args: Array[String]): Unit = {
10     //配置sparkConf参数
11     val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingTCP").setMaster("local[2]")
12     //构建sparkContext对象
13     val sc: SparkContext = new SparkContext(sparkConf)
14     //设置日志输出级别
15     sc.setLogLevel("WARN")
16     //构建StreamingContext对象,每个批处理的时间间隔
17     val scc: StreamingContext = new StreamingContext(sc,Seconds(5))
18     scc.checkpoint("C:\Users\Desktop\checkpoint")
19     //注册一个监听的IP地址和端口  用来收集数据
20     val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.182.147",9999)
21     //切分每一行记录
22     val words: DStream[String] = lines.flatMap(_.split(" "))
23     //每个单词记为1
24     val wordAndOne: DStream[(String, Int)] = words.map((_,1))
25     //分组聚合
26     //val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
27     //窗口时间,滑动时间
28     //val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(10),Seconds(5))
29     //持续更新每个单词出现的次数
36 
37     wordAndOne.updateStateByKey((list:Seq[Int],option:Option[Int])=>{//
38       var before = option.getOrElse(0)//获取上一次的累加结果
39       for (value<-list){
40         before += value
41       }
42       Option(before)
43     }).print()
44 
45 
46 
47     scc.start()
48     scc.awaitTermination()
49     //scc.awaitTerminationOrTimeout(15000)
50   }
51 
52 }

 

以上是关于Streaming的单词统计的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数

spark配置-----Spark Streaming

Spark Streaming实时处理

Spark Streaming实时处理

大数据之Streming单词统计

spark streaming