通过RabbitMQ的DIRECT模式以及死信队列实现延时任务

Posted 学习使得吾快乐

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过RabbitMQ的DIRECT模式以及死信队列实现延时任务相关的知识,希望对你有一定的参考价值。

运用RabbitMQ的DIRECT模式以及死信队列实现延时操作以及不同间隔时间后重试

一 、原理描述

图解:

  • 一条绑定路由为【FOR_QUEUE1】的消息被发送到交换机【EXCHANGE】上
RabbitTemplate.convertSendAndReceive("EXCHANGE","FOR_QUEUE1","我是一条消息");
  • 【EXCHANGE】根据自己身上队列绑定的路由,将这个消息发给了队列【QUEUE_NAME1】
  • 消费者代码在处理队列【QUEUE_NAME1】中的这条消息的时候,没有整好,出现了异常,导致这条消息没有被消费
  • 这条消息就会根据队列【QUEUE_NAME1】绑定的死信路由和死信交换机 变成一条
    绑定路由为【FOR_DEAD_QUEUE1】的消息发往交换机【DEAD_EXCHANGE】
  • 消息在死信队列的时候就不搞它了,等它蹲满日子——死信队列【DEAD_QUEUE_NAME1】的TTL:30s
  • 日子蹲满后,这条消息就会根据死信队列【DEAD_QUEUE_NAME1】绑定的死信路由和死信交换机 自动变成一条
    绑定路由为【FOR_QUEUE2】的消息发往交换机【EXCHANGE】

然后接着重复上面的流程·······

具体的效果就可以是:

处理RabbitMQ中的消息时,因为响应时间过久,导致出现异常。于是,系统会在30s再重新处理该消息。要是还出现异常,系统就会再1min后再次处理该消息。
最后还出现异常,就去他的,爱咋咋。

二、 具体代码

配置文件:src\\main\\resources\\application.yml

spring:
  rabbitmq:
    host:  #地址
    username:  #用户名
    password:  #密码
    port: #端口
    virtual-host: #虚拟主机
    listener:
      type: simple #监听类型
      simple:
        default-requeue-rejected: false #无法消费的消息因此进入死信队列
        acknowledge-mode: manual  #设置手动消费消息
config:
  rabbitmg:
    ttl1: 30000 # 30 s
    ttl2: 60000 # 1 min

RabbitMQ生成文件:com/study/config/RabbitMQConfig.java


@Configuration
public class RabbitMQConfig 
    //交换机
    public static final String EXCHANGE_NAME = "exchange";
    public static final String DEAD_EXCHANGE_NAME = "dead.exchange";
    //队列
    public static final String QUEUE_NAME1 = "queue1";
    public static final String QUEUE_NAME2 = "queue2";
    public static final String QUEUE_NAME3 = "queue3";
    public static final String DEAD_QUEUE_NAME1 = "delay.queue1";
    public static final String DEAD_QUEUE_NAME2 = "delay.queue2";
    public static final String DEAD_QUEUE_NAME3 = "delay.queue3";
    //各队列绑定的路由
    public static final String QUEUE1_ROUTINGKEY = "key.notify1";
    public static final String QUEUE2_ROUTINGKEY = "key.notify2";
    public static final String QUEUE3_ROUTINGKEY = "key.notify3";
    public static final String DEAD_QUEUE1_ROUTINGKEY = "key.delay1";
    public static final String DEAD_QUEUE2_ROUTINGKEY = "key.delay2";
    public static final String DEAD_QUEUE3_ROUTINGKEY = "key.delay3";

    //各死信队列的ttl 未消费消息的过期时间
    @Value("$config.rabbitmg.ttl1")
    private int DEAD_QUEUE_TTL1;
    @Value("$config.rabbitmg.ttl2")
    private int DEAD_QUEUE_TTL2;

    //声明业务EXchange
    @Bean("exchange")
    public DirectExchange exchange()
        return new DirectExchange(EXCHANGE_NAME);
    

    //声明私信Exchange
    @Bean("deadExchange")
    public DirectExchange deadExchange()
        return new DirectExchange(DEAD_EXCHANGE_NAME);
    

    //声明支付信息队列1
    @Bean("queue1")
    public Queue queue1()
        Map<String,Object> args = new HashMap<>(2);
        //       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
        //       x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key",DEAD_QUEUE1_ROUTINGKEY);
        return QueueBuilder.durable(QUEUE_NAME1).withArguments(args).build();
    
    //声明支付信息队列2
    @Bean("queue2")
    public Queue queue2()
        Map<String,Object> args = new HashMap<>(2);
        //       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
        //       x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key",DEAD_QUEUE2_ROUTINGKEY);
        return QueueBuilder.durable(QUEUE_NAME2).withArguments(args).build();
    
    //声明支付信息队列3
    @Bean("queue3")
    public Queue queue3()
        Map<String,Object> args = new HashMap<>(1);
        //       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
//        //       x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key",DEAD_QUEUE3_ROUTINGKEY);
        return QueueBuilder.durable(QUEUE_NAME3).withArguments(args).build();
    
   

    //声明死信队列1
    @Bean("deadQueue1")
    public Queue deadQueue1()
        Map<String,Object> args = new HashMap<>(3);
        //       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange",EXCHANGE_NAME);
        //       x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key",QUEUE2_ROUTINGKEY);
        //       x-message-ttl  这里声明消息未被消费的过期时间
        args.put("x-message-ttl",DEAD_QUEUE_TTL1);
        return QueueBuilder.durable(DEAD_QUEUE_NAME1).withArguments(args).build();
    
    //声明死信队列2
    @Bean("deadQueue2")
    public Queue deadQueue2()
        Map<String,Object> args = new HashMap<>(3);
        //       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange",EXCHANGE_NAME);
        //       x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key",QUEUE3_ROUTINGKEY);
        //       x-message-ttl  这里声明消息未被消费的过期时间
        args.put("x-message-ttl",DEAD_QUEUE_TTL2);
        return QueueBuilder.durable(DEAD_QUEUE_NAME2).withArguments(args).build();
    
    //声明死信队列3
    @Bean("deadQueue3")
    public Queue deadQueue3()
        Map<String,Object> args = new HashMap<>(2);
        //       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange",EXCHANGE_NAME);
        //       x-dead-letter-routing-key  这里声明当前队列的死信路由key
        // args.put("x-dead-letter-routing-key",QUEUE4_ROUTINGKEY);
        return QueueBuilder.durable(DEAD_QUEUE_NAME3).withArguments(args).build();
    
   

   //声明支付信息序列1绑定的关系
    @Bean
    public Binding infoBinding1(@Qualifier("queue1") Queue queue,
                                   @Qualifier("exchange") DirectExchange exchange)
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE1_ROUTINGKEY);
    
    //声明支付信息序列2绑定的关系
    @Bean
    public Binding infoBinding2(@Qualifier("queue2") Queue queue,
                                   @Qualifier("exchange") DirectExchange exchange)
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE2_ROUTINGKEY);
    
    //声明支付信息序列3绑定的关系
    @Bean
    public Binding infoBinding3(@Qualifier("queue3") Queue queue,
                                   @Qualifier("exchange") DirectExchange exchange)
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE3_ROUTINGKEY);
    

    //声明死信队列1绑定的关系
    @Bean
    public Binding deadInfoBinding1(@Qualifier("deadQueue1") Queue queue,
                                       @Qualifier("deadExchange") DirectExchange exchange)
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_QUEUE1_ROUTINGKEY);
    
    //声明死信队列2绑定的关系
    @Bean
    public Binding deadInfoBinding2(@Qualifier("deadQueue2") Queue queue,
                                       @Qualifier("deadExchange") DirectExchange exchange)
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_QUEUE2_ROUTINGKEY);
    
    //声明死信队列3绑定的关系
    @Bean
    public Binding deadInfoBinding3(@Qualifier("deadQueue3") Queue queue,
                                       @Qualifier("deadExchange") DirectExchange exchange)
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_QUEUE3_ROUTINGKEY);
       


消费者:com/test/listener/InfoListener.java

@Component
public class InfoListener 

    public static final Logger logger = LoggerFactory.getLogger(InfoListener.class);
	
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME1, RabbitMQConfig.QUEUE_NAME2, RabbitMQConfig.QUEUE_NAME3)
    public void listenInfo(Message message, Channel channel) throws Exception 
		String queueName = message.getMessageProperties().getConsumerQueue(); //获取消息序列的名字
        String msg = new String(message.getBody());
        Exception e = null;
        logger.info("-------------开始处理序列[" + queueName + "]中的消息-------------");
        try 
            //相关的业务流程
            if ("1".equals(msg)) 
                throw new Exception("找茬的来了");
            
         catch (Exception finalException) 
            e =finalException;
        
        if (e == null) 
            logger.info("-------------成功处理了序列[" + queueName + "]中的消息-------------");
            //因为配置文件中的acknowledge-mode设为了manual,需要手动ACK来标识该消息已被消费
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
         else 
            logger.info("-------------处理序列[" + queueName + "]中的消息时出现了异常:" + e.getMessage()+"-------------");
            //Nack后消息就会被发往死信序列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        

生产者:com/test/sender/InfoSender.java

@Component
public class PayInfoSender 

    @Autowired
    private RabbitTemplate template;

    public void sendMsg(String msg)
	//生成一条绑定路由为RabbitMQConfig.QUEUE1_ROUTINGKEY的发往RabbitMQConfig.EXCHANGE_NAME交换机的消息
        template.convertSendAndReceive(RabbitMQConfig.EXCHANGE_NAME,RabbitMQConfig.QUEUE1_ROUTINGKEY,msg); 

Controller:com/test/controller/TestController.java

@RestController
@RequestMapping("rabbitmq")
public class TestController 
    @Autowired
    PayInfoSender payInfoSender;
    @RequestMapping("sendmsg")
    public void test(HttpServletRequest request )
        String id = request.getParameter("id");        
        payInfoSender.sendMsg(id);
    

三、具体效果

成功演示:

发送请求:http://localhost:8081/rabbitmq/sendmsg?id=2

控制台:

失败演示:

发送请求:http://localhost:8081/rabbitmq/sendmsg?id=1

控制台:

四、参考文献

http://www.rabbitmq.com/getstarted.html
https://blog.csdn.net/whitebearclimb/article/details/108959110
https://www.cnblogs.com/mfrank/p/11184929.html

转载声明:https://blog.csdn.net/weixin_45884459/article/details/111508104

以上是关于通过RabbitMQ的DIRECT模式以及死信队列实现延时任务的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ死信交换永远不会得到消息

RabbitMQ入门案例

RabbitMQ一文带你搞定RabbitMQ延迟队列

RabbitMQ学习(中)——交换机死信队列和延迟队列

RabbitMQ延迟队列

RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码