activemq的配置与结合spring使用
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了activemq的配置与结合spring使用相关的知识,希望对你有一定的参考价值。
其实无论在win下还是在linux下,都可以运行得很爽
下载安装包地址:
http://www.apache.org/dyn/closer.cgi?path=/activemq/5.12.1/apache-activemq-5.12.1-bin.tar.gz
安装
1)解压文件
tar zxvf apache-activemq-5.12.1-bin.tar.gz
2)改个文件名字
mv apache-activemq-5.12.1 ../system/activemq
3)配置用户密码,比如我在user.properties中添加了信息
admin=admin
chenweixian=chenweixian
4)启动,根据你自己当前部署的系统环境选择启动,如果是win下的,就根据自己系统的位数,选择32或64.如果是linux就直接启动activemq就可以了
执行命令:*****为具体路径。。
******/activemq/bin/activemq start
执行命令前请去确认是否已经授权,如果没有授权,到当前bin目录下,执行:
chmod 777 *
java工程中的配置
5)启动完成后
测试访问:
正常如下图:
6)maven工程中添加引入jar包,不是maven工程,自己找以下两个jar包加入工程中。
<!-- 使用activeMq消息队列 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-spring</artifactId> <version>5.7.0</version> </dependency>
7)在web.xml中添加引入:
<!-- ContextLoaderListener初始化Spring上下文时需要使用到的contextConfigLocation参数 --> <context-param> <param-name>contextConfigLocation</param-name> <!-- 配置spring.xml和spring-mybatis.xml这两个配置文件的位置,固定写法 --> <param-value> classpath:spring.xml, classpath:spring-mybatis.xml, classpath:spring-activitymq.xml, classpath:dubbo.xml </param-value> </context-param>
8)spring-activitymq.xml文件配置
位置:
内容:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:task="http://www.springframework.org/schema/task" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd"> <!--创建连接工厂--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.10.102:61616"></property> </bean> <!-- 声明ActiveMQ消息目标,目标可以是一个队列,也可以是一个主题ActiveMQTopic,消息队列名称 <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="myQueue"></constructor-arg> </bean>--> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean> <!--模板消息--> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="receiveTimeout" value="600"></property> </bean> <!-- 队列A start--> <!--这个是队列目的地--> <bean id="demoQueueA" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg><value>demoQueueA</value></constructor-arg> </bean> <!-- 消息监听容器 --> <bean id="demoMessageListenerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="demoQueueA" /> <property name="messageListener" ref="demoMessageListener" /> </bean> <!-- 队列A end--> <!-- 队列B start--> <!-- 队列A start--> <!--这个是队列目的地--> <bean id="demoQueueB" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg><value>demoQueueB</value></constructor-arg> </bean> <!-- 消息监听容器 --> <bean id="demoMessageListenerB" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="demoQueueB" /> <property name="messageListener" ref="demoMessageListener2" /> </bean> <!-- 队列A end--> <!-- 队列B end--> </beans>
9)java监听器
DemoMessageListener.java
package com.iafclub.demo.activityMq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.apache.log4j.Logger; import org.springframework.stereotype.Component; /**接收mq消息 * * @author chenweixian * */ @Component public class DemoMessageListener implements MessageListener { private static final Logger logger = Logger.getLogger(DemoMessageListener.class); public void onMessage(Message message) { String messageStr = "DemoMessageListener接收消息"; //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换 TextMessage textMsg = (TextMessage) message; logger.info(messageStr + "..........start......"); try { logger.info("消息内容是:" + textMsg.getText()); } catch (JMSException e) { logger.error(messageStr, e); } logger.info(messageStr + "..........end."); } }
DemoMessageListener2.java
package com.iafclub.demo.activityMq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.apache.log4j.Logger; import org.springframework.stereotype.Component; import com.iafclub.baseTools.contants.ControllerContants; /**接收mq消息 * * @author chenweixian * */ @Component public class DemoMessageListener2 implements MessageListener { private static final Logger logger = Logger.getLogger(DemoMessageListener2.class); public void onMessage(Message message) { String messageStr = "DemoMessageListener2接收消息"; //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换 TextMessage textMsg = (TextMessage) message; logger.info(messageStr + ControllerContants.MESSAGE_START); try { logger.info("消息内容是:" + textMsg.getText()); } catch (JMSException e) { logger.error(messageStr, e); } logger.info(messageStr + ControllerContants.MESSAGE_START); } }
10)使用junit测试:
package test.iafclub.mq; import java.util.ArrayList; import java.util.List; import java.util.UUID; import net.sf.json.JSONArray; import net.sf.json.JSONObject; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.iafclub.baseTools.mq.ActiveMqUtil; import com.iafclub.demo.domain.Dictionary; @RunWith(SpringJUnit4ClassRunner.class) //配置了@ContextConfiguration注解并使用该注解的locations属性指明spring和配置文件之后, @ContextConfiguration(locations = {"classpath:spring.xml","classpath:spring-activitymq-test.xml", "classpath:spring-mybatis.xml", "classpath:dubbo-test.xml" }) public class ActiveMqTest { @Autowired private ActiveMqUtil activeMqUtil; String queueName = "chenweixianQueue"; @Test public void testSenderMq(){ for (int i=0;i<10;i++){ Dictionary dictionary = new Dictionary(); dictionary.setId(UUID.randomUUID().toString()); dictionary.setTypeId("002"); dictionary.setTypeName("字典分类"); dictionary.setFieldKey("username"+i); dictionary.setFieldValue("陈惟鲜"); dictionary.setFieldBack("back1"); dictionary.setFieldBack2("back2"); dictionary.setFieldBack3("back3"); dictionary.setRemark("备注"+i); String messageContent = JSONObject.fromObject(dictionary).toString(); System.out.println("发送消息:" + messageContent); activeMqUtil.sendMq("chenweixianQueue", messageContent); } List<Dictionary> dictionarys = new ArrayList<Dictionary>(); for (int i=0;i<10;i++){ Dictionary dictionary = new Dictionary(); dictionary.setId(UUID.randomUUID().toString()); dictionary.setTypeId("002"); dictionary.setTypeName("字典分类"); dictionary.setFieldKey("username"+i); dictionary.setFieldValue("陈惟鲜"); dictionary.setFieldBack("back1"); dictionary.setFieldBack2("back2"); dictionary.setFieldBack3("back3"); dictionary.setRemark("备注"+i); dictionarys.add(dictionary); } String messageContent = JSONArray.fromObject(dictionarys).toString(); System.out.println("发送消息:" + messageContent); activeMqUtil.sendMq("chenweixianQueue2", messageContent); System.out.println("发送完成"); } // @Test // public void testReceiverMq(){ // String result = activeMqUtil.receiveMq(queueName); // System.out.println(result); // System.out.println("接收完成"); // } // @Test // public void testParam(){ // System.out.println(System.getProperty("webAppRootKey")); // } }
因为使用简单方便,都是用json进行数据传输,所以,封装了一个工具类ActiveMqUtil.java
package com.iafclub.baseTools.mq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; @Component public class ActiveMqUtil { private Logger logger = Logger.getLogger(ActiveMqUtil.class); @Autowired private JmsTemplate jmsTemplate; /**发送消息 * * @param messageString */ public void sendMq(String destinationName, final String messageString) { jmsTemplate.send(destinationName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { // 消息内容 TextMessage message = session.createTextMessage(messageString); return message; } }); } /**接收消息系统启动的时候 * * @param messageString */ public String receiveMq(String destinationName) { String result = ""; ActiveMQTextMessage message = (ActiveMQTextMessage)jmsTemplate.receive(destinationName); try { result = message.getText(); } catch (JMSException e) { logger.error(e); } return result; } }
运行junit:
进入mq管理界面:可以看到我们刚刚发送的消息队列中有的消息个数,消费次数。。。
进入一个消息中,能看到消息具体内容,因为我们发送过来的是json格式的信息,所以,在这个服务器上能看到的内容也是json格式的内容。
11)现在消息在服务器上,因为我们刚刚已经配置了与之关联的spring-activemq.xml中指定了消息名字,所以。当我们的web服务启动后,就自动订阅这个服务器上的消息。
当然也可以自己手动去触发,刚刚的测试例子中也有手动触发的个例。
12)完毕。。。
以上是关于activemq的配置与结合spring使用的主要内容,如果未能解决你的问题,请参考以下文章
ActiveMQ(06):ActiveMQ结合Spring开发
ActiveMQ 与 WebSocket 的结合推送方案+Spring Websocket Stomp