ActiveMQ(06):ActiveMQ结合Spring开发

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ(06):ActiveMQ结合Spring开发相关的知识,希望对你有一定的参考价值。

一、pom.xml与mq.properties

Spring提供了对JMS的支持,需要添加Spring支持jms的包,如下:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.1.7.RELEASE</version>
</dependency>

添加ActiveMQ的pool包,如下:

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.11.1</version>
</dependency>

添加xbean的标签配置,如下:

<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>3.16</version>
</dependency>

pom.xml完整配置如下:

<properties>
    <activemq.version>5.9.0</activemq.version>
    <activemq-pool.version>5.11.1</activemq-pool.version>
    <spring.version>4.1.7.RELEASE</spring.version>
    <xbean.version>3.16</xbean.version>
    <commons-lang3.version>3.3.2</commons-lang3.version>
    <commons-io.version>2.4</commons-io.version>
    <commons-fileupload.version>1.3.1</commons-fileupload.version>
    <fasterxml.jackson.version>2.8.4</fasterxml.jackson.version>
    <codehaus.jackson.version>1.9.13</codehaus.jackson.version>
</properties>
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
	<version>${junit.version}</version>
    </dependency>
    <!-- Apache工具组件 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
	<version>${commons-lang3.version}</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
	<version>${commons-io.version}</version>
    </dependency>
    <dependency>
        <groupId>commons-fileupload</groupId>
	<artifactId>commons-fileupload</artifactId>
	<version>${commons-fileupload.version}</version>
    </dependency>
    <!-- jackson -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
	<artifactId>jackson-databind</artifactId>
	<version>${fasterxml.jackson.version}</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
	<artifactId>jackson-core</artifactId>
	<version>${fasterxml.jackson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
	<artifactId>jackson-core-asl</artifactId>
        <version>${codehaus.jackson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
	<artifactId>jackson-mapper-asl</artifactId>
	<version>${codehaus.jackson.version}</version>
    </dependency>
    <!-- activemq -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
	<artifactId>activemq-all</artifactId>
	<version>${activemq.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
	<artifactId>activemq-pool</artifactId>
	<version>${activemq-pool.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
	<artifactId>spring-jms</artifactId>
	<version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.xbean</groupId>
	<artifactId>xbean-spring</artifactId>
	<version>${xbean.version}</version>
    </dependency>
    <!-- spring -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
	<version>${spring.version}</version>
    </dependency>
    <dependency>
    	<groupId>org.springframework</groupId>
	<artifactId>spring-context-support</artifactId>
	<version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
	<artifactId>spring-beans</artifactId>
	<version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
	<artifactId>spring-webmvc</artifactId>
	<version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jdbc</artifactId>
	<version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
	<artifactId>spring-aspects</artifactId>
	<version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
	<artifactId>spring-test</artifactId>
	<version>${spring.version}</version>
    </dependency>
</dependencies>

二、mq.xml配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
    
    <!-- ActiveMQ 连接工厂 -->
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
    <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
    <amq:connectionFactory id="amqConnectionFactory" brokerURL="${activemq.brokerURL}" userName="${activemq.userName}" password="${activemq.password}" />
    
    <!-- Spring Caching连接工厂 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- 同上,同理 -->
        <!-- <constructor-arg ref="amqConnectionFactory" /> -->
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="100" />
    </bean>
    
    <!-- =======Spring JmsTemplate 的消息生产者【开始】======== -->
    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
        <property name="pubSubDomain" value="false" />
    </bean>

    <!-- 定义JmsTemplate的Topic类型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
         <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
        <constructor-arg ref="connectionFactory" />
        <!-- pub/sub模型(发布/订阅) -->
        <property name="pubSubDomain" value="true" />
    </bean>
    <!-- =======Spring JmsTemplate 的消息生产者【结束】======== -->
    
    <!-- =======消息消费者=======【开始】 -->
    <!-- 定义Queue监听器 -->
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.queue" ref="queueReceiver1"/>
        <jms:listener destination="test.queue" ref="queueReceiver2"/>
    </jms:listener-container>

    <!-- 定义Topic监听器 -->
    <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.topic" ref="topicReceiver1"/>
        <jms:listener destination="test.topic" ref="topicReceiver2"/>
    </jms:listener-container>
	<!-- =======消息消费者=======【结束】 -->
</beans>

三、java类

3.1 消费者监听器

3.1.1 队列消息监听器

package com.liuy.mq.consumer.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

/**
 * 队列消息监听器1
 * @description 队列消息监听器1
 * @author liuy
 * @version V1.00
 * @date:2017年4月12日上午10:15:19
 */
@Component
public class QueueReceiver1 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
	    System.out.println("QueueReceiver1接收到消息:"+((TextMessage)message).getText());
	} catch (JMSException e) {
	    e.printStackTrace();
	}
    }
}
package com.liuy.mq.consumer.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

/**
 * 队列消息监听器2
 * @description 队列消息监听器2
 * @author liuy
 * @version V1.00
 * @date:2017年4月12日上午10:15:19
 */
@Component
public class QueueReceiver2 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
	    System.out.println("QueueReceiver2接收到消息:"+((TextMessage)message).getText());
	} catch (JMSException e) {
	    e.printStackTrace();
	}
    }
}

3.1.2 Topic消息监听器

package com.liuy.mq.consumer.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

/**
 * Topic消息监听器1
 * @description Topic消息监听器1
 * @author liuy
 * @version V1.00
 * @date:2017年4月12日上午10:17:11
 */
@Component
public class TopicReceiver1 implements MessageListener{


    @Override
    public void onMessage(Message message) {
        try {
	    System.out.println("TopicReceiver1接收到消息:"+((TextMessage)message).getText());
	} catch (JMSException e) {
	    e.printStackTrace();
	}
    }
	
}
package com.liuy.mq.consumer.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

/**
 * Topic消息监听器2
 * @description Topic消息监听器2
 * @author liuy
 * @version V1.00
 * @date:2017年4月12日上午10:17:11
 */
@Component
public class TopicReceiver2 implements MessageListener{


    @Override
    public void onMessage(Message message) {
    	try {
            System.out.println("TopicReceiver2接收到消息:"+((TextMessage)message).getText());
	} catch (JMSException e) {
	    e.printStackTrace();
	}
    }
	
}

3.2 消息生产者

package com.liuy.mq.producer.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

/**
 * 队列消息生产者,发送消息到队列
 * @description 队列消息生产者,发送消息到队列
 * @author liuy
 * @version V1.00
 * @date:2017年4月12日上午10:20:46
 */
@Component("queueSender")
public class QueueSender {
	
    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;//通过@Qualifier修饰符来注入对应的bean
	
    /**
     * 发送一条消息到指定的队列(目标)
     * @param queueName 队列名称
     * @param message 消息内容
     */
    public void send(String queueName,final String message){
    	jmsTemplate.send(queueName, new MessageCreator() {
    	    @Override
	    public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
	    }
        });
    }
	
}
package com.liuy.mq.producer.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

/**
 * Topic生产者发送消息到Topic
 * @description Topic生产者发送消息到Topic
 * @author liuy
 * @version V1.00
 * @date:2017年4月12日上午10:20:46
 */
@Component("topicSender")
public class TopicSender {
    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTemplate;
	
    /**
     * 发送一条消息到指定的队列(目标)
     * @param queueName 队列名称
     * @param message 消息内容
     */
    public void send(String topicName,final String message){
        jmsTemplate.send(topicName, new MessageCreator() {
	    @Override
	    public Message createMessage(Session session) throws JMSException {
	        return session.createTextMessage(message);
	    }
	});
    }

}

四、测试

package com.liuy.test.common;

import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * 测试共公类
 * @description 测试共公类
 * @author liuy
 * @version V1.00
 * @date:2016年4月24日下午5:20:54
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:application-context.xml")
public class SpringJunitTest 
{

}
package com.liuy.test.core;

import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

import com.liuy.mq.producer.queue.QueueSender;
import com.liuy.mq.producer.topic.TopicSender;
import com.liuy.test.common.SpringJunitTest;

/**
 * @description 描述
 * @author liuy
 * @version 1.0
 * @date:2017年4月11日下午9:00:18
 */
public class SpringQueueTest extends SpringJunitTest {
	@Autowired 
	private QueueSender queueSender;
	@Autowired 
	private TopicSender topicSender;
	
	/**
	 * 发送消息到队列
	 * Queue队列:仅有一个订阅者会收到消息,消息一旦被处理就不会存在队列中
	 * @param message
	 * @return String
	 */
	@Test
	public void testQueueSend() throws Exception {
		queueSender.send("test.queue", "测试");
	}
	
	/**
	 * 发送消息到主题
	 * Topic主题 :放入一个消息,所有订阅者都会收到 
	 * 这个是主题目的地是一对多的
	 * @param message
	 * @return String
	 */
	@Test
	public void testTopicSend() throws Exception {
		topicSender.send("test.topic", "测试222");
	}
}


效果:

 列队:

    技术分享

 主题:

    技术分享



本文出自 “我爱大金子” 博客,请务必保留此出处http://1754966750.blog.51cto.com/7455444/1915190

以上是关于ActiveMQ(06):ActiveMQ结合Spring开发的主要内容,如果未能解决你的问题,请参考以下文章

activemq的配置与结合spring使用

ActiveMQ(07):ActiveMQ结合Spring开发--建议

ActiveMQ结合Spring收发消息

ActiveMQ结合Spring开发

分布式-信息方式-ActiveMQ结合Spring

ActiveMQ RabbitMQ RokcetMQ Kafka实战 消息队列中间件视频教程