Integrating IBM MQ with CXF to Send JMS Messages

Posted by 莫大人

0. POM dependencies

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.yun.base</groupId>
    <artifactId>cxf</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>
  <artifactId>cxf_jms_server</artifactId>
  
  <properties>
        <cxf.version>3.1.12</cxf.version>
        <spring.version>4.2.5.RELEASE</spring.version>
        <slf4j.version>1.7.7</slf4j.version>
        <logback.version>1.1.2</logback.version>
        <websphereMq.version>5.3.07</websphereMq.version>
        <!-- Local directory that holds the IBM MQ jars -->
        <wmq.jars>E:/installFile/ibmmq</wmq.jars>
    </properties>
    <dependencies>
        <!-- springmvc start -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <!-- springmvc end -->
        
        
        <!-- logback -->
        <dependency>
        <!-- The key jar here: it initializes logback when the application starts -->
            <groupId>org.logback-extensions</groupId>
            <artifactId>logback-ext-spring</artifactId>
            <version>0.1.4</version>
        </dependency>
        <!-- slf4j start -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-access</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <!-- slf4j end -->
        
        <dependency>
            <groupId>org.apache.cxf</groupId>
            <artifactId>cxf-rt-frontend-jaxws</artifactId>
            <version>${cxf.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.cxf</groupId>
            <artifactId>cxf-rt-transports-jms</artifactId>
            <version>${cxf.version}</version>
        </dependency>
         <dependency>
            <groupId>org.apache.cxf.xjc-utils</groupId>
            <artifactId>cxf-xjc-runtime</artifactId>
            <version>3.0.5</version>
            <exclusions>
                    <exclusion>
                        <groupId>javax.xml.bind</groupId>
                        <artifactId>jaxb-api</artifactId>
                    </exclusion>
                </exclusions>
        </dependency>
    
    <!--Websphere MQ dependencies-->

        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>com.ibm.mqjms</artifactId>
            <version>${websphereMq.version}</version>
            <scope>system</scope>
            <systemPath>${wmq.jars}/com.ibm.mqjms.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>com.ibm.mq.headers</artifactId>
            <version>${websphereMq.version}</version>
            <scope>system</scope>
            <systemPath>${wmq.jars}/com.ibm.mq.headers.jar</systemPath>
        </dependency>

        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>com.ibm.mq.jmqi</artifactId>
            <version>${websphereMq.version}</version>
            <scope>system</scope>
            <systemPath>${wmq.jars}/com.ibm.mq.jmqi.jar</systemPath>
        </dependency>

        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>com.ibm.dhbcore</artifactId>
            <version>${websphereMq.version}</version>
            <scope>system</scope>
            <systemPath>${wmq.jars}/dhbcore.jar</systemPath>
        </dependency>

        <!--Spring-->

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.geronimo.specs</groupId>
            <artifactId>geronimo-jms_1.1_spec</artifactId>
            <version>1.1.1</version>
        </dependency>

        <!--Javax Servlet API-->

        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
            <scope>provided</scope>
        </dependency>
        
    </dependencies>
  
</project>

 

1. Change the WSDL transport type to JMS

Replace

<soap:binding style="document" transport="http://schemas.xmlsoap.org/soap/http"/>

with

<soap:binding style="document" transport="http://cxf.apache.org/transports/jms"/>
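
If the WSDL is regenerated often, the replacement above can be scripted. The following is a minimal sketch and not part of the original setup: the class name WsdlTransportRewriter and its rewrite-in-place behaviour are assumptions made for illustration, and it uses plain string replacement rather than a WSDL parser, so it only matches the attribute exactly as written above.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper: switches the SOAP binding transport of a WSDL from HTTP to CXF's JMS transport. */
class WsdlTransportRewriter {

    static final String HTTP_TRANSPORT = "http://schemas.xmlsoap.org/soap/http";
    static final String JMS_TRANSPORT = "http://cxf.apache.org/transports/jms";

    /** Returns the WSDL text with every transport="...soap/http" attribute pointing at the JMS transport. */
    static String toJmsTransport(String wsdl) {
        return wsdl.replace(
                "transport=\"" + HTTP_TRANSPORT + "\"",
                "transport=\"" + JMS_TRANSPORT + "\"");
    }

    public static void main(String[] args) throws Exception {
        // args[0] is the path of the WSDL file to rewrite in place.
        Path wsdl = Paths.get(args[0]);
        String text = new String(Files.readAllBytes(wsdl), StandardCharsets.UTF_8);
        Files.write(wsdl, toJmsTransport(text).getBytes(StandardCharsets.UTF_8));
    }
}
```

Text that does not contain the HTTP transport attribute is returned unchanged, so running the helper twice on the same file is harmless.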

2. Generate the server-side code from the WSDL. This requires CXF to be installed; see http://www.cnblogs.com/yun965861480/p/7400552.html

wsdl2java -server -impl -encoding UTF8 -d E:\work\waikuai\pom\cxf\cxf_jms_server\src\main\java cxf\EsbJmsServer.wsdl

3. Server-side interceptor that dispatches requests to the matching service

package com.srcb.esb.interceptor;

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;
import org.apache.cxf.Bus;
import org.apache.cxf.binding.Binding;
import org.apache.cxf.binding.soap.SoapMessage;
import org.apache.cxf.binding.soap.SoapVersion;
import org.apache.cxf.binding.soap.SoapVersionFactory;
import org.apache.cxf.bus.CXFBusFactory;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.endpoint.ServerRegistry;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.interceptor.InterceptorChain;
import org.apache.cxf.interceptor.StaxInInterceptor;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.AbstractPhaseInterceptor;
import org.apache.cxf.phase.Phase;
import org.apache.cxf.service.Service;
import org.apache.cxf.staxutils.DepthXMLStreamReader;
import org.apache.cxf.staxutils.StaxUtils;

public class MediatorInterceptor extends
        AbstractPhaseInterceptor<SoapMessage> {

    public MediatorInterceptor() {
        super(Phase.POST_STREAM);
        addBefore(StaxInInterceptor.class.getName());
    }

    public void handleMessage(SoapMessage message) throws Fault {

        /* ======================= Parse the namespace of the message payload ======================= */
        String schemaNamespace = "";
        try {
            // Wrap the input in a buffered stream and mark it, so the original stream
            // can be restored with reset() after scanning.
            InputStream is = message.getContent(InputStream.class);
            BufferedInputStream pis = new BufferedInputStream(is);
            pis.mark(pis.available());
            message.setContent(InputStream.class, pis);

            String encoding = (String) message.get(Message.ENCODING);
            XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(pis, encoding);
            DepthXMLStreamReader xmlReader = new DepthXMLStreamReader(reader);
            if (xmlReader.nextTag() == XMLStreamConstants.START_ELEMENT) {
                String ns = xmlReader.getNamespaceURI();
                SoapVersion soapVersion = SoapVersionFactory.getInstance().getSoapVersion(ns);
                StaxUtils.toNextTag(xmlReader, soapVersion.getBody());
                xmlReader.nextTag();
            }
            schemaNamespace = xmlReader.getName().getNamespaceURI();
            pis.reset();
        } catch (Exception e) {
            e.printStackTrace();
        }

        /* ======================= Select the Endpoint that matches the namespace ======================= */
        Bus bus = CXFBusFactory.getDefaultBus();
        ServerRegistry serverRegistry = bus.getExtension(ServerRegistry.class);
        List<Server> servers = serverRegistry.getServers();

        Endpoint ep = null;
        for (Server server : servers) {
            ep = server.getEndpoint();
            if (schemaNamespace.startsWith(ep.getEndpointInfo().getName().getNamespaceURI())) {
                break;
            } else {
                ep = null;
            }
        }
        if (ep == null) {
            return;
        }

        /* ======================= Install the new Endpoint on the exchange ======================= */
        Exchange ex = message.getExchange();
        ex.put(Endpoint.class, ep);
        ex.put(Binding.class, ep.getBinding());
        ex.put(Service.class, ep.getService());

        // set for PE's OperationId (see ServiceDispatcher.java)
        ex.put(javax.xml.ws.Endpoint.WSDL_SERVICE, ep.getService().getName());

        InterceptorChain chain = message.getInterceptorChain();
        chain.add(ep.getInInterceptors());
        chain.add(ep.getBinding().getInInterceptors());
        chain.add(ep.getService().getInInterceptors());
        chain.setFaultObserver(ep.getOutFaultObserver());        
    }
}
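
The two ideas the interceptor combines, peeking at the payload namespace without consuming the stream and then choosing a target by namespace prefix, can be tried without CXF. The sketch below uses only the JDK; the class name NamespacePeek, the method names, the readLimit parameter and the urn:example namespaces are all assumptions made for illustration. It also simplifies the SOAP version lookup by treating the namespace of the root element as the SOAP namespace.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;

/** Hypothetical, CXF-free illustration of the peek-and-dispatch logic. */
class NamespacePeek {

    /**
     * Returns the namespace of the first element inside the SOAP Body ("" if none is found)
     * and rewinds the stream. readLimit must cover everything the parser may read ahead.
     */
    static String payloadNamespace(BufferedInputStream in, int readLimit) throws Exception {
        in.mark(readLimit);
        String result = "";
        String soapNs = null; // namespace of the root element, assumed to be the SOAP envelope
        XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(in, "UTF-8");
        while (reader.hasNext()) {
            if (reader.next() != XMLStreamConstants.START_ELEMENT) {
                continue;
            }
            if (soapNs == null) {
                soapNs = String.valueOf(reader.getNamespaceURI());
            } else if ("Body".equals(reader.getLocalName()) && soapNs.equals(reader.getNamespaceURI())) {
                // The first tag after <Body> is either the payload element or </Body>.
                if (reader.nextTag() == XMLStreamConstants.START_ELEMENT && reader.getNamespaceURI() != null) {
                    result = reader.getNamespaceURI();
                }
                break;
            }
        }
        in.reset(); // hand the stream back at its original position
        return result;
    }

    /** Returns the first registered namespace that is a prefix of the payload namespace, or null. */
    static String selectEndpoint(String payloadNs, List<String> endpointNamespaces) {
        for (String ns : endpointNamespaces) {
            if (payloadNs.startsWith(ns)) {
                return ns;
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        String soap = "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">"
                + "<soapenv:Header/><soapenv:Body>"
                + "<ord:getOrder xmlns:ord=\"urn:example:orders:v1\"><ord:id>1</ord:id></ord:getOrder>"
                + "</soapenv:Body></soapenv:Envelope>";
        BufferedInputStream in = new BufferedInputStream(
                new ByteArrayInputStream(soap.getBytes(StandardCharsets.UTF_8)));
        String ns = payloadNamespace(in, 64 * 1024);
        System.out.println(ns + " -> "
                + selectEndpoint(ns, Arrays.asList("urn:example:billing", "urn:example:orders")));
    }
}
```

Because the match is a prefix test, as in the interceptor, the order of the registered namespaces matters when one namespace is a prefix of another: the first match wins.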

 

 

4. Configure the server

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cxf="http://cxf.apache.org/core"
    xmlns:soap="http://cxf.apache.org/bindings/soap" xmlns:jaxws="http://cxf.apache.org/jaxws"
    xmlns:jms="http://cxf.apache.org/transports/jms" xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd
                        http://cxf.apache.org/bindings/soap http://cxf.apache.org/schema/bindings/soap.xsd
                        http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
                        http://cxf.apache.org/transports/jms http://cxf.apache.org/schemas/configuration/jms.xsd">

    <import resource="classpath:META-INF/cxf/cxf.xml" />

    <jaxws:endpoint id="esbJmsServer"
        implementor="com.midea.service.fxms.abstraction.atomic.technology_esbjmsserver.v1.EsbJmsServerPortImpl"
        address="jms://">
                <jaxws:properties>
                        <entry key="org.apache.cxf.message.Message.ENCODING" value="UTF-8" />
                </jaxws:properties>
                <jaxws:features>
                        <bean class="org.apache.cxf.feature.LoggingFeature" />
                       
                        <bean class="org.apache.cxf.transport.jms.JMSConfigFeature">
                               <property name="jmsConfig">
                                    <ref bean="esbJmsServerJmsConfig" />
                                </property>
                        </bean>
                </jaxws:features>
                 <jaxws:inInterceptors>
                        <bean class="com.srcb.esb.interceptor.MediatorInterceptor"></bean>
                </jaxws:inInterceptors> 
        </jaxws:endpoint>

    <bean id="esbJmsServerJmsConfig" class="org.apache.cxf.transport.jms.JMSConfiguration"
        p:connectionFactory-ref="jmsConnectionFactory"    p:targetDestination="LOCALQ.P.FXMS.REQ"
             />
             
    <bean id="jmsConnectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory">
            <bean class="com.ibm.mq.jms.MQConnectionFactory">
                <property name="channel" value="SYSTEM.DEF.SVRCONN"></property>
                <property name="connectionNameList" value="10.16.24.180(11101)"></property>
                <property name="CCSID" value="1381"></property>
                <property name="transportType" value="1"></property>
            </bean>
        </property>
    </bean>
    
    <jaxws:endpoint id="esbJmsServer1"
        implementor="com.midea.service.fxms.abstraction.atomic.technology_esbjmsserver.v1.EsbJmsServerPortImpl"
        address="jms://">
                <jaxws:properties>
                        <entry key="org.apache.cxf.message.Message.ENCODING" value="UTF-8" />
                </jaxws:properties>
                <jaxws:features>
                        <bean class="org.apache.cxf.feature.LoggingFeature" />
                       
                        <bean class="org.apache.cxf.transport.jms.JMSConfigFeature">
                               <property name="jmsConfig">
                                    <ref bean="esbJmsServerJmsConfig1" />
                                </property>
                        </bean>
                </jaxws:features>
                 <jaxws:inInterceptors>
                        <bean class="com.srcb.esb.interceptor.MediatorInterceptor"></bean>
                </jaxws:inInterceptors> 
        </jaxws:endpoint>

    <bean id="esbJmsServerJmsConfig1" class="org.apache.cxf.transport.jms.JMSConfiguration"
        p:connectionFactory-ref="jmsConnectionFactory1"    p
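
The connectionNameList property in the configuration above uses IBM MQ's host(port) notation, and several entries can be given as a comma-separated list. The sketch below only illustrates that notation; the MQ client parses the value itself, and the class ConnectionNameList and its HostPort type are hypothetical names introduced here. Falling back to port 1414, the default MQ listener port, when an entry has no port is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parser for the host(port)[,host(port)...] notation used by connectionNameList. */
class ConnectionNameList {

    static final int DEFAULT_PORT = 1414; // default IBM MQ listener port, used when no port is given

    /** One host/port pair from the list. */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Splits a connection name list into host/port pairs, skipping empty entries. */
    static List<HostPort> parse(String connectionNameList) {
        List<HostPort> result = new ArrayList<>();
        for (String raw : connectionNameList.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue;
            }
            int open = entry.indexOf('(');
            if (open < 0) {
                result.add(new HostPort(entry, DEFAULT_PORT));
                continue;
            }
            int close = entry.indexOf(')', open);
            if (close < 0) {
                throw new IllegalArgumentException("Missing ')' in entry: " + entry);
            }
            String host = entry.substring(0, open).trim();
            int port = Integer.parseInt(entry.substring(open + 1, close).trim());
            result.add(new HostPort(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        for (HostPort hp : parse("10.16.24.180(11101)")) {
            System.out.println(hp.host + ":" + hp.port);
        }
    }
}
```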
