ActiveMQ之spring集成消息转换器MessageConverter

Posted Alamps 沁园春

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ之spring集成消息转换器MessageConverter相关的知识,希望对你有一定的参考价值。


MessageConverter的作用主要有两方面,一方面它可以把我们的非标准化Message对象转换成我们的目标Message对象,这主要是用在发送消息的时候;另一方面它又可以把我们的Message对象转换成对应的目标对象,这主要是用在接收消息的时候。


下面我们就拿发送一个对象消息来举例, 传输USer对象:


package com.xinwei.order.entity; import java.io.Serializable; import java.util.Date;
public class User implements Serializable{ private static final long serialVersionUID = 1L; private String id; private String name; private int age; private Date birthDate; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public Date getBirthDate() { return birthDate; } public void setBirthDate(Date birthDate) { this.birthDate = birthDate; } }

定义转换器

import java.io.Serializable;  
  
import javax.jms.JMSException;  
import javax.jms.Message;  
import javax.jms.ObjectMessage;  
import javax.jms.Session;  
  
import org.springframework.jms.support.converter.MessageConversionException;  
import org.springframework.jms.support.converter.MessageConverter;  
  
/** 
 * spring 消息转换器 
 * @author slimina 
 * 
 */  
public class UserMessageConverter implements MessageConverter {  
  
    public Message toMessage(Object object, Session session)    
            throws JMSException, MessageConversionException {    
        return session.createObjectMessage((Serializable) object);    
    }    
     
    public Object fromMessage(Message message) throws JMSException,    
            MessageConversionException {    
        ObjectMessage objMessage = (ObjectMessage) message;    
        return objMessage.getObject();    
    }  
}  

消息接收

import com.google.gson.Gson;  
  
public class UserMessageListener {  
  
    public void receiveMessage(User user) {  
        System.out.println(new Gson().toJson(user));  
    }  
      
     public void receiveMessage(String message) {    
            System.out.println("接收到一个纯文本消息,消息内容是:" + message);    
     }    
}  

 

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"  
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:jee="http://www.springframework.org/schema/jee" xmlns:jms="http://www.springframework.org/schema/jms"  
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util"  
    xmlns:lang="http://www.springframework.org/schema/lang"  
    xsi:schemaLocation="  
            http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd  
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd  
            http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd  
            http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd  
            http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd  
            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd  
            http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd">  
  
     <!-- 配置JMS连接工厂 -->    
    <bean id="connectionFactory"    
        class="org.springframework.jms.connection.CachingConnectionFactory">    
        <!-- Session缓存数量 -->    
        <property name="sessionCacheSize" value="10" />    
        <property name="targetConnectionFactory">    
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">    
                <!-- MQ地址 -->    
                <property name="brokerURL" value="tcp://192.168.233.128:61616" />    
                  <property name="trustAllPackages" value="true"/>
                 <!-- 是否异步发送 -->    
                <property name="useAsyncSend" value="false" />    
            </bean>    
        </property>    
    </bean>   
  
    <!-- 定义队列 -->  
    <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">  
        <property name="physicalName" value="spring.queue.converte" />  
    </bean>  
      
    <!-- jmsTemplate,用于向任意地址发送消息 -->  
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
        <property name="connectionFactory" ref="connectionFactory" />  
        <property name="defaultDestination" ref="testQueue" />  
        <property name="sessionTransacted" value="false" />  
        <!-- receiveTimeout表示接收消息时的超时时间 -->  
        <property name="receiveTimeout" value="30000" />  
        <!-- 消息转换器 -->    
        <property name="messageConverter" ref="userMessageConverter"/>    
    </bean>  
       <!-- 类型转换器 -->    
    <bean id="userMessageConverter" class="com.xinwei.order.controller.UserMessageConverter"/>    
</beans>  

 

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"  
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:jee="http://www.springframework.org/schema/jee" xmlns:jms="http://www.springframework.org/schema/jms"  
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util"  
    xmlns:lang="http://www.springframework.org/schema/lang"  
    xsi:schemaLocation="  
            http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd  
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd  
            http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd  
            http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd  
            http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd  
            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd  
            http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd">  
  
    <!-- 配置JMS连接工厂 -->  
    <bean id="connectionFactory"  
        class="org.springframework.jms.connection.CachingConnectionFactory">  
        <!-- Session缓存数量 -->  
        <property name="sessionCacheSize" value="10" />  
        <!-- 接收者ID 持久化订阅 -->  
        <property name="targetConnectionFactory">  
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
                <!-- MQ地址 -->  
                <property name="brokerURL" value="tcp://192.168.233.128:61616" />  
                 <property name="trustAllPackages" value="true"/>
            </bean>  
        </property>  
    </bean>  
    
<!--     <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig"/>
</bean> -->
  
    <!-- 定义队列 -->  
    <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">  
        <property name="physicalName" value="spring.queue.converte" />  
    </bean>  
  
    <!-- 异步接收Queue消息Container -->  
    <bean id="queueContainer"  
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="connectionFactory" />  
        <property name="destination" ref="testQueue" />  
        <!-- 使用MessageListenerAdapter来作为消息监听器 -->  
        <property name="messageListener" ref="messageListenerAdapter" />  
        <property name="receiveTimeout" value="30000" />  
    </bean>  
  
    <!-- 消息监听适配器 -->  
    <bean id="messageListenerAdapter"  
        class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
        <property name="delegate" ref="userMessageListener" />  
        <property name="defaultListenerMethod" value="receiveMessage" />  
        <property name="messageConverter" ref="userMessageConverter" />  
    </bean>  
  
    <bean id="userMessageListener"  
        class="com.xinwei.order.controller.UserMessageListener"></bean>  
    <!-- 类型转换器 -->  
    <bean id="userMessageConverter"  
        class="com.xinwei.order.controller.UserMessageConverter" />  
</beans>  

 

package com.xinwei.util.quartz;

import java.util.Date;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

import com.xinwei.order.entity.User;
  
@SuppressWarnings("resource")  
public class ProducerMain {  
  
    public static void main(String[] args) {  
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:msgconverter/producer.xml");  
        JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);  
        User user = new User();  
        user.setId("1001");  
        user.setName("jack");  
        user.setAge(22);  
        user.setBirthDate(new Date());  
        jmsTemplate.convertAndSend(user);  
    }  
}  


----
package com.xinwei.util.quartz;

import org.springframework.context.support.ClassPathXmlApplicationContext;  

public class ConsumerMain {  
    @SuppressWarnings("resource")  
    public static void main(String[] args) {  
          new ClassPathXmlApplicationContext("classpath:msgconverter/consumer.xml");  
    }  
}  


----

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/E:/maven_repository/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/E:/maven_repository/org/apache/activemq/activemq-all/5.13.2/activemq-all-5.13.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
2017-12-04 09:51:54,246 [main] INFO  [org.springframework.context.support.ClassPathXmlApplicationContext] - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@11cf437c: startup date [Mon Dec 04 09:51:54 CST 2017]; root of context hierarchy
2017-12-04 09:51:54,285 [main] INFO  [org.springframework.beans.factory.xml.XmlBeanDefinitionReader] - Loading XML bean definitions from class path resource [msgconverter/consumer.xml]
2017-12-04 09:51:54,790 [main] INFO  [org.springframework.context.support.DefaultLifecycleProcessor] - Starting beans in phase 2147483647
2017-12-04 09:51:54,973 [main] INFO  [org.springframework.jms.connection.CachingConnectionFactory] - Established shared JMS Connection: ActiveMQConnection {id=ID:fangping-54864-1512352314851-1:1,clientId=null,started=false}
{"id":"1001","name":"jack","age":22,"birthDate":"Dec 4, 2017 9:51:44 AM"}
 org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:206)
解决方法: http://activemq.apache.org/objectmessage.html


 



 

以上是关于ActiveMQ之spring集成消息转换器MessageConverter的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ(07):ActiveMQ结合Spring开发--建议

ActiveMQ

Spring Boot学习笔记——Spring Boot与ActiveMQ的集成

ActiveMQ之整合spring

MQ 之 ActiveMQ

Spring boot 集成ActiveMQ(包含双向队列实现)