Spark Streaming整合Kafka

Posted truekai

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Streaming整合Kafka相关的知识,希望对你有一定的参考价值。

0)摘要

  主要介绍了Spark Streaming整合Kafka,两种整合方式:Receiver-based和Direct方式。这里使用的是Kafka broker version 0.8.2.1,官方文档地址:(http://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html)。

1)Kafka准备

  • 启动zookeeper
    ./zkServer.sh start
  • 启动kafka
    ./kafka-server-start.sh -daemon ../config/server.properties //后台启动
  • 创建topic
    ./kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic test
  • 通过控制台测试topic能否正常的生产和消费

       启动生产者脚本:
         ./kafka-console-producer.sh --broker-list hadoop:9092 --topic test

   启动消费者脚本:

    ./kafka-console-consumer.sh --zookeeper hadoop:2181 --topic test --from-beginning

  准备工作已经就绪。

2)Receiver-based方式整合

注意:这种方式为了保证数据不会丢失,需要开启Write Ahead Logs机制,开启后,接收数据的正确性只有被预写到日志以后Receive才会确认,可以从日志中恢复数据,会增加额外的开销。如何开启?设置SparkConf的“Spark Streaming writeAheadLog.enable”属性为“true”,这种模式基本被淘汰

1 添加kafka依赖

        <!--        kafka依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

2 本地代码编写

 1 package flume_streaming
 2 
 3 import org.apache.spark.SparkConf
 4 import org.apache.spark.streaming.kafka._
 5 import org.apache.spark.streaming.{Durations, StreamingContext}
 6 
 7 /**
 8  * @Author: SmallWild
 9  * @Date: 2019/10/30 10:00
10  * @Desc:
11  */
12 
13 object kafkaReceiveWordCount {
14   def main(args: Array[String]): Unit = {
15     if (args.length != 4) {
16       System.err.println("错误参数")
17       System.exit(1)
18     }
19     //接收参数
20     //numPartitions 线程数
21     val Array(zkQuorum, groupId, topics, numPartitions) = args
22     //一定不能使用local[1]
23     val sparkConf = new SparkConf().setMaster("local[2]").setAppName("kafkaReceiveWordCount")
24     val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
25     //设置日志级别
26     ssc.sparkContext.setLogLevel("WARN")
27     //多个topic用,分开
28     val topicMap = topics.split(",").map((_, numPartitions.toInt)).toMap
29     //TODO 业务逻辑,简单进行wordcount,输出到控制台
30     /**
31      * * @param ssc       StreamingContext object
32      * * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
33      * * @param groupId   The group id for this consumer topic所在的组,可以设置为自己想要的名称
34      * * @param topics    Map of (topic_name to numPartitions) to consume. Each partition is consumed
35      * *                  in its own thread
36      * * @param storageLevel  Storage level to use for storing the received objects
37      * *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
38      */
39     val lineMap = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)
40     lineMap.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
41     ssc.start()
42     ssc.awaitTermination()
43   }
44 }

3 提交到服器上运行

  如果生产中没有联网,需要使用  --jars 传入kafka的jar包

  • 把项目达成jar包
  • 使用local模式提交,提交的脚本:
提交到服务器上运行
 ./spark-submit --master local[2] /
 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 /
 --class flume_streaming.kafkaReceiveWordCount /
 /smallwild/app/SparkStreaming-1.0.jar /
 hadoop:2181 1 sparkStreaming 1

4 运行结果

  首先在控制台,启动kafka生产者,输入一些单词,然后,启动SparkStreaming程序。

技术图片

 

  

3)Direct方式整合

使用的是:Simple Consumer API,自己管理offset,把kfka看成存储数据的地方,根据offset去读。没有使用zk管理消费者的offset,spark自己管理,默认的offset在内存中,如果设置了checkpoint,那么也也有一份,一般要设置。Direct模式生成的Dstream中的RDD的并行度与读取的topic中的partition一致(增加topic的partition个数)

注意点:

  • 没有使用receive,直接查询的kafka偏移量

1 添加kafka依赖

        <!--        kafka依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

2 代码编写

技术图片
 1 package kafka_streaming
 2 
 3 import kafka.serializer.StringDecoder
 4 import org.apache.spark.SparkConf
 5 import org.apache.spark.streaming.{Durations, StreamingContext}
 6 import org.apache.spark.streaming.kafka.KafkaUtils
 7 
 8 /**
 9  * @Author: SmallWild
10  * @Date: 2019/10/31 21:21
11  * @Desc:
12  */
13 object kafkaDirectWordCount {
14 
15   def main(args: Array[String]): Unit = {
16     if (args.length != 2) {
17       System.err.println("错误参数")
18       System.exit(1)
19     }
20     //接收参数
21     //numPartitions 线程数
22     val Array(brokers, topics) = args
23     //一定不能使用local[1]
24     val sparkConf = new SparkConf().setMaster("local[2]").setAppName("kafkaDirectWordCount")
25     val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
26     //设置日志级别
27     ssc.sparkContext.setLogLevel("WARN")
28     //多个topic用,分开
29     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers
30     )
31     val topicsa = topics.split(",").toSet
32     /**
33      *
34      */
35     val lineMap = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsa)
36     //TODO 业务逻辑,简单进行wordcount,输出到控制台
37     lineMap.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
38     ssc.start()
39     ssc.awaitTermination()
40   }
41 
42 }
View Code

3 提交到服务器上运行和第一种方式是上面一样

4 自己管理offset

  使用spark自己管理offset方便,但是当业务逻辑改变的时候,恢复就难了,需要自己手动编写代码管理offset

4)总结

  注意两种模式差别,receive模式几乎被淘汰,可以扩展的地方,1)使程序具备高可用的能力,挂掉之后,能否从上次的状态恢复过来,2)手动管理offset,改变了业务逻辑也能从上次的状态恢复过来

  

以上是关于Spark Streaming整合Kafka的主要内容,如果未能解决你的问题,请参考以下文章

Spark 系列(十六)—— Spark Streaming 整合 Kafka

Michael G. Noll:整合Kafka到Spark Streaming——代码示例和挑战

spark-streaming与kafka的整合

Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一

Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一

Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一