spring-消息
Posted 嗜血蚂蚁
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring-消息相关的知识,希望对你有一定的参考价值。
1、异步消息
当一个消息发送时候,消息会被交给消息代理,消息代理可以确保消息被发送到指定的目的地,同时解放发送者,使其能够继续进行其它业务。消息代理通常有ActiveMQ、RabbitMQ...,目的地通常有队列和主题,队列采用点对点的模型,主题采用发布订阅模型
- 点对点模型:消息队列可以有多个接受者,但每条消息只能被一个接收者取走
- 发布订阅模型:消息队列可以有多个订阅者,每条消息可以发送给多个主题订阅者
2、JMS发送/接收消息
1)activemq配置,使用ActiveMQ,并使用了JMSTemplate。
JMS模板为开发者提供了与消息代理进行交互发送和接收消息的标准API,几乎每个消息代理都支持JMS。Jms模板和spring Date提供的jdbc模板一样可以消除样板代码,让开发更集中在业务处理上。
<!-- ActiveMQ连接工厂 --> <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"></property> </bean> <!-- 消息队列 --> <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="userqueue"></constructor-arg> </bean> <!-- 消息主题 --> <bean id="topic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="usertopic"></constructor-arg> </bean> <!-- 定义模板 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="activeMQConnectionFactory"></property> </bean>
2)发送消息
package com.cn.activemq; import com.cn.pojo.User; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; @Component public class SendMessageMQUtil { @Autowired private JmsTemplate jmsTemplate; @Autowired @Qualifier("queue") private Destination queue; @Autowired @Qualifier("topic") private Destination topic; /** * 队列--发送消息 * @param user */ public void sendUserQueue(final User user){ jmsTemplate.send(queue, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createObjectMessage(user); } }); } /** * 主题--发送消息 * @param user */ public void sendUserTopic(final User user){ jmsTemplate.send(topic, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createObjectMessage(user); } }); } }
3)接收消息
package com.cn.activemq; import com.cn.pojo.User; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.ObjectMessage; @Component public class ReceiveMessageMQUtil { @Autowired private JmsTemplate jmsTemplate; @Autowired @Qualifier("queue") private Destination queue; @Autowired @Qualifier("topic") private Destination topic; /** * 队列--接受消息 * @return */ public User receiveUserQueue(){ try { ObjectMessage objectMessage=(ObjectMessage) jmsTemplate.receive(queue); return (User)objectMessage.getObject(); } catch (JMSException e) { e.printStackTrace(); } return null; } /** * 主题--接受消息 * @return */ public User receiveUserTopic(){ try { ObjectMessage objectMessage=(ObjectMessage) jmsTemplate.receive(topic); return (User)objectMessage.getObject(); } catch (JMSException e) { e.printStackTrace(); } return null; } }
4)测试
分别新建测试类
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:spring-activemq.xml", "classpath:spring-redis.xml","classpath:springMvc.xml"})//不可使用classpath:spring-*.xml,否则配置文件不起作用 public class SendMessageMQUtilTest { @Autowired private SendMessageMQUtil sendMessageMQUtil; @Test public void sendUserQueue() throws Exception { User user=new User("computer1", "111111"); sendMessageMQUtil.sendUserQueue(user); } @Test public void sendUserTopic() throws Exception { User user=new User("computer2", "222222"); sendMessageMQUtil.sendUserTopic(user); } }
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:spring-activemq.xml", "classpath:spring-redis.xml","classpath:springMvc.xml"}) public class ReceiveMessageMQUtilTest { @Autowired private ReceiveMessageMQUtil receiveMessageMQUtil; @Test public void receiveUserQueue() throws Exception { User user=receiveMessageMQUtil.receiveUserQueue(); System.out.println("ActiveMQ接收到的数据:"+user); } @Test public void receiveUserTopic() throws Exception { User user=receiveMessageMQUtil.receiveUserTopic(); System.out.println("ActiveMQ接收到的数据:"+user); } @Test public void receiveUserTopic2() throws Exception { User user=receiveMessageMQUtil.receiveUserTopic(); System.out.println("ActiveMQ接收到的数据:"+user); } }
分别运行测试方法,结合activeMQ的控制台可以看出队列、主题以及各自的消费者等情况
显示队列:
显示主题:
3、其它
1)设置默认的目的地
上述在发送消息和接收消息时,每次调用发送/接收消息的方法都传入了一个目的地参数。然而,可以在JmsTemplate实例化的时候,指定默认的目的地,如下:
<!-- 定义队列 --> <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="userqueue"></constructor-arg> </bean> <!-- 定义模板 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="activeMQConnectionFactory"></property> <property name="defaultDestination" ref="queue"></property>//此处注入队列,也可以注入主题 </bean>
采用指定默认目的地的方式,则发送/接收消息调用的方法不用传递目的地了
/** * 队列--发送消息 * @param user */ public void sendUserQueue(final User user){ jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createObjectMessage(user); } }); /** * 队列--接受消息 * @return */ public User receiveUserQueue(){ try { ObjectMessage objectMessage=(ObjectMessage) jmsTemplate.receive(); return (User)objectMessage.getObject(); } catch (JMSException e) { e.printStackTrace(); } return null; } }
2)发送消息对消息进行转换
除了send(..)方法,JmsTemplate还提供了convertAndSend()方法,该方法不需要MessageCreator参数,而使用内置的消息转换器创建消息并发送。在JmsTemplate实例化时未指定消息转换器,在调用convertAndSend()方法则使用默认的SimpleMessageConverter消息转换器;receiveAndConvert()方法则在接收时候使用消息转换器
/** * 队列--发送消息 * @param user */ public void sendUserQueue(final User user){ jmsTemplate.convertAndSend(user); } /** * 队列--接受消息 * @return */ public User receiveUserQueue(){ return (User)jmsTemplate.receiveAndConvert(); }
在JmsTemplate实例化指定消息转换器,则会在使用convertAndSend()/receiveAndConvert()方法使用指定的消息转换器
<!-- 定义队列 --> <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="userqueue"></constructor-arg> </bean> <!-- 消息转换器 --> <bean id="mappingJackson2MessageConverter" class="org.springframework.jms.support.converter.MappingJackson2MessageConverter"></bean> <!-- 定义模板 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="activeMQConnectionFactory"></property> <property name="defaultDestination" ref="queue"></property> <property name="messageConverter" ref="mappingJackson2MessageConverter"></property> </bean>
综合1)2),使用convertAndSend()/receiveAndConvert()发送和接收消息更加简单;在某些情况下,统一配置目的地也简化的使用
3)使用消息监听器实现异步接收消息
receive()相关方法是阻塞等到队列和主题是否有消息,或者超时才会返回。Spring提供了以POJO的方式处理消息的能力-创建消息监听器,可以在队列和主题有消息时候出发监听器的方法处理消息,而不需要等待
创建POJO
package com.cn.pojo;public class MyMessageHandler2 { public void handlerMessage(User user) { System.out.println("接收到的消息是:"+user); } }
配置消息监听器,如果在队列userqueue中有消息,则会调用handlerMessage的方法。
<!-- 消息处理POJO -->
<bean id="myMessageHandler2" class="com.cn.pojo.MyMessageHandler2"></bean>
<!-- 配置消息监听器 -->
<jms:listener-container connection-factory="activeMQConnectionFactory">
<jms:listener destination="userqueue" ref="myMessageHandler2" method="handlerMessage"></jms:listener>
</jms:listener-container>
如果消息处理的POJO实现了MessageListener 接口
package com.cn.pojo; import javax.jms.Message; import javax.jms.MessageListener; public class MyMessageHandler2 implements MessageListener { public void onMessage(Message message) { System.out.println("接收到的消息是:"+message); } }
则配置消息监听器时候,不需在<jms:listener>中配置 method元素。如果在队列userqueue中有消息,则会调用onMessage的方法。
###ActiveMQ的应用场景https://blog.csdn.net/he90227/article/details/50800646
以上是关于spring-消息的主要内容,如果未能解决你的问题,请参考以下文章
初识Spring源码 -- doResolveDependency | findAutowireCandidates | @Order@Priority调用排序 | @Autowired注入(代码片段
SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段
Spring boot:thymeleaf 没有正确渲染片段
What's the difference between @Component, @Repository & @Service annotations in Spring?(代码片段
spring练习,在Eclipse搭建的Spring开发环境中,使用set注入方式,实现对象的依赖关系,通过ClassPathXmlApplicationContext实体类获取Bean对象(代码片段