Maintaining Offsets: Storing Kafka Offsets in HBase

Posted by Mr.zhou_Zxy


Storing offsets in HBase

The idea: each consumer group's progress on a topic lives in a single HBase row whose rowkey is ${topic}-${groupId}; every partition number becomes a column qualifier under the cf column family, and the cell value is the next offset to consume.

  • Add the dependency
<!-- HBase -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>${hbase.version}</version>
</dependency>
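
In addition to the HBase client above, the Spark Streaming Kafka integration has to be on the classpath for the demo below. A minimal sketch, assuming the Kafka 0.8 direct API on Scala 2.11 (the artifact id and version property are assumptions; match them to your Spark build):

<!-- Spark Streaming + Kafka 0.8 direct API (assumed coordinates, adjust to your build) -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>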
  • HBaseConnectionPool
package com.qf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class HBaseConnectionPool {
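    // Simple fixed-size pool backed by a LinkedList. Note: LinkedList is not thread-safe,
    // so a production pool would need synchronization or a blocking/concurrent queue.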
    private static LinkedList<Connection> pool  = new LinkedList<Connection>();
    static {
        try {
            Configuration config = HBaseConfiguration.create();
            config.addResource(HBaseConfiguration.class.getClassLoader().getResourceAsStream("hbase-site.xml"));
            for (int i = 0; i < 5; i++) {
                pool.push(ConnectionFactory.createConnection(config));
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Borrow a connection from the pool (blocks while the pool is empty).
     */
    public static Connection getConnection() {
        while (pool.isEmpty()) {
            try {
                System.out.println("pool is empty, please wait for a moment");
                Thread.sleep(1000);
            }catch (Exception e) {
                    e.printStackTrace();
            }
        }
        return pool.poll();
    }

    /**
     * Return a connection to the pool after use.
     */
    public static void release(Connection connection) {
        if (connection != null) pool.push(connection);
    }

    /**
     * Write a single cell (rowkey, column family, column, value) to the given table.
     */
    public static void set(Connection connection, TableName tableName, byte[] rowkey, byte[] columnFamily, byte[] column, byte[] value) {
        try {
            Table table = connection.getTable(tableName);
            Put put = new Put(rowkey);
            put.addColumn(columnFamily, column, value);
            table.put(put);
            table.close();
        }catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Read the value of a single cell (rowkey + column family + column) from the given table.
     */
    public static String getColValue(Connection connection, TableName tableName, byte[] rowkey, byte[] columnFamily, byte[] column) {
        try (Table table = connection.getTable(tableName)) {
            Result result = table.get(new Get(rowkey));
            byte[] value = result.getValue(columnFamily, column);
            return value == null ? null : new String(value);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Read one whole row. Rowkey layout: ${topic}-${groupId}; each column qualifier
     * is a partition number and its value is an offset, returned as a partition -> offset map.
     */
    public static Map<Integer, Long> getColValue(Connection connection, TableName tableName, byte[] rowkey) {
        Map<Integer, Long> partition2Offset = new HashMap<>();
        try {
            Table table = connection.getTable(tableName);
            Scan scan = new Scan();
            RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(rowkey));
            scan.setFilter(rowFilter);
            ResultScanner scanner = table.getScanner(scan);
            for (Result result : scanner) {
                List<Cell> cells = result.listCells();
                for (Cell cell : cells) {
                    //col : partition
                    Integer partition = Integer.parseInt(new String(CellUtil.cloneQualifier(cell)));
                    //value : offset
                    Long offset = Long.parseLong(new String(CellUtil.cloneValue(cell)));
                    partition2Offset.put(partition, offset);
                }
            }
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return partition2Offset;
    }

    @Test
    public void test() {
        Connection connection = HBaseConnectionPool.getConnection();
        TableName tableName = TableName.valueOf("hbase-spark");
//        HBaseConnectionPool.set(connection, tableName, "kafka-zk".getBytes(), "cf".getBytes(), "0".getBytes(), "0".getBytes());
        Map<Integer, Long> map = HBaseConnectionPool.getColValue(connection, tableName, "kafka-zk".getBytes());
        System.out.println(map.size());
        for (Map.Entry<Integer, Long> entry : map.entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
        }
    }
}

  • hbase-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-->
<configuration>

<property>
  <name>hbase.cluster.distributed</name>
  <value>true</value>
</property>

<property>
  <name>hbase.rootdir</name>
  <value>hdfs://hadoop:9000/hbase</value>
</property>

<property>
  <name>hbase.zookeeper.quorum</name>
  <value>hadoop</value>
</property>


</configuration>

  • Demo3_Offset_HBase
package com.qf.bigdata.spark.streaming.day2

import com.qf.HBaseConnectionPool
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.{lang, util}
import scala.collection.{JavaConversions, mutable}

object Demo3_Offset_HBase extends LoggerTrait {

    def main(args: Array[String]): Unit = {

        if (args == null || args.length != 3) {
            println(
                """
                  |usage <brokerList> <groupId> <topicstr>
                  |""".stripMargin)
            System.exit(-1)
        }

        val Array(brokerList, groupId, topicstr) = args

        val sparkConf = new SparkConf()
            .setAppName("Demo3_Offset_HBase")
            .setMaster("local[*]")
        val duration = Seconds(2)
        val ssc = new StreamingContext(sparkConf, duration)
        val kafkaParams = Map[String, String](
            "bootstrap.servers" -> brokerList,
            "group.id" -> groupId,
            "auto.offset.reset" -> "smallest"
        )
        val topics: Set[String] = topicstr.split(",").toSet
        val messages: InputDStream[(String, String)] = createMsg(ssc, kafkaParams, topics)

        messages.foreachRDD((rdd, time) => {
            if (!rdd.isEmpty()) {
                println("-" * 30)
                println(s"Time : ${time}")
                println(s"####### RDD Count : ${rdd.count()}")
                // save this batch's offsets to HBase
                saveOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, kafkaParams("group.id"))
                println("-" * 30)
            }
        })

        ssc.start()
        ssc.awaitTermination()
    }

    /**
     * Create the Kafka input stream, resuming from stored offsets when they exist.
     * 1. Read the offsets for these topics and this group from HBase.
     * 2. If offsets were found, consume Kafka from those positions.
     * 3. If none were found, this consumer group is reading the topics for the first time,
     *    so fall back to the plain direct stream (starting position governed by auto.offset.reset).
     */
    def createMsg(ssc:StreamingContext, kafkaParams:Map[String, String], topics: Set[String]): InputDStream[(String, String)] = {
        // 1. read the stored offsets for these topics from HBase
        val fromOffsets: Map[TopicAndPartition, Long] = getFromOffsets(topics, kafkaParams("group.id"))
        // 2. if nothing was stored, fromOffsets is an empty Map
        var messages:InputDStream[(String, String)] = null
        // 3. decide how to create the Kafka stream based on the offset map
        if (fromOffsets.isEmpty) { // no stored offsets: start according to auto.offset.reset
            messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        } else { // resume from the stored offsets
            val messageHandler = (msgHandler:MessageAndMetadata[String, String]) => (msgHandler.key(), msgHandler.message())
            messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,(String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
        }
        messages
    }

    /**
     * Read the stored offsets for the given consumer group and topics from HBase.
     * Layout: rowkey = ${topic}-${groupId}, column qualifier = partition, value = offset.
     * (For reference, Kafka's own ZooKeeper layout would be
     *  /kafka/consumers/${group.id}/offsets/${topic}/${partition} --> offset.)
     */
    def getFromOffsets(topics:Set[String], groupId:String):Map[TopicAndPartition, Long] = {
        // 1. mutable map to collect the results
        val offsets: mutable.Map[TopicAndPartition, Long] = mutable.Map[TopicAndPartition, Long]()
        // 2. look up the row for each requested topic in HBase
        val connection: Connection = HBaseConnectionPool.getConnection
        val tableName: TableName = TableName.valueOf("hbase-spark")
        val cf: Array[Byte] = Bytes.toBytes("cf")

        topics.foreach(topic => {
            val rk = Bytes.toBytes(s"${topic}-${groupId}")
            val partition2Offsets: util.Map[Integer, lang.Long] = HBaseConnectionPool.getColValue(connection, tableName, rk)
            JavaConversions.mapAsScalaMap(partition2Offsets).foreach {
                case (partition, offset) => offsets.put(TopicAndPartition(topic, partition), offset)
            }
        })
        HBaseConnectionPool.release(connection)
        // 3. return an immutable copy of the result
        offsets.toMap
    }

    /**
     * Save the offsets of the current batch to HBase.
     */
    def saveOffsets(offsetRanges:Array[OffsetRange], groupId:String) = {
        val connection: Connection = HBaseConnectionPool.getConnection
        val tableName: TableName = TableName.valueOf("hbase-spark")
        val cf: Array[Byte] = Bytes.toBytes("cf")

        for(offsetRange <- offsetRanges) {
            val topic = offsetRange.topic
            val partition = offsetRange.partition
            // untilOffset is exclusive, so it already is the next offset to consume
            val offset = offsetRange.untilOffset
            // the rowkey must match the one read back in getFromOffsets: ${topic}-${groupId}
            val rk = s"${topic}-${groupId}".getBytes()

            HBaseConnectionPool.set(connection, tableName, rk, cf, partition.toString.getBytes(), offset.toString.getBytes())
            println(s"${topic} -> ${partition} ->  ${offsetRange.fromOffset} -> ${offset}")
        }
        HBaseConnectionPool.release(connection)
    }
}
  • Test
hbase(main):002:0> create 'hbase-spark','cf'
0 row(s) in 1.4630 seconds

=> Hbase::Table - hbase-spark

## Open ports 16000 and 16020 (HBase Master / RegionServer defaults)
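
Once the streaming job has processed a batch, the stored offsets can be checked from the HBase shell. A quick sketch (row keys and values depend on your topic, group id, and data): each row key should be ${topic}-${groupId}, with one cf:<partition> column per partition whose value is the next offset as a string.

hbase(main):003:0> scan 'hbase-spark'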
