实战Spark streaming与kafka
Posted 合格的程序猿
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实战Spark streaming与kafka相关的知识,希望对你有一定的参考价值。
本文将介绍Spark streaming消费kafka主要有两种方式:receiver方式和直连方式。
一、receiver方式
1、利用kafka高阶API,offset由zookeeper维护。
2、接口:
KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id],[per-topic number of Kafka partitions to consume])
3、主要实现逻辑:
Receiver方式接收数据的方式如下图:
Receiver源源不断的从数据源kafka接收数据,然后发送数据到spark内存,最后更新zookeeper中kafka offset的内存。
这样就存在这样一个问题:
如果receiver接收kafka消息发送到spark内存交由spark处理,更新完offset,这个时候如果driver程序挂掉,那么spark内存中正在处理中的数据就会丢失;driver重启后,因为kafka中的offset已经更新为最新,那么会继续从最新offset处处理数据,就会造成上一次处理数据的丢失。
Receiver方式为了保证数据零丢失,引入了checkpoint和wal机制。
(1)、checkpoint:spark提供的一种容错机制,主要有两种:
①元信息checkpoint:用来保存DStreamGraph以及相关配置信息,
以便在Driver崩溃重启的时候能够接着之前进度继续进行处理;
②消费数据的checkpoint:保存生成的RDD到一个可靠的存储系统中,常用的HDFS,通常有状态的数据横跨多个batch流的时候,需要做checkpoint.
(2)、WAL预写日志:receiver接收数据后,先将数据持久化到日志中(HDFS),如果driver重启,将会从日志文件中恢复数据。
启用checkpoint和wal机制的receiver方式如下图:
Receiver从kafka获取数据,将数据通过WAL持久化到HDFS上面,然后发送给spark内存进行处理,处理过程中将元数据信息checkpoint到hdfs上面,最后更新kafka的offset。
如果在处理过程中driver挂掉,恢复成功后会从checkpoint目录中寻找未执行的任务元信息,然后从wal日志中进行恢复,避免了数据丢失。
但是采用这种方式会存在重复消费的问题,如果最后一步更新kafka offset失败的话,那么spark下一次batch会重新从上一次的offset处重新拿去数据,造成另一次处理。
综上,receiver有如下特点:
1、至少处理一次
2、WAL减少了接收器的吞吐量,因为接受到的数据必须保存到可靠的分布式文件系统中;而且kafka和HDFS中存在两份数据,造成了资源的浪费。
为了解决recevier这些问题,spark streaming1.3引入了kafka直连的方式,而在实际生产环境中,大多数都使用的是直连方式。
二、直连方式
1、利用kafka低阶API,offset由spark checkpoint维护。
2、接口:
createDirectStream(StreamingContext ssc, scala.collection.immutable.Map<String,String> kafkaParams, scala.collection.immutable.Set<String> topics, scala.reflect.ClassTag<K> evidence$19, scala.reflect.ClassTag<V> evidence$20, scala.reflect.ClassTag<KD> evidence$21, scala.reflect.ClassTag<VD> evidence$22) |
3、主要优点:
(1)、不采用wal机制,减少了数据冗余存储。
(2)、创建的DStream的rdd的partition做到了和Kafka中topic的partition一一对应。
(3)、基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。
但是直连方式也有一个特点:因为kafka的offset由checkpoint维护,这就导致了zookeeper无法知道此时kafka的offset信息,会导致一些常用的工具如kafkaOffsetMoniter等无法使用,如果想继续使用这些监控offset的工具,可以在spark处理完数据后手动更新zookeeper的offset。
三、自定义维护kafka offset
在消费kafka过程中,checkpoint起到至关重要的作用,但是checkpoint有个弊端,每次流式程序升级的时候会报错误,因为checkpoint第一次持久化的时候会把整个相关的jar给序列化成一个二进制文件,每次重启都会从里面恢复,但是当你新的程序打包之后序列化加载的仍然是旧的序列化文件,这就会导致报错或者依旧执行旧代码,为了解决checkpoint这个问题,我们可以废弃offset,自己维护offset的变化,具体思路如下:
1、首次启动,先从第三方存储介质(可以使redis或者zk)中找是否有上次存储的偏移量,如果没有就从最新的消费,然后保存偏移量至第三方存储介质中
2、如果从第三方存储介质中找到了偏移量,那么就从指定的偏移量处开始消费处理,每个批处理处理完毕后,都会更新新的offset到第三方存储介质中, 这样以来无论是程序故障,还是宕机,再次启动后都会从上次的消费的偏移量处继续开始消费,而且程序的升级或功能改动新版本的发布都能正常运行并做到了消息不丢。
下面看一个具体例子:
val calculateReulstInputDstream = KafkaIO.createCustomDirectKafkaStream(ssc, kafkaParams, calculateResultTopics, dmpRealtimeSequenceUrl, Constants.CROWD_CALCULATE_KAFKA_SEQUENCE_ID) |
KafkaIO为定义的工具类,createCustomDirectKafkaStream为定义的获取kafka信息的方法,它的主要功能为每次从redis中获取最新的kafka offset,如果获取到了,那么根据offset去kafka集群获取消息然后更新最新的offset到redis中;如果获取不到,则从最新offset处开始获取数据,并更新offset到redis,具体代码如下:
def createCustomDirectKafkaStream(ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String], key: String): InputDStream[(String, String)] = { val kafkaStream = readOffsetFromJimDB(dmpRealtimeSequenceUrl, key) match { case None => KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) case Some(offset) => val msgHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message) KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, offset, msgHandler) } kafkaStream.foreachRDD(rdd => saveOffsetToJimDB(rdd, dmpRealtimeSequenceUrl, key)) kafkaStream }
def saveOffsetToJimDB(rdd: RDD[_],key: String): Unit = { var offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges var value = offsets.map(f => s"${f.topic}:${f.partition}:${f.fromOffset}").mkString(",") writeOffset(value, key) }
def readOffsetFromJimDB(dmpRealtimeSequenceUrl: String, key: String): Option[Map[TopicAndPartition, Long]] = { var value = readOffset(dmpRealtimeSequenceUrl, key) value.map { v => v.split(",").map(_.split(":")).map { case Array(topic, partition, offset) => TopicAndPartition(topic, partition.toInt) -> offset.toLong }.toMap } }
def writeOffset(offset: String, key: String): Unit = { // 存储offset信息到db }
def readOffset(key: String): Option[String] = { var offset: String = null // 从db中获取存储的offset信息 Option(offset) }
|
相关阅读:
以上是关于实战Spark streaming与kafka的主要内容,如果未能解决你的问题,请参考以下文章
Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一
Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一
慕课网实战Spark Streaming实时流处理项目实战笔记十之铭文升级版
Spark Streaming实时流处理项目实战Spark Streaming整合Flume实战二