基于Spring支持的通信
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Spring支持的通信相关的知识,希望对你有一定的参考价值。
概述
基于Spring支持的客户端编程,包括发送方客户端、接收方客户端。
发送方客户端代码:jms-producer
接收方客户端代码:jms-consumer
本文有pdf版本:基于Spring支持的通信.pdf
发送方客户端
这里基于demo进行说明。这个demo将往example.queue和example.topic各发一条信息。
文件目录结构
1 src/main/resources/ 2 |---- jndi.properties 3 |---- spring-beans.xml 4 src/main/java/ 5 |---- cn.sinobest.asj.producer.springsupport.simple 6 |---- SimpleProducer.java # 发送测试类 7 pom.xml
文件内容
1.jndi.properties
1 java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory 2 3 # use the following property to configure the default connector 4 java.naming.provider.url=tcp://localhost:61616 5 6 # register some queues in JNDI using the form 7 # queue.[jndiName] = [physicalName] 8 queue.exampleQueue=example.queue 9 10 # register some topics in JNDI using the form 11 # topic.[jndiName] = [physicalName] 12 topic.exampleTopic=example.topic
2.spring-beans.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:context="http://www.springframework.org/schema/context" 5 xsi:schemaLocation="http://www.springframework.org/schema/beans 6 http://www.springframework.org/schema/beans/spring-beans-4.2.xsd 7 http://www.springframework.org/schema/context 8 http://www.springframework.org/schema/context/spring-context-4.2.xsd"> 9 <!-- import outer properties file --> 10 <context:property-placeholder location="classpath:jndi.properties" /> 11 12 <!-- create pooled connection factory --> 13 <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> 14 <property name="connectionFactory"> 15 <bean class="org.apache.activemq.ActiveMQConnectionFactory"> 16 <property name="brokerURL" value="${java.naming.provider.url}" /> 17 <property name="closeTimeout" value="60000" /> 18 </bean> 19 </property> 20 </bean> 21 22 <!-- create queue destination --> 23 <bean id="exampleQueue" class="org.apache.activemq.command.ActiveMQQueue"> 24 <constructor-arg value="${queue.exampleQueue}" /> 25 </bean> 26 27 <!-- create topic destination --> 28 <bean id="exampleTopic" class="org.apache.activemq.command.ActiveMQTopic"> 29 <constructor-arg value="${topic.exampleTopic}" /> 30 </bean> 31 32 <!-- create template for send message --> 33 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 34 <!-- bind the connection factory --> 35 <property name="connectionFactory" ref="pooledConnectionFactory" /> 36 <!-- bind the default destination, but you can also appoint other destination when send --> 37 <property name="defaultDestination" ref="exampleQueue" /> 38 </bean> 39 </beans>
说明:
JmsTemplate提供了多个重载的send方法,用于发送消息,但是它依赖于ConnectionFactory资源。
在声明ConnectionFactory资源时,我们使用PooledConnectionFactory包装ActiveMQConnectionFactory[1],这是为了有效的利用资源,就像JDBC中的连接池。
此外,还声明了两个Destination:exampleQueue和exampleTopic,并将exampleQueue作为jmsTemplate的defaultDestination。另一个exampleTopic,将在SimpleProducer.java中使用。
这里使用${java.naming.provider.url}的方式引用的属性值,都来自jndi.properties。
注:
1. 在我参考的系列文章中,还有使用org.springframework.jms.connection.SingleConnectionFactory来包装ActiveMQConnectionFactory的。另外,我觉得直接使用ActiveMQConnectionFactory也可以。
3.SimpleProducer.java
1 package cn.sinobest.asj.producer.springsupport.simple; 2 import javax.annotation.Resource; 3 import javax.jms.Destination; 4 import javax.jms.JMSException; 5 import javax.jms.Message; 6 import javax.jms.Session; 7 import javax.jms.TextMessage; 8 import org.junit.Test; 9 import org.junit.runner.RunWith; 10 import org.springframework.jms.core.JmsTemplate; 11 import org.springframework.jms.core.MessageCreator; 12 import org.springframework.test.context.ContextConfiguration; 13 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 14 /** 15 * 单元测试类.<br> 16 * 基于Spring运行环境的测试. 17 * @author lijinlong 18 * 19 */ 20 @RunWith(SpringJUnit4ClassRunner.class) // 配置spring组件运行时 21 @ContextConfiguration("/spring-beans.xml") // 配置文件 22 public class SimpleProducer { 23 @Resource(name="exampleTopic") 24 private Destination exampleTopic; 25 26 @Resource(name="jmsTemplate") 27 private JmsTemplate jmsTemplate; 28 29 @Test 30 public void test() { 31 // 发送到默认的destination - queue 32 jmsTemplate.send(new MessageCreator() { 33 34 public Message createMessage(Session session) throws JMSException { 35 TextMessage msg = session.createTextMessage(); 36 msg.setText("This message is plan to the queue."); 37 return msg; 38 } 39 }); 40 41 // 发送到指定的destination - topic 42 jmsTemplate.send(exampleTopic, new MessageCreator() { 43 44 public Message createMessage(Session session) throws JMSException { 45 TextMessage msg = session.createTextMessage(); 46 msg.setText("This message is plan to the topic."); 47 return msg; 48 } 49 }); 50 } 51 }
说明:
使用RunWith注解,为单元测试类指定Spring运行环境,就可以在单元测试类中接受依赖的注入。
使用ContextConfiguration注解,进一步指定spring beans的配置文件。
test方法是测试方法,它使用不同的重载的send方法,分别向默认的Destination、exampleTopic destination发送消息。
JmsTemplate的send方法:
- send(String destinationName, MessageCreator messageCreator):void
将messageCreator创建的消息,发送到destinationName指定的目的地。
destinationName:目的地的名称,具体按照queue还是topic解析,需要另外的参数指定[1];默认情况应该是按照queue来解析的。
messageCreator:回调接口,提供回调方法,基于Session创建消息。
- send(Destination destination, MessageCreator messageCreator):void
将messageCreator创建的消息,发送到指定的Destination。 - send(MessageCreator messageCreator):void
将messageCreator创建的消息,发送到默认的Destination。
注:
1. 应该是在声明JmsTemplate bean时,指定pubSubDomain属性:false - Queues;true - Topics。
4.pom.xml
这里仅记录添加的依赖,完整的内容可以参考pom.xml。
1 <!-- import org.apache.activemq.pool.PooledConnectionFactory using in spring-beans.xml --> 2 <dependency> 3 <groupId>org.apache.activemq</groupId> 4 <artifactId>activemq-pool</artifactId> 5 <version>5.13.2</version> 6 </dependency> 7 <!-- import for unit test --> 8 <dependency> 9 <groupId>junit</groupId> 10 <artifactId>junit</artifactId> 11 <version>4.12</version> 12 </dependency> 13 <!-- import spring support --> 14 <dependency> 15 <groupId>org.springframework</groupId> 16 <artifactId>spring-context</artifactId> 17 <version>4.2.4.RELEASE</version> 18 </dependency> 19 <dependency> 20 <groupId>org.springframework</groupId> 21 <artifactId>spring-jms</artifactId> 22 <version>4.2.4.RELEASE</version> 23 </dependency> 24 <dependency> 25 <groupId>org.springframework</groupId> 26 <artifactId>spring-test</artifactId> 27 <version>4.2.4.RELEASE</version> 28 </dependency>
接收方客户端
这里基于demo进行说明。这个demo将接收example.queue和example.topic的消息。
文件目录结构
1 src/main/resources/ 2 |---- jndi.properties 3 |---- spring-beans.xml 4 src/main/java/ 5 |---- cn.sinobest.asj.consumer.util 6 |---- Hold.java # 提供pause功能 7 |---- cn.sinobest.asj.consumer.springsupport.simple 8 |---- SimpleConsumer.java # 单元测试类 9 pom.xml
文件内容
1.jndi.properties
1 java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory 2 3 # use the following property to configure the default connector 4 java.naming.provider.url=tcp://localhost:61616 5 6 # register some queues in JNDI using the form 7 # queue.[jndiName] = [physicalName] 8 queue.exampleQueue=example.queue 9 10 # register some topics in JNDI using the form 11 # topic.[jndiName] = [physicalName] 12 topic.exampleTopic=example.topic
2.spring-beans.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:context="http://www.springframework.org/schema/context" 5 xmlns:jms="http://www.springframework.org/schema/jms" 6 xsi:schemaLocation="http://www.springframework.org/schema/beans 7 http://www.springframework.org/schema/beans/spring-beans-4.2.xsd 8 http://www.springframework.org/schema/context 9 http://www.springframework.org/schema/context/spring-context-4.2.xsd 10 http://www.springframework.org/schema/jms 11 http://www.springframework.org/schema/jms/spring-jms-4.2.xsd"> 12 <!-- import outer properties file --> 13 <context:property-placeholder location="classpath:jndi.properties" /> 14 15 <!-- create pooled connection factory --> 16 <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> 17 <property name="connectionFactory"> 18 <bean class="org.apache.activemq.ActiveMQConnectionFactory"> 19 <property name="brokerURL" value="${java.naming.provider.url}" /> 20 <property name="closeTimeout" value="60000" /> 21 </bean> 22 </property> 23 </bean> 24 25 <bean id="simpleConsumer" class="cn.sinobest.asj.consumer.springsupport.simple.SimpleConsumer" /> 26 27 <!-- config listener container for queues --> 28 <jms:listener-container connection-factory="pooledConnectionFactory"> 29 <jms:listener destination="${queue.exampleQueue}" ref="simpleConsumer" 30 method="onMessageFromQueue" /> 31 </jms:listener-container> 32 33 <!-- config listener container for topics --> 34 <jms:listener-container connection-factory="pooledConnectionFactory" 35 destination-type="topic"> 36 <jms:listener destination="${topic.exampleTopic}" ref="simpleConsumer" 37 method="onMessageFromTopic" /> 38 </jms:listener-container> 39 </beans>
说明:
这里声明了两个listener-container,这是我们分析配置的起点。
listener-container用于管理listener:listener-container的connection-factory依赖于ConnectionFactory实例,destination-type标识listener监听的destination的类型,默认为queue。
listener配置监听器:destination配置监听的目的地,ref、method分别配置消息处理bean和方法——监听器收到消息之后,会调用对应的bean方法[1]。
simpleConsumer作为消息处理bean,对于这个类本身没有特殊的要求。
这里使用${java.naming.provider.url}的方式引用的属性值,都来自jndi.properties。
注:
1. 要求配置的方法的参数只有一个,且类型和Message中封装的数据类型兼容。比如Message是一个TextMessage,那么String类型足以满足;如果Message是一个ObjectMessage,且封装的Object是字符串数组类型,那就要提供一个可以兼容字符串数组的参数类型 —— 提供重载的方法,Spring会在类型兼容的前提下,选择一个最接近的方法调用。
基于bean的配置
这是另外一种异步接收的配置方式:
1 <bean id="messageContainer" 2 class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 3 <property name="connectionFactory" ref="connectionFactory" /> 4 <property name="destination" ref="destination" /> 5 <property name="messageListener" ref="messageListener" /> 6 </bean>
两种配置的比较
基于listener-container的配置 | 基于bean的配置 |
一个container可以配置多个listener | 一个container只能配置一个listener |
配置消息处理类来接收消息,且对这个类没有类型的要求。 | 用messageListener属性配置消息监听器,要求实现javax.jms.MessageListener接口。 |
对Message进行了预处理:根据接收到的Message的类型进行解封,根据解封的类型选择一个兼容的处理方法调用 | 没有对Message进行预处理 |
同步接收方式
JmsTemplate同样提供了一系列receive方法来接受消息。参考前文发送方客户端中配置JmsTemplate的方式,然后调用receive方法进行同步接收。
- receive系列方法,只是单纯的接收,返回的结果是javax.jms.Message类型。
- receiveSelected系列方法,在receive系列方法的基础上加入了选择器。
- receiveAndConvert系列方法,接收并转换,返回的是Object类型(实际类型是Message内部封装的类型)。
- receiveSelectedAndConvert系列方法,在receiveAndConvert系列方法的基础上加入了选择器。
3.Hold.java
1 package cn.sinobest.asj.consumer.util; 2 /** 3 * 防止程序立即退出.<br> 4 * @author lijinlong 5 * 6 */ 7 public class Hold { 8 /** 9 * 防止程序立即退出,直到用户输入Enter键. 10 */ 11 public static void hold() { 12 @SuppressWarnings("resource") 13 java.util.Scanner scanner = new java.util.Scanner(System.in); 14 System.out.println("按Enter键退出:"); 15 scanner.nextLine(); 16 } 17 }
基于异步的接收方式,需要防止主线程执行完毕后退出。
4.SimpleConsumer.java
1 package cn.sinobest.asj.consumer.springsupport.simple; 2 import org.junit.Test; 3 import org.junit.runner.RunWith; 4 import org.springframework.test.context.ContextConfiguration; 5 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 6 import cn.sinobest.asj.consumer.util.Hold; 7 @RunWith(SpringJUnit4ClassRunner.class) 8 @ContextConfiguration("/spring-beans.xml") 9 public class SimpleConsumer { 10 @Test 11 public void test() { 12 Hold.hold(); 13 } 14 15 public void onMessageFromQueue(String message) { 16 System.out.println("从queue收到消息:" + message); 17 } 18 19 public void onMessageFromTopic(String message) { 20 System.out.println("从Topic收到消息:" + message); 21 } 22 }
5.pom.xml
这里仅记录添加的依赖,完整的内容可以参考pom.xml。
1 <!-- import org.apache.activemq.pool.PooledConnectionFactory using in spring-beans.xml --> 2 <dependency> 3 <groupId>org.apache.activemq</groupId> 4 <artifactId>activemq-pool</artifactId> 5 <version>5.13.2</version> 6 </dependency> 7 <!-- import for unit test --> 8 <dependency> 9 <groupId>junit</groupId> 10 <artifactId>junit</artifactId> 11 <version>4.12</version> 12 </dependency> 13 <!-- import spring support --> 14 <dependency> 15 <groupId>org.springframework</groupId> 16 <artifactId>spring-context</artifactId> 17 <version>4.2.4.RELEASE</version> 18 </dependency> 19 <dependency> 20 <groupId>org.springframework</groupId> 21 <artifactId>spring-jms</artifactId> 22 <version>4.2.4.RELEASE</version> 23 </dependency> 24 <dependency> 25 <groupId>org.springframework</groupId> 26 <artifactId>spring-test</artifactId> 27 <version>4.2.4.RELEASE</version> 28 </dependency>
测试
- 启动ActiveMQ
- 以JUnit的方式运行SimpleConsumer.java
- 以JUnit的方式运行SimpleProducer.java
- 在SimpleConsumer的控制台可以看到接收到的消息
附录
参考文章
- Apache-ActiveMQ整合Spring
使用JmsTemplate发送消息,基于bean的消息接收配置,JUnit集成Spring的单元测试 - ActiveMQ 使用示例
基于listener-container的消息接收配置,使用PooledConnectionFactory配置连接池 - Spring Support
ActiveMQ官方文档
以上是关于基于Spring支持的通信的主要内容,如果未能解决你的问题,请参考以下文章
带你手写基于 Spring 的可插拔式 RPC 框架通信协议模块