SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)
Posted 张志翔ۤ
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)相关的知识,希望对你有一定的参考价值。
如果使用的自动提交偏移量的模式,偏移量会给到kafka或者zk进行管理,其中kafka的偏移量重置给了重新消费kafka内未过期的数据提供了机会,当消费者出错,比如消费了数据,但是中途处理失败,导致数据丢失,这时候重置偏移量就是一剂后悔药,让消费者能够重新来过,当然后悔药也是有保质期的,还得取决于数据的保留策略。
这里讨论一下kafka_2.11.0.10.1.0版本重置偏移量的方案
该版本kafka不像其他版本一样,通过执行一句方便的命令就可以重置到指定的偏移量,本文给出了一种通过Java代码来重置偏移量的方式,若有不足,还望指教。
先来看一下如下代码
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.HashMap;
import java.util.Properties;
public class ResetKafkaOffset {
public ResetKafkaOffset() {
}
public static void main(String[] args) {
try {
HashMap<String, String> params = new HashMap();
// 解析参数
for (String param : args) {
String[] split = param.split("=");
if (split.length == 2) {
System.out.println(split[0] + "=" + split[1]);
params.put(split[0], split[1]);
}
}
String topic = params.remove("topic");
//参数校验 topic、消费组、分区、sever地址没有设置则提示并退出
if (null == topic) {
System.out.println("please set topic==XXX");
System.exit(-1);
}
String groupId = params.remove("group.id");
if (null == groupId) {
System.out.println("please set group.id==XXX");
System.exit(-1);
}
String server = params.remove("server");
if (null == server) {
System.out.println("please set server==XXX");
System.exit(-1);
}
if (params.size() == 0) {
System.out.println("please set at lease one partition_x==XXX");
System.exit(-1);
}
// kafka设置
Properties properties = new Properties();
properties.put("bootstrap.servers", server);
properties.put("group.id", groupId);
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "6000");
properties.put("session.timeout.ms", "10000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("auto.offset.reset", "earliest");
KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
HashMap offsets = new HashMap();
// 设置每个分区的偏移量
for (String key : params.keySet()) {
String[] partitions = key.split("_");
TopicPartition topicPartition = new TopicPartition(topic, Integer.parseInt(partitions[1]));
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(Long.parseLong(params.get(key)), "manual");
offsets.put(topicPartition, offsetAndMetadata);
}
// 提交
kafkaConsumer.commitSync(offsets);
System.out.println("ok");
} catch (Exception e) {
e.printStackTrace();
}
}
}
我们把kafka的连接以及我们需要重置的偏移量参数以等号“=”分割的方式传入程序中,然后解析,校验,kafka连接设置,分区偏移量设置,最后提交达到偏移量重置的目的。
参数设置如:
server=127.0.0.1:9092 group.id=console-consumer-48585 topic=testTopic partition_0=1 partition_1=1
server:kafka连接地址
group.id:消费组
topic:kafka的topic
partition_a = b:a表示几号分区数,b表示要重置到指定的偏移量,既要把分区a重置到b位置
现在console-consumer-48585消费组信息如下,两个分区的数据都已经完全消费了
现在停掉console-consumer-48585的消费者进程,把分区0的偏移量指到59,分区2的偏移量指到58,并运行程序
参数
server=127.0.0.1:9092 group.id=console-consumer-48585 topic=testTopic partition_0=59 partition_1=58
程序正常运行
这时候启动我们的消费者进程,会看到console-consumer-48585组下的消费者会重新消费分区1的后一个数和分区2的后一个数,表示重置成功。
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --consumer.config ../config/consumer.properties
当然生产、灰度环境等遇到需要像这样来重置偏移量的时候,在本地肯定是连不上生产、灰度的地址,所以需要把改代码编译成class文件,然后把该class文件放到kafka的bin目录下(如果是集群放到任意一个下即可),停掉消费应用,然后执行如下的脚本命令(按需要替换掉分区、偏移量、地址、分组)
dir=../libs
for file in "$dir"/*.jar
do
path="$path":"$file"
done
java -cp $path ResetKafkaOffset server=127.0.0.1:9092 group.id=console-consumer-48585 topic=testTopic partition_0=59 partition_1=58
到此 SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)介绍完成。
以上是关于SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)的主要内容,如果未能解决你的问题,请参考以下文章
Kafka之Fetch offset xxx is out of range for partition xxx,resetting offset情况总结