Seata流程源码梳理上篇-TMRM处理

Posted _微风轻起

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Seata流程源码梳理上篇-TMRM处理相关的知识,希望对你有一定的参考价值。

这一篇我们主要来分析下Seata的AT模式的流程处理。

一、流程案例

1、案例源码

​ 我们本地流程梳理用的是基于spring-cloud框架,注册中心是eurak,服务间调用的是feign,源码下载的是官网的(当然你如果对dubbo更熟悉,也可以使用dubbo结构的),案例地址–https://github.com/seata/seata-samples。

​ 这个我本地的话,将其他的模块都删除了。然后启动的话,注意数据库连接信息的修改以及脚本初始化。

​ 这个业务是一个用户账号(account)下订单(order),然后减库存(stock),整个业务的发起就是在business发起的。

2、服务器源码

​ 然后seata服务端,我下载的1.4.2版本的源码在本地启动。

​ 对于服务端的启动,用的是db模式

​ 注册的话,修改为eurka

​ 注意运行对应的数据库脚本。

二、一些前置概念的介绍

​ 可以先再看下其官网的介绍–http://seata.io/zh-cn/docs/overview/what-is-seata.html。然后我们结合源码再来说下。

1、整个案例的TM、RM、TC业务逻辑

TC (Transaction Coordinator) - 事务协调者

维护全局和分支事务的状态,驱动全局事务提交或回滚。

TM (Transaction Manager) - 事务管理器

定义全局事务的范围:开始全局事务、提交或回滚全局事务。

RM (Resource Manager) - 资源管理器

管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

​ 这里主要我们需要明白TM、TC的概念,TM其实就是全局事务的发起者,TC就是协调者,例如如果全局事务需要回滚或者提交,就是TM去通知TC,然后TC再获取、调用该全局事务下的各个分支,通知他们去进行提交或回滚。

​ TC其实就是我们的seata服务端,如果我们要开启全局事务,通过TM(对于我们代码来说,TM也就是TransactionManager接口)发起(发起也就是调用begin方法,获取到本次分布式事务唯一全局标识xid),也就是我们在调用businessService.purchase("U100000", "C100000", 30)方法时,就会通过拦截器拦截,进行对应分布式事务的逻辑织入,然后在里面调用TC,告诉它我们要开始分布式事务了,然后TC就是在global_table添加一条记录,就是本次分布式事务的唯一标识记录:

CREATE TABLE `global_table` (
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint DEFAULT NULL,
  `status` tinyint NOT NULL,
  `application_id` varchar(64) DEFAULT NULL,
  `transaction_service_group` varchar(64) DEFAULT NULL,
  `transaction_name` varchar(64) DEFAULT NULL,
  `timeout` int DEFAULT NULL,
  `begin_time` bigint DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`xid`),
  KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
  KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

​ 然后TC(也就是seata服务端)将xid返回,之后就会具体来调用到业务方法,也就是purchase(...)方法的业务逻辑:

@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) 
    stockFeignClient.deduct(commodityCode, orderCount);

    orderFeignClient.create(userId, commodityCode, orderCount);

    if (!validData()) 
        throw new RuntimeException("账户或库存不足,执行回滚");
    

​ 然后我们就会通过feign去调用stock服务以及order服务,在调用的时候,将xid加在请求头部中。

stock接受到调用后,seata就会通过拓展spring提供的HandlerInterceptor接口(具体可以去了解下这个接口)实现SeataHandlerInterceptor,从头部中获取到xid,然后绑定到当前线程中,然后去执行对应的业务。

​ 对应的业务方法就会去操作DB,而这个时候,seata同样有个织入逻辑,也就是代理spring本身的DataSource-DataSourceProxy类,当操作数据库时,通过DataSource获取Connection时,其又会返回ConnectionProxy类来代理本身的Connection,然后在操作数据库之前、之后,添加对应的seata逻辑。例如:

​ 1、记录undo_log表的数据,用于回滚时候的恢复重做

CREATE TABLE `undo_log` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `branch_id` bigint NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=147 DEFAULT CHARSET=utf8mb3;

​ 2、通过RM(也就是ResourceManager接口)进行分支注册(调用ResourceManager接口的branchRegister方法),也就是调用TC告诉它,然后TC就会在branch_table表添加一条记录,这里的一条记录,就是一个TM对应的数据

CREATE TABLE `branch_table` (
  `branch_id` bigint NOT NULL,
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint DEFAULT NULL,
  `resource_group_id` varchar(32) DEFAULT NULL,
  `resource_id` varchar(256) DEFAULT NULL,
  `lock_key` varchar(128) DEFAULT NULL,
  `branch_type` varchar(8) DEFAULT NULL,
  `status` tinyint DEFAULT NULL,
  `client_id` varchar(64) DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`branch_id`),
  KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

​ 然后当purchase(String userId, String commodityCode, int orderCount)方法调用orderstock成功完成后,这个时候就会在进入到TM也就是对应的拦截器逻辑中,如果成功执行分布式事务提交操作,失败就会捕获到异常,进行全局事务的回滚操作。例如TM调用TC告诉它完成分布式事务的提交,然后TC就会通过xid获取到当前事务下有几个分支,也就是对应RM完成提交(ResourceManagerbranchCommit方法),这个就是整个seata定义分布式事务的基本逻辑。

2、TM(TransactionManager)

public interface TransactionManager 

    /**
     * 开启一个全局事务(也就是调用),返回xid,也就是本次全局事务的标识
     */
    String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException;

    /**
     * Global commit.
     *
     * @param xid XID of the global transaction.
     * @return Status of the global transaction after committing.
     * @throws TransactionException Any exception that fails this will be wrapped with TransactionException and thrown
     * out.
     */
    GlobalStatus commit(String xid) throws TransactionException;

    /**
     * Global rollback.
     *
     * @param xid XID of the global transaction
     * @return Status of the global transaction after rollbacking.
     * @throws TransactionException Any exception that fails this will be wrapped with TransactionException and thrown
     * out.
     */
    GlobalStatus rollback(String xid) throws TransactionException;
	.........

3、SeataHandlerInterceptor

public class SeataHandlerInterceptor implements HandlerInterceptor 

   private static final Logger log = LoggerFactory
         .getLogger(SeataHandlerInterceptor.class);

    /**
     *	从头部中获取xid,绑定到当前线程
    **/
   @Override
   public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
         Object handler) 

      String xid = RootContext.getXID();
       //public static final String KEY_XID = "TX_XID";
      String rpcXid = request.getHeader(RootContext.KEY_XID);
      if (log.isDebugEnabled()) 
         log.debug("xid in RootContext  xid in RpcContext ", xid, rpcXid);
      

      if (xid == null && rpcXid != null) 
         RootContext.bind(rpcXid);
         if (log.isDebugEnabled()) 
            log.debug("bind  to RootContext", rpcXid);
         
      
      return true;
   

    /**
     *	业务执行完后,删除
    **/
   @Override
   public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
         Object handler, Exception e) 

      String rpcXid = request.getHeader(RootContext.KEY_XID);

      if (StringUtils.isEmpty(rpcXid)) 
         return;
      

      String unbindXid = RootContext.unbind();
      if (log.isDebugEnabled()) 
         log.debug("unbind  from RootContext", unbindXid);
      
      if (!rpcXid.equalsIgnoreCase(unbindXid)) 
         log.warn("xid in change during RPC from  to ", rpcXid, unbindXid);
         if (unbindXid != null) 
            RootContext.bind(unbindXid);
            log.warn("bind  back to RootContext", unbindXid);
         
      
   


4、RM(ResourceManager)

三、TM相关流程源码

​ 我们上面简单介绍了seata分布式事务的简单流程,下面我们就来具体分析下源码的流转。将上面的各个应用启动后,我们调用http://localhost:8084/purchase/commit,正式执行我们的业务流程。

1、拦截对应的使用了全局事务的方法

​ 我们在purchase(...)方法使用的@GlobalTransactional注解,就会有对应的拦截器来解析对应的逻辑增强,加入对应的全局事务。

​ 如果没有对应全局事务的标签,就不处理,直接通过methodInvocation.proceed()执行对应的方法。我们这里是有全局事务的,就通过handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation)来开始对应的逻辑处理。

2、处理全局事务

Object handleGlobalTransaction(final MethodInvocation methodInvocation,
    final GlobalTransactional globalTrxAnno) throws Throwable 
    boolean succeed = true;
    try 
        return transactionalTemplate.execute(new TransactionalExecutor() 
            @Override
            public Object execute() throws Throwable 
                return methodInvocation.proceed();
            
			.......
            @Override
            public TransactionInfo getTransactionInfo() 
                // reset the value of timeout
                int timeout = globalTrxAnno.timeoutMills();
                if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) 
                    timeout = defaultGlobalTransactionTimeout;
                
				// 这个是本次事务的对应信息、例如传播级别、超时时间
                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTimeOut(timeout);
                transactionInfo.setName(name());
                transactionInfo.setPropagation(globalTrxAnno.propagation());
                transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
                transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
               	.........
                return transactionInfo;
            
        );
     catch (TransactionalExecutor.ExecutionException e) 
        	.........
     finally 
        if (degradeCheck) 
            EVENT_BUS.post(new DegradeCheckEvent(succeed));
        
    

​ 这里的逻辑主要是给到TransactionalTemplate来处理对应的逻辑。

3、全局事务的整个流程逻辑

public class TransactionalTemplate 
		..........
    public Object execute(TransactionalExecutor business) throws Throwable 
        // 1. Get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) 
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        
        // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();

        // 1.2 Handle the transaction propagation.
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try 
            switch (propagation) 
                case NOT_SUPPORTED:
                    // If transaction is existing, suspend it.
                    if (existingTransaction(tx)) 
                        suspendedResourcesHolder = tx.suspend();
                    
                    // Execute without transaction and return.
                    return business.execute();
                case REQUIRES_NEW:
                    // If transaction is existing, suspend it, and then begin new transaction.
                    if (existingTransaction(tx)) 
                        suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();
                    
                    // Continue and execute with new transaction
                    break;
               			............
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            

            // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            if (tx == null) 
                tx = GlobalTransactionContext.createNew();
            

            // set current tx config to holder
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try 
              		..........
                return rs;
             finally 
                //5. clear
                resumeGlobalLockConfig(previousConfig);
                triggerAfterCompletion();
                cleanUp();
            
         finally 
            // If the transaction is suspended, resume it.
            if (suspendedResourcesHolder != null) 
                tx.resume(suspendedResourcesHolder);
            
        
    

​ 这里其实主要分为两部分:

​ 一部分是分布式事务其本身也是一个事务,所以前面的switch (propagation)里面的逻辑,就是遵循spring本身的事务传播机制。例如如果是NOT_SUPPORTED,其不需要事务,就会将原来的事务挂起,然后直接执行对应的业务,而如果是REQUIRES_NEW的话,其就需要先将原来的事务挂起,再开启一个新事务来继续处理往下的逻辑。

​ 另一部分就是具体的分布式事务的处理了。

 // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
    if (tx == null) 
        tx = GlobalTransactionContext.createNew();
    

    // set current tx config to holder
    GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

    try 
        // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
        //    else do nothing. Of course, the hooks will still be triggered.
        beginTransaction(txInfo, tx);

        Object rs;
        try 
            // Do Your Business
            rs = business.execute();
         catch (Throwable ex) 
            // 3. The needed business exception to rollback.
            completeTransactionAfterThrowing(txInfo, tx, ex);
            throw ex;
        

        // 4. everything is fine, commit.
        commitTransaction(tx);

        return rs;
     finally 
        //5. clear
        resumeGlobalLockConfig(previousConfig);
        triggerAfterCompletion();
        cleanUp();
    
 finally 
    // If the transaction is suspended, resume it.
    if (suspendedResourcesHolder != null) 
        tx.resume(suspendedResourcesHolder);
    

1、获取DefaultGlobalTransaction

​ 这里首先就是通过GlobalTransactionContext.createNew()获取DefaultGlobalTransaction

public static GlobalTransaction createNew() 
    return new DefaultGlobalTransaction();

DefaultGlobalTransaction() 
    this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);


/**
 * Instantiates a new Default global transaction.
 *
 * @param xid    the xid
 * @param status the status
 * @param role   the role
 */
DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) 
    this.transactionManager = TransactionManagerHolder.get();
    this.xid = xid;
    this.status = status;
    this.role = role;

​ 这个类里面可以获取到TransactionManager事务管理器,也就是TMtransactionManager是单例模式创建的。

2、开启事务(调用TC获取xid)

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException <

以上是关于Seata流程源码梳理上篇-TMRM处理的主要内容,如果未能解决你的问题,请参考以下文章

Seata 源码分析 - tmrm 中 xid 传递过程

微服务39分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务

Seata执行整体流程(AT模式)| Seata源码 - 自动配置数据库代理 | AT和XA的区别

Seata执行整体流程(AT模式)| Seata源码 - 自动配置数据库代理 | AT和XA的区别

MyBatis源码分析insert方法update方法delete方法处理流程(上篇)

深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)