分布式事务(Seata)原理 详解篇,建议收藏
Posted 牧小农
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式事务(Seata)原理 详解篇,建议收藏相关的知识,希望对你有一定的参考价值。
前言
在之前的系列中,我们讲解了关于Seata基本介绍和实际应用,今天带来的这篇,就给大家分析一下Seata的源码是如何一步一步实现的。读源码的时候我们需要俯瞰起全貌,不要去扣一个一个的细节,这样我们学习起来会快捷而且有效率,我们学习源码需要掌握的是整体思路和核心点。
首先 Seata
客户端启动一般分为以下几个流程:
- 自动加载Bean属性和配置信息
- 初始化TM
- 初始化RM
- 初始化分布式事务客户端完成,完成代理数据库配置
- 连接TC(Seata服务端),注册RM和TM
- 开启全局事务
在这篇源码的讲解中,我们主要以AT模式为主导,官网也是主推AT模式,我们在上篇的文章中也讲解过,感兴趣的小伙伴可以去看一看分布式事务(Seata) 四大模式详解,在官网中也提供了对应的流程地址:https://seata.io/zh-cn/docs/dev/mode/at-mode.html ,在这里我们只是做一些简单的介绍,AT模式主要分为两个阶段:
一阶段:
- 解析SQL,获取SQL类型(CRUD)、表信息、条件(where) 等相关信息
- 查询前镜像(改变之前的数据),根据解析得到的条件信息,生成查询语句,定位数据
- 执行业务SQL,更新数据
- 查询后镜像(改变后的数据),根据前镜像的结果,通过主键都给你为数据
- 插入回滚日志,将前后镜像数据以及业务SQL等信息,组织成一条回滚日志记录,插入到undo Log表中
- 提交前,向TC注册分支,申请全局锁
- 本地事务提交,业务数据的更细腻和生成的undoLog一起提交
- 将本地事务提交的结果通知给TC
二阶段:
如果TC收到的是回滚请求
- 开启本地事务,通过XID和BranchID查找到对应的undo Log记录
- 根据undoLog中的前镜像和业务SQL的相关信息生成并执行回滚语句
- 提交本地事务,将本地事务的执行结果(分支事务回滚的信息)通知给TC
如果没问题,执行提交操作
- 收到TC分支提交请求,将请求放入到一个异步任务的队列中,马上返回提交成功的结果给TC
- 异步任务阶段的分支提交请求删除undoLog中记录
源码入口
接下来,我们就需要从官网中去下载源码,下载地址:https://seata.io/zh-cn/blog/download.html,选择 source
即可,下载完成之后,通过IDEA打开项目。
源码下载下来之后,我们应该如何去找入口呢?首先我们需要找到对应引入的 Seata
包 spring-alibaba-seata
,我们在回想一下,我们开启事务的时候,是不是添加过一个@GlobalTransactional
的注解,这个注解就是我们入手的一个点,我们在 spring.factories
中看到有一个 GlobalTransactionAutoConfiguration
,这个就是我们需要关注的点,也就是我们源码的入口
在 GlobalTransactionAutoConfiguration
中我们找到一个用Bean注入的方法 globalTransactionScanner
,这个就是全局事务扫描器,这个类型主要负责加载配置,注入相关的Bean
这里给大家展示了当前GlobalTransactionScanner的类关系图,其中我们现在继承了Aop的AbstractAutoProxyCreator类型,在这其中有一个重点方法,这个方法就是判断Bean对象是否需要代理,是否需要增强。
@Configuration
@EnableConfigurationProperties(SeataProperties.class)
public class GlobalTransactionAutoConfiguration
//全局事务扫描器
@Bean
public GlobalTransactionScanner globalTransactionScanner()
String applicationName = applicationContext.getEnvironment()
.getProperty("spring.application.name");
String txServiceGroup = seataProperties.getTxServiceGroup();
if (StringUtils.isEmpty(txServiceGroup))
txServiceGroup = applicationName + "-fescar-service-group";
seataProperties.setTxServiceGroup(txServiceGroup);
// 构建全局扫描器,传入参数:应用名、事务分组名,失败处理器
return new GlobalTransactionScanner(applicationName, txServiceGroup);
在这其中我们要关心的是 GlobalTransactionScanner
这个类型,这个类型扫描 @GlobalTransactional
注解,并对代理方法进行拦截增强事务的功能。我们就从源码中搜索这个GlobalTransactionScanner
类,看看里面具体是做了什么
/**
* The type Global transaction scanner.
* 全局事务扫描器
* @author slievrly
*/
public class GlobalTransactionScanner
//AbstractAutoProxyCreator AOP动态代理 增强Bean
extends AbstractAutoProxyCreator
/**
* ConfigurationChangeListener: 监听器基准接口
* InitializingBean: Bean初始化
* ApplicationContextAware: Spring容器
* DisposableBean: Spring 容器销毁
*/
implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean
private final String applicationId;//服务名
private final String txServiceGroup;//事务分组
private void initClient()
//启动日志
if (LOGGER.isInfoEnabled())
LOGGER.info("Initializing Global Transaction Clients ... ");
//检查应用名以及事务分组名,为空抛出异常IllegalArgumentException
if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup))
LOGGER.warn("the default value of seata.tx-service-group: has already changed to since Seata 1.5, " +
"please change your default configuration as soon as possible " +
"and we dont recommend you to use default tx-service-groups value provided by seata",
DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup))
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
//init TM
//初始化TM
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
if (LOGGER.isInfoEnabled())
LOGGER.info("Transaction Manager Client is initialized. applicationId[] txServiceGroup[]", applicationId, txServiceGroup);
//init RM
//初始化RM
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled())
LOGGER.info("Resource Manager is initialized. applicationId[] txServiceGroup[]", applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled())
LOGGER.info("Global Transaction Clients are initialized. ");
registerSpringShutdownHook();
@Override
public void afterPropertiesSet()
if (disableGlobalTransaction)
if (LOGGER.isInfoEnabled())
LOGGER.info("Global transaction is disabled.");
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)this);
return;
if (initialized.compareAndSet(false, true))
initClient();
private void initClient()
//启动日志
if (LOGGER.isInfoEnabled())
LOGGER.info("Initializing Global Transaction Clients ... ");
//检查应用名以及事务分组名,为空抛出异常IllegalArgumentException
if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup))
LOGGER.warn("the default value of seata.tx-service-group: has already changed to since Seata 1.5, " +
"please change your default configuration as soon as possible " +
"and we dont recommend you to use default tx-service-groups value provided by seata",
DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
//检查应用名以及事务分组名,为空抛出异常IllegalArgumentException
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup))
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
//init TM
//初始化TM
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
if (LOGGER.isInfoEnabled())
LOGGER.info("Transaction Manager Client is initialized. applicationId[] txServiceGroup[]", applicationId, txServiceGroup);
//init RM
//初始化RM
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled())
LOGGER.info("Resource Manager is initialized. applicationId[] txServiceGroup[]", applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled())
LOGGER.info("Global Transaction Clients are initialized. ");
registerSpringShutdownHook();
//代理增强,Spring 所有的Bean都会经过这个方法
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey)
// do checkers
//检查bean和beanName
if (!doCheckers(bean, beanName))
return bean;
try
//加锁防止并发
synchronized (PROXYED_SET)
if (PROXYED_SET.contains(beanName))
return bean;
interceptor = null;
//check TCC proxy
//检查是否为TCC模式
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext))
// init tcc fence clean task if enable useTccFence
//如果启用useTccFence 失败 ,则初始化TCC清理任务
TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
//如果是,添加TCC拦截器
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)interceptor);
else
//不是TCC
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
//判断是否有相关事务注解,如果没有不进行代理
if (!existsAnnotation(new Class[]serviceInterface)
&& !existsAnnotation(interfacesIfJdk))
return bean;
//发现存在全局事务注解标注的Bean对象,添加拦截器
if (globalTransactionalInterceptor == null)
//添加拦截器
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
interceptor = globalTransactionalInterceptor;
LOGGER.info("Bean[] with name [] would use interceptor []", bean.getClass().getName(), beanName, interceptor.getClass().getName());
//检查是否为代理对象
if (!AopUtils.isAopProxy(bean))
//不是代理对象,调用父级
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
else
//是代理对象,反射获取代理类中已经存在的拦截器组合,然后添加到这个集合中
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
int pos;
for (Advisor avr : advisor)
// Find the position based on the advisors order, and add to advisors by pos
pos = findAddSeataAdvisorPosition(advised, avr);
advised.addAdvisor(pos, avr);
PROXYED_SET.add(beanName);
return bean;
catch (Exception exx)
throw new RuntimeException(exx);
InitializingBean
:中实现了一个 afterPropertiesSet()
方法,在这个方法中,调用了initClient()
AbstractAutoProxyCreator
:APO动态代理,在之前的的Nacos和Sentiel中都有这个代理类,AOP在我们越往深入学习,在学习源码的会见到的越来越多,越来越重要,很多相关代理,都是通过AOP进行增强,在这个类中,我们需要关注有一个wrapIfNecessary()
方法, 这个方法主要是判断被代理的bean或者类是否需要代理增强,在这个方法中会调用GlobalTransactionalInterceptor.invoke()
进行带来增强。
具体代码如下:
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor
public GlobalTransactionalInterceptor(FailureHandler failureHandler)
this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
DEFAULT_DISABLE_GLOBAL_TRANSACTION);
this.order =
ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER);
degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,
DEFAULT_TM_DEGRADE_CHECK);
if (degradeCheck)
ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
degradeCheckPeriod = ConfigurationFactory.getInstance()
.getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
degradeCheckAllowTimes = ConfigurationFactory.getInstance()
.getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
EVENT_BUS.register(this);
if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0)
startDegradeCheck();
this.initDefaultGlobalTransactionTimeout();
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable
//获取执行的方法
Class<?> targetClass =
methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class))
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
//获取GlobalTransactional(全局事务)、GlobalLock(全局锁)元数据
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
//GlobalLock会将本地事务的执行纳入Seata分布式事务的管理,共同竞争全局锁
//保证全局事务在执行的时候,本地事务不可以操作全局事务的记录
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);//获取全局锁
boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
if (!localDisable)
if (globalTransactionalAnnotation != null || this.aspectTransactional != null)
AspectTransactional transactional;
if (globalTransactionalAnnotation != null)
transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
globalTransactionalAnnotation.noRollbackForClassName(),
globalTransactionalAnnotation.noRollbackFor(),
globalTransactionalAnnotation.noRollbackForClassName(),
globalTransactionalAnnotation.propagation(),
globalTransactionalAnnotation.lockRetryInterval(),
globalTransactionalAnnotation.lockRetryTimes());
else
transactional = this.aspectTransactional;
//执行全局事务
return handleGlobalTransaction(methodInvocation, transactional);
else if (globalLockAnnotation != null)
//执行全局锁
return handleGlobalLock(methodInvocation, globalLockAnnotation);
return methodInvocation.proceed();
具体流程图如下所示:
核心源码
在上面我们讲解到 GlobalTransactionalInterceptor
作为全局事务拦截器,一旦执行拦截,就会进入invoke方法,其中,我们会做 @GlobalTransactional
注解的判断,如果有这个注解的存在,会执行全局事务和全局锁,再执行全局事务的时候会调用 handleGlobalTransaction
全局事务处理器,获取事务信息,那我们接下来就来看一下 GlobalTransactionalInterceptor.handleGlobalTransaction
到底是如何执行全局事务的
Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final AspectTransactional aspectTransactional) throws Throwable
boolean succeed = true;
try
return transactionalTemplate.execute(new TransactionalExecutor()
@Override
public Object execute() throws Throwable
return methodInvocation.proceed();
//获取事务名称,默认获取方法名
public String name()
String name = aspectTransactional.getName();
if (!StringUtils.isNullOrEmpty(name))
return name;
return formatMethod(methodInvocation.getMethod());
/**
* 解析GlobalTransation注解属性,封装对对象
* @return
*/
@Override
public TransactionInfo getTransactionInfo()
// reset the value of timeout
//获取超时时间,默认60秒
int timeout = aspectTransactional.getTimeoutMills();
if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT)
timeout = defaultGlobalTransactionTimeout;
//构建事务信息对象
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(timeout);//超时时间
transactionInfo.setName(name());//事务名称
transactionInfo.setPropagation(aspectTransactional.getPropagation());//事务传播
transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());//校验或占用全局锁重试间隔
transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());//校验或占用全局锁重试次数
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
//其他构建信息
for (Class<?> rbRule : aspectTransactional.getRollbackFor())
rollbackRules.add(new RollbackRule(rbRule));
for (String rbRule : aspectTransactional.getRollbackForClassName())
rollbackRules.add(new RollbackRule(rbRule));
for (Class<?> rbRule : aspectTransactional.getNoRollbackFor())
rollbackRules.add(new NoRollbackRule(rbRule));
for (String rbRule : aspectTransactional.getNoRollbackForClassName())
rollbackRules.add(new NoRollbackRule(rbRule));
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
);
catch (TransactionalExecutor.ExecutionException e)
//执行异常
TransactionalExecutor.Code code = e.getCode();
switch (code)
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
succeed = false;
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
succeed = false;
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
case RollbackRetrying:
failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
default:
throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
finally
if (degradeCheck)
EVENT_BUS.post(new DegradeCheckEvent(succeed));
在这里我们,主要关注一个重点方法 execute()
,这个方法主要用来执行事务的具体流程:
- 获取事务信息
- 执行全局事务
- 发生异常全局回滚,各个数据通过UndoLog进行事务补偿
- 全局事务提交
- 清除所有资源
这个位置也是一个非常核心的一个位置,因为我们所有的业务进来以后都会去走这个位置,具体源码如下所示:
public Object execute(TransactionalExecutor business) throws Throwable
// 1. Get transactionInfo
//获取事务信息
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null)
throw new ShouldNeverHappenException("transactionInfo does not exist");
// 1.1 Get current transaction, if not null, the tx role is GlobalTransactionRole.Participant.
//获取当前事务,主要获取XID
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 Handle the transaction propagation.
//根据配置的不同事务传播行为,执行不同的逻辑
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try
switch (propagation)
case NOT_SUPPORTED:
// If transaction is existing, suspend it.
if (existingTransaction(tx))
suspendedResourcesHolder = tx.suspend();
// Execute without transaction and return.
return business.execute();
case REQUIRES_NEW:
// If transaction is existing, suspend it, and then begin new transaction.
if (existingTransaction(tx))
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
// Continue and execute with new transaction
break;
case SUPPORTS:
// If transaction is not existing, execute without transaction.
if (notExistingTransaction(tx))
return business.execute();
// Continue and execute with new transaction
break;
case REQUIRED:
// If current transaction is existing, execute with current transaction,
// else continue and execute with new transaction.
break;
case NEVER:
// If transaction is existing, throw exception.
if (existingTransaction(tx))
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation never, xid = %s"
, tx.getXid()));
else
// Execute without transaction and return.
return business.execute();
case MANDATORY:
// If transaction is not existing, throw exception.
if (notExistingTransaction(tx))
throw new TransactionException("No existing transaction found for transaction marked with propagation mandatory");
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
// 1.3 If null, create new transaction with role GlobalTransactionRole.Launcher.
//如果当前事务为空,创建一个新的事务
if (tx == null)
tx = GlobalTransactionContext.createNew();
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try
// 2. If the tx role is GlobalTransactionRole.Launcher, send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
//开始执行全局事务
beginTransaction(txInfo, tx);
Object rs;
try
// Do Your Business
// 执行当前业务逻辑
//1、在TC注册当前分支事务,TC会在branch_table中插入一条分支事务数据
//2、执行本地update语句,并在执行前后查询数据状态,并把数据前后镜像存入到undo_log中
//3、远程调用其他应用,远程应用接收到XID,也会注册分支事务,写入branch_table以及本地undo_log表
//4、会在lock_table表中插入全局锁数据(一个分支一条)
rs = business.execute();
catch (Throwable ex)
// 3. The needed business exception to rollback.
//发生异常全局回滚,每个事务通过undo_log表进行事务补偿
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
// 4. everything is fine, commit.
//全局提交
commitTransaction(tx);
return rs;
finally
//5. clear
//清理所有资源
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
finally
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null)
tx.resume(suspendedResourcesHolder);
这其中的第三步和第四步其实在向 TC(Seata-Server)发起全局事务的提交或者回滚,在这里我们首先关注执行全局事务的 beginTransaction()
方法
// 向TC发起请求,采用模板模式
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException
try
triggerBeforeBegin();
//对TC发起请求
tx.begin(txInfo.getTimeOut(), txInfo.getName());
triggerAfterBegin();
catch (TransactionException txe)
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
在来关注其中,向TC发起请求的 tx.begin()
方法,而调用begin()
方法的类为:DefaultGlobalTransaction
@Override
public void begin(int timeout, String name) throws TransactionException
//判断调用者是否为TM
if (role != GlobalTransactionRole.Launcher)
assertXIDNotNull();
if (LOGGER.isDebugEnabled())
LOGGER.debug("Ignore Begin(): just involved in global transaction []", xid);
return;
assertXIDNull();
String currentXid = RootContext.getXID();
if (currentXid != null)
throw new IllegalStateException("Global transaction already exists," +
" cant begin a new global transaction, currentXid = " + currentXid);
//获取XID
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
//绑定XID
RootContext.bind(xid);
if (LOGGER.isInfoEnabled())
LOGGER.info("Begin new global transaction []", xid);
再来看一下 transactionManager.begin()
方法,这个时候使用的是 DefaultTransactionManager.begin
默认的事务管理者,来获取XID,传入事务相关的信息 ,最好TC返回对应的全局事务XID,它调用的是DefaultTransactionManager.begin()
方法
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
//发送请求得到响应
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
if (response.getResultCode() == ResultCode.Failed)
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
//返回XID
return response.getXid();
在这里我们需要关注一个syncCall
,在这里采用的是Netty通讯方式
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException
try
// 通过Netty发送请求
return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
catch (TimeoutException toe)
throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
具体图解如下:
在这里我们需要重点了解 GlobalTransactionScanner
这个类型,在这个类型中继承了一些接口和抽象类,这个类主要作用就是扫描有注解的Bean,并做AOP增强。
-
ApplicationContextAware
:继承这个类型以后,需要实现其方法setApplicationContext()
,当Spring启动完成以后,会自动调用这个类型,将ApplicationContext
给bean
,也就是说,GlobalTransactionScanner
能够很自然的使用Spring环境 -
InitializingBean
: 继承这个接口,需要实现afterPropertiesSet()
,但凡是继承这个接口的类,在初始化的时候,当所有的properties
设置完成以后,会执行这个方法 -
DisposableBean
: 这个类,实现了一个destroy()
这个方法是在销毁的时候去调用 AbstractAutoProxyCreator
: 这个类是Spring实现AOP的一种方式,本质上是一个BeanPostProcessor
,在Bean初始化至去年,调用内部createProxy()
,创建一个Bean的AOP代理Bean并返回,对Bean进行增强。
Seata数据源代理
在上面的环节中,我们讲解了Seata AT模式2PC的执行流程,那么现在我们就来带大家了解一下关于AT数据源代理的信息,这也是AT模式中非常关键的一个重要知识点,大家可以拿起小本子,记下来。
首先AT模式的核心主要分为一下两个
- 开启全局事务,获取全局锁。
- 解析SQL并写入undoLog中。
关于第一点我们已经分析清楚了,第二点就是关于AT模式如何解析SQL并写入undoLog中,但是在这之前,我们需要知道Seata是如何选择数据源,并进行数据源代理的。虽然全局事务拦截成功后最终还是执行了业务方法进行SQL提交和操作,但是由于Seata对数据源进行了代理,所以SQL的解析和undoLog的操作,是在数据源代理中进行完成的。
数据源代理是Seata中一个非常重要的知识点,在分布式事务运行过程中,undoLog的记录、资源的锁定,用户都是无感知的,因为这些操作都是数据源的代理中完成了,恰恰是这样,我们才要去了解,这样不仅有利于我们了解Seata的核心操作,还能对以后源码阅读有所帮助,因为其实很多底层代码都会去使用这样用户无感知的方式(代理)去实现。
同样,我们在之前的寻找源码入口的时候,通过我们项目中引入的jar找到一个 SeataAutoConfiguration
类,我们在里面找到一个SeataDataSourceBeanPostProcessor()
,这个就是我们数据源代理的入口方法
我们进入SeataDataSourceBeanPostProcessor
类里面,发现继承了一个 BeanPostProcessor
,这个接口我们应该很熟悉,这个是Sprng的拓展接口,所有的Bean对象,都有进入两个方法 postProcessAfterInitialization()
和 postProcessBeforeInitialization()
这两个方法都是由 BeanPostProcessor
提供的,这两个方法,一个是初始化之前执行Before
。一个是在初始化之后执行After
,主要用来对比我们的的Bean是否为数据源代理对象。
在这里我们需要关注到一个postProcessAfterInitialization.proxyDataSource()
方法,这个里面
private Object proxyDataSource(Object originBean)
DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) originBean);
if (this.useJdkProxy)
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), SpringProxyUtils.getAllInterfaces(originBean), (proxy, method, args) -> handleMethodProxy(dataSourceProxy, method, args, originBean));
else
return Enhancer.create(originBean.getClass(), (MethodInterceptor) (proxy, method, args, methodProxy) -> handleMethodProxy(dataSourceProxy, method, args, originBean));
这里有一个DataSourceProxy
代理对象,我们需要看的就是这个类,这个就是我们数据库代理的对象,我们从我们下载的源码项目中,搜索这个代理对象,当我们打开这个类的目录时发现,除了这个,还有ConnectionProxy
连接对象、StatementProxy
、PreparedStatementProxy
SQL执行对象,这些都被Seata进行了代理,为什么要对这些都进行代理,代理的目的其实为了执行Seata的业务逻辑,生成undoLog,全局事务的开启,事务的提交回滚等操作
DataSourceProxy
具体做了什么,主要功能有哪些,我们来看一下。他在源码中是如何体现的,我们需要关注的是init()
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource
private String resourceGroupId;
private void init(DataSource dataSource, String resourceGroupId)
//资源组ID,默认是“default”这个默认值
this.resourceGroupId = resourceGroupId;
try (Connection connection = dataSource.getConnection())
//根据原始数据源得到JDBC连接和数据库类型
jdbcUrl = connection.getMetaData().getURL();
dbType = JdbcUtils.getDbType(jdbcUrl);
if (JdbcConstants.ORACLE.equals(dbType))
userName = connection.getMetaData().getUserName();
else if (JdbcConstants.MARIADB.equals(dbType))
dbType = JdbcConstants.mysql;
catch (SQLException e)
throw new IllegalStateException("can not init dataSource", e);
initResourceId();
DefaultResourceManager.get().registerResource(this);
if (ENABLE_TABLE_META_CHECKER_ENABLE)
//如果配置开关打开,会定时在线程池不断更新表的元数据缓存信息
tableMetaExecutor.scheduleAtFixedRate(() ->
try (Connection connection = dataSource.getConnection())
TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
.refresh(connection, DataSourceProxy.this.getResourceId());
catch (Exception ignore)
, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
//Set the default branch type to AT in the RootContext.
RootContext.setDefaultBranchType(this.getBranchType());
从上面我们可以看出,他主要做了以下几点的增强:
- 给每个数据源标识资源组ID
- 如果打开配置,会有一个定时线程池定时更新表的元数据信息并缓存到本地
- 生成代理连接
ConnectionProxy
对象
在这三个增强功能里面,第三个是最重要的,在AT模式里面,会自动记录undoLog,资源锁定,都是通过ConnectionProxy
完成的,除此之外 DataSrouceProxy
重写了一个方法 getConnection
,因为这里返回的是一个 ConnectionProxy
,而不是原生的Connection
@Override
public ConnectionProxy getConnection() throws SQLException
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
@Override
public ConnectionProxy getConnection(String username, String password) throws SQLException
Connection targetConnection = targetDataSource.getConnection(username, password);
return new ConnectionProxy(this, targetConnection);
ConnectionProxy
ConnectionProxy
继承 AbstractConnectionProxy
,在这个父类中有很多公用的方法,在这个父类有 PreparedStatementProxy
、StatementProxy
、DataSourceProxy
所以我们需要先来看一下AbstractConnectionProxy
,因为这里封装了需要我们用到的通用方法和逻辑,在其中我们需要关注的主要在于 PreparedStatementProxy
和 StatementProxy
,在这里的逻辑主要是数据源连接的步骤,连接获取,创建执行对象等等
@Override
public Statement createStatement() throws SQLException
//调用真实连接对象获取Statement对象
Statement targetStatement = getTargetConnection().createStatement();
//创建Statement的代理
return new StatementProxy(this, targetStatement);
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException
//获取数据库类型 mysql/oracle
String dbType = getDbType();
// support oracle 10.2+
PreparedStatement targetPreparedStatement = null;
//如果是AT模式且开启全局事务
if (BranchType.AT == RootContext.getBranchType())
List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
if (sqlRecognizers != null && sqlRecognizers.size() == 1)
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT)
//获取表的元数据
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
//得到表的主键列名
String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
if (targetPreparedStatement == null)
targetPreparedStatement = getTargetConnection().prepareStatement(sql);
//创建PreparedStatementProxy代理
return new PreparedStatementProxy(this, targetPreparedStatement, sql);
在这两个代理对象中,都用到了以下几个方法:
@Override
public ResultSet executeQuery(String sql) throws SQLException
this.targetSQL = sql;
return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery((String) args[0]), sql);
@Override
public int executeUpdate(String sql) throws SQLException
this.targetSQL = sql;
return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate((String) args[0]), sql);
@Override
public boolean execute(String sql) throws SQLException
this.targetSQL = sql;
return ExecuteTemplate.execute(this, (statement, args) -> statement.execute((String) args[0]), sql);
在这些方法中都调用了 ExecuteTemplate.execute()
,所以我们就看一下在 ExecuteTemplate
类中具体是做了什么操作:
public class ExecuteTemplate
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException
//如果没有全局锁,并且不是AT模式,直接执行SQL
if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType())
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
//得到数据库类型- mysql/oracle
String dbType = statementProxy.getConnectionProxy().getDbType();
if (CollectionUtils.isEmpty(sqlRecognizers))
//sqlRecognizers 为SQL语句的解析器,获取执行的SQL,通过它可以获得SQL语句表名、相关的列名、类型等信息,最后解析出对应的SQL表达式
sqlRecognizers = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
dbType);
Executor<T> executor;
if (CollectionUtils.isEmpty(sqlRecognizers))
//如果seata没有找到合适的SQL语句解析器,那么便创建简单执行器PlainExecutor
//PlainExecutor直接使用原生的Statment对象执行SQL
executor = new PlainExecutor<>(statementProxy, statementCallback);
else
if (sqlRecognizers.size() == 1)
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
switch (sqlRecognizer.getSQLType())
//新增
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]StatementProxy.class, StatementCallback.class, SQLRecognizer.class,
new Object[]statementProxy, statementCallback, sqlRecognizer);
break;
//修改
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
//删除
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
//加锁
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
//插入加锁
case INSERT_ON_DUPLICATE_UPDATE:
switch (dbType)
case JdbcConstants.MYSQL:
case JdbcConstants.MARIADB:
executor =
new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
break;
default:
throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
break;
//原生
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
else
//批量处理SQL语句
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
T rs;
try
//执行
rs = executor.execute(args);
catch (Throwable ex)
if (!(ex instanceof SQLException))
// Turn other exception into SQLException
ex = new SQLException(ex);
throw (SQLException) ex;
return rs;
在 ExecuteTemplate
就一个 execute()
,Seata将SQL执行委托给不同的执行器(模板),Seata提供了6种执行器也就是我们代码 case 中(INSERT
,UPDATE
,DELETE
,SELECT_FOR_UPDATE
,INSERT_ON_DUPLICATE_UPDATE
),这些执行器的父类都是AbstractDMLBaseExecutor
UpdateExecutor
: 执行update语句InsertExecutor
: 执行insert语句DeleteExecutor
: 执行delete语句SelectForUpdateExecutor
: 执行select for update语句PlainExecutor
: 执行普通查询语句MultiExecutor
: 复合执行器,在一条SQL语句中执行多条语句
关系图如下:
然后我们找到rs = executor.execute(args);
最终执行的方法,找到最顶级的父类BaseTransactionalExecutor.execute()
@Override
public T execute(Object... args) throws Throwable
String xid = RootContext.getXID();
if (xid != null)
//获取XID
statementProxy.getConnectionProxy().bind(xid);
//设置全局锁
statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
return doExecute(args);
在根据doExecute(args);
找到其中的重写方法 AbstractDMLBaseExecutor.doExecute()
@Override
public T doExecute(Object... args) throws Throwable
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
//是否自动提交
if (connectionProxy.getAutoCommit())
return executeAutoCommitTrue(args);
else
return executeAutoCommitFalse(args);
对于数据库而言,本身都是自动提交的,所以我们进入executeAutoCommitTrue()
protected T executeAutoCommitTrue(Object[] args) throws Throwable
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try
//设置为手动提交
connectionProxy.changeAutoCommit();
return new LockRetryPolicy(connectionProxy).execute(() ->
//调用手动提交方法,得到分支执行的最终结果
T result = executeAutoCommitFalse(args);
//执行提交
connectionProxy.commit();
return result;
);
catch (Exception e)
// when exception occur in finally,this exception will lost, so just print it here
LOGGER.error("execute executeAutoCommitTrue error:", e.getMessage(), e);
if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict())
connectionProxy.getTargetConnection().rollback();
throw e;
finally
connectionProxy.getContext().reset();
connectionProxy.setAutoCommit(true);
connectionProxy.changeAutoCommit()
方法,修改为手动提交后,我们看来最关键的代码executeAutoCommitFalse()
protected T executeAutoCommitFalse(Object[] args) throws Exception
if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk())
throw new NotSupportYetException("multi pk only support mysql!");
//获取前镜像
TableRecords beforeImage = beforeImage();
//执行具体业务
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
//获取执行数量
int updateCount = statementProxy.getUpdateCount();
//判断如果执行数量大于0
if (updateCount > 0)
//获取后镜像
TableRecords afterImage = afterImage(beforeImage);
//暂存到undolog中,在Commit的时候保存到数据库
prepareUndoLog(beforeImage, afterImage);
return result;
我们再回到executeAutoCommitTrue
中,去看看提交做了哪些操作connectionProxy.commit();
@Override
public void commit() throws SQLException
try
lockRetryPolicy.execute(() ->
//具体执行
doCommit();
return null;
);
catch (SQLException e)
if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged())
rollback();
throw e;
catch (Exception e)
throw new SQLException(e);
进入到doCommit()
中
private void doCommit() throws SQLException
//判断是否存在全局事务
if (context.inGlobalTransaction())
processGlobalTransactionCommit();
else if (context.isGlobalLockRequire())
processLocalCommitWithGlobalLocks();
else
targetConnection.commit();
作为分布式事务,一定是存在全局事务的,所以我们进入 processGlobalTransactionCommit()
private void processGlobalTransactionCommit() throws SQLException
try
//注册分支事务
register();
catch (TransactionException e)
recognizeLockKeyConflictException(e, context.buildLockKeys());
try
//写入数据库undolog
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
//执行原生提交 一阶段提交
targetConnection.commit();
catch (Throwable ex)
LOGGER.error("process connectionProxy commit error: ", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
if (IS_REPORT_SUCCESS_ENABLE)
report(true);
context.reset();
其中register()
方法就是注册分支事务的方法,同时还会将undoLog写入数据库和执行提交等操作
//注册分支事务,生成分支事务ID
private void register() throws TransactionException
if (!context.hasUndoLog() || !context.hasLockKey())
return;
//注册分支事务
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), context.getApplicationData(), context.buildLockKeys());
context.setBranchId(branchId);
然后我们在回到processGlobalTransactionCommit
中,看看写入数据库中的flushUndoLogs()
@Override
public void flushUndoLogs(ConnectionProxy cp) throws SQLException
ConnectionContext connectionContext = cp.getContext();
if (!connectionContext.hasUndoLog())
return;
//获取XID
String xid = connectionContext.getXid();
//获取分支ID
long branchId = connectionContext.getBranchId();
BranchUndoLog branchUndoLog = new BranchUndoLog();
branchUndoLog.setXid(xid);
branchUndoLog.setBranchId(branchId);
branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
UndoLogParser parser = UndoLogParserFactory.getInstance();
byte[] undoLogContent = parser.encode(branchUndoLog);
if (LOGGER.isDebugEnabled())
LOGGER.debug("Flushing UNDO LOG: ", new String(undoLogContent, Constants.DEFAULT_CHARSET));
CompressorType compressorType = CompressorType.NONE;
if (needCompress(undoLogContent))
compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
//写入数据库具体位置
insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection());
具体写入方法,此时我们使用的是MySql,所以执行的是MySql实现类MySQLUndoLogManager.insertUndoLogWithNormal()
@Override
protected void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,
Connection conn) throws SQLException
insertUndoLog(xid, branchId, rollbackCtx, undoLogContent, State.Normal, conn);
//具体写入操作
private void insertUndoLog(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,
State state, Connection conn) throws SQLException
try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL))
pst.setLong(1, branchId);
pst.setString(2, xid);
pst.setString(3, rollbackCtx);
pst.setBytes(4, undoLogContent);
pst.setInt(5, state.getValue());
pst.executeUpdate();
catch (Exception e)
if (!(e instanceof SQLException))
e = new SQLException(e);
throw (SQLException) e;
具体流程如下所示:
Seata 服务端
我们找到Server.java
这里就是启动入口,在这个入口中找到协调者,因为TC整体的操作就是协调整体的全局事务
//默认协调者
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
在DefaultCoordinator
类中我们找到 一个doGlobalBegin
这个就是处理全局事务开始的方法,以及全局提交 doGlobalCommit
和全局回滚 doGlobalRollback
//处理全局事务
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException
//响应客户端xid
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
if (LOGGER.isInfoEnabled())
LOGGER.info("Begin new global transaction applicationId: ,transactionServiceGroup: , transactionName: ,timeout:,xid:",
rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
//处理全局提交
@Override
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
throws TransactionException
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
response.setGlobalStatus(core.commit(request.getXid()));
//处理全局回滚
@Override
protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,
RpcContext rpcContext) throws TransactionException
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
response.setGlobalStatus(core.rollback(request.getXid()));
在这里我们首先关注 doGlobalBegin
中 core.begin()
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException
//创建全局事务Session
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
timeout);
MDC.put(RootContext.MDC_KEY_XID, session.getXid());
//为Session重添加回调监听,SessionHolder.getRootSessionManager() 获取一个全局Session管理器DataBaseSessionManager
//观察者设计模式,创以上是关于分布式事务(Seata)原理 详解篇,建议收藏的主要内容,如果未能解决你的问题,请参考以下文章