RabbitMQ怎么保证消息不丢失
Posted wx61721d9ccd9c5
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ怎么保证消息不丢失相关的知识,希望对你有一定的参考价值。
1.背景
在面试过程中,该题目是毕不可少的必答题。博主在这里经过多方面整理,总结了下文。如果有面试需求,可以适当的背背。
2.消息投递的过程
先来说明一个概念,什么是可靠投递呢?在RabbitMQ中,一个消息从生产者发送到RabbitMQ服务器,需要经历这么几个步骤:
- 生产者准备好需要投递的消息。
- 生产者与RabbitMQ服务器建立连接。
- 生产者发送消息。
- RabbitMQ服务器接收到消息,并将其路由到指定队列。
- RabbitMQ服务器发起回调,告知生产者消息发送成功。
所谓可靠投递,就是确保消息能够百分百从生产者发送到服务器。
3.解决方案
队列存在的以下问题:消息丢失问题 重复消费问题 以下为解决点
1.队列持久化硬盘
丢失的过程就只有在内存发送到磁盘时会丢失消息,如果保存到磁盘后,重启服务消息不会丢失,但是会影效率。
new Queue("demo_queue", true, false, false, args); 第二个参数为true
2.手动ack
告知生产者消息成功/失败,否则,如果失败此队列会保持挂起状态,他们消息会等待。所以在消费完成之后通知生产者消费是否成功/失败,这里使用ack/nack来实现。主要通过配置文件来实现。
rabbitmq:
host: 192.168.xx.xx
port: 5672
username: root
password: root
virtual-host: /
listener:
simple:
acknowledge-mode: manual #手动应答
prefetch: 1 # 每次只处理一个信息
publisher-confirms: true #开启消息确认机制
publisher-returns: true #支持消息发送失败返回队列
@RabbitListener(queues = "demo_queue")
protected void consumerDead(Message message, Channel channel) throws Exception {
RabbitEnum ackSign = RabbitEnum.ACCEPT;
try {
int i = 10 / 0;
} catch (Exception e) {
ackSign = RabbitEnum.RETRY;
throw e;
} finally {
// 通过finally块来保证Ack/Nack会且只会执行一次
if (ackSign == RabbitEnum.ACCEPT) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else if (ackSign == RabbitEnum.RETRY) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
3.确认是否发送成功
判断消息是否发送到交换机 。
@Bean
public RabbitTemplate rabbitTemplate() {
Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
rabbitTemplate.setMandatory(true);
// 发送消息确认, yml需要配置 publisher-confirms: true 消息是否发送的核心配置
rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());
// 消息返回, yml需要配置 publisher-returns: true 消息返回的配置
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
String correlationId = message.getMessageProperties().getCorrelationId().toString();
logger.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange,
routingKey);
});
return rabbitTemplate;
}
/**
* 确认发送消息是否成功
*
* @return
*/
@Bean
public MsgSendConfirmCallBack msgSendConfirmCallBack() {
return new MsgSendConfirmCallBack();
}
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 回调方法
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("MsgSendConfirmCallBack , 回调id:" + correlationData);
if (ack) {
System.out.println("消息发送成功");
} else {
//可以将消息写入本地,使用定时任务重新发送
System.out.println("消息发送失败:" + cause + "\\n重新发送");
}
}
}
4.集群化处理
不多说,生产环境集群是标配。
5.异地容灾
6.持久化
发送消息持久化到db中 进行消息的重新发送。消费者消息固话到db中,通过消息id判断是否重复消费。
以上是关于RabbitMQ怎么保证消息不丢失的主要内容,如果未能解决你的问题,请参考以下文章