使用 Spark Streaming 消费 Kafka 生产的数据进行单词统计
Posted xjqi
tags:
篇首语：本文由小常识网（cha138.com）小编为大家整理，主要介绍了使用 Spark Streaming 消费 Kafka 生产的数据进行单词统计的相关知识，希望对你有一定的参考价值。
package com.bawei.review01

// NOTE(review): InetSocketAddress, StorageLevel, ReceiverInputDStream, FlumeUtils and
// SparkFlumeEvent are not used below. The Flume connector was removed in Spark 3.0,
// so these imports should be dropped before upgrading.
import java.net.InetSocketAddress

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark Streaming word count over Kafka.
 *
 * Every 5-second batch, reads the string values of the records received from the
 * subscribed topic(s), splits them into whitespace-separated words, and prints the
 * per-word counts for that batch.
 *
 * Optional positional arguments (each falls back to the default below when absent):
 *   - args(0): Kafka bootstrap servers, comma-separated
 *   - args(1): topics to subscribe to, comma-separated
 *   - args(2): consumer group id
 */
object SparkStreamingWC {

  private val DefaultBootstrapServers =
    "192.168.182.147:9092,192.168.182.148:9092,192.168.182.149:9092"
  private val DefaultTopics = "test"
  private val DefaultGroupId = "group1"

  def main(args: Array[String]): Unit = {
    val bootstrapServers = args.lift(0).getOrElse(DefaultBootstrapServers)
    val topics: Set[String] = splitCsv(args.lift(1).getOrElse(DefaultTopics))
    val groupId = args.lift(2).getOrElse(DefaultGroupId)

    // setIfMissing keeps local[2] for IDE/local runs, but no longer overrides a master
    // supplied via spark-submit --master (setMaster would always win).
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkStreamingWC")
      .setIfMissing("spark.master", "local[2]")
    val sparkContext = new SparkContext(sparkConf)

    sparkContext.setLogLevel("WARN")
    val ssc = new StreamingContext(sparkContext, Seconds(5))

    // NOTE(review): any consumer setting not listed here uses the Kafka client default
    // (e.g. auto.offset.reset, enable.auto.commit). The Spark Kafka 0-10 integration
    // guide recommends disabling auto-commit and committing offsets explicitly once
    // output has completed; consider that if delivery guarantees matter.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> bootstrapServers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId
    )

    // Direct (receiver-less) stream subscribed to every topic in `topics`.
    val kafkaTopicDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream(
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams)
      )

    // Record values can be null (e.g. tombstones); drop them so splitting cannot NPE.
    val lines: DStream[String] = kafkaTopicDS.map(_.value()).filter(_ != null)

    val dataDS: DStream[(String, Int)] = lines.flatMap(line => splitWords(line)).map((_, 1))

    // Prints the first elements of each batch's counts, e.g. (hello,3), (world,2).
    dataDS.reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Splits a line into words on runs of whitespace (spaces, tabs, ...).
   * Empty tokens, such as the one produced by leading whitespace or by an empty line,
   * are dropped, so they are never counted as words.
   */
  private def splitWords(line: String): Seq[String] =
    line.split("\\s+").filter(_.nonEmpty).toSeq

  /** Parses a comma-separated list into a set of trimmed, non-empty entries. */
  private def splitCsv(s: String): Set[String] =
    s.split(",").map(_.trim).filter(_.nonEmpty).toSet
}
以上是关于使用 Spark Streaming 消费 Kafka 生产的数据进行单词统计的主要内容。如果未能解决你的问题，请参考以下文章。