Spring框架之事务处理源码分析

Posted 叮咚叮咚

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring框架之事务处理源码分析相关的知识,希望对你有一定的参考价值。

1、事务就是以可控的方式对数据资源(数据库,文件系统)进行访问的一组操作。为了保证事务执行前后,数据资源所承载的系统状态处于“正确”状态,事务本身有4个限定属性(ACID):原子性,一致性,隔离性,持久性。

      原子性:事务包含的全部操作是一个不可分割的整体,要么全部提交成功,要么全部失败。

      一致性:一致性要求事务所包含的操作不能违反数据资源的一致性检查。

      隔离性:各个事务之间相互影响的程度,不同的隔离级别决定了各个事物对该数据资源访问的不同行为。隔离性是面向数据资源的并发访问。

        四种隔离级别,由弱到强分别为Read Uncommitted,Read Commited,Repeatable Read,Serializable

           Read Uncommitted: 一个事务可以读取另一个事务没有提交的更新结果,以低的隔离度来寻求较高的性能。事务回滚后,另一个事务看到的就是脏数据。

      Read Committed:默认级别。

      Repeatable Read: 保证在整个事务过程中,对同一笔数据的读取结果是相同的。不管其他事务是否同时在对同一笔数据进行更新,也不管其他事务对同一笔数据更新与否。

      Serializable:最严格的隔离级别。所有事务都必须按顺序执行,可以避免所有问题,但是是性能最差的隔离级别,很少场景会使用。通常会使用其他隔离级别加上相应的并发锁的机制来控制对数据的访问,这样既保证了系统性能不会损失太大,也能够在一定程度上保证数据的一致性。Oracle只支持:Read Committed、Serializable。EJB、Spring、JDBC等数据库访问方式,都提供4种隔离级别,但是最终是否以指定的隔离执行,由底层的数据资源来决定。

      持久性:一旦事务操作成功提交,对数据所做的变更将被记载并不可逆转。

2、事务管理代码

        Connection connection = null;
        boolean roolback = false;
        DataSource dataSource=null;
        try {
            connection = dataSource.getConnection();
            connection.setAutoCommit(false);
            //jdbc 操作。。。。。。
            connection.commit();
        } catch (SQLException e) {
            roolback = true;
            e.printStackTrace();
        }finally {
            if(connection!=null){
                if (roolback){
                    connection.rollback();
                }
                connection.close();
            }
        }

 Hibernate API的事务管理代码

        try {
            session = sessionFactory.openSession();
            transaction = session.beginTransaction();
            //数据库访问
            session.flush();
            transaction.commit();
        }catch (HibernateException e){
            transaction.rollback();
        }finally {
            session.close();
        }

     

3、JDBC的局部事务控制(事务只涉及到一个数据源)是由同一个java.sql.Connection来完成的,所以要保证两个DAO的数据访问方法会处于一个事务中,我们就得保证它们使用的是同一个java.sql.Connection。

Spring 的事务狂阶设计理念的基本原则是:让事务的关注点和数据访问的关注点相分离。

  当在业务层使用事务的抽象API进行事务界定时,不需要关心事务将要加诸于上的事务资源是什么,对不同的事务资源的管理将由响应的框架实现类来操心。
  当在数据访问层对可能参与事务的数据资源进行访问的时候,只需要使用响应的数据访问API进行数据访问,而不需要关心当前事务资源如何参与事务或是是否需要参与事务。这同样将由事务框架类来打理。

import org.springframework.dao.DataAccessException;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
public class Service {
    private PlatformTransactionManager transactionManager;
    public  void serviceMethod(){
        TransactionDefinition definition=null;
        TransactionStatus txStatus = getTransactionManager().getTransaction(definition);
        try {
            //数据库操作  dao1.doDataAccess(); dao2.doDataAccess();
        }catch (DataAccessException e){
            getTransactionManager().rollback(txStatus);
            throw e;
        }catch (Exception e){
            getTransactionManager().rollback(txStatus);
            throw e;
        }
        getTransactionManager().commit(txStatus);
    }
    private PlatformTransactionManager getTransactionManager() {
        return  transactionManager;
    }
}

  

public interface PlatformTransactionManager {
    TransactionStatus getTransaction(TransactionDefinition var1) throws TransactionException;

    void commit(TransactionStatus var1) throws TransactionException;

    void rollback(TransactionStatus var1) throws TransactionException;
}

  

 

如何传递Connection?图19-2虽然也可以保证Dao使用同一个connection,但是导致事务管理代码和数据访问代码之间通过Connection直接耦合。现在是JDBC使用Connection,如果是Hibernate的话就要声明对Session的依赖了。

正确方法:要传递Connection,可以将整个事务对应的java.sql.Connection实例放到统一的地方去,无论是谁,要使用该资源,都从这一个地方获取。具体:我们在事务开始之前取得一个java.sql.Connection,然后将该Connection绑定到当前的调用线程,之后,数据访问对象在使用Connection时,就可以从当前线程上获取这个事务开始的时候绑定的Connection实例。当所有的数据访问对象全部使用这个绑定当前线程的Connection完成了数据访问工作时,我们就使用这个Connection实例提交或者回滚事务,然后解除它到当前线程的绑定。

原型代码(不用于生产环境):

public class TransactionResourceManager {
    private static ThreadLocal resources = new ThreadLocal();
    public static Object getResource(){
        return resources.get();
    }
    public static void bindResource(Object resource){
        resources.set(resource);
    }
    public static Object unbindResource(){
        Object res = getResource();
        resources.set(null);
        return res;
    }
}

 

public class MyPlatformTransactionManager implements PlatformTransactionManager {
    private DataSource dataSource;
    public MyPlatformTransactionManager(DataSource dataSource){
        this.dataSource = dataSource;
    }
    @Override
    public TransactionStatus getTransaction(TransactionDefinition transactionDefinition) throws TransactionException {
        Connection connection;
        try {
            connection=dataSource.getConnection();
            TransactionResourceManager.bindResource(connection);
            return new DefaultTransactionStatus(connection,true,true,false,true,null);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void commit(TransactionStatus transactionStatus) throws TransactionException {
        Connection connection = (Connection) TransactionResourceManager.unbindResource();
        try {
            connection.commit();
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void rollback(TransactionStatus transactionStatus) throws TransactionException {
        Connection connection = (Connection) TransactionResourceManager.unbindResource();

        try {
            connection.rollback();
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

 Spring 框架中的DataSourceUtils工具类的最主要的工作是对connection管理。DataSourceUtils会从TransactionResourceManager(类似TransactionResourceManager类)那里获取Connection资源。如果当前线程没有绑定任何connection,那么就通过数据库访问对象的DataSource引用获取新的connection,否则就必须使用绑定的connection,这就是为什么强调,当我们使用Spring提供的事务支持的时候,必须通过DataSourceUtils来获取连接。而jdbcTemplate等类内部已经使用DataSourceUtils来管理连接了,所以我们不用操心细节。

 

4、Spring的事务抽象包括3个主要接口:即PlatformTransactionManager、TransactionDefinition、TransactionStatus:

PlatformTransactionManager 负责界定事务边界

TransactionDefinition 负责定义事务相关属性,包括隔离级别、传播行为等。

PlatformTransactionManager 将参照TransactionDefinition 的属性定义来开启相关事务。事务开启之后到事务结束期间的事务状态由TranscationStatus负责,我们也可以通过TransactionStatus对事务进行有限的控制。

 

5、DataSourceTransactionManager 简单介绍:

   

AbstractPlatformTransactionManager:

  • 确定如果有现有的事务;
  • 应用适当的传播行为;
  • 如果有必要暂停和恢复事务;
  • 提交时检查rollback-only标记;
  • 应用适当的修改当回滚(实际回滚或设置rollback-only);
  • 触发同步回调注册(如果事务同步是激活的)

 

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {     //doGetTransaction()方法是AbstractPlatformTransactionManager的抽象接口,实现AbstractPlatformTransactionManager抽象类的类实现该方法返回的对象类型并不相同        Object transaction = this.doGetTransaction();

1
    
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
     Object transaction = this.doGetTransaction();  //doGetTransaction()方法是抽象接口,实现AbstractPlatformTransactionManager抽象类的类实现该方法返回的对象类型并不相同  
     boolean debugEnabled = this.logger.isDebugEnabled();//获取Log类的debug信息,避免之后的代码重复
     //如果definition参数为空,则创建一个默认的事务定义数据
        if (definition == null) {
            definition = new DefaultTransactionDefinition();
        }
     //根据先前获取的transaction object 判断是否存在当前事务,根据判定结果采取不同的处理方式
        if (this.isExistingTransaction(transaction)) {
            return this.handleExistingTransaction((TransactionDefinition)definition, transaction, debugEnabled);(2)
        } else if (((TransactionDefinition)definition).getTimeout() < -1) {
            throw new InvalidTimeoutException("Invalid transaction timeout", ((TransactionDefinition)definition).getTimeout());
        } else if (((TransactionDefinition)definition).getPropagationBehavior() == 2) {
            throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation \'mandatory\'");
        } else if (((TransactionDefinition)definition).getPropagationBehavior() != 0 && ((TransactionDefinition)definition).getPropagationBehavior() != 3 && ((TransactionDefinition)definition).getPropagationBehavior() != 6) {
            if (((TransactionDefinition)definition).getIsolationLevel() != -1 && this.logger.isWarnEnabled()) {
                this.logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + definition);
            }

            boolean newSynchronization = this.getTransactionSynchronization() == 0;
            return this.prepareTransactionStatus((TransactionDefinition)definition, (Object)null, true, newSynchronization, debugEnabled, (Object)null);
        } else {
            AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);
            if (debugEnabled) {
                this.logger.debug("Creating new transaction with name [" + ((TransactionDefinition)definition).getName() + "]: " + definition);
            }

            try {
                boolean newSynchronization = this.getTransactionSynchronization() != 2;
                DefaultTransactionStatus status = this.newTransactionStatus((TransactionDefinition)definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                this.doBegin(transaction, (TransactionDefinition)definition);
                this.prepareSynchronization(status, (TransactionDefinition)definition);
                return status;
            } catch (RuntimeException var7) {
                this.resume((Object)null, suspendedResources);
                throw var7;
            } catch (Error var8) {
                this.resume((Object)null, suspendedResources);
                throw var8;
            }
        }
    }

  return this.handleExistingTransaction((TransactionDefinition)definition, transaction, debugEnabled);//  如果已经存在事务,则做的处理:

2    private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
        if (definition.getPropagationBehavior() == 5) {//如果definition定义的传播行为是propagation_never,则抛出异常并退出
            throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation \'never\'");
        } else {
            AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources;
            boolean newSynchronization;
            if (definition.getPropagationBehavior() == 4) {//如果definition定义的传播行为是PROPAGATION_NOT_SUPPORTED,则挂起当前事务然后返回
                if (debugEnabled) {
                    this.logger.debug("Suspending current transaction");
                }

                suspendedResources = this.suspend(transaction); (3)
                newSynchronization = this.getTransactionSynchronization() == 0;
                return this.prepareTransactionStatus(definition, (Object)null, false, newSynchronization, debugEnabled, suspendedResources);
            } else if (definition.getPropagationBehavior() == 3) {//如果definition定义的传播行为是PROPAGATION_REQUIRES_NEW,则挂起当前事务然后开启一个新的事务并返回  
        if (debugEnabled) {
this.logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]");
}
suspendedResources = this.suspend(transaction);//挂起当前事务

try {
newSynchronization = this.getTransactionSynchronization() != 2;
DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
this.doBegin(transaction, definition);//开启新事务。dobegin方法首先会检查传入的transaction,以提取必要信息判断之前是否存在绑定的connection信息,
                                //如果没有,则从DataSouce中获取新的connection,然后将其AutoCommit状态改为false,并绑定到TransactionSynchronizationManage.
this.prepareSynchronization(status, definition);
return status;
} catch (RuntimeException var7) {//如果开始事务过程中出现异常,那么恢复之前挂起的事务
this.resumeAfterBeginException(transaction, suspendedResources, var7);
throw var7;
} catch (Error var8) {
this.resumeAfterBeginException(transaction, suspendedResources, var8);
throw var8;
}
} else {
boolean newSynchronization;
if (definition.getPropagationBehavior() == 6) {
if (!this.isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - specify \'nestedTransactionAllowed\' property with value \'true\'");
} else {
if (debugEnabled) {
this.logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}

if (this.useSavepointForNestedTransaction()) {
DefaultTransactionStatus status = this.prepareTransactionStatus(definition, transaction, false, false, debugEnabled, (Object)null);
status.createAndHoldSavepoint();
return status;
} else {
newSynchronization = this.getTransactionSynchronization() != 2;
DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, (Object)null);
this.doBegin(transaction, definition);
this.prepareSynchronization(status, definition);
return status;
}
}
} else {
if (debugEnabled) {
this.logger.debug("Participating in existing transaction");
}

if (this.isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != -1) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, "ISOLATION_") : "(unknown)"));
}
}

if (!definition.isReadOnly() && TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is");
}
}

newSynchronization = this.getTransactionSynchronization() != 2;
return this.prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, (Object)null);
}
}
}
}

  

事务挂起:如果当前线程存在事务,但事务传播特性又要求开启新事务,需要将已有的事务进行挂起,事务的挂起及涉及线程与事务信息的保存,实现源码如下:

3、    protected final AbstractPlatformTransactionManager.SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
     //如果事务是激活的,且当前线程事务同步机制也是激活状态 if (TransactionSynchronizationManager.isSynchronizationActive()) {
      //挂起当前线程中所有同步的事务(4) List suspendedSynchronizations = this.doSuspendSynchronization(); try { Object suspendedResources = null; if (transaction != null) { suspendedResources = this.doSuspend(transaction); }           //在线程中保存与事务处理有关的信息,并将线程里有关的线程局部变量重置 String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName((String)null); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel((Integer)null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false);
//将当前线程中事务相关信息保存 return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException var8) { this.doResumeSynchronization(suspendedSynchronizations); throw var8; } catch (Error var9) { this.doResumeSynchronization(suspendedSynchronizations); throw var9; }
     //如果事务是激活的,但是事务同步机制不是激活的,则只需要保存事务状态,不需要重置事务相关的线程局部变量 } else if (transaction != null) { Object suspendedResources = this.doSuspend(transaction); return new AbstractPlatformTransactionManager.SuspendedResourcesHolder(suspendedResources); } else {///事务和事务同步机制都不是激活的,则不要想处理 return null; } }

  

4    private List<TransactionSynchronization> doSuspendSynchronization() {
        List<TransactionSynchronization> suspendedSynchronizations = TransactionSynchronizationManager.getSynchronizations();
        Iterator var2 = suspendedSynchronizations.iterator();

        while(var2.hasNext()) {
            TransactionSynchronization synchronization = (TransactionSynchronization)var2.next();
            synchronization.suspend();//if (this.holderActive) { TransactionSynchronizationManager.unbindResource(this.resourceKey); }
        }

        TransactionSynchronizationManager.clearSynchronization();
        return suspendedSynchronizations;
    }

  

以上是关于Spring框架之事务处理源码分析的主要内容,如果未能解决你的问题,请参考以下文章

JAVAEE框架整合技术之spring03-SpringJdbcTemplate模板技术和事务处理

Spring源码剖析-事务源码之@Transactionl解析

11Spring源码-分析篇-事务源码分析

11Spring源码-分析篇-事务源码分析

Spring源码分析——AOP实现

Spring事务JDBC方式下的事务使用示例