kafka重复消费的原因

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka重复消费的原因相关的知识,希望对你有一定的参考价值。

参考技术A 如果自动提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理,

假设我们采用了自动提交,且提交时间间隔为5s,在最近一次提交之后的3s发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了3s,所以在这3s内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复悄息的时间窗,不过这种情况是无也完全避免的。

Kafka问题优化之消费重复问题

一 重复消费

1.1 原因

  • 强行kill线程,导致消费后的数据,offset没有提交(消费系统宕机、重启等)
  • 网络波动,导致offset没提交
  • 当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题
  • 消费后的数据,当offset还没有提交时,partition就断开连接
  • 最根本的原因是消费之后的offset未提交

1.2 解决方法

  • 第一种思路是提高消费能力,提高单条消息的处理速度,例如对消息处理中比 较耗时的步骤可通过异步的方式进行处理、利用多线程处理等。在缩短单条消息消费时常的同时,根据实际场景可将max.poll.interval.ms值设置大一点,避免不 必要的rebalance,此外可适当减小max.poll.records的值,默认值是500,可根 据实际消息速率适当调小。这种思路可解决因消费时间过长导致的重复消费问题, 对代码改动较小,但无法绝对避免重复消费问题。
  • 第二种思路是引入单独去重机制,例如生成消息时,在消息中加入唯一标识符如消息id等。在消费端,我们可以保存最近的1000条消息id到redis或mysql表中,配置max.poll.records的值小于1000。在消费消息时先通过前置表去重后再进行消息的处理。

1.3 重复消费问题

  • 模拟网络问题
package com.demo.demo.kafka;

import com.demo.demo.pojo.MsgInfo;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @Author shu
 * @Date: 2021/10/28/ 19:55
 * @Description
 **/
@Component
public class KafkaTest {
    //topic
    private final static String TOPIC_NAME = "my-replicated-topic";
    //程序执行的初始时间,只会保留一份
    private static final AtomicLong lastRecieveMessage = new AtomicLong(System.currentTimeMillis());
    //时间转换
    private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    //缓存
    private final List<ConsumerRecord<String,String>> DataList = new ArrayList<>(500);
    //json
    private final Gson gson = new GsonBuilder().create();
    //kafka
    @Autowired
    private KafkaTemplate<Object,Object> kafkaTemplate;

    /**
     * 消息接受者(每隔1分钟执行)
     */
    @Scheduled(cron = "0 */1 * * * ?")
    public void Consumer() {
        long last = lastRecieveMessage.get();
        long current = System.currentTimeMillis();
        if ((current - last) > (60 * 1000)){
            System.out.println(DataList);
            for (ConsumerRecord<String, String> consumerRecord : DataList) {
                MsgInfo info = gson.fromJson(consumerRecord.value(), MsgInfo.class);
                System.out.println("消息:"+info);
            }
            DataList.clear();
        }
    }



    /**
     * 消息发送者(30s执行一次)
     */
    @Scheduled(cron = "0/30 * * * * ? ")
    public void Provide(){
        long last = lastRecieveMessage.get();
        long current = System.currentTimeMillis();
        if ((current - last) > (30 * 1000) ){
            MsgInfo msgInfo=new MsgInfo(current-last,"消息服务",last,new Date());
            kafkaTemplate.send(TOPIC_NAME,"power",gson.toJson(msgInfo));
        }

    }


    /**
     * 单条消费
     * @param record
     * @param ack
     */
    @KafkaListener(topics = TOPIC_NAME,groupId = "MyGroup1")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        DataList.add(record);
        //⼿动提交offset
        //ack.acknowledge();
    }
}
  • 观察结果

  • 恢复网络,正常通信
    /**
     * 单条消费
     * @param record
     * @param ack
     */
    @KafkaListener(topics = TOPIC_NAME,groupId = "MyGroup1")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        DataList.add(record);
        //⼿动提交offset
       ack.acknowledge();
    }
  • 观察结果

  • 对比

结论:造成了重复消费

1.4 解决

package com.demo.demo.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author shu
 * @Date: 2021/10/29/ 19:34
 * @Description redisson配置
 **/
@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://ip:6379")
                .setPassword("admin123");
        return Redisson.create(config);
    }
}

package com.demo.demo.kafka;

import com.demo.demo.pojo.MsgInfo;
import com.demo.demo.utils.RedisUtil;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @Author shu
 * @Date: 2021/10/29/ 19:17
 * @Description 分布式锁解决kafka消费重复的问题
 **/
@Component
public class RedissonKafka {

    private final static String TOPIC_NAME = "my-replicated-topic";
    //程序执行的初始时间,只会保留一份
    private static final AtomicLong lastRecieveMessage = new AtomicLong(System.currentTimeMillis());
    //时间转换
    private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    //前缀
    private static final String KEY_PREFIX = "key";
    //缓存
    private final List<ConsumerRecord<String,String>> DataList = new ArrayList<>(500);
    //json
    private final Gson gson = new GsonBuilder().create();
    //kafka
    @Resource
    private KafkaTemplate<Object,Object> kafkaTemplate;
    //redisson
    @Resource
    private RedissonClient redissonClient;



    /**
     * 消息接受者(每隔30分钟执行)
     */
    @Scheduled(cron = "0/35 * * * * ?")
    public void Consumer() {
        long last = lastRecieveMessage.get();
        long current = System.currentTimeMillis();
        if ((current - last) > (60 * 1000)){
            //初始化
            redissonClient.getBucket("key").set(1);
            for (ConsumerRecord<String, String> consumerRecord : DataList) {
                //获取锁
                RLock lock = redissonClient.getLock(redissonClient.getBucket(consumerRecord.key()).get().toString());
                //上锁
                lock.lock();
                MsgInfo info = gson.fromJson(consumerRecord.value(), MsgInfo.class);
                System.out.println("消息:"+info);
                redissonClient.getBucket(consumerRecord.key()).set(consumerRecord.offset()+1);
                //解锁
                lock.unlock();
            }
            DataList.clear();
        }
    }



    /**
     * 消息发送者(30s执行一次)
     */
    @Scheduled(cron = "0/30 * * * * ? ")
    public void Provide(){
        long last = lastRecieveMessage.get();
        long current = System.currentTimeMillis();
        if ((current - last) > (30 * 1000) ){
            MsgInfo msgInfo=new MsgInfo(current-last,"消息服务",last,new Date());
            kafkaTemplate.send(TOPIC_NAME,"key",gson.toJson(msgInfo));
        }

    }


    /**
     * 单条消费
     * @param record
     * @param ack
     */
    @KafkaListener(topics = TOPIC_NAME,groupId = "MyGroup1")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        DataList.add(record);
        ack.acknowledge();
    }

}

    }

以上是关于kafka重复消费的原因的主要内容,如果未能解决你的问题,请参考以下文章

kafka重复消费的问题

kafka防止消息重复消费

Kafka问题优化之消费重复问题

Springboot中kafka解决数据重复消费问题

消息队列怎么避免重复消费

Kafka重复消费数据