30 RocketMQ事务消息的代码实现细节

Posted 鮀城小帅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了30 RocketMQ事务消息的代码实现细节相关的知识,希望对你有一定的参考价值。

基于官方文档提供的事务消息API使用的例子来进行分析,这里会把订单系统的业务场景房子里面,加入一些伪代码进行参考。

1. 发送half事务消息出去

package com.mqTrsMessage;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.concurrent.*;

/**
 * @ClassName TransactionProducer
 * @Description TODO 事务消息机制
 * @Author wushaopei
 * @Date 2021/7/18 15:05
 * @Version 1.0
 */
public class TransactionProducer {

    public static void main(String[] args) throws Exception {

        // 这个东西就是用来接收RcoketMQ回调的一个监听器接口
        // 这里会实现执行订单本地事务,commit,rollback,回调查询等逻辑
        TransactionListener transactionListener = new TransactionListenerImpl();

        // 下面这个就是创建支持事务消息的Producer
        // 对这个Producer还得指定要指定一个生产者分组,根据业务指定名字
        TransactionMQProducer producer = new
                TransactionMQProducer("TestProducerGroup");

        producer.setNamesrvAddr("192.168.133.115:9876");

        // 下面这个是指定了一个线程池,里面会包含一些线程
        // 这个线程池里的线程就是用来处理RocketMQ回调你的请求
        ExecutorService executorService = new ThreadPoolExecutor(
                2,
                5,
                100,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("TestThread");
                        return thread;
                    }
        });

        // 给事务消息生产者设置对应的线程池,负责执行RocketMQ回调请求
        producer.setExecutorService(executorService);
        // 给事务消息生产者设置对应的回避函数
        producer.setTransactionListener(transactionListener);
        //启动实例
        producer.start();


        // 构造一条订单支付成功的消息,指定Topic是谁
        Message msg = new Message("PayOrderSuccessTopic" /* Topic */,
                "TestTag" /* Tag */,
                "TestKey",
                ("订单支付消息").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );
        //Call send message to deliver message to one of brokers.
        SendResult sendResult = producer.sendMessageInTransaction(msg,null);
        System.out.printf("%s%n", sendResult);
    }
}

2.加入half消息发送失败,或者没收到half消息响应怎么办?

假如发送half消息失败了,就会在执行“producer.sendMessageInTransaction(msg,null);” 的时候,收到一个异常,发现消息发送失败了。

可以使用下面的代码去关注half消息发送失败的问题:

 try{

    SendResult sendResult = producer.sendMessageInTransaction(msg,null);

 }catch(Exception e){
    // half消息发送失败
    // 订单系统执行回滚逻辑,比如说触发支付退款,更新订单状态为“已关闭”
 }

如果一直没有收到half消息发送成功的通知呢?

针对这个问题,可以把发送出去的half消息放在内存里,或者写入本地磁盘文件,后台开启一个线程去检查,如果一个half消息超过比如10分钟都没有收到响应,那就自动触发回滚逻辑。这个补偿机制是MQ客户端内部自己的。

3.如果half消息成功了,如何执行订单本地事务?

public class TransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        // 执行订单本地事务
        // 接着执行本地一连串事务执行结果,去选择执行commit or rollback
        try {
            // 如果本地事务都执行成功了,返回commit
            return LocalTransactionState.COMMIT_MESSAGE;
        }catch (Exception e){
            // 本地事务执行失败,回滚所有一切执行过的操作
            // 如果本地事务执行失败了,返回rollback,标记half消息无效
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}

4.如果没有返回commit或者rollback,如何进行回调?

package com.mqTrsMessage;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * @ClassName TransactionListenerImpl
 * @Description TODO 事务消息监听器
 * @Author wushaopei
 * @Date 2021/7/18 15:15
 * @Version 1.0
 */
public class TransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        // 执行订单本地事务
        // 接着执行本地一连串事务执行结果,去选择执行commit or rollback
        try {
            // 如果本地事务都执行成功了,返回commit
            return LocalTransactionState.COMMIT_MESSAGE;
        }catch (Exception e){
            // 本地事务执行失败,回滚所有一切执行过的操作
            // 如果本地事务执行失败了,返回rollback,标记half消息无效
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    // 如果因为各种原因,没有返回 commit 或者rollback
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 查询本地事务,是否执行成功了
        Integer status = LocalTrans.transMap.get(msg.getTransactionId());
        // 根据本地事务的情况去选择执行 commit or rollback
        if(null != status){
            switch (status){
                case 0: return LocalTransactionState.UNKNOW;
                case 1: return LocalTransactionState.COMMIT_MESSAGE;
                case 2: return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

5.debug测试代码

由截图可知,该事务消息已成功写入MQ进程服务上,并返回了commit。

以上是关于30 RocketMQ事务消息的代码实现细节的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

❧消息队列解析 RocketMQ 业务消息——“事务消息”

分布式事务之 RocketMQ 事务消息详解

RocketMQ事务消息实现原理

RocketMQ事务消息实战

RocketMQ实现事务消息方案