Spring + JMS + ActiveMQ实现简单的消息队列(监听器异步实现)

Posted 专注it

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring + JMS + ActiveMQ实现简单的消息队列(监听器异步实现)相关的知识,希望对你有一定的参考价值。

首先声明:以下内容均是在网上找别人的博客综合学习而成的,可能会发现某些代码与其他博主的相同,由于参考的文章比较多,这里对你们表示感谢,就不一一列举,如果有侵权的地方,请通知我,我可以把该文章删除。

1、jms-xml Spring配置文件

 

  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans  
  3.     xmlns="http://www.springframework.org/schema/beans"  
  4.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  5.     xmlns:p="http://www.springframework.org/schema/p"  
  6.     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">  
  7.   
  8.     <bean id = "connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
  9.         <property name = "brokerURL" value = "tcp://localhost:61616"/>  
  10.     </bean>  
  11.       
  12.     <bean id = "topicDestination" class="org.apache.activemq.command.ActiveMQTopic"  
  13.         autowire="constructor">  
  14.         <constructor-arg value="com.spring.xkey.jms.topic"/>  
  15.     </bean>  
  16.     <bean id="sendMessage" class="com.spring.xkey.jms.SendMessage">  
  17.         <property name="username" value="xkey"/>  
  18.         <property name="password" value="1234567890"/>  
  19.     </bean>  
  20.     <bean id = "jmsMessageConverter" class="com.spring.xkey.jms.JmsMessageConverter">  
  21.         <property name="sendMessage" ref="sendMessage"/>  
  22.     </bean>  
  23.     <!-- 创建JMS发送信息的模板的对象 -->    
  24.     <bean id = "jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
  25.         <property name="connectionFactory" ref="connectionFactory"/>  
  26.         <!--property name="defaultDestination" ref="topicDestination"/-->  
  27.         <property name="receiveTimeout" value="6000"/>  
  28.         <property name="messageConverter" ref="jmsMessageConverter"/>  
  29.     </bean>  
  30.       
  31.     <bean id = "jmsMessageListener" class="com.spring.xkey.jms.JmsMessageListener">  
  32.     </bean>  
  33.       
  34.     <bean id = "publisher" class="com.spring.xkey.jms.Publisher">  
  35.         <property name="jmsTemplate" ref="jmsTemplate"/>  
  36.         <property name="destinations" ref="topicDestination" />  
  37.         <property name="sendMessage" ref="sendMessage"/>  
  38.           
  39.     </bean>  
  40.       
  41.     <bean id = "consumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  42.         <property name="connectionFactory" ref="connectionFactory"/>  
  43.         <property name="destination" ref="topicDestination" />  
  44.         <property name="messageListener" ref="jmsMessageListener" />  
  45.     </bean>  
  46.       
  47. </beans>  

2、Listener代码

 

 

  1. package com.spring.xkey.jms;  
  2.   
  3. import java.util.Date;  
  4.   
  5. import javax.jms.JMSException;  
  6. import javax.jms.Message;  
  7. import javax.jms.MessageListener;  
  8.   
  9. import org.apache.activemq.command.ActiveMQMapMessage;  
  10.   
  11. public class JmsMessageListener implements MessageListener {  
  12.   
  13.     public void onMessage(Message message) {  
  14.         ActiveMQMapMessage msg = null;  
  15.         //System.out.println("ONMessage-----------------" + message.toString());  
  16.         try {  
  17.             if (message instanceof ActiveMQMapMessage) {  
  18.                 msg = (ActiveMQMapMessage) message;  
  19.                 String username = msg.getString("username");  
  20.                 String password = msg.getString("password");  
  21.                 System.out.println("Message::: "+username+", "+password);  
  22. //              msg = (ActiveMQMapMessage) message;  
  23. //              String sentDate = msg.getString("date");  
  24. //              String reMessage = msg.getString("message");  
  25. //              int sentCount = msg.getInt("count");  
  26. //              System.out  
  27. //                      .println("-------------New Message Arrival-----------"  
  28. //                              + new Date());  
  29. //              System.out.println("It‘s " + sentCount + " time From Darcy: "  
  30. //                      + reMessage + "   ---Send time :" + sentDate);  
  31.             }  
  32.         } catch (JMSException e) {  
  33.             System.out.println("JMSException in onMessage(): " + e.toString());  
  34.         } catch (Throwable t) {  
  35.             System.out.println("Exception in onMessage():" + t.getMessage());  
  36.         }  
  37.   
  38.     }  
  39.   
  40. }  


3、Converter代码

 

 

  1. package com.spring.xkey.jms;  
  2.   
  3. import javax.jms.JMSException;  
  4. import javax.jms.MapMessage;  
  5. import javax.jms.Message;  
  6. import javax.jms.Session;  
  7.   
  8. import org.springframework.jms.support.converter.MessageConversionException;  
  9. import org.springframework.jms.support.converter.MessageConverter;  
  10.   
  11. public class JmsMessageConverter implements MessageConverter{  
  12.   
  13.     private SendMessage sendMessage;  
  14.     public void setSendMessage(SendMessage sendMsg){  
  15.         this.sendMessage = sendMsg;  
  16.     }  
  17.       
  18.     public Object fromMessage(Message message) throws JMSException,  
  19.             MessageConversionException {  
  20.         // TODO Auto-generated method stub  
  21.         MapMessage  mapmessage= (MapMessage)message;   
  22.         this.sendMessage.setUsername(mapmessage.getString("username"));  
  23.         this.sendMessage.setPassword(mapmessage.getString("password"));  
  24.         System.out.println("First");  
  25.         return sendMessage;  
  26.     }  
  27.   
  28.     public Message toMessage(Object arg0, Session session) throws JMSException,  
  29.             MessageConversionException {  
  30.         // TODO Auto-generated method stub  
  31.         this.sendMessage = (SendMessage)arg0;  
  32.         MapMessage  mapmessage= (MapMessage) session.createMapMessage();  
  33.         mapmessage.setString("username", this.sendMessage.getUsername());  
  34.         mapmessage.setString("password", this.sendMessage.getPassword());  
  35.         System.out.println("Second");  
  36.         return mapmessage;  
  37.     }  
  38.   
  39. }  


4、Publisher代码

 

 

  1. package com.spring.xkey.jms;  
  2.   
  3. import java.util.Scanner;  
  4.   
  5. import javax.jms.Destination;  
  6.   
  7. import org.springframework.jms.core.JmsTemplate;  
  8.   
  9. public class Publisher {  
  10.     private JmsTemplate template;  
  11.     private Destination[] destinations;  
  12.     private SendMessage sendMessage;  
  13.    
  14.     public void chart()  
  15.     {  
  16.         boolean chart = true;  
  17.         int count = 0;  
  18.         while(chart)  
  19.         {  
  20.             count ++;  
  21.             Scanner cin=new Scanner(System.in);  
  22.             System.out.println("输入聊天内容,输入N停止聊天");  
  23.             String text=cin.nextLine();  
  24.             if(text.equals("N"))  
  25.             {  
  26.                 chart = false;  
  27.             }  
  28.             System.out.println("我:"+text);  
  29.             sendChartMessage(count,text);  
  30.         }  
  31.    
  32.     }  
  33.     public void sendMsgCon(){  
  34.         Scanner cin=new Scanner(System.in);  
  35.         String username = cin.nextLine();  
  36.         String password = cin.nextLine();  
  37.         this.sendMessage.setUsername(username);  
  38.         this.sendMessage.setPassword(password);  
  39.         sendConvertor(this.sendMessage);  
  40.     }  
  41.       
  42.     public void sendConvertor(SendMessage sendMsg){  
  43.         template.convertAndSend(destinations[0],sendMsg);  
  44.     }  
  45.       
  46.     protected void sendChartMessage(int count , String strMessage)  
  47.     {  
  48.         MyMessageCreator creator = new MyMessageCreator(count,strMessage);  
  49.         template.send(destinations[0], creator);  
  50.     }  
  51.    
  52.     public JmsTemplate getJmsTemplate() {  
  53.         return template;  
  54.     }  
  55.    
  56.     public void setJmsTemplate(JmsTemplate template) {  
  57.         this.template = template;  
  58.     }  
  59.    
  60.     public Destination[] getDestinations() {  
  61.         return destinations;  
  62.     }  
  63.    
  64.     public void setDestinations(Destination[] destinations) {  
  65.         this.destinations = destinations;  
  66.     }  
  67.       
  68.     public void setSendMessage(SendMessage sendMsg){  
  69.         this.sendMessage = sendMsg;  
  70.     }  
  71.     public SendMessage getSendMessage(){  
  72.         return this.sendMessage;  
  73.     }  
  74. }  


5、SendMessage代码

  1. package com.spring.xkey.jms;  
  2.   
  3. public class SendMessage {  
  4.     private String username;  
  5.     private String password;  
  6.       
  7.     public void setUsername(String user){  
  8.         this.username = user;  
  9.     }  
  10.     public void setPassword(String pass){  
  11.         this.password = pass;  
  12.     }  
  13.       
  14.     public String getUsername(){  
  15.         return this.username;  
  16.     }  
  17.     public String getPassword(){  
  18.         return this.password;  
  19.     }  
  20. }  


6、Test类

    1. package com.spring.xkey.jms;  
    2.   
    3. import javax.jms.JMSException;  
    4.   
    5. import org.springframework.context.ApplicationContext;  
    6. import org.springframework.context.support.ClassPathXmlApplicationContext;  
    7. import org.springframework.jms.listener.DefaultMessageListenerContainer;  
    8.   
    9. public class Test {  
    10.   
    11.     /** 
    12.      * @param args 
    13.      */  
    14.     public static void main(String[] args) {  
    15.         // TODO Auto-generated method stub  
    16.         ApplicationContext context =   
    17.             new ClassPathXmlApplicationContext("jms.xml");  
    18.         /**Sender sender = (Sender)context.getBean("sender"); 
    19.         sender.SendInfo(); 
    20.         Receiver receiver = (Receiver)context.getBean("receiver"); 
    21.         try { 
    22.             System.out.println(receiver.receiverInfo()); 
    23.              
    24.         } catch (JMSException e) { 
    25.             // TODO Auto-generated catch block 
    26.             e.printStackTrace(); 
    27.         }*/  
    28.         Publisher pub = (Publisher)context.getBean("publisher");  
    29.         DefaultMessageListenerContainer consumer =   
    30.             (DefaultMessageListenerContainer)context.getBean("consumer");  
    31.         consumer.start();  
    32.         pub.sendMsgCon();  
    33.         //pub.chart();  
    34.     }  
    35.   
    36. }  

以上是关于Spring + JMS + ActiveMQ实现简单的消息队列(监听器异步实现)的主要内容,如果未能解决你的问题,请参考以下文章

深入浅出JMS--Spring和ActiveMQ整合的完整实例

Spring + JMS + ActiveMQ实现简单的消息队列(监听器异步实现)

深入浅出JMS--Spring和ActiveMQ整合的完整实例

深入浅出JMS--Spring和ActiveMQ整合的完整实例

Spring集成:控制ActiveMQ连接

ActiveMQ rebalanceClusterClients 不适用于 Spring Boot JMS