如何保证消息队列的可靠性传输?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何保证消息队列的可靠性传输?相关的知识,希望对你有一定的参考价值。

参考技术A

消息丢失分成三种情况,可能出现生产者、RabbitMQ、消费者。

首先要确保写入 RabbitMQ 的消息别丢,消息队列通过 请求确认机制 ,保证消息的可靠传输。生产开启 comfirm 模式,在生产者开启 comfirm 模式之后,每次发送消息都会分配一个唯一的id。

RabbitMQ 丢失数据,需要开启 RabbitMQ 持久化,开启持久化之后,生产者发送的消息会持久化到磁盘,RabbitMQ 就算是挂了,恢复启动后也会读取之前存储的数据。
还有一种少见的情况,就是RabbitMQ还没将消息持久化,自己就挂了。这种情况需要生产者那边的确认机制结合起来。只有消息被持久化到磁盘以后,才会回传 ack 消息。生产者没有接收到 ack,也可以自己重发。

消费丢失数据,刚消费到 RabbitMQ 发送的数据,消费进程就挂了,重启进程后,RabbitMQ 也不会重新发送消息。
这个时候需要关闭 RabbitMQ 关闭自动的 ack 机制。每次在消费端处理后,再在程序里做 ack 确认,这样的话,如果没有处理完,就没有 ack 确认,那 RabbitMQ 就认为你还没有处理完,这个时候 RabbitMQ 会重新发送消息给消费者。

如果觉得文章对你有帮助的话,请点个赞吧!

消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费

我们使用 RabbitMQ 进行消息处理一般都需要保证消息的可靠性,而消息的可靠性又可以根据消息的不同处理阶段分为可靠性投递、传输消费

本篇博客将针对这三种情况介绍相应的设计方案,首先来看一下消息的可靠性投递。


消息的可靠性投递

1. 消息投递模式

消息可靠性投递,是指保证生产者能够把消息 100% 发送到消息队列中,生产者 Producer 为我们提供了两种消息投递模式:Confirm 确认模式Return 退回模式 ,这两种模式是保障消息可靠性投递的核心,先来了解下这两种模式。

Confirm 确认模式

是指生产者投递消息后,如果 MQ Broker 收到消息,会给生产者一个应答。生产者接收到应答,就可以确定这条消息被正常发送到了 Broker,具体流程如下图所示。

Confirm 确认模式的代码实现

生产者代码实现:

public class Producer 
    public static void main(String[] args) throws Exception 
        //1 创建 ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("主机地址");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 获取 Connection
        Connection connection = connectionFactory.newConnection();
        //3 通过 Connection 创建 Channel
        Channel channel = connection.createChannel();
        //4 指定消息投递模式: 消息确认模式
        channel.confirmSelect();
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.save";
        //5 发送消息
        String msg = "Hello RabbitMQ Send confirm message!";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        //6 添加确认监听
        channel.addConfirmListener(new ConfirmListener() 
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException 
                System.err.println("-------no ack!-----------");
            

            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException 
                System.err.println("-------ack!-----------");
            
        );
    

消费者代码实现:

public class Consumer 
    public static void main(String[] args) throws Exception 
        //1 创建 ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("主机地址");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 获取 Connection
        Connection connection = connectionFactory.newConnection();
        //3 通过 Connection 创建 Channel
        Channel channel = connection.createChannel();
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.#";
        String queueName = "test_confirm_queue";
        //4 声明交换机和队列,然后进行绑定设置,最后制定路由 Key
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        //5 创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);
        while (true) 
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消费端: " + msg);
        
    

通过代码可以看出,Confirm 确认模式实现的关键就是添加了以下两个步骤:
1 在 channel 上开启确认模式:channel.confirmSelect()
2 在 channel 上添加监听:channel.addConfirmListener 监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理。

Return 退回模式

是指在发送消息的时候,如果消息没有被正确投递(可能是当前的 Exchange 不存在或指定的 RoutingKey 路由不到),那么此时 RabbitMQ 会返回给生产者一个信号,信号中包括消息不可达的原因,以及消息本身的内容。

有一个关键的配置项:mandatory,如果为 true,则 Return Listener 监听器会接收到路由不可达的消息,进行后续处理,如果为 false,那么 Broker 端会自动删除该消息。

所以这里我们需要把这个配置项的属性配置为 true,Return Listener 监听器才可以正常工作。

spring.rabbitmq.template.mandatory = true

Return 退回模式的代码实现

生产者代码实现:

public class Producer 
    public static void main(String[] args) throws Exception 
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("主机地址");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchange = "test_return_exchange";
        // 正常情况下可以到达
        String routingKey = "return.save";
        String routingKeyError = "abc.save";
        String msg = "Hello RabbitMQ Return Message";
        channel.addReturnListener(new ReturnListener() 
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                                     String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException 
                System.err.println("---------handle  return----------");
                System.err.println("replyCode: " + replyCode);
                System.err.println("replyText: " + replyText);
                System.err.println("exchange: " + exchange);
                System.err.println("routingKey: " + routingKey);
                System.err.println("properties: " + properties);
                System.err.println("body: " + new String(body));
            
        );
        // 消息投递成功,会被消费者所消费
        // channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        // 消息不可达,将触发 ReturnListener
        channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
    

消费者代码实现:

public class Consumer 
    public static void main(String[] args) throws Exception 
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("主机地址");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_return_exchange";
        String routingKey = "return.#";
        String queueName = "test_return_queue";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);

        while(true)
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消费者: " + msg);
        

    

2.消息可靠性投递实现方案

可靠性投递方案一:消息入库,消息状态记录

从图中可以得出,消息的可靠性投递主要需要保证四个方面:

  • 保障消息成功发送;
  • 保障消息队列节点成功接收消息;
  • 消息发送端收到消息队列服务的确认应答;
  • 完善的消息补偿机制;

具体实现流程:

  • Step 1:把业务数据 BIZ 和消息记录数据 MSG 存储到 DB;
    这里的消息记录数据 MSG 中包括了一个重要的消息状态字段 status,初始值设置为 0,表示消息尚未得到确认。

  • Step 2:消息发送者给 MQ Broker 发送数据;

  • Step 3:MQ Broker 收到消息后,给生产者 Producer 一个消息应答;

  • Step 4:生产者 Producer 的应答监听器监听到消息,更新消息记录数据 MSG 状态;
    就是把消息记录数据 MSG 的确认状态由 “未确认(status=0)” 更新为 “已确认(status=1)”;

  • Step 5-6:设置定时任务,获取消息记录数据 MSG 中的超时未确认数据进行重新发送;

  • Step 7:重试次数内,重复执行流程;超过重试次数,更新数据库状态;
    将消息记录数据 MSG 的确认状态由 “未确认(status=0)” 更新为 “确认失败(status=2)”;

方案一可以 100% 保证消息的可靠性投递,但是两次数据入库,并不适合在高并发场景使用。

可靠性投递方案二:消息延迟发送,做二次检查,回调检查

  • Step 1:业务记录入库,业务记录正常入库后向 MQ 发送消息;

  • Step 2:第二次消息延迟 3 - 5 分钟,向 MQ 发送消息;

  • Step 3:消费端监听指定队列,对收到的消息进行处理;

  • Step 4:处理完成后,发送 confirm 消息,也就是回送响应,但是这里响应不是一般的 ACK,而是重新生成一条消息,投递到 MQ 中;

  • Step 5:Callback 服务监听消费者发送的 confirm 消息,如果正常完成,消息入库;
    这里的 Callback service 是一个单独的服务,它扮演了方案一的存储消息的 DB 角色,它通过 MQ 去监听下游服务发送的 confirm 消息,如果 Callback service 收到 confirm 消息,那么就对消息做持久化存储操作;

  • Step 6:Callback 服务处理延迟消息;
    几分钟之后延迟消息发送到 MQ,Callback service 监听到对应队列中的延迟消息后,就去检查 DB 中是否存在消息:如果存在,则不需要做任何处理;如果不存在或者消费失败,那么 Callback service 就需要主动发起 RPC 通信给上游服务,表明未找到延迟投递的消息,需要重新发送,生产端收到信息后就会重新查询业务消息然后重发消息;

方案二不一定能保障 100% 投递成功,但是基本上可以保障大概 99.9% 的消息的可靠性投递,有些特别极端的情况只能是人工去做补偿,或者使用定时任务去完成。

主要目的是为了减少数据库操作,提高并发量,毕竟在高并发场景下,最关心的不是消息 100% 投递成功,而是一定要保证性能,所以能减少数据库的操作就尽量减少,异常数据可以异步进行补偿。


消息的可靠性传输

可靠性传输就是保证不丢失数据,丢失数据一般分为三种情况,分别是生产端丢失数据消息队列丢失数据消费端丢失数据

1. 生产端丢失数据

原因

生产端发送消息后,由于网络传输问题导致消息的丢失;

解决方案

生产端保证数据不丢失可以采取两种方式:

1.使用 RabbitMQ 事务,在发送消息前开启事务 (channel.txSelect),正常发送后提交事务 (channel.txCommit),如果出现异常,需要回滚事务 (channel.txRollback) 做消息重发等操作;

这种方式在实际生产中不建议使用:RabbitMQ 事务采用同步的方式进行事务处理,提交一个事务后,只有等这个事务执行完才能执行下一个事务,严重影响 RabbitMQ 性能。

2.使用 confirm 模式,当消息成功处理,将回传 ack,消息处理失败,回传 nack;

实际生产中推荐使用这种方式:采用异步模式,消费者发送一条消息后,不用等消息回传消息,就可以发送下一条消息,RabbitMQ 将异步接受这条回传信息,如果是 nack,将进行消息重试,性能比较高。

2. 消息队列丢失数据

原因

消息队列异常宕机,原来存储在队列中的消息就会丢失;

解决方案

消息队列中的 Exchange,Queue 以及消息本身均开启持久化配置;

3. 消费端丢失数据

原因

消费端如果设置了自动确认机制,那么消息到达消费端就会自动进行确认,但是如果此时消费端发生异常,消息并没有被正常消费,就会导致数据的丢失;

解决方案

消费端关闭自动确认,设置为手动确认;


消息的可靠性消费

消息的可靠性消费主要指的是消息的不重复消费,也就是保证消息消费的幂等性,目前有两种主流的实现方式:

  1. 数据库主键形式 (指纹码 + 全局唯一 ID) 去重;
  2. 利用 Redis 原子性实现去重;

1. 数据库主键形式 (指纹码 + 全局唯一 ID) 去重

指纹码:唯一的系统码,可以是内部规则(如时间戳)和外部返回(如银行流水号),通过指纹码和全局唯一 ID 做主键,利用数据库主键进行去重。

如图所示,具体流程如下:

1. 为消息生成统一 ID,发送消息给 MQ Broker

统一 ID 生成服务负责生成 ID,为了保证可靠性,上游服务也要有个本地的 ID 生成服务,为消息生成 ID 后发送给 MQ Broker;

2. ID 规则路由组件监听消息,并进行入库操作

如果入库成功,证明没有重复,就直接发给下游服务;如果发现库里面有了这条消息,表明消息重复,则不进行处理;

这里我们根据 ID 进行分库分表策略,采用 hash 路由算法去进行分压分流,避免了高并发下的数据库性能瓶颈。通过这种算法,消息即使投递多次都会落到同一个数据库分片上,这样就由单台数据库幂等变成了多库的幂等。

2. 利用 Redis 原子性实现去重

某种意义上来说,Redis 是单线程的,性能非常好,提供了很多原子性的命令,比如 setnx 命令。

当接收到消息后,可以将消息 ID 作为 key 执行 setnx 命令,如果执行成功就表示没有处理过这条消息,可以进行消费,执行失败表示消息已经被消费。

利用 Redis 原子性实现去重需要考虑两个问题:

  • 是否需要进行数据落库,如果落库,怎样保证数据库和 Redis 中数据的一致性?(一般保证幂等不会入库)
  • 如果不入库,如何设计定时清理策略,以及怎样保证缓存的可靠性和可用性?
    定时清理策略:由于唯一 ID 生成非常频繁,我们需要进行定时清理,可以执行定时任务,按照业务生成 ID 的数据量来指定清理周期;
    保证缓存的可靠性:采用高可用的集群架构,比如哨兵模式来保证高可用;采用合适的持久化机制,比如 RDB + AOF 混合持久化机制来保证数据的可靠性。

本篇博客就介绍到这里,参考了以下博客链接,非常感谢,下一篇博客将会介绍 RabbitMQ 消息队列的集群架构模式,敬请期待。

https://www.cnblogs.com/luhan777/p/11171515.html
https://blog.csdn.net/weixin_42687829/article/details/104328651
https://zhuanlan.zhihu.com/p/137555204
http://events.jianshu.io/p/d8042d7f62e1
https://www.cnblogs.com/dc-earl/articles/11176603.html

以上是关于如何保证消息队列的可靠性传输?的主要内容,如果未能解决你的问题,请参考以下文章

消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费

消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费

如何保证消息的可靠性传输

消息队列面试题要点

消息队列面试题要点

Mark | 分布式之消息队列