Kafka问题优化之消费重复问题
Posted 长安不及十里
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka问题优化之消费重复问题相关的知识,希望对你有一定的参考价值。
一 重复消费
1.1 原因
- 强行kill线程,导致消费后的数据,offset没有提交(消费系统宕机、重启等)
- 网络波动,导致offset没提交
- 当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题
- 消费后的数据,当offset还没有提交时,partition就断开连接
- 最根本的原因是消费之后的offset未提交
1.2 解决方法
- 第一种思路是提高消费能力,提高单条消息的处理速度,例如对消息处理中比 较耗时的步骤可通过异步的方式进行处理、利用多线程处理等。在缩短单条消息消费时常的同时,根据实际场景可将max.poll.interval.ms值设置大一点,避免不 必要的rebalance,此外可适当减小max.poll.records的值,默认值是500,可根 据实际消息速率适当调小。这种思路可解决因消费时间过长导致的重复消费问题, 对代码改动较小,但无法绝对避免重复消费问题。
- 第二种思路是引入单独去重机制,例如生成消息时,在消息中加入唯一标识符如消息id等。在消费端,我们可以保存最近的1000条消息id到redis或mysql表中,配置max.poll.records的值小于1000。在消费消息时先通过前置表去重后再进行消息的处理。
1.3 重复消费问题
- 模拟网络问题
package com.demo.demo.kafka;
import com.demo.demo.pojo.MsgInfo;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* @Author shu
* @Date: 2021/10/28/ 19:55
* @Description
**/
@Component
public class KafkaTest {
//topic
private final static String TOPIC_NAME = "my-replicated-topic";
//程序执行的初始时间,只会保留一份
private static final AtomicLong lastRecieveMessage = new AtomicLong(System.currentTimeMillis());
//时间转换
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//缓存
private final List<ConsumerRecord<String,String>> DataList = new ArrayList<>(500);
//json
private final Gson gson = new GsonBuilder().create();
//kafka
@Autowired
private KafkaTemplate<Object,Object> kafkaTemplate;
/**
* 消息接受者(每隔1分钟执行)
*/
@Scheduled(cron = "0 */1 * * * ?")
public void Consumer() {
long last = lastRecieveMessage.get();
long current = System.currentTimeMillis();
if ((current - last) > (60 * 1000)){
System.out.println(DataList);
for (ConsumerRecord<String, String> consumerRecord : DataList) {
MsgInfo info = gson.fromJson(consumerRecord.value(), MsgInfo.class);
System.out.println("消息:"+info);
}
DataList.clear();
}
}
/**
* 消息发送者(30s执行一次)
*/
@Scheduled(cron = "0/30 * * * * ? ")
public void Provide(){
long last = lastRecieveMessage.get();
long current = System.currentTimeMillis();
if ((current - last) > (30 * 1000) ){
MsgInfo msgInfo=new MsgInfo(current-last,"消息服务",last,new Date());
kafkaTemplate.send(TOPIC_NAME,"power",gson.toJson(msgInfo));
}
}
/**
* 单条消费
* @param record
* @param ack
*/
@KafkaListener(topics = TOPIC_NAME,groupId = "MyGroup1")
public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
DataList.add(record);
//⼿动提交offset
//ack.acknowledge();
}
}
- 观察结果
- 恢复网络,正常通信
/**
* 单条消费
* @param record
* @param ack
*/
@KafkaListener(topics = TOPIC_NAME,groupId = "MyGroup1")
public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
DataList.add(record);
//⼿动提交offset
ack.acknowledge();
}
- 观察结果
- 对比
结论:造成了重复消费
1.4 解决
package com.demo.demo.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author shu
* @Date: 2021/10/29/ 19:34
* @Description redisson配置
**/
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://ip:6379")
.setPassword("admin123");
return Redisson.create(config);
}
}
package com.demo.demo.kafka;
import com.demo.demo.pojo.MsgInfo;
import com.demo.demo.utils.RedisUtil;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
* @Author shu
* @Date: 2021/10/29/ 19:17
* @Description 分布式锁解决kafka消费重复的问题
**/
@Component
public class RedissonKafka {
private final static String TOPIC_NAME = "my-replicated-topic";
//程序执行的初始时间,只会保留一份
private static final AtomicLong lastRecieveMessage = new AtomicLong(System.currentTimeMillis());
//时间转换
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//前缀
private static final String KEY_PREFIX = "key";
//缓存
private final List<ConsumerRecord<String,String>> DataList = new ArrayList<>(500);
//json
private final Gson gson = new GsonBuilder().create();
//kafka
@Resource
private KafkaTemplate<Object,Object> kafkaTemplate;
//redisson
@Resource
private RedissonClient redissonClient;
/**
* 消息接受者(每隔30分钟执行)
*/
@Scheduled(cron = "0/35 * * * * ?")
public void Consumer() {
long last = lastRecieveMessage.get();
long current = System.currentTimeMillis();
if ((current - last) > (60 * 1000)){
//初始化
redissonClient.getBucket("key").set(1);
for (ConsumerRecord<String, String> consumerRecord : DataList) {
//获取锁
RLock lock = redissonClient.getLock(redissonClient.getBucket(consumerRecord.key()).get().toString());
//上锁
lock.lock();
MsgInfo info = gson.fromJson(consumerRecord.value(), MsgInfo.class);
System.out.println("消息:"+info);
redissonClient.getBucket(consumerRecord.key()).set(consumerRecord.offset()+1);
//解锁
lock.unlock();
}
DataList.clear();
}
}
/**
* 消息发送者(30s执行一次)
*/
@Scheduled(cron = "0/30 * * * * ? ")
public void Provide(){
long last = lastRecieveMessage.get();
long current = System.currentTimeMillis();
if ((current - last) > (30 * 1000) ){
MsgInfo msgInfo=new MsgInfo(current-last,"消息服务",last,new Date());
kafkaTemplate.send(TOPIC_NAME,"key",gson.toJson(msgInfo));
}
}
/**
* 单条消费
* @param record
* @param ack
*/
@KafkaListener(topics = TOPIC_NAME,groupId = "MyGroup1")
public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
DataList.add(record);
ack.acknowledge();
}
}
}
以上是关于Kafka问题优化之消费重复问题的主要内容,如果未能解决你的问题,请参考以下文章
kafka线上问题优化:消息丢失重复消费消息积压延时队列顺序消费