Maintaining Offsets: Saving Kafka Offsets in ZooKeeper
Posted by Mr.zhou_Zxy
Saving offsets with ZooKeeper

Spark Streaming's direct Kafka API (the 0.8 integration used here) does not commit consumer offsets to ZooKeeper by itself, so the application must persist them after each batch and restore them on startup. The code below does exactly that with Apache Curator.
- Code
package com.qf.bigdata.spark.streaming.day2

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.zookeeper.CreateMode

import scala.collection.{JavaConversions, mutable}
object Demo2_Offset_Zookeeper extends LoggerTrait {

  // The ZooKeeper client (Curator), already started
  val client = {
    val client = CuratorFrameworkFactory.builder()
      .connectString("hadoop:2181")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build()
    client.start()
    client
  }

  // Base znode under which offsets are stored
  val BASEPATH = "/kafka/consumers/offsets"
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("Demo2_Offset_Zookeeper")
      .setMaster("local[*]")
    val duration = Seconds(2)
    val ssc = new StreamingContext(sparkConf, duration)

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "hadoop:9092",
      "group.id" -> "hzbigdata2103",
      "auto.offset.reset" -> "smallest"
    )
    val topics: Set[String] = "kafka-zk".split(",").toSet
    val messages: InputDStream[(String, String)] = createMsg(ssc, kafkaParams, topics)

    messages.foreachRDD((rdd, time) => {
      if (!rdd.isEmpty()) {
        println("-" * 30)
        println(s"Time : ${time}")
        println(s"####### RDD Count : ${rdd.count()}")
        // Persist this batch's offsets to ZooKeeper
        saveOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, kafkaParams("group.id"))
        println("-" * 30)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
  /**
   * Consume Kafka data starting from the stored offsets:
   * 1. Read this consumer group's offsets for the given topics from ZooKeeper.
   * 2. Have Kafka start consuming from those offsets.
   * 3. If no offsets are found, this consumer group is consuming these topics
   *    for the first time: create the znodes and consume the topics from the
   *    beginning (per auto.offset.reset=smallest).
   */
  def createMsg(ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]): InputDStream[(String, String)] = {
    // 1. Read the offsets of the given topics from ZooKeeper
    val fromOffsets: Map[TopicAndPartition, Long] = getFromOffsets(topics, kafkaParams("group.id"))
    // 2. If nothing was stored yet, fromOffsets is an empty Map()
    var messages: InputDStream[(String, String)] = null
    // 3. Decide how to read from Kafka based on the offset map
    if (fromOffsets.isEmpty) { // no stored offsets: read from the beginning
      messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    } else { // resume from the stored offsets
      val messageHandler = (msgHandler: MessageAndMetadata[String, String]) => (msgHandler.key(), msgHandler.message())
      messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    }
    messages
  }
  /**
   * Read the offsets of the given topics for the given consumer group from ZooKeeper.
   * Kafka's own layout: /kafka/consumers/${group.id}/offsets/${topic}/${partition} --> offset
   * Our layout:         /kafka/consumers/offsets/${topic}/${group.id}/${partition} --> offset
   *
   * ZooKeeper access: Curator
   */
  def getFromOffsets(topics: Set[String], groupId: String): Map[TopicAndPartition, Long] = {
    // 1. A mutable map to collect the results
    val offsets: mutable.Map[TopicAndPartition, Long] = mutable.Map[TopicAndPartition, Long]()
    // 2. Iterate over the topics we need
    topics.foreach(topic => {
      val path = s"${BASEPATH}/${topic}/${groupId}"
      // 3. Make sure the path exists
      checkExists(path)
      // 4. Iterate over the partitions
      JavaConversions.asScalaBuffer(client.getChildren.forPath(path)).foreach(partition => {
        val fullPath = s"${path}/${partition}" // full path of the partition znode
        val offset = new String(client.getData.forPath(fullPath)).toLong // the stored offset
        offsets.put(TopicAndPartition(topic, partition.toInt), offset)
      })
    })
    // 5. Return the collected results
    offsets.toMap
  }
  /**
   * Check whether a path exists: if it does, do nothing; if not, create it.
   */
  def checkExists(path: String) = {
    if (client.checkExists().forPath(path) == null) { // the path does not exist,
      client.create()                                 // so create it
        .creatingParentsIfNeeded()
        .withMode(CreateMode.PERSISTENT)
        .forPath(path)
    }
  }
  /**
   * Save the offsets to ZooKeeper.
   */
  def saveOffsets(offsetRanges: Array[OffsetRange], groupId: String) = {
    for (offsetRange <- offsetRanges) {
      val topic = offsetRange.topic
      val partition = offsetRange.partition
      // untilOffset is exclusive, i.e. it already is the next offset to
      // consume, so store it as-is; adding 1 here would skip one message
      // per partition on restart
      val offset = offsetRange.untilOffset
      val path = s"${BASEPATH}/${topic}/${groupId}/${partition}"
      checkExists(path)
      client.setData().forPath(path, offset.toString.getBytes)
      println(s"${topic} -> ${partition} -> ${offsetRange.fromOffset} -> ${offset}")
    }
  }
}
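A quick way to sanity-check what the job wrote is to read the znodes back with Curator. The sketch below is a hypothetical standalone helper (the object name CheckOffsets is illustrative; the hard-coded topic, group and connect string simply match the values used above):

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

import scala.collection.JavaConversions

// Hypothetical helper: print every offset stored under the layout used above,
// /kafka/consumers/offsets/${topic}/${group.id}/${partition}
object CheckOffsets {
  def main(args: Array[String]): Unit = {
    val client = CuratorFrameworkFactory.builder()
      .connectString("hadoop:2181") // same ZooKeeper as the streaming job
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build()
    client.start()

    val path = "/kafka/consumers/offsets/kafka-zk/hzbigdata2103"
    if (client.checkExists().forPath(path) != null) { // only read if the job has written something
      JavaConversions.asScalaBuffer(client.getChildren.forPath(path)).foreach { partition =>
        val offset = new String(client.getData.forPath(s"${path}/${partition}")).toLong
        println(s"partition ${partition} -> next offset ${offset}")
      }
    }
    client.close()
  }
}

Note that because saveOffsets runs only after the batch has been processed, a crash between processing and the ZooKeeper write replays that batch on restart: this scheme is at-least-once, not exactly-once.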
- Test
##1. Create a new topic
[root@hadoop bin]# kafka-topics.sh --create --topic kafka-zk --partitions 3 --replication-factor 1 --zookeeper hadoop:2181/kafka
##2. Start a console producer
[root@hadoop bin]# kafka-console-producer.sh --topic kafka-zk --broker-list hadoop:9092
##3. Run the code above
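After producing a few messages and letting a couple of batches run, the stored offsets can be inspected with ZooKeeper's own CLI (assuming zkCli.sh is available in ZooKeeper's bin directory; the paths follow the layout chosen in the code):
##4. Inspect the stored offsets
[root@hadoop bin]# zkCli.sh -server hadoop:2181
[zk: hadoop:2181(CONNECTED) 0] ls /kafka/consumers/offsets/kafka-zk/hzbigdata2103
[zk: hadoop:2181(CONNECTED) 1] get /kafka/consumers/offsets/kafka-zk/hzbigdata2103/0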