spring事务内如何查询别的数据源

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring事务内如何查询别的数据源相关的知识,希望对你有一定的参考价值。

参考技术A 要使用哪个数据源? | 码农家园
要使用多个事务处理程序,只需指定一个限定符,然后对其进行引用。对于具有两个不同数据源的两个不同的DAO,您将需要两个不同的事务管理器。
参考技术B spring事务内查询别的数据源方法:
增加repository隔离层,增加Dbswitch注解,通过AOP拦截实现对repository的mysql数据库操作方法进行方法前数据源切换。
不使用spring声明式事务,改为手动式事务,切换数据源的时候都获取request_new新事务,保证数据源能切换成功,提供事务改为从队尾开始链式提交,

在一个全局事务的范围内,使用 JTA 对不同的数据源同时调用几个查询

【中文标题】在一个全局事务的范围内,使用 JTA 对不同的数据源同时调用几个查询【英文标题】:Invoking few queries to different data sources concurrently, using JTA, in scope of one global transaction 【发布时间】:2013-09-21 17:43:33 【问题描述】:

我有一个包含 3 个分布式数据源 (com.atomikos.jdbc.AtomikosDataSourceBean) 的应用程序。我正在使用 Atomikos 事务管理器作为 JTA 实现。每个数据源都适用于 PostgreSQL 数据库。 现在,我依次对每个数据源调用我的查询,一切正常。

我想知道,是否可以使用 JTA 并行调用我的查询(多线程,并发)?

我尝试使用 jdbcTemplate (Spring) 在新创建的线程中简单地调用查询。首先,我遇到了一个春天的问题。 Spring 将事务上下文存储在 ThreadLocal 字段中,因此在我的新线程 (Spring transaction manager and multithreading) 中没有正确解析它。我通过将相同的事务上下文设置到新创建的线程的 ThreadLocal 中解决了这个问题。 但我在 Atomikos 代码中面临同样的问题。它们还将 CompositeTransactionImp 存储在线程范围映射 (BaseTrancationManager#getCurrentTx) 中。但在 Atomikos 的情况下,不可能为新线程设置值。 所以我不能同时执行我的查询,因为 Atomikos 似乎不支持这种方法。 但我也查看了 JTA 规范,发现它们如下:“多个线程可能同时与同一个全局事务相关联。” (《3.2 TransactionManager 接口》,http://download.oracle.com/otndocs/jcp/jta-1.1-spec-oth-JSpec/?submit=Download)

问题:如何在一个全局事务的范围内使用 JTA(2 阶段提交)同时调用两个或多个对不同数据源的查询?

tomcat 上下文中的数据源配置:

<Resource name="jdbc/db1" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
          factory="com.company.package.AtomikosDataSourceBeanFactory"
          xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
          xaProperties.serverName="localhost"
          xaProperties.portNumber="5451"
          xaProperties.databaseName="db1"
          uniqueResourceName="jdbc/db1"
          xaProperties.user="secretpassword"
          xaProperties.password="secretpassword"
          minPoolSize="5"
          maxPoolSize="10"
          testQuery="SELECT 1"  />

<Resource name="jdbc/db2" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
          factory="com.company.package.AtomikosDataSourceBeanFactory"
          xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
          xaProperties.serverName="localhost"
          xaProperties.portNumber="5451"
          xaProperties.databaseName="db2"
          uniqueResourceName="jdbc/db2"
          xaProperties.user="secretpassword"
          xaProperties.password="secretpassword"
          minPoolSize="5"
          maxPoolSize="10"
          testQuery="SELECT 1"  />

<Resource name="jdbc/db3" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
          factory="com.company.package.AtomikosDataSourceBeanFactory"
          xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
          xaProperties.serverName="localhost"
          xaProperties.portNumber="5451"
          xaProperties.databaseName="db3"
          uniqueResourceName="jdbc/db3"
          xaProperties.user="secretpassword"
          xaProperties.password="secretpassword"
          minPoolSize="5"
          maxPoolSize="10"
          testQuery="SELECT 1"  />

Spring 上下文中的事务管理器配置:

 <bean id="transactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
  init-method="init" destroy-method="close" lazy-init="true">
  <property name="forceShutdown" value="false" />
 </bean>

代码:

    final SqlParameterSource parameters = getSqlParameterSourceCreator().convert(entity);

    // Solving Spring's ThreadLocal issue: saving thread local params
    final Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
    final List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
    final boolean actualTransactionActive = TransactionSynchronizationManager.isActualTransactionActive();
    final String currentTransactionName = TransactionSynchronizationManager.getCurrentTransactionName();
    final AtomicReference<Throwable> exceptionHolder = new AtomicReference<Throwable>();

    // Running query in a separate thread.
    final Thread thread = new Thread(new Runnable() 
        @Override
        public void run() 
            try 
                // Solving Spring's ThreadLocal issue: setting thread local values to newly created thread.
                for (Map.Entry<Object, Object> entry : resourceMap.entrySet()) 
                    TransactionSynchronizationManager.bindResource(entry.getKey(), entry.getValue());
                
                if (synchronizations != null && !synchronizations.isEmpty()) 
                    TransactionSynchronizationManager.initSynchronization();
                    for (TransactionSynchronization synchronization : synchronizations) 
                        TransactionSynchronizationManager.registerSynchronization(synchronization);
                    
                
                TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
                TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);

                // Executing query.
                final String query = "insert into ...";
                NamedParameterJdbcTemplate template = new NamedParameterJdbcTemplate(dataSourceOne);

                template.update(query, parameters);
             catch (final Throwable ex) 
                exceptionHolder.set(ex);
            
        
    );
    thread.start();

    // ... same code as above for other dataSources.

    // allThreds.join(); - joining to all threads.

【问题讨论】:

【参考方案1】:

我认为您解决 TransactionSynchronizationManager 必须使用单线程事务这一事实的想法很有趣,但可能很危险。

在 TransactionSynchronizationManager 中,transactionnal 资源存储在 ThreadLocal Map 中,其中关键是资源工厂,我想知道当您使用同一资源工厂使用多个线程执行此解决方法时会附加什么 - 它可能不会适用于您的情况,因为您有 3 个数据源-。 (乍一看,我会说您的一个事务资源将被另一个替换,但也许我遗漏了一些东西......)。

无论如何,我认为您可以尝试使用javax.transaction.TransactionManager.resume() 来实现您想要做的事情。

想法是直接使用JTA api,从而绕过单线程Spring事务支持。

这里有一些代码来说明我的想法:

@Autowired
JtaTransactionManager txManager;  //from Spring

javax.transaction.TransactionManager jtaTransactionManager;

public void parallelInserts() 
    jtaTransactionManager = txManager.getTransactionManager();  //we are getting the underlying implementation
    jtaTransactionManager.begin();
    final Transaction jtaTransaction  = jtaTransactionManager.getTransaction();
    try 
      Thread t1 = new Thread()
        @Override
        public void run() 
            try 
                jtaTransactionManager.resume(jtaTransaction);
                //... do the insert
             catch (InvalidTransactionException e) 
                try 
                    jtaTransaction.setRollbackOnly();
                 catch (SystemException e1) 
                    e1.printStackTrace();
                
                e.printStackTrace();
             catch (SystemException e) 
                e.printStackTrace();
            
        
      ;
      t1.start();
      //same with t2 and t3
     catch (Exception ex) 
        jtaTransactionManager.setRollbackOnly();
        throw ex;
    
    //join threads and commit
    jtaTransactionManager.commit();

我认为这个解决方案可能有效(我必须说我自己没有尝试过)。我现在看到的唯一限制是您不能重用线程,因为没有与 resume() 调用对应的部分,并且您第二次调用 resume() 时可能会遇到 IllegalStateException。

【讨论】:

以上是关于spring事务内如何查询别的数据源的主要内容,如果未能解决你的问题,请参考以下文章

解读底层原理,分析Spring事务管理那些事

Spring-事务

spring-事务管理

Spring事务管理那些事

spring执行事务提交后进行一些逻辑操作

spring事务管理属性为只读是啥意思