RocketMQ Transactional Messages: Source Code Analysis
Posted NetWhite
Preface
Following on from the previous two posts, this article analyzes the source code behind transactional messages.
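Basic flow of transactional message processing
When introducing transactional messages, I drew a simple flowchart of their overall processing flow:
p.s. The numbers (1, 2, 3...) below indicate order and are unrelated to the numbers (1, 2, 3...) in the figure above.
1. The transactional producer calls the transactional send API to send the message.
2. The prepare (pre-commit) phase begins. The client sends a prepared message and marks it as transactional in the request header. The message body is the content we actually want to send.
3. The broker receives the message, sees that it is transactional, and backs it up. "Backing up" means writing all of the message's data into an internal transaction topic instead of the topic we actually target. No consumer subscribes to that transaction topic, so the message stays invisible to consumers. The broker then responds to the client's send request.
4. If the client confirms the send succeeded, it executes the local transaction and records the transaction state. If the send did not succeed, the local transaction is not executed; the state is simply marked as failed and must be rolled back.
5. Based on the transaction state, the client sends an end-transaction request to the broker that received the transactional message. The request header says whether to commit, roll back, or treat the state as unknown.
6. The broker receives the end-transaction request.
   - Unknown state: it just logs and returns.
   - Commit: it restores the backed-up message and writes it to the original topic, where it becomes visible to consumers. It then writes a message to the op queue (another internal topic) whose body is the queue offset of this transactional message.
   - Rollback: it only writes the op-queue message and does not restore the transactional message, so consumers never see it.
   The role of the op queue is explained in detail in the source code section.
7. Finally, transaction checks. The broker scans for messages that have been neither committed nor rolled back. It finds a client and sends it a request asking it to report the transaction's final state again.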
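The following toy in-memory model makes the visibility rules in the steps above concrete. It tracks the three places a transactional message can live. It is a sketch under stated assumptions. The class and method names are hypothetical. The lists merely stand in for RMQ_SYS_TRANS_HALF_TOPIC, RMQ_SYS_TRANS_OP_HALF_TOPIC and the real business topic. Nothing here is RocketMQ's actual storage code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model (hypothetical names): three append-only lists stand in for the
// half topic, the op topic and the real business topic.
public class TxFlowModel {
    enum Outcome { COMMIT, ROLLBACK, UNKNOWN }

    final List<String> halfQueue = new ArrayList<>(); // backed-up half messages
    final List<Long> opQueue = new ArrayList<>();     // half offsets that were resolved
    final List<String> realTopic = new ArrayList<>(); // what consumers can see

    // Phase one: back the message up and return its offset in the half queue.
    long prepare(String body) {
        halfQueue.add(body);
        return halfQueue.size() - 1;
    }

    // Phase two: apply the producer's decision for the half message at halfOffset.
    void end(long halfOffset, Outcome outcome) {
        switch (outcome) {
            case COMMIT:
                realTopic.add(halfQueue.get((int) halfOffset)); // restore: now visible
                opQueue.add(halfOffset);                        // mark as resolved
                break;
            case ROLLBACK:
                opQueue.add(halfOffset);                        // resolved, never restored
                break;
            case UNKNOWN:
                break;                                          // nothing written
        }
    }

    // Half offsets without an op record: candidates for a transaction check.
    List<Long> pending() {
        Set<Long> resolved = new HashSet<>(opQueue);
        List<Long> result = new ArrayList<>();
        for (long i = 0; i < halfQueue.size(); i++) {
            if (!resolved.contains(i)) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        TxFlowModel m = new TxFlowModel();
        m.end(m.prepare("order-1"), Outcome.COMMIT);
        m.end(m.prepare("order-2"), Outcome.ROLLBACK);
        m.end(m.prepare("order-3"), Outcome.UNKNOWN);
        System.out.println(m.realTopic + " pending=" + m.pending()); // [order-1] pending=[2]
    }
}
```

Messages with no op record (offset 2 here) are exactly what the transaction check in step 7 has to deal with.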
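Source code walkthrough
The overall flow involves quite a lot of code, so let's take it apart and analyze each piece in turn.
Client side: transaction execution flow
The basic client-side flow is as follows:
The main entry point of the implementation is this method: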
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException;
The code is below, with my comments added:
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
// Get the transaction listener we registered (executes the local transaction and answers transaction checks)
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// Transactional messages do not support delayed delivery
// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
// Set the half-message property (TRAN_MSG) to mark this as a transactional half message
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
// Set the producer group property (PGROUP)
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
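// Send the message. This is the ordinary synchronous send path used for normal messages; the client does little extra for transactional messages beyond flagging the request as transactional when it sees the half-message property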
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
// transactionId
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
// UNIQ_KEY: the unique ID the client generates at send time, i.e. the msgId we normally see in sendResult
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
// In this case the transactionId is simply the message's client-side msgId
msg.setTransactionId(transactionId);
}
// In my own usage I don't call the transactional API with a localTransactionExecuter, so this is usually null
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
// Execute our local transaction
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
// Not stored successfully (flush or slave problems): roll back the message
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
// End the transaction: commit, roll back, or leave it pending
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
// Return the transactional send result, which now also carries the local transaction state
return transactionSendResult;
}
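The way sendMessageInTransaction settles on a LocalTransactionState can be boiled down to a small pure function. This is a hypothetical sketch. The enums are local stand-ins named after RocketMQ's SendStatus and LocalTransactionState, and a Supplier plays the role of executeLocalTransaction. A Supplier cannot throw checked exceptions, so the sketch catches RuntimeException where the source catches Throwable.

```java
import java.util.function.Supplier;

// Hypothetical sketch; the enums are local stand-ins for RocketMQ's types.
public class LocalStateDecision {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }
    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    static LocalTransactionState decide(SendStatus status, Supplier<LocalTransactionState> localTx) {
        LocalTransactionState state = LocalTransactionState.UNKNOW; // initial value, as in the source
        switch (status) {
            case SEND_OK:
                try {
                    state = localTx.get();                       // run the local transaction
                    if (state == null) {
                        state = LocalTransactionState.UNKNOW;    // null is treated as UNKNOW
                    }
                } catch (RuntimeException e) {
                    // Only kept for the end-transaction remark; state stays UNKNOW.
                }
                break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                state = LocalTransactionState.ROLLBACK_MESSAGE;  // local transaction is skipped
                break;
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(decide(SendStatus.SEND_OK, () -> LocalTransactionState.COMMIT_MESSAGE)); // COMMIT_MESSAGE
        System.out.println(decide(SendStatus.SEND_OK, () -> { throw new IllegalStateException("db down"); })); // UNKNOW
        System.out.println(decide(SendStatus.FLUSH_DISK_TIMEOUT, () -> LocalTransactionState.COMMIT_MESSAGE)); // ROLLBACK_MESSAGE
    }
}
```

An exception thrown by the local transaction does not lead to a rollback. The state stays UNKNOW, so the outcome is left to the broker's transaction check.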
The end-transaction method called above is implemented as follows (with my comments):
public void endTransaction(
final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
// getOffsetMsgId is the server-side msgId, which encodes message metadata (the broker address and the commit log offset)
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
// The end request must go to the same broker that stored the half message
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
// Commit log offset of the half message (which was stored under the internal transaction topic)
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
// Interestingly, the local transaction's exception (if any) is sent along as the remark
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
// Phase two: send the end-transaction request as a oneway call
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
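endTransaction prefers the offsetMsgId because the broker needs the half message's commit log offset (id.getOffset()). Below is a sketch of pulling the fields out of an IPv4 offsetMsgId. It assumes the layout MessageDecoder uses, as I understand it: 32 hex characters encoding a 4-byte IP, a 4-byte port and an 8-byte commit log offset, all big-endian. The class and helper names are hypothetical. IPv6 ids, which are longer, are not handled.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;

// Sketch assuming an IPv4 offsetMsgId = hex(4-byte IP + 4-byte port + 8-byte offset), big-endian.
public class OffsetMsgIdDecoder {
    private static byte[] hexToBytes(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    private static void requireIpv4Id(String msgId) {
        if (msgId.length() != 32) {
            throw new IllegalArgumentException("expected a 32-char IPv4 offsetMsgId: " + msgId);
        }
    }

    static String host(String msgId) {
        requireIpv4Id(msgId);
        try {
            return InetAddress.getByAddress(hexToBytes(msgId.substring(0, 8))).getHostAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException(e); // only thrown for an illegal address length
        }
    }

    static int port(String msgId) {
        requireIpv4Id(msgId);
        return ByteBuffer.wrap(hexToBytes(msgId.substring(8, 16))).getInt();
    }

    // The value endTransaction sends as commitLogOffset (id.getOffset()).
    static long commitLogOffset(String msgId) {
        requireIpv4Id(msgId);
        return ByteBuffer.wrap(hexToBytes(msgId.substring(16, 32))).getLong();
    }

    public static void main(String[] args) {
        String id = "7F00000100002A9F0000000000003039"; // made-up id: 127.0.0.1:10911, offset 12345
        System.out.println(host(id) + ":" + port(id) + " offset=" + commitLogOffset(id)); // 127.0.0.1:10911 offset=12345
    }
}
```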
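These two methods make the client side of sending a transactional message fairly clear (handling of transaction checks comes later).
Broker side: receiving the half message (prepare)
The basic flow when the broker receives a transactional message is as follows:
In short, a transactional message reaches the broker just like a normal message, and the broker receives it the same way. After receiving it, the broker checks whether it carries the transaction flag. If it does, the broker writes all of the message's data into an internal transaction topic so that the message stays invisible to consumers for now. The key code is below (using the asynchronous write path as the example):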
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
final RemotingCommand response = preSend(ctx, request, requestHeader);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
...
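// Earlier code omitted; the interesting part is the check below for a transactional message
// If this property is present, the sender sent a transactional message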
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
// Reject the message if the broker is configured to refuse transactional messages
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
// This shows that transactional messages take a separate processing path from other messages
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
Once the broker knows it is dealing with a transactional message, it backs the message up with this code:
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
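// Save the original topic and queue ID in properties, since the message is about to be written to the transaction topic's queue; this is the "backup" of the original message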
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
// Also clear the transaction flag
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// Store the message under RMQ_SYS_TRANS_HALF_TOPIC, the half-message topic
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
// Write to queue 0: this topic has only one queue. The other internal topic used here, RMQ_SYS_TRANS_OP_HALF_TOPIC, also has a single queue on each broker
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
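The backup in parseHalfMessageInner, and its reversal when the transaction commits, can be modelled as a round trip on a plain object. The assumptions in this sketch are as follows:
- The Msg class is hypothetical.
- The property keys REAL_TOPIC and REAL_QID are what I believe PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID resolve to.
- The flag constants are assumed to match MessageSysFlag. The transaction type occupies bits 2 and 3 of sysFlag, so 0x3 << 2 doubles as the mask.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the half-message backup and its reversal on commit.
// Property keys and flag values are assumptions mirroring RocketMQ's constants.
public class HalfMessageBackup {
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;  // 4
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;    // 8
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;  // 12, also the mask of both bits

    // Clear the two transaction-type bits, then set the requested type.
    static int resetTransactionValue(int flag, int type) {
        return (flag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }

    static final class Msg {
        String topic;
        int queueId;
        int sysFlag;
        final Map<String, String> props = new HashMap<>();
    }

    // Phase one: remember the real destination, then redirect to the half topic, queue 0.
    static void toHalf(Msg m) {
        m.props.put("REAL_TOPIC", m.topic);
        m.props.put("REAL_QID", String.valueOf(m.queueId));
        m.sysFlag = resetTransactionValue(m.sysFlag, TRANSACTION_NOT_TYPE);
        m.topic = "RMQ_SYS_TRANS_HALF_TOPIC";
        m.queueId = 0;
    }

    // Commit: restore the original destination and mark the message as committed.
    static void restore(Msg m) {
        m.topic = m.props.remove("REAL_TOPIC");
        m.queueId = Integer.parseInt(m.props.remove("REAL_QID"));
        m.sysFlag = resetTransactionValue(m.sysFlag, TRANSACTION_COMMIT_TYPE);
    }

    public static void main(String[] args) {
        Msg m = new Msg();
        m.topic = "OrderTopic";
        m.queueId = 3;
        m.sysFlag = TRANSACTION_PREPARED_TYPE | 0x1;  // prepared, plus an unrelated low bit
        toHalf(m);
        System.out.println(m.topic + "/" + m.queueId + " flag=" + m.sysFlag); // RMQ_SYS_TRANS_HALF_TOPIC/0 flag=1
        restore(m);
        System.out.println(m.topic + "/" + m.queueId + " flag=" + m.sysFlag); // OrderTopic/3 flag=9
    }
}
```

The low bit, standing in for an unrelated flag, survives both resets. That is why the code masks the transaction bits instead of overwriting sysFlag.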
Then the half message is written to the internal transaction topic just like a normal message:
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
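// As the name says: store the half message
// Once it is stored, the half-message send (phase one) is essentially done; the commitlog write treats the transaction topic like any normal topic, with no special handling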
return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}
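Broker side: ending the transaction
As mentioned earlier, after phase one (sending the half message and then executing the local transaction), the client sends the broker an end-transaction request based on the transaction state. The request tells the broker whether to commit or roll back. The basic flow is as follows:
The broker does a fair amount of work when ending a transaction. Here is the key code: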
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
// End-of-transaction (phase two) handling
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
LOGGER.debug("Transaction request:{}", requestHeader);
// Slave brokers are not allowed to handle transactional messages
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
return response;
}
// Whether this request is the answer to a transaction check
if (requestHeader.getFromTransactionCheck()) {
switch (requestHeader.getCommitOrRollback()) {
case MessageSysFlag.TRANSACTION_NOT_TYPE: {
LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
return null;
}
case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
break;
}
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
break;
}
default:
return null;
}
} else {
// Mostly just logging here (an UNKNOW state returns early)
switch (requestHeader.getCommitOrRollback()) {
// The local transaction returned UNKNOW: leave it for a transaction check
case MessageSysFlag.TRANSACTION_NOT_TYPE: {
LOGGER.warn("The producer[{}] end transaction in sending message, and it's pending status."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
return null;
}
case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
break;
}
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
break;
}
default:
return null;
}
}
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
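// Commit the transactional message
// Look up the half message stored earlier in the internal transaction topic, using the commitlog offset carried in the request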
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
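// result.getPrepareMessage() is the half message stored earlier in the internal transaction topic; verify that it matches the request (producer group, offsets) so we have not looked up the wrong message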
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// All checks passed: rebuild the original message exactly as it was
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
// Clear the transactional-message property
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
// Write the original message to its real topic; it is now visible to consumers and can be consumed normally
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
// "Deleting" means writing a message to the op queue (RMQ_SYS_TRANS_OP_HALF_TOPIC) with tag d, whose body is the message's offset in the transaction topic
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
// Roll back the transaction
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
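/**
* Summary:
* commit: write the original message to its original topic, then "delete" the half message by writing a message with tag d to the op topic, whose body is the half message's offset
* rollback: only the delete step
* unknown: neither step; the original message is not written and nothing goes into the op queue
*/
// Reached only when commitMessage/rollbackMessage did not succeed (UNKNOW already returned null above).
// The response hardly matters anyway, since the client sends this request oneway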
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}
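Transaction check
Everything so far is the normal transaction path. But the client may crash, restart, or hit network problems while sending the transaction request. In that case the end-transaction request may never be processed by the broker, and that is what the transaction check mechanism is for.
When the broker starts, it launches a scheduled task (every minute by default). The task pulls messages from the transaction topic's queue mentioned above and checks whether each one has already been handled (committed or rolled back). If not, it decides whether a transaction check is due. Depending on that, it either asks the client to re-examine the local transaction state and report back to the broker, or discards the message.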
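The per-message decision the scheduled task makes can be summarised as a small function. This is a deliberately simplified, hypothetical sketch. It ignores the op-message timing condition and the per-message immunity-time property that the real check() code below also handles. The defaults in the example (15 checks, a 6-second timeout) are the values mentioned in the source comments.

```java
import java.util.Set;

// Simplified, hypothetical per-message decision of the periodic transaction check.
public class CheckDecision {
    enum Action { ALREADY_DONE, DISCARD, WAIT, CHECK }

    static Action decide(long halfOffset, Set<Long> resolvedHalfOffsets, int checkTimes,
                         int transactionCheckMax, long ageMillis, long transactionTimeoutMillis) {
        if (resolvedHalfOffsets.contains(halfOffset)) {
            return Action.ALREADY_DONE; // an op message exists: committed or rolled back
        }
        if (checkTimes >= transactionCheckMax) {
            return Action.DISCARD;      // by default moved to TRANS_CHECK_MAX_TIME_TOPIC
        }
        if (ageMillis < transactionTimeoutMillis) {
            return Action.WAIT;         // too fresh: the local transaction may still be running
        }
        return Action.CHECK;            // append back to the half queue and ask the producer
    }

    public static void main(String[] args) {
        Set<Long> resolved = Set.of(1L, 2L, 5L);
        System.out.println(decide(2, resolved, 0, 15, 60_000, 6_000));  // ALREADY_DONE
        System.out.println(decide(3, resolved, 0, 15, 60_000, 6_000));  // CHECK
        System.out.println(decide(4, resolved, 15, 15, 60_000, 6_000)); // DISCARD
        System.out.println(decide(6, resolved, 0, 15, 1_000, 6_000));   // WAIT
    }
}
```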
A few key points are worth understanding here:
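- Half messages in the transaction topic are deleted once the transaction ends. However, RocketMQ storage is append-only, so this "deletion" does not really remove a message from the queue.
- How does the broker know whether a half message was committed or rolled back? As mentioned before, through an op queue, which is another internal topic. When a message is committed or rolled back, the broker writes a message to the op queue whose body is the half message's offset in the transaction topic's queue. If there is no such op message, the transaction has not been resolved and its state is still unknown, so a transaction check may be needed.
- Half messages in the transaction topic are written and read sequentially, like normal messages. Suppose six messages, 1 to 6, have been written; 1, 2 and 5 have been committed or rolled back, but 3 and 4 are still unknown. The broker cannot very well go back and re-read them, and it doesn't.
  - When the broker finds a message in the unknown state, it appends a copy of it to the transaction topic's queue, asks the client to check, and moves on to the next message.
  - When it later reaches the appended copy, it checks the op queue again. If the half message still has not been handled and the maximum number of checks has not been reached, it appends the message again and carries on.
  - Once the maximum is reached, the message is discarded. In fact it is written to yet another internal topic, so transactional messages use three internal topics in total.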
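The append-back strategy in the last point can be simulated with a plain list. This is a hypothetical sketch. Op records are keyed by the original message id rather than by queue offsets. Each scan covers only the entries that existed when it started; the real code likewise stops at messages stored after the round began. Message 6 is treated as unresolved too, since the example above leaves its state open.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of "append back and move on" over an append-only half queue.
public class AppendBackScan {
    static final class Half {
        final int id;          // original message id (simplification)
        final int checkTimes;  // how many times this message has been checked
        Half(int id, int checkTimes) { this.id = id; this.checkTimes = checkTimes; }
    }

    final List<Half> halfQueue = new ArrayList<>();
    final Set<Integer> resolved = new HashSet<>();        // ids with an op record
    final List<Integer> discarded = new ArrayList<>();
    final List<Integer> checkRequests = new ArrayList<>();
    int consumeOffset = 0;

    // One round over the entries that existed when the round started.
    void scan(int maxChecks) {
        int end = halfQueue.size();
        for (; consumeOffset < end; consumeOffset++) {
            Half h = halfQueue.get(consumeOffset);
            if (resolved.contains(h.id)) {
                continue;                                  // committed or rolled back
            }
            if (h.checkTimes >= maxChecks) {
                discarded.add(h.id);                       // give up on this message
                continue;
            }
            halfQueue.add(new Half(h.id, h.checkTimes + 1)); // append a copy, keep moving
            checkRequests.add(h.id);                       // ask the producer for the state
        }
    }

    public static void main(String[] args) {
        AppendBackScan s = new AppendBackScan();
        for (int id = 1; id <= 6; id++) {
            s.halfQueue.add(new Half(id, 0));
        }
        s.resolved.addAll(List.of(1, 2, 5));  // 3, 4 and 6 have no op record yet
        s.scan(15);
        System.out.println("checks=" + s.checkRequests + " size=" + s.halfQueue.size()); // checks=[3, 4, 6] size=9
        s.resolved.add(3);                    // the producer answers for message 3
        s.scan(15);
        System.out.println("checks=" + s.checkRequests + " offset=" + s.consumeOffset);  // checks=[3, 4, 6, 4, 6] offset=9
    }
}
```

After the first round, copies of 3, 4 and 6 sit at the tail of the queue. Once the producer answers for 3, the second round skips its copy and re-appends only 4 and 6. With maxChecks set to 1, the second round would discard 4 and 6 instead.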
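I have described the transaction check mainly in prose rather than drawing another flowchart. The key code follows.
By default the broker checks once a minute. It pulls messages from the internal transaction topic's queue and from the op queue, then compares them to see whether the current half message has already been handled and whether it needs a check:
// Scheduled check of transactional messages (runs once a minute)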
@Override
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
try {
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
// Each half-message queue has a matching op queue (in practice there is just one of each)
MessageQueue opQueue = getOpQueue(messageQueue);
// Get the consume offsets of the transaction topic and the op topic
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
if (halfOffset < 0 || opOffset < 0) {
log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
halfOffset, opOffset);
continue;
}
List<Long> doneOpOffset = new ArrayList<>();
HashMap<Long, Long> removeMap = new HashMap<>();
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
if (null == pullResult) {
log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
messageQueue, halfOffset, opOffset);
continue;
}
// single thread
int getMessageNullCount = 1;
long newOffset = halfOffset;
long i = halfOffset;
while (true) {
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
}
// Already handled: no need to process it again
if (removeMap.containsKey(i)) {
log.debug("Half offset {} has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
// Offset in the op queue
doneOpOffset.add(removedOpOffset);
} else {
// Fetch the half message to process
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
if (msgExt == null) {
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
break;
}
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
messageQueue, getMessageNullCount, getResult.getPullResult());
break;
} else {
log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
i, messageQueue, getMessageNullCount, getResult.getPullResult());
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
}
}
// Discard after 15 checks (the default maximum), or if the message has expired (older than the file retention time)
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
// The default implementation moves it to the TRANS_CHECK_MAX_TIME_TOPIC topic
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
if (msgExt.getStoreTimestamp() >= startTime) {
log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
new Date(msgExt.getStoreTimestamp()));
break;
}
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
long checkImmunityTime = transactionTimeout;
// I found nowhere (apart from tests) that sets this property
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr) {
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) {
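// Still within the check-immunity window: append the message back to the half queue (or mark it done if its original offset already has an op record)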
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1;
i++;
continue;
}
}
} else {
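// A freshly stored half message: skip it for now, presumably because the local transaction may not have finished yet, so checking would be pointless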
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break;
}
}
List<MessageExt> opMsg = pullResult.getMsgFoundList();
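// checkImmunityTime defaults to 6 seconds (transactionTimeout): the earliest a message may be checked
// A check is needed if no op messages were pulled and the half message is older than checkImmunityTime, if the newest pulled op message was born more than transactionTimeout after this round started, or if the message's age is negative (clock skew)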
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
// Append this message back to the half queue
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
// Send the transaction check; the appended copy is handled in a later round once the state is known
listener.resolveHalfMsg(msgExt);
} else {
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
messageQueue, pullResult);
continue;
}
}
newOffset = i + 1;
i++;
}
if (newOffset != halfOffset) {
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
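// Advance the op offset only across contiguous processed offsets: if the op offset is 2 and offsets 2,3,4,6,7 are done, everything up to 4 is consumed and the new op offset is 5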
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
}
} catch (Throwable e) {
log.error("Check error", e);
}
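// In short: pull half messages one by one; those with an op record were committed or rolled back and can be ignored; otherwise decide whether a check is needed and, if so, append the message back to the half queue and check it
}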
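The op-offset rule described in the comment near the end of check() can be shown in isolation: the offset advances only across contiguous processed offsets. This sketch follows that comment and my reading of calculateOpOffset. Treat it as an illustration rather than the exact source. It sorts a copy of the list so callers may pass immutable lists.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class OpOffsetCalc {
    // Advance from oldOffset across a contiguous run of processed op offsets.
    static long calculateOpOffset(List<Long> doneOpOffset, long oldOffset) {
        List<Long> sorted = new ArrayList<>(doneOpOffset);
        Collections.sort(sorted);
        long newOffset = oldOffset;
        for (long done : sorted) {
            if (done == newOffset) {
                newOffset++;   // contiguous: consume it
            } else {
                break;         // gap found: stop here
            }
        }
        return newOffset;
    }

    public static void main(String[] args) {
        System.out.println(calculateOpOffset(List.of(2L, 3L, 4L, 6L, 7L), 2)); // 5
        System.out.println(calculateOpOffset(List.of(3L, 4L), 2));             // 2
    }
}
```

Stopping at the first gap matters. Op messages 6 and 7 were processed, but the op message at offset 5 was not yet, so the consume offset must not skip past it.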
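To decide from the op queue whether a half message has been handled, the broker reads the bodies of the op messages it pulls. Each body holds a half message's offset, which tells the broker whether the half message at that offset has already been handled: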
/**
* Read op message, parse op message, and fill removeMap
*
* @param removeMap Half message to be remove, key:halfOffset, value: opOffset.
* @param opQueue Op message queue.
* @param pullOffsetOfOp The begin offset of op message queue.
* @param miniOffset The current minimum offset of half message queue.
* @param doneOpOffset Stored op messages that have been processed.
* @return Op message result.
*/
private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap,
MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {
// Pull messages from the op queue using the CID_RMQ_SYS_TRANS consumer group
PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
if (null == pullResult) {
return null;
}
if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL
|| pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {
log.warn("The miss op offset={} in queue={} is illegal, pullResult={}", pullOffsetOfOp, opQueue,
pullResult);
transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());
return pullResult;
} else if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG) {
log.warn("The miss op offset={} in queue={} is NO_NEW_MSG, pullResult={}", pullOffsetOfOp, opQueue,
pullResult);
return pullResult;
}
List<MessageExt> opMsg = pullResult.getMsgFoundList();
if (opMsg == null) {
log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
return pullResult;
}
// Go through the pulled op messages and work out which half messages they mark as handled
for (MessageExt opMessageExt : opMsg) {
// The half-queue offset recorded in this op message
Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
log.debug("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(),
opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);
if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
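// These op messages all mark half messages for "deletion"
// miniOffset is halfOffset, the smallest offset still to be scanned; a half offset below it was already passed, so this op message is done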
if (queueOffset < miniOffset) {
doneOpOffset.add(opMessageExt.getQueueOffset());
} else {
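// The half offset is at or beyond the current scan position: that half message is already committed or rolled back, so record it in removeMap and skip it when the scan reaches it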
removeMap.put(queueOffset, opMessageExt.getQueueOffset());
}
} else {
log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);
}
}
log.debug("Remove map: {}", removeMap);
log.debug("Done op list: {}", doneOpOffset);
return pullResult;
}
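Here is the classification loop in fillOpRemoveMap, with a hypothetical OpMsg holder in place of MessageExt. As in the source, the body of each op message is read as a half-queue offset, and only messages tagged d count. The example values are made up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical OpMsg replaces MessageExt; only the fields the loop reads are kept.
public class OpRemoveMapSketch {
    static final String REMOVETAG = "d";

    static final class OpMsg {
        final long opOffset;  // queue offset of this op message in the op queue
        final String tags;
        final String body;    // half-queue offset, as a decimal string

        OpMsg(long opOffset, String tags, String body) {
            this.opOffset = opOffset;
            this.tags = tags;
            this.body = body;
        }
    }

    static void fill(List<OpMsg> opMsgs, long miniOffset,
                     Map<Long, Long> removeMap, List<Long> doneOpOffset) {
        for (OpMsg op : opMsgs) {
            if (!REMOVETAG.equals(op.tags)) {
                continue;                                // the source logs an error here
            }
            long halfOffset = Long.parseLong(op.body);
            if (halfOffset < miniOffset) {
                doneOpOffset.add(op.opOffset);           // half message already scanned past
            } else {
                removeMap.put(halfOffset, op.opOffset);  // skip it when the scan gets there
            }
        }
    }

    public static void main(String[] args) {
        Map<Long, Long> removeMap = new HashMap<>();
        List<Long> done = new ArrayList<>();
        List<OpMsg> ops = List.of(new OpMsg(10, "d", "3"), new OpMsg(11, "d", "8"), new OpMsg(12, "x", "9"));
        fill(ops, 5, removeMap, done);
        System.out.println("removeMap=" + removeMap + " done=" + done); // removeMap={8=11} done=[10]
    }
}
```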
The broker picks a producer client in the message's producer group and sends it the check request:
public void sendCheckMessage(MessageExt msgExt) throws Exception {
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
// Find an available producer channel for the group and send it the check request
Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
if (channel != null) {
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
}
}
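After receiving the request, the client runs its transaction check logic and sends the transaction state back to the broker:
// Transaction check (client side)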
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final CheckTransactionStateRequestHeader requestHeader =
(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
if (messageExt != null) {
if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
messageExt.setTopic(NamespaceUtil
.withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));
}
String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
// This is the client msgId, unless the user set a custom value
messageExt.setTransactionId(transactionId);
}
final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (group != null) {
MQProducerInner producer = this.mqClientFactory.selectProducer(group);
if (producer != null) {
final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
producer.checkTransactionState(addr, messageExt, requestHeader);
} else {
log.debug("checkTransactionState, pick producer by group[{}] failed", group);
}
} else {
log.warn("checkTransactionState, pick producer group failed");
}
} else {
log.warn("checkTransactionState, decode message failed");
}
return null;
}
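On the producer side, checkTransactionState eventually hands the message to the application's transaction listener, which must answer from reliable knowledge of the local transaction. Below is a hypothetical sketch of one common pattern: record each local outcome keyed by transactionId (the client msgId, as noted above), then look it up when a check arrives. The class is self-contained and only mirrors the executeLocalTransaction/checkLocalTransaction split. In a real application the outcome would typically be persisted together with the business data rather than held in a map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical listener-style class; it mirrors the execute/check split only.
public class RecordingListener {
    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // transactionId -> committed? In practice this would be stored durably.
    private final Map<String, Boolean> outcomes = new ConcurrentHashMap<>();

    // Counterpart of executeLocalTransaction: run the work and record how it ended.
    LocalTransactionState execute(String transactionId, Runnable localWork) {
        try {
            localWork.run();
            outcomes.put(transactionId, Boolean.TRUE);
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (RuntimeException e) {
            outcomes.put(transactionId, Boolean.FALSE);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    // Counterpart of checkLocalTransaction: answer the broker from the recorded outcome.
    LocalTransactionState check(String transactionId) {
        Boolean committed = outcomes.get(transactionId);
        if (committed == null) {
            return LocalTransactionState.UNKNOW; // no record: the broker may check again later
        }
        return committed ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
    }

    public static void main(String[] args) {
        RecordingListener l = new RecordingListener();
        l.execute("tx-1", () -> { });
        l.execute("tx-2", () -> { throw new IllegalStateException("insert failed"); });
        System.out.println(l.check("tx-1") + " " + l.check("tx-2") + " " + l.check("tx-3"));
        // COMMIT_MESSAGE ROLLBACK_MESSAGE UNKNOW
    }
}
```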
Conclusion
That wraps up the RocketMQ transactional message series; I may add more later if needed.