ActiveMQ之与Spring集成

Posted zengnansheng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ之与Spring集成相关的知识,希望对你有一定的参考价值。

增加maven依赖

spring版本此处是4.3.5

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.10.0</version>
</dependency>
<!-- xbean 如<amq:connectionFactory /> -->
<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>4.5</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.10.0</version>
</dependency>

增加spring-activemq.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- ActiveMQ 连接工厂 -->
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
    <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
    <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin"  />

    <!-- Spring Caching连接工厂 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- 同上,同理 -->
        <!-- <constructor-arg ref="amqConnectionFactory" /> -->
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- 定义一个(Queue)Destination -->
    <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 设置消息队列的名字 -->
        <constructor-arg><value>myqueue</value></constructor-arg>
    </bean>
    <!-- 定义一个Queue类型的JmsTemplate -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
        <!-- <constructor-arg ref="connectionFactory" /> -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestination" ref="demoQueueDestination" />
        <!-- pubSubDomain:false表示非pub/sub模型(发布/订阅),即队列模式 -->
        <property name="pubSubDomain" value="false" />
    </bean>
    <!-- 定义Queue监听器 -->  
    <bean id="queueListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">       
        <property name="connectionFactory" ref="connectionFactory" />   
        <property name="pubSubDomain" value="false"/>     
        <property name="destination" ref="demoQueueDestination" />    
        <property name="messageListener" ref="queueListenter1" />          
    </bean>    
    <bean id="queueListenter1" class="com.zns.listenter.QueueListenter1"></bean>


    <!-- 定义一个(Topic)Destination -->
    <bean id="demoTopicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- 设置消息队列的名字 -->
        <constructor-arg><value>myqueue</value></constructor-arg>
    </bean>
    <!-- 定义一个Topic类型的JmsTemplate -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
        <!-- <constructor-arg ref="connectionFactory" /> -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestination" ref="demoTopicDestination" />
        <!-- pubSubDomain:false表示非pub/sub模型(发布/订阅),即队列模式 -->
        <property name="pubSubDomain" value="true" />
    </bean>
    <!-- 定义Topic监听器 -->  
    <bean id="topicListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">       
        <property name="connectionFactory" ref="connectionFactory" />   
        <property name="pubSubDomain" value="true"/>     
        <property name="destination" ref="demoTopicDestination" />    
        <property name="messageListener" ref="topicListenter1" />          
    </bean>
    <bean id="topicListenerContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">       
        <property name="connectionFactory" ref="connectionFactory" />   
        <property name="pubSubDomain" value="true"/>     
        <property name="destination" ref="demoTopicDestination" />    
        <property name="messageListener" ref="topicListenter2" />          
    </bean>    
    <bean id="topicListenter1" class="com.zns.listenter.TopicListenter1"></bean>
    <bean id="topicListenter2" class="com.zns.listenter.TopicListenter2"></bean>    
</beans>

 

 

引入spring-activemq.xml文件

<import resource="classpath:spring/spring-activemq.xml"/>

 

消息监听器

package com.zns.listenter;
import javax.jms.JMSException;
import javax.jms.Message;  
import javax.jms.MessageListener;  
import javax.jms.TextMessage;  
   
public class QueueListenter1 implements MessageListener {   
    public void onMessage(Message message) {  
        TextMessage textMsg = (TextMessage) message;  
        try {  
            System.out.println("消息内容是:" + textMsg.getText());  
        } catch (JMSException e) {  
            e.printStackTrace();  
        }  
    }   
}
package com.zns.listenter;
import javax.jms.JMSException;
import javax.jms.Message;  
import javax.jms.MessageListener;  
import javax.jms.TextMessage;  
   
public class TopicListenter1 implements MessageListener {   
    public void onMessage(Message message) {  
        TextMessage textMsg = (TextMessage) message;  
        try {  
            System.out.println("监听者1 收到消息内容是:" + textMsg.getText());  
        } catch (JMSException e) {  
            e.printStackTrace();  
        }  
    }   
}
package com.zns.listenter;
import javax.jms.JMSException;
import javax.jms.Message;  
import javax.jms.MessageListener;  
import javax.jms.TextMessage;  
   
public class TopicListenter2 implements MessageListener {   
    public void onMessage(Message message) {  
        TextMessage textMsg = (TextMessage) message;  
        try {  
            System.out.println("监听者2 收到消息内容是:" + textMsg.getText());  
        } catch (JMSException e) {  
            e.printStackTrace();  
        }  
    }   
}

控制器Controller核心代码

@Resource(name = "jmsQueueTemplate")
private JmsTemplate jmsQueueTemplate;
    
@Resource(name = "demoQueueDestination")
@Autowired
private Destination demoQueueDestination;
    
@Resource(name = "jmsTopicTemplate")
private JmsTemplate jmsTopicTemplate;
    
@Resource(name = "demoTopicDestination")
@Autowired
private Destination demoTopicDestination;


final String msg="hello,queue...";
System.out.println("发送消息...");
jmsQueueTemplate.send(demoQueueDestination, new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage(msg);
    }
});


final String msg="hello,topic...";
System.out.println("发送消息...");
jmsTopicTemplate.send(demoTopicDestination, new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage(msg);
    }
});

发送消息后,可以看到监听器接收到消息

以上是关于ActiveMQ之与Spring集成的主要内容,如果未能解决你的问题,请参考以下文章

Spring集成:控制ActiveMQ连接

干货 | 从零开始学 Java - Spring 集成 ActiveMQ 配置

Spring集成Activemq使用(未完待续)

ActiveMQ与Spring集成

spring集成ActiveMQ居然要依赖这么多包

使用 spring 集成自动配置 ActiveMQ