04 RabbitMQ进阶2之集群和延迟投递

Posted IT BOY

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了04 RabbitMQ进阶2之集群和延迟投递相关的知识,希望对你有一定的参考价值。

目录

Pt2 集群与高可用

Pt2.1 普通集群模式

Pt2.2 镜像队列模式

Pt3 Spring AMQP使用

Pt3.1 Spring RabbitMQ

(1) 代码案例

(2) 核心对象

Pt3.2 SpringBoot RabbitMQ

(1) 代码案例

(2) 参数说明

Pt4 使用规范

Pt4.1 资源管理

Pt4.2 命名规范

Pt4.3 消息持久化

Pt4.4 连接数控制

Pt5 延迟投递实现

Pt5.1 基于死信队列

Pt5.2 基于延迟插件

Pt5.3 死信队列

Pt5.4 流量控制

(1) 服务端限流

(2) 消费端限流


Pt2 集群与高可用

        RabbitMQ本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的erlang.cookie来实现)。因此,RabbitMQ天然支持集群。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。

        在高并发场景下,单台MQ服务器处理能力总会有瓶颈,通过集群增加MQ服务器节点,可以线型的提升MQ消息处理能力。 另外,节点故障往往不可避免,单节点一旦发生故障,将会导致服务不可用,集群可以有效的避免这个问题。集群中某一节点发生故障,客户端可以连接到其它MQ服务器,不会导致服务完全不可用。

RabbitMQ集群有两种节点类型:磁盘节点(Disc Node)和内存节点(RAM Node)。

  • 磁盘节点:将元数据(VHost、Exchange名称/属性/类型、Queue名称/属性、绑定关系)存放在磁盘中。如果不指定类型,默认是磁盘节点。集群中至少需要一个磁盘节点来负责持久化数据,否则当集群宕机时,节点在内存中的数据将全部丢失,将会导致丢失全部元数据。

  • 内存节点:将元数据存放在内存中。内存节点读写速度快,一般用来连接客户端程序,磁盘节点则用于备份数据和恢复数据。内存节点会将磁盘节点的磁盘上,用于故障时找到磁盘上的数据进行恢复。同时,如果是持久化消息,会同时存储在内存和磁盘节点上。

RabbitMQ有两种集群模式:普通集群模式和镜像队列模式。

Pt2.1 普通集群模式

普通集群模式下,不同节点之间只会互相同步元数据(VHost,Exchange,Queue,绑定关系),不会互相同步消息。

 这种模式和Redis集群很相似,如果客户端连接到节点1上发送消息,但是消息要发送的Exchange和Queue是在节点3上,消息最终会被转发到节点3上存储。

普通集群模式的优势是可以线型的增加性能和存储容量,并且可以减少存储和同步消息数据的网络开销。但是缺点也很明显,就是每个节点上的消息存储实际上还是单机的,如果节点挂了将导致数据丢失。

如果你是对数据一致性没有很高的要求,能够容忍可能偶尔发生的宕机造成的数据丢失(有补偿方案、人工差错或者丢失无所谓了),可以使用普通集群模式,性能非常高。但是如果对数据一致性要求比较高的场景,比如支付,这种模式的可靠性是达不到要求的,所以要选择镜像队列模式。

Pt2.2 镜像队列模式

镜像队列模式非常容易理解,节点间除了元数据的同步外,还会同步消息,相当于每个节点存储的都是同样的数据,如同镜像一样。

镜像节点可用性非常高,任何节点宕机或数据丢失都不会造成数据不一致,因为在其他节点存储了数据副本。但是节点间消息同步带来了存储和性能损耗。

集群模式可以通过控台、命令行或者API进行设置,这里就不介绍了,比较简单,百度下就出来了。


Pt3 Spring AMQP使用

Pt3.1 Spring RabbitMQ

Spring AMQP 是Spring基于AMQP的消息收发处理的封装,直接看案例比较直观。

(1) 代码案例

基于Spring 对AMQP封装的RabbitMQ案例。

pom.xml引入依赖

 <!-- RabbitMQ相关依赖 -->
 <dependency>
   <groupId>org.springframework.amqp</groupId>
   <artifactId>spring-rabbit</artifactId>
   <version>2.3.6</version>
 </dependency>
 <dependency>
   <groupId>com.fasterxml.jackson.core</groupId>
   <artifactId>jackson-databind</artifactId>
   <version>2.9.5</version>
 </dependency>

applicationContext.xml配置RabbitMQ

 <?xml version="1.0" encoding="UTF-8"?>
 <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
 ​
     <!-- 包扫描根路径 -->
     <context:component-scan base-package="com.example"/>
 ​
     <!-- RabbitMQ配置 -->
     <!-- AMQP连接工厂接口 -->
     <rabbit:connection-factory id="connectionFactory" virtual-host="/" username="guest" password="guest"
                                host="121.4.33.15" port="5672"/>
     <!-- 封装了RabbitMQ的基础管理操作,包括元数据的声明和删除等 -->
     <rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>
     <!-- 声明队列 -->
     <rabbit:queue name="SPRING_RABBIT_QUEUE" durable="true" auto-delete="false" exclusive="false"
                   declared-by="rabbitAdmin"/>
     <!-- 声明直连类型的EXCHANGE -->
     <rabbit:direct-exchange name="SPRING_DIRECT_CHANGE" durable="true" auto-delete="false" declared-by="rabbitAdmin">
         <rabbit:bindings>
             <!-- EXCHANGE绑定QUEUE -->
             <rabbit:binding queue="SPRING_RABBIT_QUEUE" key="spring"></rabbit:binding>
         </rabbit:bindings>
     </rabbit:direct-exchange>
     <!-- 消息序列化处理 -->
     <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
     <!-- RabbitTemplate是AMQPTemplate的实现,用来简化消息的收发操作 -->
     <rabbit:template id="rabbitTemplate" exchange="SPRING_DIRECT_CHANGE" connection-factory="connectionFactory"
                      message-converter="jsonMessageConverter"/>
 ​
     <bean id="messageReceiver" class="com.example.mq.rabbitmq.springfw.SpringConsumer" />
     <rabbit:listener-container connection-factory="connectionFactory" message-converter="jsonMessageConverter">
         <rabbit:listener ref="messageReceiver" queues="SPRING_RABBIT_QUEUE" />
     </rabbit:listener-container>
 </beans>

生产者代码

 @Controller
 public class SpringProducer 
     private Logger logger = LoggerFactory.getLogger(SpringProducer.class);
 ​
     @Autowired
     @Qualifier("rabbitTemplate")
     private AmqpTemplate rabbitTemplate;
 ​
     @RequestMapping(value = "/spring_rabbit_demo")
     public void sendMessage(Object message) 
         // amqpTemplate 默认交换机 SPRING_DIRECT_CHANGE
         rabbitTemplate.convertAndSend("FirstKey", "[Direct,FirstKey] " + message);
         rabbitTemplate.convertAndSend("SecondKey", "[Direct,SecondKey] " + message);
     
 

消费者代码

 public class SpringConsumer implements MessageListener 
     private Logger logger = LoggerFactory.getLogger(SpringConsumer.class);
 ​
     public void onMessage(Message message) 
         logger.info("The first consumer received message : " + message.getBody());
     
 

(2) 核心对象

对象描述
ConnectionFactorySpring AMQP 的连接工厂接口,用于创建连接。CachingConnectionFactory 是ConnectionFactory 的一个实现类。能指定Confirm Type和开启生产者确认模式和创建连接等
RabbitAdminRabbitAdmin 是 AmqpAdmin 的实现,封装了对 RabbitMQ 的基础管理操作,比如对交换机、队列、绑定的声明和删除等。
MessageMessage 是 Spring AMQP 对消息的封装。
RabbitTemplateSpring AMQP提供了一个发送和接收消息的操作模板类AmqpTemplate。 AmqpTemplate它定义包含了发送和接收消息等的一些基本的操作功能。RabbitTemplate是AmqpTemplate的一个实现。
MessageListenerMessageListener 是 Spring AMQP 异步消息投递的监听器接口,它只有一个方法 onMessage,作用类似于 Java API 中的 Consumer。
MessageListenerContainerMessageListenerContainer可以理解为MessageListener的容器,一个Container只有一个 Listener,但是可以生成多个线程使用相同的 MessageListener 同时消费消息。 Container 可以管理 Listener 的生命周期,可以用于对于消费者进行配置。
MessageListenerContainerFactory可以在消费者上指定,当我们需要监听多个 RabbitMQ 的服务器的时候,指定不同的 MessageListenerContainerFactory。
MessageConvertor在使用RabbitTemplate的convertAndSend()方法发送消息时,会使用MessageConvertor进行消息的序列化。 AmqpTemplate 定义提供了各种发送和接收委拖给MessageConverter转化对象消息的方法。MessageConverter 本身比较简单,它提供了消息对象的转化,可将object转化成Message 对象,或者将Message 对象转化成Object对象。它提供了默认的SimpleMessageConverter实现,以及第三方的MessageConverter,如Jackson2JsonMessageConverter,MarshallingMessageConverter等,来处理消息与对象之间的转换。

Pt3.2 SpringBoot RabbitMQ

(1) 代码案例

pom.xml引入依赖包

 <!-- SpringBoot AMQP依赖-->
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

配置RabbitMQ

 @Configuration
 public class RabbitConfig 
 ​
     @Bean
     public ConnectionFactory connectionFactory() throws Exception 
         CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
         cachingConnectionFactory.setUri("amqp://guest:guest@121.4.33.15:5673");
         return cachingConnectionFactory;
     
 ​
     @Bean
     public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) 
         RabbitAdmin admin = new RabbitAdmin(connectionFactory);
         admin.setAutoStartup(true);
         return admin;
     
 ​
     @Bean
     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) 
         return new RabbitTemplate(connectionFactory);
     
 ​
     @Bean
     public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) 
         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
         container.setConsumerTagStrategy(new ConsumerTagStrategy() 
             public String createConsumerTag(String queue) 
                 return null;
             
         );
         return container;
     
 ​
     // 定义交换机
     @Bean("springBootExchange")
     public TopicExchange getSpringBootExchange() 
         return new TopicExchange("SPRING_BOOT_EXCHANGE");
     
 ​
     // 定义队列
     @Bean("springBootQueue")
     public Queue getSpringBootQueue() 
         Map<String, Object> args = new HashMap<String, Object>();
         args.put("x-message-ttl", 6000);
         Queue queue = new Queue("SPRING_BOOT_QUEUE", false, false, true, args);
         return queue;
     
 ​
     // 定义交换机和队列的绑定关系
     @Bean
     public Binding bindSecond(@Qualifier("springBootQueue") Queue queue, @Qualifier("springBootExchange") TopicExchange exchange) 
         return BindingBuilder.bind(queue).to(exchange).with("springboot");
     
 

定义生产者

 @Controller
 public class SpringBootProducer 
 ​
     @Autowired
     RabbitTemplate rabbitTemplate;
 ​
     @RequestMapping(value = "/basic")
     public String send() 
         rabbitTemplate.convertAndSend("SPRING_BOOT_EXCHANGE", "springboot", "这是一条测试消息。");
         System.out.println("消息发送成功");
         return "PROCESS_OK";
     
 

定义消费者

 @Component
 @RabbitListener(queues = "SPRING_BOOT_QUEUE")
 public class SpringBootConsumer 
 ​
     @RabbitHandler
     public void consumer(String message) throws IOException 
         System.out.println("SpringBoot Consumer reveive message: " + message);
     
 

(2) 参数说明

SpringBoot中配置RabbitMQ参数如下。

  • base

    • spring.rabbitmq.host: 服务Host

    • spring.rabbitmq.port: 服务端口

    • spring.rabbitmq.username: 登陆用户名

    • spring.rabbitmq.password: 登陆密码

    • spring.rabbitmq.virtual-host: 连接到rabbitMQ的vhost

    • spring.rabbitmq.addresses: 指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)

    • spring.rabbitmq.requested-heartbeat: 指定心跳超时,单位秒,0为不指定;默认60s

    • spring.rabbitmq.publisher-confirms: 是否启用【发布确认】

    • spring.rabbitmq.publisher-returns: 是否启用【发布返回】

    • spring.rabbitmq.connection-timeout: 连接超时,单位毫秒,0表示无穷大,不超时

    • spring.rabbitmq.parsed-addresses:

  • ssl

    • spring.rabbitmq.ssl.enabled: 是否支持ssl

    • spring.rabbitmq.ssl.key-store: 指定持有SSL certificate的key store的路径

    • spring.rabbitmq.ssl.key-store-password: 指定访问key store的密码

    • spring.rabbitmq.ssl.trust-store: 指定持有SSL certificates的Trust store

    • spring.rabbitmq.ssl.trust-store-password: 指定访问trust store的密码

    • spring.rabbitmq.ssl.algorithm: ssl使用的算法,例如,TLSv1.1

  • cache

    • spring.rabbitmq.cache.channel.size: 缓存中保持的channel数量

    • spring.rabbitmq.cache.channel.checkout-timeout: 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel

    • spring.rabbitmq.cache.connection.size: 缓存的连接数,只有是CONNECTION模式时生效

    • spring.rabbitmq.cache.connection.mode: 连接工厂缓存模式:CHANNEL 和 CONNECTION

  • listener

    • spring.rabbitmq.listener.simple.auto-startup: 是否启动时自动启动容器

    • spring.rabbitmq.listener.simple.acknowledge-mode: 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto

    • spring.rabbitmq.listener.simple.concurrency: 最小的消费者数量

    • spring.rabbitmq.listener.simple.max-concurrency: 最大的消费者数量

    • spring.rabbitmq.listener.simple.prefetch: 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.

    • spring.rabbitmq.listener.simple.transaction-size: 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.

    • spring.rabbitmq.listener.simple.default-requeue-rejected: 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)

    • spring.rabbitmq.listener.simple.idle-event-interval: 多少长时间发布空闲容器时间,单位毫秒

    • spring.rabbitmq.listener.simple.retry.enabled: 监听重试是否可用

    • spring.rabbitmq.listener.simple.retry.max-attempts: 最大重试次数

    • spring.rabbitmq.listener.simple.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔

    • spring.rabbitmq.listener.simple.retry.multiplier: 应用于上一重试间隔的乘数

    • spring.rabbitmq.listener.simple.retry.max-interval: 最大重试时间间隔

    • spring.rabbitmq.listener.simple.retry.stateless: 重试是有状态or无状态

  • template

    • spring.rabbitmq.template.mandatory: 启用强制信息;默认false

    • spring.rabbitmq.template.receive-timeout: receive() 操作的超时时间

    • spring.rabbitmq.template.reply-timeout: sendAndReceive() 操作的超时时间

    • spring.rabbitmq.template.retry.enabled: 发送重试是否可用

    • spring.rabbitmq.template.retry.max-attempts: 最大重试次数

    • spring.rabbitmq.template.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔

    • spring.rabbitmq.template.retry.multiplier: 应用于上一重试间隔的乘数

    • spring.rabbitmq.template.retry.max-interval: 最大重试时间间隔


Pt4 使用规范

Pt4.1 资源管理

实际应用中,通常生产者和消费者可能不是一个应用,甚至不是一个团队,那这些资源应该谁来创建谁来管理呢?

基本原则就是,谁使用谁创建,谁污染谁治理。

生产者需要往Exchange发送消息,所以通常生产者负责创建Exchange。

消费者需要从Queue获取Exchange下发的消息,所以消费者负责创建Queue和绑定关系。

Pt4.2 命名规范

MQ元数据不要硬编码,尽可能放在配置中心,可以采用如下命名规范:

虚拟机:XXX_VHOST

交换机:XXX_EXCHANGE

队列:XXX_QUEUE

Pt4.3 消息持久化

发送给MQ的消息持久化到数据库中,并维护发送状态,可以实现消息的可追溯和防重控制,结合定时任务可以灵活的处理消息。

  1. 将消息持久化到数据库;

  2. 定时任务批量扫描未处理的消息,发送到MQ,更新发送状态;

  3. 如果消息处理失败,需要重发,则更新数据库中消息状态为未处理。

Pt4.4 连接数控制

消息处理创建的连接有不小的性能消耗,如果发送大批量的数据,可以进行消息合并处理,比如通过JSON数据类型,可以将很多消息合并到一起,减少生产者和消费者从MQ服务器处理消息时产生的连接数。

建议单条消息不要超过4M(4096KB)。


Pt5 延迟投递实现

Pt5.1 基于死信队列

场景:业务系统发起订单支付时,如果30分钟后没有收到支付结果,就自动发起关单处理。

分析:因为RabbitMQ本身不支持延迟发送消息的功能,所以要通过别的机制实现消息的延迟投递。最简单的是通过数据库处理,支付请求发送之后,同步入数据库,批处理每分钟扫描数据库中的订单,发现30分钟前的订单还没有支付时,就发起关单的MQ请求。这种方式比较简单,也能实现业务需求,但是通常支付订单非常多,每分钟扫描一次在大数据量的情况下,对数据库造成的压力也比较大,更有可能会影响其他业务操作。那能不能用RabbitMQ来实现呢。

当然可以,虽然RabbitMQ本身不支持延迟发送消息,但是可以曲线救国。我们知道,队列中的消息可以设置超时时间,超过时间没有消费就会过期,被丢弃或者被投入死信队列,那可以这样处理:

  1. 消息发送时指定30分钟的超时时间:有两种方式可以指定超时时间。一种是设置队列的过期时间,这个时间是对队列中所有消息生效的。另一种是设置单条消息的过期时间,这个设置只会对当前消息有效。如果同时指定了队列的过期时间和消息的过期时间,那么时间小的会生效。

  2. 指定队列的死信交换机DLX和死信队列DLQ:消息过期后,会被投递到死信队列,死信交换机和死信队列实际上也只是普通的交换机和队列。

  3. 消费死信队列的消息:消息被转发到DLQ之后,就可以配置消费者进行消费,从而达到延迟投递的效果。

案例如下。

​​​​​​​ // 生产者
 public class DirectProducer 
     public static void main(String[] args) 
         // 创建消息信道
         Connection connection = RabbitMQUtils.getConnection();
         Channel channel = RabbitMQUtils.createChannel(connection);
 ​
         try 
             // 创建死信交换机和死信队列。
             channel.exchangeDeclare("DLX_PAY_EXCHANGE", "topic", false, false, false, null);
             channel.queueDeclare("DLQ_PAY_QUEUE", false, false, false, null);
             // 绑定DLX和DLQ,无条件转发。
             channel.queueBind("DLQ_PAY_QUEUE", "DLX_PAY_EXCHANGE", "#");
             System.out.println("死信队列和死信交换机创建成功。");
 ​
             // 创建正常的交换机。
             channel.exchangeDeclare("NORMAL_PAY_EXCHANGE", "topic", false, false, false, null);
             // 创建正常的队列,并制定死信交换机。
             Map<String, Object> arguments = new HashMap<>();
             arguments.put("x-dead-letter-exchange", "DLX_PAY_EXCHANGE"); // 指定DLX
             arguments.put("x-expires", 30000); // 30秒
             channel.queueDeclare("NORMAL_PAY_QUEUE", true, false, false, arguments);
             // 绑定交换机和队列
             channel.queueBind("NORMAL_PAY_QUEUE", "NORMAL_PAY_EXCHANGE", "dopay");
             System.out.println("普通队列和普通交换机创建成功。");
 ​
             // 将消息信道设置为普通确认模式
             channel.confirmSelect();
             // 往正常交换机发送消息
             channel.basicPublish("NORMAL_PAY_EXCHANGE", "dopay", MessageProperties.PERSISTENT_TEXT_PLAIN,
                 "发送支付请求".getBytes());
             // 消息确认,但是因为NORMAL_PAY_QUEUE没有消费者,30秒之后消息会被投递到死信交换机。
             if (channel.waitForConfirms()) 
                 System.out.println("消息发送成功,当前时间:" + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")).format(new Date()));
             
          catch (Exception ex) 
             ex.printStackTrace();
             // 消息发送异常,需要考虑重发
         
     
 

 // 消费者
 public class DirectConsumer1 
     public static void main(String[] args) 
         Connection connection = RabbitMQUtils.getConnection();
         Channel channel = RabbitMQUtils.createChannel(connection);
 ​
         try 
             // 创建消费者,绑定队列
             Consumer consumer = new DefaultConsumer(channel) 
                 @SneakyThrows @Override
                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                     byte[] body) throws IOException 
                     System.out.println(
                         "收到消息:" + new String(body, "UTF-8") + "时间:" + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"))
                             .format(new Date()));
                 
             ;
 ​
             while (true) 
                 // 消费者直接消费死信队列的消息
                 channel.basicConsume("DLQ_PAY_QUEUE", true, consumer);
                 TimeUnit.SECONDS.sleep(1);
             
          catch (Exception ex) 
             ex.printStackTrace();
         
     
 

但是上面这种方式,也有一些缺点:

  1. 如果统一用队列的TTL,当衰减梯队非常多的情况下,需要建立非常多的DLX和DLQ;

  2. 如果为每个消息设置单独过期时间,那么这个消息队列不可以和正常的混用,因为队列中消息等待过期会影响其他消息的投递,因为是FIFO,下一条消息要等待当前消息过期才能继续投递。

  3. 时间控制的不是很精准,存在时间误差。

当然,RabbitMQ也提供了更加简单和有效的办法,就是插件:rabbitmq_delayed_message_exchange插件。

Pt5.2 基于延迟插件

安装rabbitmq_delayed_message_exchange延迟消息插件,可以轻松实现延迟消息的处理。

插件的具体安装和使用,这里不做详细说明,网上案例很多。

Pt5.3 死信队列

通常在以下场景,消息会进入死信队列:

  1. 消息消费不及时,过期进入死信队列。

  2. 消息被消费者拒绝,并且未设置重回队列。

  3. 队列达到最大长度,超过了MaxLength或者MaxLengthBytes,最先入队的消息会被发送到死信队列。

Pt5.4 流量控制

如果MQ生产者的速度远远大于消费者速度,就会产生大量消息堆积,占用内存等系统资源,导致服务端处理性能下降,直至MQ崩溃。为了防止这种情况,需要对消息进行限流,限流可以基于服务端和客户端分别设置。

(1) 服务端限流

队列长度限制

通过限制队列长度控制消息数量。

  • x-max-length:队列中最大存储的消息数量,超过这个数据,队头的消息会被丢弃。

  • x-max-length-bytes:队列中存储的最大消息容量(单位bytes),超过这个容量,队头的消息会被丢弃。

通过限制队列长度,在消息有堆积的情况下有意义,不过会删除先入队的消息,跟传统限流的理解不一样。

内存限制

RabbitMQ会在启动时监测机器的物理内存数值。默认当MQ占用40%以上内存时,会主动抛出一个内存警告并阻塞所有连接,可以通过配置文件修改这个比例。

磁盘限制

磁盘限制和内存一样,当磁盘剩余可用空间低于指定数值时,会触发流控措施,默认50MB。可以通过参数配置修改数值。

(2) 消费端限流

在push模式下, 消息会被主动推送到消费者,如果消费者处理比较慢,消息就会在应用中大量堆积,最终打爆我们的应用程序。这是我们最不想看到的,所以除了服务端限流,还要进行客户端限流。

设置prefetch count,当超过这个数值的消息未被确认,RabbitMQ会停止投递新的消息给消费端,直到有消息被确认后继续投递。

 本章到此结束,更多关于RabbitMQ的文章,请点击如下专栏连接。

00. 消息队列专栏开篇_Java是世界上最好的语言-CSDN博客这部分主要讲解关于消息队列的内容,包括RabbitMQ、Kafka和RocketMQ,目前主要整理了一些基本用法和组件介绍,后续会逐渐深入介绍些原理部分。目录https://blog.csdn.net/moonlight821/article/details/118068777

以上是关于04 RabbitMQ进阶2之集群和延迟投递的主要内容,如果未能解决你的问题,请参考以下文章

04 RabbitMQ进阶2之集群和延迟投递

04 RabbitMQ进阶2之集群和延迟投递

RabbitMQ之消息可靠性死信交换机惰性队列及集群

RabbitMQ之消息可靠性死信交换机惰性队列及集群

RRabbitMQ05_消息可靠性投递ACK限流处理TTL队列死信交换机延迟队列

RabbitMQ学习(中)——交换机死信队列和延迟队列