一文详解,RocketMQ事务消息

Posted Hollis Chuang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一文详解,RocketMQ事务消息相关的知识,希望对你有一定的参考价值。

在RocketMQ中有一个非常有用的功能,就是事务消息功能,事务消息机制,可以让我们确保发送的消息一定能写进MQ里,绝不会丢失掉。

MQ事务消息机制还是挺有用的,在业内还是比较常见的,所以今天我们就来分析下RocketMQ事务消息的原理。

1、发送half消息到MQ去,试探MQ是否正常

使用RocketMQ的事务消息,我们首先要发送一条half消息到MQ中去,这个half你可以理解为一个试探消息,这个时候消费者系统是看不见这个half消息的。

然后生产者就等待接收这个half消息写入成功的响应通知,我们看图1:

图1 生产者发送half消息

如果half消息写入失败了怎么办?

假如你MQ挂了,或者网络故障了,导致你half消息没有发送成功。这个时候,后续业务逻辑就可以不执行了。

half消息成功之后,我们就应该执行相应的业务逻辑,比如对数据库执行一些增删改操作。

图2 half消息成功,生产者执行本地业务逻辑

2、生产者本地事务执行失败了怎么办?

比如生产者增删改数据库的时候失败了,这个时候就要发送一条rollback请求给MQ,就是告诉MQ把我之前发给你的half消息给删除了,因为我自己出现问题了,没法执行后续的业务逻辑。

图3 生产者执行本地事务失败

请求MQ删除half消息后,生产者本地事务就回滚了,不执行后边的流程了。

3、生产者完成了本地事务

如果生产者完成了本地事务,此时你就可以发送一个commit消息给MQ要求MQ对之前的half消息进行commit。

图4 生产者发送commmit消息

消息是half的时候,消费者是看不到的,现在MQ收到了commit消息,就可以执行后边的流程了。

4、假如half消息发送成功了,生产者没收到响应怎么办?

MQ事务消息流程就是上面的,但是我们来进行比较严谨的分析,如果我们把half消息发送给MQ了,MQ给保存下来了,但是MQ返回给我们的响应生产者没收到,会怎么样?

这时候,生产者会误以为发送到MQ的half消息失败了,就不会执行后边的流程。

但MQ已经保存下来了一条half消息,这个消息怎么处理?

其实RocketMQ这里有一个补偿机制,他会去扫描自己处于half状态的消息,如果我们一直没有对这个消息执行commit或rollback操作,超过了一定的时间,他就会回调你的订单系统的一个接口,看看你这个消息什么情况,你生产者到底是打算commit这个消息,还是打算rollback这个消息?

图5 消息补偿机制

生产者收到MQ的回调请求,就会去查看下本地事务执行的结果,比如查询数据库数据状态。

5、rollback或者commit发送失败了呢,怎么办?

假如生产者收到half消息发送成功的消息了, 同时尝试执行自己本地事务,然后也执行了rollback或者commit,结果因为网络故障或其他原因,导致rollback或者commit请求发送失败了,怎么办?

这个时候MQ中的消息一直处于half状态,过了一定的超时时间就会发现这个half消息有问题,会回调你的生产者系统接口。

此时你要判断一下,如果本地事务执行成功了,那你就得再次执行commit请求,反之则再次执行rollback请求。

这个MQ的回调就是一个补偿机制,如果你的half消息响应没收到,或者rollback、commit请求没发送成功,MQ都会来找你询问后续如何处理。

再假设一种场景,如果生产者系统收到了half消息写入成功的响应了,同时尝试执行自己本地事务,然后根据失败或者成功去执行rollback或者commit请求,发送给MQ了。很不巧,mq在这个时候挂掉了,导致rollback或者commit请求发送失败,怎么办?

这种情况的话,那就等MQ自己重启了,重启之后他会扫描half消息,然后还是通过上面说到的补偿机制,去回调你的接口。

总结:

MQ事务机制都在那些环节保证了数据一定可以投递到MQ?

如果你的MQ有问题或者网络有问题,half消息根本发不出去,此时half消息肯定是失败的,那么生产者系统就不会执行后续流程了!

如果要是half消息发送出去了,但是half消息的响应都没收到,然后执行了回滚操作,那MQ会有补偿机制来回调找你询问要commit还是rollback,此时你选择rollback删除消息就可以了,不会执行后续流程!

如果要是生产者系统收到half消息了,结果自己更新数据库失败了,那么他也会进行回滚,不会执行后续流程了!

如果要是生产者系统收到half消息了,然后还更新自己数据库成功了,订单状态是“已完成”了,此时就必然会发送commit请求给MQ,一旦消息commit了,那么必然保证消费者系统可以收到这个消息!

即使你commit请求发送失败了,MQ也会有补偿机制,回调你接口让你判断是否重新发送commit请求。

总之,就是你的生产者系统只要成功了,那么必然要保证MQ里的消息是commit了可以让消费者系统看到他!

所以大家可以梳理下上面的流程,通过这套事务消息的机制,就可以保证我们的生产者系统一旦成功执行了数据库操作,生产者系统到MQ之间的消息传输是不会有丢失的问题了!

关于RocketMQ事务消息的代码,大家可以参考官方example。

有道无术,术可成;有术无道,止于术

欢迎大家关注Java之道公众号

好文章,我在看❤️

以上是关于一文详解,RocketMQ事务消息的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ 消息详解

一文详解RocketMQ的存储模型

RocketMQ事务消费和顺序消费详解

一文详解RocketMQ的存储模型

一文详解RocketMQ的存储模型

RocketMQ 事务消息 详解