Maintaining Offsets: Saving Kafka Offsets in ZooKeeper

Posted by Mr.zhou_Zxy

Saving offsets with ZooKeeper

  • Code
package com.qf.bigdata.spark.streaming.day2

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.zookeeper.CreateMode

import scala.collection.{JavaConversions, mutable}

object Demo2_Offset_Zookeeper extends LoggerTrait {
    // the ZooKeeper (Curator) client, already started and ready to use
    val client = {
        val client = CuratorFrameworkFactory.builder()
            .connectString("hadoop:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build()
        client.start()
        client
    }

    // base ZooKeeper path under which this job stores its offsets
    val BASEPATH = "/kafka/consumers/offsets"

    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
            .setAppName("Demo2_Offset_Zookeeper")
            .setMaster("local[*]")
        val duration = Seconds(2)
        val ssc = new StreamingContext(sparkConf, duration)
        val kafkaParams = Map[String, String](
            "bootstrap.servers" -> "hadoop:9092",
            "group.id" -> "hzbigdata2103",
            "auto.offset.reset" -> "smallest"
        )
        // comma-separated list of topics; here just the single topic "kafka-zk"
        val topics: Set[String] = "kafka-zk".split(",").toSet
        val messages: InputDStream[(String, String)] = createMsg(ssc, kafkaParams, topics)

        messages.foreachRDD((rdd, time) => {
            if (!rdd.isEmpty()) {
                println("-" * 30)
                println(s"Time : ${time}")
                println(s"####### RDD Count : ${rdd.count()}")
                // persist the offsets after the batch has been processed
                saveOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, kafkaParams("group.id"))
                println("-" * 30)
            }
        })

        ssc.start()
        ssc.awaitTermination()
    }

    /**
     * Create the Kafka stream, starting from the stored offsets:
     * 1. read the offsets for the given topics from ZooKeeper
     * 2. have Kafka resume consuming from those offsets
     * 3. if no offsets are found, this consumer group is consuming the topic for the first
     *    time; create the ZooKeeper nodes and consume the topic from the beginning
     */
    def createMsg(ssc:StreamingContext, kafkaParams:Map[String, String], topics: Set[String]): InputDStream[(String, String)] = {
        // 1. read the offsets for the given topics from ZooKeeper
        val fromOffsets: Map[TopicAndPartition, Long] = getFromOffsets(topics, kafkaParams("group.id"))
        // 2. if nothing was read, fromOffsets is an empty Map()
        var messages:InputDStream[(String, String)] = null
        // 3. use the offset map to decide how to read messages from Kafka
        if (fromOffsets.isEmpty) { // no stored offsets: read from the beginning
            messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        } else { // resume from the stored offsets
            val messageHandler = (msgHandler:MessageAndMetadata[String, String]) => (msgHandler.key(), msgHandler.message())
            messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,(String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
        }
        messages
    }

    /**
     * Read the offsets of the given topics for the given consumer group from ZooKeeper.
     * Official layout: /kafka/consumers/${group.id}/offsets/${topic}/${partition} --> offset
     * Layout used here: /kafka/consumers/offsets/${topic}/${group.id}/${partition} --> offset
     *
     * ZooKeeper access goes through Curator.
     */
    def getFromOffsets(topics:Set[String], groupId:String):Map[TopicAndPartition, Long] = {
        //1. create a mutable map to hold the result
        val offsets: mutable.Map[TopicAndPartition, Long] = mutable.Map[TopicAndPartition, Long]()
        //2. iterate over the topics we need
        topics.foreach(topic => {
            val path = s"${BASEPATH}/${topic}/${groupId}"
            // 3. make sure this path exists (create it if missing)
            checkExists(path)
            // 4. iterate over the partitions
            JavaConversions.asScalaBuffer(client.getChildren.forPath(path)).foreach(partition => {
                val fullPath = s"${path}/${partition}" // build the full path for this partition
                val offset = new String(client.getData.forPath(fullPath)).toLong // read the stored offset
                offsets.put(TopicAndPartition(topic, partition.toInt), offset)
            })
        })

        //5. return the collected result
        offsets.toMap
    }

    /**
     * Check whether the path exists: if it does, do nothing; if not, create it.
     */
    def checkExists(path: String) = {
        if (client.checkExists().forPath(path) == null) { // the path does not exist
            client.create() // so create it
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath(path)
        }
    }

    /**
     * Persist the offsets to ZooKeeper.
     */
    def saveOffsets(offsetRanges:Array[OffsetRange], groupId:String) = {
        for(offsetRange <- offsetRanges) {
            val topic = offsetRange.topic
            val partition = offsetRange.partition
            // untilOffset is exclusive, i.e. already the next offset to consume,
            // so store it as-is; adding 1 would skip one message after a restart
            val offset = offsetRange.untilOffset
            val path = s"${BASEPATH}/${topic}/${groupId}/${partition}"
            checkExists(path)
            client.setData().forPath(path, offset.toString.getBytes)
            println(s"${topic} -> ${partition} ->  ${offsetRange.fromOffset} -> ${offset}")
        }
    }
}
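
To build this, the project needs the Spark Streaming Kafka 0.8 integration (which provides org.apache.spark.streaming.kafka.KafkaUtils, kafka.common.TopicAndPartition, and the StringDecoder used above) as well as Apache Curator on the classpath. A minimal sbt sketch; the version numbers here are assumptions, so match them to your own Spark and ZooKeeper environment:

// build.sbt (sketch; version numbers are assumptions)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"           % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0",
  "org.apache.curator" % "curator-framework"        % "2.13.0"
)
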
  • Test
##1. Create a new topic
[root@hadoop bin]# kafka-topics.sh --create --topic kafka-zk --partitions 3 --replication-factor 1 --zookeeper hadoop:2181/kafka  

##2. Start a console producer
[root@hadoop bin]# kafka-console-producer.sh --topic kafka-zk --broker-list hadoop:9092

##3. Run the code
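
##4. (Optional) verify a stored offset; this assumes zkCli.sh from a standard ZooKeeper
##   install and reads partition 0 at the path the code writes
[root@hadoop bin]# zkCli.sh -server hadoop:2181 get /kafka/consumers/offsets/kafka-zk/hzbigdata2103/0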
