Spring Integration的DefaultMessageListenerContainer和JPA中的事务

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Integration的DefaultMessageListenerContainer和JPA中的事务相关的知识,希望对你有一定的参考价值。

假设我有简单的IntegrationFlow,它以异步方式读取JMS消息,转换,应用业务逻辑等。

@Bean
public IntegrationFlow upstreamEventFlow() {
    return IntegrationFlows.from(
            Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
                    .configureListenerContainer(container -> container.destinationResolver(destinationResolver))
                    .destination("myQueue")
                    .get()
    )
            .transform(xmlToObjectTransformer)
            .transform(upstreamTransformer)
            .handle(evaluationHandler)
            .transform(objectToXmlTransformer)
            .channel(downstreamEventChannel)
            .get();
}

evaluationHandler是GenericHandler,它使用JPA从数据库中获取数据并应用一些业务逻辑

我的问题与Spring和Spring Integration中的事务有关

当我使用以下记录器启用日志记录时:

<logger name="org.springframework.transaction" level="trace"/>
<logger name="org.springframework.jms.connection.JmsTransactionManager" level="debug"/>
<logger name="org.springframework.orm.jpa.JpaTransactionManager" level="debug"/>
<logger name="org.springframework.orm.hibernate5.HibernateTransactionManager" level="debug"/>
<logger name="org.springframework.jdbc.datasource.DataSourceTransactionManager" level="debug"/>
<logger name="org.springframework.integration" level="debug"/>
<logger name="org.springframework.messaging" level="debug"/>
<logger name="org.springframework.jms" level="debug"/>

我可以看到它始于JPA事务,而不是Spring Integration或至少JMS创建的事务

来自日志:

o.s.orm.jpa.JpaTransactionManager        : Creating new transaction with name ...: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; ''
o.s.orm.jpa.JpaTransactionManager        : Opened new EntityManager ... for JPA transaction
o.s.orm.jpa.JpaTransactionManager        : Exposing JPA transaction as JDBC transaction ...

这是为什么?

您能否详细说明DefaultMessageListenerContainer中的事务如何工作以及它如何与JMS事务,JMS事务会话,JPA事务等相关联?

交易边界在哪里?

交易会话和使用DefaultMessageListenerContainer中的事务管理器有什么区别?

如果我在处理程序中抛出RuntimeException,我可以看到它被回滚并多次查看消息Execution of JMS message listener failed, and no ErrorHandler has been set.和activemq broker redelivers消息

我应该设置ErrorHandler吗?

答案

最好的猜测是你的一个豆类,例如evaluationHandler,或方法,注释@Transactional。那是JPA交易开始的时候。该方法退出时,事务将提交(或回滚)。

由于默认情况下会对MD通道适配器进行事务处理,因此JMS(本地)事务将包装整个流,并且在整个流完成(或中止)之前不会提交(或回滚)。

所以,这两个交易并没有真正以任何方式同步;他们是独立的。

如果将JPA事务管理器注入适配器的侦听器容器,框架将提供“Best Efforts 1PC”事务同步 - 请参阅Dave Syer's Javaworld Article "Distributed transactions in Spring, with and without XA"

JPA事务将由容器启动,并且两个事务将背靠背(DB优先)提交,从而有效地同步它们。这不是真正的JTA,并且在JMS Tx回滚时DB Tx可能会提交的可能性很小 - 因此您必须处理重复交付的可能性。

以上是关于Spring Integration的DefaultMessageListenerContainer和JPA中的事务的主要内容,如果未能解决你的问题,请参考以下文章

使用 Spring Integration HttpInbound 示例

spring-integration-file 的 junit 测试用例

MyBatis(3.2.3) - Integration with Spring

了解spring-integration service-activator

INTEGRATION TESTING WITH SPRING AND JUNIT

spring-integration-smb 是不是支持 SMB2 和 SMB3?