spring多数据源事务不生效解决方法及源码分析

Posted 我是程序汪

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring多数据源事务不生效解决方法及源码分析相关的知识,希望对你有一定的参考价值。

上一章我们已经讲解了spring的读写分离,

但是如果加入事务的时候切换数据源就失败了,搞了半天居然换来这种结果。。。。

这边文章就会带着大家深入了解问什么加入事务之后切换数据源会失败及解决办法。下面的文章会将一些源码,这些鬼东西读起来很烦,但是我们已经走到这步了只能硬着头皮硬钢!!!spring多数据源事务不生效解决方法及源码分析

@Transactional
  public void all(){
    TestService currentclass= (TestService ) AopContext.currentProxy();
    String title = currentclass.getLiveTitle();
    String nickName = currentclass.getNickName();
    System.out.println(title+"---"+nickName);
  }

  @Transactional(rollbackFor = Exception.class)
  public void insert(){
    TestService currentclass= (TestService ) AopContext.currentProxy();
    currentclass.insertLive();
    currentclass.insertUser();
  }

上述的代码其实会出问题的,什么问题呢?因为用了事务所以切换数据源就不生效了,这里关于**==spring事务的原理源码的分析会在别的章节详细的讲解==**,这里只会讲解mybatis为啥在事务的情况下不能切换数据源了。

mybaits 在执行的时候首先就会执行下面的代码

private class SqlSessionInterceptor implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      SqlSession sqlSession = getSqlSession(
          SqlSessionTemplate.this.sqlSessionFactory,
          SqlSessionTemplate.this.executorType,
          SqlSessionTemplate.this.exceptionTranslator);
      try {
        Object result = method.invoke(sqlSession, args);
        if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
          // force commit even on non-dirty sessions because some databases require
          // a commit/rollback before calling close()
          sqlSession.commit(true);
        }
        return result;
      } catch (Throwable t) {
        Throwable unwrapped = unwrapThrowable(t);
        if (SqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
          // release the connection to avoid a deadlock if the translator is no loaded. See issue #22
          closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
          sqlSession = null;
          Throwable translated = SqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException) unwrapped);
          if (translated != null) {
            unwrapped = translated;
          }
        }
        throw unwrapped;
      } finally {
        if (sqlSession != null) {
          closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
        }
      }
    }
  }

看代码就可以看见,首先是获取SqlSession 之后动态代理执行真正的sql语句,这里我们就要看看这个SqlSession是怎么获取的,了解每块代码的具体含义。之后带大家看一下真正运行起来的差异。

public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType,
      PersistenceExceptionTranslator exceptionTranslator) {

    notNull(sessionFactory, NO_SQL_SESSION_FACTORY_SPECIFIED);
    notNull(executorType, NO_EXECUTOR_TYPE_SPECIFIED);

   //如果开启了事务的话 SqlSessionHolder 会从事务管理起的ThreadLocal中取出 具体代码如下
    SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);


   // 从SqlSessionHolder 取出SqlSession
    SqlSession session = sessionHolder(executorType, holder);
    if (session != null) {
      return session;
    }

    LOGGER.debug(() -> "Creating a new SqlSession");
    //上面的获取不到SqlSession 自己创建一个SqlSession
    session = sessionFactory.openSession(executorType);


  // 如果当前线程的事务同步处于活动状态 则新建一个SqlSessionHolder并将其放入到,所以第二次执行的时候我们的SessionFactory都是相同的,所以可以直接在上面从TransactionSynchronizationManager.getResource()取出SqlSessionHolder,然后取出SqlSession,所以加了事务之后SqlSession就永远不会再变了
  TransactionSynchronizationManager的resouse 中
    registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);

    return session;
  }

getSqlSession代码每个部分的详细解释TransactionSynchronizationManager的resource resource就是一个ThreadLocal

public abstract class TransactionSynchronizationManager {

    private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class);

    private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<>("Transactional resources");

sessionHolder 从SqlSessionHolder中获取SqlSession

private static SqlSession sessionHolder(ExecutorType executorType, SqlSessionHolder holder) {
    SqlSession session = null;
    // holder不为空且开启了事务 才会从holder中获取SqlSession
    if (holder != null && holder.isSynchronizedWithTransaction()) {
      if (holder.getExecutorType() != executorType) {
        throw new TransientDataAccessResourceException(
            "Cannot change the ExecutorType when there is an existing transaction");
      }

      holder.requested();

      LOGGER.debug(() -> "Fetched SqlSession [" + holder.getSqlSession() + "] from current transaction");
      session = holder.getSqlSession();
    }
    return session;
  }

看见上述的代码我们不难发现:holder不为空且开启了事务 才会从holder中获取SqlSession

当从SqlSessionHolder获取的SqlSession为空的时候,就会创建SqlSession,这里调用openSession的实现类是DefaultSqlSessionFactory

@Override
  public SqlSession openSession(ExecutorType execType) {
    return openSessionFromDataSource(execType, null, false);
  }
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
    Transaction tx = null;
    try {
      final Environment environment = configuration.getEnvironment();
      final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
      tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
      //这里的Transaction是 执行器的一个属性哦
      final Executor executor = configuration.newExecutor(tx, execType);
      return new DefaultSqlSession(configuration, executor, autoCommit);
    } catch (Exception e) {
      closeTransaction(tx); // may have fetched a connection so lets call close()
      throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
    } finally {
      ErrorContext.instance().reset();
    }
  }

在上数代码中创建了SqlSession,这里主要为SqlSession 创建一些环境属性Environment,执行器Executor,和Transaction 事务,这里主要主要讲的是Transaction,这里的具体的实现类是SpringManagedTransaction,Transaction是Executor的一个属性,Executor是SqlSession的一个属性。总结每个SQLSession都有自己的一个Executor,每个Executor都有自己的一个Transaction,记住这个这个很有用的。

public class SpringManagedTransaction implements Transaction {

  private static final Logger LOGGER = LoggerFactory.getLogger(SpringManagedTransaction.class);

  private final DataSource dataSource;

  private Connection connection;

这里要重点看一下他有两个属性一个是DataSource,Connection,就因为这个这两个属性才不能切换数据源的spring多数据源事务不生效解决方法及源码分析

之后就是如果开启了事务的话就会把SqlSessionHolder放入到TransactionSynchronizationManager的resource中

private static void registerSessionHolder(SqlSessionFactory sessionFactory, ExecutorType executorType,
      PersistenceExceptionTranslator exceptionTranslator, SqlSession session) {
    SqlSessionHolder holder;
    //判断是否开启了事务
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
      Environment environment = sessionFactory.getConfiguration().getEnvironment();

      if (environment.getTransactionFactory() instanceof SpringManagedTransactionFactory) {
        LOGGER.debug(() -> "Registering transaction synchronization for SqlSession [" + session + "]");

       //创建新的SqlSessionHolder
        holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
        // 将新创建的SqlSessionHolder放入TransactionSynchronizationManager的resource中
        TransactionSynchronizationManager.bindResource(sessionFactory, holder);
        TransactionSynchronizationManager
            .registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
        holder.setSynchronizedWithTransaction(true);
        holder.requested();
      } else {
        if (TransactionSynchronizationManager.getResource(environment.getDataSource()) == null) {
          LOGGER.debug(() -> "SqlSession [" + session
              + "] was not registered for synchronization because DataSource is not transactional");
        } else {
          throw new TransientDataAccessResourceException(
              "SqlSessionFactory must be using a SpringManagedTransactionFactory in order to use Spring transaction synchronization");
        }
      }
    } else {
      LOGGER.debug(() -> "SqlSession [" + session
          + "] was not registered for synchronization because synchronization is not active");
    }

  }

接下来Object result = method.invoke(sqlSession, args);这个动态代理真正执行了什么方法呢?这里不会对mybatis具体的执行流程做过多的解释,只会列出简单一些关键方法(以后会出mybatis的源码分析 哈哈哈哈哈,为啥现在不出呢!因为我还不怎么会,没看过这块的源码呢哈哈哈哈)

spring多数据源事务不生效解决方法及源码分析

基本上都会执行SimpleExecutor中的方法比如查询

 @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Statement stmt = null;
    try {
      Configuration configuration = ms.getConfiguration();
      StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
      stmt = prepareStatement(handler, ms.getStatementLog());
      return handler.query(stmt, resultHandler);
    } finally {
      closeStatement(stmt);
    }
  }

prepareStatement方法中就有getConnection获取连接信息

 private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    Statement stmt;
    Connection connection = getConnection(statementLog);
    stmt = handler.prepare(connection, transaction.getTimeout());
    handler.parameterize(stmt);
    return stmt;
  }

getConnections分析

protected Connection getConnection(Log statementLog) throws SQLException {
    Connection connection = transaction.getConnection();
    if (statementLog.isDebugEnabled()) {
      return ConnectionLogger.newInstance(connection, statementLog, queryStack);
    } else {
      return connection;
    }
  }

SpringManagedTransaction中的getConnection

@Override
  public Connection getConnection() throws SQLException {
  //加了事务之后,第二次执行的时候这个collection就不会为空了
    if (this.connection == null) {
      openConnection();
    }
    return this.connection;
  }

到了这里基本上加了事务为什么不能切换数据源大家基本上也就了解了。因为我上面说过一个SQLSession有自己的执行器,执行器有自己的事务,因为加了事务之后每次执行都是同一个SQLSession,所以SpringManagedTransaction都是相同的,第一次可能会执行openConnection,但是第二次执行的这个connection就不为空了,所以切换不了数据源了

总结一下流程(实际断点的流程):SqlSessionInterceptor的invoke----->getSqlSession----->                                                   第一次获取session请求流程图 看里面的注释

public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType,
      PersistenceExceptionTranslator exceptionTranslator) {

    notNull(sessionFactory, NO_SQL_SESSION_FACTORY_SPECIFIED);
    notNull(executorType, NO_EXECUTOR_TYPE_SPECIFIED);


    //第一次请求  holder == null
    SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);

    // 第一次请求session ==null
    SqlSession session = sessionHolder(executorType, holder);
    if (session != null) {
      return session;
    }

    LOGGER.debug(() -> "Creating a new SqlSession");
    // 新建 session
    session = sessionFactory.openSession(executorType);
    // sessionHolder 放入TransactionSynchronizationManager的resource中
    registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);

    return session;
  }

第一次获取collection流程图

@Override
  public Connection getConnection() throws SQLException {
   //this.collection == null 所以新建 collection
    if (this.connection == null) {
      openConnection();
    }
    return this.connection;
  }

切换数据源 第二次请求

public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType,
      PersistenceExceptionTranslator exceptionTranslator) {

    notNull(sessionFactory, NO_SQL_SESSION_FACTORY_SPECIFIED);
    notNull(executorType, NO_EXECUTOR_TYPE_SPECIFIED);


    //第二次请求  holder != null 从TransactionSynchronizationManager取到了sessionFactory 因为sessionFactory都是一样的
    SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);

    // 第二次请求session !=null
    SqlSession session = sessionHolder(executorType, holder);
    if (session != null) {
    //第二次请求直接在这里返回
      return session;
    }


    LOGGER.debug(() -> "Creating a new SqlSession");
    session = sessionFactory.openSession(executorType);
    registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);

    return session;
  }
  
  
  @Override
  public Connection getConnection() throws SQLException {
   //第二次请求 这里是相同SqlSession 所以对应的事务SpringManagedTransaction也是相同,所以collection不为空,直接返回。
    if (this.connection == null) {
      openConnection();
    }
    return this.connection;
  }

看完上述的流程大家基本上也就理解为什么加了事务不能切换数据源,总的来说就是使用事务之后SqlSession都是相同的了,所以collection也是相同所以不能切换数据源了。

既然原因我们已经找到了那怎么解决呢?

是不是我们每次使用不同的SqlSession问题就解决了哈哈哈,那怎么取到不同的SqlSession呢?SqlSession使用过SqlSessionFactory获取的那我们是不是每个数据源都可以动态切换SqlSessionFactory就可以了。

马上就要看见胜利的曙光了

spring多数据源事务不生效解决方法及源码分析

这里我们就不使用动态的SqlSessionFactory,自定义两个SqlSessionFactory 代码如下

//  @Bean
//  SqlSessionFactory sqlSessionFactory(@Qualifier("dataSource") RoutingDataSource dataSource) {
//    SqlSessionFactory sessionFactory = null;
//    try {
//      SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
//      bean.setDataSource(dataSource);
//      sessionFactory = bean.getObject();
//    } catch (Exception e) {
//      e.printStackTrace();
//    }
//    return sessionFactory;
//  }

  @Primary
  @Bean("masterSqlSessionFactory")
  SqlSessionFactory masterSqlSessionFactory(@Qualifier("master") DataSource dataSource) {
    SqlSessionFactory sessionFactory = null;
    try {
      SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
      bean.setDataSource(dataSource);
      bean.setTypeAliasesPackage("com.dongtai.datasource.mapper.master");
      bean.setMapperLocations(
          new PathMatchingResourcePatternResolver().getResources("classpath:mybatis/master/*.xml"));
      sessionFactory = bean.getObject();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return sessionFactory;
  }
  @Bean("slaveSqlSessionFactory")
  SqlSessionFactory slaveSqlSessionFactory(@Qualifier("slave") DataSource dataSource) {
    SqlSessionFactory sessionFactory = null;
    try {
      SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
      bean.setDataSource(dataSource);
      bean.setTypeAliasesPackage("com.dongtai.datasource.mapper.slave");
      bean.setMapperLocations(
          new PathMatchingResourcePatternResolver().getResources("classpath:mybatis/slave/*.xml"));
      sessionFactory = bean.getObject();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return sessionFactory;
  }
  @Bean("sqlSessionTemplate")
  public RouteSqlSessionTemplate routeSqlSessionTemplate(@Qualifier("masterSqlSessionFactory") SqlSessionFactory masterSqlSessionFactory,@Qualifier("slaveSqlSessionFactory") SqlSessionFactory slaveSqlSessionFactory){
    Map<String,SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
    sqlSessionFactoryMap.put("master",masterSqlSessionFactory);
    sqlSessionFactoryMap.put("slave",slaveSqlSessionFactory);
    RouteSqlSessionTemplate customSqlSessionTemplate = new RouteSqlSessionTemplate(masterSqlSessionFactory);
    customSqlSessionTemplate.setTargetFactorys(sqlSessionFactoryMap);
    return customSqlSessionTemplate;
  }
public abstract class AbstractRoutingSqlSessionTemplate extends SqlSessionTemplate {

  private final ExecutorType executorType;
  private final SqlSession sqlSessionProxy;
  private final PersistenceExceptionTranslator exceptionTranslator;
  protected Map<String,SqlSessionFactory> targetFactorys=new HashMap<>();
  public AbstractRoutingSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
    this(sqlSessionFactory, sqlSessionFactory.getConfiguration().getDefaultExecutorType());
  }

  public AbstractRoutingSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType) {
    this(sqlSessionFactory, executorType, new MyBatisExceptionTranslator(sqlSessionFactory.getConfiguration()
        .getEnvironment().getDataSource(), true));
  }

  public AbstractRoutingSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,
      PersistenceExceptionTranslator exceptionTranslator) {
    super(sqlSessionFactory, executorType, exceptionTranslator);
    this.executorType = executorType;
    this.exceptionTranslator = exceptionTranslator;
    this.sqlSessionProxy = (SqlSession) newProxyInstance(
        SqlSessionFactory.class.getClassLoader(),
        new Class[] { SqlSession.class },
        new SqlSessionInterceptor());
  }
  @Override
  public abstract SqlSessionFactory getSqlSessionFactory();

  @Override
  public Configuration getConfiguration() {
    return getSqlSessionFactory().getConfiguration();
  }
  @Override
  public ExecutorType getExecutorType() {
    return executorType;
  }
  @Override
  public PersistenceExceptionTranslator getPersistenceExceptionTranslator() {
    return exceptionTranslator;
  }
  @Override
  public <T> T selectOne(String statement) {
    return sqlSessionProxy.<T> selectOne(statement);
  }
  @Override
  public <T> T selectOne(String statement, Object parameter) {
    return sqlSessionProxy.<T> selectOne(statement, parameter);
  }
  @Override
  public <K, V> Map<K, V> selectMap(String statement, String mapKey) {
    return sqlSessionProxy.<K, V> selectMap(statement, mapKey);
  }
  @Override
  public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey) {
    return sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey);
  }

  @Override
  public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey, RowBounds rowBounds) {
    return sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey, rowBounds);
  }

  @Override
  public <E> List<E> selectList(String statement) {
    return sqlSessionProxy.<E> selectList(statement);
  }

  @Override
  public <E> List<E> selectList(String statement, Object parameter) {
    return sqlSessionProxy.<E> selectList(statement, parameter);
  }

  @Override
  public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
    return sqlSessionProxy.<E> selectList(statement, parameter, rowBounds);
  }

  @Override
  @SuppressWarnings("rawtypes")
  public void select(String statement, ResultHandler handler) {
    sqlSessionProxy.select(statement, handler);
  }

  @Override
  @SuppressWarnings("rawtypes")
  public void select(String statement, Object parameter, ResultHandler handler) {
    sqlSessionProxy.select(statement, parameter, handler);
  }

  @Override
  @SuppressWarnings("rawtypes")
  public void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler) {
    sqlSessionProxy.select(statement, parameter, rowBounds, handler);
  }

  @Override
  public int insert(String statement) {
    return sqlSessionProxy.insert(statement);
  }

  @Override
  public int insert(String statement, Object parameter) {
    return sqlSessionProxy.insert(statement, parameter);
  }

  @Override
  public int update(String statement) {
    return sqlSessionProxy.update(statement);
  }

  @Override
  public int update(String statement, Object parameter) {
    return sqlSessionProxy.update(statement, parameter);
  }
  @Override
  public int delete(String statement) {
    return sqlSessionProxy.delete(statement);
  }
  @Override
  public int delete(String statement, Object parameter) {
    return sqlSessionProxy.delete(statement, parameter);
  }
  @Override
  public <T> T getMapper(Class<T> type) {
    return getConfiguration().getMapper(type, this);
  }
  @Override
  public void commit() {
    throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
  }
  @Override
  public void commit(boolean force) {
    throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
  }
  @Override
  public void rollback() {
    throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
  }
  @Override
  public void rollback(boolean force) {
    throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
  }
  @Override
  public void close() {
    throw new UnsupportedOperationException("Manual close is not allowed over a Spring managed SqlSession");
  }
  @Override
  public void clearCache() {
    sqlSessionProxy.clearCache();
  }
  @Override
  public Connection getConnection() {
    return sqlSessionProxy.getConnection();
  }
  @Override
  public List<BatchResult> flushStatements() {
    return sqlSessionProxy.flushStatements();
  }

  /**
   * Proxy needed to route MyBatis method calls to the proper SqlSession got from Spring's Transaction Manager It also
   * unwraps exceptions thrown by {@code Method#invoke(Object, Object...)} to pass a {@code PersistenceException} to
   * the {@code PersistenceExceptionTranslator}.
   */
  private class SqlSessionInterceptor implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      final SqlSession sqlSession = getSqlSession(
          getSqlSessionFactory(),
          executorType,
          exceptionTranslator);
      try {
        Object result = method.invoke(sqlSession, args);
        if (!isSqlSessionTransactional(sqlSession, getSqlSessionFactory())) {
          // force commit even on non-dirty sessions because some databases require
          // a commit/rollback before calling close()
          sqlSession.commit(true);
        }
        return result;
      } catch (Throwable t) {
        Throwable unwrapped = unwrapThrowable(t);
        if (exceptionTranslator != null && unwrapped instanceof PersistenceException) {
          Throwable translated = exceptionTranslator
              .translateExceptionIfPossible((PersistenceException) unwrapped);
          if (translated != null) {
            unwrapped = translated;
          }
        }
        throw unwrapped;
      } finally {
        closeSqlSession(sqlSession, getSqlSessionFactory());
      }
    }
  }

  public Map<String, SqlSessionFactory> getTargetFactorys() {
    return targetFactorys;
  }

  public void setTargetFactorys(
      Map<String, SqlSessionFactory> targetFactorys) {
    this.targetFactorys = targetFactorys;
  }

对比于SqlSessionTemplate,改变的地方:

新增protected Map<String,SqlSessionFactory> targetFactorys=new HashMap<>(); 修改了getSqlSession方法中传入SqlSessionFactory的方法

 private class SqlSessionInterceptor implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      final SqlSession sqlSession = getSqlSession(
          getSqlSessionFactory(),
          executorType,
          exceptionTranslator);
      try {
        Object result = method.invoke(sqlSession, args);
        if (!isSqlSessionTransactional(sqlSession, getSqlSessionFactory())) {
          // force commit even on non-dirty sessions because some databases require
          // a commit/rollback before calling close()
          sqlSession.commit(true);
        }
        return result;
      } catch (Throwable t) {
        Throwable unwrapped = unwrapThrowable(t);
        if (exceptionTranslator != null && unwrapped instanceof PersistenceException) {
          Throwable translated = exceptionTranslator
              .translateExceptionIfPossible((PersistenceException) unwrapped);
          if (translated != null) {
            unwrapped = translated;
          }
        }
        throw unwrapped;
      } finally {
        closeSqlSession(sqlSession, getSqlSessionFactory());
      }
    }
  }

原版:

  SqlSession sqlSession = getSqlSession(SqlSessionTemplate.this.sqlSessionFactory,
          SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator);

原版本的sqlSessionFactory是取得SqlSessionTemplated的sqlsessionFactory 新版本使用自己自定义的getSqlSessionFactory()

 @Override
  public abstract SqlSessionFactory getSqlSessionFactory();

AbstractRoutingSqlSessionTemplate是一个抽象类,所以我们需要一个真正的具体的类来实现getSqlSessionFactory方法

public class RouteSqlSessionTemplate extends AbstractRoutingSqlSessionTemplate {


  public RouteSqlSessionTemplate(
      SqlSessionFactory sqlSessionFactory) {
    super(sqlSessionFactory);
  }

  public RouteSqlSessionTemplate(
      SqlSessionFactory sqlSessionFactory, ExecutorType executorType) {
    super(sqlSessionFactory, executorType);
  }

  public RouteSqlSessionTemplate(
      SqlSessionFactory sqlSessionFactory, ExecutorType executorType,
      PersistenceExceptionTranslator exceptionTranslator) {
    super(sqlSessionFactory, executorType, exceptionTranslator);
  }

  @Override
  public SqlSessionFactory getSqlSessionFactory() {

    String dataSourceKey = DataSourceContextHolder.getDataSourceKey();
    return targetFactorys.get(dataSourceKey);


  }

配置类中增加

@Bean("sqlSessionTemplate")
  public RouteSqlSessionTemplate routeSqlSessionTemplate(@Qualifier("masterSqlSessionFactory") SqlSessionFactory masterSqlSessionFactory,@Qualifier("slaveSqlSessionFactory") SqlSessionFactory slaveSqlSessionFactory){
    Map<String,SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
    sqlSessionFactoryMap.put("master",masterSqlSessionFactory);
    sqlSessionFactoryMap.put("slave",slaveSqlSessionFactory);
    RouteSqlSessionTemplate customSqlSessionTemplate = new RouteSqlSessionTemplate(masterSqlSessionFactory);
    customSqlSessionTemplate.setTargetFactorys(sqlSessionFactoryMap);
    return customSqlSessionTemplate;
  }

我们完成了上述的改造之后是否就可以在事务的情况下成功切换数据源了呢?我们来试一下 执行这个方法

  @Transactional(rollbackFor = Exception.class)
  public void insert(){
    TestService currentclass= (TestService ) AopContext.currentProxy();
    currentclass.insertLive();
    currentclass. insertUser();
  }

第一次请求spring多数据源事务不生效解决方法及源码分析

可以看见SqlSessionHolder 现在是空的

sessionFactory是spring多数据源事务不生效解决方法及源码分析

SqlSession是新建的

spring多数据源事务不生效解决方法及源码分析

第二次已经切换数据源了spring多数据源事务不生效解决方法及源码分析SqlSessionFactory spring多数据源事务不生效解决方法及源码分析

Sqlsession spring多数据源事务不生效解决方法及源码分析

可以看见两次执行sessionFactory都不相同,所以sqlsession也是不相同的,根据我前面所讲的sqlsession不相同对应的执行器也不相同对应的SpringManagedTransaction也不相同所以对应的connection也是不相同的这样就可以完成数据源的切换了。

spring多数据源事务不生效解决方法及源码分析

但是你以为这样就完事了,这个事务可能不是你想的那种样子,就是往a库插入一条语句,再往b库插入一条语句,如果往b库插入的时候报错,a库回回滚?是这个样子吗?实际上不是这个样子的,插入两个库是两个connection,所以不可能同时回滚的。想要同时回滚需要自己再次的自定义两个事务管理器或者使用分布式事务(后续的章节回讲解分布式事务完成上述功能)。

那不能同时回滚两个connection,那可以自己connection中发生了异常回滚自己的connection中的操作总可以吧!实际上上述的这种方式是有缺陷的,还是不能自己的connection回滚自己的操作,为什么是有缺陷的呢?涉及到部分spring事务的代码这里就先简单看一下(后续会有详细的源码分析)

执行下面的代码

  @Transactional(rollbackFor = Exception.class)
  public void insert(){
    TestService currentclass= (TestService ) AopContext.currentProxy();
    currentclass.insertLive();
    currentclass.insertUser();
  }
  @TargetDataSource("slave")
  public void insertLive(){
    liveMapper.insertLive("abc","cba");
    //这个代码会执行失败
    liveUserMapper.insertV2(12);
  }

当我们的sql语句出了问题的时候就会执行这段代码     completeTransactionAfterThrowing(txInfo, ex);

PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

        if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
            // Standard transaction demarcation with getTransaction and commit/rollback calls.
            TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

            Object retVal;
            try {
                // This is an around advice: Invoke the next interceptor in the chain.
                // This will normally result in a target object being invoked.
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // target invocation exception
                //sql出了问题的时候就会执行这个代码
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                cleanupTransactionInfo(txInfo);
            }

真正执行回滚到是下面的代码spring多数据源事务不生效解决方法及源码分析

可以看见回滚的时候取出的connection是从 DataSourceTransactionObject的ConnectionHolder的connection取出的,之后用的是这个connection进行的回滚。那这个connection和我们上述那个mybatis的那个切换的connection是相同的吗?

在我看来是不相同的,这个connection是不会切换的,简单来说不管你mybatis切换了什么数据源进行sql的查询写入等操作,但是发生异常的时候都只会在DataSourceTransactionObject的这个connection中进行rollback,所以就会发生有的库可以数据回滚有的库不能数据回滚。那这个DataSourceTransactionObject的这个connection到底是哪个数据库的连接,什么时候写入的呢,别急带你简单看一下spring事务的源码

在spring开启事务的时候,会初始化这个数据库连接

spring多数据源事务不生效解决方法及源码分析

根据我断掉调试的结果,这时候的数据库连接其实就是你在初始化数据源的时候,加入这个注解@Primary的数据源,就是你的主数据源

 @Bean("master")
  DataSource masterDataSource() {
    return dataSource(props.getMasterUrl(),props.getMasterUsername(),props.getMasterPassword());
  }
  @Primary
  @Bean("slave")
  DataSource slaveDataSource() {
    return dataSource(props.getSlaveUrl(),props.getSlaveUsername(),props.getSlavePassword());
  }

哈哈哈哈 基本上这种spring的多数据源的东西都分析完了,如果你使用的是上述的方式,那么如果你正好是在主数据源的连接中发生了异常进行了数据回滚那个恭喜你是没有问题的,如果是在其他的数据源中发生了异常进行回滚,那就gg喽!!!上述的这种方式是不行的。

我也曾做过在这基础上的弥补,比如重写了DataSourceTransactionManager,为其注入动态的数据源,希望他也可以动态的切换connection,但是没成功。

1 需要重写的地方太多了

2 先进入事务的aop,之后才进入我们自定义的选择数据源的aop,先后顺序不对啊,初始化事务的时候还没选数据源呢。。。。。。

public class CustomDataSourceTransactionManager extends DataSourceTransactionManager {

    public CustomDataSourceTransactionManager() {
    }

    public CustomDataSourceTransactionManager(DataSource dataSource) {
        super(dataSource);
    }
}

  @Bean
  public DataSourceTransactionManager dataSourceTransactionManager(@Qualifier("dataSource") RoutingDataSource routingDataSource){
    return new CustomDataSourceTransactionManager(routingDataSource);
  }

后续我会分享分布式事务来解决上述的问题。

哈哈哈以上就是我分享的内容,如果有什么讲的不对的地方,及时给我留言我立刻改正,要是误导了其他的正在学的小朋友,那老夫就罪孽深重了哈哈哈哈哈哈!!!


本demo代码点击下方公众号
后台回复「 344 」即可获取