RocketMQ事务消息篇之事务消息源码分析

Posted NetWhite

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ事务消息篇之事务消息源码分析相关的知识,希望对你有一定的参考价值。

前言

RocketMQ事务消息篇(一)之事务消息的介绍

RocketMQ事务消息篇(二)之事务消息的使用

本文继前两篇对事务消息源码进行分析。

事务消息处理基本流程

在介绍事务消息的时候,画了一个简单的流程图说明事务消息的整体处理流程:

p.s. 下面的序号(1、2、3...)表示顺序,与上图中的(1、2、3...)无关。

  1. 事务生产者调用事务消息发送接口,发送消息
  2. 开始预提交阶段,客户端发送预消息并在请求头标记这是一条事务消息。消息体就是我们实际要发送的消息内容
  3. broker接收到消息,发现这是一条事务消息,于是将当前消息备份。所谓“备份”即将当前消息的所有数据写入内部的事务topic中而不是我们实际要发送的topic,该事务topic由于消费端并没有订阅,所以这条消息对消费端不可见,然后响应客户端的发送请求
  4. 客户端确认发送成功,则执行本地事务,并标记事务执行状态。如果发送失败,就不需要执行本地事务了,直接标记事务执行失败,需要回滚。
  5. 基于事务的执行状态,给本次发送事务消息的那个broker发送一条结束事务的请求(请求头里包含是提交还是回滚,亦或者是未知状态)
  6. broker收到事务结束的请求,如果是未知状态就打条日志直接返回了;如果是提交事务,就将备份的那条事务消息恢复过来,写入到原始的topic里,此时就对消费端可见了,然后要在op队列里(另一个内部topic)写入一条消息,消息体就是当前这条事务消息的队列偏移值。如果是回滚事务,就只用在op队列里写入一条消息即可,就不还原事务消息了,这样对消费端就不可见。关于op队列的具体作用,后面源码部分再详说。
  7. 说一下事务回查。事务回查就是broker扫描到那些没有提交也没回滚的消息,找到客户端,发一个请求,让客户端再次提交一下事务结束状态。

源码剖析

整体流程涉及的代码还是比较多的,接下来对每一部分的源码拆开进行分析。

客户端处理,事务执行流程

客户端处理基本流程如下:

源码的主要入口实现部分在这个方法里:

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException;

 代码如下,我已经加上相关注释:

    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        // 获取我们创建的事务消息监听器(本地事务执行及事务回查)
        TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }

        // 事务消息不支持延时消息
        // ignore DelayTimeLevel parameter
        if (msg.getDelayTimeLevel() != 0) {
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        }

        Validators.checkMessage(msg, this.defaultMQProducer);

        SendResult sendResult = null;
        // 设置半消息属性(TRAN_MSG),标志这是一个事务半消息
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        // 设置生产组属性(PGROUP)
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            // 消息发送,这里就是平常同步发送普通消息的默认发送流程了,在客户端这里,就已经没有太多其它额外的针对事务消息的处理了,主要还是会判断是半消息的话,打个事务消息的标记
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    // transactionId
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    // UNIQ_KEY,这是客户端发送的时侯生成的一个唯一ID,也就是我们平常用的sendResult里的msgId
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        // 在这种情况下,transactionId其实就是message的客户端msgId
                        msg.setTransactionId(transactionId);
                    }
                    // 一般,我平常使用的时候,不会采用localTransactionExecuter方式调用事务消息接口,所以这里一般是空
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        // 执行我们本地事务
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            // 未发送成功,回滚消息
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

        try {
            // 事务结束处理,是提交还是回滚还是要做其它操作
            this.endTransaction(msg, sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }

        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        // 返回事务消息发送结果,这里已经返回本地事务执行状态了
        return transactionSendResult;
    }

关于上面调用事务结束请求的方法,具体代码及注释如下:

    public void endTransaction(
        final Message msg,
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        final MessageId id;
        // getOffsetMsgId,这个是服务端的msgId,包含了不少消息的元信息
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        String transactionId = sendResult.getTransactionId();
        // 半消息发到了哪个broker上,最后提交也得到这个broker上
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        // 设置事务消息的提交偏移(提交到内部的事务topic上了)
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

        doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        // 竟然还带本地事务异常信息
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        // 2阶段执行的消息
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }

通过查阅上面两个方法的代码基本对客户的事务消息发送部分,已经比较清楚了(事务回查的处理部分在后面) 

broker端处理,接收事务半消息(预提交)

broker端在接收到事务消息的基本处理流程如下:

简单来说,事务消息也如普通消息一样发送到broker,broker像接收普通一样接收,接收到之后会判断是否有事务标记,有的话,就把这条消息的所有信息写入一个内部的事物topic里,来保证暂时对消费端不可见,关键源码如下(以异步写入为示例):

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

        ...
        // 省略上面一部分代码,主要看下面判断这是一条事务消息
        // 如果这个属性存在,说明是发送的事务消息
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            // broker检查是否启用事务消息了
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            // 从这里可心看到,事务消息是一个单独的流程处理,和其它消息不一样
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
    }

 发现这是一条事务消息后,备份事务消息的代码如下:

    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        // 会把消息的原始topic及队列信息存储到属性中,因为要写到事务topic的队列里,就是备份原消息
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        // 把事务标记也去掉
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        // 设置当前存储的消息的topic为:RMQ_SYS_TRANS_HALF_TOPIC, 事务半消息的topic
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        // 发送到队列0里, 这个topic也只有一条队列,另外还用到的一个topic是:RMQ_SYS_TRANS_OP_HALF_TOPIC,也是只有一条队列在每个broker上
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }

然后就是将这条事务半消息如果普通消息一样写入到内部的事务topic里了

    public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
        // 见名知义,事务半消息处理
        // 事务半消息存储完成,基本半消息发送(一阶段)已经算是结束了,在写入commitlog的时候,基本没有对这个事务topic做额外处理了,就像普通消息那样了
        return store.asyncPutMessage(parseHalfMessageInner(messageInner));
    }

broker端处理,事务结束

前面提到客户端在一阶段(发送事务半消息后,然后执行本地事务),会再根据事务执行状态给broker发送一条事务结束的请求,告诉broker是提交还是要回滚,基本流程如下:

事务处理上还是做了不少动作的,看一下它的关键源码实现:

    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException {
        // 事务消息结束(二阶段)处理
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final EndTransactionRequestHeader requestHeader =
            (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
        LOGGER.debug("Transaction request:{}", requestHeader);
        // 从节点是不允许处理事务消息的
        if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
            return response;
        }

        // 事务回查标记,是否为事务回查
        if (requestHeader.getFromTransactionCheck()) {
            switch (requestHeader.getCommitOrRollback()) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());

                    break;
                }

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        } else {
            // 只是为了打条日志
            switch (requestHeader.getCommitOrRollback()) {
                // 本地事务执行状态返回的是UNKNOW,该回查了
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    break;
                }

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        }
        OperationResult result = new OperationResult();
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
            // 开始提交事务消息
            // 这里就是根据之前提交的内部事务topic的半消息偏移,查出来提交的这条消息
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                // result.getPrepareMessage()就是之前提交到内部的事务topic上的那条半消息,检查下这条信息是否正确,日志偏移呀什么的是否匹配,是不是查错消息了
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    // 一切都OK了,准备提交事务,这里就是把原始消息信息,原原本本的恢复过来
                    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                    // 清除事务消息属性
                    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    // 原始消息写入对应的topic,此时对消费端就可见了,可以正常消费了
                    RemotingCommand sendResult = sendFinalMessage(msgInner);
                    if (sendResult.getCode() == ResponseCode.SUCCESS) {
                        // 删除的动作是在op队列(RMQ_SYS_TRANS_OP_HALF_TOPIC)写入该消息,tag是d,消息体是在事务topic里的消息偏移
                        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    }
                    return sendResult;
                }
                return res;
            }
        } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            // 事务回滚
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return res;
            }
        }
        /**
         * 总结一下:
         * commit:就是先把原始消息写入到原始topic里,然后删除半消息就是在op 的事务topic里写入一条tag为d的消息,消息体就是半消息的偏移值
         * rollback: 就是直接删除过程了
         * unknown: 就是上面 两步都没做,原始消息未写入,op队列里也没有
         */
        // UNKNOW状态了,看来得回查了,其实这里返不返回都一样,客户端是one way调用
        response.setCode(result.getResponseCode());
        response.setRemark(result.getResponseRemark());
        return response;
    }

事务回查

前面的流程呢,都是事务的正常处理,但是如果客户端在发送事务请求的时候,宕机、重启、网络原因等,最终是导致事务结束的请求没有正确发送给broker处理,那就需要事务回查机制。

broker启动的时候,会启动一个定时任务(默认是1分钟),从前面提到的事务topic的队列里拉取消息,检查拉取到的消息是否已经处理过了(比如提交或回滚),如果没有,根据是否要进行事务回查,让客户再检查一下本地事务的执行状态并告诉broker或者丢弃。

其实这里涉及到几个关键问题需要明白:

  1. 写入到事务topic里的事务半消息在事务结束后进行删除,但是rocketmq是追加写的方式,所以这里的删除并不是从消息队列里真正的删除一条消息。
  2. broker怎么知道一条事务半消息是否已经提交或者回滚了,正如前面说的,这里引入一个op队列,即另一个内部topic,如果一条消息已经提交或回滚了,就向op队列里写入一条消息消息体就是在事务topic队列里的偏移值,如果op队列里没有,那就说明这条事务消息的状态还没有提交,还是未知的,可能需要事务回查。
  3. 我们知道写入到事务topic的事务半消息也如普通消息一样,是顺序写顺序读的,如果此时已经写入1、2、3、4、5、6共6条事务消息了,1、2、5的事务状态已经提交或者回滚了,但是3、4还是未知的,那总不能再重新回头消费吧。并没有,如果broker发现这条消息是未知状态的,那在处理的时候,把这条消息再追回写入到事务topic的队列里,然后找客户端回查。继续下一条消息处理,等到再处理到刚才重新追加的这条事务消息的时候,再从op队列里检查一下,这条事务半消息是否已经处理过了,如果还没有而且也没达到事务回查的最大次数,那就再追回写回去,再继续呗。如果已经达到最大次数,就丢弃(其实是写到另一个内部topic,也就是说事务消息这里用到了3个内部topic来存储数据)

关于事务回查这里主要采用文字描述说明了,就不再画一个流程图了,关键源码如下:

broker默认每分钟检查一次,从内部事务topic队列和op队列里拉取消息,然后比对,当前的事务半消息是否已经处理过了,是否需要回查:

    // 定时检查事务消息(1分钟查一次)
    @Override
    public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
        try {
            String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.debug("Check topic={}, queues={}", topic, msgQueues);
            for (MessageQueue messageQueue : msgQueues) {
                long startTime = System.currentTimeMillis();
                // 一条预消息队列对应一个op队列(实际也就1条队列)
                MessageQueue opQueue = getOpQueue(messageQueue);
                // 获取事务topic和op topic的消费偏移
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                        halfOffset, opOffset);
                    continue;
                }

                List<Long> doneOpOffset = new ArrayList<>();
                HashMap<Long, Long> removeMap = new HashMap<>();
                PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                if (null == pullResult) {
                    log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                        messageQueue, halfOffset, opOffset);
                    continue;
                }
                // single thread
                int getMessageNullCount = 1;
                long newOffset = halfOffset;
                long i = halfOffset;
                while (true) {
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    // 已经处理过的,没必要再处理一次
                    if (removeMap.containsKey(i)) {
                        log.debug("Half offset {} has been committed/rolled back", i);
                        Long removedOpOffset = removeMap.remove(i);
                        // op的队列偏移
                        doneOpOffset.add(removedOpOffset);
                    } else {
                        // 获取当前要处理的half消息
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        MessageExt msgExt = getResult.getMsg();
                        if (msgExt == null) {
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                break;
                            }
                            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                                log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                    messageQueue, getMessageNullCount, getResult.getPullResult());
                                break;
                            } else {
                                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                    i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                i = getResult.getPullResult().getNextBeginOffset();
                                newOffset = i;
                                continue;
                            }
                        }

                        // 超过15次丢弃,或者消息过期了(超过了设置的文件保存时间)
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                            // 默认实现是移动到TRANS_CHECK_MAXTIME_TOPIC这个topic里
                            listener.resolveDiscardMsg(msgExt);
                            newOffset = i + 1;
                            i++;
                            continue;
                        }
                        if (msgExt.getStoreTimestamp() >= startTime) {
                            log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                                new Date(msgExt.getStoreTimestamp()));
                            break;
                        }

                        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                        long checkImmunityTime = transactionTimeout;
                        // 未找到写入这个属性的地方(除了test)
                        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                        if (null != checkImmunityTimeStr) {
                            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                                // 超过这个检查时间,重新写回半消息队列
                                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                    newOffset = i + 1;
                                    i++;
                                    continue;
                                }
                            }
                        } else {
                            // 新提交的半消息,暂不处理,估计是认为事务也可能没执行完,处理也没意义
                            if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                                log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                    checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                break;
                            }
                        }
                        List<MessageExt> opMsg = pullResult.getMsgFoundList();
                        // checkImmunityTime默认是6秒,第一次可以检查的时间
                        // 正常来说,每条提交/回滚就是已经处理过的消息,在op队列里都有一条消息,如果没有(第一次回查),或者已经有了,但是存放时间已经满足检查条件了,都得回查
                        boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                            || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                            || (valueOfCurrentMinusBorn <= -1);

                        if (isNeedCheck) {
                            // 把这个消息重新写回half队列里
                            if (!putBackHalfMsgQueue(msgExt, i)) {
                                continue;
                            }
                            // 事务回查,确认状态后,下次再处理上边这个写回的半消息
                            listener.resolveHalfMsg(msgExt);
                        } else {
                            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                            log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                                messageQueue, pullResult);
                            continue;
                        }
                    }
                    newOffset = i + 1;
                    i++;
                }
                if (newOffset != halfOffset) {
                    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                //已经连接处理的偏移,如果2,3,4,6,7,则最偏移到4.
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                if (newOpOffset != opOffset) {
                    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                }
            }
        } catch (Throwable e) {
            log.error("Check error", e);
        }

        // 总之,一条一条拉取,如果在op队列,就是已经commit或者rollback的,不用再管了,否则就检查是否需要回查,需要的话,这条写再写回half队列

关于如何从op队列里确认事务半消息已经处理过了,主要就是根据op队列里拉取的消息的消息体(保存的是事务半消息的偏移值)来判断当前偏移的事务消息是否已经处理过了:

    /**
     * Read op message, parse op message, and fill removeMap
     *
     * @param removeMap Half message to be remove, key:halfOffset, value: opOffset.
     * @param opQueue Op message queue.
     * @param pullOffsetOfOp The begin offset of op message queue.
     * @param miniOffset The current minimum offset of half message queue.
     * @param doneOpOffset Stored op messages that have been processed.
     * @return Op message result.
     */
    private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap,
        MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {
        // 使用CID_RMQ_SYS_TRANS拉取op队列里的消息
        PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
        if (null == pullResult) {
            return null;
        }
        if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL
            || pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {
            log.warn("The miss op offset={} in queue={} is illegal, pullResult={}", pullOffsetOfOp, opQueue,
                pullResult);
            transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());
            return pullResult;
        } else if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG) {
            log.warn("The miss op offset={} in queue={} is NO_NEW_MSG, pullResult={}", pullOffsetOfOp, opQueue,
                pullResult);
            return pullResult;
        }
        List<MessageExt> opMsg = pullResult.getMsgFoundList();
        if (opMsg == null) {
            log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
            return pullResult;
        }
        // 对拉取的消息做过滤处理,判断一下这些op消息对应的half消息是否处理过了
        for (MessageExt opMessageExt : opMsg) {
            // 记录这条op消息对应在事务队列里的偏移值
            Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
            log.debug("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(),
                opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);
            if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
                // 找到的都是需要"删除"的半消息
                // miniOffset就是halfOffset,将要消费的最小偏移 ,这是处理完成待删除的op消息
                if (queueOffset < miniOffset) {
                    doneOpOffset.add(opMessageExt.getQueueOffset());
                } else {
                    // op消息保存的是half消息的偏移,这个值竟然大于当前half消息的偏移,这是已经处理过的,不需要再处理了
                    removeMap.put(queueOffset, opMessageExt.getQueueOffset());
                }
            } else {
                log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);
            }
        }
        log.debug("Remove map: {}", removeMap);
        log.debug("Done op list: {}", doneOpOffset);
        return pullResult;
    }

broker在该生产组下找到一个生产者客户端发送回查请求:

    public void sendCheckMessage(MessageExt msgExt) throws Exception {
        CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
        checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
        checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
        checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
        checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
        msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
        msgExt.setStoreSize(0);
        String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        // 根据生产组找到对应的生产者实例,发送一个回查请求
        Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
        if (channel != null) {
            brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
        } else {
            LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
        }
    }

客户端收到请求后,执行事务回查逻辑,并将事务状态发回broker:

    // 事务反查
    public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final CheckTransactionStateRequestHeader requestHeader =
            (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
        final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
        final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
        if (messageExt != null) {
            if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
                messageExt.setTopic(NamespaceUtil
                    .withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));
            }
            String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null != transactionId && !"".equals(transactionId)) {
                // 就是客户端 msg Id,如果用户没有自定义设置这个值
                messageExt.setTransactionId(transactionId);
            }
            final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
            if (group != null) {
                MQProducerInner producer = this.mqClientFactory.selectProducer(group);
                if (producer != null) {
                    final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    producer.checkTransactionState(addr, messageExt, requestHeader);
                } else {
                    log.debug("checkTransactionState, pick producer by group[{}] failed", group);
                }
            } else {
                log.warn("checkTransactionState, pick producer group failed");
            }
        } else {
            log.warn("checkTransactionState, decode message failed");
        }

        return null;
    }

结语

关于rocketmq事务消息篇到此结束,后续如果有需要会再进行补充。

以上是关于RocketMQ事务消息篇之事务消息源码分析的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ事务消息篇之事务消息的使用

RocketMQ事务消息实战

rocketmq源码分析:事务消息延时消息消息查询

RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查

RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查

RocketMQ事务消息篇之事务消息的介绍