分布式事务详解,并带有lcn源码解析。
Posted Nuan_Feng
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式事务详解,并带有lcn源码解析。相关的知识,希望对你有一定的参考价值。
文章目录
1):为什么需要分布式事务?
假设用户购买商品时,会生成一个订单,修改库存,修改用户积分。
倘若单库的话,可通过开启事务方式,一起失败,或者一起成功。
但是当体量达到一个级别,不得不采用分库分表的方式,将其垂直拆分成多个库。比如用户中心库,库存中心库,订单模块库。
而服务也对应的进行了拆分,用户中心,库存中心,订单中心三个服务。修改数据,不得不跨服务访问库,这个时候,假设订单模块库成功了,库存中心库执行失败了,那么就要通知订单中心进行回滚操作。这就添加了复杂性。
随之而来出现了很多针对此场景的解决方案。
2):常见的解决方案如下?
分布式事务实现方案从类型上去分刚性事务、柔型事务。
刚性事务:通常无业务改造,强一致性,原生支持回滚/隔离性,低并发,适合短事务。一般出现在金融领域。
柔性事务:有业务改造,最终一致性,实现补偿接口,实现资源锁定接口,高并发,适合长事务。只追求数据最终结果的可采用。
刚性事务:XA 协议(2PC、JTA、JTS)、3PC
柔型事务:TCC/FMT、Saga(状态机模式、Aop模式)、本地事务消息、消息事务(半消息)
2)1):二阶段提交(2PC)
引入一个协调者,统一掌控所有参与者,并指示他们是否操作结果提交还是回滚
分为2个阶段
投票阶段:参与则通知协调者,协调者反馈结果
提交阶段:收到参与者反馈后,协调者再向参与者发出通知,根据反馈情况决定每个参与者到底还是提交还是回滚
例子:老大BOSS说要吃饭,统计下面员工意见,有员工ABC同意去,但D没回复,则BOSS依旧处于接受信息状态,ABC则阻塞.当统计了ABCD员工意见后,如果有一个员工不同意,则通知ABCD不去(回滚),然后ABCD收到后ACK BOSS,这个阶段如果BOSS没收到会一直阻塞
缺点
执行过程中,所有节点处于阻塞,所有节点持有资源锁定状态,当有一个节点出现问题无法回复,则会一直阻塞,就需要更复杂的超时机制处理.
2)2):TXC逆向SQL
1. 拦截,并解析SQL
2. 查询受影响的行数,并记录值
TXM通知事务提交,则删除记录
若回滚则,将记录的值创建逆向SQL,然后执行
2)3):TCC(Try、Confirm、Cancel)
//尝试方法
function try()
//记录日志
todo save A 转出了 100 元
todo save B 转入了 100 元
//执行转账
update amount set balacne = balacne-100 where id = 1
update amount set balacne = balacne+100 where id = 2
//确认方法
function confirm()
//清理日志
clean save A 转出了 100 元
clean save B 转出了 100 元
//取消方法
function cancle()
//加载日志
load log A
load log B
//退钱
update amount set balacne = balacne+100 where id = 1
update amount set balacne = balacne-100 where id = 2
每个服务需要实现三个方法,开发较重。但是针对db和redis的混合开发,有不错的支持。
2)4):增量日志
确保最终一致性,延迟取决于日志对比时间
模仿mysql主从复制,binLog,涉及修改添加到日志,然后间隔多长时间进行正向反向表数据比对.
2)5):补偿事务
在业务端执行业务逆向操作事务
flag1=执行事务一返回结果
if(flag1)
flag2=执行事务二返回结果
if(flag2)
...执行事务...
else
回滚事务二
else
回滚事务一
缺点
嵌套过多
不同业务需要写不同的补偿事务
不具备通用性
没有考虑补偿事务失败
2)6):后置提交优化
一个事务,分别为执行和提交2阶段
执行比提交耗时长
改变事务执行与提交时序,变成事务先执行然后一起提交
执行事务一,提交.执行事务耗时200ms,提交2ms
执行事务二,提交.执行事务耗时200ms,提交2ms
执行事务三,提交.执行事务耗时200ms,提交2ms
当执行事务二~三之间出现异常,数据就不一致,时间范围为202+202ms
通过改变时序后:
执行事务一,执行事务耗时200ms
执行事务二,执行事务耗时200ms
执行事务三,执行事务耗时200ms
提交事务一,提交2ms
提交事务二,提交2ms
提交事务三,提交2ms
后置优化后,在提交事务二~三阶段才会出现不一致情况,时间耗时4ms
虽然不能根本解决数据一致问题,但通过缩小时间概率,降低不一致概率
缺点:
串行执行事务提交后,连接就释放了,但后置提交优化后,所有库的连接,需要等待全部事务执行完才能释放,数据库连接占用时间加长,吞吐降低了
3):txlcn源码解析
执行时序图
3)1):事务发起方执行分布式事务流程
仔细思考,作为发起方在整个分布式事务中出现情况如下:
- 发起方执行成功(在这种情况下,所有参与方均执行成功,发起方才能成功。)
- 参与方第二阶段成功,提交。
- 参与方第二阶段失败,回滚。通知txm加入补偿日志。
- 自身提交or回滚失败,通知txm加入补偿日志。
- 发起方失败,自身回滚。通知tx-m执行失败,通知其他服务回滚。
当我们在方法上添加TxTransaction时,表示开启分布式事务。注意作为服务发起方需要标记isStart=true。
这个时候会被aop环绕注解拦截。com.codingapi.tx.springcloud.interceptor.TransactionAspect#transactionRunning
@Around("@annotation(com.codingapi.tx.annotation.TxTransaction)")
public Object transactionRunning(ProceedingJoinPoint point) throws Throwable
logger.debug("annotation-TransactionRunning-start---->");
Object obj = txManagerInterceptor.around(point);
logger.debug("annotation-TransactionRunning-end---->");
return obj;
从请求头中获取事务组id和事务模式。
com.codingapi.tx.springcloud.interceptor.TxManagerInterceptor#around
public Object around(ProceedingJoinPoint point) throws Throwable
String groupId = null;
String mode = null;
try
//如果是发起方,这里都是null。
//如果是调用方,则这里会有发起方通过feign发送过来的事务组id和事务模式。
//事务组id:事务唯一标识。
//事务模式:包括lcn,或者tcc等。
RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes();
HttpServletRequest request = requestAttributes == null ? null : ((ServletRequestAttributes) requestAttributes).getRequest();
groupId = request == null ? null : request.getHeader("tx-group");
mode = request == null ? null : request.getHeader("tx-mode");
catch (Exception e)
//继续调用切面方法
return aspectBeforeService.around(groupId, point, mode);
封装事务信息,调用核心类执行。
com.codingapi.tx.aop.service.impl.AspectBeforeServiceImpl#around
@Override
public Object around(String groupId, ProceedingJoinPoint point, String mode) throws Throwable
MethodSignature signature = (MethodSignature) point.getSignature();
//获取接口执行方法
Method method = signature.getMethod();
Class<?> clazz = point.getTarget().getClass();
//获取方法参数
Object[] args = point.getArgs();
//获取具体执行的子类方法
Method thisMethod = clazz.getMethod(method.getName(), method.getParameterTypes());
//获取分布式事务注解
TxTransaction transaction = thisMethod.getAnnotation(TxTransaction.class);
TxTransactionLocal txTransactionLocal = TxTransactionLocal.current();
logger.debug("around--> groupId-> " +groupId+",txTransactionLocal->"+txTransactionLocal);
//封装事务方法
TransactionInvocation invocation = new TransactionInvocation(clazz, thisMethod.getName(), thisMethod.toString(), args, method.getParameterTypes());
//封装事务信息,包含注解信息,方法信息,事务组id。
TxTransactionInfo info = new TxTransactionInfo(transaction,txTransactionLocal,invocation,groupId);
try
//设置事务类型
info.setMode(TxTransactionMode.valueOf(mode));
catch (Exception e)
info.setMode(TxTransactionMode.TX_MODE_LCN);
//创建事务服务
TransactionServer server = transactionServerFactoryService.createTransactionServer(info);
//执行核心事务方法
return server.execute(point, info);
3)1)1):创建事务服务
com.codingapi.tx.aop.service.impl.TransactionServerFactoryServiceImpl
public TransactionServer createTransactionServer(TxTransactionInfo info) throws Throwable
if (!SocketManager.getInstance().isNetState())
//检查socket通讯是否正常 (第一次执行时启动txRunningTransactionServer的业务处理控制,然后嵌套调用其他事务的业务方法时都并到txInServiceTransactionServer业务处理下)
logger.warn("tx-manager not connected.");
return txDefaultTransactionServer;
/*********分布式事务处理逻辑***********/
logger.info("分布式事务处理逻辑...开始");
/** 事务发起方:仅当TxTransaction注解不为空,其他都为空时。表示分布式事务开始启动 **/
//注解不为空,且注解声明了为发起方,且事务组id为空,这就说明为事务发起方。
if (info.getTxTransaction() != null && info.getTxTransaction().isStart() && info.getTxTransactionLocal() == null && StringUtils.isEmpty(info.getTxGroupId()))
//检查socket通讯是否正常 (当启动事务的主业务方法执行完以后,再执行其他业务方法时将进入txInServiceTransactionServer业务处理)
if (SocketManager.getInstance().isNetState())
return txStartTransactionServer;
else
logger.warn("tx-manager not connected.");
return txDefaultTransactionServer;
/** 事务参与方:分布式事务已经开启,业务进行中 **/
logger.debug("事务参与方:分布式事务已经开启,业务进行中");
if (info.getTxTransactionLocal() != null || StringUtils.isNotEmpty(info.getTxGroupId()))
//检查socket通讯是否正常 (第一次执行时启动txRunningTransactionServer的业务处理控制,然后嵌套调用其他事务的业务方法时都并到txInServiceTransactionServer业务处理下)
if (SocketManager.getInstance().isNetState())
if (info.getTxTransactionLocal() != null)
return txDefaultTransactionServer;
else
/** 表示整个应用没有获取过DB连接 || 无事务业务的操作 **/
if (transactionControl.isNoTransactionOperation() || info.getTxTransaction().readOnly())
return txRunningNoTransactionServer;
else
return txRunningTransactionServer;
else
logger.warn("tx-manager not connected.");
return txDefaultTransactionServer;
/*********分布式事务处理逻辑*结束***********/
logger.debug("分布式事务处理逻辑*结束");
return txDefaultTransactionServer;
当出现网络不通时,会选择com.codingapi.tx.aop.service.impl.TxDefaultTransactionServerImpl,就是一个简单的调用。
@Override
public Object execute(ProceedingJoinPoint point, TxTransactionInfo info) throws Throwable
logger.info("默认事务管理器...");
return point.proceed();
com.codingapi.tx.aop.service.impl.TxDefaultTransactionServerImpl类继承TransactionServer,该类下有4个实现,分别针对发起方和调用方,默认以及无事务4种实现。
3)1)2):执行分布式事务
作为调用方,这里只看com.codingapi.tx.aop.service.impl.TxStartTransactionServerImpl#execute
-
构建事务组id
-
向tx-mange发起事务创建,对应图中这步。
-
封装事务上下文
-
执行业务方法,失败或成功通过state记录状态。
-
通知tx-m关闭事务组,进入事务提交第一阶段,tx-m会通知所有参与者提交。获取所有参与者执行后的最终结果。
- 对应如图
-
若task不为空,则唤醒阻塞任务,这里会执行本地db连接池commit,或者回滚。然后归还db连接。
-
发起方执行失败但是参与方成功,或者发起方成功,参与方失败。则通知tx-M记录补偿
public Object execute(ProceedingJoinPoint point,final TxTransactionInfo info) throws Throwable
//分布式事务开始执行
logger.info("事务发起方...");
logger.debug("--->分布式事务开始执行 begin start transaction");
final long start = System.currentTimeMillis();
//标记执行状态,0失败,1成功。
int state = 0;
//1:构建事务组id
final String groupId = TxCompensateLocal.current()==null?KidUtils.generateShortUuid():TxCompensateLocal.current().getGroupId();
//2:向tx-mange发起事务创建
logger.debug("创建事务组并发送消息");
txManagerService.createTransactionGroup(groupId);
//3:封装事务上下文(封装组id,事务超时时间,事务模式,是否发起方),并设置当前线程中。这里贯穿整个生命周期。
TxTransactionLocal txTransactionLocal = new TxTransactionLocal();
txTransactionLocal.setGroupId(groupId);
txTransactionLocal.setHasStart(true);
txTransactionLocal.setMaxTimeOut(Constants.txServer.getCompensateMaxWaitTime());
txTransactionLocal.setMode(info.getTxTransaction().mode());
txTransactionLocal.setReadOnly(info.getTxTransaction().readOnly());
TxTransactionLocal.setCurrent(txTransactionLocal);
try
//4:执行业务方法
Object obj = point.proceed();
//修改执行结果状态
state = 1;
return obj;
catch (Throwable e)
//5:回滚事务
state = rollbackException(info,e);
throw e;
finally
final String type = txTransactionLocal.getType();
//6:通知tx-m关闭事务组,进入事务提交第一阶段,tx-m会通知所有参与者提交。获取所有参与者执行后的最终结果。
int rs = txManagerService.closeTransactionGroup(groupId, state);
int lastState = rs==-1?0:state;
int executeConnectionError = 0;
//获取task,走到这里的时候,发起方若执行了回滚,这里为null。
final TxTask waitTask = TaskGroupManager.getInstance().getTask(groupId, type);
if(waitTask!=null)
//设置最终执行结果
waitTask.setState(lastState);
//唤醒阻塞任务,这里会执行本地db连接池commit,或者回滚。然后归还db连接。
waitTask.signalTask();
//自旋等待线程池删除任务。
while (!waitTask.isRemove())
try
Thread.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
if(waitTask.getState()== TaskState.connectionError.getCode())
//本地执行失败.
executeConnectionError = 1;
lastState = 0;
final TxCompensateLocal compensateLocal = TxCompensateLocal.current();
if (compensateLocal == null)
long end = System.currentTimeMillis();
long time = end - start;
if ((executeConnectionError == 1&&rs == 1)||(lastState == 1 && rs == 0))
//发起方执行失败但是参与方成功,或者发起方成功,参与方失败。则通知tx-M记录补偿
txManagerService.sendCompensateMsg(groupId, time, info,executeConnectionError);
else
if(rs==1)
lastState = 1;
else
lastState = 0;
//清除事务上下文信息
TxTransactionLocal.setCurrent(null);
logger.debug("<---分布式事务 end start transaction");
logger.debug("start transaction over, res -> groupId:" + groupId + ", now state:" + (lastState == 1 ? "commit" : "rollback"));
3)1)3):获取线程连接
当执行业务方法的时,会通过封装db连接,达到控制commit或者rollback操作。
拦截获取数据库连接方法com.codingapi.tx.datasource.aspect.DataSourceAspect#around
@Around("execution(* javax.sql.DataSource.getConnection(..))")
public Connection around(ProceedingJoinPoint point)throws Throwable
logger.debug("getConnection-start---->");
//获取封装之后的线程池对象。
Connection connection = lcnConnection.getConnection(point);
logger.debug("connection-->"+connection);
logger.debug("getConnection-end---->");
return connection;
com.codingapi.tx.datasource.relational.LCNTransactionDataSource#getConnection
@Override
public Connection getConnection(ProceedingJoinPoint point) throws Throwable
//说明有db操作.
hasTransaction = true;
//设置db类型
initDbType();
//从缓存中获取db资源
Connection connection = (Connection)loadConnection();
//获取不到则新建
if(connection==null)
//没获取到利用spring生成db连接,并进行封装,加入缓存中。
connection = initLCNConnection((Connection) point.proceed());
if(connection==null)
throw new SQLException("connection was overload");
return connection;
else
return connection;
从缓存中获取db资源
protected ILCNResource loadConnection()
//从当前线程获取事务远程调用控制对象
TxTransactionLocal txTransactionLocal = TxTransactionLocal.浅谈分布式事务与TX-LCN