分布式事务(Seata)原理 详解篇,建议收藏

Posted 牧小农

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式事务(Seata)原理 详解篇,建议收藏相关的知识,希望对你有一定的参考价值。

前言

在之前的系列中,我们讲解了关于Seata基本介绍和实际应用,今天带来的这篇,就给大家分析一下Seata的源码是如何一步一步实现的。读源码的时候我们需要俯瞰起全貌,不要去扣一个一个的细节,这样我们学习起来会快捷而且有效率,我们学习源码需要掌握的是整体思路和核心点。

首先 Seata客户端启动一般分为以下几个流程:

  1. 自动加载Bean属性和配置信息
  2. 初始化TM
  3. 初始化RM
  4. 初始化分布式事务客户端完成,完成代理数据库配置
  5. 连接TC(Seata服务端),注册RM和TM
  6. 开启全局事务

在这篇源码的讲解中,我们主要以AT模式为主导,官网也是主推AT模式,我们在上篇的文章中也讲解过,感兴趣的小伙伴可以去看一看分布式事务(Seata) 四大模式详解,在官网中也提供了对应的流程地址:https://seata.io/zh-cn/docs/dev/mode/at-mode.html ,在这里我们只是做一些简单的介绍,AT模式主要分为两个阶段:

一阶段:

  • 解析SQL,获取SQL类型(CRUD)、表信息、条件(where) 等相关信息
  • 查询前镜像(改变之前的数据),根据解析得到的条件信息,生成查询语句,定位数据
  • 执行业务SQL,更新数据
  • 查询后镜像(改变后的数据),根据前镜像的结果,通过主键都给你为数据
  • 插入回滚日志,将前后镜像数据以及业务SQL等信息,组织成一条回滚日志记录,插入到undo Log表中
  • 提交前,向TC注册分支,申请全局锁
  • 本地事务提交,业务数据的更细腻和生成的undoLog一起提交
  • 将本地事务提交的结果通知给TC

二阶段:

如果TC收到的是回滚请求

  • 开启本地事务,通过XID和BranchID查找到对应的undo Log记录
  • 根据undoLog中的前镜像和业务SQL的相关信息生成并执行回滚语句
  • 提交本地事务,将本地事务的执行结果(分支事务回滚的信息)通知给TC

如果没问题,执行提交操作

  • 收到TC分支提交请求,将请求放入到一个异步任务的队列中,马上返回提交成功的结果给TC
  • 异步任务阶段的分支提交请求删除undoLog中记录

源码入口

接下来,我们就需要从官网中去下载源码,下载地址:https://seata.io/zh-cn/blog/download.html,选择 source 即可,下载完成之后,通过IDEA打开项目。

源码下载下来之后,我们应该如何去找入口呢?首先我们需要找到对应引入的 Seataspring-alibaba-seata,我们在回想一下,我们开启事务的时候,是不是添加过一个@GlobalTransactional的注解,这个注解就是我们入手的一个点,我们在 spring.factories中看到有一个 GlobalTransactionAutoConfiguration,这个就是我们需要关注的点,也就是我们源码的入口

GlobalTransactionAutoConfiguration中我们找到一个用Bean注入的方法 globalTransactionScanner,这个就是全局事务扫描器,这个类型主要负责加载配置,注入相关的Bean

这里给大家展示了当前GlobalTransactionScanner的类关系图,其中我们现在继承了Aop的AbstractAutoProxyCreator类型,在这其中有一个重点方法,这个方法就是判断Bean对象是否需要代理,是否需要增强。

@Configuration
@EnableConfigurationProperties(SeataProperties.class)
public class GlobalTransactionAutoConfiguration 

    //全局事务扫描器
    @Bean
    public GlobalTransactionScanner globalTransactionScanner() 

      String applicationName = applicationContext.getEnvironment()
          .getProperty("spring.application.name");

      String txServiceGroup = seataProperties.getTxServiceGroup();

      if (StringUtils.isEmpty(txServiceGroup)) 
        txServiceGroup = applicationName + "-fescar-service-group";
        seataProperties.setTxServiceGroup(txServiceGroup);
      
      // 构建全局扫描器,传入参数:应用名、事务分组名,失败处理器
      return new GlobalTransactionScanner(applicationName, txServiceGroup);
    


在这其中我们要关心的是 GlobalTransactionScanner 这个类型,这个类型扫描 @GlobalTransactional 注解,并对代理方法进行拦截增强事务的功能。我们就从源码中搜索这个GlobalTransactionScanner类,看看里面具体是做了什么

/**
 * The type Global transaction scanner.
 * 全局事务扫描器
 * @author slievrly
 */
public class GlobalTransactionScanner
        //AbstractAutoProxyCreator AOP动态代理 增强Bean
        extends AbstractAutoProxyCreator
        /**
         * ConfigurationChangeListener: 监听器基准接口
         * InitializingBean: Bean初始化
         * ApplicationContextAware: Spring容器
         * DisposableBean: Spring 容器销毁
         */
        implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean 
        
    private final String applicationId;//服务名
    private final String txServiceGroup;//事务分组        

  private void initClient() 
        //启动日志
        if (LOGGER.isInfoEnabled()) 
            LOGGER.info("Initializing Global Transaction Clients ... ");
        
        //检查应用名以及事务分组名,为空抛出异常IllegalArgumentException
        if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) 
            LOGGER.warn("the default value of seata.tx-service-group:  has already changed to  since Seata 1.5, " +
                    "please change your default configuration as soon as possible " +
                    "and we don't recommend you to use default tx-service-group's value provided by seata",
                    DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
        
        if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) 
            throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
        
        //init TM
        //初始化TM
        TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
        if (LOGGER.isInfoEnabled()) 
            LOGGER.info("Transaction Manager Client is initialized. applicationId[] txServiceGroup[]", applicationId, txServiceGroup);
        
        //init RM
        //初始化RM
        RMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) 
            LOGGER.info("Resource Manager is initialized. applicationId[] txServiceGroup[]", applicationId, txServiceGroup);
        

        if (LOGGER.isInfoEnabled()) 
            LOGGER.info("Global Transaction Clients are initialized. ");
        
        registerSpringShutdownHook();

    

    @Override
    public void afterPropertiesSet() 
        if (disableGlobalTransaction) 
            if (LOGGER.isInfoEnabled()) 
                LOGGER.info("Global transaction is disabled.");
            
            ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                    (ConfigurationChangeListener)this);
            return;
        
        if (initialized.compareAndSet(false, true)) 
            initClient();
        
    
    
   private void initClient() 
        //启动日志
        if (LOGGER.isInfoEnabled()) 
            LOGGER.info("Initializing Global Transaction Clients ... ");
        
        //检查应用名以及事务分组名,为空抛出异常IllegalArgumentException
        if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) 
            LOGGER.warn("the default value of seata.tx-service-group:  has already changed to  since Seata 1.5, " +
                    "please change your default configuration as soon as possible " +
                    "and we don't recommend you to use default tx-service-group's value provided by seata",
                    DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
        

        //检查应用名以及事务分组名,为空抛出异常IllegalArgumentException
        if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) 
            throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
        
        //init TM
        //初始化TM
        TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
        if (LOGGER.isInfoEnabled()) 
            LOGGER.info("Transaction Manager Client is initialized. applicationId[] txServiceGroup[]", applicationId, txServiceGroup);
        
        //init RM
        //初始化RM
        RMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) 
            LOGGER.info("Resource Manager is initialized. applicationId[] txServiceGroup[]", applicationId, txServiceGroup);
        

        if (LOGGER.isInfoEnabled()) 
            LOGGER.info("Global Transaction Clients are initialized. ");
        
        registerSpringShutdownHook();

    

    //代理增强,Spring 所有的Bean都会经过这个方法
    @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) 
        // do checkers
        //检查bean和beanName
        if (!doCheckers(bean, beanName)) 
            return bean;
        

        try 
            //加锁防止并发
            synchronized (PROXYED_SET) 
                if (PROXYED_SET.contains(beanName)) 
                    return bean;
                
                interceptor = null;
                //check TCC proxy
                //检查是否为TCC模式
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) 
                    // init tcc fence clean task if enable useTccFence
                    //如果启用useTccFence 失败 ,则初始化TCC清理任务
                    TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    //如果是,添加TCC拦截器
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)interceptor);
                 else 
                    //不是TCC
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                    //判断是否有相关事务注解,如果没有不进行代理
                    if (!existsAnnotation(new Class[]serviceInterface)
                        && !existsAnnotation(interfacesIfJdk)) 
                        return bean;
                    

                    //发现存在全局事务注解标注的Bean对象,添加拦截器
                    if (globalTransactionalInterceptor == null) 
                        //添加拦截器
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                                ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                (ConfigurationChangeListener)globalTransactionalInterceptor);
                    
                    interceptor = globalTransactionalInterceptor;
                

                LOGGER.info("Bean[] with name [] would use interceptor []", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                //检查是否为代理对象
                if (!AopUtils.isAopProxy(bean)) 
                    //不是代理对象,调用父级
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                 else 
                    //是代理对象,反射获取代理类中已经存在的拦截器组合,然后添加到这个集合中
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    int pos;
                    for (Advisor avr : advisor) 
                        // Find the position based on the advisor's order, and add to advisors by pos
                        pos = findAddSeataAdvisorPosition(advised, avr);
                        advised.addAdvisor(pos, avr);
                    
                
                PROXYED_SET.add(beanName);
                return bean;
            
         catch (Exception exx) 
            throw new RuntimeException(exx);
        
    



InitializingBean:中实现了一个 afterPropertiesSet()方法,在这个方法中,调用了initClient()

AbstractAutoProxyCreator:APO动态代理,在之前的的Nacos和Sentiel中都有这个代理类,AOP在我们越往深入学习,在学习源码的会见到的越来越多,越来越重要,很多相关代理,都是通过AOP进行增强,在这个类中,我们需要关注有一个wrapIfNecessary()方法, 这个方法主要是判断被代理的bean或者类是否需要代理增强,在这个方法中会调用GlobalTransactionalInterceptor.invoke()进行带来增强。

具体代码如下:

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor 

    public GlobalTransactionalInterceptor(FailureHandler failureHandler) 
        this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
        this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            DEFAULT_DISABLE_GLOBAL_TRANSACTION);
        this.order =
            ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER);
        degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,
            DEFAULT_TM_DEGRADE_CHECK);
        if (degradeCheck) 
            ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
            degradeCheckPeriod = ConfigurationFactory.getInstance()
                .getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
            degradeCheckAllowTimes = ConfigurationFactory.getInstance()
                .getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
            EVENT_BUS.register(this);
            if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) 
                startDegradeCheck();
            
        
        this.initDefaultGlobalTransactionTimeout();
    

    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable 
        //获取执行的方法
        Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) 
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            //获取GlobalTransactional(全局事务)、GlobalLock(全局锁)元数据
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            //GlobalLock会将本地事务的执行纳入Seata分布式事务的管理,共同竞争全局锁
            //保证全局事务在执行的时候,本地事务不可以操作全局事务的记录
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);//获取全局锁
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            if (!localDisable) 
                if (globalTransactionalAnnotation != null || this.aspectTransactional != null) 
                    AspectTransactional transactional;
                    if (globalTransactionalAnnotation != null) 
                        transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                            globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                            globalTransactionalAnnotation.noRollbackForClassName(),
                            globalTransactionalAnnotation.noRollbackFor(),
                            globalTransactionalAnnotation.noRollbackForClassName(),
                            globalTransactionalAnnotation.propagation(),
                            globalTransactionalAnnotation.lockRetryInterval(),
                            globalTransactionalAnnotation.lockRetryTimes());
                     else 
                        transactional = this.aspectTransactional;
                    
                    //执行全局事务
                    return handleGlobalTransaction(methodInvocation, transactional);
                 else if (globalLockAnnotation != null) 
                    //执行全局锁
                    return handleGlobalLock(methodInvocation, globalLockAnnotation);
                
            
        
        return methodInvocation.proceed();
    


具体流程图如下所示:

核心源码

在上面我们讲解到 GlobalTransactionalInterceptor 作为全局事务拦截器,一旦执行拦截,就会进入invoke方法,其中,我们会做 @GlobalTransactional 注解的判断,如果有这个注解的存在,会执行全局事务和全局锁,再执行全局事务的时候会调用 handleGlobalTransaction 全局事务处理器,获取事务信息,那我们接下来就来看一下 GlobalTransactionalInterceptor.handleGlobalTransaction 到底是如何执行全局事务的

Object handleGlobalTransaction(final MethodInvocation methodInvocation,
        final AspectTransactional aspectTransactional) throws Throwable 
        boolean succeed = true;
        try 
            return transactionalTemplate.execute(new TransactionalExecutor() 
                @Override
                public Object execute(分布式事务(Seata)原理 详解篇,建议收藏

分布式事务(Seata)原理 详解篇,建议收藏

分布式事务(Seata)原理 详解篇,建议收藏

分布式事务 Seata Saga 模式首秀以及三种模式详解 | Meetup#3 回顾

Seata框架详解

Seata框架详解