RabbitMQ消息和队列的持久化

Posted 我叫向同学

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ消息和队列的持久化相关的知识,希望对你有一定的参考价值。

持久化的概念

上一篇测试已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消
息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列
和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事: 我们需要将队列和消息都标
记为持久化。

队列持久化

前几次的测试我们创建的队列都是非持久化的, rabbitmq 如果重启的化,该队列就会被删除掉,如果
要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化。

但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误。

错误信息展示

以下为RabbitMQ可视化界面中持久化与非持久化队列的显示区的区别

这个时候即使重启 rabbitmq 队列也依然存在

消息持久化

要想让消息实现持久化需要在消息生产者修改代码, MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性。

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是
这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,后面的文章将会揭晓。

/**
 * 生产者
 * 消息手动应答
 */
public class Produce 

    public static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception 

        Channel channel = RabbitMqUtils.getChannel();
        // 声明队列
        boolean durable = true; // 列队持久化
        channel.queueDeclare(QUEUE_NAME,durable,false,false,null);

        //从控制台当中接收信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext())
            String message = scanner.next();
            // MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化  (保存在内存中)
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
            System.out.println("PRODUCE发送消息完成:"+message);
        

    


不公平分发

在前面的文章学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ 并不知道这种情况它依然很公平的进行分发。为了避免这种情况,我们可以设置参数channel.basicQos(1);

应该在消费者 接收消息之前设置


我们来启动来测试一下

测试案例

连个消费者 Q1 处理消息速度快和Q2处理消息慢,生产者开始发送消息,现在还是轮询的么??


结论

意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。

注意事项。
测试过程中,好几次没有成功为什么呢?
1。自动应答改为false。
2。 channel.basicQos(prefetchCount,false); 代码位置的原因,要在消费的前面。

测试代码


import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.xiang.rabbitmq.util.RabbitMqUtils;

import java.util.concurrent.TimeUnit;

/**
 * 消费者
 * 消息手动应答,消息不丢失,消息重新入队
 */
public class Consumer01 

    public static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception 

        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("【Q2等待接收消息】");
        DeliverCallback deliverCallback = (String consumerTag, Delivery message)->
            try 
                TimeUnit.SECONDS.sleep(30); // 睡30s
             catch (InterruptedException e) 
                e.printStackTrace();
            
            System.out.println("接收到的消息:" + new String(message.getBody(),"UTF-8"));

            /**
             * 手动应答
             * 1.long deliveryTag, 消息标记
             * 2 boolean multiple  是否批量应答
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        ;
        CancelCallback cancelCallback = (consumerTag)->
            System.out.println("消费者取消了消费消息");
        ;

        //消费端设置 不公平分发
        int prefetchCount = 1;
        channel.basicQos(prefetchCount,false);
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);

    


预取值

本身消息的发送就是异步发送的,所以在任何时候, channel 上肯定不止只有一个消息另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设置“预取计数”值来完成的。 该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、 6、 7, 8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK, RabbitMQ 将会感知这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。 虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器 ) - 应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。

以上是关于RabbitMQ消息和队列的持久化的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ:工作队列模式

[RabbitMQ]队列持久化

RabbitMQ学习-- 队列持久化

RabbitMQ

RabbitMQ消息队列怎样做到服务宕机或重启消息不丢失

如何保证消息队列的可靠性传输?