RabbitMQ笔记08消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认消费者消息确认消息持久化)

Posted 朱友斌

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ笔记08消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认消费者消息确认消息持久化)相关的知识,希望对你有一定的参考价值。

这篇文章,主要介绍消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)。

目录

一、防止消息丢失

1.1、消息确认机制(生产者)

(1)生产者丢失消息

(2)生产者消息确认机制

1.2、消息确认机制(消费者)

(1)消费者丢失消息

(2)消费者消息确认机制

1.3、消息持久化(RabbitMQ)

(1)RabbitMQ丢失消息

(2)消息持久化机制


一、防止消息丢失

RabbitMQ消息队列,在使用的时候,可能会存在消息丢失的情况,所谓的消息丢失就是生产者发送的消息没办法被消费者正确的消费,消息队列中导致消息丢失的地方有三个,分别是:

  • 第一种情况:生产者发送的消息没有正确的发送到RabbitMQ里面,导致发送的消息丢失。
  • 第二种情况:消费者从RabbitMQ消费消息时候,消费失败,但是RabbitMQ认为消费成功,从而删除了消息。
  • 第三种情况:RabbitMQ中保存的消息还没有被消费者消费,此时RabbitMQ服务宕机,导致内存中的消息丢失。

1.1、消息确认机制(生产者)

(1)生产者丢失消息

生产者丢失消息,是指:当生产者发送消息给RabbitMQ的时候,此时消息发送失败了,并且生产者又没有重新发送这一条消息,所以这个时候,生产者这一条失败的消息就丢失了。

既然是生产者发送消息失败导致这一条消息丢失的,那么我们在处理这个丢失消息问题的时候,就可以这样做:当生产者消息发送失败之后,可以让生产者再次发送这一条消息,这里就有一个问题啦,那就是生产者怎么知道消息有没有发送成功???

RabbitMQ给我们提供了一个机制,即:发布确认机制,大致思想是:当生产者将消息发送到RabbitMQ之后,并且RabbitMQ正确接收到消息并将其放入Queue队列里面时,RabbitMQ会返回一个ACK标识给生产者,生产者接收到ACK标识就可以认为消息发送成功啦;如果消息接收失败,RabbitMQ会返回一个NACK标识,表示接收失败。

(2)生产者消息确认机制

生产者消息确认机制,上一篇文章已经介绍了(【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式),这里就不再重复。

1.2、消息确认机制(消费者)

(1)消费者丢失消息

如果生产者已经将消息正确的发送到RabbitMQ里面了,消费者从Queue队列里面获取消息消费时候,如果消费失败,那么此时就会导致这一条消息丢失,这是因为,默认情况下,RabbitMQ将消息分发给消费者之后,消费者接收到消息时候,就会返回一个ACK标识给消息队列RabbitMQ,此时RabbitMQ就会将这一条消息从Queue队列里面删除,但是这种情况下,消费者是否正确将这条消息消费了,RabbitMQ是不知道的,所以这就有可能导致丢失。

如何解决消费者丢失消息???

  • 既然丢失消息是因为消费者消费失败,并且RabbitMQ把消息删除了,那么我们就可以开启手动确认的方式来告诉RabbitMQ,消费者是否正确的消费消息,是否可以将消息从Queue队列里面删除了。

(2)消费者消息确认机制

  • 消费者进行消息确认,需要关闭自动确认,将【basicConsume()】方法的第二个参数设置为【false】。
  • 消息成功消费之后,主动调用【basicAck()】方法,返回ACK标识给RabbitMQ。
package com.rabbitmq.demo.dropmsg;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @version 1.0.0
 * @Date: 2023/2/25 16:30
 * @Copyright (C) ZhuYouBin
 * @Description: 消息消费者
 */
public class Consumer 
    public static void main(String[] args) 
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置连接的 RabbitMQ 服务地址
        factory.setHost("127.0.0.1"); // 默认就是本机
        factory.setPort(5672); // 默认就是 5672 端口
        // 3、获取连接
        Connection connection = null; // 连接
        Channel channel = null; // 通道
        try 
            connection = factory.newConnection();
            // 4、获取通道
            channel = connection.createChannel();
            // 5、声明 Exchange,如果不存在,则会创建
            String exchangeName = "exchange_dropmsg_2023";
            channel.exchangeDeclare(exchangeName, "direct");
            // 6、指定需要操作的消息队列,如果队列不存在,则会创建
            String queueName = "queue_dropmsg_2023";
            channel.queueDeclare(queueName, false, false, false, null);
            // 7、绑定 Exchange 和 Queue, 接收 routingKey = "info" 的消息
            channel.queueBind(queueName, exchangeName, "key_2023");
            // 8、消费消息
            Channel finalChannel = channel;
            DeliverCallback callback = new DeliverCallback() 
                public void handle(String s, Delivery delivery) throws IOException 
                    // 接收消息
                    System.out.println("这是接收的消息:" + new String(delivery.getBody()));
                    // TODO 消费者正确消费消息之后,主动返回 ACK 标识
                    finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
                
            ;
            // TODO 这第二个参数修改为 false,表示消费者需要手动发送 ACK 标识给 RabbitMQ(默认是true)
            channel.basicConsume(queueName, false, callback, i->);
         catch (Exception e) 
            e.printStackTrace();
        
    

1.3、消息持久化(RabbitMQ)

(1)RabbitMQ丢失消息

上面介绍了两种丢失消息的情况,分别是生产者和消费者丢失消息,还有一种丢失消息的情况,那就是RabbitMQ消息队列将消息丢失了。假设,现在存在这一种情况,生产者已经正确将消息发送到RabbitMQ里面,正准备将消息发送给消费者的时候,此时RabbitMQ服务宕机了,导致RabbitMQ中的消息丢失了(默认情况下,RabbitMQ是将消息保存在内存中的),由于内存中的数据断电即失,所以这就导致消息丢失情况。

如何解决RabbitMQ出现的消息丢失问题呢???

  • 既然RabbitMQ是将消息保存在内存中的,那么为了避免消息丢失,可以将内存中的消息保存到磁盘文件里面,这样即使RabbitMQ宕机了,重新启动的时候也可以从磁盘文件里面读取消息到内存里面。

(2)消息持久化机制

  • 在调用【queueDeclare()】方法,创建Queue队列的时候,设置第二个参数等于【true】,表示消息允许持久化。
  • 生产者调用【basicPublish()】方法发送消息的时候,设置消息属性等于【MessageProperties.PERSISTENT_TEXT_PLAIN】,表示文本持久化。
// 第二个参数设置为true,表示开启持久化消息
channel.queueDeclare("Queue队列名称", true, false, false, null);

// 生产者发送消息时候,设置消息属性是文本持久化
channel.basicPublish("Exchange交换机名称", "Queue队列名称", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

到此,RabbitMQ消息队列防止消息丢失的三种方式介绍完啦。

综上,这篇文章结束了,主要介绍消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)。

以上是关于RabbitMQ笔记08消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认消费者消息确认消息持久化)的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ消息队列笔记

RabbitMQ消息队列笔记

RabbitMQ消息队列笔记

消息队列之RabbitMQ

RabbitMQ学习笔记

RabbitMQ学习笔记