JDBC与Spring事务及事务传播性原理解析-下篇

Posted _微风轻起

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JDBC与Spring事务及事务传播性原理解析-下篇相关的知识,希望对你有一定的参考价值。

上篇,在看这篇的时候,推荐先查看上篇。上篇我们主要介绍了JDBC的一些基本操作,以及Spring事务传播的一些概念,主要是从JDBC的角度来说的,这篇我们从Spring的角度来梳理下事务及其传播的使用以及源码的处理流程。

一、Spring不同事务传播demo

1、一些前置知识点说明

1)、Datasource

​ 我们在前面的demo中获取Connection是名称直接通过DriverManager.getConnection创建的,但对于这个JDBC也有抽象一个接口也就是Datasource

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OLer05ZF-1668333065095)(F:\\MarkDown-File\\知识点资料\\JDBC与Spring事务及事务传播性原理解析.assets\\image-20221113100132934.png)]

​ 同时通过这个Datasource,我们更常见的是线程池,例如Spring默认使用的线程池实现HikariDataSource

public static void main(String[] args) throws SQLException, ClassNotFoundException 
    HikariConfig config = new HikariConfig();
    config.setJdbcUrl("jdbc:mysql://127.0.0.1:3306/test");
    config.setUsername("root");
    config.setPassword("xxx");
    config.setDriverClassName("com.mysql.cj.jdbc.Driver");
    DataSource dataSource = new HikariDataSource(config);
    Connection connection = dataSource.getConnection();
    int insert = jdbcUtils.insert(connection);
    System.out.println("-----insert ---- " + insert);
    int i = delete(connection);
    System.out.println("--------delete ------" + i );

2)、事务的传播性

​ 我们上面说过,Spring事务的传播性是对我们demo的几种操作的抽象及具体的实现,下面我们就以PROPAGATION_REQUIRE_NEW来说明下

PROPAGATION_REQUIRE_NEW	若当前没有事务,则新建一个事务。若当前存在事务,则新建 一个事务,新老事务相互独立。外部事务抛出异常回滚不会影响内部事务的正常提交

​ 上面关于这个定义,我们从代码中来说:

​ 若当前存在事务,则新建 一个事务,新老事务相互独立。我们可以将其理解为,如果调用当前方法的上面的方法已经有了事务(已经有了一个Connectiom),我们就需要再新建一个事务来处理当前方法(获取一个新Connection来操作当前方法,同时也需要上一个事务的信息进行挂起,也就是暂时保存,例如上一个Connection,由于同时会有很多线程操作,所以这里一般会用到ThreadLocal),也就是到时如果当前方法发生异常需要回滚的话,就回滚当前方法的sql,而不影响其他调用当前方法的上面的方法的执行(这里其实与PROPAGATION_NESTED,有点类似,但他们两个不是一个概念,因为PROPAGATION_NESTED用到是同一个Connection,而这里用的是两个Connnection)。

2、Spring的事务demo

1)、Propagation.REQUIRES_NEW与Propagation.NEVER

@Transactional(propagation = Propagation.REQUIRED)
public void studentOrderAdd()

    //  Propagation.REQUIRES_NEW
    studentService.add();
    //  Propagation.NEVER
    insOrderService.add();

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void add()

    Student student = new Student();
    student.setAge((byte) 30);
    student.setName("瑞环"+Math.random());
    studentMapper.insert(student);

@Transactional(propagation = Propagation.NEVER)
public int add()

    InsOrder insOrder = new InsOrder();
    insOrder.setName("订单"+Math.random());
    insOrder.setAge((int) Math.random()*100);
    return insOrderMapper.insert(insOrder);

​ 可以看到目前我们这个demo,在最外面我们是设置的Propagation.REQUIRED,也就是需要事务,如果没有就会建一个事务。然后对于studentService.add()我们设置的是Propagation.REQUIRES_NEW,也就是会新建一个事务,与studentOrderAdd的事务是分开的,互不影响。然后对于insOrderService.add()方法,我们设置的是Propagation.NEVER,也就是不需要事务,当它的上面由于studentOrderAdd()是已经创建了,所以其就会不执行。然后对于这个整体来说,我们就可以知道,最后Student会插入成功,然后Order就不会执行

2)、Propagation.REQUIRED与Propagation.NEVER

下面我们改一下demo,也就是将studentService.add(),改为Propagation.REQUIRED,也就是它会与上面发放用一个事务。

@Transactional(propagation = Propagation.REQUIRED)
public void studentOrderAdd()

    //  Propagation.REQUIRED
    studentService.add();
    //  Propagation.NEVER
    insOrderService.add();

​ 可以看到这次我们两张表都没有数据。

​ 上面我们是通过简单demo说明了下不同事务传播机制的区别,要具体了解几种事务传播的操作可以搜索其他博文,这方面的博文应该有很多。

三、Spring事务传播源码解析

​ 下面我们就来具体分析下Spring事务相关的源码

1、invokeWithinTransaction

​ 关于事务的入口逻辑是在TransactionInterceptor中处理。首先是通过方法获取其的事务描述,例如事务的传播类型、隔离级别:

​ 然后再获取其的事务管理器,因为我们的事务注解@Transactional是能指定对应的事务管理器的。

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
      final InvocationCallback invocation) throws Throwable 

   // If the transaction attribute is null, the method is non-transactional.
   TransactionAttributeSource tas = getTransactionAttributeSource();
   final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
   final TransactionManager tm = determineTransactionManager(txAttr);
		//编程式事务、先不管
   PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
   final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

   if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) 
      // Standard transaction demarcation with getTransaction and commit/rollback calls.
      TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
      Object retVal;
      try 
         // This is an around advice: Invoke the next interceptor in the chain.
         // This will normally result in a target object being invoked.
         retVal = invocation.proceedWithInvocation();
      
      catch (Throwable ex) 
         // target invocation exception
         completeTransactionAfterThrowing(txInfo, ex);
         throw ex;
      
      finally 
         cleanupTransactionInfo(txInfo);
      
   	............
      commitTransactionAfterReturning(txInfo);
      return retVal;
   
   else 
      	..............
   

​ 这里就是Spring事务的核心处理逻辑了,获取到事务处理后,然后再获取到当期线程的事务信息TransactionInfo,再执行业务方法,通过invocation.proceedWithInvocation();,当执行业务方法失败的话,就通过completeTransactionAfterThrowing方法,不然的话,就通过commitTransactionAfterReturning,进行提交。

2、createTransactionIfNecessary

protected static final class TransactionInfo 

   @Nullable
   private final PlatformTransactionManager transactionManager;

   @Nullable
   private final TransactionAttribute transactionAttribute;

   private final String joinpointIdentification;

   @Nullable
   private TransactionStatus transactionStatus;

   @Nullable
   private TransactionInfo oldTransactionInfo;

​ 我们可以其有oldTransactionInfo,也就是当前事务方法的前面的事务方法,然后其他的transactionManagertransactionStatus是当前事务的信息记录。

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
      @Nullable TransactionAttribute txAttr, final String joinpointIdentification) 

   // If no name specified, apply method identification as transaction name.
   if (txAttr != null && txAttr.getName() == null) 
      txAttr = new DelegatingTransactionAttribute(txAttr) 
         @Override
         public String getName() 
            return joinpointIdentification;
         
      ;
   

   TransactionStatus status = null;
   if (txAttr != null) 
      if (tm != null) 
         status = tm.getTransaction(txAttr);
      
      else 
         if (logger.isDebugEnabled()) 
            logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                  "] because no transaction manager has been configured");
         
      
   
   return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);

​ 这里主要是通过tm.getTransaction(txAttr) 初始化创建、获取当前事务状态。

1)、getTransaction方法

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
      throws TransactionException 

   // Use defaults if no transaction definition given.
   TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

   Object transaction = doGetTransaction();
   boolean debugEnabled = logger.isDebugEnabled();

   if (isExistingTransaction(transaction)) 
      // Existing transaction found -> check propagation behavior to find out how to behave.
      return handleExistingTransaction(def, transaction, debugEnabled);
   

   // Check definition settings for new transaction.
   if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) 
      throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
   
		........

@Override
protected Object doGetTransaction() 
   DataSourceTransactionObject txObject = new DataSourceTransactionObject();
   txObject.setSavepointAllowed(isNestedTransactionAllowed());
   ConnectionHolder conHolder =
         (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
   txObject.setConnectionHolder(conHolder, false);
   return txObject;

​ 这里就是获取当前事务状态信息,对于doGetTransaction()方法,获取DataSourceTransactionObject,也就是当前的ConnectionHolder,也就是JDBC中的Connection获取当前线程的Connection。如果获取不到,也就是当前线程还没有事务,conHolder就会为空。

public class ConnectionHolder extends ResourceHolderSupport 

   @Nullable
   private ConnectionHandle connectionHandle;

   @Nullable
   private Connection currentConnection;

    public ConnectionHolder(ConnectionHandle connectionHandle) 
		Assert.notNull(connectionHandle, "ConnectionHandle must not be null");
		this.connectionHandle = connectionHandle;
	
private static final ThreadLocal<Map<Object, Object>> resources =
			new NamedThreadLocal<>("Transactional resources");
.............
private static Object doGetResource(Object actualKey) 
   Map<Object, Object> map = resources.get();
   if (map == null) 
      return null;
   
   Object value = map.get(actualKey);
	........
   return value;

​ 获取到这个后,再是通过isExistingTransaction(transaction),判断当前线程是不是已经有了事务。

@Override
protected boolean isExistingTransaction(Object transaction) 
   DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
   return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());

​ 这个方法主要就是的判断依据就是ConnectionHolder,看ConnectionHolder有没有,以及是否被激活

public boolean hasConnectionHolder() 
   return (this.connectionHolder != null);

​ 如果有并被激活,我们的代码逻辑就是走的return handleExistingTransaction(def, transaction, debugEnabled);处理。

​ 如果还没有事务,就是对几种不同传播机制进行处理:

if (isExistingTransaction(transaction)) 
   // Existing transaction found -> check propagation behavior to find out how to behave.
   return handleExistingTransaction(def, transaction, debugEnabled);

// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) 
   throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());


// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) 
   throw new IllegalTransactionStateException(
         "No existing transaction found for transaction marked with propagation 'mandatory'");

else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
      def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
      def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) 
   SuspendedResourcesHolder suspendedResources = suspend(null);
   try 
      return startTransaction(def, transaction, debugEnabled, suspendedResources);
   
   catch (RuntimeException | Error ex) 
      resume(null, suspendedResources);
      throw ex;
   


   boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
   return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);

​ 如果是PROPAGATION_MANDATORY,也就是当前事务方法一定也运行在已经有事务的环境下,不然就抛出异常No existing transaction found for transaction marked with propagation 'mandatory'

​ 而另外的几种TransactionDefinition.PROPAGATION_REQUIREDPROPAGATION_REQUIRES_NEWPROPAGATION_NESTED,其都是自己本身如果不满足要求,能自己直接创建事务。我们可以看到如果走的是这个逻辑分支,然后会调用suspend方法,这个方法主要是挂起当前线程中本身已存在的事务信息。当我们注意。

2)、suspend方法

protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException 
   if (TransactionSynchronizationManager.isSynchronizationActive()) 
      List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
      try 
         Object suspendedResources = null;
         if (transaction != null) 
            suspendedResources = doSuspend(transaction);
         
          //下面主要是将当前线程的事务描叙内容设置为null,也就是还没有激活状态
         String name = TransactionSynchronizationManager.getCurrentTransactionName();
         TransactionSynchronizationManager.setCurrentTransactionName(null);
         boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
         TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
         Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
         TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
         boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
         TransactionSynchronizationManager.setActualTransactionActive(false);
         return new SuspendedResourcesHolder(
               suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
      
      catch (RuntimeException | Error ex) 
         // doSuspend failed - original transaction is still active...
         doResumeSynchronization(suspendedSynchronizations);
         throw ex;
      
   
   else if (transaction != null) 
      // Transaction active but no synchronization active.
      Object suspendedResources = doSuspend(transaction);
      return new SuspendedResourcesHolder(suspendedResources);
   
   else 
      // Neither transaction nor synchronization active.
      return null;
   

​ 但我们注意,我们目前还没有事务,也就是TransactionSynchronizationManager.isSynchronizationActive()transaction != null都是不满足的,所以我们这里返回的是null。同时这个方法的主要是doSuspendSynchronization(),这个主要是获取到当前线程已经存在的事务描叙信息,例如当前事务方法前面事务方法的描述。同时在调用suspend是,如果已经有事务,其会通过doSuspend(transaction),来处理挂起逻辑:

protected Object doSuspend(Object transaction) 
   DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
   txObject.setConnectionHolder(null);
   return Tran

以上是关于JDBC与Spring事务及事务传播性原理解析-下篇的主要内容,如果未能解决你的问题,请参考以下文章

JDBC与Spring事务及事务传播性原理解析-下篇

JDBC与Spring事务及事务传播性原理解析-下篇

理解 spring 事务传播行为与数据隔离级别

关于spring事务的传播性这篇文章解析的非常清楚了,建议是先收藏再看!

事务及Spring事务传播机制

事务及Spring事务传播机制