StreamFromKafka: Integrating Spark Streaming with Kafka

Posted by xjqi

This post shows how to integrate Spark Streaming with Kafka: a Scala program consumes a Kafka topic through the direct (spark-streaming-kafka-0-10) API, keeps a running word count with updateStateByKey, and recovers its StreamingContext from a checkpoint directory on restart.

package com.bawei.stream

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}


object StreamFromKafka {

  // Merges a word's counts from the current batch (a) with its running
  // total from previous batches (b). For example, a = Seq(1, 1) with
  // b = Some(3) yields Some(5).
  def updateFunc(a: Seq[Int], b: Option[Int]): Option[Int] = {
    Some(a.sum + b.getOrElse(0))
  }

  def main(args: Array[String]): Unit = {

    val checkpointPath = "./kafka-direct"

    // Recover the StreamingContext from the checkpoint if one exists,
    // otherwise build a fresh one.
    val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
      createFunc(checkpointPath)
    })
    ssc.start()
    ssc.awaitTermination()
  }

  def createFunc(checkpointPath: String): StreamingContext = {
    // 1. Create the SparkConf
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkStreamingKafka_direct_checkpoint")
      .setMaster("local[4]")

    // 2. Create the SparkContext
    val sc = new SparkContext(sparkConf)

    sc.setLogLevel("WARN")
    //Logger.getLogger("org").setLevel(Level.ERROR)

    // 3. Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(checkpointPath)

    // 4. Kafka consumer configuration
    // (old 0.8-style parameters, kept for reference)
    /*val kafkaParams = Map("metadata.broker.list" -> "node1:9092,node2:9092,node3:9092"
      , "group.id" -> "kafka-direct01")*/

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.182.147:9092,192.168.182.148:9092,192.168.182.149:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group1"
    )

    // 5. Define the topics to subscribe to; it is a Set, so it can hold several topics
    val topics = Set("test")

    // 6. Build the DStream with KafkaUtils.createDirectStream
    // (old 0.8-style call, kept for reference)
    //val kafkaTopicDS: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    val kafkaTopicDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    // 7. Extract the message values from the Kafka records
    val kafkaData: DStream[String] = kafkaTopicDS.map(x => x.value())

    // 8. Split each line into words, mapping each word to a count of 1
    val wordAndOne: DStream[(String, Int)] = kafkaData.flatMap(_.split(" ")).map((_, 1))

    // 9. Accumulate the occurrences of each word across batches
    val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)

    // 10. Print the result
    result.print()
    ssc
  }
}
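To compile this, the project needs the Kafka 0-10 integration artifact alongside Spark Streaming itself. A minimal build.sbt sketch; the version numbers here are assumptions, so align them with your Spark and Scala installation:

// build.sbt (versions are assumptions; match your cluster)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"            % "2.4.8" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.8"
)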

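To feed the job some input, publish a few lines to the test topic. A minimal sketch using the plain Kafka producer API; the broker address and topic name match the job above, while the object name and message text are illustrative:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.182.147:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Each message becomes one "line" that the streaming job splits on spaces.
    producer.send(new ProducerRecord[String, String]("test", "hello kafka hello spark"))
    producer.close()
  }
}

Within a few 5-second batches, result.print() should show pairs such as (hello,2), (kafka,1), (spark,1), and the totals keep growing as more messages arrive.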

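A note on offsets: the checkpoint directory stores the consumed Kafka offsets together with the word-count state. The 0-10 integration alternatively lets you commit offsets back to Kafka yourself once a batch has been processed. A sketch of that pattern, assuming the kafkaTopicDS stream defined above and "enable.auto.commit" -> (false: java.lang.Boolean) added to kafkaParams:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Inside createFunc, after kafkaTopicDS is built: record each batch's
// offset ranges, process the batch, then commit the offsets asynchronously.
kafkaTopicDS.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd here ...
  kafkaTopicDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}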