学习ActiveMQ:spring与ActiveMQ整合
Posted 今天也是阳光正好
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了学习ActiveMQ:spring与ActiveMQ整合相关的知识,希望对你有一定的参考价值。
在上一篇中已经怎么使用activemq的api来实现消息的发送接收了,但是在实际的开发过程中,我们很少使用activemq直接上去使用,因为我们每次都要创建连接工厂,创建连接,创建session。。。有些繁琐,那么利用spring的话简单多了,强大的spring
提供了对了jms的支持,我们可以使用JmsTemplate来实现,JmsTemplate隔离了像打开、关闭Session和Producer的繁琐操作,因此应用开发人员仅仅需要关注实际的业务逻辑,接下来就一起来看看具体怎么做吧。
首先我们打开idea新建一个maven的webapp项目,对项目项目结构进行一下调整,如下图:
接下来对pom.xml进行修改,需要spring的jar包和jms的jar包,我贴出我的完整代码:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.easylab.jms</groupId> <artifactId>jms-spring</artifactId> <version>1.0-SNAPSHOT</version> <properties> <spring.version>4.2.5.RELEASE</spring.version> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.9.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.4</version> <exclusions> <exclusion> <artifactId>spring-context</artifactId> <groupId>org.springframework</groupId> </exclusion> </exclusions> </dependency> </dependencies> </project>
在resource文件夹下面建立一个common.xml文件,并进行相关配置,配置作用看代码注释。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!--支持注解--> <context:component-scan base-package="com.easylab.jms"/> <!--使用PooledConnectionFactory对session和消息producer的缓存机制带来的性能提升--> <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <!--连接mq的连接工厂--> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://127.0.0.1:61616</value> </property> </bean> </property> <property name="maxConnections" value="100"></property> </bean> <!--使用缓存可以提升效率--> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="connectionFactory"/> <property name="sessionCacheSize" value="1"/> </bean> <!--配置JmsTemplate,用于发送消息--> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> <!--给一个默认的是destination--> <property name="defaultDestination" ref="queueDestination"/> </bean> <!--队列模式的destination--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="queue"/> </bean> <!--主题模式的destination配置--> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic"/> </bean> <!--消费者所需--> <!--配置 消息监听容器 使用DefaultMessageListenerContainer,它允许异步接收消息并缓存session和消息consumer,而且还可以根据消息数量动态的增加或缩减监听器的数量--> <bean id="jmsContainer" class=" org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="queueDestination"/> <property name="messageListener" ref="consumerMessageListener"/> </bean> <!--配置 消息监听容器--> <bean id="jmsContainerTopic" class=" org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="topicDestination"/> <property name="messageListener" ref="consumerMessageListenerTopic"/> </bean> <!--自定义消息监听器--> <bean id="consumerMessageListener" class="com.easylab.jms.consumer.ConsumerMessageListener"/> <bean id="consumerMessageListenerTopic" class="com.easylab.jms.consumer.ConsumerMessageListenerTopic"/> </beans>
接下来在com.easylab.jms.producer创建一个IProducerService 发送者接口
package com.easylab.jms.producer; /****************************** * @author : liuyuan * <p>ProjectName:jms-spring </p> * @ClassName : IProducerService * @date : 2018/6/24 0024 * @time : 8:41 * @createTime 2018-06-24 8:41 * @version : 2.0 * @description :消息发送 *******************************/ public interface IProducerService { public void sendMessage(String message); }
对这个接口进行实现,在com.easylab.jms.producer创建一个ProducerServiceImpl实现类(使用队列模式发送)
package com.easylab.jms.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; import javax.annotation.Resource; import javax.jms.*; /****************************** * @author : liuyuan * <p>ProjectName:jms-spring </p> * @ClassName : ProducerServiceImpl * @date : 2018/6/24 0024 * @time : 8:42 * @createTime 2018-06-24 8:42 * @version : 2.0 * @description : *******************************/ @Service public class ProducerServiceImpl implements IProducerService { @Autowired JmsTemplate jmsTemplate; //这里不定义的话,就使用在xml文件里配置的默认的destination /* @Resource(name = "queueDestination") Destination destination;*/ public void sendMessage(final String message) { // 使用JmsTemplate发送消息 jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { // 创建一个消息 TextMessage textMessage = session.createTextMessage(message); return textMessage; } }); System.out.println("发送消息" + message); } }
再对这个接口写一个实现,在com.easylab.jms.producer创建一个ProducerServiceImplTopic实现类(使用主题模式发送)
package com.easylab.jms.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; import javax.annotation.Resource; import javax.jms.*; /****************************** * @author : liuyuan * <p>ProjectName:jms-spring </p> * @ClassName : ProducerServiceImpl * @date : 2018/6/24 0024 * @time : 8:42 * @createTime 2018-06-24 8:42 * @version : 2.0 * @description : *******************************/ @Service public class ProducerServiceImplTopic implements IProducerService { @Autowired JmsTemplate jmsTemplate; //指定主题模式 @Resource(name = "topicDestination") Destination destination; public void sendMessage( final String message) { // 使用JmsTemplate发送消息 jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { // 创建一个消息 TextMessage textMessage = session.createTextMessage(message); return textMessage; } }); System.out.println("发送消息" + message); } }
ok,接下在com.easylab.jms.produce写一个AppProducer运行类就行了
package com.easylab.jms.producer; import javafx.application.Application; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.xbean.spring.context.ClassPathXmlApplicationContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationContext; import javax.jms.*; /****************************** * @author : liuyuan * <p>ProjectName:jms-test </p> * @ClassName : AppProducer * @date : 2018/6/20 0020 * @time : 20:46 * @createTime 2018-06-20 20:46 * @version : 2.0 * @description : *******************************/ public class AppProducer { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("common.xml"); IProducerService service = (IProducerService)context.getBean(ProducerServiceImpl.class); for (int i = 0; i < 2; i++) { service.sendMessage("test" + i); } //切换到主题模式下再发送几条 service = (IProducerService)context.getBean(ProducerServiceImplTopic.class); for (int i = 0; i < 10; i++) { service.sendMessage("test" + i); } context.close(); } }
可以运行一下看看,可以看到发送信息,但是没有接受消息,因为我们还没有写消费者,我们可以通过监听器来监听队列,如果队列中有消息,就会直接进入到监听器的onMessage方法中进行我们的业务处理,所以我们可以不用写消费者代码了。用监听器即可。
在配置文件中,我们写了两个监听容器和监听器,用他们分别监听队列和主题,查看效果。接下来创建两个监听器.
在com.easylab.jms.produce写一个ConsumerMessageListener监听器:
package com.easylab.jms.consumer; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /****************************** * @author : liuyuan * <p>ProjectName:jms-spring </p> * @ClassName : ConsumerMessageListener * @date : 2018/6/24 0024 * @time : 9:16 * @createTime 2018-06-24 9:16 * @version : 2.0 * @description : * * * *******************************/ public class ConsumerMessageListener implements MessageListener { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("队列监听器接收消息" + textMessage); } catch (Exception e) { e.printStackTrace(); } } }
在com.easylab.jms.produce写一个ConsumerMessageListener监听器:
package com.easylab.jms.consumer; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /****************************** * @author : liuyuan * <p>ProjectName:jms-spring </p> * @ClassName : ConsumerMessageListener * @date : 2018/6/24 0024 * @time : 9:16 * @createTime 2018-06-24 9:16 * @version : 2.0 * @description : *******************************/ public class ConsumerMessageListenerTopic implements MessageListener { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("我是topic监听器,接收到消息:" + textMessage); } catch (Exception e) { e.printStackTrace(); } } }
到这里代码就全部写完了,运行刚刚创建启动类
可以清楚的看到消息队列接收到了两条,主题队列接收到了十条,测试成功。
代码地址:https://github.com/MrLiu1227/ActiveMQ
以上是关于学习ActiveMQ:spring与ActiveMQ整合的主要内容,如果未能解决你的问题,请参考以下文章
activemq 学习系列 activemq 与 spring boot 整合