如何确保来自 JMS 队列的消息被传递到外部 WebService (CXF)?

Posted

技术标签:

【中文标题】如何确保来自 JMS 队列的消息被传递到外部 WebService (CXF)?【英文标题】:How to make sure that message from JMS queue is delivered to external WebService (CXF)? 【发布时间】:2012-02-17 19:00:12 【问题描述】:

问题

应该如何在Mule ESB 3.2 中配置ActiveMQ<flow>,以确保从队列中拉出的消息最终被外部CXF service 正确处理?

场景

我有一个 CXF 端点,它应该接收传入的消息并尽快将其传输到三个外部服务。我们称它们为 EX1、EX2、EX3。这很容易,这要归功于 Mule 3.x 中引入的 <all> 组件。

整个解决方案最重要的要求,是确保每条收到的消息最终都被传递到所有三个 CXF 服务。所以我们最终有了这个想法,将每条传入的消息放入Persistent JMS queues(Q1、Q2、Q3)。在从队列 Qn 中读取消息后,将其直接传输到相应的 EXn 端点,从而 - 外部服务。

配置

(我可以根据要求提供完整的配置)

我们已经按照here 的描述配置了 ActiveMQ 代理,并将其与我们的 <flow> 配置连接起来。一切似乎都按预期工作,我已将 JConsole 连接到我的应用程序,因此我可以看到消息的类型为 PERSISTENT 并且它们最终出现在正确的队列中。如果一切顺利 - 所有三个服务 EXn 都接收到消息。

测试

当我们关闭其中一项服务(例如 EX2)并重新启动整个服务器以模拟故障时,就会出现问题。 消息最终会丢失(我想这不是那么持久,对吧?)。 最奇怪的是——如果我们在 EX2 宕机时发送了 10 条消息,那么在服务器重新启动后,其中 9 条消息被正确地重新发送了!所以我在想,也许,只是也许,这 10 条消息中有 9 条已正确排队,而当服务器出现故障时,一条正在不断地重新传递。

这让我想到,说实话,我无法理解 CXF 端点没有通过事务支持来处理。毕竟,当它试图重新传递时,我可以看到消息在队列中,所以它应该被持久化。显然不是,但为什么呢?

我自己的尝试 我尝试了很多东西,但都没有奏效。总是有一条消息丢失。

    不在流程中使用任何 <jms:transaction /> 标记 - 不起作用 在收到消息时启动 jms 事务,在发送到 <cxf:jaxws-client /> 时加入 将 XA 与 JBoss 和 <xa-transaction /> 一起使用 - 不起作用 提供 <default-exception-strategy> 配置 - 如果我记得它让事情变得更糟

感谢您的帮助。

配置

活动 MQ 配置

<spring:bean id="AmqDefaultPolicyEntry" class="org.apache.activemq.broker.region.policy.PolicyEntry">
    <spring:property name="queue" value="queue.*"/>
    <spring:property name="deadLetterStrategy" ref="AmqDeadLetterStrategy"/>
</spring:bean>

<spring:bean id="AmqPolicyMap" class="org.apache.activemq.broker.region.policy.PolicyMap">
    <spring:property name="defaultEntry" ref="AmqDefaultPolicyEntry"/>
</spring:bean>

<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
    <spring:property name="brokerURL" value="vm://localhost?jms.prefetchPolicy.all=1&amp;broker.persistent=true&amp;broker.useJmx=true"/>
    <spring:property name="redeliveryPolicy">
        <spring:bean class="org.apache.activemq.RedeliveryPolicy">
            <spring:property name="initialRedeliveryDelay" value="$props.initialRedeliveryDelay"/>
            <spring:property name="redeliveryDelay" value="$props.redeliveryDelay"/>
            <spring:property name="maximumRedeliveries" value="$props.maximumRedeliveries"/>
            <spring:property name="backOffMultiplier" value="$props.backOffMultiplier"/>
        </spring:bean>
    </spring:property>
</spring:bean>

<spring:bean name="persistenceAdapter" class="org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter">
    <spring:property name="directory" value="/home/bachman/activemq"/>
</spring:bean>

<spring:bean name="AmqBroker"
             class="org.apache.activemq.broker.BrokerService"
             init-method="start"
             destroy-method="stop">
    <spring:property name="brokerName" value="esb-amq-broker"/>
    <spring:property name="persistent" value="true"/>
    <spring:property name="dataDirectory" value="/home/bachman/activemq"/>
    <spring:property name="useJmx" value="true"/>
    <spring:property name="useShutdownHook" value="false"/>
    <spring:property name="persistenceAdapter" ref="persistenceAdapter"/>
    <spring:property name="destinationPolicy" ref="AmqPolicyMap"/>
</spring:bean>

<jms:activemq-connector name="PersistentJMSConnector" specification="1.1"
                        numberOfConsumers="1" maxRedelivery="-1" persistentDelivery="true"
                        connectionFactory-ref="connectionFactory" acknowledgementMode="CLIENT_ACKNOWLEDGE"
                        disableTemporaryReplyToDestinations="true"/>

FLOW - 将传入消息分派到 3 个队列 Qn

<flow name="dispatch-to-queues">
        <inbound-endpoint ref="incoming-cxf"/>

        <!-- Each received message ends up to be sent to all destinations -->
        <all>
            <jms:outbound-endpoint name="queue.q1"
                queue="queue.q1" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on Q1"
                    connector-ref="PersistentJMSConnector"/>

            <jms:outbound-endpoint name="queue.q2"
                queue="queue.q2" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on q2"
                connector-ref="PersistentJMSConnector" />

            <jms:outbound-endpoint name="queue.q3"
                queue="queue.q3" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on Q3"
                connector-ref="PersistentJMSConnector" />

        </all>
        <custom-processor class="com.mycompany.just.a.component.to.return.OK.via.Cxf" />
    </flow>

FLOW - 处理从 Qn 到 EXn 的传递

<flow name="from-q1-to-ex1">
        <jms:inbound-endpoint queue="queue.q1" disableTransportTransformer="false"
            disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
            doc:name="JMS" doc:description="Pull from q1."
            connector-ref="PersistentJMSConnector">
                <jms:transaction action="ALWAYS_BEGIN" />
        </jms:inbound-endpoint>
        <logger message="Sending message to EX-1" level="INFO" />

        <!-- Handle errors at this point in flow
        <custom-processor class="pl.exception.lookup.Component">
            <spring:property name="targetModuleName" value="Not-important"/>
        </custom-processor>
        -->


        <outbound-endpoint ref="ex1-cxf-endpoint">
            <jms:transaction action="ALWAYS_JOIN" timeout="360000"/>
        </outbound-endpoint>
    </flow>

ENDPOINTS - 引用端点的声明

<endpoint name="incoming-cxf" address="http://incoming.mycompany.com/in" exchange-pattern="request-response">
        <cxf:jaxws-service serviceClass="com.mycompany.services.InService"/>
    </endpoint> 

<endpoint name="ex1-cxf-endpoint" address="http://com.mycompany.ex1" exchange-pattern="request-response">
        <cxf:jaxws-client
                clientClass="com.mycompany.services.Ex1"
                wsdlLocation="classpath:wsdl/ex1.wsdl"
                operation="someOperation"
                port="SomePort"/>
    </endpoint>

【问题讨论】:

仍然没有解决方案,我开始赏金了。我将接受任何带有 CXF 和活动 MQ 的持久消息工作示例的响应,在两个流程中实现此场景:IN_CXF -> QUEUE; QUEUE -> OUT_CXF 【参考方案1】:

在事务中使用 JMS 消息是解决方案按预期工作的必要条件:如果在 CXF 出站阶段发生异常,JMS 消息将最终回滚,然后重新传递,触发新的 CXF 调用。

您必须为您的 ActiveMQ 客户端仔细配置重新传递策略,以便重试足够多的时间并且可能不会太快(例如指数回退)。您还想适当地处理 DLQ。在 Mule 中使用 Spring Beans 的 ActiveMQ 客户端配置如图所示:http://www.mulesoft.org/mule-activemq-integration-examples

还要确保在您的配置工厂中引用正确的代理 URL。如果您的代理名称为 esb-amq-broker,您的配置工厂应该是:

<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
    <spring:property name="brokerURL" value="vm://esb-amq-broker"/>
    ...

【讨论】:

亲爱的@David,我已经看到了你之前的回答,我希望你能回复:) 我明天上班时会发布所有必要的配置。感谢您的支持,这对我来说非常重要,因为我最有价值的客户正在等待解决方案。 好的,我已经添加了配置。如果您对 pl.exception.lookup.Component 感到好奇,它只是一个尝试预测是否会在流程中的下一个处理器中引发异常的组件。我也尝试在没有它的情况下测试解决方案,因为我认为它可能会影响潜在的异常。 好的,我将调整我的配置并在此处发布结果。但是有一个问题 - 我的配置缺少 和显式回滚可以吗?我假设在CXF端点指示异常后事务将自动回滚,还是我错了? 好的,所以我认为 Mule 的做法是正确的。当我想要有力地保证没有 JMS 消息丢失时,我会运行我的 JMS 提供程序,而不是嵌入式(即独立)和 HA 模式(集群)。现在对于您的特定情况,代理名称不匹配可能是问题所在:您可能会从连接工厂启动第二个内存代理。 我指的是 ActiveMQ 的 HA 选项,而不是 Mule 的。如果 JMS 代理是 HA,Mule 不是也可以。【参考方案2】:

我不知道我是否会帮助你,但这是关于你的问题的一些建议:

您是否尝试过使用不同于 Jboss 提供的事务管理器,我建议使用 Atomikos 进行此类测试 像 David 建议的那样,事务似乎是最好的方法,但另一种方法是使用显式确认策略 .... 设置可能很棘手,但类似拦截器的方法可以监视与某些特定端点的连接并发送将 ack 返回到您的 JMS 服务器,这可能很困难,但它肯定会确保消息已正确传递....

祝你好运 高温高压 杰罗姆

【讨论】:

我使用的不是 Jboss,而是 Mule ESB。它带有一个内置的服务器和默认的事务管理器。我也尝试过使用 JBoss 事务管理器,但没有运气(它的工作原理完全相同)。我会考虑第二个想法。【参考方案3】:

不确定这种考虑是否有帮助,但是确认模式呢?是否有可能消息已经传递(在自动确认模式下)但尚未被消费服务端点正确处理?

不知道如何在这种情况下配置显式确认,但可能值得进一步研究。

【讨论】:

我已经在两种情况下尝试了 ack:AUTO 和 CLIENT。没有帮助。另外,我想指出消息仍在队列中并且正在尝试重新传递(我可以看到 Mule 一直在尝试将消息发送到故障端点),但只是在服务器硬重置时丢失了。

以上是关于如何确保来自 JMS 队列的消息被传递到外部 WebService (CXF)?的主要内容,如果未能解决你的问题,请参考以下文章

替换 jms 队列中的消息

ActiveMQ / JMS 消息处理程序测试

重新传递 Mule ESB 处理的失败的 activemq jms 消息时保留异常原因

一条 JMS 消息复制到两个队列

Mule ESB HornetQ JMS 消息未传递

Spring JMS(ActiveMQ) 延迟消息传递