spring boot整合activeMQ
Posted leejin
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring boot整合activeMQ相关的知识,希望对你有一定的参考价值。
spring boot整合MQ以后,对于消息的发送和接收操作更加便捷。本文将通过四个案例,分别讲解spring boot整合MQ:
-
spring boot整合MQ发送queue消息
-
spring boot整合MQ发送topic消息
-
spring boot整合MQ以后如何让queue和topic消息共存
-
spring boot整合MQ以后topic消息如何持久化
下面分别进行讲解:
一、 spring boot 整合MQ发送queue消息
搭建测试工程,pom.xml配置如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itheima.demo</groupId>
<artifactId>activemq-topic</artifactId>
<version>0.0.1-SNAPSHOT</version>
<!-- 变更JDK版本【固定写法】 -->
<properties>
<java.version>1.7</java.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.3.RELEASE</version>
</parent>
<dependencies>
<!-- web工程启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 热部署依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<!-- 整合MQ起步依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
</project>
application.properties文件
#set tomcat port
server.port=8083
?
#set MQ url
spring.activemq.broker-url=tcp://192.168.25.135:61616
?
ApplicationBoot.java
@SpringBootApplication
public class ApplicationBoot
public static void main(String[] args)
SpringApplication.run(ApplicationBoot.class, args);
?
消息发送端ProviderController
@RestController
public class ProviderController
@Autowired
private JmsMessagingTemplate template;
@RequestMapping("/sendQueue")
public void sendQueueString()
//第一个参数:消息目的地名称 第二个参数:消息内容
template.convertAndSend("a_queueMQ_1",
"发送了一个queue消息,发送时间:"+new Date().toLocaleString());
?
消息接收端QueueConsumer
@Component
public class QueueConsumer
/**
* @JmsListener 接收目的地要和发送端保持一致
* @param msg msg即消息内容,msg是什么类型取决于发送端发送的消息类型
*
*/
@JmsListener(destination="a_queueMQ_1")
public void consumerQueue(String msg)
System.out.println("consumerQueue--接收到消息:"+msg);
System.out.println("接收时间:"+new Date().toLocaleString());
测试发送queue消息
在浏览器中执行发送queue消息的URL
http://localhost:8083/sendQueue
查看消费端控制台输出信息:
consumerQueue--接收到消息:发送了一个queue消息,发送时间:2018-3-29 9:37:33
接收时间:2018-3-29 9:37:33
访问MQ管理页面
http://192.168.25.135:8161
查看queue消息
二、 spring boot 整合MQ发送topic消息
在spring boot整合MQ以后,默认发送的就是queue消息。如果要发送topic消息,则需要在application.properties中添加如下配置:
#set topic message 默认值:false 当值为true:发送topic消息 当值为false:发送queue消息
spring.jms.pub-sub-domain=true
在上述工程中添加发送topic消息的测试代码
消息发送端ProviderController.java
@RestController
public class ProviderController
@Autowired
private JmsMessagingTemplate template;
@RequestMapping("/sendQueue")
public void sendQueueString()
//第一个参数:消息目的地名称 第二个参数:消息内容
template.convertAndSend("a_queueMQ_1",
"发送了一个queue消息,发送时间:"+new Date().toLocaleString());
@RequestMapping("/sendTopic")
public void sendTopicString() throws JMSException
template.convertAndSend("a_MQtopic_1",
"发送了一个topic消息,发送时间:"+new Date().toLocaleString());
?
消息接收端TopicConsumer1
@Component
public class TopicConsumer1
?
@JmsListener(destination="a_MQtopic_1")
public void consumerTopic(String msg)
System.out.println("topicConsumer1--接收到消息:"+msg);
?
消息接收端TopicConsumer2
@Component
public class TopicConsumer2
?
@JmsListener(destination="a_MQtopic_1")
public void consumerTopic(String msg)
System.out.println("topicConsumer2--接收到消息:"+msg);
测试发送topic消息
在浏览器中执行发送topic消息的URL
http://localhost:8083/sendTopic
查看消费端控制台输出信息:
topicConsumer1--接收到消息:发送了一个topic消息,发送时间:2018-3-29 10:20:35
topicConsumer2--接收到消息:发送了一个topic消息,发送时间:2018-3-29 10:20:35
访问MQ管理页面
http://192.168.25.135:8161
查看topic消息
可以看到,我们已经可以发送topic消息了。
但这里有一个问题:当我们重新执行发送queue消息的url时,会发现实际发送的还是topic消息,如图:
也就是说:
在默认情况下,发送的是queue消息。当我们在application.properties中修改spring.jms.pub-sub-domain=true之后,发送的是topic消息。 queue消息和topic消息这两种消息模式在默认情况下无法共存!
而在实际开发中,queue消息和topic消息都是常见的需求。所以我们有必要让这两种消息能够共存,接下来就来看一看如何解决这个问题。
三、 spring boot整合MQ以后如何让queue和topic消息共存
spring boot整合MQ以后接收消息的基本原理:
接收消息使用的是监听器,由于在项目中可能有多个监听器,如果为每一个监听器都创建一个MQ的连接就太耗资源,所以这里提供了一个监听器的连接工厂JmsListenerContainerFactory,由它来完成消息的接收和分发。
基于上述原理,为了解决queue和topic消息共存的问题,我们可以进行如下处理:
-
为queue接收端和topic接收端分别创建监听器连接工厂
-
在topic的连接工厂中设置pub-sub-domain=true
-
在接收端的@JmsListener注解中,通过containerFactory指定使用的连接工厂
下面就来解决这个问题,测试代码如下
application.properties中取消spring.jms.pub-sub-domain=true配置
#set tomcat port
server.port=8083
?
#set MQ url
spring.activemq.broker-url=tcp://192.168.25.135:61616
?
#set topic message
#spring.jms.pub-sub-domain=true
编写MQConfig类,配监听工厂
/** * MQ配置类 * @author leejin * */ @Configuration public class MQConfig /** * 配置topic目的地 * bean的名字默认就是方法名 * @return */ @Bean public Topic topic() return new ActiveMQTopic("a1_MQtopic_3"); /** * 配置queue目的地 * bean的名字默认就是方法名 * @return */ @Bean public Queue queue() return new ActiveMQQueue("a1_queueMQ_3"); /** * 此bean的名称就是方法名 * 配置topic消息监听容器工厂 * @param connectionFactory 与MQ创建连接的工厂 * @return */ @Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory connectionFactory) DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); //指定接收topic消息。相当于在applicatioin.properties中配置了spring.jms.pub-sub-domain=true bean.setPubSubDomain(true); bean.setConnectionFactory(connectionFactory); return bean; /** * 此bean的名称就是方法名 * 配置queue消息监听容器工厂 * @param connectionFactory 与MQ创建连接的工厂 * @return */ @Bean public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory connectionFactory) DefaultJmsListenerContainerFactory bean=new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(connectionFactory); return bean;
修改之前的发送端和消费端代码
发送端ProviderController
@RestController
public class ProviderController
@Autowired
private Topic topic;
@Autowired
private Queue queue;
@Autowired
private JmsMessagingTemplate template;
@RequestMapping("/sendQueue")
public void sendQueueString()
//第一个参数:消息目的地名称 第二个参数:消息内容
template.convertAndSend(queue,
"发送了一个queue消息,发送时间:"+new Date().toLocaleString());
@RequestMapping("/sendTopic")
public void sendTopicString() throws JMSException
template.convertAndSend(topic,
"发送了一个topic消息,发送时间:"+new Date().toLocaleString());
?
queue消费端QueueConsumer
@Component
public class QueueConsumer
/**
* @JmsListener 接收目的地要和发送端保持一致
* destination 指定目的地
* containerFactory 指定监听器连接工厂
* @param msg msg即消息内容,msg是什么类型取决于发送端发送的消息类型
*/
@JmsListener(destination="a1_queueMQ_3",containerFactory="jmsListenerContainerQueue")
public void consumerQueue(String msg)
System.out.println("consumerQueue--接收到消息:"+msg);
topic消费端TopicConsumer1
@Component
public class TopicConsumer1
/**
* @JmsListener 接收目的地要和发送端保持一致
* destination 指定目的地
* containerFactory 指定监听器连接工厂
* @param msg msg即消息内容,msg是什么类型取决于发送端发送的消息类型
*/
@JmsListener(destination="a1_MQtopic_3",containerFactory="jmsListenerContainerTopic")
public void consumerTopic(String msg)
System.out.println("topicConsumer1--接收到消息:"+msg);
topic消费端TopicConsumer2
@Component
public class TopicConsumer2
?
/**
* @JmsListener 接收目的地要和发送端保持一致
* destination 指定目的地
* containerFactory 指定监听器连接工厂
* @param msg msg即消息内容,msg是什么类型取决于发送端发送的消息类型
*/
@JmsListener(destination="a1_MQtopic_3",containerFactory="jmsListenerContainerTopic")
public void consumerTopic(String msg)
System.out.println("topicConsumer2--接收到消息:"+msg);
测试queue和topic消息共存
-
测试发送queue消息
在浏览器中执行发送queue消息的URL
http://localhost:8083/sendQueue
查看消费端控制台输出信息:
consumerQueue--接收到消息:发送了一个queue消息,发送时间:2018-3-29 14:39:55
访问MQ管理页面
http://192.168.25.135:8161
查看queue消息
-
测试发送topic消息在浏览器中执行发送topic消息的URL
http://localhost:8083/sendTopic
查看消费端控制台输出信息:
topicConsumer1--接收到消息:发送了一个topic消息,发送时间:2018-3-29 14:41:10
topicConsumer2--接收到消息:发送了一个topic消息,发送时间:2018-3-29 14:41:10
访问MQ管理页面
http://192.168.25.135:8161
查看topic消息
这样我们就解决了queue和topic消息的共存问题,主要的实现思路就是为queue消息和topic消息分别指定监听容器工厂。
接下来还有最后一个问题,就是topic消息的持久化问题。众所周知,topic消息默认情况下是不持久化的,也就是说发送端发送消息的时候,如果消费端不在线,这个消息就丢了,消费端再上线时是无法收到这个消息的。而很多时候我们是需要去接收到这个消息的,那么要解决这个问题,就需要对topic消息做持久化处理。
接下来我们就来看看怎么实现topic消息的持久化。
四、 spring boot整合MQ以后topic消息如何持久化
要实现topic消息的持久化,需要在消息发送端和消费端以及监听容器(JmsListenerContainerFactory)都做处理。在之前案例的基础上,我们做一些修改,加入topic消息持久化设置。测试代码如下:
监听容器(JmsListenerContainerFactory)配置
/**
* MQ配置类
* @author leejin
*
*/
@Configuration
public class MQConfig
/**
* 配置topic目的地
* bean的名字默认就是方法名
* @return
*/
@Bean
public Topic topic()
return new ActiveMQTopic("a11_1_MQtopic_5");
/**
* 配置queue目的地
* bean的名字默认就是方法名
* @return
*/
@Bean
public Queue queue()
return new ActiveMQQueue("a1_queueMQ_3");
/**
* 此bean的名称就是方法名
* 配置topic消息监听容器工厂
* @param connectionFactory 与MQ创建连接的工厂
* @return
*/
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory connectionFactory)
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
//指定接收topic消息。相当于在applicatioin.properties中配置了spring.jms.pub-sub-domain=true
bean.setPubSubDomain(true);
bean.setConnectionFactory(connectionFactory);
return bean;
/**
* 此bean的名称就是方法名
* 配置queue消息监听容器工厂
* @param connectionFactory 与MQ创建连接的工厂
* @return
*/
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory connectionFactory)
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(connectionFactory);
return bean;
//上面配置的那个topic监听工厂,对于不需要持久化的topic消息是可以用的
//下面配置的监听工厂,是给需要持久化的topic消息使用的
/**
* 第一个topic消费端的监听工厂
* @param connectionFactory 与MQ创建连接的工厂
* @return
*/
@Bean
public JmsListenerContainerFactory<?> topicFactory1(ConnectionFactory connectionFactory)
DefaultJmsListenerContainerFactory bean = defaultJmsListenerContainerFactoryTopic(connectionFactory);
//用来标识第一个topic客户端(clientId值可以自己指定)
bean.setClientId("10001");
return bean;
/**
* 第二个topic消费端的监听工厂
* @param connectionFactory 与MQ创建连接的工厂
* @return
*/
@Bean
public JmsListenerContainerFactory<?> topicFactory2(ConnectionFactory connectionFactory)
DefaultJmsListenerContainerFactory bean = defaultJmsListenerContainerFactoryTopic(connectionFactory);
//用来标识第二个topic客户端(clientId值可以自己指定)
bean.setClientId("10002");
return bean;
/**
* 对于一些相同配置,提取成方法
* @param connectionFactory
* @return
*/
private DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactoryTopic(ConnectionFactory connectionFactory)
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
//指定接收topic消息。相当于在applicatioin.properties中配置了spring.jms.pub-sub-domain=true
bean.setPubSubDomain(true);
bean.setConnectionFactory(connectionFactory);
//开启持久化订阅
bean.setSubscriptionDurable(true);
return bean;
消息发送端ProviderController
@RestController
public class ProviderController
@Autowired
private Topic topic;
@Autowired
private Queue queue;
@Autowired
private JmsMessagingTemplate template;
/**
* 发送queue消息
*/
@RequestMapping("/sendQueue")
public void sendQueueString()
//第一个参数:消息目的地名称 第二个参数:消息内容
template.convertAndSend(queue,
"发送了一个queue消息,发送时间:"+new Date().toLocaleString());
/**
* 发送topic消息
* @throws JMSException
*/
@RequestMapping("/sendTopic")
public void sendTopicString() throws JMSException
/**设置发送持久化的topic消息
* 注意:DeliveryMode是javax.jms的一个接口
* 不要导入了org.springframework.boot.autoconfigure.jms.JmsProperties.DeliveryMode的这个枚举类
**/
template.getJmsTemplate().setDeliveryMode(DeliveryMode.PERSISTENT);
template.convertAndSend(topic,
"发送了一个topic消息,发送时间:"+new Date().toLocaleString());
消费端TopicConsumer1
@Component
public class TopicConsumer1
/**
* @JmsListener 接收目的地要和发送端保持一致
* destination 指定目的地
* containerFactory 指定监听器连接工厂
* @param msg msg即消息内容,msg是什么类型取决于发送端发送的消息类型
*/
@JmsListener(destination="a11_1_MQtopic_5",containerFactory="topicFactory1")
public void consumerTopic(String msg)
System.out.println("topicConsumer1--接收到消息:"+msg);
消费端TopicConsumer2
@Component public class TopicConsumer2 /** * @JmsListener 接收目的地要和发送端保持一致 * destination 指定目的地 * containerFactory 指定监听器连接工厂 * @param msg msg即消息内容,msg是什么类型取决于发送端发送的消息类型 */ @JmsListener(destination="a11_1_MQtopic_5",containerFactory="topicFactory2") public void consumerTopic(String msg) System.out.println("topicConsumer2--接收到消息:"+msg);
测试topic消息的持久化【注意:在测试的时候需要让两个消费端先完成clientId注册(也就是要先在线接收一次消息)】
这里的测试分两步进行:
-
TopicConsumer1和TopicConsumer2同时在线(目的是为了完成clientId注册)
在浏览器中执行发送topic消息的URL
http://localhost:8083/sendTopic
查看消费端控制台输出信息:
topicConsumer1--接收到消息:发送了一个topic消息,发送时间:2018-3-29 16:03:54 topicConsumer2--接收到消息:发送了一个topic消息,发送时间:2018-3-29 16:03:54
访问MQ管理页面
http://192.168.25.135:8161
查看topic消息
-
TopicConsumer1不在线(这里是注释掉了TopicConsumer1接收消息的代码)
在浏览器中执行发送topic消息的URL
http://localhost:8083/sendTopic
查看消费端控制台输出信息(只有TopicConsumer2接收到了消息):
topicConsumer2--接收到消息:发送了一个topic消息,发送时间:2018-3-29 16:07:44
这时打开注释的代码(TopicConsumer1上线),查看控制台,发现TopicConsumer1在上线后,接收到了刚才的消息
这样,我们就完成了topic消息的持久化。
以上是关于spring boot整合activeMQ的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot:Spring Boot整合FreeMarker
spring boot 系列之四:spring boot 整合JPA