消息队列-延时消息实现

Posted Java入门到放弃

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列-延时消息实现相关的知识,希望对你有一定的参考价值。

        需求:模拟微信红包,24小时退款业务


        实现思路:

    

        1.发完红包,生产消息,消费者拿到消息后,睡一会(sleep),看看红包领了没有,没有继续睡觉,循环往复,直到距离红包发放时间已经过了24小时,发起退款。消息消费完毕。

工作时间,占着工位,睡大觉。不可取)


        2.发完红包,数据入库,搞一个定时任务,每60s查询一次状态为‘未领取’的红包,时间超过24小时的,执行退款,状态为修改‘已退回‘。

(基本每个红包,最多判断24*60次!!!浪费,每次查出来的红包数量也有点恐怖)


        3. ①发送消息到队列A,设置24小时过期时间,并且设置消息到期后转发到消息队列B。

            ②A队列无人监听,A队列内的消息都会在24小时候转发到队列B。

            ③消费者拿到B队列的消息,说明红包已经到24小时了,查询红包状态,已被领取,消费结束。没有被领取,执行退款。

(各司其职,A-专门睡觉的地方。B-专门干活的地方。恭喜中标



        开工:使用RabbitMq,定义三个队列。 定义一个死信队列的消息实体

/**
* 死信队列 (队列A)
*/

public Queue deadLetterQueue() {
   Map<String
, Object> arguments = new HashMap<>();
   
arguments.put("x-dead-letter-exchange", "exchange");
   
arguments.put("x-dead-letter-routing-key", "forwardQueue");
   
Queue queue = new Queue("deadLetterQueue",true,false,false,arguments);
   return
queue;
}

       

/**
* 消息到期后转发到本队列(队列C)
*/
@Bean
public Queue forwardQueue() {
return new Queue("forwardQueue");
}


//过期红包队列(队列B)
@Bean
public Queue timeuut() { return new Queue("timeout"); }

       

//死信队列消息实体
public class
DLXMessage {
private String queueName;
  private
Object content;
  private long
times;
}

   

准备工作完毕。红包发送,开始发送消息到A

public void sendMessageLatter(String queueName,Object message,long time){
DLXMessage dlxMessage = new DLXMessage(queueName,message,time);
   MessagePostProcessor mpp = m -> {
m.getMessageProperties().setExpiration(time+"");
       return m;
   };
   this.rabbitTemplate.convertAndSend("deadLetterQueue", dlxMessage, mpp);
}

睡到自然醒,24小时后,自动转发到队列C。


@RabbitListener(queues="forwardQueue")
public void forwardReceive(DLXMessage dlxMessage){
  messageSender.sendMessage(dlxMessage.getQueueName(),dlxMessage.getContent());
}

队列C的消费者,充当一个路由器,收到的是所有的过期的消息,然后根据定义好的队列名,发送消息到相应队列,就是队列B。


@RabbitListener(queues="timeout")
public void timeout(Object obj){
  Object 红包 = select(obj);
  if(红包.getStatus().equals("未领取")){
  //退款业务
  return;
  }
  //已经领取过了。啥也不干了。
}


终于轮到最终处理业务逻辑的队列B了,消费者拿到最终的消息,查询红包状态,如果依然是没领取的状态,那就该退款了。如果领取了,那啥也不用干了。


功能完成,下班走人。(bug明天再说。)

以上是关于消息队列-延时消息实现的主要内容,如果未能解决你的问题,请参考以下文章

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息

rocketmq实现延时队列

消息队列-延时消息实现

RabbitMQ实现延时队列(死信队列)

RabbitMQ实现延时队列(死信队列)

Rabbitmq实现延时消息的两种方式