Seata流程源码梳理上篇-TMRM处理
Posted _微风轻起
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Seata流程源码梳理上篇-TMRM处理相关的知识,希望对你有一定的参考价值。
这一篇我们主要来分析下Seata的AT模式的流程处理。
一、流程案例
1、案例源码
我们本地流程梳理用的是基于spring-cloud
框架,注册中心是eurak,服务间调用的是feign,源码下载的是官网的(当然你如果对dubbo更熟悉,也可以使用dubbo结构的),案例地址–https://github.com/seata/seata-samples。
这个我本地的话,将其他的模块都删除了。然后启动的话,注意数据库连接信息的修改以及脚本初始化。
这个业务是一个用户账号(account)下订单(order),然后减库存(stock),整个业务的发起就是在business发起的。
2、服务器源码
然后seata服务端,我下载的1.4.2版本的源码在本地启动。
对于服务端的启动,用的是db模式
注册的话,修改为eurka
注意运行对应的数据库脚本。
二、一些前置概念的介绍
可以先再看下其官网的介绍–http://seata.io/zh-cn/docs/overview/what-is-seata.html。然后我们结合源码再来说下。
1、整个案例的TM、RM、TC业务逻辑
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。
TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
这里主要我们需要明白TM、TC的概念,TM其实就是全局事务的发起者,TC就是协调者,例如如果全局事务需要回滚或者提交,就是TM去通知TC,然后TC再获取、调用该全局事务下的各个分支,通知他们去进行提交或回滚。
TC其实就是我们的seata服务端,如果我们要开启全局事务,通过TM
(对于我们代码来说,TM也就是TransactionManager
接口)发起(发起也就是调用begin
方法,获取到本次分布式事务唯一全局标识xid
),也就是我们在调用businessService.purchase("U100000", "C100000", 30)
方法时,就会通过拦截器拦截,进行对应分布式事务的逻辑织入,然后在里面调用TC
,告诉它我们要开始分布式事务了,然后TC
就是在global_table
添加一条记录,就是本次分布式事务的唯一标识记录:
CREATE TABLE `global_table` (
`xid` varchar(128) NOT NULL,
`transaction_id` bigint DEFAULT NULL,
`status` tinyint NOT NULL,
`application_id` varchar(64) DEFAULT NULL,
`transaction_service_group` varchar(64) DEFAULT NULL,
`transaction_name` varchar(64) DEFAULT NULL,
`timeout` int DEFAULT NULL,
`begin_time` bigint DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
然后TC
(也就是seata服务端)将xid
返回,之后就会具体来调用到业务方法,也就是purchase(...)
方法的业务逻辑:
@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount)
stockFeignClient.deduct(commodityCode, orderCount);
orderFeignClient.create(userId, commodityCode, orderCount);
if (!validData())
throw new RuntimeException("账户或库存不足,执行回滚");
然后我们就会通过feign
去调用stock
服务以及order
服务,在调用的时候,将xid
加在请求头部中。
stock
接受到调用后,seata
就会通过拓展spring提供的HandlerInterceptor
接口(具体可以去了解下这个接口)实现SeataHandlerInterceptor
,从头部中获取到xid,然后绑定到当前线程中,然后去执行对应的业务。
对应的业务方法就会去操作DB,而这个时候,seata同样有个织入逻辑,也就是代理spring本身的DataSource
-DataSourceProxy
类,当操作数据库时,通过DataSource
获取Connection
时,其又会返回ConnectionProxy
类来代理本身的Connection
,然后在操作数据库之前、之后,添加对应的seata
逻辑。例如:
1、记录undo_log
表的数据,用于回滚时候的恢复重做
CREATE TABLE `undo_log` (
`id` bigint NOT NULL AUTO_INCREMENT,
`branch_id` bigint NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=147 DEFAULT CHARSET=utf8mb3;
2、通过RM
(也就是ResourceManager
接口)进行分支注册(调用ResourceManager
接口的branchRegister方法),也就是调用TC
告诉它,然后TC
就会在branch_table
表添加一条记录,这里的一条记录,就是一个TM
对应的数据
CREATE TABLE `branch_table` (
`branch_id` bigint NOT NULL,
`xid` varchar(128) NOT NULL,
`transaction_id` bigint DEFAULT NULL,
`resource_group_id` varchar(32) DEFAULT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`lock_key` varchar(128) DEFAULT NULL,
`branch_type` varchar(8) DEFAULT NULL,
`status` tinyint DEFAULT NULL,
`client_id` varchar(64) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
然后当purchase(String userId, String commodityCode, int orderCount)
方法调用order
、stock
成功完成后,这个时候就会在进入到TM
也就是对应的拦截器逻辑中,如果成功执行分布式事务提交操作,失败就会捕获到异常,进行全局事务的回滚操作。例如TM
调用TC
告诉它完成分布式事务的提交,然后TC
就会通过xid
获取到当前事务下有几个分支,也就是对应RM
完成提交(ResourceManager
的branchCommit
方法),这个就是整个seata定义分布式事务的基本逻辑。
2、TM(TransactionManager)
public interface TransactionManager
/**
* 开启一个全局事务(也就是调用),返回xid,也就是本次全局事务的标识
*/
String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException;
/**
* Global commit.
*
* @param xid XID of the global transaction.
* @return Status of the global transaction after committing.
* @throws TransactionException Any exception that fails this will be wrapped with TransactionException and thrown
* out.
*/
GlobalStatus commit(String xid) throws TransactionException;
/**
* Global rollback.
*
* @param xid XID of the global transaction
* @return Status of the global transaction after rollbacking.
* @throws TransactionException Any exception that fails this will be wrapped with TransactionException and thrown
* out.
*/
GlobalStatus rollback(String xid) throws TransactionException;
.........
3、SeataHandlerInterceptor
public class SeataHandlerInterceptor implements HandlerInterceptor
private static final Logger log = LoggerFactory
.getLogger(SeataHandlerInterceptor.class);
/**
* 从头部中获取xid,绑定到当前线程
**/
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
Object handler)
String xid = RootContext.getXID();
//public static final String KEY_XID = "TX_XID";
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (log.isDebugEnabled())
log.debug("xid in RootContext xid in RpcContext ", xid, rpcXid);
if (xid == null && rpcXid != null)
RootContext.bind(rpcXid);
if (log.isDebugEnabled())
log.debug("bind to RootContext", rpcXid);
return true;
/**
* 业务执行完后,删除
**/
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
Object handler, Exception e)
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (StringUtils.isEmpty(rpcXid))
return;
String unbindXid = RootContext.unbind();
if (log.isDebugEnabled())
log.debug("unbind from RootContext", unbindXid);
if (!rpcXid.equalsIgnoreCase(unbindXid))
log.warn("xid in change during RPC from to ", rpcXid, unbindXid);
if (unbindXid != null)
RootContext.bind(unbindXid);
log.warn("bind back to RootContext", unbindXid);
4、RM(ResourceManager)
三、TM相关流程源码
我们上面简单介绍了seata分布式事务的简单流程,下面我们就来具体分析下源码的流转。将上面的各个应用启动后,我们调用http://localhost:8084/purchase/commit
,正式执行我们的业务流程。
1、拦截对应的使用了全局事务的方法
我们在purchase(...)
方法使用的@GlobalTransactional
注解,就会有对应的拦截器来解析对应的逻辑增强,加入对应的全局事务。
如果没有对应全局事务的标签,就不处理,直接通过methodInvocation.proceed()
执行对应的方法。我们这里是有全局事务的,就通过handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation)
来开始对应的逻辑处理。
2、处理全局事务
Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable
boolean succeed = true;
try
return transactionalTemplate.execute(new TransactionalExecutor()
@Override
public Object execute() throws Throwable
return methodInvocation.proceed();
.......
@Override
public TransactionInfo getTransactionInfo()
// reset the value of timeout
int timeout = globalTrxAnno.timeoutMills();
if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT)
timeout = defaultGlobalTransactionTimeout;
// 这个是本次事务的对应信息、例如传播级别、超时时间
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(timeout);
transactionInfo.setName(name());
transactionInfo.setPropagation(globalTrxAnno.propagation());
transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
.........
return transactionInfo;
);
catch (TransactionalExecutor.ExecutionException e)
.........
finally
if (degradeCheck)
EVENT_BUS.post(new DegradeCheckEvent(succeed));
这里的逻辑主要是给到TransactionalTemplate
来处理对应的逻辑。
3、全局事务的整个流程逻辑
public class TransactionalTemplate
..........
public Object execute(TransactionalExecutor business) throws Throwable
// 1. Get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null)
throw new ShouldNeverHappenException("transactionInfo does not exist");
// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 Handle the transaction propagation.
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try
switch (propagation)
case NOT_SUPPORTED:
// If transaction is existing, suspend it.
if (existingTransaction(tx))
suspendedResourcesHolder = tx.suspend();
// Execute without transaction and return.
return business.execute();
case REQUIRES_NEW:
// If transaction is existing, suspend it, and then begin new transaction.
if (existingTransaction(tx))
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
// Continue and execute with new transaction
break;
............
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
if (tx == null)
tx = GlobalTransactionContext.createNew();
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try
..........
return rs;
finally
//5. clear
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
finally
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null)
tx.resume(suspendedResourcesHolder);
这里其实主要分为两部分:
一部分是分布式事务其本身也是一个事务,所以前面的switch (propagation)
里面的逻辑,就是遵循spring本身的事务传播机制。例如如果是NOT_SUPPORTED
,其不需要事务,就会将原来的事务挂起,然后直接执行对应的业务,而如果是REQUIRES_NEW
的话,其就需要先将原来的事务挂起,再开启一个新事务来继续处理往下的逻辑。
另一部分就是具体的分布式事务的处理了。
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
if (tx == null)
tx = GlobalTransactionContext.createNew();
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
beginTransaction(txInfo, tx);
Object rs;
try
// Do Your Business
rs = business.execute();
catch (Throwable ex)
// 3. The needed business exception to rollback.
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
// 4. everything is fine, commit.
commitTransaction(tx);
return rs;
finally
//5. clear
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
finally
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null)
tx.resume(suspendedResourcesHolder);
1、获取DefaultGlobalTransaction
这里首先就是通过GlobalTransactionContext.createNew()
获取DefaultGlobalTransaction
public static GlobalTransaction createNew()
return new DefaultGlobalTransaction();
DefaultGlobalTransaction()
this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
/**
* Instantiates a new Default global transaction.
*
* @param xid the xid
* @param status the status
* @param role the role
*/
DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role)
this.transactionManager = TransactionManagerHolder.get();
this.xid = xid;
this.status = status;
this.role = role;
这个类里面可以获取到TransactionManager
事务管理器,也就是TM
,transactionManager
是单例模式创建的。
2、开启事务(调用TC获取xid)
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException <以上是关于Seata流程源码梳理上篇-TMRM处理的主要内容,如果未能解决你的问题,请参考以下文章
微服务39分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务
Seata执行整体流程(AT模式)| Seata源码 - 自动配置数据库代理 | AT和XA的区别
Seata执行整体流程(AT模式)| Seata源码 - 自动配置数据库代理 | AT和XA的区别