RocketMQ事务消息篇之事务消息的使用

Posted NetWhite

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ事务消息篇之事务消息的使用相关的知识,希望对你有一定的参考价值。

前言

RocketMQ事务消息篇(一)之事务消息的介绍里对RocketMQ的事务消息作了相关说明,本文提供一些基本的开发示例。

java示例

依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>

        <!--        用到acl需要添加这个依赖,不使用,忽略这个依赖-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
    </dependencies>

示例代码 

@Slf4j
public class TransactionProducer {

    // name server地址
    private static final String NAME_SRV = "127.0.0.1:9876";
    // topic名称
    private static final String TOPIC = "test_topic";
    // 生产者组名
    private static final String PRODUCER_GROUP = "producer_transaction_group_demo";
    // access key of ACL
    private static final String ACCESS_KEY = "xuxiaodong";
    // secret key of ACL
    private static final String SECRET_KEY = "12345678";

    private static final Map<String, Boolean> resultCache = new HashMap<>();

    public static void main(String[] args) throws Exception {
//        TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
        // 支持acl事务生产者
        TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP, new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY)));
        producer.setNamesrvAddr(NAME_SRV);
        // 设置事务监听器
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 1.执行本地事务逻辑
                log.info("execute local transaction, msgId: {}", msg.getTransactionId());
                log.info("this is params: {}", arg);
                return LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 2.事务回查
                String id = msg.getTransactionId();
                log.info("check local transaction state, msgId: {}", id);
                // 事务执行失败了,回滚消息
                if (resultCache.containsKey(id) && !resultCache.get(id)) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                // 事务执行成功
                resultCache.put(id, true);
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();
        Message message = new Message();
        message.setTopic(TOPIC);
        message.setBody("This is a transaction message".getBytes());
        SendResult result = producer.sendMessageInTransaction(message, "params");
        log.info(result.toString());
    }

TransactionListener说明

TransactionListener的两个接口是rocketmq二阶段执行本地事务及事务回查的入口。其返回值LocalTransactionState如下:

  • COMMIT_MESSAGE:提交,本地事务执行成功,返回状态
  • ROLLBACK_MESSAGE:回滚,本地事务执行失败,返回状态
  • UNKNOW:未知,其它情况,返回该状态,会进行事务回查

executeLocalTransaction()方法不同返回值场景说明:

  • COMMIT_MESSAGE:事务提交,消费方可以消费消息,不会执行checkLocalTransaction方法
  • ROLLBACK_MESSAGE:事务回滚,消费方不会消费这条消息,不会执行checkLocalTransaction方法
  • UNKNOW:开始事务回查,执行checkLocalTransaction方法

checkLocalTransaction()方法不同返回值场景说明:

  • COMMIT_MESSAGE:事务提交,消费方可以消费消息,不会再执行checkLocalTransaction方法
  • ROLLBACK_MESSAGE:事务回滚,消费方不会消费这条消息,不会再执行checkLocalTransaction方法
  • UNKNOW:事务回查,继续执行checkLocalTransaction方法

go示例

go mod

require (
    github.com/apache/rocketmq-client-go/v2 v2.1.0-rc3
)

示例代码

package main

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
	"os"
	"strconv"
	"sync"
	"time"
)

type TransactionListener struct {
	transCache *sync.Map
}

func (listener *TransactionListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
	listener.transCache.Store(msg.TransactionId, true)

	fmt.Printf("执行本地事务")
	return primitive.UnknowState
}

func (listener *TransactionListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
	fmt.Printf("事务回查, msgID : %v\\n", msg.MsgId)
	execSuccess, existed := listener.transCache.LoadAndDelete(msg.TransactionId)
	if !existed {
		fmt.Printf("unknow msg: %v", msg)
		return primitive.UnknowState
		//return primitive.CommitMessageState
	}
	if execSuccess.(bool) {
		fmt.Println("本地事务执行成功")
		return primitive.CommitMessageState
	} else {
		fmt.Println("本地事务执行失败")
		return primitive.RollbackMessageState
	}
}

func main() {

	// 设置日志,默认使用logrus
	//rlog.SetLogger()
	nameSrv := []string{"10.100.101.20:9876", "10.100.108.208:9876", "10.100.111.68:9876"}

	p, _ := rocketmq.NewTransactionProducer(
		&TransactionListener{transCache: new(sync.Map)},
		producer.WithNameServer(nameSrv), //设置name server地址,<=2.0.0版本,不支持域名
		//分别设置框架组提供的accessKey和secretKey
		producer.WithCredentials(primitive.Credentials{AccessKey: "rmq_access_key", SecretKey: "rmq_secret_key"}),
		producer.WithTrace(&primitive.TraceConfig{NamesrvAddrs: nameSrv}), //启用消息轨迹,把该行代码注释掉,则不启用
		producer.WithGroupName("test_topic_producer"),                     //设置生产组名
	)

	// 启动producer
	err := p.Start()
	if err != nil {
		fmt.Printf("start producer error: %s", err.Error())
		os.Exit(1)
	}
	// 换成自己业务使用的topic
	topic := "test_topic"

	for i := 0; i < 1; i++ {
		msg := &primitive.Message{
			Topic: topic,
			Body:  []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)), //消息体
		}
		// 设置tag,如果有需要再设置
		//msg.WithTag("tagA | tagB")
		// 设置消息的keys,建议设置成业务流水ID等字段可以用来标志业务唯一性的,消息的keys可以检索消息
		msg.WithKeys([]string{"key" + strconv.Itoa(i)})
		res, err := p.SendMessageInTransaction(context.Background(), msg)

		if err != nil {
			fmt.Printf("send message error: %s\\n", err)
		} else {
			fmt.Printf("send message success: result=%s\\n", res.String())
		}
	}
	time.Sleep(time.Minute * 10)
	if err = p.Shutdown(); err != nil {
		fmt.Printf("producer shutdown error: %s\\n", err)
	}
}

关于代码中事务的几个返回字段和上面的java等同。

注意事项

  1. 同一类事务消息及相关topic,使用相同的生产组名称,进行事务回查的时候,broker端会根据生产者组名称查询相关生产者实例,如果所有的生产者都混用同一个生产组名称,那就查到其它实例上了
  2. 生产组保证最少有2个生产者实例,万一其中一个宕机,重启等broker回查还能找到另一个生产者实例,保证可用性
  3. 本地事务执行状态需要自己维护,可以考虑使用第三方存储介质,如:mysql
  4. 事务回查达到消息最大次数(默认15次,每次1分钟)便会丢弃该事务消息。用户可以通过 Broker 配置文件的 `transactionCheckMax`参数来修改此限制。

以上是关于RocketMQ事务消息篇之事务消息的使用的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ事务消息篇之事务消息的介绍

RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

30 RocketMQ事务消息的代码实现细节

分布式事务之 RocketMQ 事务消息详解

rocketmq事务消息入门介绍

RocketMQ事务消息实战