03 RabbitMQ进阶1之可靠性投递
Posted IT BOY
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了03 RabbitMQ进阶1之可靠性投递相关的知识,希望对你有一定的参考价值。
目录
Pt1 可靠性投递
在分布式系统中,引入新的组件需要考虑很多问题。新的组件带来新的链路节点,引入了新的风险,需要考虑组件本身的高可用,数据一致性等问题。我们将MQ加入到应用架构中负责消息通信,就需要考虑MQ丢失消息怎么办?如果重发需要考虑哪些问题?
Pt1.1 可靠性风险
先来看看RabbitMQ的工作模型中,可能存在哪些风险点:
从图中绿字部分可以看出,RabbitMQ模型中有以下风险可能导致消息传递不可靠:
-
生产者发送消息到Broker:生产者怎么知道Broker已经成功收到自己投递的消息呢?
-
Exchange分发消息到Queue:如果Exchange无法将消息分发到Queue怎么办?比如无法匹配Exchange的绑定规则,或者找不到匹配的Queue,这条消息该怎么处理?要怎么通知到生产者?
-
消息存储在Queue中:队列中存放的消息,如果保证稳定存储呢?万一MQ宕机重启,是不是还没有来得及消费的消息就丢了?
-
消费者订阅Queue消费消息:Queue是FIFO性质的,只有前一条消息投递之后才会继续投递下一条消息?如何确认消息者已经成功接收到消息了呢?
接下来看看,RabbitMQ是如何解决这些问题的。
Pt1.2 保证生产者发送消息给Broker
用过MQ的应该都清楚,MQ是一个比较容易出问题的环节。网络抖动、Broker故障(内存、磁盘)、发送超时等,都可能导致生产者无法成功将消息发送到Broker。当这种情况发生时,如何快速感知到,对于减少故障影响是至关重要的,换句话说,生产者如何明确知道发送的消息被Broker成功接收呢?
RabbitMQ中提供了两种服务端确认机制,来保证生产者及时获知消息的接收状态。就是在生产者发送消息给RabbitMQ服务端的时候,服务端会以某种方式返回一个应答,只要生产者收到应答,就知道Broker已经成功接收了消息。
-
事务模式(Transaction);
-
确认模式(Confirm);
(1) 事务模式(Transaction)
RabbitMQ中与事务机制有关的方法有三个:txSelect(),txCommit(),txRollback()。 txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了。如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。
代码模板
try
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
channel.txCommit();
catch (Exception e)
// TODO
channel.txRollback();
事务模式的代码案例:
模拟两个RabbitMQ事务,一个成功事务,一个失败事务,如下图。
1. 首先,在RabbitMQ控台新建Exchange1和Exchange2,Exchange3不用创建用于模拟失败场景。
2. 新建MQ操作共通类(这个类下面的示例中都会用到,后面就不重复创建了)。
public class RabbitMQUtils
/**
* 配置连接工厂类参数
*
* @return
*/
public static ConnectionFactory getFactoryInstance()
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitMQConstants.HOST);
factory.setPort(RabbitMQConstants.PORT);
factory.setVirtualHost(RabbitMQConstants.VHOST);
factory.setUsername(RabbitMQConstants.USERNAME);
factory.setPassword(RabbitMQConstants.PASSWORD);
return factory;
/**
* 新建RabbitMQ连接
*
* @return
*/
public static Connection getConnection()
try
return getFactoryInstance().newConnection();
catch (Exception e)
e.printStackTrace();
return null;
/**
* 新建消息信道
*
* @return
*/
public static Channel createChannel(Connection conn)
try
return conn.createChannel();
catch (IOException e)
e.printStackTrace();
return null;
3. 新建生产者类
public class TransactionProducer
public static void main(String[] args)
// 创建消息信道
Connection connection = RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.createChannel(connection);
// 这是第一个成功事务
try
// 使用事务的方式发送消息
channel.txSelect();
// 发送一个消息到Exchange:TRANSACTION_1_EXCHANGE,路由键:transaction
channel.basicPublish("TRANSACTION_1_EXCHANGE", "transaction", MessageProperties.PERSISTENT_TEXT_PLAIN,
"这是事务型消息1".getBytes());
// 发送一个消息到Exchange:TRANSACTION_2_EXCHANGE,路由键:transaction
channel.basicPublish("TRANSACTION_2_EXCHANGE", "transaction", MessageProperties.PERSISTENT_TEXT_PLAIN,
"这是事务型消息2".getBytes());
channel.txCommit();
System.out.println("消息1和2发送成功");
catch (Exception ex)
try
// 回滚消息的发送
System.out.println("消息1和2被回滚了");
channel.txRollback();
catch (IOException e)
e.printStackTrace();
// 这是第二个失败事务:因为Exchange3不存在。
try
// 使用事务的方式发送消息
channel.txSelect();
// 发送一个消息到Exchange:TRANSACTION_1_EXCHANGE,路由键:transaction
channel.basicPublish("TRANSACTION_1_EXCHANGE", "transaction", MessageProperties.PERSISTENT_TEXT_PLAIN,
"这是事务型消息3".getBytes());
// 发送一个消息到Exchange:TRANSACTION_3_EXCHANGE,路由键:transaction
channel.basicPublish("TRANSACTION_3_EXCHANGE", "transaction", MessageProperties.PERSISTENT_TEXT_PLAIN,
"这是事务型消息4".getBytes());
channel.txCommit();
System.out.println("消息3和4发送成功");
catch (Exception ex)
try
// 回滚消息的发送
System.out.println("消息3和4被回滚了");
channel.txRollback();
catch (IOException e)
e.printStackTrace();
4. 测试运行
消息1和2发送成功
消息3和4被回滚了
Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'TRANSACTION_3_EXCHANGE' in vhost 'firstRabbitMQ', class-id=60, method-id=40)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:341)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:282)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
at com.rabbitmq.client.impl.ChannelN.txRollback(ChannelN.java:1548)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.txRollback(AutorecoveringChannel.java:675)
at com.example.mq.mqdemo.rabbitmq.javaapi.transaction.TransactionProducer.main(TransactionProducer.java:57)
测试结果符合预期。
事务确实能够解决生产者与broker之间消息确认的问题,只有消息成功被broker接收,事务提交才能成功,否则我们便可以在捕获异常进行事务回滚操作同时进行消息重发。但是使用事务机制的话会降低RabbitMQ的性能,因为事务是阻塞的,只有一条消息发送完毕才能发送一下消息,会严重影响RabbitMQ的性能,不太建议生产使用。
那么有没有更好的方法既能保障producer知道消息已经正确送到,又能基本上不带来性能上的损失呢?用confirm模式。
(2) 确认模式(Confirm)
确认模式就是生产者发送消息后,Broker会响应Ack作为成功接收消息的确认。确认模式有三种形式:
-
普通确认模式:每发送一条消息后,调用waitForConfirms()方法,等待Broker确认,这实际上是一种串行确认。
-
批量确认模式:每发送一批消息后,调用waitForConfirmsOrDie()方法,等待Broker确认。只要有一个未被Broker确认就会抛出IoException。
-
异步确认模式:提供一个回调方法,Broker确认了一条或者多条消息后Client端会回调这个方法。
普通确认模式
普通确认模式比较简单,发送一条消息后,等待Broker确认,如果服务端返回false或者超时时间内未返回,生产者则可以认为消息发送失败。如果网络错误会抛出连接异常,如果目标Exchange不存在则会抛出404错误。
案例场景:
代码实现:
public class ConfirmNormalProducer
public static void main(String[] args)
// 创建消息信道
Connection connection = RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.createChannel(connection);
// 案例一:正确使用的场景。
try
// 将消息信道设置为普通确认模式
channel.confirmSelect();
// 发送一个消息
channel.basicPublish("CONFIRM_1_EXCHANGE", "confirm", MessageProperties.PERSISTENT_TEXT_PLAIN,
"这是普通确认模式消息1".getBytes());
// 消息确认
if (channel.waitForConfirms())
System.out.println("普通确认模式消息1发送成功");
catch (Exception ex)
ex.printStackTrace();
// 消息发送异常,需要考虑重发
// TODO
// 案例二:消息发往一个根本不存在的Exchange
try
// 将消息信道设置为普通确认模式
channel.confirmSelect();
try
// 发送一个消息,这里会报错,用try..catch来处理,重点是看waitForConfirms()结果
channel.basicPublish("CONFIRM_2_EXCHANGE", "confirm", MessageProperties.PERSISTENT_TEXT_PLAIN,
"这是普通确认模式消息2".getBytes());
catch (Exception e)
// 异常不输出了,太长了,占版面。
// e.printStackTrace();
System.out.println("这里打个标记。");
// 如果消息发送失败,waitForConfirms()还会返回true吗
if (channel.waitForConfirms())
System.out.println("普通确认模式消息2发送成功");
else
System.out.println("Oh, sorry, your message get lost.");
catch (Exception ex)
ex.printStackTrace();
// 消息发送异常,需要考虑重发
// TODO
测试结果:
普通确认模式消息1发送成功
这里打个标记。
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CONFIRM_2_EXCHANGE' in vhost 'firstRabbitMQ', class-id=60, method-id=40)
at com.rabbitmq.client.impl.ChannelN.waitForConfirms(ChannelN.java:210)
at com.rabbitmq.client.impl.ChannelN.waitForConfirms(ChannelN.java:195)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.waitForConfirms(AutorecoveringChannel.java:691)
at com.example.mq.mqdemo.rabbitmq.javaapi.transaction.ConfirmNormalProducer.main(ConfirmNormalProducer.java:51)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CONFIRM_2_EXCHANGE' in vhost 'firstRabbitMQ', class-id=60, method-id=40)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
at java.base/java.lang.Thread.run(Thread.java:834)
执行channel.waitForConfirms()并没有返回false或者true,而是直接抛出了异常,404错误。
批量确认模式
批量确认模式稍微复杂一点,生产者需要定期(每隔多少秒)或者定量(达到多少条)或者两则结合起来发送消息,然后等待Broker确认。
相比普通confirm模式,批量极大提升确认效率,但是问题在于一旦出现确认返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量确认性能应该是不升反降的。
还有就是多少一个批次比较合适很难确定,需要非常熟悉业务才能很好的判定。
案例说明:
代码实现:
public class ConfirmBatchProducer
public static void main(String[] args)
// 创建消息信道
Connection connection = RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.createChannel(connection);
// 案例一:正确使用的场景。
try
// 将消息信道设置为确认模式
channel.confirmSelect();
// 发送消息
channel.basicPublish("CONFIRM_1_EXCHANGE", "confirm", MessageProperties.PERSISTENT_TEXT_PLAIN,
"这是批量确认模式消息1".getBytes());
channel.basicPublish("CONFIRM_2_EXCHANGE", "confirm", MessageProperties.PERSISTENT_TEXT_PLAIN,
"这是批量确认模式消息2".getBytes());
// 批量确认
channel.waitForConfirmsOrDie();
System.out.println("第一批消息发送确认成功。");
catch (Exception ex)
ex.printStackTrace();
// 消息发送异常,需要考虑重发
// TODO
System.out.println("---两个案例的分割线-----------");
// 案例二:其中一个Exchange不存在导致批量确认失败。
try
// 将消息信道设置为确认模式
channel.confirmSelect();
// 发送消息,其中CONFIRM_3_EXCHANGE不存在将导致批量确认失败。
channel.basicPublish("CONFIRM_2_EXCHANGE", "confirm", MessageProperties.PERSISTENT_TEXT_PLAIN,
"这是普通确认模式消息3".getBytes());
channel.basicPublish("CONFIRM_3_EXCHANGE", "confirm", MessageProperties.PERSISTENT_TEXT_PLAIN,
"这是普通确认模式消息4".getBytes());
// 批量确认
channel.waitForConfirmsOrDie();
System.out.println("第二批消息发送确认成功。");
catch (Exception ex)
ex.printStackTrace();
// 消息发送异常,需要考虑重发
// TODO
测试结果:
第一批消息发送确认成功。
---两个案例的分割线-----------
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CONFIRM_3_EXCHANGE' in vhost 'firstRabbitMQ', class-id=60, method-id=40)
at com.rabbitmq.client.impl.ChannelN.waitForConfirms(ChannelN.java:210)
at com.rabbitmq.client.impl.ChannelN.waitForConfirmsOrDie(ChannelN.java:247)
at com.rabbitmq.client.impl.ChannelN.waitForConfirmsOrDie(ChannelN.java:237)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.waitForConfirmsOrDie(AutorecoveringChannel.java:701)
at com.example.mq.mqdemo.rabbitmq.javaapi.transaction.ConfirmBatchProducer.main(ConfirmBatchProducer.java:48)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'CONFIRM_3_EXCHANGE' in vhost 'firstRabbitMQ', class-id=60, method-id=40)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
at java.base/java.lang.Thread.run(Thread.java:834)
异步确认模式
异步确认模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。
实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。
案例说明:这次比较简单,发送批量的消息,然后异步Listener进行处理。
代码实现:
public class ConfirmListenerProducer
// 未确认消息列表
protected final static SortedSet<Long> unconfirmMessage = Collections.synchronizedSortedSet(new TreeSet<>());
public static void main(String[] args)
// 创建消息信道
Connection connection = RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.createChannel(connection);
try
// 添加消息确认Listener
channel.addConfirmListener(new ConfirmListener()
// Broker确认消息
// multiple=true 标识批量确认了小于deliveryTag的所有消息,如果为false代表确认的是deliveryTag单条消息。
@Override public void handleAck(long deliveryTag, boolean multiple) throws IOException
System.out.println("本次确认情况如下:multiple=" + multiple + ", deliveryTag=" + deliveryTag);
// 删除 (deliveryTag + 1L) 之前所有元素
if (multiple)
unconfirmMessage.headSet(deliveryTag + 1L).clear();
else
// 删除deliveryTag单条元素
unconfirmMessage.remove(deliveryTag);
System.out.println("还未确认的消息情况:" + unconfirmMessage);
// Nack是消费失败的消息:可能超时,可能是因为消费时异常发生。
@Override public void handleNack(long deliveryTag, boolean multiple) throws IOException
System.out.println("本次确认情况如下:multiple=" + multiple + ", deliveryTag=" + deliveryTag);
// 删除 (deliveryTag + 1L) 之前所有元素
if (multiple)
unconfirmMessage.headSet(deliveryTag + 1L).clear();
else
// 删除deliveryTag单条元素
unconfirmMessage.remove(deliveryTag);
// 针对上面删除的消息进行重发,或者报警人工接确认
// TODO
);
// 将消息信道设置为确认模式
channel.confirmSelect();
// 批量发送大量消息
for (int i = 0; i < 10; i++)
// 获取接下来发送消息的deliveryTag
long deliveryTag = channel.getNextPublishSeqNo();
channel.basicPublish("CONFIRM_1_EXCHANGE", "confirm", MessageProperties.PERSISTENT_TEXT_PLAIN,
"这是批量确认模式消息".getBytes());
unconfirmMessage.add(deliveryTag);
System.out.println("所有发送消息:" + unconfirmMessage);
// 程序执行完成后,会关闭Connection和Channel,但是Listener要使用Channel来接受确认,所以要保持连接存活。不然报错。
TimeUnit.SECONDS.sleep(180);
catch (Exception ex)
ex.printStackTrace();
// 消息发送异常,需要考虑重发
// TODO
测试结果:
所有发送消息:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
本次确认情况如下:multiple=false, deliveryTag=1
还未确认的消息情况:[2, 3, 4, 5, 6, 7, 8, 9, 10]
本次确认情况如下:multiple=false, deliveryTag=2
还未确认的消息情况:[3, 4, 5, 6, 7, 8, 9, 10]
本次确认情况如下:multiple=false, deliveryTag=3
还未确认的消息情况:[4, 5, 6, 7, 8, 9, 10]
本次确认情况如下:multiple=false, deliveryTag=4
还未确认的消息情况:[5, 6, 7, 8, 9, 10]
本次确认情况如下:multiple=false, deliveryTag=5
还未确认的消息情况:[6, 7, 8, 9, 10]
本次确认情况如下:multiple=false, deliveryTag=6
还未确认的消息情况:[7, 8, 9, 10]
本次确认情况如下:multiple=false, deliveryTag=7
还未确认的消息情况:[8, 9, 10]
本次确认情况如下:multiple=false, deliveryTag=8
还未确认的消息情况:[9, 10]
本次确认情况如下:multiple=false, deliveryTag=9
还未确认的消息情况:[10]
本次确认情况如下:multiple=false, deliveryTag=10
还未确认的消息情况:[]
Pt1.3 保证Exchange路由消息到队列
排除RabbitMQ自身可能存在的问题,有两种情况会导致消息从Exchange到Queue的异常:路由规则配置错误导致消息无法匹配,消息匹配规则后没有找到指定的Queue。这两种情况可能是我们在配置路由规则时存在问题,那如何处理这类消息呢?
-
Broker通知生产者,此类消息存在异常,让生产者来处理;
-
Exchange将消息路由到另一个备份的Exchange来处理。
(1) 消息回发
消息回发是当消息发送到Exchange处理失败时,及时通知生产者进行处理。
public class ReturnProducer
public static void main(String[] args)
Connection connection = RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.createChannel(connection);
try
// 如果EXCHANGE无法处理消息,通过ReturnListener会回发给生产者。
channel.addReturnListener(new ReturnListener()
@Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
AMQP.BasicProperties basicProperties, byte[] body) throws IOException
// 输出无法处理的消息
System.out.println(
"MQ无法处理当前消息:replyCode=" + replyCode + ", replyText=" + replyText + ", exchange=" + exchange
+ ", routingKey=" + routingKey + ", body=" + new String(body));
// 无法处理的消息需要有补偿机制:重发或者其他处理
// TODO
);
// NOWHERE_EXCHANGE没有指定任何队列,所以消息无法被处理。
channel.basicPublish("NOWHERE_EXCHANGE", "confirm", true, MessageProperties.PERSISTENT_TEXT_PLAIN,
"我是条不知道去哪的消息".getBytes());
System.out.println("消息发送成功。");
catch (Exception ex)
ex.printStackTrace();
测试结果:
消息发送成功。
MQ无法处理当前消息:replyCode=312, replyText=NO_ROUTE, exchange=NOWHERE_EXCHANGE, routingKey=confirm, body=我是条不知道去哪的消息
(2) 消息路由到备份Exchange
在创建Exchange的时候,可以指定备份Exchange。
代码实现:
// 1、首先在MQ控台建立MAIN_EXCHANGE和BACK_EXCHANGE,并建立BACKUP_QUEUE和绑定关系,这里操作就不介绍了,前面有说过。
// 2、生产者
public class BackupProducer
public static void main(String[] args)
Connection connection = RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.createChannel(connection);
try
// MAIN_EXCHANGE没有绑定QUEUE,会将消息转发到备用交换机BACKUP_EXCHANGE,然后消息会被订阅的队列BACKUP_QUEUE获取
channel.basicPublish("MAIN_EXCHANGE", "confirm", MessageProperties.PERSISTENT_TEXT_PLAIN,
"我是条不知道去哪的消息".getBytes());
System.out.println("消息发送成功。");
catch (Exception ex)
ex.printStackTrace();
// 3、消费者
public class BackupConsumer
public static void main(String[] args)
Connection connection = RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.createChannel(connection);
try
// 创建消费者,绑定队列
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
String message = new String(body, "UTF-8");
System.out.println("收到消息:" + message);
;
while (true)
channel.basicConsume("BACKUP_QUEUE", true, consumer);
TimeUnit.SECONDS.sleep(1);
catch (Exception ex)
ex.printStackTrace();
测试结果:
// 生产者输出
消息发送成功。
// 消费者输出
收到消息:我是条不知道去哪的消息
要注意的是,Queue可以指定死信Exchange,Exchange可以指定备份Exchange。
Pt1.4 保证消息在队列存储
·队列是一个独立运行的服务,有自己的数据库(Mnesia),用来存储消息。如果还没有消费者来消费,那么消息要一直存储在队列中,队列是保存在内存中的,如果发生系统宕机、重启、关闭时,可能导致内存中的消息丢失。 怎么保证消息在队列稳定地存储呢?通过消息持久化。
(1) Queue持久化
队列元数据持久化。如果队列没有持久化,只保存在节点内存中,当节点宕机或者服务重启后,我们创建的队列就消失了。
// queueName, durable, exclusive, autoDelete, Properties
new Queue("TEST_QUEUE", true, false, false, new HashMap<>());
声明队列的时候,有如下参数:
-
durable:队列是否持久化,false的话队列不会持久化,保存在内存中。
-
exclusive:排他性队列。
-
autoDelete:没有消费者连接的时候,队列自动删除。
(2) Exchange持久化
和队列设定一致,Exchange元数据持久化。如果不想在节点重启后者宕机恢复后丢失已经创建的Exchange信息,那就要做持久化处理。
// exchangeName, durable, exclusive, autoDelete, Properties
new DirectExchange("TEST_EXCHANGE", true, false, new HashMap<>());
-
durable:队列是否持久化,false的话队列不会持久化,保存在内存中。
-
exclusive:排他性队列。
-
autoDelete:没有消费者连接的时候,队列自动删除。
(3) 消息持久化
元数据做持久化还不够,即时消息也要持久化处理,否则重启过后,发现队列还在,消息都没了。那依赖方系统会告诉你,消息丢了说其他的有个卵用。
public class PersistentProducer
public static void main(String[] args)
Connection connection = RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.createChannel(connection);
try
// MessageProperties.PERSISTENT_TEXT_PLAIN实际上是设置Delivery_mode=2,代表消息持久化处理。
channel.basicPublish("NOWHERE_EXCHANGE", "confirm", true, MessageProperties.PERSISTENT_TEXT_PLAIN,
"我是条不知道去哪的消息".getBytes());
System.out.println("消息发送成功。");
catch (Exception ex)
ex.printStackTrace();
(4) 集群冗余
只有一个RabbitMQ 的节点,即使交换机、队列、消息做了持久化,如果服务崩溃或者硬件发生故障,一样会导致RabbitMQ服务不可用、数据丢失。通过多个RabbitMQ节点建立集群,冗余多个副本,提高MQ服务可用性,保障消息的传输。
Pt1.5 保证消息投递到消费者
RabbitMQ 提供了消费者的消息确认机制(message acknowledgement),消费者可以自动或者手动地发送ACK 给服务端。
消息确认有三种情况:
-
NONE:自动ACK。消费者在收到消息后会自动发送ack确认,而不是在消费完成之后发送,不保证消息的正确消费。
-
MANUAL:手动ACK。由消费者择机发送ack,可以在收到消息后发送,也可以在消息处理完成后再发送。
-
AUTO:如果方法未抛出异常,则发送ack。
没有收到ACK消息,消费者断开连接后,服务端会把这条消息发送给其它消费者。如果没有其它消费者,当前消费者重启后会重新消费这条消息。
前面的案例中都是自动ACK的场景,如果需要手动ACK,则需要再消费消息的时候设置autoAck为false。RabbitMQ会等待消费者显示的回复ACK后才从队列中移除消息。
channel.basicConsume("BACKUP_QUEUE", false, consumer)
代码示例如下:
// 创建消费者,绑定队列
Consumer consumer = new DefaultConsumer(channel)
@SneakyThrows @Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
String message = new String(body, "UTF-8");
System.out.println("收到消息:" + message);
TimeUnit.SECONDS.sleep(10);
// ACK消息确认。
channel.basicAck(envelope.getDeliveryTag(), true);
// NACK消息确认失败,可以设置参数让消息重新入队列。
channel.basicNack(envelope.getDeliveryTag(), true, false);
// Reject拒绝消息
channel.basicReject(envelope.getDeliveryTag(), false);
System.out.println("通知服务端ACK");
;
Pt1.6 消费者回调
需要注意的是,消费者没有收到消息或者消费异常时,生产者是不知道的。如果想要通知生产者,则需要通过应用程序交互的方式(API调用或者消息通知)。
-
消费者调用生产者API,通知生产者当前处理已经完成。
-
消费者同样发送消息给生产者,通知生产者当前消息的处理状态。
Pt1.7 消息补偿机制
如果生产者没有收到消费者的响应,或者说出现了临时性故障,网络闪断、应用宕机重启等情况,生产者不知道消费者到底有没有成功接收并处理完成,那是否需要重发消息呢?如果重发之后还是没有响应呢?需要重发什么内容呢?
-
谁来重发消息?当然是生产者负责重发消息,生产者在发送消息时建议持久化到本地数据库,这样可以通过维护消息发送的状态来判断哪些消息需要重发。
-
多久进行重发?这要取决于故障解决的时间,当然除了突发状况之外,正常情况下生产者可以和消费者约定时间,比如10分钟没有响应,生产者就认为消息丢失,重发消息。也可以设置衰减机制,重试时间不止一次,比如5分钟,10分钟,30分钟,2小时这样的衰减时间进行重试。
-
需要重发几次?也许消费者应用真的宕机,或者有bug,重试几次都没有,再一直重试下去也没有意义了。所以要控制重发的次数,一般是重试三次,如果三次之后还是没有正确响应,那通常需要报警人工介入。
需要说明的是,如果选择重发或者重试,一定要从业务链路上考虑重复消息处理时的幂等性,否则可能引发业务上严重的事故。
Pt1.8 消息幂等性
前面说了生产者和消费者之间要有补偿机制,生产者要负责在异常情况下的消息重发。但是假如出问题的不是消费者呢?消费者正确处理了消息,但是生产者故障导致没有成功接收到响应,或者消费者响应生产者的网络出现闪断导致响应丢失?过段时间,生产者认为消息消费失败,又重发了消息,导致业务重复处理,那就问题很大了。
举例来说,业务系统通知交易系统扣款,结果交易系统扣款成功但是通知业务系统是网络闪断导致业务系统没有收到结果,那过了10分钟,生产者又发了一次扣款请求,那用户不是要炸了吗?同一件商品扣我两次钱?
实际场景中,有很多种情况可能会导致消息重复。
-
生产者逻辑有问题,重复发送了消息,比如没有正确判断confirm状态而认为消息没有成功消费,导致重复投递。
-
消费者逻辑有问题,接收消息后没有正确响应ack,导致生产者没有收到确认而认为消息丢失。
-
网络或其他问题,导致消费结果没有在生产者和消费者之间正确传递。
对于消息重复,要通过业务上唯一ID来做幂等,防止重复消费的情况。比如,消息要带上业务上唯一的订单号,消费者接受消息后,入库。如果有重复消息,判断订单号是否已经存在于数据库(缓存等都可以)并且有消费(正在处理或者已经处理完成),如果有则认为该消息已被处理,本次消息不再做处理,这样来防止消息重复的情况。
Pt1.9 最终一致性
虽然MQ本身有很高的可用性,应用程序中也有各种防重、幂等和补偿机制,但是在实际情况中,往往还是经常会出现消息消费异常导致的业务状态不一致问题。这其中可能有多方原因,你很难把整个处理过程考虑的很全面,能够防止各种突发情况,一是场景复杂,二是实现成本往往较高。
事后简单的核对机制往往更加简单高效,能够及时发现问题,作为最后一道有力的屏障。所以,对于使用MQ做通信的业务系统,需要有状态的核对机制,来解决异常情况下,消费状态不一致的问题。
举例来说,在支付系统中,使用MQ作为异步支付结果的通知,除此之外,会增加一定的反查机制和对账机制,业务系统发起支付后,等待支付系统的异步结果,如果2分钟(根据实际情况来定,可能是多次阶梯型)没有收到支付结果,业务系统就主动发起交易状态反查。如果2小时后还没有支付结果,业务系统就会发起关单,认为此笔交易失败。同时第二天,业务系统会发起和支付系统的对账操作,防止交易系统后续又处理成功的场景。使用多种方式来防止交易状态不一致的情况。
Pt1.10 消息顺序性
消息的顺序性指的是消费者消费消息的顺序跟生产者生产消息的顺序是一致的。
举例来说,业务上的操作顺序是:1、通知订单系统完成下单;2、通知支付系统完成订单支付;3、通知物流系统发货。
这种场景下消息的顺序是不能颠倒的,订单都没有支付成功不可能就发货,所以要保证这种场景下消息消费的顺序性。
在RabbitMQ中,队列本身是FIFO,能够保证消息进出队列的顺序是一致的,先入队列的消息先出队列。
当队列仅有一个消费者时,能够保证消息的顺序消费。
但是当队列有多个消费者时,消费者同时从队列获取多条消息,但是因为每个消费者处理速度不同,可能会导致消息被消费的顺序错乱。
所以,对于要保证消息顺序的场景,将消息都发送到一个队列,同时只能有一个消费者负责处理消息。
正常来说,除非是消息负载比较高,尽量不要用多个消费者消费消息。
本章到此结束,更多关于RabbitMQ的文章,请点击如下专栏连接。
以上是关于03 RabbitMQ进阶1之可靠性投递的主要内容,如果未能解决你的问题,请参考以下文章