Springboot 整合Kafka 实现手动提交 offset
Posted 张志翔ۤ
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Springboot 整合Kafka 实现手动提交 offset相关的知识,希望对你有一定的参考价值。
前言
大名鼎鼎的消息中间件Kafka大家一定不陌生吧,使用消息中间件的时候最怕的就是消息丢失了,如何解决这个问题呢?或许大家都知道,消费者端手动提交offset嘛。那么具体代码该怎么写呢?本文就基于springboot来进行消费者手动提交offset的试验。
配置
application.yml
spring:
kafka:
# 指定 kafka 地址可以多个
bootstrap-servers:
- 192.168.130.128:9092
- 192.168.130.128:9093
- 192.168.130.128:9094
# 指定listener 容器中的线程数,用于提高并发量
listener:
concurrency: 3
ack-mode: manual
# 消费者的配置
consumer:
# 指定默认消费者group id
group-id: test-group
#earliest
#当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
#latest
#当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
#none
#topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
auto-offset-reset: earliest
# 是否开启自动提交
enable-auto-commit: false
# key,value的解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Consumer
package com.study.springboot.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class MyConsumer {
@KafkaListener(topics = {"offsettest"})
public void processMessage(ConsumerRecord<?, ?> record, Acknowledgment ack) {
try {
System.out.printf("topic is %s, offset is %d,partition is %s, value is %s \\n", record.topic(), record.offset(),record.partition(), record.value());
// 手动提交offset
ack.acknowledge();
}catch (Exception e){
e.printStackTrace();
}
}
}
重点是ack.acknowledge();这一行,手动提交offset,我们先注释掉该行,启动springboot项目,然后发送消息hello world
可以看到消费者消费到了该条消息,但是由于我们没有提交offset,此时重启springboot项目。
发现该条消息仍然被消费到了。
然后我们取消注释,再重启,此时仍然可以消费到该条消息,但是与之前不同的是,此时我们提交了offset,所以再重启的时候就不会消费到该条消息了。
到此 Springboot 整合Kafka 实现手动提交 offset 介绍完成。
以上是关于Springboot 整合Kafka 实现手动提交 offset的主要内容,如果未能解决你的问题,请参考以下文章