ActiveMQ之spring集成消息转换器MessageConverter
Posted Alamps 沁园春
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ之spring集成消息转换器MessageConverter相关的知识,希望对你有一定的参考价值。
MessageConverter的作用主要有两方面,一方面它可以把我们的非标准化Message对象转换成我们的目标Message对象,这主要是用在发送消息的时候;另一方面它又可以把我们的Message对象转换成对应的目标对象,这主要是用在接收消息的时候。
下面我们就以发送一个对象消息为例,传输 User 对象:
package com.xinwei.order.entity; import java.io.Serializable; import java.util.Date; public class User implements Serializable{ private static final long serialVersionUID = 1L; private String id; private String name; private int age; private Date birthDate; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public Date getBirthDate() { return birthDate; } public void setBirthDate(Date birthDate) { this.birthDate = birthDate; } }
定义转换器
import java.io.Serializable; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Session; import org.springframework.jms.support.converter.MessageConversionException; import org.springframework.jms.support.converter.MessageConverter; /** * spring 消息转换器 * @author slimina * */ public class UserMessageConverter implements MessageConverter { public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException { return session.createObjectMessage((Serializable) object); } public Object fromMessage(Message message) throws JMSException, MessageConversionException { ObjectMessage objMessage = (ObjectMessage) message; return objMessage.getObject(); } }
消息接收
import com.google.gson.Gson;

/**
 * Message-handling delegate with one {@code receiveMessage} overload per
 * payload type. Each overload prints what it received to standard output.
 */
public class UserMessageListener {

    /**
     * Shared JSON serializer. Gson instances are documented as thread-safe,
     * so one instance is reused instead of building a new one per message.
     */
    private static final Gson GSON = new Gson();

    /**
     * Handles a {@code User} payload by printing it as JSON.
     *
     * @param user the received user
     */
    public void receiveMessage(User user) {
        System.out.println(GSON.toJson(user));
    }

    /**
     * Handles a plain-text payload by printing it.
     *
     * @param message the received text
     */
    public void receiveMessage(String message) {
        System.out.println("接收到一个纯文本消息,消息内容是:" + message);
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:lang="http://www.springframework.org/schema/lang"
       xsi:schemaLocation="
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
           http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd">

    <!-- Producer-side context: connection factory, target queue and the JmsTemplate used to send. -->

    <!-- JMS connection factory (caching wrapper around the ActiveMQ factory) -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- Number of sessions to cache -->
        <property name="sessionCacheSize" value="10" />
        <property name="targetConnectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <!-- Broker address -->
                <property name="brokerURL" value="tcp://192.168.233.128:61616" />
                <!--
                    Whitelist only the packages whose classes may travel inside an ObjectMessage,
                    rather than trusting every package (trustAllPackages=true), which turns off
                    ActiveMQ's protection against malicious serialized payloads.
                    User needs its own package plus java.lang (String) and java.util (Date).
                -->
                <property name="trustedPackages">
                    <list>
                        <value>com.xinwei.order.entity</value>
                        <value>java.lang</value>
                        <value>java.util</value>
                    </list>
                </property>
                <!-- Whether to send asynchronously -->
                <property name="useAsyncSend" value="false" />
            </bean>
        </property>
    </bean>

    <!-- Queue definition -->
    <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <property name="physicalName" value="spring.queue.converte" />
    </bean>

    <!-- JmsTemplate used to send messages; defaults to testQueue -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="testQueue" />
        <property name="sessionTransacted" value="false" />
        <!-- receiveTimeout: how long a receive call waits for a message -->
        <property name="receiveTimeout" value="30000" />
        <!-- Message converter applied by convertAndSend -->
        <property name="messageConverter" ref="userMessageConverter" />
    </bean>

    <!-- Converter between payload objects and JMS messages -->
    <bean id="userMessageConverter" class="com.xinwei.order.controller.UserMessageConverter" />
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:lang="http://www.springframework.org/schema/lang"
       xsi:schemaLocation="
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
           http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd">

    <!-- Consumer-side context: connection factory, source queue, listener container and adapter. -->

    <!-- JMS connection factory (caching wrapper around the ActiveMQ factory) -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- Number of sessions to cache -->
        <property name="sessionCacheSize" value="10" />
        <!--
            Original note here read "receiver ID / durable subscription".
            NOTE(review): no clientId is configured in this file, and the destination is a
            queue, so that note looks stale. Confirm before relying on it.
        -->
        <property name="targetConnectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <!-- Broker address -->
                <property name="brokerURL" value="tcp://192.168.233.128:61616" />
                <!--
                    Whitelist only the packages that may be deserialized from an ObjectMessage,
                    rather than trusting every package (trustAllPackages=true), which turns off
                    ActiveMQ's protection against malicious serialized payloads.
                    User needs its own package plus java.lang (String) and java.util (Date).
                -->
                <property name="trustedPackages">
                    <list>
                        <value>com.xinwei.order.entity</value>
                        <value>java.lang</value>
                        <value>java.util</value>
                    </list>
                </property>
            </bean>
        </property>
    </bean>

    <!--
    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>
    -->

    <!-- Queue definition -->
    <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <property name="physicalName" value="spring.queue.converte" />
    </bean>

    <!-- Container that receives queue messages asynchronously -->
    <bean id="queueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="testQueue" />
        <!-- A MessageListenerAdapter acts as the message listener -->
        <property name="messageListener" ref="messageListenerAdapter" />
        <property name="receiveTimeout" value="30000" />
    </bean>

    <!-- Listener adapter: converts each message and calls receiveMessage on the delegate -->
    <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="userMessageListener" />
        <property name="defaultListenerMethod" value="receiveMessage" />
        <property name="messageConverter" ref="userMessageConverter" />
    </bean>

    <bean id="userMessageListener" class="com.xinwei.order.controller.UserMessageListener"></bean>

    <!-- Converter between JMS messages and payload objects -->
    <bean id="userMessageConverter" class="com.xinwei.order.controller.UserMessageConverter" />
</beans>
package com.xinwei.util.quartz;

import java.util.Date;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

import com.xinwei.order.entity.User;

/**
 * Producer entry point: loads the producer context, builds a sample
 * {@link User} and sends it through the {@link JmsTemplate}.
 */
public class ProducerMain {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("classpath:msgconverter/producer.xml");
        try {
            JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);

            User user = new User();
            user.setId("1001");
            user.setName("jack");
            user.setAge(22);
            user.setBirthDate(new Date());

            jmsTemplate.convertAndSend(user);
        } finally {
            // Close the context so its beans (including the connection factory) are released.
            context.close();
        }
    }
}
----
package com.xinwei.util.quartz;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Consumer entry point: loads the consumer context and returns.
 * The context is not closed here, presumably so the listener container
 * defined in consumer.xml keeps receiving messages.
 */
public class ConsumerMain {

    @SuppressWarnings("resource")
    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("classpath:msgconverter/consumer.xml");
    }
}
----
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/E:/maven_repository/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/E:/maven_repository/org/apache/activemq/activemq-all/5.13.2/activemq-all-5.13.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
2017-12-04 09:51:54,246 [main] INFO [org.springframework.context.support.ClassPathXmlApplicationContext] - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@11cf437c: startup date [Mon Dec 04 09:51:54 CST 2017]; root of context hierarchy 2017-12-04 09:51:54,285 [main] INFO [org.springframework.beans.factory.xml.XmlBeanDefinitionReader] - Loading XML bean definitions from class path resource [msgconverter/consumer.xml] 2017-12-04 09:51:54,790 [main] INFO [org.springframework.context.support.DefaultLifecycleProcessor] - Starting beans in phase 2147483647 2017-12-04 09:51:54,973 [main] INFO [org.springframework.jms.connection.CachingConnectionFactory] - Established shared JMS Connection: ActiveMQConnection {id=ID:fangping-54864-1512352314851-1:1,clientId=null,started=false} {"id":"1001","name":"jack","age":22,"birthDate":"Dec 4, 2017 9:51:44 AM"}
org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:206)
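解决方法:上面的堆栈位置(ActiveMQObjectMessage.getObject)通常对应 ObjectMessage 反序列化时的“受信任包”检查失败。较新版本的 ActiveMQ 默认只允许反序列化受信任包中的类,因此需要在 ActiveMQConnectionFactory 上通过 trustedPackages 列出允许的包(推荐,只列出实际需要的包),或者设置 trustAllPackages=true(存在安全风险,仅建议在测试环境使用)。详见官方说明: http://activemq.apache.org/objectmessage.html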
以上是关于ActiveMQ之spring集成消息转换器MessageConverter的主要内容,如果未能解决你的问题,请参考以下文章
ActiveMQ(07):ActiveMQ结合Spring开发--建议