MQ-死信队列实现消息延迟

Posted 啃瓜子的松鼠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQ-死信队列实现消息延迟相关的知识,希望对你有一定的参考价值。

死信队列实现消息延迟

一、延迟队列

延迟队列:消息进入到队列之后,延迟指定的时间才能被消费者消费。

AMQP协议和RabbitMQ队列本身是不支持延迟队列功能的,但是可以通过TTL(Time To Live)特性模拟延迟队列的功能。

TTL就是消息的存活时间,RabbitMQ可以分别对队列和消息设置存活时间。

  • 在创建队列的时候可以设置队列的存活时间,消息进入队列后,在存活时间内没有被消费者消费,则此消息会从当前队列移除。
  • 创建消息队列没有设置TTL,但是消息设置了TTL,那么当消息的存活时间结束,也会被移除。
  • 当TTL结束之后,我们可以指定将当前队列的消息转存到其他指定的队列。

二、使用延迟队列实现订单支付监控

  • 实现流程图如图:

  • 创建路由交换机

  • 创建消息队列

  • 创建死信队列

  • 队列绑定

  • 发送消息到交换机delay_exchange的k1(即消息队列delay_queue1)

    //普通maven项目演示
    //发送消息
    public class SendMsg 
    
        public static void main(String[] args) 
            System.out.println("请输入消息:");
            Scanner input = new Scanner(System.in);
            String msg = input.nextLine();
    
            Connection connection = null;
    
            try 
                //获取连接,相当于JDBC的获取数据库连接
                connection = MQUtil.getConnection();
     
                Channel channel = connection.createChannel();
    
                //发消息之前开启消息确认
                channel.confirmSelect();
    
                channel.basicPublish("delay_exchange","k1",null,msg.getBytes());
    
                //消息发送之后等待消息反馈
                try 
                    boolean b = channel.waitForConfirms();
                    System.out.println("发送--->" + msg + (b ? "成功": "失败"));
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
    
                //关闭
                channel.close();
                connection.close();
    
             catch (IOException e) 
                e.printStackTrace();
             catch (TimeoutException e) 
                e.printStackTrace();
            
    
    

    发送消息:


此时开启接收队列delay_queue2(而不是delay_queue1)的消息:会发现不能实时接收,需要等到delay_queue1的TTL时间到后才能成功接收到消息。

//普通maven项目演示
//接受消息
public class ReceiveMsg 

    public static void main(String[] args) 

        Connection connection = null;

        //获取连接,相当于JDBC的获取数据库连接
        try 
            connection = MQUtil.getConnection();
            Channel channel = connection.createChannel();

            //声明要关注的队列
            //channel.queueDeclare("queue1", false, false, false, null);
            //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
            // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
            Consumer consumer = new DefaultConsumer(channel) 
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                    String message = new String(body, "UTF-8");
                    System.out.println("consumer2消费消息:'" + message + "'");
                
            ;
            //自动回复队列应答 -- RabbitMQ中的消息确认机制
            channel.basicConsume("delay_queue2", true, consumer);

         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        
    

由于在前面创建死信队列设置的delay_queue1的TTL时间为10s,因此间隔10s后成功接收到消息:


演示完毕!

以上是关于MQ-死信队列实现消息延迟的主要内容,如果未能解决你的问题,请参考以下文章

MQ 死信队列/延迟队列-( 商品秒杀后30分钟之内付款)

RabbitMQ-消息可靠性&延迟消息

RabbitMQ之消息可靠性死信交换机惰性队列及集群

RabbitMQ之消息可靠性死信交换机惰性队列及集群

消息队列 - 死信、延迟、重试队列

「MQ实战」RabbitMQ 延迟队列,消息延迟推送