分布式事务详解,并带有lcn源码解析。

Posted Nuan_Feng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式事务详解,并带有lcn源码解析。相关的知识,希望对你有一定的参考价值。

文章目录

1):为什么需要分布式事务?

假设用户购买商品时,会生成一个订单,修改库存,修改用户积分。

倘若单库的话,可通过开启事务方式,一起失败,或者一起成功。

但是当体量达到一个级别,不得不采用分库分表的方式,将其垂直拆分成多个库。比如用户中心库,库存中心库,订单模块库。

而服务也对应的进行了拆分,用户中心,库存中心,订单中心三个服务。修改数据,不得不跨服务访问库,这个时候,假设订单模块库成功了,库存中心库执行失败了,那么就要通知订单中心进行回滚操作。这就添加了复杂性。

随之而来出现了很多针对此场景的解决方案。

2):常见的解决方案如下?

分布式事务实现方案从类型上去分刚性事务、柔型事务。

刚性事务:通常无业务改造,强一致性,原生支持回滚/隔离性,低并发,适合短事务。一般出现在金融领域。

柔性事务:有业务改造,最终一致性,实现补偿接口,实现资源锁定接口,高并发,适合长事务。只追求数据最终结果的可采用。

刚性事务:XA 协议(2PC、JTA、JTS)、3PC

柔型事务:TCC/FMT、Saga(状态机模式、Aop模式)、本地事务消息、消息事务(半消息)

2)1):二阶段提交(2PC)

引入一个协调者,统一掌控所有参与者,并指示他们是否操作结果提交还是回滚
分为2个阶段
	投票阶段:参与则通知协调者,协调者反馈结果
	提交阶段:收到参与者反馈后,协调者再向参与者发出通知,根据反馈情况决定每个参与者到底还是提交还是回滚
例子:老大BOSS说要吃饭,统计下面员工意见,有员工ABC同意去,但D没回复,则BOSS依旧处于接受信息状态,ABC则阻塞.当统计了ABCD员工意见后,如果有一个员工不同意,则通知ABCD不去(回滚),然后ABCD收到后ACK BOSS,这个阶段如果BOSS没收到会一直阻塞
缺点
	执行过程中,所有节点处于阻塞,所有节点持有资源锁定状态,当有一个节点出现问题无法回复,则会一直阻塞,就需要更复杂的超时机制处理.

2)2):TXC逆向SQL

1. 拦截,并解析SQL
2. 查询受影响的行数,并记录值
   TXM通知事务提交,则删除记录
   若回滚则,将记录的值创建逆向SQL,然后执行

2)3):TCC(Try、Confirm、Cancel)

//尝试方法
function try()
    //记录日志
    todo save A 转出了 100 元 
    todo save B 转入了 100 元 
    //执行转账
    update amount set balacne = balacne-100 where id = 1
    update amount set balacne = balacne+100 where id = 2

//确认方法
function confirm()
    //清理日志
    clean save A 转出了 100 元 
    clean save B 转出了 100 元 


//取消方法
function cancle()
    //加载日志
    load log A
    load log B
//退钱
update amount set balacne = balacne+100 where id = 1
update amount set balacne = balacne-100 where id = 2    

每个服务需要实现三个方法,开发较重。但是针对db和redis的混合开发,有不错的支持。

2)4):增量日志

确保最终一致性,延迟取决于日志对比时间
模仿mysql主从复制,binLog,涉及修改添加到日志,然后间隔多长时间进行正向反向表数据比对.

2)5):补偿事务

在业务端执行业务逆向操作事务
	flag1=执行事务一返回结果
	if(flag1)
		flag2=执行事务二返回结果
		if(flag2)
			...执行事务...
		else
		回滚事务二
		
	else
		回滚事务一
	
缺点
	嵌套过多
	不同业务需要写不同的补偿事务
	不具备通用性
	没有考虑补偿事务失败

2)6):后置提交优化

一个事务,分别为执行和提交2阶段
执行比提交耗时长

改变事务执行与提交时序,变成事务先执行然后一起提交

执行事务一,提交.执行事务耗时200ms,提交2ms
执行事务二,提交.执行事务耗时200ms,提交2ms
执行事务三,提交.执行事务耗时200ms,提交2ms
当执行事务二~三之间出现异常,数据就不一致,时间范围为202+202ms

通过改变时序后:
执行事务一,执行事务耗时200ms
执行事务二,执行事务耗时200ms
执行事务三,执行事务耗时200ms
提交事务一,提交2ms
提交事务二,提交2ms
提交事务三,提交2ms
后置优化后,在提交事务二~三阶段才会出现不一致情况,时间耗时4ms

虽然不能根本解决数据一致问题,但通过缩小时间概率,降低不一致概率

缺点:
	串行执行事务提交后,连接就释放了,但后置提交优化后,所有库的连接,需要等待全部事务执行完才能释放,数据库连接占用时间加长,吞吐降低了

3):txlcn源码解析

执行时序图

3)1):事务发起方执行分布式事务流程

仔细思考,作为发起方在整个分布式事务中出现情况如下:

  1. 发起方执行成功(在这种情况下,所有参与方均执行成功,发起方才能成功。)
    1. 参与方第二阶段成功,提交。
    2. 参与方第二阶段失败,回滚。通知txm加入补偿日志。
    3. 自身提交or回滚失败,通知txm加入补偿日志。
  2. 发起方失败,自身回滚。通知tx-m执行失败,通知其他服务回滚。

当我们在方法上添加TxTransaction时,表示开启分布式事务。注意作为服务发起方需要标记isStart=true

这个时候会被aop环绕注解拦截。com.codingapi.tx.springcloud.interceptor.TransactionAspect#transactionRunning

@Around("@annotation(com.codingapi.tx.annotation.TxTransaction)")
public Object transactionRunning(ProceedingJoinPoint point) throws Throwable

    logger.debug("annotation-TransactionRunning-start---->");
    Object obj = txManagerInterceptor.around(point);
    logger.debug("annotation-TransactionRunning-end---->");
    return obj;

从请求头中获取事务组id和事务模式。

com.codingapi.tx.springcloud.interceptor.TxManagerInterceptor#around

public Object around(ProceedingJoinPoint point) throws Throwable 
    String groupId = null;
    String mode = null;
    try 
        //如果是发起方,这里都是null。
        //如果是调用方,则这里会有发起方通过feign发送过来的事务组id和事务模式。
        //事务组id:事务唯一标识。
        //事务模式:包括lcn,或者tcc等。
        RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes();
        HttpServletRequest request = requestAttributes == null ? null : ((ServletRequestAttributes) requestAttributes).getRequest();
        groupId = request == null ? null : request.getHeader("tx-group");
        mode = request == null ? null : request.getHeader("tx-mode");
    catch (Exception e)
    //继续调用切面方法
    return aspectBeforeService.around(groupId, point, mode);

封装事务信息,调用核心类执行。

com.codingapi.tx.aop.service.impl.AspectBeforeServiceImpl#around

@Override
public Object around(String groupId, ProceedingJoinPoint point, String mode) throws Throwable 

    MethodSignature signature = (MethodSignature) point.getSignature();
    //获取接口执行方法
    Method method = signature.getMethod();
    Class<?> clazz = point.getTarget().getClass();
    //获取方法参数
    Object[] args = point.getArgs();
    //获取具体执行的子类方法
    Method thisMethod = clazz.getMethod(method.getName(), method.getParameterTypes());

    //获取分布式事务注解
    TxTransaction transaction = thisMethod.getAnnotation(TxTransaction.class);

    TxTransactionLocal txTransactionLocal = TxTransactionLocal.current();

    logger.debug("around--> groupId-> " +groupId+",txTransactionLocal->"+txTransactionLocal);
    //封装事务方法
    TransactionInvocation invocation = new TransactionInvocation(clazz, thisMethod.getName(), thisMethod.toString(), args, method.getParameterTypes());

    //封装事务信息,包含注解信息,方法信息,事务组id。
    TxTransactionInfo info = new TxTransactionInfo(transaction,txTransactionLocal,invocation,groupId);
    try 
        //设置事务类型
        info.setMode(TxTransactionMode.valueOf(mode));
     catch (Exception e) 
        info.setMode(TxTransactionMode.TX_MODE_LCN);
    
    //创建事务服务
    TransactionServer server = transactionServerFactoryService.createTransactionServer(info);
            
    //执行核心事务方法
    return server.execute(point, info);

3)1)1):创建事务服务

com.codingapi.tx.aop.service.impl.TransactionServerFactoryServiceImpl

public TransactionServer createTransactionServer(TxTransactionInfo info) throws Throwable 
    if (!SocketManager.getInstance().isNetState()) 
        //检查socket通讯是否正常 (第一次执行时启动txRunningTransactionServer的业务处理控制,然后嵌套调用其他事务的业务方法时都并到txInServiceTransactionServer业务处理下)
        logger.warn("tx-manager not connected.");
        return txDefaultTransactionServer;
    

    /*********分布式事务处理逻辑***********/
    logger.info("分布式事务处理逻辑...开始");

    /** 事务发起方:仅当TxTransaction注解不为空,其他都为空时。表示分布式事务开始启动 **/
    //注解不为空,且注解声明了为发起方,且事务组id为空,这就说明为事务发起方。
    if (info.getTxTransaction() != null && info.getTxTransaction().isStart() && info.getTxTransactionLocal() == null && StringUtils.isEmpty(info.getTxGroupId())) 
        //检查socket通讯是否正常 (当启动事务的主业务方法执行完以后,再执行其他业务方法时将进入txInServiceTransactionServer业务处理)
        if (SocketManager.getInstance().isNetState()) 
            return txStartTransactionServer;
         else 
            logger.warn("tx-manager not connected.");
            return txDefaultTransactionServer;
        
    

    /** 事务参与方:分布式事务已经开启,业务进行中 **/
    logger.debug("事务参与方:分布式事务已经开启,业务进行中");
    if (info.getTxTransactionLocal() != null || StringUtils.isNotEmpty(info.getTxGroupId())) 
        //检查socket通讯是否正常 (第一次执行时启动txRunningTransactionServer的业务处理控制,然后嵌套调用其他事务的业务方法时都并到txInServiceTransactionServer业务处理下)
        if (SocketManager.getInstance().isNetState()) 
            if (info.getTxTransactionLocal() != null) 
                return txDefaultTransactionServer;
             else 
                /** 表示整个应用没有获取过DB连接 || 无事务业务的操作 **/
                if (transactionControl.isNoTransactionOperation() || info.getTxTransaction().readOnly()) 
                    return txRunningNoTransactionServer;
                 else 
                    return txRunningTransactionServer;
                
            
         else 
            logger.warn("tx-manager not connected.");
            return txDefaultTransactionServer;
        
    
    /*********分布式事务处理逻辑*结束***********/
    logger.debug("分布式事务处理逻辑*结束");
    return txDefaultTransactionServer;

当出现网络不通时,会选择com.codingapi.tx.aop.service.impl.TxDefaultTransactionServerImpl,就是一个简单的调用。

@Override
public Object execute(ProceedingJoinPoint point, TxTransactionInfo info) throws Throwable 
    logger.info("默认事务管理器...");
    return point.proceed();

com.codingapi.tx.aop.service.impl.TxDefaultTransactionServerImpl类继承TransactionServer,该类下有4个实现,分别针对发起方和调用方,默认以及无事务4种实现。

3)1)2):执行分布式事务

作为调用方,这里只看com.codingapi.tx.aop.service.impl.TxStartTransactionServerImpl#execute

  1. 构建事务组id

  2. 向tx-mange发起事务创建,对应图中这步。

  1. 封装事务上下文

  2. 执行业务方法,失败或成功通过state记录状态。

  3. 通知tx-m关闭事务组,进入事务提交第一阶段,tx-m会通知所有参与者提交。获取所有参与者执行后的最终结果。

    1. 对应如图

  1. 若task不为空,则唤醒阻塞任务,这里会执行本地db连接池commit,或者回滚。然后归还db连接。

  2. 发起方执行失败但是参与方成功,或者发起方成功,参与方失败。则通知tx-M记录补偿

public Object execute(ProceedingJoinPoint point,final TxTransactionInfo info) throws Throwable 
    //分布式事务开始执行

    logger.info("事务发起方...");

    logger.debug("--->分布式事务开始执行 begin start transaction");

    final long start = System.currentTimeMillis();

    //标记执行状态,0失败,1成功。
    int state = 0;

    //1:构建事务组id
    final String groupId = TxCompensateLocal.current()==null?KidUtils.generateShortUuid():TxCompensateLocal.current().getGroupId();

    //2:向tx-mange发起事务创建
    logger.debug("创建事务组并发送消息");
    txManagerService.createTransactionGroup(groupId);

    //3:封装事务上下文(封装组id,事务超时时间,事务模式,是否发起方),并设置当前线程中。这里贯穿整个生命周期。
    TxTransactionLocal txTransactionLocal = new TxTransactionLocal();
    txTransactionLocal.setGroupId(groupId);
    txTransactionLocal.setHasStart(true);
    txTransactionLocal.setMaxTimeOut(Constants.txServer.getCompensateMaxWaitTime());
    txTransactionLocal.setMode(info.getTxTransaction().mode());
    txTransactionLocal.setReadOnly(info.getTxTransaction().readOnly());
    TxTransactionLocal.setCurrent(txTransactionLocal);

    try 
        //4:执行业务方法
        Object obj = point.proceed();
        //修改执行结果状态
        state = 1;
        return obj;
     catch (Throwable e) 
        //5:回滚事务
        state = rollbackException(info,e);
        throw e;
     finally 

        final String type = txTransactionLocal.getType();
        //6:通知tx-m关闭事务组,进入事务提交第一阶段,tx-m会通知所有参与者提交。获取所有参与者执行后的最终结果。
        int rs = txManagerService.closeTransactionGroup(groupId, state);

        int lastState = rs==-1?0:state;

        int executeConnectionError = 0;

        //获取task,走到这里的时候,发起方若执行了回滚,这里为null。
        final TxTask waitTask = TaskGroupManager.getInstance().getTask(groupId, type);
        if(waitTask!=null)
            //设置最终执行结果
            waitTask.setState(lastState);
            //唤醒阻塞任务,这里会执行本地db连接池commit,或者回滚。然后归还db连接。
            waitTask.signalTask();
            //自旋等待线程池删除任务。
            while (!waitTask.isRemove())
                try 
                    Thread.sleep(1);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            

            if(waitTask.getState()== TaskState.connectionError.getCode())
                //本地执行失败.
                executeConnectionError = 1;

                lastState = 0;
            
        

        final TxCompensateLocal compensateLocal =  TxCompensateLocal.current();

        if (compensateLocal == null) 
            long end = System.currentTimeMillis();
            long time = end - start;
            if ((executeConnectionError == 1&&rs == 1)||(lastState == 1 && rs == 0)) 
                //发起方执行失败但是参与方成功,或者发起方成功,参与方失败。则通知tx-M记录补偿
                txManagerService.sendCompensateMsg(groupId, time, info,executeConnectionError);
            
        else
            if(rs==1)
                lastState = 1;
            else
                lastState = 0;
            
        
        //清除事务上下文信息
        TxTransactionLocal.setCurrent(null);
        logger.debug("<---分布式事务 end start transaction");
        logger.debug("start transaction over, res -> groupId:" + groupId + ", now state:" + (lastState == 1 ? "commit" : "rollback"));

    

3)1)3):获取线程连接

当执行业务方法的时,会通过封装db连接,达到控制commit或者rollback操作。

拦截获取数据库连接方法com.codingapi.tx.datasource.aspect.DataSourceAspect#around

@Around("execution(* javax.sql.DataSource.getConnection(..))")
public Connection around(ProceedingJoinPoint point)throws Throwable

    logger.debug("getConnection-start---->");

    //获取封装之后的线程池对象。
    Connection connection = lcnConnection.getConnection(point);
    logger.debug("connection-->"+connection);

    logger.debug("getConnection-end---->");

    return connection;

com.codingapi.tx.datasource.relational.LCNTransactionDataSource#getConnection

@Override
public Connection getConnection(ProceedingJoinPoint point) throws Throwable 
    //说明有db操作.
    hasTransaction = true;

    //设置db类型
    initDbType();

    //从缓存中获取db资源
    Connection connection = (Connection)loadConnection();
    //获取不到则新建
    if(connection==null) 
        //没获取到利用spring生成db连接,并进行封装,加入缓存中。
        connection = initLCNConnection((Connection) point.proceed());
        if(connection==null)
            throw new SQLException("connection was overload");
        
        return connection;
    else 
        return connection;
    

从缓存中获取db资源

protected ILCNResource loadConnection()
    //从当前线程获取事务远程调用控制对象
    TxTransactionLocal txTransactionLocal = TxTransactionLocal.浅谈分布式事务与TX-LCN

90 SpringCloud 解决分布式事务--lcn解决分布式事务

分布式事务LCN原理

介绍一个分布式事务框架项目

分布式事务框架 --- LCN

分布式事务-Tx-lcn