spring boot整合activeMQ

Posted leejin

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring boot整合activeMQ相关的知识,希望对你有一定的参考价值。

spring boot整合activeMQ


spring boot整合MQ以后,对于消息的发送和接收操作更加便捷。本文将通过四个案例,分别讲解spring boot整合MQ:

  1. spring boot整合MQ发送queue消息

  2. spring boot整合MQ发送topic消息

  3. spring boot整合MQ以后如何让queue和topic消息共存

  4. spring boot整合MQ以后topic消息如何持久化

下面分别进行讲解:


一、 spring boot 整合MQ发送queue消息

搭建测试工程,pom.xml配置如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itheima.demo</groupId>
<artifactId>activemq-topic</artifactId>
<version>0.0.1-SNAPSHOT</version>
<!-- 变更JDK版本【固定写法】 -->
<properties>
<java.version>1.7</java.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.3.RELEASE</version>
</parent>
<dependencies>
<!-- web工程启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 热部署依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<!-- 整合MQ起步依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
</project>

application.properties文件

#set tomcat port
server.port=8083
?
#set MQ url
spring.activemq.broker-url=tcp://192.168.25.135:61616
?

ApplicationBoot.java


@SpringBootApplication
public class ApplicationBoot

  public static void main(String[] args)
  SpringApplication.run(ApplicationBoot.class, args);
 

?

消息发送端ProviderController

@RestController
public class ProviderController
  @Autowired
  private JmsMessagingTemplate template;
   
  @RequestMapping("/sendQueue")
  public void sendQueueString()
  //第一个参数:消息目的地名称   第二个参数:消息内容
  template.convertAndSend("a_queueMQ_1",
  "发送了一个queue消息,发送时间:"+new Date().toLocaleString());
 

?

消息接收端QueueConsumer


@Component
public class QueueConsumer

  /**
    * @JmsListener 接收目的地要和发送端保持一致
    * @param msg   msg即消息内容,msg是什么类型取决于发送端发送的消息类型
    *
    */
  @JmsListener(destination="a_queueMQ_1")
  public void consumerQueue(String msg)
  System.out.println("consumerQueue--接收到消息:"+msg);
  System.out.println("接收时间:"+new Date().toLocaleString());
 

测试发送queue消息

在浏览器中执行发送queue消息的URL


http://localhost:8083/sendQueue

查看消费端控制台输出信息:


consumerQueue--接收到消息:发送了一个queue消息,发送时间:2018-3-29 9:37:33
接收时间:2018-3-29 9:37:33

访问MQ管理页面


http://192.168.25.135:8161

查看queue消息

技术图片


二、 spring boot 整合MQ发送topic消息

在spring boot整合MQ以后,默认发送的就是queue消息。如果要发送topic消息,则需要在application.properties中添加如下配置:


#set topic message   默认值:false   当值为true:发送topic消息     当值为false:发送queue消息
spring.jms.pub-sub-domain=true

在上述工程中添加发送topic消息的测试代码

消息发送端ProviderController.java


@RestController
public class ProviderController
  @Autowired
  private JmsMessagingTemplate template;
   
  @RequestMapping("/sendQueue")
  public void sendQueueString()
  //第一个参数:消息目的地名称   第二个参数:消息内容
  template.convertAndSend("a_queueMQ_1",
  "发送了一个queue消息,发送时间:"+new Date().toLocaleString());
 
   
  @RequestMapping("/sendTopic")
  public void sendTopicString() throws JMSException
  template.convertAndSend("a_MQtopic_1",
  "发送了一个topic消息,发送时间:"+new Date().toLocaleString());
 
?

消息接收端TopicConsumer1


@Component
public class TopicConsumer1
?
  @JmsListener(destination="a_MQtopic_1")
  public void consumerTopic(String msg)
  System.out.println("topicConsumer1--接收到消息:"+msg);
 
?

消息接收端TopicConsumer2


@Component
public class TopicConsumer2
?
  @JmsListener(destination="a_MQtopic_1")
  public void consumerTopic(String msg)
  System.out.println("topicConsumer2--接收到消息:"+msg);
 

测试发送topic消息

在浏览器中执行发送topic消息的URL


http://localhost:8083/sendTopic

查看消费端控制台输出信息:


topicConsumer1--接收到消息:发送了一个topic消息,发送时间:2018-3-29 10:20:35
topicConsumer2--接收到消息:发送了一个topic消息,发送时间:2018-3-29 10:20:35

访问MQ管理页面


http://192.168.25.135:8161

查看topic消息

技术图片

可以看到,我们已经可以发送topic消息了。

但这里有一个问题:当我们重新执行发送queue消息的url时,会发现实际发送的还是topic消息,如图:

技术图片

也就是说:

在默认情况下,发送的是queue消息。当我们在application.properties中修改spring.jms.pub-sub-domain=true之后,发送的是topic消息。 queue消息和topic消息这两种消息模式在默认情况下无法共存!

而在实际开发中,queue消息和topic消息都是常见的需求。所以我们有必要让这两种消息能够共存,接下来就来看一看如何解决这个问题。


三、 spring boot整合MQ以后如何让queue和topic消息共存

spring boot整合MQ以后接收消息的基本原理:

接收消息使用的是监听器,由于在项目中可能有多个监听器,如果为每一个监听器都创建一个MQ的连接就太耗资源,所以这里提供了一个监听器的连接工厂JmsListenerContainerFactory,由它来完成消息的接收和分发。

基于上述原理,为了解决queue和topic消息共存的问题,我们可以进行如下处理:
  1. 为queue接收端和topic接收端分别创建监听器连接工厂

  2. 在topic的连接工厂中设置pub-sub-domain=true

  3. 在接收端的@JmsListener注解中,通过containerFactory指定使用的连接工厂

下面就来解决这个问题,测试代码如下

application.properties中取消spring.jms.pub-sub-domain=true配置


#set tomcat port
server.port=8083
?
#set MQ url
spring.activemq.broker-url=tcp://192.168.25.135:61616
?
#set topic message
#spring.jms.pub-sub-domain=true

编写MQConfig类,配监听工厂

/**
 * MQ配置类
 * @author leejin
 *
 */
@Configuration
public class MQConfig 
  
    /**
     * 配置topic目的地
     * bean的名字默认就是方法名
     * @return
     */
    @Bean
    public Topic topic()
    	return new ActiveMQTopic("a1_MQtopic_3");
    
    
    /**
     * 配置queue目的地
     * bean的名字默认就是方法名
     * @return
     */
    @Bean
    public Queue queue()
    	return new ActiveMQQueue("a1_queueMQ_3");
    

    /**
     * 此bean的名称就是方法名
     * 配置topic消息监听容器工厂
     * @param connectionFactory   与MQ创建连接的工厂
     * @return
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory connectionFactory) 
    	DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        //指定接收topic消息。相当于在applicatioin.properties中配置了spring.jms.pub-sub-domain=true
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(connectionFactory);
        return bean;
    
    
    /**
     * 此bean的名称就是方法名
     * 配置queue消息监听容器工厂
     * @param connectionFactory   与MQ创建连接的工厂
     * @return
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory connectionFactory)
    	DefaultJmsListenerContainerFactory bean=new DefaultJmsListenerContainerFactory();
    	bean.setConnectionFactory(connectionFactory);
    	return bean;
    
	



修改之前的发送端和消费端代码

发送端ProviderController

@RestController
public class ProviderController

  @Autowired
  private Topic topic;
   
  @Autowired
  private Queue queue;
   
  @Autowired
  private JmsMessagingTemplate template;
   
  @RequestMapping("/sendQueue")
  public void sendQueueString()
  //第一个参数:消息目的地名称   第二个参数:消息内容
  template.convertAndSend(queue,
  "发送了一个queue消息,发送时间:"+new Date().toLocaleString());
 
   
  @RequestMapping("/sendTopic")
  public void sendTopicString() throws JMSException
  template.convertAndSend(topic,
  "发送了一个topic消息,发送时间:"+new Date().toLocaleString());
 
?

queue消费端QueueConsumer


@Component
public class QueueConsumer

  /**
    * @JmsListener 接收目的地要和发送端保持一致
    * destination 指定目的地
    * containerFactory 指定监听器连接工厂
    * @param msg   msg即消息内容,msg是什么类型取决于发送端发送的消息类型
    */
  @JmsListener(destination="a1_queueMQ_3",containerFactory="jmsListenerContainerQueue")
  public void consumerQueue(String msg)
  System.out.println("consumerQueue--接收到消息:"+msg);
 
   

topic消费端TopicConsumer1


@Component
public class TopicConsumer1

  /**
    * @JmsListener 接收目的地要和发送端保持一致
    * destination 指定目的地
    * containerFactory 指定监听器连接工厂
    * @param msg   msg即消息内容,msg是什么类型取决于发送端发送的消息类型
    */
  @JmsListener(destination="a1_MQtopic_3",containerFactory="jmsListenerContainerTopic")
  public void consumerTopic(String msg)
  System.out.println("topicConsumer1--接收到消息:"+msg);
 

topic消费端TopicConsumer2


@Component
public class TopicConsumer2
?
  /**
    * @JmsListener 接收目的地要和发送端保持一致
    * destination 指定目的地
    * containerFactory 指定监听器连接工厂
    * @param msg   msg即消息内容,msg是什么类型取决于发送端发送的消息类型
    */
  @JmsListener(destination="a1_MQtopic_3",containerFactory="jmsListenerContainerTopic")
  public void consumerTopic(String msg)
  System.out.println("topicConsumer2--接收到消息:"+msg);
 

测试queue和topic消息共存

  1. 测试发送queue消息

在浏览器中执行发送queue消息的URL


http://localhost:8083/sendQueue

查看消费端控制台输出信息:


consumerQueue--接收到消息:发送了一个queue消息,发送时间:2018-3-29 14:39:55

访问MQ管理页面


http://192.168.25.135:8161

查看queue消息

技术图片

  1. 测试发送topic消息在浏览器中执行发送topic消息的URL


http://localhost:8083/sendTopic

查看消费端控制台输出信息:


topicConsumer1--接收到消息:发送了一个topic消息,发送时间:2018-3-29 14:41:10
topicConsumer2--接收到消息:发送了一个topic消息,发送时间:2018-3-29 14:41:10

访问MQ管理页面


http://192.168.25.135:8161

查看topic消息

技术图片

这样我们就解决了queue和topic消息的共存问题,主要的实现思路就是为queue消息和topic消息分别指定监听容器工厂。

接下来还有最后一个问题,就是topic消息的持久化问题。众所周知,topic消息默认情况下是不持久化的,也就是说发送端发送消息的时候,如果消费端不在线,这个消息就丢了,消费端再上线时是无法收到这个消息的。而很多时候我们是需要去接收到这个消息的,那么要解决这个问题,就需要对topic消息做持久化处理。

接下来我们就来看看怎么实现topic消息的持久化。


四、 spring boot整合MQ以后topic消息如何持久化

要实现topic消息的持久化,需要在消息发送端和消费端以及监听容器(JmsListenerContainerFactory)都做处理。在之前案例的基础上,我们做一些修改,加入topic消息持久化设置。测试代码如下:

监听容器(JmsListenerContainerFactory)配置

/**
* MQ配置类
* @author leejin
*
*/
@Configuration
public class MQConfig

  /**
    * 配置topic目的地
    * bean的名字默认就是方法名
    * @return
    */
  @Bean
  public Topic topic()
  return new ActiveMQTopic("a11_1_MQtopic_5");
 
   
  /**
    * 配置queue目的地
    * bean的名字默认就是方法名
    * @return
    */
  @Bean
  public Queue queue()
  return new ActiveMQQueue("a1_queueMQ_3");
 
   
  /**
    * 此bean的名称就是方法名
    * 配置topic消息监听容器工厂
    * @param connectionFactory   与MQ创建连接的工厂
    * @return
    */
  @Bean
  public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory connectionFactory)
  DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
      //指定接收topic消息。相当于在applicatioin.properties中配置了spring.jms.pub-sub-domain=true
      bean.setPubSubDomain(true);
      bean.setConnectionFactory(connectionFactory);
      return bean;
 
   
  /**
    * 此bean的名称就是方法名
    * 配置queue消息监听容器工厂
    * @param connectionFactory   与MQ创建连接的工厂
    * @return
    */
  @Bean
  public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory connectionFactory)
  DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
  bean.setConnectionFactory(connectionFactory);
  return bean;
 
   
  //上面配置的那个topic监听工厂,对于不需要持久化的topic消息是可以用的
  //下面配置的监听工厂,是给需要持久化的topic消息使用的
  /**
    * 第一个topic消费端的监听工厂
    * @param connectionFactory   与MQ创建连接的工厂
    * @return
    */
  @Bean
  public JmsListenerContainerFactory<?> topicFactory1(ConnectionFactory connectionFactory)
  DefaultJmsListenerContainerFactory bean = defaultJmsListenerContainerFactoryTopic(connectionFactory);
        //用来标识第一个topic客户端(clientId值可以自己指定)
  bean.setClientId("10001");
        return bean;
 
   
  /**
    * 第二个topic消费端的监听工厂
    * @param connectionFactory   与MQ创建连接的工厂
    * @return
    */
  @Bean
  public JmsListenerContainerFactory<?> topicFactory2(ConnectionFactory connectionFactory)
  DefaultJmsListenerContainerFactory bean = defaultJmsListenerContainerFactoryTopic(connectionFactory);
  //用来标识第二个topic客户端(clientId值可以自己指定)
  bean.setClientId("10002");
  return bean;
 
   
  /**
    * 对于一些相同配置,提取成方法
    * @param connectionFactory
    * @return
    */
  private DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactoryTopic(ConnectionFactory connectionFactory)
      DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
      //指定接收topic消息。相当于在applicatioin.properties中配置了spring.jms.pub-sub-domain=true
      bean.setPubSubDomain(true);
      bean.setConnectionFactory(connectionFactory);
      //开启持久化订阅
      bean.setSubscriptionDurable(true);
      return bean;
 

消息发送端ProviderController

@RestController
public class ProviderController

  @Autowired
  private Topic topic;
   
  @Autowired
  private Queue queue;
   
  @Autowired
  private JmsMessagingTemplate template;
   
  /**
    * 发送queue消息
    */
  @RequestMapping("/sendQueue")
  public void sendQueueString()
  //第一个参数:消息目的地名称   第二个参数:消息内容
  template.convertAndSend(queue,
  "发送了一个queue消息,发送时间:"+new Date().toLocaleString());
 
   
  /**
    * 发送topic消息
    * @throws JMSException
    */
  @RequestMapping("/sendTopic")
  public void sendTopicString() throws JMSException
  /**设置发送持久化的topic消息
  * 注意:DeliveryMode是javax.jms的一个接口
  * 不要导入了org.springframework.boot.autoconfigure.jms.JmsProperties.DeliveryMode的这个枚举类
  **/
  template.getJmsTemplate().setDeliveryMode(DeliveryMode.PERSISTENT);
  template.convertAndSend(topic,
  "发送了一个topic消息,发送时间:"+new Date().toLocaleString());
 

消费端TopicConsumer1


@Component
public class TopicConsumer1

  /**
    * @JmsListener 接收目的地要和发送端保持一致
    * destination 指定目的地
    * containerFactory 指定监听器连接工厂
    * @param msg   msg即消息内容,msg是什么类型取决于发送端发送的消息类型
    */
  @JmsListener(destination="a11_1_MQtopic_5",containerFactory="topicFactory1")
  public void consumerTopic(String msg)
  System.out.println("topicConsumer1--接收到消息:"+msg);
 

消费端TopicConsumer2

@Component
public class TopicConsumer2 

    /**
     * @JmsListener 接收目的地要和发送端保持一致
     * destination  指定目的地
     * containerFactory  指定监听器连接工厂
     * @param msg   msg即消息内容,msg是什么类型取决于发送端发送的消息类型
     */
    @JmsListener(destination="a11_1_MQtopic_5",containerFactory="topicFactory2")
    public void consumerTopic(String msg)
    	System.out.println("topicConsumer2--接收到消息:"+msg);
    
	



测试topic消息的持久化【注意:在测试的时候需要让两个消费端先完成clientId注册(也就是要先在线接收一次消息)】

这里的测试分两步进行:

  1. TopicConsumer1和TopicConsumer2同时在线(目的是为了完成clientId注册)

在浏览器中执行发送topic消息的URL

http://localhost:8083/sendTopic


查看消费端控制台输出信息:

topicConsumer1--接收到消息:发送了一个topic消息,发送时间:2018-3-29 16:03:54
topicConsumer2--接收到消息:发送了一个topic消息,发送时间:2018-3-29 16:03:54


访问MQ管理页面

http://192.168.25.135:8161


查看topic消息

技术图片

  1. TopicConsumer1不在线(这里是注释掉了TopicConsumer1接收消息的代码)

在浏览器中执行发送topic消息的URL

http://localhost:8083/sendTopic


查看消费端控制台输出信息(只有TopicConsumer2接收到了消息):

topicConsumer2--接收到消息:发送了一个topic消息,发送时间:2018-3-29 16:07:44


这时打开注释的代码(TopicConsumer1上线),查看控制台,发现TopicConsumer1在上线后,接收到了刚才的消息

技术图片

这样,我们就完成了topic消息的持久化。

以上是关于spring boot整合activeMQ的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot如何整合Redis

Spring Boot如何整合Redis

Spring Boot:Spring Boot整合FreeMarker

spring boot 系列之四:spring boot 整合JPA

Spring Boot 2.X - Spring Boot整合AMQP之RabbitMQ

Spring Boot系列Spring Boot整合持久层