RabbitMQ----消费端限流TTL和使用代码生成交换机队列

Posted 未来.....

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ----消费端限流TTL和使用代码生成交换机队列相关的知识,希望对你有一定的参考价值。

RabbitMQ----消费端限流和TTL

先想一想什么是消费端限流?
  假设一个场景,首先我们rabbitmq服务器上面有上万条没有处理的消息,我们随便打开一个消费者客户端,会出现下面情况:巨量消息瞬间全部推送过来,但是我们当个客户端没有办法进行处理这么多的数据,可能会造成服务器宕机。
上面这个假设可能不好理解,看看下面这个例子。

  说一家餐馆,你去吃饭,你本来只能吃10碗,老板做了100碗,你本来是吃完一碗叫老板再给你端一碗,但是老板一下子把所有的都给你,说让你吃完,这时候吃的掉吗?此时就引入了一个新名词,自动确认信息,老板认为你能吃完,就全部给你,如果此时你是手动确认信息,你吃完一碗告诉老板,老板给你端上来一碗,每次只吃掉一碗,那么这样就可以解决老板一次性给你端上100百碗了。

此时引出来今天的内容,消费端限流

1、消费端限流简介

  rabbitmq提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(给channel或者consume设置Qos值)未被确认前,不进行消费新消息。

1、1 消费端限流代码:

老规矩:创建springboot项目和两个子项目

1、2 创建application.yml文件

server:
  port: 8001

spring:
  rabbitmq:
    host: 192.168.31.33
    listener:
      simple:
        # none表示自动确认模式
        # manual表示手动确认模式
        acknowledge-mode: manual
        # 设置每次消费的个数。
        prefetch: 3

1、3 创建消费者

@Component
public class MyListener 
    @RabbitListener(queues = "ban_queue_direct02")
    public void listeber(Message message, Channel channel)throws Exception
        //获取信息的标识
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        byte[] body = message.getBody();
        String msg = new String(body);
        System.out.println(msg);
        try
            System.out.println("处理业务逻辑");
            //消费端手动确认消息
            //long deliveryTag, 表示的标识。
            // boolean multiple:是否允许多确认
            //为了测试一次只收到3条信息,收到信息不确认即可
            //channel.basicAck(deliveryTag,true);//从队列中删除该消息。
        catch (Exception e)
            //(long deliveryTag, boolean multiple, boolean requeue: 是否让队列再次发送该消息。
            channel.basicNack(deliveryTag,true,true);
        
    

1、4 生产者:

@Test
    public void testReturn() 
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() 
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) 
                //只要交换机到队列失败时才会触发该方法。失败之后可以继续发送也可以取消相应的业务功能。
                System.out.println("消息从交换机到队列失败" + returnedMessage.getReplyText());
            
        );
        for (int i = 0; i < 10; i++) 
            rabbitTemplate.convertAndSend("ban_exchange_direct",  "error", "hello confirm2");
        
    

1、5 测试结果

可以发现现在消费者每次只接收到了3条信息,测试成功。
里面测试我加了输出语句,所以一次会出现两条。

2、 TTL简介

RabbitMQ可以对消息和队列设置TTL. 目前有两种方法可以设置。
  第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
  第二种方法是对消息进行单独设置,每条消息TTL可以不同。如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message, 消费者将无法再收到该消息。

2、1 测试代码:

这里只需要测试生产者,消费者用不到。

 //为队列设置过期时间  相当于该队列里面的消息都由过期时间
    @Test
    public void testSend()
        rabbitTemplate.convertAndSend("myexchange","","hello xiaoxi");
    

    //设置消息的过期时间 如果由设置了队列的过期时间 也设置了消息的过期时间 谁的过期时间短 以谁为准。
    //该消息必须在头部才能从队列中移除。
    @Test
    public void testSend02()

        for(int i=0;i<10;i++) 
            if(i==3)
                MessagePostProcessor messagePostProcessor = new MessagePostProcessor() 
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException 
                        message.getMessageProperties().setExpiration("20000");
                        return message;
                    
                ;
                //String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor
                rabbitTemplate.convertAndSend("myexchange", "", "hello xiaoxi"+i, messagePostProcessor);
            else 

                //String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor
                rabbitTemplate.convertAndSend("myexchange", "", "hello xiaoxi"+i);
            
        
    

3、 通过代码创建队列和交换机以及绑定。

没什么讲的,直接上代码:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig 
    private final String exchange_name="myexchange";
    private final String queue_name="myqueue";
    //创建交换机对象
    @Bean
    public Exchange exchange()
        Exchange exchange= ExchangeBuilder.fanoutExchange(exchange_name).durable(true).build();
        return exchange;
    

    //创建队列
    @Bean(value = "queue")
    public Queue queue()
        Queue queue= QueueBuilder.durable(queue_name).withArgument("x-message-ttl",20000).build();
        return queue;
    

    //绑定交换机和队列
    @Bean
    public Binding binding(Queue queue, Exchange exchange)
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    

在测试单元调用代码测试:

@Test
    public void send()
        rabbitTemplate.convertAndSend("myexchange","","我饿了");
    

以上是关于RabbitMQ----消费端限流TTL和使用代码生成交换机队列的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ之消息模式(下)

RabbitMQ消费端限流策略

RabbitMq高级特性之消费端限流 通俗易懂 超详细 内含案例

消费端限流策略

面试官:RabbitMQ怎么实现消费端限流

RabbitMQ 如何对消费端限流?