RRabbitMQ05_消息可靠性投递ACK限流处理TTL队列死信交换机延迟队列

Posted 所得皆惊喜

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RRabbitMQ05_消息可靠性投递ACK限流处理TTL队列死信交换机延迟队列相关的知识,希望对你有一定的参考价值。

文章目录

①. 消息可靠性投递 - 生产端

  • ①. 在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性模式
  1. confirm确认模式:消息从producer到exchange则会返回一个confirmCallback
  2. return退回模式:消息从exchange到queue投递失败则会返回一个returnCallback
  3. 我们将利用这两个callback控制消息的可靠性投递
  • ②. rabbitmq整个消息投递的路径为:
  • ③. confirm生产端配置文件编写
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="$rabbitmq.host"
                               port="$rabbitmq.port"
                               username="$rabbitmq.username"
                               password="$rabbitmq.password"
                               virtual-host="$rabbitmq.virtual-host"
                               publisher-confirms="true"/>
    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--消息的可靠性投递(生产端)-->
    <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
    <rabbit:direct-exchange id="test_exchange_confirm" name="test_exchange_confirm">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
  • ④. 发送消息
    @Test
    public void testConfirm()
        /**
         * 确认模式,步骤:
         * 1. 确认模式开启:connectionFactory中publisher-confirms="true
         * 2. 在rabbitTemplate定义ConfirmCallBack回调函数
         */
        //2.定义回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() 
            /**
             * @param correlationData 相关配置信息
             * @param ack exchange交换机,是否成功收到了消息,true 成功 false 失败
             * @param cause 失败原因,如果ack为false,那么cause就有值
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) 
                if(ack)
                    // 接收成功
                    System.out.println("接收成功消息:"+cause);
                else
                    // 接收失败
                    System.out.println("接收失败信息:"+cause);
                    // 做一些处理,让消息再次发送
                
                System.out.println("confirm方法被执行了...");
            
        );
        //3. 发送消息
        rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message confirm");
    

  • ⑤. 回退模式ReturnCallBack代码编写
  1. 开启回退模式:publisher-returns=“true”
  2. 设置ReturnCallBack
  3. 设置Exchange处理消息的模式:
    如果消息没有路由到Queue,则丢弃消息(默认)
    如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
	@Test
    public void testReturn() 
        //设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);
        //2.设置ReturnCallBack
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() 
            /**
             *
             * @param message   消息对象
             * @param replyCode 错误码
             * @param replyText 错误信息
             * @param exchange  交换机
             * @param routingKey 路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) 
                System.out.println("return 执行了....");
                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);
                //处理
            
        );
        //3. 发送消息
        rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm2", "message confirm....");
    
return 执行了....
10:44:56.551 [main] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@41e350f1 size = 1, maxSize = 32, parentContextCount = 0, hitCount = 2, missCount = 1]
(Body:'message confirm....' MessageProperties [headers=, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
312
NO_ROUTE
test_exchange_confirm
confirm2

②. Consumer ACK - 消费端

  • ①. ack指Acknowledge,确认。 表示消费端收到消息后的确认方式,有三种确认方式:
  1. 自动确认:acknowledge=“none”,如果业务处理失败了,消息就丢失了
  2. 手动确认:acknowledge=“manual”,会等待业务处理成功,才去消费
  3. 根据异常情况确认:acknowledge=“auto”(这种方式使用麻烦,不作讲解)
  • ②. 其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message从RabbitMQ的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失

  • ③. 如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息

  • ④. 消费端代码实现

  1. 配置文件中设置acknowledge=“manual”,手动签收
  2. 让监听器类实现ChannelAwareMessageListener接口
  3. 如果消息成功处理,则调用channel的basicAck()签收
  4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="$rabbitmq.host"
                               port="$rabbitmq.port"
                               username="$rabbitmq.username"
                               password="$rabbitmq.password"
                               virtual-host="$rabbitmq.virtual-host"/>

    <bean id="ackListener" class="com.xiaozhi.rabbitmq.listener.AckListener"/>

    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true" acknowledge="manual">
        <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>
    </rabbit:listener-container>
</beans>
public class AckListener implements ChannelAwareMessageListener 
    
    /**
     * Consumer ACK机制:
     *  1. 设置手动签收。acknowledge="manual"
     *  2. 让监听器类实现ChannelAwareMessageListener接口
     *  3. 如果消息成功处理,则调用channel的 basicAck()签收
     *  4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception 
        // 接收到的一个标签
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try 
            //1.接收转换消息
            System.out.println(new String(message.getBody()));

            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            //int i = 3/0;//出现错误
            //3. 手动签收
            channel.basicAck(deliveryTag,false);
            System.out.println("============");
         catch (Exception e) 
            //e.printStackTrace();

            //4.拒绝签收
            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag,true,true);
            //channel.basicReject(deliveryTag,true);
        
    

③. 限流处理limit - 消费端

  • ①. 如下图所示:如果在A系统中需要维护相关的业务功能,可能需要将A系统的服务停止,那么这个时候消息的生产者还是一直会向MQ中发送待处理的消息,消费者此时服务已经关闭,导致大量的消息都会在MQ中累积。如果当A系统成功启动后,默认情况下消息的消费者会一次性将MQ中累积的大量的消息全部拉取到自己的服务,导致服务在短时间内会处理大量的业务,可能会导致系统服务的崩溃。 所以消费端限流是非常有必要的

  • ②. 给mq中发送100条消息

  • ③. 可以通过MQ中的 listener-container配置属性
    perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉取下一条消息

  • ④. 代码实现

  1. 编写QosListener监听类,保证当前的监听类消息处理机制是ACK为手动方式
  2. 在配置文件的listener-container配置属性中添加配置
    perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
public class QosListener implements ChannelAwareMessageListener 

    @Override
    public void onMessage(Message message, Channel channel) throws Exception 

        Thread.sleep(1000);
        //1.获取消息
        System.out.println(new String(message.getBody()));
        //2. 处理业务逻辑
        //3. 签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" >

④. TTL过期队列 - 生产端

  • ①. TTL全称Time To Live(存活时间/过期时间)。当消息到达存活时间后,还没有被消费,会被自动清除。
    RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间

  • ②. RabbitMQ管理控制台设置过期时间


  • ③. 设置队列的过期时间
  1. 在消息的生产方中,在 spring-rabbitmq-producer.xml 配置文件中,添加如下配置:
  2. 编写发送消息测试方法
    <!--ttl-->
    <rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
        <!--设置queue的参数-->
        <rabbit:queue-arguments>
            <!--队列的过期时间-->
            <entry key="x-message-ttl" value="30000" value-type="java.lang.Integer"></entry>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <rabbit:topic-exchange name="test_exchange_ttl">
        <rabbit:bindings>
            <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    /**
     * TTL:过期时间
     * 1. 队列统一过期
     * 2. 消息的单独过期
     */
    @Test
    public void ttlTest()
        for (int i = 0; i < 10; i++) 
            rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.message","ttl message");
        
    

  • ④. 设置单个消息的过期时间
  1. 如果设置了消息的过期时间,也设置了队列的过期时间,它以短的为准
  2. 队列过期后,会将队列所有消息全部移除
  3. 消息过期后,只有消息在队列顶端,才会去判断是否过期,如果过期,会移除掉
    /**
     * 如果设置了消息的过期时间,也设置了队列的过期时间,它以短的为准
     * 队列过期后,会将队列所有消息全部移除
     * 消息过期后,只有消息在队列顶端,才会去判断是否过期,如果过期,会移除掉
     */
    @Test
    public void ttlTest2()
        for (int i = 0; i < 10; i++) 
             MessagePostProcessor postProcessor = new MessagePostProcessor()
                 @Override
                 public Message postProcessMessage(Message message) throws AmqpException 
                     //1. 设置message的信息
                     message.getMessageProperties().setExpiration("5000");//消息的过期时间
                     return message;
                 
             ;
            rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.message","ttl message",postProcessor);
        
    
  • ⑤. TTL队列总结
  1. 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期
  2. 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期
  3. 如果两者都进行了设置,以时间短的为准

⑤. 死信队列 - 交换机

  • ①. 死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX
  • ②. 消息成为死信的三种情况
  1. 队列消息长度到达限制(设置了某个消息最大为10,超过10后的消息为死信)
  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false
  3. 原队列存在消息过期设置,消息到达超时时间未被消费
  • ③. 队列绑定死信交换机
    给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

  • ④. 代码编码

    <!--死信队列-->
    <!--1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)-->
    <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
        <!--3. 正常队列绑定死信交换机-->
        <rabbit:queue-arguments>
            <!--3.1 x-dead-letter-exchange:死信交换机名称-->
            <!--<entry key="x-dead-letter-exchange" value="exchange_dlx"/>-->
            <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routing key (发送消息时指定的route key)-->
            <entry key="x-dead-letter-routing-key" value="dlx.dead"></entry>
            <!--(1). 设置队列的过期时间 x-message-ttl-->
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
            <!--(2). 设置队列的长度限制 x-max-length-->
            <entry key="x-max-length" value="10" value-type="java.lang.Integer"></entry>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="test_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    
    <!--2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)-->
    <rabbit:queue name="queue_dlx" id="queue_dlx">以上是关于RRabbitMQ05_消息可靠性投递ACK限流处理TTL队列死信交换机延迟队列的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ高级进阶(2021.05.29)

RabbitMQ高级进阶(2021.05.29)

(二)RocketMQ订阅与发布

消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费

消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费

消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费