使用 TransactionManager(JMS、数据库)启动多个事务

Posted

技术标签:

【中文标题】使用 TransactionManager(JMS、数据库)启动多个事务【英文标题】:Spring multiple transactions with TransactionManager (JMS, database) 【发布时间】:2022-01-23 01:32:39 【问题描述】:

我有一个方法,我想执行两个事务:一个使用 DB,一个使用 JMS。我希望一个接一个地提交。我正在尝试为此使用PlatformTransactionManager。有两种方法可以做到这一点:使用TransactionTemplateDefaultTransactionDefinition。但是我没有找到任何一个多次使用的例子。我想做的是:

void do()
 T dbTransaction = ...; // here goes: new TransactionTemplate(transactionManager) two times
 T jmsTransaction = ...; // or: new DefaultTransactionDefinition() and transactionManager.getTransaction(definition); two times
 saveDb();
 sendJms();
 dbTransaction.commit();
 jmsTransaction.commit();

但我不确定要使用什么以及如何使用,因为在 this article 中它说:

无论如何,一旦我们创建了一个带有配置的 TransactionTemplate,所有事务都将使用该配置来执行。所以,如果我们需要多个配置,我们应该创建多个模板实例。

那么我如何正确地创建两个事务并一个接一个地关闭呢?我应该创建两个单独的definitions 还是可以重复使用一个?我可以在两个templates 中重复使用相同的transactionManager 吗?我知道 DB 有一个 @Transcational 注释,我也可以将 JMS 配置为使用事务,但是:

    我没有找到如何配置 JMS 以使用事务的好示例 我不确定他们会在哪个订单关闭

所以我想手动执行此操作。我也不确定此手动事务是否适用于 JMS(例如 IBM-MQ),因为我只看到了数据库事务的示例。

【问题讨论】:

您实际上是否分别在每个saveDb()sendJms() 方法中执行多个数据库和JMS 操作?如果没有,则使用事务没有任何好处。此外,通常在这种情况下,您希望数据库和 JMS 操作在 相同 事务中,而不是在 2 个不同的事务中。你不需要你的数据库和 JMS 操作是原子的吗?如果数据库事务提交成功,JMS 事务失败回滚怎么办? @JustinBertram,DB 中有一项操作,JMS 中有一项操作。但问题是,使用 @Transactional 它在保存到 DB 之前发送到 JMS,并且在确认它成功发送到 JMS 比它在 DB 中的速度更快之后我收到。在手动场景中,如果 JMS 失败,它将失败,但如果 JMS 没有异常,则会将其保存到 DB,即使在与 JMS 的事务失败之后我也可以,因为我将状态保存在 DB 中。 @JustinBertram If not, there's no benefit to using a transaction. 我认为能够重构代码(可能拆分查询)而不必担心未来的原子性是一个巨大的好处。事实上,我认为每次写入都应该是事务性的,并有明确的选择退出。但是,完全同意您的其余观点。 鉴于您的功能要求(即发送 JMS 消息时需要存在数据库中的数据),我认为您实际上 确实 希望这两个操作都在相同的事务,并且由于您正在使用多个资源管理器,因此您将希望使用 XA 事务。这不能与@Transactional 一起使用这一事实仅意味着某些配置不正确(例如,您的应用程序可能没有为 JDBC 和 JMS 使用正确的 XA 资源)。我不是 Spring 专家(我在 ActiveMQ 上工作)所​​以我不能推荐一个具体的解决方案,但我绝对认为你需要 XA。 不需要XA,只需要两个同步的本地事务。 【参考方案1】:

您的用例简单而常见。您希望发送一条 JMS 消息,等待该消息完成,然后提交到数据库。这是在两个事务上完成的——一个用于 JMS 消息,另一个用于数据库。这些事务都存在于单个事务上下文中。当您启动 JMS 事务时,将不存在事务上下文,因此将创建一个。当您启动数据库事务时,它将加入现有的事务上下文。这些事务将被同步,因为 JMS 事务必须在数据库事务提交之前成功完成。

此操作的核心是事务管理器。查看您链接的文章,他们多次引用PlatformTransactionManager。在您的用例中,PlatformTransactionManager 必须是支持 JTA 的事务管理器。 JTA 事务管理器将能够创建事务上下文并注册和同步事务。

请注意,这是两个本地事务,这绝不是 XA 或分布式事务。在这种情况下,如果 JMS 本地事务失败,那么数据库本地事务将被回滚。更具体地说,只有事务上下文被标记为回滚。如果发生任何未处理的异常,则事务上下文仅标记为回滚。对本地事务调用 commit() 的任何尝试都将失败,并显示一条消息,指出事务上下文仅回滚。

实现这一点取决于平台。例如,如果您的 Spring 项目部署在 JBoss 等应用程序服务器上的 WAR 文件中,则 PlatformTransactionManager 将自动自动装配。如果您使用的是 Spring Boot,那么大多数配置甚至不包括事务管理器。

我有一个用于 Spring Boot here 的事务性 Spring JMS 和 Camel。这是一个用于 IBM MQ 的简单消息桥。如果不出意外,Spring JMS 和注释以及事务性 IBM MQ 的示例应该很有用。也许骆驼钻头也很有用。

注意pom.xml 文件包含:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-narayana</artifactId>
    </dependency>

这个 Spring Boot 启动器将安装和配置 Arjuna JTA 事务管理器为 PlatformTransactionManager

在我的例子中,我有:

<logger name="com.arjuna" level="TRACE" additivity="false">
    <appender-ref ref="STDOUT" />
</logger>

这为 Arjuna JTA 事务管理器提供了非常好的日志记录。

最后,获取配置为PlatformTransactionManager 的JTA 事务管理器。使用它来创建一个事务上下文,并在该上下文中拥有两个本地同步事务。

示例项目应该易于运行。日志记录非常丰富。

【讨论】:

【参考方案2】:

目前尚不清楚为什么您希望在这种特殊情况下使用 JMS 事务,我什至会反对它 - 至少正如您在上面介绍的那样。

您基本上希望在状态成功存储到数据库后发布消息。

既然您的事实来源是数据库,为什么不将所有后续操作都建立在该操作成功完成的基础上?

例如,构建它的一种方法是(面向 Spring,因为您已经提到您正在使用它):

    创建一个作用域为当前事务的JmsSender bean。这可以通过实现BeanFactoryPostProcessor 并执行以下操作来完成:
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException 
        SimpleTransactionScope transactionScope = new SimpleTransactionScope();
        // The scope exists, but is not registered by Spring by default.
        beanFactory.registerScope("transaction", transactionScope);
    

    // in a separate configuration class defining your JmsSender bean
    @Bean
    @Scope("transaction")
    public JmsSender jmsSender()  return new JmsSender(); 
    每次调用此bean 的send() 方法时,都会向内部队列添加一条消息。这通常是ThreadLocal&lt;List&lt;T&gt;&gt; - 事实上,Spring 处理事务管理的方式几乎相同。 创建一个AfterCommitJmsPublisher bean,它是一个TransactionSynchronizationAdapter - 这意味着我们希望在提交时有额外的行为。 注册AfterCommitJmsPublisher。这意味着在事务之前调用TransactionSynchronizationManager.registerSynchronization(jmsPublisher)。一种方法是使用例如方面、声明式事务管理 (@Transactional) 和 Spring AOP 将是:
@Aspect
@Component
public class AfterCommitJmsPublisher extends TransactionSynchronizationAdapter 

    private final JmsPublisher;

    @Pointcut("@annotation(org.springframework.transaction.annotation.Transactional)")
    private void transactionalPointcut() 
    

    @Before("transactionalPointcut()")
    public void registerTransactionSynchronization() 
        TransactionSynchronizationManager.registerSynchronization(this);
    
    提交数据库事务后,调用类似jmsPublisher.publish() 的方法。这可以在TransactionSynchronizationAdapterafterCommit() 方法中完成:
    // In AfterCommitJmsPublisher
    @Override
    public void afterCommit() 
        jmsPublisher.publish();
    
    如果事务被回滚,则调用类似jmsPublisher.clear() 的方法。您可能不想发布任何有关失败操作的消息。

这样,您的 JMS 消息始终绑定到它们所源自的事务 - 如果数据库事务失败,则不会发送任何消息。

取消您的评论:

在手动场景中,如果 JMS 失败,它将失败,但如果 JMS 没有异常,则将其保存到数据库中,即使在与 JMS 的事务失败之后我也可以,因为我正在保存状态在数据库中。

这可能足以满足您的要求。但是,您可能需要考虑到,您拥有的组件越多,系统需要的容错能力就越高,并考虑到外部服务可能不可用。

这可能意味着将 JMS 消息保存在一个特殊的数据库表中(作为事务的一部分!)并且仅在成功提交后发布,在成功发布后删除保存的消息。如果不成功,您可以实施管家任务,重新尝试发布您的消息。

最后,关于分布式事务的一句话:我个人建议尽可能不要使用它们,尤其是对于您当前的用例。它们是复杂的野兽,几乎肯定会影响应用程序的可用性,并增加事务中涉及的所有进程的端到端延迟。像Saga pattern 这样的东西通常更适合分布式系统。

当然,这可能不适用于您的用例,并且您的一致性要求可能超过任何可用性要求,因此请谨慎对待。

【讨论】:

这不是我的解决方案,因为我需要根据方法内部的条件使用一个 JMS 或另一个 JMS,我需要手动交易,请您回答我的问题 你是什么意思?非此即彼?据我所知,这不是你的问题所说的。你到底想做什么?

以上是关于使用 TransactionManager(JMS、数据库)启动多个事务的主要内容,如果未能解决你的问题,请参考以下文章

transactionManager 应该使用哪个 SessionFactory?

spring transactionmanager

Jboss 7 - Spring - 使用 Jboss TransactionManager

没有为限定符“transactionManager”找到匹配的 PlatformTransactionManager bean

Hibernate:无法访问 TransactionManager 或 UserTransaction 以进行物理事务委托

DdlTransactionIsolatorJtaImpl 找不到 TransactionManager