activemq的配置与结合spring使用

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了activemq的配置与结合spring使用相关的知识,希望对你有一定的参考价值。

其实无论在win下还是在linux下,都可以运行得很爽

下载安装包地址:

http://www.apache.org/dyn/closer.cgi?path=/activemq/5.12.1/apache-activemq-5.12.1-bin.tar.gz

安装

1)解压文件

tar zxvf apache-activemq-5.12.1-bin.tar.gz

 

2)改个文件名字

mv apache-activemq-5.12.1 ../system/activemq

 

3)配置用户密码,比如我在user.properties中添加了信息

admin=admin
chenweixian=chenweixian

技术分享

4)启动,根据你自己当前部署的系统环境选择启动,如果是win下的,就根据自己系统的位数,选择32或64.如果是linux就直接启动activemq就可以了

执行命令:*****为具体路径。。

******/activemq/bin/activemq start

 

执行命令前请去确认是否已经授权,如果没有授权,到当前bin目录下,执行:

chmod 777 *

 

技术分享

java工程中的配置

5)启动完成后

测试访问:

http://ip:8161/admin/

正常如下图:

技术分享

 

6)maven工程中添加引入jar包,不是maven工程,自己找以下两个jar包加入工程中。

<!-- 使用activeMq消息队列 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-spring</artifactId>
            <version>5.7.0</version>
        </dependency>

7)在web.xml中添加引入:

    <!-- ContextLoaderListener初始化Spring上下文时需要使用到的contextConfigLocation参数 -->
    <context-param>
        <param-name>contextConfigLocation</param-name>
        <!-- 配置spring.xml和spring-mybatis.xml这两个配置文件的位置,固定写法 -->
        <param-value>
            classpath:spring.xml,
            classpath:spring-mybatis.xml,
            classpath:spring-activitymq.xml,
            classpath:dubbo.xml
        </param-value>
    </context-param>

8)spring-activitymq.xml文件配置

位置:

技术分享

内容:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd  
    http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd  
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd  
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd  
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">

    <!--创建连接工厂-->
    <bean id="targetConnectionFactory"
        class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.10.102:61616"></property>
    </bean>
    <!-- 声明ActiveMQ消息目标,目标可以是一个队列,也可以是一个主题ActiveMQTopic,消息队列名称
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="myQueue"></constructor-arg>
    </bean>-->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>
    <!--模板消息-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="receiveTimeout" value="600"></property>
    </bean>
    
    <!-- 队列A start-->
    <!--这个是队列目的地-->
    <bean id="demoQueueA" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg><value>demoQueueA</value></constructor-arg>
    </bean>
    <!-- 消息监听容器 -->
    <bean id="demoMessageListenerA"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="demoQueueA" />
        <property name="messageListener" ref="demoMessageListener" />
    </bean>
    <!-- 队列A end-->
    <!-- 队列B start-->
    
    <!-- 队列A start-->
    <!--这个是队列目的地-->
    <bean id="demoQueueB" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg><value>demoQueueB</value></constructor-arg>
    </bean>
    <!-- 消息监听容器 -->
    <bean id="demoMessageListenerB"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="demoQueueB" />
        <property name="messageListener" ref="demoMessageListener2" />
    </bean>
    <!-- 队列A end-->
    <!-- 队列B end-->
</beans>

 

9)java监听器

DemoMessageListener.java

package com.iafclub.demo.activityMq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;

/**接收mq消息
 * 
 * @author chenweixian
 *
 */
@Component
public class DemoMessageListener implements MessageListener {
    private static final Logger logger = Logger.getLogger(DemoMessageListener.class);
    
    public void onMessage(Message message) {
        String messageStr = "DemoMessageListener接收消息";
        //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换  
        TextMessage textMsg = (TextMessage) message;  
        logger.info(messageStr + "..........start......");  
        try {  

            logger.info("消息内容是:" + textMsg.getText());  
        } catch (JMSException e) {  
            logger.error(messageStr, e); 
        } 
        logger.info(messageStr + "..........end."); 
    }
}

 

DemoMessageListener2.java

package com.iafclub.demo.activityMq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;

import com.iafclub.baseTools.contants.ControllerContants;

/**接收mq消息
 * 
 * @author chenweixian
 *
 */
@Component
public class DemoMessageListener2 implements MessageListener {
    private static final Logger logger = Logger.getLogger(DemoMessageListener2.class);
    
    public void onMessage(Message message) {
        String messageStr = "DemoMessageListener2接收消息";
        //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换  
        TextMessage textMsg = (TextMessage) message;  
        logger.info(messageStr + ControllerContants.MESSAGE_START);  
        try {  

            logger.info("消息内容是:" + textMsg.getText());  
        } catch (JMSException e) {  
            logger.error(messageStr, e); 
        } 
        logger.info(messageStr + ControllerContants.MESSAGE_START); 
    }
}

10)使用junit测试:

package test.iafclub.mq;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.iafclub.baseTools.mq.ActiveMqUtil;
import com.iafclub.demo.domain.Dictionary;

@RunWith(SpringJUnit4ClassRunner.class)
//配置了@ContextConfiguration注解并使用该注解的locations属性指明spring和配置文件之后,
@ContextConfiguration(locations = {"classpath:spring.xml","classpath:spring-activitymq-test.xml", "classpath:spring-mybatis.xml", "classpath:dubbo-test.xml" })
public class ActiveMqTest {
    @Autowired
    private ActiveMqUtil activeMqUtil;
    String queueName = "chenweixianQueue";
    
    @Test
    public void testSenderMq(){
        for (int i=0;i<10;i++){
            Dictionary dictionary = new Dictionary();
            dictionary.setId(UUID.randomUUID().toString());
            dictionary.setTypeId("002");
            dictionary.setTypeName("字典分类");
            dictionary.setFieldKey("username"+i);
            dictionary.setFieldValue("陈惟鲜");
            dictionary.setFieldBack("back1");
            dictionary.setFieldBack2("back2");
            dictionary.setFieldBack3("back3");
            dictionary.setRemark("备注"+i);
            String messageContent = JSONObject.fromObject(dictionary).toString();
            System.out.println("发送消息:" + messageContent);
            activeMqUtil.sendMq("chenweixianQueue", messageContent);
        }
        

        List<Dictionary> dictionarys = new ArrayList<Dictionary>();
        for (int i=0;i<10;i++){
            Dictionary dictionary = new Dictionary();
            dictionary.setId(UUID.randomUUID().toString());
            dictionary.setTypeId("002");
            dictionary.setTypeName("字典分类");
            dictionary.setFieldKey("username"+i);
            dictionary.setFieldValue("陈惟鲜");
            dictionary.setFieldBack("back1");
            dictionary.setFieldBack2("back2");
            dictionary.setFieldBack3("back3");
            dictionary.setRemark("备注"+i);
            dictionarys.add(dictionary);
        }
        String messageContent = JSONArray.fromObject(dictionarys).toString();
        System.out.println("发送消息:" + messageContent);
        activeMqUtil.sendMq("chenweixianQueue2", messageContent);
        
        System.out.println("发送完成");
    }

//    @Test
//    public void testReceiverMq(){
//        String result = activeMqUtil.receiveMq(queueName);
//        System.out.println(result);
//        System.out.println("接收完成");
//    }
//    @Test
//    public void testParam(){
//        System.out.println(System.getProperty("webAppRootKey"));
//    }
}

因为使用简单方便,都是用json进行数据传输,所以,封装了一个工具类ActiveMqUtil.java

package com.iafclub.baseTools.mq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

@Component
public class ActiveMqUtil {
    
    private Logger logger = Logger.getLogger(ActiveMqUtil.class);
    
    @Autowired
    private JmsTemplate jmsTemplate;
    
    /**发送消息
     * 
     * @param messageString
     */
    public void sendMq(String destinationName, final String messageString) {
        
        jmsTemplate.send(destinationName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                // 消息内容
                TextMessage message = session.createTextMessage(messageString);
                return message;
            }
        });
    }
    
    /**接收消息系统启动的时候
     * 
     * @param messageString
     */
    public String receiveMq(String destinationName) {
        String result = "";
        ActiveMQTextMessage message = (ActiveMQTextMessage)jmsTemplate.receive(destinationName);
        try {
            result = message.getText();
        } catch (JMSException e) {
            logger.error(e);
        }
        return result;
    }
}

运行junit:

技术分享

 

进入mq管理界面:可以看到我们刚刚发送的消息队列中有的消息个数,消费次数。。。

技术分享

进入一个消息中,能看到消息具体内容,因为我们发送过来的是json格式的信息,所以,在这个服务器上能看到的内容也是json格式的内容。

技术分享

11)现在消息在服务器上,因为我们刚刚已经配置了与之关联的spring-activemq.xml中指定了消息名字,所以。当我们的web服务启动后,就自动订阅这个服务器上的消息。

当然也可以自己手动去触发,刚刚的测试例子中也有手动触发的个例。

技术分享

 

12)完毕。。。

以上是关于activemq的配置与结合spring使用的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ(06):ActiveMQ结合Spring开发

ActiveMQ 与 WebSocket 的结合推送方案+Spring Websocket Stomp

ActiveMQ(06):ActiveMQ结合Spring开发--第二种方式

ActiveMQ结合Spring收发消息

spring jms结合activemq

ActiveMQ结合Spring开发