使用 CoD over Camel JMS 组件实现本机 websphere MQ

Posted

技术标签:

【中文标题】使用 CoD over Camel JMS 组件实现本机 websphere MQ【英文标题】:Implementing native websphere MQ with CoD over Camel JMS component 【发布时间】:2016-01-09 01:04:17 【问题描述】:

我在使用 Apache CAMEL 实现 Websphere MQ (WMQ) 连接器时遇到了很多困难,该连接器可以毫无例外地处理 MQ 交付确认 (CoD) 报告,也不会产生不需要的响应数据报形式的副作用。最后,我让它按照我想要的方式工作,如果您习惯于编写本地 MQ 客户端,这是一种非常标准和常见的方式。我在同一主题的帖子中记录了该方法,但我发现该解决方案过于复杂,非常感谢任何建议或示例,以使实现更简洁、更优雅。

我知道问题的根源在于 MQ 设计请求-回复消息交换模式 (MEP) 的方式,与 JMS 规范的方式,以及在其 JMS 组件中请求-回复 MEP 的 CAMEL 实现。三种不同的哲学!

    WMQ 具有 MessageType 标头(请参阅 MQMD fields 和 constants),其值为 1 表示请求,2 表示回复,8 表示数据报(单向 MEP)。此外,值 4 用于以 CoD (Conf. of Delivery)、PAN (Positive AckNowledge) 和 NAN (Negative AckNowledge) 的形式标记 Report 消息,这在消息流方面也做出了额外的回复信息。可以使用另一个名为“报告”的标头字段为请求消息、回复或数据报请求 CoD、PAN 和 NAN 确认,其中可以组合所有报告变体的标志。附加标头字段“ReplyToQ”和“ReplyToQMgr”指定原始发件人期望报告和回复的队列和队列管理器,固定的 24 字节“CorrelId”字段 - 可选 - 可以帮助关联对原始数据报或请求消息的报告和回复。更复杂的是,确实可以发回具有相同原始消息 ID 且没有 CorrelID 的回复和报告,或者在 CorrelId 中提供原始消息 ID,或者在原始请求或数据报中已经指定时返回 CorrelId 值。 IBM 提供了JMS API over WMQ,允许通过 WMQ 将普通 JMS 交换隧道作为传输(借助额外的消息头名称 MQRFH2),或到map native MQ messages onto JMS messages,反之亦然。 另一方面,JMS 规范提供了一个可选的“JMSReplyTo”标头字段和一个“JMSCorrelationID”,但将确切的 MEP 语义留给客户端应用程序;即在规范中声明:“响应可能是可选的;由客户决定。” CAMEL 具有 XML 或 Java DSL 中的“路由”和内部 Exchange 对象模型,旨在支持 EIP Patterns,其中 Request-Reply 模式。然后,CAMEL 在其JMS Component 中假设,如果设置了 JMSReplyTo 字段,这必然是一个期望回复的请求,导致 Exchange 的 Out 部分(或如果 Out 为空,则修改 In 部分)返回到定义的队列中JMSReplyTo。

我愿意通过远程 Websphere 队列管理器支持带有交付确认 (CoD) 报告的本地 MQ 消息交换,这样,除了事务和持久性(即无丢失、无重复)之外,人们还可以跟踪何时消息被消耗并在延迟的情况下发出警报。

入站问题:

默认情况下,当队列中的消息消费完成时,Websphere 队列管理器会生成 CoD 报告。因此,没有任何特定设置,远程 MQ 客户端发送带有 CoD 标志的数据报(以及随后强制的 ReplyToQ)将在 CAMEL 端点消费时从队列管理器获得第一个回复作为 MQ 报告消息,然后是由 CAMEL 显式返回的第二条(意外)回复消息,其中包含 CAMEL 路由末尾的 Exchange 对象中剩余的任何内容,因为 CAMEL 假定存在 JMSReplyTo 字段(映射来自 MQ ReplyToQ 和 ReplyToQMgr,都需要支持 CoD 返回流)。

出站问题:

在没有特定设置的情况下,CAMEL 默认情况下也假定出站连接上的请求-回复 EIP / MEP。然后,CAMEL JMS/MQ 端点将等待 1 响应。当出站消息是基于 MQ 的 JMS(因此带有 MQRFH2 标头)时,这可以正常工作。当强制使用普通 MQ 时,即如下删除 MQRFH2 标头,我无法让端点侦听器与相关的传入 MQ 报告匹配,尽管跟踪的值似乎都是正确的(强制使用 24 个字符相关 ID,以便截断更长的 CorrelId 值或空填充MQ 无法对相关过滤器进行地理分配)。有没有人能够解决这个问题?

详细信息: 尽管 IBM JMS API 接受传递特定的 JMS 属性值 WMQ_MESSAGE_BODY=1|0 / WMQ_TARGET_CLIENT=1|0 来控制生成的消息中是否存在 JMS 标头 MQRFH2 ,这些选项通过 CAMEL 变得无效。必须使用 CamelJmsDestinationName 标头 (as explained in CAMEL JMS doc) 为具有选项“targetClient=1”的目的地提供 IBM 队列 URL,以便摆脱 MQRFH2 标头。但如果没有此标头,则 CoD 报告或 MQ 回复上的 CAMEL 关联确实会失败。

上述问题的解决方案确实是关于构建特定的 CAMEL 路由来处理远程方返回的 CoD 报告(以及相关的 MQ 回复)。因此,CAMEL 出站消息必须强制执行为“InOnly”ExchangePattern,因此不等待任何回复。但这会导致 CAMEL 抑制所有的回复字段。然后,如果在出站 MQ 数据报上请求 MQ CoD,则会发生 CAMEL 异常,其原因是 MQException JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2027' ('MQRC_MISSING_REPLY_TO_Q')

CAMEL 记录了一个 URI 选项“disableReplyTo=true”,以禁用对交换模式的回复侦听,但保留 ReplyTo 字段 - 显然在入站和出站交换中。但是这个选项在出站 JMS 交换中不起作用(正如观察到的那样,我可能错了),必须改用不太直观的“preserveMessageQos”选项。

欢迎为这些问题提供优雅的解决方案。

【问题讨论】:

【参考方案1】:

我所能得到的最好的记录在下面,说明为一个 Spring XML 应用程序上下文,它本身托管 CAMEL 上下文和路由。它与 IBM 本机 MQ JCA 兼容的资源适配器 v7.5、CAMEL 2.15、Spring core 4.2 一起使用。我可以将它部署到 Glassfish 和 Weblogic 服务器。

考虑到众多变量,在实际实现中当然会使用 java DSL。这个基于 CAMEL XML DSL 的示例是独立的,易于测试。

我们从 Spring 和 Camel 声明开始:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:camel="http://camel.apache.org/schema/spring"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd   
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">

CAMEL 上下文遵循 2 条路线:MQ 到 JMS 和 JMS 到 MQ,这里链接起来形成一个桥梁以简化测试。

<camel:camelContext id="mqBridgeCtxt">

  <camel:route id="mq2jms" autoStartup="true">

奇怪:当使用本机 MQ 资源适配器时,获得(例如)3 个侦听器的唯一方法是强制执行 3 个连接(依次使用 3 个 Camel:from 语句),每个连接最多 1 个会话,否则会出现 MQ 错误:@ 987654325@。但是,如果您改用 MQ 客户端 jar,则 CAMEL JMS 中的 concurentConsumers 选项可以正常工作。

    <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
            acknowledgementModeName=SESSION_TRANSACTED"/> 
    <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
            acknowledgementModeName=SESSION_TRANSACTED"/> 
    <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
            acknowledgementModeName=SESSION_TRANSACTED"/> 

上面的 disable disableReplyTo 选项确保 CAMEL 在我们测试 MQ 消息类型为 1=Request(-reply) 或 8=datagram(一种方式!)之前不会产生回复。此处未说明测试和回复结构。

然后我们在下一次发布到普通 JMS 时将 EIP 强制为 InOnly,以与 Inbound MQ 模式保持一致。

    <camel:setExchangePattern pattern="InOnly"/>
    <!-- camel:process ref="reference to your MQ message processing bean fits here" / -->
    <camel:to uri="ref:innerQueue" />
  </camel:route>

接下来是 jms-to-MQ 路由:

  <camel:route id="jms2mq"  autoStartup="true">
    <camel:from uri="ref:innerQueue" />
    <!-- remove inner message headers and properties to test without inbound side effects! -->
    <camel:removeHeaders pattern="*"/> 
    <camel:removeProperties pattern="*" />
    <!-- camel:process ref="reference to your MQ message preparation bean fits here" / -->

现在是远程目标返回的 MQ CoD 报告的请求标志。我们还强制 MQ 消息为数据报类型(值 8)。

    <camel:setHeader headerName="JMS_IBM_Report_COD"><camel:simple resultType="java.lang.Integer">2048</camel:simple></camel:setHeader>
    <camel:setHeader headerName="JMS_IBM_Report_Pass_Correl_ID"><camel:simple resultType="java.lang.Integer">64</camel:simple></camel:setHeader>
    <camel:setHeader headerName="JMS_IBM_MsgType"><camel:simple resultType="java.lang.Integer">8</camel:simple></camel:setHeader>

ReplyTo 队列可以通过 ReplyTo uri 选项指定,也可以作为下面的标头指定。

接下来我们使用 CamelJmsDestinationName 标头来强制抑制 JMS MQ 消息标头 MQRFH2(使用 targetClient MQ URL 选项值 1)。换句话说,我们想发送一个普通的 MQ 二进制消息(即只有 MQMD 消息描述符后跟有效负载)。

    <camel:setHeader headerName="JMSReplyTo"><camel:constant>TEST.REPLYTOQ</camel:constant></camel:setHeader>
    <camel:setHeader headerName="CamelJmsDestinationName"><camel:constant>queue://MYQMGR/TEST.Q2?targetClient=1</camel:constant></camel:setHeader>

更多 MQMD 字段可以通过reserved JMS properties 控制,如下图所示。请参阅 IBM 文档中的 restrictions。

    <camel:setHeader headerName="JMS_IBM_Format"><camel:constant>MQSTR   </camel:constant></camel:setHeader>
    <camel:setHeader headerName="JMSCorrelationID"><camel:constant>_PLACEHOLDER_24_CHARS_ID_</camel:constant></camel:setHeader>

URI 中的目标队列被上面的 CamelJmsDestinationName 覆盖,因此 URI 中的队列名称成为占位符。

URI 选项 preserveMessageQos 是一个选项 - 正如观察到的那样 - 允许发送设置了回复数据的消息(以获取 MQ CoD 报告),但阻止 CAMEL 实例化回复消息侦听器通过执行 InOnly MEP。

    <camel:to uri="wmq:queue:PLACEHOLDER.Q.NAME?concurrentConsumers=1&amp;
                exchangePattern=InOnly&amp;preserveMessageQos=true&amp;
                includeSentJMSMessageID=true" />
  </camel:route>
</camel:camelContext>

以下内容必须根据您的上下文进行调整。它为本地 JMS 提供者和 Websphere MQ(通过本地 IBM WMQ JCA 资源适配器)提供队列工厂。我们在这里使用 JNDI 查找管理对象。

<camel:endpoint id="innerQueue" uri="jmsloc:queue:transitQueue">
</camel:endpoint>

<jee:jndi-lookup id="mqQCFBean" jndi-name="jms/MYQMGR_QCF"/>
<jee:jndi-lookup id="jmsraQCFBean" jndi-name="jms/jmsra_QCF"/>

<bean id="jmsloc" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="jmsraQCFBean" />
</bean>

<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="mqQCFBean" />
</bean>

</beans>

或者,如果您使用 MQ 客户端 jars 而不是资源适配器,您将声明连接工厂 bean,如(代替上面的 JNDI 查找):

<bean id="mqCFBean" class="com.ibm.mq.jms.MQXAConnectionFactory">
    <property name="hostName" value="$mqHost"/>
    <property name="port" value="$mqPort"/>
    <property name="queueManager" value="$mqQueueManager"/>
    <property name="channel" value="$mqChannel"/>
    <property name="transportType" value="1"/> <!-- This parameter is fixed and compulsory to work with pure MQI java libraries -->
    <property name="appName" value="$connectionName"/>
</bean>

<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="mqCFBean"/>
    <property name="transacted" value="true"/>
    <property name="acknowledgementModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>

欢迎提出意见和改进。

【讨论】:

以上是关于使用 CoD over Camel JMS 组件实现本机 websphere MQ的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Apache Camel 从 Java 类访问 JMS 队列?

Camel 和 JMS 以正确的顺序从高级队列中消费消息

如何在 Apache Camel 中检测损坏/恢复的 JMS 连接?

路由之间的JMS提交 - 我想在Camel中的路由之间提交事务

Camel JMS 确保在从死信通道取消边缘时进行排序

Apache Camel JMS - 异常未通过请求/回复返回给调用者