RocketMQ解决历史消息(过期消息)消费问题
Posted keep-go-on
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ解决历史消息(过期消息)消费问题相关的知识,希望对你有一定的参考价值。
解决方案
比较当前时间和消息产生时间。
代码如下 :
if (System.currentTimeMillis() - messageExt.getBornTimestamp() < 1000 * 60)
使用原生客户端
@PostConstruct
private void listenCustom()
try
log.info("【 构建RocketMq消费者监听 】消费组 [ ]", contractConfig.getRocket().getCustomGroup());
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(contractConfig.getRocket().getCustomGroup());
log.info("【 构建RocketMq消费者监听 】名称服务器 [ ]", contractConfig.getRocket().getNamesrvAddr());
consumer.setNamesrvAddr(contractConfig.getRocket().getNamesrvAddr());
consumer.setUnitName(IdUtil.objectId());
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
String topicStr = contractConfig.getRocket().getTopic();
String[] topicArray = topicStr.split(",");
for (String topic : topicArray)
log.info("【 构建RocketMq消费者监听 】消费主题 [ ]", topic);
if (Topics.PDF_SIGN_RESULT.value.equals(topic))
continue;
consumer.subscribe(topic, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) ->
msgs.forEach(messageExt ->
if (System.currentTimeMillis() - messageExt.getBornTimestamp() < 1000 * 60)
log.warn("【消息已过期】");
else
);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
consumer.start();
log.info("【 构建RocketMq消费者监听 】消费监听器启动完成");
catch (MQClientException e)
e.printStackTrace();
使用Spring-RocketMq封装
@Service
@RocketMQMessageListener(topic = "linkman-friend-request", consumerGroup = "im-linkman-consumer-group")
public class AddFriendConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener
private final static Integer MESSAGE_OVERDUE = 1000 * 60 *60;
@Override
public void onMessage(MessageExt sysUserLinkmanMessage)
if (System.currentTimeMillis()- sysUserLinkmanMessage.getBornTimestamp() > MESSAGE_OVERDUE)
log.warn("【添加好友MQ】消息[ ]已过期!",sysUserLinkmanMessage.getMsgId());
return;
catch (Exception e)
log.error("【添加好友】异常追踪码[ ],error: ", TraceId.logTraceID.get(), e.getMessage());
以上是关于RocketMQ解决历史消息(过期消息)消费问题的主要内容,如果未能解决你的问题,请参考以下文章
源码分析RocketMQ消息消费机制----消费者拉取消息机制
分布式事务解决方案 | Seata | 本地消息表 | 事务消息 | 最大努力通知 | 消息丢失重复消费堆积有序
分布式事务解决方案 | Seata | 本地消息表 | 事务消息 | 最大努力通知 | 消息丢失重复消费堆积 有序| 缓存数据库一致性