如何保证消息的可靠性传输
Posted 一江溪水
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何保证消息的可靠性传输相关的知识,希望对你有一定的参考价值。
在生产环境中,因为机器以及网络设备的不可靠,保证消息的可靠是待解决的问题。在特定场景下消息可能存在丢失风险
消息发送流程
我们可以将 RabbitMQ 消息处理的过程分为三个步骤:
- 生产阶段:生产者生产消息并且发送到消息队列;
- 储存阶段:消息队列存储和处理消息;
- 消费阶段:消息队列将消息转发到消费者。
上述每个步骤都有可能出现消息丢失的风险;
生产者生产消息并且发送到消息队列
丢失场景
生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,比如:
- 网络故障。网络环境的不可靠导致消息发送失败,例如网络丢包、网络故障。
- 数据在网络中传输会经过诸多网络设备,只要其中一个网络链接在数据抵达前已经流量满载,新到的数据将会阻塞一段时间段。
AMQP事务机制
使用AMQP协议的事务机制。生产者在发出消息之后,消息是否到达RabbitMQ服务器是默认不可知的,在生产者发送消息之前,调用channel.txSelect 语句开启事务
- 如果消息发送失败,那么调用channel.txRollback回滚事务,尝试重新发送一条消息;
- 如果消息发送成功,那么调用channel.txCommit提交事务。
采用事务的缺点是增加耗时,会降低RabbitMQ的吞吐性能。
Confirm机制
RabbitMQ有一种性能改进方案,即Confirm机制
- 生产者调用channel.confirmSelect将通信方式设置为confirm模式;
- 生产者发送的所有消息都会被分配一个唯一 ID;
- 当生产者发送的消息成功投递到队列之后,RabbitMQ会发送一个ack给生产者,生产者即得知这条消息已经成功发送。
- 如果 RabbitMQ 没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,需要重试
我们也可以结合这个机制在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么就需要重新发送
区别
- 事务机制是同步的,你提交一个事务之后会阻塞在那里
- confirm机制是异步的,发送消息之后就可以发送下一个消息,然后那个消息RabbitMQ 接收了之后会异步回调你一个接口通知你这个消息接收到了。
消息队列存储和处理消息
消息存储在 RabbitMQ 队列中,如果队列没有持久化,如果服务器宕机,RabbitMQ 服务器重启后会导致消息丢失。
解决方案
开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,如果 RabbitMQ 自己宕机了,重启之后会自动读取之前存储的数据;
也有可能RabbitMQ 还没持久化,就宕机了,这种情况发生的概率比较小
持久化也可以跟生产者的confirm机制配合起来,只有消息被持久化到磁盘之后,才给生产者发送ack,所以哪怕是出现上面这种情况,RabbitMQ 宕机了,消息丢了,生产者收不到ack,还是回重发的。
消息队列持久化
-
Exchange 持久化:以 Direct 模式为例,将 durable 参数设置为 true。
-
Queue 持久化:将 durable 参数设置为 true,但是这样只能保证持久化 Queue 的元数据,但是不会持久化 Queue 里存储的消息。
-
消息持久化:发送消息的时候将deliveryMode设置为2,将消息设置为持久化的
-
SpringBoot中的rabbitTemplate默认设置消息是持久化,不需要手动配置
消息队列将消息转发到消费者
消费者在收到消息之后,还没来得及处理消息的消费逻辑,所在机器就宕机了,导致内存中的消息丢失。
解决方案
RabbitMQ 默认采用自动 ACK 机制,在没有处理业务逻辑之前,消费者就会告知消息队列已经成功收到消息,这种方式并不能解决这种问题
- 我们可以关闭自动ACK模式,通过一个 调用API接口就行,消费完消息后,再返回ACK。这样的话,如果你还没处理完,就没有ACK
- 这样 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
以上是关于如何保证消息的可靠性传输的主要内容,如果未能解决你的问题,请参考以下文章
消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费
消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费