10.RabbitMQ持久化

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了10.RabbitMQ持久化相关的知识,希望对你有一定的参考价值。

参考技术A 持久化可以提高RabbitMQ的可靠性,以防在RabbitMQ重启、关闭、宕机下的数据丢失。

交换器的持久化是在声明交换器的时候,将durable设置为true。如果交换器不设置持久化,那么在RabbitMQ交换器服务重启之后,相关的交换器信息会丢失,不过消息不会丢失,但是不能将消息发送到这个交换器。

队列的持久化在声明队列的时候,将durable设置为true。如果队列不设置持久化,那么RabbitMQ交换器服务重启之后,相关的队列信息会丢失,同时队列中的消息也会丢失。

消息的持久化是在BasicProperties中设置deliveryMode设置为2。队列的持久化能保证本身的元数据不会因为异常而丢失,但是不能保证内部所存在的消息不会丢失。要确保消息不丢失,需要将消息持久化。
如果将所有的消息都进行持久化操作,这样会严重影响RabbitMQ的性能。写入磁盘的速度比写入内存的速度慢很多。所以要在可靠性和吞吐量之间做权衡。

将交换器、队列和消息都进行持久化操作后,也并不能保证消息一定不会丢失。
1.对于消费者来说,如果在订阅消息的时候,将autoAck设置为true,那么消费者接收到消息后,还没有处理,就出现了异常挂掉了,此时,队列中已经将消息删除,消费者不能够在收到消息。这种情况可以将autoAck设置为false,进行手动确认。
2.在持久化后的消息存入Rabbit'MQ之后,还需要一段时间才能存入磁盘。RabbitMQ并不会为每条消息都进行同步存盘,可能仅仅是保存到操作系统缓存之中而不是物理磁盘。如果在这段时间,服务器宕机或者重启,消息还没来得及保存到磁盘当中,从而丢失。对于这种情况,可以引入RabiitMQ镜像队列机制。

RabbitMQ常见问题

RabbitMQ持久化问题

首先我们需要知道两个熟悉:
1.durable: 是否开启持久化,true是持久化队列(默认),false非持久化队列
2.autoDelete: 是否为临时队列,true是临时队列当服务停止运行的时候会将队列进行删除,false是非临时队列(默认)

如果配置了durable:true(队列持久化) autoDelete:true(临时队列),那么服务关闭的时候队列也会消失,会造成消息丢失

如果配置了durable:false(队列非持久化) autoDelete:false(非临时队列) 那么当rabbitMQ服务重启过后队列也会消失,同样会造成消息丢失

所以为了避免消息丢失需要将durable熟悉设置为true,autoDelete熟悉设置为false(当然这是默认配置好了)


RabbitMQ消息可靠生产

解决方案流程图如下

修改配置文件

# 开启消息发送确认机制
spring:
	rabbitmq:
		publisher-confirms: true

实现代码如下

	@Autowired
    private RabbitTemplate rabbitTemplate;
    // 交换机名称
    @Value("$log.direct")
    private String exchange;


	/**
	* 消息生产
	*/
	public void logMessage(String routingKey)
        // 设置回调确认对象
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // 消息内容
        String msg = "路由模式,时间:" + new Date();
        // 相关数据
        CorrelationData correlationData = new CorrelationData();
        // 设置id
        correlationData.setId("1001");
        // 发送消息
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, correlationData);
    

    // 定义回调确认对象
    private RabbitTemplate.ConfirmCallback confirmCallback =
                new RabbitTemplate.ConfirmCallback() 
        // 消息发送完毕后,回调此确认方法
        @Override
        public void confirm(CorrelationData correlationData, boolean ack,
                            String cause) 
            // CorrelationData: 相关数据
            // 生产消息的时候设置好的id 在这边可以进行获取出来
            correlationData.getId();
            // ack: 是否确认收到(true已确认收到,false未确认收到)
            // case: 失败原因
            System.out.println("ack: " + ack);
            System.out.println("cause = " + cause);
            // 如果ack为true,代表MQ已经收到消息。
            if (ack)
                System.out.println("消息已投递成功!");
            else
            
                System.out.println("消息已投递失败: " + correlationData.getId());
                // 失败的消息业务处理代码
                // ...
            
        
    ;

RabbitMQ消费者异常死循环问题

当消费者获取数据进行消费的时候,必然会处理相关的业务。由于消息数据的不正确必然会导致报错与消息消费失败,就会一直重新消费。
本人亲自踩过的坑,当消息一直在重复消费结果导致到最后面一台消费者所部属的服务器直接挂了😂

那么如何避免消息重复消费呢?
我们只需要修改配置文件,设置最大重试次数即可

# 配置rabbitmq
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 解决消息死循环问题-启用重试
          max-attempts: 3 # 最大重试3次(默认)

RabbitMQ如何保障消息可靠消费

需要开启手动消息确认ACK: 如果在处理消息的过程中,消费者在消费消息的时候服务器、网络、出现故障挂掉了,那可能这条正在处理的消息就没有完成,数据就会丢失。为了确保消息不会丢失,RabbitMQ支持消息确认ACK。

需要在配置文件开启手动确认ACK

# 配置rabbitmq
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 开启手动ack消息确认

实现代码

@Component
public class ErrorConsumer 

    /**
     * 消息监听方法
     * bindings: 配置队列 通过路由key绑定到 交换机
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "$direct.error.queue"), // 队列
            key = "info","error", // 路由key
            exchange = @Exchange(name = "$log.direct",
                                 type = ExchangeTypes.DIRECT))) // 交换机
    public void handlerMessage(String msg, Channel channel,
                               Message message)
        try 
            System.out.println("================");
            // 制造异常
            int i = 10 / 0;
            System.out.println("error--->接受到的消息是:" + msg);
            // 手动ack确认
            //参数1:deliveryTag:消息唯一传输ID
            //参数2:multiple:true: 手动批量处理,false: 手动单条处理
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        catch (Exception ex)
        
    


RabbitMQ死信队列

消费失败

接着上面的问题,如果这个时候消费者确实失败了又不能一直堵塞消息通道那么该怎么处理?

  • 死信队列 + 消费预警 + 记录到redis数据库

1.修改yml配置

# 配置rabbitmq
spring:
  rabbitmq:
    listener:
      simple:
        default-requeue-rejected: false # 设置为false,会重发消息到死信队列

2.定义死信队列消费者

@Component
public class DeadLetterConsumer 

    /** 消息监听 */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dlx-queue"),
            exchange = @Exchange(value = "dlx.exchange",
                        type = ExchangeTypes.TOPIC),
            key = "#")
    )
    public void handlerMessage(String message)
        // todo 存到redis / mysql 都可以
        System.out.println("死信队列接收到的消息:" + message);
    

3.业务消息消费之

/**
     * 消息监听方法
     * bindings: 配置队列 通过路由key绑定到 交换机
     */
    @RabbitListener(bindings = @QueueBinding(
          value = @Queue(name = "$direct.error.queue", arguments = 
                  @Argument(name="x-dead-letter-exchange", value = "dlx.exchange"),// 死信队列交换机
                  @Argument(name="x-dead-letter-routing-key", value = "xxx")), // 死信队列路由队列
            key = "info","error", // 路由key
            exchange = @Exchange(name = "$log.direct",
                                 type = ExchangeTypes.DIRECT))) // 交换机
    public void handlerMessage(String msg, Channel channel,
                               Message message)
        try 
            System.out.println("================");
            // 制造异常
            int i = 10 / 0;
            System.out.println("error--->接受到的消息是:" + msg);
            // 手动ack确认
            // 参数1:deliveryTag:消息唯一传输ID
            // 参数2:multiple:true: 手动批量处理,false: 手动单条处理
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        catch (Exception ex)
            // 如果真得出现了异常,我们采用消息重投
            // 获取redelivered,判断是否为重投: false没有重投,true重投
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            System.out.println("redelivered = " + redelivered);
            try 
                // 判断是否为重新消费
                if (redelivered)  // 重新消费
                    /**
                     * 拒绝确认,从队列中删除该消息,防止队列阻塞(消息堆积)
                     * boolean requeue: false不重新入队列(丢弃消息)
                     */
                    channel.basicReject(message.getMessageProperties()
                                        .getDeliveryTag(), false);
                    System.out.println("消息已重新存入死信队列了。。。");
                 else  // 第一次消费

                    /**
                     * 消息重投,重新把消息放回队列中
                     * boolean multiple: 单条或批量
                     * boolean requeue: true重回队列
                     */
                    channel.basicNack(message.getMessageProperties()
                            .getDeliveryTag(), false, true);

                
            catch (Exception e)
                e.printStackTrace();
            
        
    

消息过期TTL

可以设置消息超过多少毫秒还没消费就进入死信队列

队列存储界限

设置队列中存储消息的最大数量

以上是关于10.RabbitMQ持久化的主要内容,如果未能解决你的问题,请参考以下文章

Redis 进阶 -- 持久化(RDB持久化AOF持久化RDB-AOF混合持久化无持久化)

Redis 进阶 -- 持久化(RDB持久化AOF持久化RDB-AOF混合持久化无持久化)

RabbitMQ持久化机制

Redis持久化策略剖析

Redis持久化

Redis RDB持久化和AOF持久化详细讲解