Word count over Kafka-produced data with Spark Streaming

Posted by xjqi


This post shows how to use Spark Streaming to consume messages produced to a Kafka topic and compute a word count over each batch; hopefully it is a useful reference.

package com.bawei.review01

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkStreamingWC {
  def main(args: Array[String]): Unit = {

    // local[2]: one thread for receiving data, one for processing it
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingWC").setMaster("local[2]")
    val sparkContext = new SparkContext(sparkConf)
    sparkContext.setLogLevel("WARN")

    // Micro-batches every 5 seconds
    val ssc = new StreamingContext(sparkContext, Seconds(5))

    // Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.182.147:9092,192.168.182.148:9092,192.168.182.149:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group1"
    )

    // Topics to subscribe to; a Set, so several topics can be listed at once
    val topics = Set("test")

    // Build a direct (receiver-less) DStream with KafkaUtils.createDirectStream
    val kafkaTopicDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    // Keep only the message value of each Kafka record
    val socketline: DStream[String] = kafkaTopicDS.map(x => x.value())

    // Split each line into words and pair each word with an initial count of 1
    val dataDS: DStream[(String, Int)] = socketline.flatMap(_.split(" ")).map((_, 1))

    // Sum the counts per word within each 5-second batch, e.g. (hello,2), (spark,3)
    dataDS.reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
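A note on dependencies: createDirectStream with ConsumerStrategies/LocationStrategies comes from the spark-streaming-kafka-0-10 integration, which is not part of core Spark. A minimal sbt sketch follows; the version numbers are assumptions and should be matched to your cluster's Spark and Scala versions.

// build.sbt sketch — version numbers are assumptions, align with your Spark/Scala versions
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.4.8" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.8"
)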

 
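The "produce" half of the title is not shown above; the simplest way to test is to type lines into the kafka-console-producer shell tool, but a small Scala producer works too. Below is a minimal sketch, assuming the same broker list and the test topic as the consumer; the WordProducer name and the sample lines are illustrative only.

package com.bawei.review01

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Minimal test producer: sends a few space-separated word lines to the "test" topic.
object WordProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Same broker list as the streaming job (an assumption about your cluster)
    props.put("bootstrap.servers", "192.168.182.147:9092,192.168.182.148:9092,192.168.182.149:9092")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    // Illustrative input; each line is later split on spaces by the streaming job
    Seq("hello world", "hello spark", "hello kafka streaming").foreach { line =>
      producer.send(new ProducerRecord[String, String]("test", line))
    }
    producer.flush()
    producer.close()
  }
}

Start the streaming job first, then run the producer; per-word counts such as (hello,3) should appear in the job's console within the next 5-second batch.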
