RocketMQ事务消息篇之事务消息的使用
Posted NetWhite
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ事务消息篇之事务消息的使用相关的知识,希望对你有一定的参考价值。
前言
在RocketMQ事务消息篇(一)之事务消息的介绍里对RocketMQ的事务消息作了相关说明,本文提供一些基本的开发示例。
java示例
依赖
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<!-- 用到acl需要添加这个依赖,不使用,忽略这个依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>${rocketmq.version}</version>
</dependency>
</dependencies>
示例代码
@Slf4j
public class TransactionProducer {
// name server地址
private static final String NAME_SRV = "127.0.0.1:9876";
// topic名称
private static final String TOPIC = "test_topic";
// 生产者组名
private static final String PRODUCER_GROUP = "producer_transaction_group_demo";
// access key of ACL
private static final String ACCESS_KEY = "xuxiaodong";
// secret key of ACL
private static final String SECRET_KEY = "12345678";
private static final Map<String, Boolean> resultCache = new HashMap<>();
public static void main(String[] args) throws Exception {
// TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
// 支持acl事务生产者
TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP, new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY)));
producer.setNamesrvAddr(NAME_SRV);
// 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 1.执行本地事务逻辑
log.info("execute local transaction, msgId: {}", msg.getTransactionId());
log.info("this is params: {}", arg);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 2.事务回查
String id = msg.getTransactionId();
log.info("check local transaction state, msgId: {}", id);
// 事务执行失败了,回滚消息
if (resultCache.containsKey(id) && !resultCache.get(id)) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
// 事务执行成功
resultCache.put(id, true);
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Message message = new Message();
message.setTopic(TOPIC);
message.setBody("This is a transaction message".getBytes());
SendResult result = producer.sendMessageInTransaction(message, "params");
log.info(result.toString());
}
TransactionListener说明
TransactionListener的两个接口是rocketmq二阶段执行本地事务及事务回查的入口。其返回值LocalTransactionState如下:
- COMMIT_MESSAGE:提交,本地事务执行成功,返回状态
- ROLLBACK_MESSAGE:回滚,本地事务执行失败,返回状态
- UNKNOW:未知,其它情况,返回该状态,会进行事务回查
executeLocalTransaction()方法不同返回值场景说明:
- COMMIT_MESSAGE:事务提交,消费方可以消费消息,不会执行checkLocalTransaction方法
- ROLLBACK_MESSAGE:事务回滚,消费方不会消费这条消息,不会执行checkLocalTransaction方法
- UNKNOW:开始事务回查,执行checkLocalTransaction方法
checkLocalTransaction()方法不同返回值场景说明:
- COMMIT_MESSAGE:事务提交,消费方可以消费消息,不会再执行checkLocalTransaction方法
- ROLLBACK_MESSAGE:事务回滚,消费方不会消费这条消息,不会再执行checkLocalTransaction方法
- UNKNOW:事务回查,继续执行checkLocalTransaction方法
go示例
go mod
require (
github.com/apache/rocketmq-client-go/v2 v2.1.0-rc3
)
示例代码
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"os"
"strconv"
"sync"
"time"
)
type TransactionListener struct {
transCache *sync.Map
}
func (listener *TransactionListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
listener.transCache.Store(msg.TransactionId, true)
fmt.Printf("执行本地事务")
return primitive.UnknowState
}
func (listener *TransactionListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
fmt.Printf("事务回查, msgID : %v\\n", msg.MsgId)
execSuccess, existed := listener.transCache.LoadAndDelete(msg.TransactionId)
if !existed {
fmt.Printf("unknow msg: %v", msg)
return primitive.UnknowState
//return primitive.CommitMessageState
}
if execSuccess.(bool) {
fmt.Println("本地事务执行成功")
return primitive.CommitMessageState
} else {
fmt.Println("本地事务执行失败")
return primitive.RollbackMessageState
}
}
func main() {
// 设置日志,默认使用logrus
//rlog.SetLogger()
nameSrv := []string{"10.100.101.20:9876", "10.100.108.208:9876", "10.100.111.68:9876"}
p, _ := rocketmq.NewTransactionProducer(
&TransactionListener{transCache: new(sync.Map)},
producer.WithNameServer(nameSrv), //设置name server地址,<=2.0.0版本,不支持域名
//分别设置框架组提供的accessKey和secretKey
producer.WithCredentials(primitive.Credentials{AccessKey: "rmq_access_key", SecretKey: "rmq_secret_key"}),
producer.WithTrace(&primitive.TraceConfig{NamesrvAddrs: nameSrv}), //启用消息轨迹,把该行代码注释掉,则不启用
producer.WithGroupName("test_topic_producer"), //设置生产组名
)
// 启动producer
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
// 换成自己业务使用的topic
topic := "test_topic"
for i := 0; i < 1; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)), //消息体
}
// 设置tag,如果有需要再设置
//msg.WithTag("tagA | tagB")
// 设置消息的keys,建议设置成业务流水ID等字段可以用来标志业务唯一性的,消息的keys可以检索消息
msg.WithKeys([]string{"key" + strconv.Itoa(i)})
res, err := p.SendMessageInTransaction(context.Background(), msg)
if err != nil {
fmt.Printf("send message error: %s\\n", err)
} else {
fmt.Printf("send message success: result=%s\\n", res.String())
}
}
time.Sleep(time.Minute * 10)
if err = p.Shutdown(); err != nil {
fmt.Printf("producer shutdown error: %s\\n", err)
}
}
关于代码中事务的几个返回字段和上面的java等同。
注意事项
- 同一类事务消息及相关topic,使用相同的生产组名称,进行事务回查的时候,broker端会根据生产者组名称查询相关生产者实例,如果所有的生产者都混用同一个生产组名称,那就查到其它实例上了
- 生产组保证最少有2个生产者实例,万一其中一个宕机,重启等broker回查还能找到另一个生产者实例,保证可用性
- 本地事务执行状态需要自己维护,可以考虑使用第三方存储介质,如:mysql
- 事务回查达到消息最大次数(默认15次,每次1分钟)便会丢弃该事务消息。用户可以通过 Broker 配置文件的 `transactionCheckMax`参数来修改此限制。
以上是关于RocketMQ事务消息篇之事务消息的使用的主要内容,如果未能解决你的问题,请参考以下文章