消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费
Posted 盛夏温暖流年
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费相关的知识,希望对你有一定的参考价值。
我们使用 RabbitMQ 进行消息处理一般都需要保证消息的可靠性,而消息的可靠性又可以根据消息的不同处理阶段分为可靠性投递、传输和消费。
本篇博客将针对这三种情况介绍相应的设计方案,首先来看一下消息的可靠性投递。
消息的可靠性投递
1. 消息投递模式
消息可靠性投递,是指保证生产者能够把消息 100% 发送到消息队列中,生产者 Producer 为我们提供了两种消息投递模式:Confirm 确认模式 和 Return 退回模式 ,这两种模式是保障消息可靠性投递的核心,先来了解下这两种模式。
Confirm 确认模式
是指生产者投递消息后,如果 MQ Broker 收到消息,会给生产者一个应答。生产者接收到应答,就可以确定这条消息被正常发送到了 Broker,具体流程如下图所示。
Confirm 确认模式的代码实现
生产者代码实现:
public class Producer
public static void main(String[] args) throws Exception
//1 创建 ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("主机地址");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 获取 Connection
Connection connection = connectionFactory.newConnection();
//3 通过 Connection 创建 Channel
Channel channel = connection.createChannel();
//4 指定消息投递模式: 消息确认模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";
//5 发送消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
//6 添加确认监听
channel.addConfirmListener(new ConfirmListener()
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException
System.err.println("-------no ack!-----------");
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException
System.err.println("-------ack!-----------");
);
消费者代码实现:
public class Consumer
public static void main(String[] args) throws Exception
//1 创建 ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("主机地址");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 获取 Connection
Connection connection = connectionFactory.newConnection();
//3 通过 Connection 创建 Channel
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.#";
String queueName = "test_confirm_queue";
//4 声明交换机和队列,然后进行绑定设置,最后制定路由 Key
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while (true)
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端: " + msg);
通过代码可以看出,Confirm 确认模式实现的关键就是添加了以下两个步骤:
1 在 channel 上开启确认模式:channel.confirmSelect()
2 在 channel 上添加监听:channel.addConfirmListener 监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理。
Return 退回模式
是指在发送消息的时候,如果消息没有被正确投递(可能是当前的 Exchange 不存在或指定的 RoutingKey 路由不到),那么此时 RabbitMQ 会返回给生产者一个信号,信号中包括消息不可达的原因,以及消息本身的内容。
有一个关键的配置项:mandatory,如果为 true,则 Return Listener 监听器会接收到路由不可达的消息,进行后续处理,如果为 false,那么 Broker 端会自动删除该消息。
所以这里我们需要把这个配置项的属性配置为 true,Return Listener 监听器才可以正常工作。
spring.rabbitmq.template.mandatory = true
Return 退回模式的代码实现
生产者代码实现:
public class Producer
public static void main(String[] args) throws Exception
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("主机地址");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_return_exchange";
// 正常情况下可以到达
String routingKey = "return.save";
String routingKeyError = "abc.save";
String msg = "Hello RabbitMQ Return Message";
channel.addReturnListener(new ReturnListener()
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException
System.err.println("---------handle return----------");
System.err.println("replyCode: " + replyCode);
System.err.println("replyText: " + replyText);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
);
// 消息投递成功,会被消费者所消费
// channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
// 消息不可达,将触发 ReturnListener
channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
消费者代码实现:
public class Consumer
public static void main(String[] args) throws Exception
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("主机地址");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_return_exchange";
String routingKey = "return.#";
String queueName = "test_return_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while(true)
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费者: " + msg);
2.消息可靠性投递实现方案
可靠性投递方案一:消息入库,消息状态记录
从图中可以得出,消息的可靠性投递主要需要保证四个方面:
- 保障消息成功发送;
- 保障消息队列节点成功接收消息;
- 消息发送端收到消息队列服务的确认应答;
- 完善的消息补偿机制;
具体实现流程:
-
Step 1:把业务数据 BIZ 和消息记录数据 MSG 存储到 DB;
这里的消息记录数据 MSG 中包括了一个重要的消息状态字段 status,初始值设置为 0,表示消息尚未得到确认。 -
Step 2:消息发送者给 MQ Broker 发送数据;
-
Step 3:MQ Broker 收到消息后,给生产者 Producer 一个消息应答;
-
Step 4:生产者 Producer 的应答监听器监听到消息,更新消息记录数据 MSG 状态;
就是把消息记录数据 MSG 的确认状态由 “未确认(status=0)” 更新为 “已确认(status=1)”; -
Step 5-6:设置定时任务,获取消息记录数据 MSG 中的超时未确认数据进行重新发送;
-
Step 7:重试次数内,重复执行流程;超过重试次数,更新数据库状态;
将消息记录数据 MSG 的确认状态由 “未确认(status=0)” 更新为 “确认失败(status=2)”;
方案一可以 100% 保证消息的可靠性投递,但是两次数据入库,并不适合在高并发场景使用。
可靠性投递方案二:消息延迟发送,做二次检查,回调检查
-
Step 1:业务记录入库,业务记录正常入库后向 MQ 发送消息;
-
Step 2:第二次消息延迟 3 - 5 分钟,向 MQ 发送消息;
-
Step 3:消费端监听指定队列,对收到的消息进行处理;
-
Step 4:处理完成后,发送 confirm 消息,也就是回送响应,但是这里响应不是一般的 ACK,而是重新生成一条消息,投递到 MQ 中;
-
Step 5:Callback 服务监听消费者发送的 confirm 消息,如果正常完成,消息入库;
这里的 Callback service 是一个单独的服务,它扮演了方案一的存储消息的 DB 角色,它通过 MQ 去监听下游服务发送的 confirm 消息,如果 Callback service 收到 confirm 消息,那么就对消息做持久化存储操作; -
Step 6:Callback 服务处理延迟消息;
几分钟之后延迟消息发送到 MQ,Callback service 监听到对应队列中的延迟消息后,就去检查 DB 中是否存在消息:如果存在,则不需要做任何处理;如果不存在或者消费失败,那么 Callback service 就需要主动发起 RPC 通信给上游服务,表明未找到延迟投递的消息,需要重新发送,生产端收到信息后就会重新查询业务消息然后重发消息;
方案二不一定能保障 100% 投递成功,但是基本上可以保障大概 99.9% 的消息的可靠性投递,有些特别极端的情况只能是人工去做补偿,或者使用定时任务去完成。
主要目的是为了减少数据库操作,提高并发量,毕竟在高并发场景下,最关心的不是消息 100% 投递成功,而是一定要保证性能,所以能减少数据库的操作就尽量减少,异常数据可以异步进行补偿。
消息的可靠性传输
可靠性传输就是保证不丢失数据,丢失数据一般分为三种情况,分别是生产端丢失数据,消息队列丢失数据和消费端丢失数据。
1. 生产端丢失数据
原因
生产端发送消息后,由于网络传输问题导致消息的丢失;
解决方案
生产端保证数据不丢失可以采取两种方式:
1.使用 RabbitMQ 事务,在发送消息前开启事务 (channel.txSelect),正常发送后提交事务 (channel.txCommit),如果出现异常,需要回滚事务 (channel.txRollback) 做消息重发等操作;
这种方式在实际生产中不建议使用:RabbitMQ 事务采用同步的方式进行事务处理,提交一个事务后,只有等这个事务执行完才能执行下一个事务,严重影响 RabbitMQ 性能。
2.使用 confirm 模式,当消息成功处理,将回传 ack,消息处理失败,回传 nack;
实际生产中推荐使用这种方式:采用异步模式,消费者发送一条消息后,不用等消息回传消息,就可以发送下一条消息,RabbitMQ 将异步接受这条回传信息,如果是 nack,将进行消息重试,性能比较高。
2. 消息队列丢失数据
原因
消息队列异常宕机,原来存储在队列中的消息就会丢失;
解决方案
消息队列中的 Exchange,Queue 以及消息本身均开启持久化配置;
3. 消费端丢失数据
原因
消费端如果设置了自动确认机制,那么消息到达消费端就会自动进行确认,但是如果此时消费端发生异常,消息并没有被正常消费,就会导致数据的丢失;
解决方案
消费端关闭自动确认,设置为手动确认;
消息的可靠性消费
消息的可靠性消费主要指的是消息的不重复消费,也就是保证消息消费的幂等性,目前有两种主流的实现方式:
- 数据库主键形式 (指纹码 + 全局唯一 ID) 去重;
- 利用 Redis 原子性实现去重;
1. 数据库主键形式 (指纹码 + 全局唯一 ID) 去重
指纹码:唯一的系统码,可以是内部规则(如时间戳)和外部返回(如银行流水号),通过指纹码和全局唯一 ID 做主键,利用数据库主键进行去重。
如图所示,具体流程如下:
1. 为消息生成统一 ID,发送消息给 MQ Broker
统一 ID 生成服务负责生成 ID,为了保证可靠性,上游服务也要有个本地的 ID 生成服务,为消息生成 ID 后发送给 MQ Broker;
2. ID 规则路由组件监听消息,并进行入库操作
如果入库成功,证明没有重复,就直接发给下游服务;如果发现库里面有了这条消息,表明消息重复,则不进行处理;
这里我们根据 ID 进行分库分表策略,采用 hash 路由算法去进行分压分流,避免了高并发下的数据库性能瓶颈。通过这种算法,消息即使投递多次都会落到同一个数据库分片上,这样就由单台数据库幂等变成了多库的幂等。
2. 利用 Redis 原子性实现去重
某种意义上来说,Redis 是单线程的,性能非常好,提供了很多原子性的命令,比如 setnx 命令。
当接收到消息后,可以将消息 ID 作为 key 执行 setnx 命令,如果执行成功就表示没有处理过这条消息,可以进行消费,执行失败表示消息已经被消费。
利用 Redis 原子性实现去重需要考虑两个问题:
- 是否需要进行数据落库,如果落库,怎样保证数据库和 Redis 中数据的一致性?(一般保证幂等不会入库)
- 如果不入库,如何设计定时清理策略,以及怎样保证缓存的可靠性和可用性?
定时清理策略:由于唯一 ID 生成非常频繁,我们需要进行定时清理,可以执行定时任务,按照业务生成 ID 的数据量来指定清理周期;
保证缓存的可靠性:采用高可用的集群架构,比如哨兵模式来保证高可用;采用合适的持久化机制,比如 RDB + AOF 混合持久化机制来保证数据的可靠性。
本篇博客就介绍到这里,参考了以下博客链接,非常感谢,下一篇博客将会介绍 RabbitMQ 消息队列的集群架构模式,敬请期待。
https://www.cnblogs.com/luhan777/p/11171515.html
https://blog.csdn.net/weixin_42687829/article/details/104328651
https://zhuanlan.zhihu.com/p/137555204
http://events.jianshu.io/p/d8042d7f62e1
https://www.cnblogs.com/dc-earl/articles/11176603.html
以上是关于消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费的主要内容,如果未能解决你的问题,请参考以下文章
消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费