消息队列-延时消息实现
Posted Java入门到放弃
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列-延时消息实现相关的知识,希望对你有一定的参考价值。
需求:模拟微信红包,24小时退款业务
实现思路:
1.发完红包,生产消息,消费者拿到消息后,睡一会(sleep),看看红包领了没有,没有继续睡觉,循环往复,直到距离红包发放时间已经过了24小时,发起退款。消息消费完毕。
(工作时间,占着工位,睡大觉。不可取)
2.发完红包,数据入库,搞一个定时任务,每60s查询一次状态为‘未领取’的红包,时间超过24小时的,执行退款,状态为修改‘已退回‘。
(基本每个红包,最多判断24*60次!!!浪费,每次查出来的红包数量也有点恐怖)
3. ①发送消息到队列A,设置24小时过期时间,并且设置消息到期后转发到消息队列B。
②A队列无人监听,A队列内的消息都会在24小时候转发到队列B。
③消费者拿到B队列的消息,说明红包已经到24小时了,查询红包状态,已被领取,消费结束。没有被领取,执行退款。
(各司其职,A-专门睡觉的地方。B-专门干活的地方。恭喜中标)
开工:使用RabbitMq,定义三个队列。 定义一个死信队列的消息实体
/**
* 死信队列 (队列A)
*/
public Queue deadLetterQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "exchange");
arguments.put("x-dead-letter-routing-key", "forwardQueue");
Queue queue = new Queue("deadLetterQueue",true,false,false,arguments);
return queue;
}
/**
* 消息到期后转发到本队列(队列C)
*/
@Bean
public Queue forwardQueue() {
return new Queue("forwardQueue");
}
//过期红包队列(队列B)
@Bean
public Queue timeuut() { return new Queue("timeout"); }
//死信队列消息实体
public class DLXMessage {
private String queueName;
private Object content;
private long times;
}
准备工作完毕。红包发送,开始发送消息到A
public void sendMessageLatter(String queueName,Object message,long time){
DLXMessage dlxMessage = new DLXMessage(queueName,message,time);
MessagePostProcessor mpp = m -> {
m.getMessageProperties().setExpiration(time+"");
return m;
};
this.rabbitTemplate.convertAndSend("deadLetterQueue", dlxMessage, mpp);
}
睡到自然醒,24小时后,自动转发到队列C。
@RabbitListener(queues="forwardQueue")
public void forwardReceive(DLXMessage dlxMessage){
messageSender.sendMessage(dlxMessage.getQueueName(),dlxMessage.getContent());
}
队列C的消费者,充当一个路由器,收到的是所有的过期的消息,然后根据定义好的队列名,发送消息到相应队列,就是队列B。
@RabbitListener(queues="timeout")
public void timeout(Object obj){
Object 红包 = select(obj);
if(红包.getStatus().equals("未领取")){
//退款业务
return;
}
//已经领取过了。啥也不干了。
}
终于轮到最终处理业务逻辑的队列B了,消费者拿到最终的消息,查询红包状态,如果依然是没领取的状态,那就该退款了。如果领取了,那啥也不用干了。
功能完成,下班走人。(bug明天再说。)
以上是关于消息队列-延时消息实现的主要内容,如果未能解决你的问题,请参考以下文章