RocketMQ解决历史消息(过期消息)消费问题

Posted keep-go-on

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ解决历史消息(过期消息)消费问题相关的知识,希望对你有一定的参考价值。

解决方案

比较当前时间和消息产生时间。

代码如下 :

if (System.currentTimeMillis() - messageExt.getBornTimestamp() < 1000 * 60)
                        

使用原生客户端

  @PostConstruct
    private void listenCustom() 
        try 
            log.info("【 构建RocketMq消费者监听 】消费组 [    ]", contractConfig.getRocket().getCustomGroup());
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(contractConfig.getRocket().getCustomGroup());
            log.info("【 构建RocketMq消费者监听 】名称服务器 [    ]", contractConfig.getRocket().getNamesrvAddr());
            consumer.setNamesrvAddr(contractConfig.getRocket().getNamesrvAddr());
            consumer.setUnitName(IdUtil.objectId());
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            String topicStr = contractConfig.getRocket().getTopic();
            String[] topicArray = topicStr.split(",");
            for (String topic : topicArray) 
                log.info("【 构建RocketMq消费者监听 】消费主题 [    ]", topic);
                if (Topics.PDF_SIGN_RESULT.value.equals(topic)) 
                    continue;
                
                consumer.subscribe(topic, "*");
            
            
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> 
                msgs.forEach(messageExt -> 
                    if (System.currentTimeMillis() - messageExt.getBornTimestamp() < 1000 * 60)
                        log.warn("【消息已过期】");
                    else 
                        
                    
                );
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            );
            consumer.start();
            log.info("【 构建RocketMq消费者监听 】消费监听器启动完成");
         catch (MQClientException e) 
            e.printStackTrace();
        
    

使用Spring-RocketMq封装

    @Service
    @RocketMQMessageListener(topic = "linkman-friend-request", consumerGroup = "im-linkman-consumer-group")
    public class AddFriendConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener 
    private final static Integer MESSAGE_OVERDUE = 1000 * 60 *60;
        @Override
        public void onMessage(MessageExt sysUserLinkmanMessage) 
            if (System.currentTimeMillis()- sysUserLinkmanMessage.getBornTimestamp() > MESSAGE_OVERDUE)
                log.warn("【添加好友MQ】消息[  ]已过期!",sysUserLinkmanMessage.getMsgId());
                return;
            
       
             catch (Exception e) 
                log.error("【添加好友】异常追踪码[  ],error:  ", TraceId.logTraceID.get(), e.getMessage());
            

        

以上是关于RocketMQ解决历史消息(过期消息)消费问题的主要内容,如果未能解决你的问题,请参考以下文章

程序重启RocketMQ消息重复消费

源码分析RocketMQ消息消费机制----消费者拉取消息机制

分布式事务解决方案 | Seata | 本地消息表 | 事务消息 | 最大努力通知 | 消息丢失重复消费堆积有序

分布式事务解决方案 | Seata | 本地消息表 | 事务消息 | 最大努力通知 | 消息丢失重复消费堆积 有序| 缓存数据库一致性

消息中间件需要解决的问题及RocketMQ发展历程

浅谈RocketMQ如何保证消息不丢失