Maintaining Offsets: Saving Kafka Offsets in HBase
Posted by Mr.zhou_Zxy
Saving offsets in HBase
- Import dependencies
<!-- HBase -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
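The Demo3_Offset_HBase job below also uses Spark Streaming's Kafka 0.8 direct API (the org.apache.spark.streaming.kafka package), so the build needs the matching integration artifact as well. A sketch of the extra coordinates, assuming a Spark 2.x build against Scala 2.11 and a ${spark.version} property — adjust the artifact and version to your own build:
<!-- Spark Streaming + Kafka 0.8 integration (artifact and version are assumptions) -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>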
- HBaseConnectionPool
package com.qf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class HBaseConnectionPool {
private static LinkedList<Connection> pool = new LinkedList<Connection>();
static {
try {
Configuration config = HBaseConfiguration.create();
config.addResource(HBaseConfiguration.class.getClassLoader().getResourceAsStream("hbase-site.xml"));
for (int i = 0; i < 5; i++) {
pool.push(ConnectionFactory.createConnection(config));
}
}catch (Exception e) {
e.printStackTrace();
}
}
/**
* The connection returned here is actually taken from the pool (waits while the pool is empty)
*/
public static Connection getConnection() {
while (pool.isEmpty()) {
try {
System.out.println("pool is empty, please wait for a moment");
Thread.sleep(1000);
}catch (Exception e) {
e.printStackTrace();
}
}
return pool.poll();
}
/**
* Return a connection to the pool once it is no longer needed
*/
public static void realse(Connection connection) {
if (connection != null) pool.push(connection);
}
/**
* Write a single cell (row key / column family / column / value) to HBase
*/
public static void set(Connection connection, TableName tableName, byte[] rowkey, byte[] columnFamily, byte[] column, byte[] value) {
try {
Table table = connection.getTable(tableName);
Put put = new Put(rowkey);
put.addColumn(columnFamily, column, value);
table.put(put);
table.close();
}catch (Exception e) {
e.printStackTrace();
}
}
/**
* Read the value of the given column family and column for the given row key from the given HBase table
*/
public static String getColValue(Connection connection, TableName tableName, byte[] rowkey, byte[] columnFamily, byte[] column) {
try {
Table table = connection.getTable(tableName);
Result result = table.get(new Get(rowkey));
table.close();
byte[] value = result.getValue(columnFamily, column);
// guard against a missing cell instead of throwing a NullPointerException
return value == null ? null : new String(value);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* rowkey : topic-groupId
* returns a map of partition -> offset
*/
public static Map<Integer, Long> getColValue(Connection connection, TableName tableName, byte[] rowkey) {
Map<Integer, Long> partition2Offset = new HashMap<>();
try {
Table table = connection.getTable(tableName);
Scan scan = new Scan();
RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(rowkey));
scan.setFilter(rowFilter);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
List<Cell> cells = result.listCells();
for (Cell cell : cells) {
//col : partition
Integer partition = Integer.parseInt(new String(CellUtil.cloneQualifier(cell)));
//value : offset
Long offset = Long.parseLong(new String(CellUtil.cloneValue(cell)));
partition2Offset.put(partition, offset);
}
}
table.close();
} catch (IOException e) {
e.printStackTrace();
}
return partition2Offset;
}
@Test
public void test() {
Connection connection = HBaseConnectionPool.getConnection();
TableName tableName = TableName.valueOf("hbase-spark");
// HBaseConnectionPool.set(connection, tableName, "kafka-zk".getBytes(), "cf".getBytes(), "0".getBytes(), "0".getBytes());
Map<Integer, Long> map = HBaseConnectionPool.getColValue(connection, tableName, "kafka-zk".getBytes());
System.out.println(map.size());
for (Map.Entry<Integer, Long> entry : map.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue());
}
}
}
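Note that the pool above hands out connections through a plain LinkedList with a sleep-and-retry loop, which is fine for this demo (only the Spark driver touches it) but is not thread-safe. Below is a minimal thread-safe sketch using a BlockingQueue; the class name SafeHBaseConnectionPool is hypothetical and the other helper methods (set, getColValue) would stay the same:
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
public class SafeHBaseConnectionPool {
    // the BlockingQueue does the locking and the waiting for us
    private static final BlockingQueue<Connection> POOL = new LinkedBlockingQueue<>();
    static {
        try {
            Configuration config = HBaseConfiguration.create();
            config.addResource(HBaseConfiguration.class.getClassLoader().getResourceAsStream("hbase-site.xml"));
            for (int i = 0; i < 5; i++) {
                POOL.put(ConnectionFactory.createConnection(config));
            }
        } catch (IOException | InterruptedException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
    // blocks until a connection is available instead of sleep-polling
    public static Connection getConnection() throws InterruptedException {
        return POOL.take();
    }
    // return the connection to the pool when done with it
    public static void release(Connection connection) {
        if (connection != null) POOL.offer(connection);
    }
}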
- hbase-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<configuration>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop:9000/hbase</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop</value>
</property>
</configuration>
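If your ZooKeeper quorum is not listening on its default client port, the client configuration also needs hbase.zookeeper.property.clientPort. A hedged example — 2181 is shown only because it is the default; use the port your quorum actually runs on:
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2181</value>
</property>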
- Demo3_Offset_HBase
package com.qf.bigdata.spark.streaming.day2
import com.qf.HBaseConnectionPool
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.{lang, util}
import scala.collection.{JavaConversions, mutable}
object Demo3_Offset_HBase extends LoggerTrait {
def main(args: Array[String]): Unit = {
if (args == null || args.length != 3) {
println(
"""
|usage <brokerList> <groupId> <topicstr>
|""".stripMargin)
System.exit(-1)
}
val Array(brokerList, groupId, topicstr) = args
val sparkConf = new SparkConf()
.setAppName("Demo3_Offset_HBase")
.setMaster("local[*]")
val duration = Seconds(2)
val ssc = new StreamingContext(sparkConf, duration)
val kafkaParams = Map[String, String](
"bootstrap.servers" -> brokerList,
"group.id" -> groupId,
"auto.offset.reset" -> "smallest"
)
val topics: Set[String] = topicstr.split(",").toSet
val messages: InputDStream[(String, String)] = createMsg(ssc, kafkaParams, topics)
messages.foreachRDD((rdd, time) => {
if (!rdd.isEmpty()) {
println("-" * 30)
println(s"Time : ${time}")
println(s"####### RDD Count : ${rdd.count()}")
// save this batch's offsets to HBase
saveOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, kafkaParams("group.id"))
println("-" * 30)
}
})
ssc.start()
ssc.awaitTermination()
}
/**
 * Create the Kafka stream starting from the stored offsets
 * 1. Read the offsets stored in HBase for the requested topics
 * 2. Resume consuming Kafka from those offsets
 * 3. If no offsets are found, this consumer group is reading these topics for the
 *    first time, so consume from the position given by auto.offset.reset
 */
def createMsg(ssc:StreamingContext, kafkaParams:Map[String, String], topics: Set[String]): InputDStream[(String, String)] = {
// 1. read the stored offsets for the requested topics from HBase
val fromOffsets: Map[TopicAndPartition, Long] = getFromOffsets(topics, kafkaParams("group.id"))
// 2. if nothing is stored, fromOffsets is an empty Map
var messages:InputDStream[(String, String)] = null
// 3. decide how to create the Kafka stream based on the offset map
if (fromOffsets.isEmpty) { // no stored offsets: start according to auto.offset.reset ("smallest" here)
messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
} else { // resume from the stored offsets
val messageHandler = (msgHandler:MessageAndMetadata[String, String]) => (msgHandler.key(), msgHandler.message())
messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,(String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
}
messages
}
/**
 * Read the stored offsets for the given consumer group and topics from HBase
 * rowkey           : ${topic}-${groupId}
 * column qualifier : partition number
 * cell value       : offset
 */
def getFromOffsets(topics:Set[String], groupId:String):Map[TopicAndPartition, Long] = {
//1. create a mutable map to hold the result
val offsets: mutable.Map[TopicAndPartition, Long] = mutable.Map[TopicAndPartition, Long]()
//2. look up the stored offsets for each requested topic in HBase
val connection: Connection = HBaseConnectionPool.getConnection
val tableName: TableName = TableName.valueOf("hbase-spark")
val cf: Array[Byte] = Bytes.toBytes("cf")
topics.foreach(topic => {
val rk = Bytes.toBytes(s"${topic}-${groupId}")
val partition2Offsets: util.Map[Integer, lang.Long] = HBaseConnectionPool.getColValue(connection, tableName, rk)
JavaConversions.mapAsScalaMap(partition2Offsets).foreach {
case (partition, offset) => offsets.put(TopicAndPartition(topic, partition), offset)
}
})
HBaseConnectionPool.realse(connection)
//3. return an immutable copy of the result
offsets.toMap
}
/**
 * Save the offsets of the current batch to HBase
 */
def saveOffsets(offsetRanges:Array[OffsetRange], groupId:String) = {
val connection: Connection = HBaseConnectionPool.getConnection
val tableName: TableName = TableName.valueOf("hbase-spark")
val cf: Array[Byte] = Bytes.toBytes("cf")
for(offsetRange <- offsetRanges) {
val topic = offsetRange.topic
val partition = offsetRange.partition
// untilOffset is exclusive, so it is already the next offset to consume; do not add 1
val offset = offsetRange.untilOffset
// the row key must match the read side in getFromOffsets: ${topic}-${groupId}
val rk = s"${topic}-${groupId}".getBytes()
HBaseConnectionPool.set(connection, tableName, rk, cf, partition.toString.getBytes(), offset.toString.getBytes())
println(s"${topic} -> ${partition} -> ${offsetRange.fromOffset} -> ${offset}")
}
HBaseConnectionPool.realse(connection)
}
}
- Testing
hbase(main):002:0> create 'hbase-spark','cf'
0 row(s) in 1.4630 seconds
=> Hbase::Table - hbase-spark
## make sure ports 16000 (HBase Master) and 16020 (RegionServer) are open so the client can reach HBase
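After the streaming job has run a few batches, the stored offsets can be checked directly from the HBase shell; each ${topic}-${groupId} row should hold one cf:<partition> column whose value is the latest untilOffset. For example (the actual rows depend on your topic and group.id):
hbase(main):003:0> scan 'hbase-spark'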