RocketMQ 事务消息 详解
Posted 小王曾是少年
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 事务消息 详解相关的知识,希望对你有一定的参考价值。
🍊 Java学习:Java从入门到精通总结
🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想
🍊 绝对不一样的职场干货:大厂最佳实践经验指南
📆 最近更新:2023年4月9日
🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD
🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!
文章目录
事务消息发送流程
半消息实现了分布式环境下的数据一致性的处理,生产者发送事务消息的流程如上图所示,通过对源码的学习,我们可以弄清楚下面几点,也是半消息机制的核心:
- 为什么prepare消息不会被
Consumer
消费? - 事务消息是如何提交和回滚的?
- 定时回查本地事务状态的实现细节。
发送事务消息源码分析
发送事务消息方法TransactionMQProducer.sendMessageInTransaction
:
msg
:消息tranExecuter
:本地事务执行器arg
:本地事务执行器参数
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener)
throw new MQClientException("tranExecutor is null", null);
// 忽视消息延迟的属性
if (msg.getDelayTimeLevel() != 0)
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
Validators.checkMessage(msg, this.defaultMQProducer);
// 发送半消息
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try
sendResult = this.send(msg);
catch (Exception e)
throw new MQClientException("send message Exception", e);
// 处理发送半消息的结果
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus())
// 发送半消息成功,执行本地事务逻辑
case SEND_OK:
try
if (sendResult.getTransactionId() != null)
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId))
msg.setTransactionId(transactionId);
// 执行本地事务逻辑
if (null != localTransactionExecuter)
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
else if (transactionListener != null)
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
if (null == localTransactionState)
localTransactionState = LocalTransactionState.UNKNOW;
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE)
log.info("executeLocalTransactionBranch return ", localTransactionState);
log.info(msg.toString());
catch (Throwable e)
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
break;
// 发送半消息失败,标记本地事务状态为回滚
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
// 结束事务,设置消息 COMMIT / ROLLBACK
try
this.endTransaction(msg, sendResult, localTransactionState, localException);
catch (Exception e)
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
// 返回事务发送结果
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
// 提取Prepared消息的uniqID
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
该方法的入参包含有一个需要用户实现本地事务的LocalTransactionExecuter executer
,executer
中会进行事务操作以保证本地事务和消息发送这两个操作的原子性。
由上面的源码可知:
Producer
会首先发送一个半消息到Broker
中:
- 半消息发送成功,执行事务
- 半消息发送失败,不执行事务
半消息发送到Broker
后不会被Consumer
消费掉的原因有以下两点:
Broker
在将消息写入CommitLog
时会判断消息类型,如果是prepare
或者rollback
消息,ConsumeQueue
的offset
不变Broker
在构造ConsumeQueue
时会判断是否是处于prepare
或者rollback
状态的消息,如果是则不会将该消息放入ConsumeQueue
里,Consumer
在拉取消息时也就不会拉取到这条消息
Producer
会根据半消息的发送结果和本地任务执行结果来决定如何处理事务(commit
或rollback
),方法最后调用了endTransaction
来处理事务的执行结果,源码如下:
sendResult
:发送半消息的结果localTransactionState
:本地事务状态localException
:执行本地事务逻辑产生的异常RemotingException
:远程调用异常MQBrokerException
:Broker
异常InterruptedException
:当线程中断异常UnknownHostException
:未知host异常
public void endTransaction(
final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException
// 解码消息id
final MessageId id;
if (sendResult.getOffsetMsgId() != null)
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
else
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
// 创建请求
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState)
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
// 提交 commit / rollback 消息
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
该方法是将事务执行的结果发送给Broker
,再由Broker
决定是否进行消息投递,执行步骤如下:
- 收到消息后先检查是否是事务消息,如果不是事务消息则直接返回
- 根据请求头里的
offset
查询半消息,如果查询结果为空则直接返回 - 根据半消息构造新消息,新构造的消息会被重新写入到
CommitLog
里,rollback
消息的消息体为空 - 如果是
rollback
消息,则该消息不会被投递
具体原因上文中已经分析过:只有
commit
消息才会被Broker
投递给consumer
RocketMQ会将
commit
消息和rollback
消息都写入到commitLog
里,但rollback
消息的消息体为空且不会被投递,CommitLog
在删除过期消息时才会将其删除。当事务commit
成功之后,RocketMQ会重新封装半消息并将其投递给Consumer
端消费。
事务消息回查
Broker发起
相较于普通消息,事务消息主要依赖下面三个类:
TransactionStateService
:事务状态服务,负责对事务消息进行管理,包括存储和更新事务消息状态、回查状态等TranStateTable
:事务消息状态存储表,基于MappedFileQueue
实现TranRedoLog
:TranStateTable
的日志,每次写入操作都会记录日志,当Broker
宕机时,可以利用这个文件做数据恢复
存储半消息到CommitLog
时,使用offset
索引到对应的TranStateTable
的位置
一文详解,RocketMQ事务消息
在RocketMQ中有一个非常有用的功能,就是事务消息功能,事务消息机制,可以让我们确保发送的消息一定能写进MQ里,绝不会丢失掉。
MQ事务消息机制还是挺有用的,在业内还是比较常见的,所以今天我们就来分析下RocketMQ事务消息的原理。
1、发送half消息到MQ去,试探MQ是否正常
使用RocketMQ的事务消息,我们首先要发送一条half消息到MQ中去,这个half你可以理解为一个试探消息,这个时候消费者系统是看不见这个half消息的。
然后生产者就等待接收这个half消息写入成功的响应通知,我们看图1:
图1 生产者发送half消息
如果half消息写入失败了怎么办?
假如你MQ挂了,或者网络故障了,导致你half消息没有发送成功。这个时候,后续业务逻辑就可以不执行了。
half消息成功之后,我们就应该执行相应的业务逻辑,比如对数据库执行一些增删改操作。
图2 half消息成功,生产者执行本地业务逻辑
2、生产者本地事务执行失败了怎么办?
比如生产者增删改数据库的时候失败了,这个时候就要发送一条rollback请求给MQ,就是告诉MQ把我之前发给你的half消息给删除了,因为我自己出现问题了,没法执行后续的业务逻辑。
图3 生产者执行本地事务失败
请求MQ删除half消息后,生产者本地事务就回滚了,不执行后边的流程了。
3、生产者完成了本地事务
如果生产者完成了本地事务,此时你就可以发送一个commit消息给MQ要求MQ对之前的half消息进行commit。
图4 生产者发送commmit消息
消息是half的时候,消费者是看不到的,现在MQ收到了commit消息,就可以执行后边的流程了。
4、假如half消息发送成功了,生产者没收到响应怎么办?
MQ事务消息流程就是上面的,但是我们来进行比较严谨的分析,如果我们把half消息发送给MQ了,MQ给保存下来了,但是MQ返回给我们的响应生产者没收到,会怎么样?
这时候,生产者会误以为发送到MQ的half消息失败了,就不会执行后边的流程。
但MQ已经保存下来了一条half消息,这个消息怎么处理?
其实RocketMQ这里有一个补偿机制,他会去扫描自己处于half状态的消息,如果我们一直没有对这个消息执行commit或rollback操作,超过了一定的时间,他就会回调你的订单系统的一个接口,看看你这个消息什么情况,你生产者到底是打算commit这个消息,还是打算rollback这个消息?
图5 消息补偿机制
生产者收到MQ的回调请求,就会去查看下本地事务执行的结果,比如查询数据库数据状态。
5、rollback或者commit发送失败了呢,怎么办?
假如生产者收到half消息发送成功的消息了, 同时尝试执行自己本地事务,然后也执行了rollback或者commit,结果因为网络故障或其他原因,导致rollback或者commit请求发送失败了,怎么办?
这个时候MQ中的消息一直处于half状态,过了一定的超时时间就会发现这个half消息有问题,会回调你的生产者系统接口。
此时你要判断一下,如果本地事务执行成功了,那你就得再次执行commit请求,反之则再次执行rollback请求。
这个MQ的回调就是一个补偿机制,如果你的half消息响应没收到,或者rollback、commit请求没发送成功,MQ都会来找你询问后续如何处理。
再假设一种场景,如果生产者系统收到了half消息写入成功的响应了,同时尝试执行自己本地事务,然后根据失败或者成功去执行rollback或者commit请求,发送给MQ了。很不巧,mq在这个时候挂掉了,导致rollback或者commit请求发送失败,怎么办?
这种情况的话,那就等MQ自己重启了,重启之后他会扫描half消息,然后还是通过上面说到的补偿机制,去回调你的接口。
总结:
MQ事务机制都在那些环节保证了数据一定可以投递到MQ?
如果你的MQ有问题或者网络有问题,half消息根本发不出去,此时half消息肯定是失败的,那么生产者系统就不会执行后续流程了!
如果要是half消息发送出去了,但是half消息的响应都没收到,然后执行了回滚操作,那MQ会有补偿机制来回调找你询问要commit还是rollback,此时你选择rollback删除消息就可以了,不会执行后续流程!
如果要是生产者系统收到half消息了,结果自己更新数据库失败了,那么他也会进行回滚,不会执行后续流程了!
如果要是生产者系统收到half消息了,然后还更新自己数据库成功了,订单状态是“已完成”了,此时就必然会发送commit请求给MQ,一旦消息commit了,那么必然保证消费者系统可以收到这个消息!
即使你commit请求发送失败了,MQ也会有补偿机制,回调你接口让你判断是否重新发送commit请求。
总之,就是你的生产者系统只要成功了,那么必然要保证MQ里的消息是commit了可以让消费者系统看到他!
所以大家可以梳理下上面的流程,通过这套事务消息的机制,就可以保证我们的生产者系统一旦成功执行了数据库操作,生产者系统到MQ之间的消息传输是不会有丢失的问题了!
关于RocketMQ事务消息的代码,大家可以参考官方example。
有道无术,术可成;有术无道,止于术
欢迎大家关注Java之道公众号
好文章,我在看❤️
以上是关于RocketMQ 事务消息 详解的主要内容,如果未能解决你的问题,请参考以下文章