ActiveMQ订阅模式持久化实现

Posted 爷的眼睛闪亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ订阅模式持久化实现相关的知识,希望对你有一定的参考价值。

实现步骤:
1、配置发送xml,applicationContext-send.xml

[html] view plain copy
 
  1. <?xml version="1.0" encoding="UTF-8"?>    
  2.     
  3. <beans xmlns="http://www.springframework.org/schema/beans"    
  4.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"    
  5.     xsi:schemaLocation="http://www.springframework.org/schema/beans      
  6.          http://www.springframework.org/schema/beans/spring-beans-2.5.xsd      
  7.          http://www.springframework.org/schema/context      
  8.          http://www.springframework.org/schema/context/spring-context-2.5.xsd">    
  9.   <context:property-placeholder location="classpath:/properties/jms.properties" />  
  10.     
  11.     <!-- 配置JMS连接工厂 -->    
  12.     <bean id="myConnectionFactory"    
  13.         class="org.springframework.jms.connection.CachingConnectionFactory">    
  14.         <!-- Session缓存数量 -->    
  15.         <property name="sessionCacheSize" value="10" />    
  16.         <property name="targetConnectionFactory">    
  17.             <bean class="org.apache.activemq.ActiveMQConnectionFactory">    
  18.                 <!-- MQ地址 -->    
  19.                 <property name="brokerURL" value="${brokerUrl}" />    
  20.                  <!-- 是否异步发送 -->    
  21.                 <property name="useAsyncSend" value="true" />    
  22.             </bean>    
  23.         </property>    
  24.     </bean>    
  25.     
  26.     <!-- 发送消息的目的地(一个主题) -->    
  27.     <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">    
  28.         <!-- 设置消息主题的名字 -->    
  29.         <constructor-arg index="0" value="${send.name}" />    
  30.     </bean>    
  31.     
  32.     <!-- 配置JMS模版 -->    
  33.     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">    
  34.         <property name="connectionFactory" ref="myConnectionFactory" />    
  35.         <property name="defaultDestination" ref="myDestination" />    
  36.         <!-- 订阅发布模式 -->    
  37.         <property name="pubSubDomain" value="true" />    
  38.         <property name="receiveTimeout" value="10000" />    
  39.     </bean>    
  40. </beans>   



2、编写发送java,ActiveMQsender.java

 

[java] view plain copy
 
  1. package com.by.activeMQ;  
  2.   
  3. import javax.jms.JMSException;  
  4. import javax.jms.Message;  
  5. import javax.jms.Session;  
  6. import javax.jms.TextMessage;  
  7.   
  8. import org.springframework.context.ApplicationContext;  
  9. import org.springframework.context.support.ClassPathXmlApplicationContext;  
  10. import org.springframework.jms.core.JmsTemplate;  
  11. import org.springframework.jms.core.MessageCreator;  
  12.   
  13. public class ActiveMQsender {  
  14.     public static void main(String[] args) {  
  15.         @SuppressWarnings("resource")  
  16.         ApplicationContext ctx = new ClassPathXmlApplicationContext(  
  17.                 "ApplicationContext/applicationContext-send.xml");  
  18.   
  19.         JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");  
  20.   
  21.         jmsTemplate.send(new MessageCreator() {  
  22.             public Message createMessage(Session session) throws JMSException {  
  23.                 TextMessage msg = session.createTextMessage();  
  24.                 // 设置消息属性  
  25.                 msg.setStringProperty("mood", "happy");  
  26.                 // 设置消息内容  
  27.                 msg.setText("Hello World!");  
  28.                 return msg;  
  29.             }  
  30.         });  
  31.   
  32.         System.out.println("send end");  
  33.     }  
  34. }  



 

3、配置接收xml,applicationContext-receive.xml

[html] view plain copy
 
  1. <?xml version="1.0" encoding="UTF-8"?>    
  2.     
  3. <beans xmlns="http://www.springframework.org/schema/beans"    
  4.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"    
  5.     xsi:schemaLocation="http://www.springframework.org/schema/beans      
  6.          http://www.springframework.org/schema/beans/spring-beans-2.5.xsd      
  7.          http://www.springframework.org/schema/context      
  8.          http://www.springframework.org/schema/context/spring-context-2.5.xsd">    
  9.   <context:property-placeholder location="classpath:/properties/jms.properties" />  
  10.     
  11.   <!-- 第一个接收者 -->  
  12.     <!-- 配置JMS连接工厂 -->    
  13.     <bean id="myConnectionFactory"    
  14.         class="org.springframework.jms.connection.CachingConnectionFactory">    
  15.         <!-- Session缓存数量 -->    
  16.         <property name="sessionCacheSize" value="10" />    
  17.         <!-- 接收者ID -->    
  18.         <property name="clientId" value="${topic.clientId}" />    
  19.         <property name="targetConnectionFactory">    
  20.             <bean class="org.apache.activemq.ActiveMQConnectionFactory">    
  21.                 <!-- MQ地址 -->    
  22.                 <property name="brokerURL" value="${brokerUrl}" />    
  23.             </bean>    
  24.         </property>    
  25.     </bean>    
  26.     
  27.     <!-- 发送消息的目的地(一个主题) -->    
  28.     <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">    
  29.         <!-- 设置消息主题的名字 -->    
  30.         <constructor-arg index="0" value="${topic.name}" />    
  31.     </bean>    
  32.     
  33.     <!-- 生产消息配置 (自己定义)-->    
  34.     <bean id="myTopicConsumer" class="com.by.activeMQ.ActiveMQreceiver" />    
  35.     
  36.     <!-- 消息监听器 -->    
  37.     <bean id="myTopicListener"    
  38.         class="org.springframework.jms.listener.adapter.MessageListenerAdapter">    
  39.         <constructor-arg ref="myTopicConsumer" />    
  40.         <!-- 接收消息的方法名称 -->    
  41.         <property name="defaultListenerMethod" value="receive" />    
  42.         <!-- 不进行消息转换 -->    
  43.         <property name="messageConverter"><null/></property>    
  44.     </bean>    
  45.     
  46.     <!-- 消息监听容器 -->    
  47.     <bean id="myListenerContainer"    
  48.         class="org.springframework.jms.listener.DefaultMessageListenerContainer">    
  49.         <property name="connectionFactory" ref="myConnectionFactory" />    
  50.         <!-- 发布订阅模式 -->    
  51.         <property name="pubSubDomain" value="true"/>    
  52.         <!-- 消息持久化 -->    
  53.         <property name="subscriptionDurable" value="true"/>    
  54.         <property name="receiveTimeout" value="10"/>    
  55.         <!-- 接收者ID -->    
  56.         <property name="clientId" value="${topic.clientId}" />    
  57.         <property name="durableSubscriptionName" value="${topic.clientId}"/>    
  58.         <property name="destination" ref="myDestination" />    
  59.         <property name="messageListener" ref="myTopicListener" />    
  60.     </bean>    
  61.       
  62.         
  63.   <!-- 第二个接收者 -->  
  64.     
  65.          <!-- 配置JMS连接工厂 -->    
  66.     <bean id="myConnectionFactory2"    
  67.         class="org.springframework.jms.connection.CachingConnectionFactory">    
  68.         <!-- Session缓存数量 -->    
  69.         <property name="sessionCacheSize" value="10" />    
  70.         <!-- 接收者ID -->    
  71.         <property name="clientId" value="${topic2.clientId}" />    
  72.         <property name="targetConnectionFactory">    
  73.             <bean class="org.apache.activemq.ActiveMQConnectionFactory">    
  74.                 <!-- MQ地址 -->    
  75.                 <property name="brokerURL" value="${brokerUrl}" />    
  76.             </bean>    
  77.         </property>    
  78.     </bean>    
  79.     
  80.     <!-- 发送消息的目的地(一个主题) -->    
  81.     <bean id="myDestination2" class="org.apache.activemq.command.ActiveMQTopic">    
  82.         <!-- 设置消息主题的名字 -->    
  83.         <constructor-arg index="0" value="${topic2.name}" />    
  84.     </bean>    
  85.     
  86.     <!-- 生产消息配置 (自己定义)-->    
  87.     <bean id="myTopicConsumer2" class="com.by.activeMQ.ActiveMQreceiver2" />    
  88.     
  89.     <!-- 消息监听器 -->    
  90.     <bean id="myTopicListener2"    
  91.         class="org.springframework.jms.listener.adapter.MessageListenerAdapter">    
  92.         <constructor-arg ref="myTopicConsumer2" />    
  93.         <!-- 接收消息的方法名称 -->    
  94.         <property name="defaultListenerMethod" value="receive" />    
  95.         <!-- 不进行消息转换 -->    
  96.         <property name="messageConverter"><null/></property>    
  97.     </bean>    
  98.     
  99.     <!-- 消息监听容器 -->    
  100.     <bean id="myListenerContainer2"    
  101.         class="org.springframework.jms.listener.DefaultMessageListenerContainer">    
  102.         <property name="connectionFactory" ref="myConnectionFactory2" />    
  103.         <!-- 发布订阅模式 -->    
  104.         <property name="pubSubDomain" value="true"/>    
  105.         <!-- 消息持久化 -->    
  106.         <property name="subscriptionDurable" value="true"/>    
  107.         <property name="receiveTimeout" value="10"/>    
  108.         <!-- 接收者ID -->    
  109.         <property name="clientId" value="${topic2.clientId}" />    
  110.         <property name="durableSubscriptionName" value="${topic2.clientId}"/>    
  111.         <property name="destination" ref="myDestination2" />    
  112.         <property name="messageListener" ref="myTopicListener2" />    
  113.     </bean>   
  114.     
  115. </beans>    



4、编写接收java,ActiveMQreceiver.java

[java] view plain copy
 
  1. package com.by.activeMQ;  
  2.   
  3. import javax.jms.JMSException;  
  4. import javax.jms.TextMessage;  
  5.   
  6. import org.springframework.jms.JmsException;  
  7.   
  8. public class ActiveMQreceiver {  
  9.     public void receive(TextMessage message) throws JmsException, JMSException {   
  10.         String info = "this is receiver, "  
  11.                 + " mood is " + message.getStringProperty("mood") + ","  
  12.                 + "say " + message.getText();  
  13.         System.out.println(info);  
  14.     }   
  15. }  




5、编写另一个接收java,ActiveMQreceiver.java

[java] view plain copy
 
  1. package com.by.activeMQ;  
  2.   
  3. import javax.jms.JMSException;  
  4. import javax.jms.TextMessage;  
  5.   
  6. import org.springframework.jms.JmsException;  
  7.   
  8. public class ActiveMQreceiver2 {  
  9.     public void receive(TextMessage message) throws JmsException, JMSException {   
  10.         String info = "this is receiver2,"  
  11.                 + " mood is " + message.getStringProperty("mood") + ","  
  12.                 + "say " + message.getText();  
  13.         System.out.println(info);  
  14.     }   
  15. }  




6、编写一个main,开启接收监听,openReceive.java

[java] view plain copy
 
  1. package com.by.activeMQ;  
  2.   
  3. import org.springframework.context.ApplicationContext;  
  4. import org.springframework.context.support.ClassPathXmlApplicationContext;  
  5.   
  6. public class openReceive {  
  7.   
  8.     public static void main(String[] args) {  
  9.         @SuppressWarnings({ "unused", "resource" })  
  10.         ApplicationContext ctx = new ClassPathXmlApplicationContext("ApplicationContext/applicationContext-receive.xml");    
  11.         while(true) {    
  12.         }   
  13.     }  
  14.   
  15. }  




7、编写一个配置文件,jms.properties

 

[plain] view plain copy
 
  1. #send  
  2. send.name=Topic_Mood  
  3.   
  4. #receive  
  5. topic.name=Topic_Mood  
  6. topic.clientId=client_LiLei  
  7.   
  8. topic2.name=Topic_Mood  
  9. topic2.clientId=client_HanMei  
  10.   
  11. #url  
  12. brokerUrl=failover:(tcp://10.0.0.232:61616)?initialReconnectDelay=1000  

 

 

8、pom里面添加activeMQ的依赖

 

[html] view plain copy
 
  1. <dependency>  
  2.     <groupId>org.apache.activemq</groupId>  
  3.     <artifactId>activemq-pool</artifactId>  
  4.     <version>5.11.1</version>  
  5. </dependency>  
  6. <dependency>  
  7.     <groupId>org.apache.commons</groupId>  
  8.     <artifactId>commons-pool2</artifactId>  
  9.     <version>2.3</version>  
  10. </dependency>  
  11. <dependency>  
  12.     <groupId>org.springframework</groupId>  
  13.     <artifactId>spring-jms</artifactId>  
  14.     <version>4.0.0.RELEASE</version>  
  15. </dependency>  
  16.   
  17. <dependency>  
  18.     <groupId>org.apache.activemq</groupId>  
  19.     <artifactId>activemq-all</artifactId>  
  20.     <version>5.11.1</version>  
  21. </dependency>  



 


耶,运行就ok了。
发送消息的输出是这样的:

[plain] view plain copy
 
  1. 2016-08-05 11:27:19 [ main:0 ] - [ INFO ] Refreshing org[email protected]16011db4: startup date [Fri Aug 05 11:27:19 CST 2016]; root of context hierarchy  
  2. 2016-08-05 11:27:19 [ main:31 ] - [ INFO ] Loading XML bean definitions from class path resource [ApplicationContext/applicationContext-send.xml]  
  3. 2016-08-05 11:27:19 [ main:187 ] - [ INFO ] Loading properties file from class path resource [properties/jms.properties]  
  4. 2016-08-05 11:27:19 [ main:392 ] - [ INFO ] Established shared JMS Connection: ActiveMQConnection {id=ID:MDG42V9PSU28IKA-60542-1470367639797-1:1,clientId=null,started=false}  
  5. 2016-08-05 11:27:19 [ ActiveMQ Task-1:467 ] - [ INFO ] Successfully connected to tcp://10.0.0.232:61616  
  6. send end  



接收消息的输出是这样的:

[plain] view plain copy
 
  1. 2016-08-05 11:28:04 [ ActiveMQ Task-1:490 ] - [ INFO ] Successfully connected to tcp://10.0.0.232:61616  
  2. 2016-08-05 11:28:04 [ main:498 ] - [ INFO ] Established shared JMS Connection: ActiveMQConnection {id=ID:MDG42V9PSU28IKA-60544-1470367684739-1:1,clientId=client_LiLei,started=false}  
  3. 2016-08-05 11:28:04 [ ActiveMQ Task-1:504 ] - [ INFO ] Successfully connected to tcp://10.0.0.232:61616  
  4. 2016-08-05 11:28:04 [ main:509 ] - [ INFO ] Established shared JMS Connection: ActiveMQConnection {id=ID:MDG42V9PSU28IKA-60544-1470367684739-3:1,clientId=client_HanMei,started=false}  
  5. this is receiver2, mood is happy,say Hello World!  
  6. this is receiver,  mood is happy,say Hello World!  



配置另一个接收者就是,把第一个接收者的配置复制,然后添加个2,再把接收类复制,添加个2,就搞定了。这种方式也适用于mongodb啊这种配置。在一个工程里面操作两个mongodb数据库。

以上是关于ActiveMQ订阅模式持久化实现的主要内容,如果未能解决你的问题,请参考以下文章

JMS学习--ActiveMQ中的消息的持久化和非持久化 以及 持久订阅者 和 非持久订阅者之间的区别与联系

ActiveMQ持久化机制

持久订阅 ActiveMQ

Springboot整合activeMQ之Topic,不懂也得懂了吧

ActiveMQ(21):Consumer高级特性之管理持久化订阅(Manage Durable Subscribers)

如何让我的 ActiveMQ 代理删除离线持久订阅者