JMS,Java消息服务之MQ

Posted hoge66的专栏

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JMS,java消息服务之mQ相关的知识,希望对你有一定的参考价值。

结合实际代码的MQ应用:

1、配置

spring-context.xml中的配置:

spring-jms.xml 配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:mvc="http://www.springframework.org/schema/mvc"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
		http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd">

	<description>spring-mq CHO3.0三网</description>
	
	<!--
		Pooled JMS connection factory configured from the un-suffixed jms.* properties.
		The id has to be "jmsConnectionFactoryByCPO30": that is the name used by every
		connectionFactory ref in spring-jms-triple.xml. The former id
		("jmsConnectionFactoryByCHP30") matched none of those references, so they
		could not be resolved.
	-->
	<bean id="jmsConnectionFactoryByCPO30" class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL" value="${jms.brokerURL}" />
				<property name="closeTimeout" value="${jms.closeTimeout}" />
				<property name="userName" value="${jms.userName}" />
				<property name="password" value="${jms.password}" />
				<property name="optimizedAckScheduledAckInterval" value="${jms.optimizedAckScheduledAckInterval}" />
			</bean>
		</property>
	</bean>

	<!-- Pooled JMS connection factory for the MCRM system, configured from the jms.*.MCRM properties -->
	<bean id="jmsConnectionFactoryByMCRM" class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL" value="${jms.brokerURL.MCRM}" />
				<property name="closeTimeout" value="${jms.closeTimeout.MCRM}" />
				<property name="userName" value="${jms.userName.MCRM}" />
				<property name="password" value="${jms.password.MCRM}" />
				<property name="optimizedAckScheduledAckInterval" value="${jms.optimizedAckScheduledAckInterval.MCRM}" />
			</bean>
		</property>
	</bean>
</beans>

 spring-jms-triple.xml 配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:mvc="http://www.springframework.org/schema/mvc"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
		http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd">

	<description>spring-mq CHO3.0三网通</description>

	<!-- ==================== A010 (send) : organization info sync ==================== -->
	<!-- Destination definition : CPO3.0 tri-network.
	     The value is a comma-separated list naming three queues (DJR, JX, CX);
	     NOTE(review): presumably relies on ActiveMQ composite destinations. Confirm. -->
    <bean id="OrgInfo" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="XH.DJR.Queue.A010.OrgInfo,XH.JX.Queue.A010.OrgInfo,XH.CX.Queue.A010.OrgInfo"/>
    </bean>
    <!-- Message template : CPO3.0 tri-network.
         NOTE(review): pubSubDomain is "true" although the default destination is an
         ActiveMQQueue. Confirm this is intended. The same applies to the other
         queue-bound templates in this file. -->
	<bean id="jmsTemplate_OrgInfoSync" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
		<property name="defaultDestination" ref="OrgInfo" />
		<property name="pubSubDomain" value="true" />
		<property name="receiveTimeout" value="${jms.receiveTimeout}" />
		<property name="explicitQosEnabled" value="true"/>
		<!-- Delivery mode (2: persistent) -->
		<property name="deliveryMode" value="2"/>
	</bean>
    
    <!-- ==================== A011 (send) : wealth manager info sync ==================== -->
    <!-- Destination definition : CPO3.0 tri-network (DJR / JX) -->
    <bean id="LeadManagerInfo" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="XH.DJR.Queue.A011.LeadManagerInfo,XH.JX.Queue.A011.LeadManagerInfo"/>
    </bean>
    <!-- Destination definition : CPO3.0 tri-network (CX) -->
    <bean id="LeadManagerInfoByCX" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="XH.CX.Queue.A011.LeadManagerInfo"/>
    </bean>
    <!-- Message template : CPO3.0 tri-network (sends to the DJR / JX destination) -->
	<bean id="jmsTemplate_LeadManagerInfoSync" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
		<property name="defaultDestination" ref="LeadManagerInfo" />
		<property name="pubSubDomain" value="true" />
		<property name="receiveTimeout" value="${jms.receiveTimeout}" />
		<property name="explicitQosEnabled" value="true"/>
		<property name="deliveryMode" value="2"/>
	</bean>
	<!-- Message template : CPO3.0 tri-network (CX) -->
	<bean id="jmsTemplate_LeadManagerInfoSyncByCX" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
		<property name="defaultDestination" ref="LeadManagerInfoByCX" />
		<property name="pubSubDomain" value="true" />
		<property name="receiveTimeout" value="${jms.receiveTimeout}" />
		<property name="explicitQosEnabled" value="true"/>
		<property name="deliveryMode" value="2"/>
	</bean>
    
	<!-- ==================== A012 (receive) : new customer info ==================== -->
	<!-- Destination definition : CPO3.0 tri-network -->
    <bean id="NewCustomer" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="XH.CPO.Queue.A012.NewCustomer"/>
    </bean>
	<!-- Message listener container : CPO3.0 tri-network -->
    <bean id="listenerContainer_NewCustomer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
        <property name="receiveTimeout" value="${jms.receiveTimeout}"/>
        <property name="destination" ref="NewCustomer" />
        <property name="messageListener" ref="triple_NewCustomerListenerService" />
    </bean>

	<!-- ==================== A013 (receive) : customer first order ==================== -->
	<!-- Destination definition : CPO3.0 tri-network -->
    <bean id="FirstOrder" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="XH.CPO.Queue.A013.FirstOrder"/>
    </bean>
	<!-- Message listener container : CPO3.0 tri-network -->
	<bean id="listenerContainer_FirstOrder"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
        <property name="receiveTimeout" value="${jms.receiveTimeout}"/>
        <property name="destination" ref="FirstOrder" />
        <property name="messageListener" ref="triple_FirstOrderListenerService" />
    </bean>
	
	<!-- ==================== A014 (send) : customer's wealth manager changed ==================== -->
	<!-- Destination definition : CPO3.0 tri-network (one value naming the DJR, JX, CX and ZCJ queues) -->
	<bean id="ChangeCustomerEmp" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="XH.DJR.Queue.A014.ChangeCustomerEmp,XH.JX.Queue.A014.ChangeCustomerEmp,XH.CX.Queue.A014.ChangeCustomerEmp,XH.ZCJ.Queue.A014.ChangeCustomerEmp"/>
    </bean>
	<!-- Destination definition : MCRM system (a topic, unlike the queues above) -->
	<bean id="ChangeCustomerEmpByMCRM" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="XH.Topic.A014.ChangeCustomerEmp"/>
    </bean>
	<!-- Message template : CPO3.0 tri-network -->
	<bean id="jmsTemplate_ChangeCustomerEmp" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
		<property name="defaultDestination" ref="ChangeCustomerEmp" />
		<property name="pubSubDomain" value="true" />
		<property name="receiveTimeout" value="${jms.receiveTimeout}" />
		<property name="explicitQosEnabled" value="true"/>
		<property name="deliveryMode" value="2"/>
	</bean>
	<!-- Message template : MCRM system (uses the MCRM connection factory and receive timeout) -->
	<bean id="jmsTemplate_ChangeCustomerEmpByMCRM" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsConnectionFactoryByMCRM" />
		<property name="defaultDestination" ref="ChangeCustomerEmpByMCRM" />
		<property name="pubSubDomain" value="true" />
		<property name="receiveTimeout" value="${jms.receiveTimeout.MCRM}" />
		<property name="explicitQosEnabled" value="true"/>
		<property name="deliveryMode" value="2"/>
	</bean>
	
	<!-- ==================== A015 (receive) : customer phone number changed ==================== -->
	<!-- Destination definition : CPO3.0 tri-network -->
	<bean id="ChangePhone" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="XH.CPO.Queue.A015.ChangePhone"/>
    </bean>
	<!-- Message listener container : CPO3.0 tri-network --> 
	<bean id="listenerContainer_ChangePhone"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
        <property name="receiveTimeout" value="${jms.receiveTimeout}"/>
        <property name="destination" ref="ChangePhone" />
        <property name="messageListener" ref="triple_ChangePhoneListenerService" />
    </bean>
	
	<!-- ==================== A016 (send) : user role info sync ==================== -->
	<!-- Destination definition : CPO3.0 tri-network (only a CX queue is named) -->
	<bean id="RoleInfoSync" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="XH.CX.Queue.A016.RoleInfo"/>
    </bean>
	<!-- Message template : CPO3.0 tri-network -->
	<bean id="jmsTemplate_RoleInfoSync" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
		<property name="defaultDestination" ref="RoleInfoSync" />
		<property name="pubSubDomain" value="true" />
		<property name="receiveTimeout" value="${jms.receiveTimeout}" />
		<property name="explicitQosEnabled" value="true"/>
		<property name="deliveryMode" value="2"/>
	</bean>

	

    <!-- ==================== A030 (CPO3.0 tri-network, send / listen) : message receipt ==================== -->
    <!-- Destination (receive) : message receipt -->
    <bean id="CallBack" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="XH.CPO.Queue.A030.CallBack"/>
    </bean>
    
	<!-- Destination (send) : message receipt (DJR) -->
    <bean id="DJR_CallBack" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="XH.DJR.Queue.A030.CallBack"/>
    </bean>
    <!-- Destination (send) : message receipt (ZCJ) -->
    <bean id="ZCJ_CallBack" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="XH.ZCJ.Queue.A030.CallBack"/>
    </bean>
    <!-- Destination (send) : message receipt (JX) -->
    <bean id="JX_CallBack" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="XH.JX.Queue.A030.CallBack"/>
    </bean>
    <!-- Destination (send) : message receipt (CX) -->
    <bean id="CX_CallBack" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="XH.CX.Queue.A030.CallBack"/>
    </bean>
	<!-- Message listener container (receive) : receipt reception -->
    <bean id="listenerContainer_CallBack"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="jmsConnectionFactoryByCPO30" />
        <property name="receiveTimeout" value="${jms.receiveTimeout}"/>
        <property name="destination" ref="CallBack" />
        <property name="messageListener" ref="triple_CallBackListenerService" />
    </bean>
</beans>

 上面的bean id="DJR_CallBack"  被注入到代码中:

    /** Message destination: DJR (injected from the bean with id "DJR_CallBack"). */
    @Autowired
    @Qualifier("DJR_CallBack")
    private Destination destinationByDJR;

下面的代码示例用于接收MQ消息:

package com.creditharmony.adapter.service.thirdparty;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.listener.SessionAwareMessageListener;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSON;
import com.creditharmony.adapter.constant.ClientPoint;
import com.creditharmony.adapter.constant.ClientType;
import com.creditharmony.adapter.constant.Constant;
import com.creditharmony.adapter.constant.MsgKey;
import com.creditharmony.adapter.core.client.ClientServicePoxy;
import com.creditharmony.adapter.core.service.IParamRecord;
import com.creditharmony.adapter.exception.AdapterException;
import com.creditharmony.adapter.model.thirdparty.TripleChangePhoneInfoJsonModel;
import com.creditharmony.adapter.service.triple.bean.TripleChangePhoneInBean;
import com.creditharmony.adapter.service.triple.bean.TripleChangePhoneOutBean;
import com.creditharmony.adapter.utils.AdapterUtils;
import com.creditharmony.adapter.utils.Messages;
import com.creditharmony.adapter.utils.TripleUtils;
import com.creditharmony.common.util.PropertyUtil;

/**
 * 客户手机号变更 接口.<br>
 * ---消息接收---<br>
 * 各系统中手机号变更后将手机变更信息(旧手机号、新手机号、理财经理)推送到CPO3.0<br>
 * 接收方式 : JMS Queue 消息接收方<br>
 * 发送通道:<br>
 *      大金X -> CPO3.0<br>
 *      X信     -> CPO3.0<br>
 *      CPO -> 创新<br>
 *
 */
@Service
public class Triple_ChangePhoneListenerService implements SessionAwareMessageListener<TextMessage> {
    
    /** log日志 */
    private static final Logger logger = LoggerFactory.getLogger(Triple_ChangePhoneListenerService.class);

    /** 业务处理名. */
    private static String procName = PropertyUtil.getStrValue(MsgKey.LOG_PROP_FILE, MsgKey.TRIPLE_CHANGE_PHONE_TITLE, "三网合一(客户手机号变更)");

    /** JMS Header : FromSys. */
    private static final String FROM_SYS = "XH_CPO";
    
    /** 参数记录处理. */
    @Autowired
    protected IParamRecord paramRecord;
    
    /** 消息目的地:大金X. */
    @Autowired
    @Qualifier("DJR_CallBack")
    private Destination destinationByDJR;
    
    /** 消息目的地:X信. */
    @Autowired
    @Qualifier("JX_CallBack")
    private Destination destinationByJX;
    
    /** 消息目的地:创新. */
    @Autowired
    @Qualifier("CX_CallBack")
    private Destination destinationByCX;
    
    /** 消息目的地:资本家. */
    @Autowired
    @Qualifier("ZCJ_CallBack")
    private Destination destinationByZCJ;
    
    /**
     * 客户手机号变更 监听接口.
     * @param msgParam 监听消息
     */
    public void onMessage(TextMessage mapMessage, Session session) {
        logger.info(Messages.get(MsgKey.INFO_START, new String[] { procName }));
        // 唯一编号
        String serialNum = AdapterUtils.getSerialNum();
        try {
            
            // 发送方ID
            String fromSys = mapMessage.getStringProperty(Constant.TRIPLE_HEADER_FROM_SYS);
            // 发送消息ID
            String msgTypeID = mapMessage.getStringProperty(Constant.TRIPLE_HEADER_MSG_TYPE_ID);
            // 发送消息名
            String msgTypeName = mapMessage.getStringProperty(Constant.TRIPLE_HEADER_MSG_TYPE_NAME);
            // 消息主体
            String jsonMsg = mapMessage.getText();
            logger.info(
                    Messages.get(
                            MsgKey.GENERALSERVICE_INPARAM,
                            new String[] { procName, jsonMsg }));
            paramRecord.doInParamRecord(
                    serialNum,
                    null,
                    jsonMsg,
                    "triple_ChangePhoneService",
                    fromSys, msgTypeID, msgTypeName);

            TripleChangePhoneInfoJsonModel jsonModel = JSON.parseObject(jsonMsg, TripleChangePhoneInfoJsonModel.class);

            ClientServicePoxy service = new ClientServicePoxy(ClientType.Triple_ChangePhone, ClientPoint.CF);
            TripleChangePhoneInBean param = new TripleChangePhoneInBean();
            
            param.setSerialNum(serialNum);
            param.setOldPhone(jsonModel.getOldPhone());
            param.setNewPhone(jsonModel.getNewPhone());
            param.setEmpCode(jsonModel.getEmpCode());
            param.setOsType(jsonModel.getOsType());
            param.setCardId(jsonModel.getCardId());
            
            // 调用业务参数日志输出
            logger.info(
                    Messages.get(
                            MsgKey.INFO_POST_MESSAGE_DEC,
                            new String[] { procName, param.getParam() }));
            paramRecord.doSendContentRecord(
                    serialNum,
                    param.getParam(),
                    Messages.get(MsgKey.MEMO_PSOT_MESSAGE_BODY));
            
            // 业务调用
            TripleChangePhoneOutBean outParam = (TripleChangePhoneOutBean) service.callService(param);
            
            // 业务参数回执日志输出
            logger.info(Messages.get(MsgKey.INFO_RET_MESSAGE, new String[] { procName, outParam.getParam() }));
            paramRecord.doReceiveContentRecord(serialNum, outParam.getParam(), null);
            
            /*
             * 消息回馈处理
             * 转为Json串
             * 该监听部分收到消息后,判定发送方,将消息序列号反馈给发送发
             */
            String postJson = TripleUtils.getReturnMessage(outParam, jsonModel.getUniqueNum());
            
            MessageProducer producer = null;
            // 根据消息发送源,设定回执方
            if ("XH_DJR".equals(fromSys)) {
                producer = session.createProducer(destinationByDJR);
            } else if ("XH_ZCJ".equals(fromSys)) {
                producer = session.createProducer(destinationByZCJ);
            } else if ("XH_JX".equals(fromSys)) {
                producer = session.createProducer(destinationByJX);
            } else {
                producer = session.createProducer(destinationByCX);
            }

            TextMessage message = session.createTextMessage();
            message.setStringProperty("FromSys", FROM_SYS);
            message.setStringProperty("MsgTypeID", msgTypeID);
            message.setStringProperty("MsgTypeName", msgTypeName);
            message.setJMSTimestamp(System.currentTimeMillis());
            message.setText(postJson);
            
            // 参数日志记录: 返回参数
            paramRecord.doOutParamRecord(serialNum, postJson);
            logger.info(Messages.get(MsgKey.GENERALSERVICE_OUTPARAM, new String[] { postJson }));
            
            producer.send(message);
            
        } catch (JMSException e) {
            // 异常错误记录
            AdapterException businessException = new AdapterException(e);
            paramRecord.doExceptionRecord(serialNum, businessException);
        }
        
        logger.info(Messages.get(MsgKey.INFO_END, new String[] { procName }));
    }
}

 重点:onMessage 方法;它从消息属性中读取三个参数:发送方ID(fromSys)、消息ID(msgTypeID)和消息名(msgTypeName)。

下面是发送消息的方法:

  /**
     * Sends the given JSON text as a JMS text message through
     * {@code jmsTemplate_OrgInfoSync}, addressed to that template's default destination.
     *
     * @param jsonParam JSON string used as the message body
     */
    public void sendMqMessage(final String jsonParam) {
        // Builds the outgoing message: three string properties plus the JSON body
        final MessageCreator textMessageCreator = new MessageCreator() {
            public Message createMessage(Session jmsSession) throws JMSException {
                TextMessage outgoing = jmsSession.createTextMessage();
                outgoing.setStringProperty("FromSys", FROM_SYS);
                outgoing.setStringProperty("MsgTypeID", MSG_TYPE_ID);
                outgoing.setStringProperty("MsgTypeName", MSG_TYPE_NAME);

                outgoing.setText(jsonParam);
                return outgoing;
            }
        };
        // Look up the template's default destination and send to it explicitly
        jmsTemplate_OrgInfoSync.send(jmsTemplate_OrgInfoSync.getDefaultDestination(), textMessageCreator);
    }

 

@Service
public class Triple_OrgInfoSyncService extends BaseService implements IBaseService {

    /** Logger. */
    private static final Logger logger = LoggerFactory.getLogger(Triple_OrgInfoSyncService.class);

    /** Value of the "FromSys" message property. */
    private static final String FROM_SYS = "XH_CPO";

    /** Value of the "MsgTypeID" message property. */
    private static final String MSG_TYPE_ID = "A010";

    /** Value of the "MsgTypeName" message property. */
    private static final String MSG_TYPE_NAME = "OrgInfo";

    /** Business process name used in log messages. */
    private static String procName = PropertyUtil.getStrValue(MsgKey.LOG_PROP_FILE, MsgKey.TRIPLE_ORG_INFO_SYNC_TITLE, "三网合一(组织机构信息同步)");

    /** JMS template (CPO3.0 tri-network). */
    @Autowired
    private JmsTemplate jmsTemplate_OrgInfoSync;

    /**
     * Synchronizes organization information: converts the input to JSON, records it, and
     * sends it as a JMS message.
     *
     * <p>Any exception raised during conversion, recording or sending is logged and rethrown
     * as an {@code AdapterException}; a send failure is wrapped with
     * {@code ErrorType.ERROR_EXT_NET}.
     *
     * @param paramInfo organization information; cast to {@code TripleOrgInfoSyncInfo}
     * @return result object whose return code is {@code ReturnConstant.SUCCESS}
     */
    public BaseOutInfo doExec(BaseInfo paramInfo) {
        logger.info(Messages.get(MsgKey.INFO_START, new String[] { procName }));
        TripleOrgInfoSyncOutInfo result = new TripleOrgInfoSyncOutInfo();
        // The cast stays outside the try block, so a wrong input type is not wrapped
        TripleOrgInfoSyncInfo orgInfo = (TripleOrgInfoSyncInfo) paramInfo;

        try {
            TripleOrgInfoSyncInfoJsonModel model = new TripleOrgInfoSyncInfoJsonModel();

            // NOTE(review): argument order here is (input bean, JSON model); confirm it matches
            // the copy direction of whichever BeanUtils class this file imports.
            BeanUtils.copyProperties(orgInfo, model);
            model.setOperation(orgInfo.getOperation().getName());
            model.setStatus(orgInfo.getStatus().getName());
            model.setLastModifyTime(DateUtils.formatDate(orgInfo.getLastModifyTime(), TripleOrgInfoSyncInfo.DATE_PATTERN));

            // Serialize the populated model to a JSON string
            String payload = JSON.toJSONString(model);
            logger.info(Messages.get(MsgKey.INFO_POST_MESSAGE_BODY, new String[] { procName, payload }));
            paramRecord.doSendContentRecord(
                    paramInfo.getSerialNum(),
                    payload,
                    Messages.get(MsgKey.MEMO_PSOT_MESSAGE_BODY));

            sendOrWrap(payload);
        } catch (Exception failure) {
            logger.error(
                    Messages.get(
                            MsgKey.ERROR_SYSTEM, new String[] { paramInfo.getSerialNum(), procName }));
            // Pass an AdapterException through unchanged; wrap anything else
            throw (failure instanceof AdapterException)
                    ? (AdapterException) failure
                    : new AdapterException(failure);
        }

        result.setRetCode(ReturnConstant.SUCCESS);
        logger.info(Messages.get(MsgKey.INFO_END, new String[] { procName }));
        return result;
    }

    /**
     * Sends the payload and converts any send failure into an {@code AdapterException}
     * carrying {@code ErrorType.ERROR_EXT_NET}.
     *
     * @param payload JSON string to send
     */
    private void sendOrWrap(String payload) {
        try {
            this.sendMqMessage(payload);
        } catch (Exception sendFailure) {
            throw new AdapterException(ErrorType.ERROR_EXT_NET, sendFailure);
        }
    }

    /**
     * Sends the given JSON text as a JMS text message through
     * {@code jmsTemplate_OrgInfoSync}, addressed to that template's default destination.
     *
     * @param jsonParam JSON string used as the message body
     */
    public void sendMqMessage(final String jsonParam) {
        // Builds the outgoing message: three string properties plus the JSON body
        final MessageCreator textMessageCreator = new MessageCreator() {
            public Message createMessage(Session jmsSession) throws JMSException {
                TextMessage outgoing = jmsSession.createTextMessage();
                outgoing.setStringProperty("FromSys", FROM_SYS);
                outgoing.setStringProperty("MsgTypeID", MSG_TYPE_ID);
                outgoing.setStringProperty("MsgTypeName", MSG_TYPE_NAME);

                outgoing.setText(jsonParam);
                return outgoing;
            }
        };
        // Look up the template's default destination and send to it explicitly
        jmsTemplate_OrgInfoSync.send(jmsTemplate_OrgInfoSync.getDefaultDestination(), textMessageCreator);
    }
}

 比较简单。

 

以上是关于JMS,java消息服务之mQ的主要内容,如果未能解决你的问题,请参考以下文章

MQ---JMS

MQ消息队列的JMS规范和AMQP协议的区别

关于JMS和MQ

JMS 之 Active MQ 消息存储

JMS 之 Active MQ 消息存储

JMS 之 Active MQ 的消息传输