RabbitMQ持久化机制

Posted 琦彦

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ持久化机制相关的知识,希望对你有一定的参考价值。

RabbitMQ持久化机制

rabbitmq的持久化分为队列持久化、消息持久化和交换器持久化。 不管是持久化的消息还是非持久化的消息都可以被写入到磁盘。

持久化消息会同时写入磁盘和内存(加快读取速度),非持久化消息会在内存不够用时,将消息写入磁盘(一般重启之后就没有了)。

  • 持久化

  • 非持久化

持久化

队列持久化

(1)队列的持久化是在定义队列时的durable参数来决定的,当durable为true时,才代表队列会持久化。

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//第二个餐胡设置为true,代表队列持久化
channel.queueDeclare("queue.persistent.name", true, false, false, null);

关键的是第二个参数设置为true,即durable=true.

Channel类中queueDeclare的完整定义如下:

    /**
     * Declare a queue
     * @see com.rabbitmq.client.AMQP.Queue.Declare
     * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
     * @param queue the name of the queue
     * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
     * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
     * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
     * @param arguments other properties (construction arguments) for the queue
     * @return a declaration-confirm method to indicate the queue was successfully declared
     * @throws java.io.IOException if an error is encountered
     */
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;

参数说明:

  1. queue:queue的名称
  2. exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。
  3. autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

queueDeclare相关的有4种方法,分别是:

Queue.DeclareOk queueDeclare() throws IOException;
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                            Map<String, Object> arguments) throws IOException;
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

其中需要说明的是queueDeclarePassive(String queue)可以用来检测一个queue是否已经存在。如果该队列存在,则会返回true;如果不存在,就会返回异常,但是不会创建新的队列。

消息的持久化

如果将queue的持久化标识durable设置为true,则代表是一个持久的队列,那么在服务重启之后,也会存在,因为服务会把持久化的queue存放在硬盘上,当服务重启的时候,会重新什么之前被持久化的queue。**队列是可以被持久化,但是里面的消息是否为持久化那还要看消息的持久化设置。**也就是说,重启之前那个queue里面还没有发出去的消息的话,重启之后那队列里面是不是还存在原来的消息,这个就要取决于发生着在发送消息时对消息的设置了。

如果要在重启后保持消息的持久化必须设置消息是持久化的标识。

设置消息的持久化:

//通过传入MessageProperties.PERSISTENT_PLAIN就可以实现消息持久化
channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());

这里的关键是:MessageProperties.PERSISTENT_TEXT_PLAIN
首先看一下basicPublish的方法:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
        throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
        throws IOException;
  • exchange表示exchange的名称
  • routingKey表示routingKey的名称
  • body代表发送的消息体
  • 有关mandatory和immediate的详细解释可以参考:RabbitMQ之mandatory和immediate.

这里关键的是BasicProperties props这个参数了,这里看下BasicProperties的定义:

public BasicProperties(
            String contentType,//消息类型如:text/plain
            String contentEncoding,//编码
            Map<String,Object> headers,
            Integer deliveryMode,//1:nonpersistent 2:persistent
            Integer priority,//优先级
            String correlationId,
            String replyTo,//反馈队列
            String expiration,//expiration到期时间
            String messageId,
            Date timestamp,
            String type,
            String userId,
            String appId,
            String clusterId)

这里的deliveryMode=1代表不持久化,deliveryMode=2代表持久化。

上面的实现代码使用的是MessageProperties.PERSISTENT_TEXT_PLAIN,那么这个又是什么呢?

public static final BasicProperties PERSISTENT_TEXT_PLAIN =
    new BasicProperties("text/plain",
                        null,
                        null,
                        2,
                        0, null, null, null,
                        null, null, null, null,
                        null, null);

可以看到这其实就是讲deliveryMode设置为2的BasicProperties的对象,为了方便编程而出现的一个东东。
换一种实现方式:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange.persistent", "persistent",properties, "persistent_test_message".getBytes());

设置了队列和消息的持久化之后,当broker服务重启的之后,消息依旧存在。

单只设置队列持久化,重启之后消息会丢失;单只设置消息的持久化,重启之后队列消失,既而消息也丢失。

exchange的持久化

上面阐述了队列的持久化和消息的持久化,如果不设置exchange的持久化对消息的可靠性来说没有什么影响,**但是同样如果exchange不设置持久化,那么当broker服务重启之后,exchange将不复存在,那么既而发送方rabbitmq producer就无法正常发送消息。**这里建议,同样设置exchange的持久化。exchange的持久化设置也特别简单,方法如下:

Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
                                   Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange,
                                          String type,
                                          boolean durable,
                                          boolean autoDelete,
                                          boolean internal,
                                          Map<String, Object> arguments) throws IOException;
void exchangeDeclareNoWait(String exchange,
                           String type,
                           boolean durable,
                           boolean autoDelete,
                           boolean internal,
                           Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

一般只需要:``channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);`即在声明的时候讲durable字段设置为true即可。

交换器也需要在定义的时候设置持久化标志,否则在RabbitMQ重启后将丢失。

//durable为true则开启持久化
Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable)throws IOException
  • exchange 交换器的名称
  • type 交换器的类型,常见的如 fanout direct topic
  • durable: 设置是否持久 durab 设置为 true 表示持久化, 反之是非持久 。持久化可以将交换器存盘,在服务器重启 的时候不会丢失相关信息。

2 内存控制

内存告警

当内存使用超过配置的阈值或者磁盘剩余控件低于配置的阈值时,RabbitMQ会暂时阻塞客户端的连接,并停止接收从客户端发来的消息,以避免服务崩溃,客户端与服务端的心跳检测也会失败。

当出现内存告警时,可以通过管理命令临时调整内存大小。

RabbitMQctl set_vm_memory_high_watermark <fraction>

fraction为内存阈值,RabbitMQ默认是0.4,表示当RabbitMQ使用的内存超过总内存的40%时,就会产生告警并阻塞所有生产则连接。
通过此命令修改的阈值在RabbitMQ重启之后将会失效,通过修改配置文件的方式设置的阈值才会永久有效,但需要重启服务才会生效。

配置文件:RabbitMQ.conf

#相对值,也就是前面的fraction,建议设置在0.4~0.66之间,不要超过0.7
vm_memory_high_watermark.relative=0.4
#绝对值,单位为KB,MB,GB,对应的临时命令是:RabbitMQctl set_vm_memory_high_watermark absolute <value>
#vm_memory_high_watermark.absolute=1GB

内存换页

在某个broker节点触及内存并阻塞生产者之前,他会尝试将队列内存中的消息换页存储到磁盘以释放内存空间。持久化和非持久化的消息都会被转储到磁盘中,其中持久化的消息本身就在磁盘中有一个备份,所以这里会将持久化的消息从内存中清除掉。

默认情况下,在内存达到内存阈值的50%时会进行换页操作。也就是说,在默认的内存阈值40%的情况下,当内存超过0.4*0.5=0.2时会经行换页动作。

内存换页可以通过在配置文件中设置来进行调整。

vm_memory_high_watermark_paging_ratio=0.75

以上配置将会在RabbitMQ内存使用率达到30%(假设内存阈值时0.4)时进行换页动作,并在40%时阻塞生产者。当``vm_memory_high_watermark_paging_ratio`的值大于1时,相当于禁用了换页功能。

3 磁盘控制

磁盘告警

当磁盘剩余空间低于设置的阈值时,RabbitMQ同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务崩溃。

默认情况下,磁盘的阈值是50M,表示当磁盘剩余空间低于50M时,会阻塞生产者并停止内存中消息的换页动作。这个阈值的设置可以减小,但不能完全消除因磁盘耗尽而导致崩溃的可能性。比如在两次磁盘空间检测期间内,磁盘空间从大于50M被耗尽到0M。

备注:一个相对谨慎的做法是将磁盘阈值设置为与操作系统所显示的内存大小一致。

磁盘限制

通过以下命令可以临时调整磁盘阈值

#设置具体大小,单位为KB/MB/GB
RabbitMQctl set_disk_free_limit <disk_limit>
#设置相对值,建议取值为1.0~2.0(相对于内存的倍数,如内存大小是8G,若为1.0,则表示磁盘剩余8G时,阻塞)
RabbitMQctl set_disk_free_limit mem_relative <fraction>

对应的配置文件配置如下:
disk_free_limit.relative=2.0
#disk_free_limit_absolute=50MB

QA

消息什么时候需要持久化?

根据 官方博文 的介绍,RabbitMQ在两种情况下会将消息写入磁盘:

  1. 消息本身在publish的时候就要求消息写入磁盘;
  2. 内存紧张,需要将部分内存中的消息转移到磁盘;

消息什么时候会刷到磁盘?

  1. 写入文件前会有一个Buffer,大小为1M(1048576),数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘);
  2. 有个固定的刷盘时间:25ms,也就是不管Buffer满不满,每隔25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘;
  3. 每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0来实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作。

消息在磁盘文件中的格式

消息保存于$MNESIA/msg_store_persistent/x.rdq文件中,其中x为数字编号,从1开始,每个文件最大为16M(16777216),超过这个大小会生成新的文件,文件编号加1。消息以以下格式存在于文件中:

<<Size:64, MsgId:16/binary, MsgBody>>

MsgId为RabbitMQ通过rabbit_guid:gen()每一个消息生成的GUID,MsgBody会包含消息对应的exchange,routing_keys,消息的内容,消息对应的协议版本,消息内容格式(二进制还是其它)等等。

文件何时删除?

当所有文件中的垃圾消息(已经被删除的消息)比例大于阈值(GARBAGE_FRACTION = 0.5)时,会触发文件合并操作(至少有三个文件存在的情况下),以提高磁盘利用率。

publish消息时写入内容,ack消息时删除内容(更新该文件的有用数据大小),当一个文件的有用数据等于0时,删除该文件。

将queue,exchange, message等都设置了持久化之后就能保证100%保证数据不丢失了?

答案是否定的。

首先,从consumer端来说,如果这时autoAck=true,那么当consumer接收到相关消息之后,还没来得及处理就crash掉了,那么这样也算数据丢失,这种情况也好处理,只需将autoAck设置为false(方法定义如下),然后在正确处理完消息之后进行手动ack(channel.basicAck).

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

其次,关键的问题是消息在正确存入RabbitMQ之后,还需要有一段时间(这个时间很短,但不可忽视)才能存入磁盘之中,RabbitMQ并不是为每条消息都做fsync的处理,可能仅仅保存到cache中而不是物理磁盘上,在这段时间内RabbitMQ broker发生crash, 消息保存到cache但是还没来得及落盘,那么这些消息将会丢失。那么这个怎么解决呢?

首先可以引入RabbitMQ的mirrored-queue即镜像队列,这个相当于配置了副本,当master在此特殊时间内crash掉,可以自动切换到slave,这样有效的保障了HA, 除非整个集群都挂掉,这样也不能完全的100%保障RabbitMQ不丢消息,但比没有mirrored-queue的要好很多,很多现实生产环境下都是配置了mirrored-queue的。

还有要在producer引入事务机制或者Confirm机制来确保消息已经正确的发送至broker端

RabbitMQ的可靠性涉及producer端的确认机制、broker端的镜像队列的配置以及consumer端的确认机制,要想确保消息的可靠性越高,那么性能也会随之而降,鱼和熊掌不可兼得,关键在于选择和取舍。

参考链接:

https://blog.RabbitMQ.com/posts/2011/01/RabbitMQ-backing-stores-databases-and-disks

https://www.RabbitMQ.com/persistence-conf.html

以上是关于RabbitMQ持久化机制的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ持久化机制

RabbitMQ详解

RabbitMQ基础——和——持久化机制

如何保证消息队列的可靠性传输?

深入浅出 RabbitMQ

RabbitMQ面试题:如何确保消息不丢失? --- 2022-04-03