RabbitMQ高级特性(2)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ高级特性(2)相关的知识,希望对你有一定的参考价值。

参考技术A 一、消费端限流

1.什么是消费端的限流

假设一个场景,首先,Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户端,就会出现:巨大的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!

2.RabbitMQ提供了一种Qos(服务质量控制)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。

void BasicQos(uint prefetchSize,ushort prefetchCount,bool global);

prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ACK,则该comsumer将block掉,直到有消息ack

global:true/false 是否将上面的设置应用于channel,就是限制是channel级别还是consumer级别

二、消费端的手动ACK和NACK

消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿!

如果由于服务器宕机等严重问题,那我们就需要手动进行ACK保障消费端消费成功!

三、消费端的重回队列

1.消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker

2.一般我们在实际应用中,都会关闭重回队列,也就是设置为false

四、TTL队列/消息

1.TTL

TTL:time  to live 的缩写,也就是生存时间

RabbitMQ支持消息的过期时间,在消息发送时可以进行指定

RabbitMQ支持队列的过去时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动删除

2.DLX

DLX:Dead-Letter-Exchange,死信队列

利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性

当一个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列

可以监听DLX这个队列中的消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能

3.消息变成死信的情况

消息被拒绝(basic.reject/basic.nack),并且requeue=false

消息TTL过期

队列达到最大长度

4.DLX死信队列设置

首先需要设置死信队列的exchange(dlx.exchange)、queue(dlx.queue)和路由规则(RoutingKey:#),然后进行绑定

然后进行正常交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可:arguments.put("x-dead-letter-exchange","dlx.exchange");

RabbitMQ——高级特性(SpringBoot实现)

本篇文章的内容与我之前如下这篇文章一样,只是使用技术不同,本篇文章使用SpringBoot实现RabbitMQ的高级特性!

RabbitMQ——高级特性_小曹爱编程!的博客-CSDN博客RabbitMQ——高级特性:1、RabbitMQ高级特性;2、RabbitMQ应用问题;3、RabbitMQ集群搭建https://blog.csdn.net/weixin_62993347/article/details/128521074?spm=1001.2014.3001.5502

准备工作

 我们首先建一个工程,接下来的代码都在这个工程里完成。

主要分为生产者和消费者

依赖

<!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

yml主要配置

server:
  port: 8100
spring:
  rabbitmq:
    host: 101.43.13.118
    port: 5672
    username: admin
    password: 123456

在生产者端和消费者端都编写模板配置文件RabbitConfig,后面在配置文件里加队列、交换机等

@Configuration
public class RabbitConfig 
    @Resource
    //定义管理交换机、队列
    CachingConnectionFactory connectionFactory;

    //rabbitmq模板
    //定义rabbitTemplate对象操作可以在代码中方便发送消息
    @Bean(value = "rabbitTemplate")
    public RabbitTemplate rabbitTemplate()
        return new RabbitTemplate(connectionFactory);
    
    

 1、消息的可靠性投递

说到RabbitMQ里的消息可靠性投递,那我们不得不提到TCC事务(分布式事务)。

TCC事务

TCC的核心思想是:针对每一个操作都需要注册一个和其相对应的确认和补偿的操作,他分为三个阶段Try、Confirm和Cancel

  • Try 阶段主要是对业务系统做检测及资源预留
  • Confirm 阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行Confirm阶段时,默认Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。若Confirm阶段真的出错了,需引入重试机制或人工处理。
  • Cancel阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

⚫ confirm 确认模式

⚫ return 退回模式

1、确认模式confirm的SpringBoot实现

生产端yml

server:
  port: 8100
spring:
  rabbitmq:
    host: 101.43.13.118
    port: 5672
    username: admin
    password: 123456
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual  #开启ACK:手动确认
        retry:
          max-attempts: 5  #重试次数
          initial-interval: 5000  #重试间隔时间
          enabled: true  #开启消费重试
        default-requeue-rejected: false  #重试次数超过
    publisher-returns: true #开启回退模式
    publisher-confirm-type: correlated #确认模式开启(新版本)

首先在生产端的RabbitConfig文件里定义队列和交换机,并绑定。

    //队列
    @Bean
    public Queue confirmQueue()
        return new Queue("confirm.queue");
    
    //路由模式(routes)交换机(direct)
    @Bean
    public DirectExchange confirmExchange()
        return new DirectExchange("confirmExchange");
    
    //绑定路由交换机和队列
    //bind:队列
    //to:交换机
    //with:routingKey
    @Bean
    public Binding bindingDirectQueue(Queue confirmQueue,DirectExchange confirmExchange)
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with("confirm");
    

书写一个测试方法,测试生产端,确认模式

@SpringBootTest(classes = com.chw.ProducerMain.class)
public class ProducerTesty 
    @Resource
    private RabbitTemplate rabbitTemplate;
    /**
     * 确认模式:
     * 步骤:
     * 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"
     * yml里面配置了publisher-confirm-type: correlated #确认模式开启(新版本)
     * 2. 在rabbitTemplate定义ConfirmCallBack回调函数
     */
    @Test
    public void testConfirm() throws InterruptedException 
        //设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);
        //2. 定义回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() 
            /**
             * @param correlationData 相关配置信息
             * @param ack   exchange交换机 是否成功收到了消息。true 成功,false代表失败
             * @param cause 失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) 
                System.out.println("confirm方法被执行了....");
                if (ack) 
                    //接收成功
                    System.out.println("接收成功消息" + cause);
                 else 
                    //接收失败
                    System.out.println("接收失败消息" + cause);
                    //做一些处理,让消息再次发送。
                
            
        );
        //关键关联数据
        CorrelationData data =new CorrelationData(UUID.randomUUID().toString());
        //3. 发送消息
        //接收成功
        //confirm方法被执行了....
        //接收成功消息null
        rabbitTemplate.convertAndSend("confirmExchange", "confirm", "message confirm....",data);
        //接收失败
        //confirm方法被执行了....
        //接收失败消息channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirmExchange111' in vhost '/', class-id=60, method-id=40)
//        rabbitTemplate.convertAndSend("confirmExchange111", "confirm", "message confirm....",data);
        Thread.sleep(200);
    

2、return 退回模式

书写一个测试方法,测试生产端,退回模式

/**
     * 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败时 才会执行 ReturnCallBack
     * 步骤:
     * 1. 开启回退模式:publisher-returns="true"
     * yml里开启了
     * 2. 设置ReturnCallBack
     * 3. 设置Exchange处理消息的模式:
     *      1. 如果消息没有路由到Queue,则丢弃消息(默认)
     *      2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
     */
    @Test
    public void testReturn() throws InterruptedException 
        //设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);
        //2.设置ReturnCallBack
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() 
            /**
             * @param message   消息对象
             * @param replyCode 错误码
             * @param replyText 错误信息
             * @param exchange  交换机
             * @param routingKey 路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) 
                System.out.println("return 执行了....");
                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);
                //处理
            
        );
        //关键关联数据
        CorrelationData data =new CorrelationData(UUID.randomUUID().toString());
        //3. 发送消息
        rabbitTemplate.convertAndSend("confirmExchange", "confirm111", "message confirm....",data);
        /**
         * return 执行了....
         * (Body:'message confirm....' MessageProperties [headers=spring_returned_message_correlation=afa438b2-5e6e-41f9-91e5-cf0905bb4330, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
         * 312
         * NO_ROUTE
         * confirmExchange
         * confirm111
         */
        Thread.sleep(200);
    

1.2、Consumer ACK

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。

有三种确认方式:

yml里都可以配置

• 自动确认:acknowledge="none"

• 手动确认:acknowledge="manual"

• 根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

我们来到消费端

消费端yml

server:
  port: 8100
spring:
  rabbitmq:
    host: 101.43.13.118
    port: 5672
    username: admin
    password: 123456
    publisher-returns: true #开启回退模式
    publisher-confirm-type: correlated #确认模式开启(新版本)
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual  #开启ACK:手动确认
        retry:
          max-attempts: 5  #重试次数
          initial-interval: 5000  #重试间隔时间
          enabled: true  #开启消费重试
        default-requeue-rejected: false  #重试次数超过
        prefetch: 1 #设置消费端一次拉取多少消息。

我们在业务层里写队列监听

/**
     * Consumer ACK机制:
     *  1. 设置手动签收。acknowledge="manual"
     *  yml里设置了
     *  2. 使用RabbitListener注解,写明监听的队列queue
     *  3. 如果消息成功处理,则调用channel的 basicAck()签收
     *  4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
     * @param message
     * @param channel
     * @throws IOException
     */
@Service
public class ConsumerService 
    @RabbitListener(queues = "confirm.queue")
    public void consumerAck(Message message, Channel channel) throws IOException 
        //message:队列里的消息的所有信息        
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try 
            //1.接收转换消息
            System.out.println(new String(message.getBody()));

            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
//            int i = 3/0;//出现错误
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
         catch (Exception e) 
            //e.printStackTrace();
            //4.拒绝签收
            //三个参数:tag -关联数据,bl-不批量确认,b1:true:放回队列,false:不放回队列
            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag,true,true);
            //channel.basicReject(deliveryTag,true);
        
    

 1.3、消费端限流

/**
     * Consumer 限流机制
     *  1. 确保ack机制为手动确认。(在ack基础上)
     *  2. listener-container配置属性
     *  在yml里设置perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
     * @param message
     * @param channel
     * @throws Exception
     */
    @RabbitListener(queues = "confirm.queue")
    public void consumerQos(Message message, Channel channel) throws Exception
        Thread.sleep(1000);
        //1.获取消息
        System.out.println(new String(message.getBody()));
        //2. 处理业务逻辑
        //3. 签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    

小结

在yml中配置 prefetch属性设置消费端一次拉取多少消息。

消费端的确认模式一定为手动确认。yml配置里acknowledge="manual"。

4、TTL

TTL 全称 Time To Live(存活时间/过期时间)

  • 当消息到达存活时间后,还没有被消费,会被自动清除。
  • RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间

首先在生产端的RabbitConfig文件里定义队列和交换机,并绑定。

//ttl队列
    @Bean
    public Queue ttlQueue()
        Map<String, Object> args = new HashMap<>(1);
        // x-message-ttl  声明队列的TTL:队列的过期时间
        args.put("x-message-ttl",10000);
        return QueueBuilder.durable("ttl.queue").withArguments(args).build();
    
    //ttl通配符交换机
    @Bean
    public TopicExchange ttlExchange()
        return new TopicExchange("ttlExchange");
    
    //绑定TTL通配符交换机和TTL队列
    @Bean
    public Binding bindingDirectQueue(Queue ttlQueue,TopicExchange ttlExchange)
        return BindingBuilder.bind(ttlQueue).to(ttlExchange).with("ttl.#");
    

书写测试方法测试TTL

/**
     * TTL:过期时间
     *  1. 队列统一过期
     *  2. 消息单独过期
     * 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
     * 队列过期后,会将队列所有消息全部移除。
     * 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
     */
    @Test
    public void testTtl() 
        /*for (int i = 0; i < 10; i++) 
            // 发送消息
            //十秒后队列里的消息会清空,因为ttl队列过期时间为10秒
            rabbitTemplate.convertAndSend("ttlExchange", "ttl.hehe", "message ttl....");
        */

        //消息的后处理对象,设置一些消息的参数信息
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() 
            @Override
            public Message postProcessMessage(Message message) throws AmqpException 
                //1.设置message的信息
                message.getMessageProperties().setExpiration("5000");//设置消息的过期时间
                //2.返回该消息
                return message;
            
        ;
        //消息单独过期
//        rabbitTemplate.convertAndSend("ttlExchange", "ttl.hehe", "message ttl....",messagePostProcessor);

        for (int i = 0; i < 10; i++) 
            if(i == 5)
                //消息单独过期
                rabbitTemplate.convertAndSend("ttlExchange", "ttl.hehe", "message ttl....",messagePostProcessor);
            else
                //不过期的消息
                rabbitTemplate.convertAndSend("ttlExchange", "ttl.hehe", "message ttl....");
            
        
    

TTL 小结

  • 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

  • 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

  • 如果两者都进行了设置,以时间短的为准。

1.5、死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

队列绑定死信交换机:

给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

首先在生产端的RabbitConfig文件里定义队列和交换机,并绑定。

//正常dlx队列
    @Bean
    public Queue dlxQueue()
        //正常队列关联死信交换机
        Map<String, Object> args = new HashMap<>(4);
        // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", "dlxDeadExchange");
        // x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key","dlxDead.hehe");
        // x-message-ttl  声明队列的TTL 声明队列的过期时间
        args.put("x-message-ttl",10000);
        //设置队列的长度限制 max-length
        args.put("x-max-length",10);
        return QueueBuilder.durable("dlx.queue").withArguments(args).build();
    
    //dlx死信队列
    @Bean
    public Queue dlxDeadQueue()
        return new Queue("dlxDead.queue");
    
    //dlx通配符交换机
    @Bean
    public TopicExchange dlxExchange()
        return new TopicExchange("dlxExchange");
    
    //dlx死信通配符交换机
    @Bean
    public TopicExchange dlxDeadExchange()
        return new TopicExchange("dlxDeadExchange");
    
    //绑定dlx正常队列
    @Bean
    public Binding bindingDxlQueue(Queue dlxQueue,TopicExchange dlxExchange)
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("dlx.#");
    
    //绑定dlx死信队列
    @Bean
    public Binding bindingDxlDeadQueue(Queue dlxDeadQueue,TopicExchange dlxDeadExchange)
        return BindingBuilder.bind(dlxDeadQueue).to(dlxDeadExchange).with("dlxDead.#");
    

发送测试死信消息

/**
     * 发送测试死信消息:
     *  1. 过期时间
     *  2. 长度限制
     *  3. 消息拒收
     */
    @Test
    public void testDlx()
        //1. 测试过期时间,死信消息
//        rabbitTemplate.convertAndSend("dlxExchange","dlx.haha","我是一条消息,我会死吗?");

        //2. 测试长度限制后,消息死信
       /* for (int i = 0; i < 20; i++) 
            rabbitTemplate.convertAndSend("dlxExchange","dlx.haha","我是一条消息,我会死吗?");
        */

        //3. 测试消息拒收
        rabbitTemplate.convertAndSend("dlxExchange","dlx.haha","我是一条消息,我会死吗?");
    

测试消息拒收,需要涉及到消费者代码(消费者业务层里书写监听)

/**
     * 监听器监听正常队列
     * 打印结果:
     * 我是一条消息,我会死吗?
     * 处理业务逻辑...
     * 出现异常,拒绝接受
     * @param message
     * @param channel
     * @throws Exception
     */
    @RabbitListener(queues = "dlx.queue")
    public void consumerDlxQueue(Message message, Channel channel) throws Exception
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try 
            //1.接收转换消息
            System.out.println(new String(message.getBody()));
            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            int i = 3/0;//出现错误
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
         catch (Exception e) 
            //e.printStackTrace();
            System.out.println("出现异常,拒绝接受");
            //4.拒绝签收,不重回队列 requeue=false
            //这样才能路由到死信队列去
            channel.basicNack(deliveryTag,true,false);
        
    

死信队列小结

1. 死信交换机和死信队列和普通的没有区别
2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

消息成为死信的三种情况:

1. 队列消息长度到达限制;
2. 消费者拒接消费消息,并且不重回队列;
3. 原队列存在消息过期设置,消息到达超时时间未被消费;

1.6、延迟队列 

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

首先在生产端的RabbitConfig文件里定义队列和交换机,并绑定。

//正常队列
    //绑定,设置正常队列过期时间为30分钟
    @Bean
    public Queue orderQueue()
        //正常队列关联死信交换机
        Map<String, Object> args = new HashMap<>(4);
        // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", "dlxDeadOrderExchange");
        // x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key","dlxDeadOrder.hehe");
        // x-message-ttl  声明队列的TTL 声明队列的过期时间
        args.put("x-message-ttl",10000);
        //设置队列的长度限制 max-length
        args.put("x-max-length",10);
        return QueueBuilder.durable("Order.queue").withArguments(args).build();
    
    //dlx死信队列
    @Bean
    public Queue dlxDeadOrderQueue()
        return new Queue("dlxDeadOrder.queue");
    
    //通配符交换机
    @Bean
    public TopicExchange orderExchange()
        return new TopicExchange("orderExchange");
    
    //dlx死信通配符交换机
    @Bean
    public TopicExchange dlxDeadOrderExchange()
        return new TopicExchange("dlxDeadOrderExchange");
    
    //绑定正常队列
    @Bean
    public Binding bindingOrderQueue(Queue orderQueue,TopicExchange orderExchange)
        return BindingBuilder.bind(orderQueue).to(orderExchange).with("order.#");
    
    //绑定dlx死信队列
    @Bean
    public Binding bindingDxlDeadOrderQueue(Queue dlxDeadOrderQueue,TopicExchange dlxDeadOrderExchange)
        return BindingBuilder.bind(dlxDeadOrderQueue).to(dlxDeadOrderExchange).with("dlxDeadOrder.#");
    

测试延迟队列发送消息

    @Test
    public  void testDelay() throws InterruptedException 
        //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
        rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2019年8月17日16:41:47");
 
        //2.打印倒计时10秒
        for (int i = 10; i > 0 ; i--) 
            System.out.println(i+"...");
            Thread.sleep(1000);
        
    

消费者代码(消费者业务层里书写监听)

/**
     * 监听的是延迟队列里的死信队列!
     * @param message
     * @param channel
     * @throws Exception
     */
    @RabbitListener(queues = "dlxDeadOrder.queue")
    public void consumerDlxDeadOrder(Message message, Channel channel) throws Exception
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try 
            //1.接收转换消息
            System.out.println(new String(message.getBody()));
            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            System.out.println("根据订单id查询其状态...");
            System.out.println("判断状态是否为支付成功");
            System.out.println("取消订单,回滚库存....");
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
         catch (Exception e) 
            //e.printStackTrace();
            System.out.println("出现异常,拒绝接受");
            //4.拒绝签收,不重回队列 requeue=false
            channel.basicNack(deliveryTag,true,false);
        
    

延迟队列小结

1. 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。

2. RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + 死信队列来实现延迟队列效果。

以上是关于RabbitMQ高级特性(2)的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMq高级特性之延迟队列 通俗易懂 超详细 内含案例

UOS 4.0 - RabbitMQ 高级特性(测试篇)

RabbitMQ高级特性

RabbitMq高级特性之死信队列 通俗易懂 超详细 内含案例

消息队列 RabbitMq高级特性

RabbitMQ RabbitMQ高级特性