30 RocketMQ事务消息的代码实现细节
Posted 鮀城小帅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了30 RocketMQ事务消息的代码实现细节相关的知识,希望对你有一定的参考价值。
基于官方文档提供的事务消息API使用的例子来进行分析,这里会把订单系统的业务场景房子里面,加入一些伪代码进行参考。
1. 发送half事务消息出去
package com.mqTrsMessage;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.concurrent.*;
/**
* @ClassName TransactionProducer
* @Description TODO 事务消息机制
* @Author wushaopei
* @Date 2021/7/18 15:05
* @Version 1.0
*/
public class TransactionProducer {
public static void main(String[] args) throws Exception {
// 这个东西就是用来接收RcoketMQ回调的一个监听器接口
// 这里会实现执行订单本地事务,commit,rollback,回调查询等逻辑
TransactionListener transactionListener = new TransactionListenerImpl();
// 下面这个就是创建支持事务消息的Producer
// 对这个Producer还得指定要指定一个生产者分组,根据业务指定名字
TransactionMQProducer producer = new
TransactionMQProducer("TestProducerGroup");
producer.setNamesrvAddr("192.168.133.115:9876");
// 下面这个是指定了一个线程池,里面会包含一些线程
// 这个线程池里的线程就是用来处理RocketMQ回调你的请求
ExecutorService executorService = new ThreadPoolExecutor(
2,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("TestThread");
return thread;
}
});
// 给事务消息生产者设置对应的线程池,负责执行RocketMQ回调请求
producer.setExecutorService(executorService);
// 给事务消息生产者设置对应的回避函数
producer.setTransactionListener(transactionListener);
//启动实例
producer.start();
// 构造一条订单支付成功的消息,指定Topic是谁
Message msg = new Message("PayOrderSuccessTopic" /* Topic */,
"TestTag" /* Tag */,
"TestKey",
("订单支付消息").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.sendMessageInTransaction(msg,null);
System.out.printf("%s%n", sendResult);
}
}
2.加入half消息发送失败,或者没收到half消息响应怎么办?
假如发送half消息失败了,就会在执行“producer.sendMessageInTransaction(msg,null);” 的时候,收到一个异常,发现消息发送失败了。
可以使用下面的代码去关注half消息发送失败的问题:
try{
SendResult sendResult = producer.sendMessageInTransaction(msg,null);
}catch(Exception e){
// half消息发送失败
// 订单系统执行回滚逻辑,比如说触发支付退款,更新订单状态为“已关闭”
}
如果一直没有收到half消息发送成功的通知呢?
针对这个问题,可以把发送出去的half消息放在内存里,或者写入本地磁盘文件,后台开启一个线程去检查,如果一个half消息超过比如10分钟都没有收到响应,那就自动触发回滚逻辑。这个补偿机制是MQ客户端内部自己的。
3.如果half消息成功了,如何执行订单本地事务?
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 执行订单本地事务
// 接着执行本地一连串事务执行结果,去选择执行commit or rollback
try {
// 如果本地事务都执行成功了,返回commit
return LocalTransactionState.COMMIT_MESSAGE;
}catch (Exception e){
// 本地事务执行失败,回滚所有一切执行过的操作
// 如果本地事务执行失败了,返回rollback,标记half消息无效
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
4.如果没有返回commit或者rollback,如何进行回调?
package com.mqTrsMessage;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
/**
* @ClassName TransactionListenerImpl
* @Description TODO 事务消息监听器
* @Author wushaopei
* @Date 2021/7/18 15:15
* @Version 1.0
*/
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 执行订单本地事务
// 接着执行本地一连串事务执行结果,去选择执行commit or rollback
try {
// 如果本地事务都执行成功了,返回commit
return LocalTransactionState.COMMIT_MESSAGE;
}catch (Exception e){
// 本地事务执行失败,回滚所有一切执行过的操作
// 如果本地事务执行失败了,返回rollback,标记half消息无效
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 如果因为各种原因,没有返回 commit 或者rollback
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 查询本地事务,是否执行成功了
Integer status = LocalTrans.transMap.get(msg.getTransactionId());
// 根据本地事务的情况去选择执行 commit or rollback
if(null != status){
switch (status){
case 0: return LocalTransactionState.UNKNOW;
case 1: return LocalTransactionState.COMMIT_MESSAGE;
case 2: return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
5.debug测试代码
由截图可知,该事务消息已成功写入MQ进程服务上,并返回了commit。
以上是关于30 RocketMQ事务消息的代码实现细节的主要内容,如果未能解决你的问题,请参考以下文章