基于Spring支持的通信

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Spring支持的通信相关的知识,希望对你有一定的参考价值。

概述


基于Spring支持的客户端编程,包括发送方客户端、接收方客户端。
发送方客户端代码:jms-producer
接收方客户端代码:jms-consumer
 
本文有pdf版本:基于Spring支持的通信.pdf

发送方客户端


这里基于demo进行说明。这个demo将往example.queue和example.topic各发一条信息。

文件目录结构

1 src/main/resources/
2     |---- jndi.properties
3     |---- spring-beans.xml
4 src/main/java/
5     |---- cn.sinobest.asj.producer.springsupport.simple
6               |---- SimpleProducer.java # 发送测试类
7 pom.xml

文件内容

1.jndi.properties

 1 java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
 2 
 3 # use the following property to configure the default connector
 4 java.naming.provider.url=tcp://localhost:61616
 5 
 6 # register some queues in JNDI using the form
 7 # queue.[jndiName] = [physicalName]
 8 queue.exampleQueue=example.queue
 9 
10 # register some topics in JNDI using the form
11 # topic.[jndiName] = [physicalName]
12 topic.exampleTopic=example.topic

2.spring-beans.xml

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4     xmlns:context="http://www.springframework.org/schema/context"
 5     xsi:schemaLocation="http://www.springframework.org/schema/beans
 6         http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
 7         http://www.springframework.org/schema/context
 8         http://www.springframework.org/schema/context/spring-context-4.2.xsd">
 9     <!-- import outer properties file -->
10     <context:property-placeholder location="classpath:jndi.properties" />
11     
12     <!-- create pooled connection factory -->
13     <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
14         <property name="connectionFactory">
15             <bean class="org.apache.activemq.ActiveMQConnectionFactory">
16                 <property name="brokerURL" value="${java.naming.provider.url}" />
17                 <property name="closeTimeout" value="60000" />
18             </bean>
19         </property>
20     </bean>
21     
22     <!-- create queue destination -->
23     <bean id="exampleQueue" class="org.apache.activemq.command.ActiveMQQueue">
24         <constructor-arg value="${queue.exampleQueue}" />
25     </bean>
26     
27     <!-- create topic destination -->
28     <bean id="exampleTopic" class="org.apache.activemq.command.ActiveMQTopic">
29         <constructor-arg value="${topic.exampleTopic}" />
30     </bean>
31     
32     <!-- create template for send message -->
33     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
34         <!-- bind the connection factory -->
35         <property name="connectionFactory" ref="pooledConnectionFactory" />
36         <!-- bind the default destination, but you can also appoint other destination when send -->
37         <property name="defaultDestination" ref="exampleQueue" />
38     </bean>
39 </beans>

说明:

JmsTemplate提供了多个重载的send方法,用于发送消息,但是它依赖于ConnectionFactory资源。
在声明ConnectionFactory资源时,我们使用PooledConnectionFactory包装ActiveMQConnectionFactory[1],这是为了有效的利用资源,就像JDBC中的连接池
此外,还声明了两个Destination:exampleQueue和exampleTopic,并将exampleQueue作为jmsTemplate的defaultDestination。另一个exampleTopic,将在SimpleProducer.java中使用。
这里使用${java.naming.provider.url}的方式引用的属性值,都来自jndi.properties。
 
注:
1. 在我参考的系列文章中,还有使用org.springframework.jms.connection.SingleConnectionFactory来包装ActiveMQConnectionFactory的。另外,我觉得直接使用ActiveMQConnectionFactory也可以。

3.SimpleProducer.java

技术分享
 1 package cn.sinobest.asj.producer.springsupport.simple;
 2 import javax.annotation.Resource;
 3 import javax.jms.Destination;
 4 import javax.jms.JMSException;
 5 import javax.jms.Message;
 6 import javax.jms.Session;
 7 import javax.jms.TextMessage;
 8 import org.junit.Test;
 9 import org.junit.runner.RunWith;
10 import org.springframework.jms.core.JmsTemplate;
11 import org.springframework.jms.core.MessageCreator;
12 import org.springframework.test.context.ContextConfiguration;
13 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
14 /**
15  * 单元测试类.<br>
16  * 基于Spring运行环境的测试.
17  * @author lijinlong
18  *
19  */
20 @RunWith(SpringJUnit4ClassRunner.class) // 配置spring组件运行时
21 @ContextConfiguration("/spring-beans.xml") // 配置文件
22 public class SimpleProducer {
23     @Resource(name="exampleTopic")
24     private Destination exampleTopic;
25     
26     @Resource(name="jmsTemplate")
27     private JmsTemplate jmsTemplate;
28     
29     @Test
30     public void test() {
31         // 发送到默认的destination - queue
32         jmsTemplate.send(new MessageCreator() {
33             
34             public Message createMessage(Session session) throws JMSException {
35                 TextMessage msg = session.createTextMessage();
36                 msg.setText("This message is plan to the queue.");
37                 return msg;
38             }
39         });
40         
41         // 发送到指定的destination - topic
42         jmsTemplate.send(exampleTopic, new MessageCreator() {
43             
44             public Message createMessage(Session session) throws JMSException {
45                 TextMessage msg = session.createTextMessage();
46                 msg.setText("This message is plan to the topic.");
47                 return msg;
48             }
49         });
50     }
51 }
SimpleProducer.java

说明:

使用RunWith注解,为单元测试类指定Spring运行环境,就可以在单元测试类中接受依赖的注入。
使用ContextConfiguration注解,进一步指定spring beans的配置文件。
test方法是测试方法,它使用不同的重载的send方法,分别向默认的Destination、exampleTopic destination发送消息。
MessageCreator是一个消息创建器,消息的接收方式和消息的筛选一文,SendTemplate.java提供的抽象方法createMessage(Session)与之是相似的思路。
 
JmsTemplate的send方法:
  • send(String destinationName, MessageCreator messageCreator):void
    将messageCreator创建的消息,发送到destinationName指定的目的地。
    destinationName:目的地的名称,具体按照queue还是topic解析,需要另外的参数指定[1];默认情况应该是按照queue来解析的。
    messageCreator:回调接口,提供回调方法,基于Session创建消息。
  • send(Destination destination, MessageCreator messageCreator):void
    将messageCreator创建的消息,发送到指定的Destination。
  • send(MessageCreator messageCreator):void
    将messageCreator创建的消息,发送到默认的Destination。
注:
1. 应该是在声明JmsTemplate bean时,指定pubSubDomain属性:false - Queues;true - Topics。

4.pom.xml

这里仅记录添加的依赖,完整的内容可以参考pom.xml
 1 <!-- import org.apache.activemq.pool.PooledConnectionFactory using in spring-beans.xml -->
 2 <dependency>
 3     <groupId>org.apache.activemq</groupId>
 4     <artifactId>activemq-pool</artifactId>
 5     <version>5.13.2</version>
 6 </dependency>
 7 <!-- import for unit test -->
 8 <dependency>
 9     <groupId>junit</groupId>
10     <artifactId>junit</artifactId>
11     <version>4.12</version>
12 </dependency>
13 <!-- import spring support -->
14 <dependency>
15     <groupId>org.springframework</groupId>
16     <artifactId>spring-context</artifactId>
17     <version>4.2.4.RELEASE</version>
18 </dependency>
19 <dependency>
20     <groupId>org.springframework</groupId>
21     <artifactId>spring-jms</artifactId>
22     <version>4.2.4.RELEASE</version>
23 </dependency>
24 <dependency>
25     <groupId>org.springframework</groupId>
26     <artifactId>spring-test</artifactId>
27     <version>4.2.4.RELEASE</version>
28 </dependency>

接收方客户端


这里基于demo进行说明。这个demo将接收example.queue和example.topic的消息。

文件目录结构

1 src/main/resources/
2     |---- jndi.properties
3     |---- spring-beans.xml
4 src/main/java/
5     |---- cn.sinobest.asj.consumer.util
6               |---- Hold.java # 提供pause功能
7     |---- cn.sinobest.asj.consumer.springsupport.simple
8               |---- SimpleConsumer.java # 单元测试类
9 pom.xml

文件内容

1.jndi.properties

 1 java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
 2 
 3 # use the following property to configure the default connector
 4 java.naming.provider.url=tcp://localhost:61616
 5 
 6 # register some queues in JNDI using the form
 7 # queue.[jndiName] = [physicalName]
 8 queue.exampleQueue=example.queue
 9 
10 # register some topics in JNDI using the form
11 # topic.[jndiName] = [physicalName]
12 topic.exampleTopic=example.topic

2.spring-beans.xml

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4     xmlns:context="http://www.springframework.org/schema/context"
 5     xmlns:jms="http://www.springframework.org/schema/jms"
 6     xsi:schemaLocation="http://www.springframework.org/schema/beans
 7         http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
 8         http://www.springframework.org/schema/context
 9         http://www.springframework.org/schema/context/spring-context-4.2.xsd
10         http://www.springframework.org/schema/jms
11         http://www.springframework.org/schema/jms/spring-jms-4.2.xsd">
12     <!-- import outer properties file -->
13     <context:property-placeholder location="classpath:jndi.properties" />
14     
15     <!-- create pooled connection factory -->
16     <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
17         <property name="connectionFactory">
18             <bean class="org.apache.activemq.ActiveMQConnectionFactory">
19                 <property name="brokerURL" value="${java.naming.provider.url}" />
20                 <property name="closeTimeout" value="60000" />
21             </bean>
22         </property>
23     </bean>
24     
25     <bean id="simpleConsumer" class="cn.sinobest.asj.consumer.springsupport.simple.SimpleConsumer" />
26     
27     <!-- config listener container for queues -->
28     <jms:listener-container connection-factory="pooledConnectionFactory">
29         <jms:listener destination="${queue.exampleQueue}" ref="simpleConsumer"
30             method="onMessageFromQueue" />
31     </jms:listener-container>
32     
33     <!-- config listener container for topics -->
34     <jms:listener-container connection-factory="pooledConnectionFactory"
35         destination-type="topic">
36         <jms:listener destination="${topic.exampleTopic}" ref="simpleConsumer"
37             method="onMessageFromTopic" />
38     </jms:listener-container>
39 </beans>
说明:
这里声明了两个listener-container,这是我们分析配置的起点
listener-container用于管理listener:listener-container的connection-factory依赖于ConnectionFactory实例,destination-type标识listener监听的destination的类型,默认为queue。
listener配置监听器:destination配置监听的目的地,ref、method分别配置消息处理bean和方法——监听器收到消息之后,会调用对应的bean方法[1]
simpleConsumer作为消息处理bean,对于这个类本身没有特殊的要求。
这里使用${java.naming.provider.url}的方式引用的属性值,都来自jndi.properties。
 
注:
1. 要求配置的方法的参数只有一个,且类型和Message中封装的数据类型兼容。比如Message是一个TextMessage,那么String类型足以满足;如果Message是一个ObjectMessage,且封装的Object是字符串数组类型,那就要提供一个可以兼容字符串数组的参数类型 —— 提供重载的方法,Spring会在类型兼容的前提下,选择一个最接近的方法调用。

基于bean的配置

这是另外一种异步接收的配置方式:
1 <bean id="messageContainer"
2     class="org.springframework.jms.listener.DefaultMessageListenerContainer">
3     <property name="connectionFactory" ref="connectionFactory" />
4     <property name="destination" ref="destination" />
5     <property name="messageListener" ref="messageListener" />
6 </bean>

两种配置的比较

基于listener-container的配置 基于bean的配置
一个container可以配置多个listener 一个container只能配置一个listener
配置消息处理类来接收消息,且对这个类没有类型的要求。 用messageListener属性配置消息监听器,要求实现javax.jms.MessageListener接口。
对Message进行了预处理:根据接收到的Message的类型进行解封,根据解封的类型选择一个兼容的处理方法调用 没有对Message进行预处理

同步接收方式

JmsTemplate同样提供了一系列receive方法来接受消息。参考前文发送方客户端中配置JmsTemplate的方式,然后调用receive方法进行同步接收。
  • receive系列方法,只是单纯的接收,返回的结果是javax.jms.Message类型。
  • receiveSelected系列方法,在receive系列方法的基础上加入了选择器。
  • receiveAndConvert系列方法,接收并转换,返回的是Object类型(实际类型是Message内部封装的类型)。
  • receiveSelectedAndConvert系列方法,在receiveAndConvert系列方法的基础上加入了选择器。

3.Hold.java

 1 package cn.sinobest.asj.consumer.util;
 2 /**
 3  * 防止程序立即退出.<br>
 4  * @author lijinlong
 5  *
 6  */
 7 public class Hold {
 8     /**
 9      * 防止程序立即退出,直到用户输入Enter键.
10      */
11     public static void hold() {
12         @SuppressWarnings("resource")
13         java.util.Scanner scanner = new java.util.Scanner(System.in);
14         System.out.println("按Enter键退出:");
15         scanner.nextLine();
16     }
17 }
基于异步的接收方式,需要防止主线程执行完毕后退出。

4.SimpleConsumer.java

 1 package cn.sinobest.asj.consumer.springsupport.simple;
 2 import org.junit.Test;
 3 import org.junit.runner.RunWith;
 4 import org.springframework.test.context.ContextConfiguration;
 5 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 6 import cn.sinobest.asj.consumer.util.Hold;
 7 @RunWith(SpringJUnit4ClassRunner.class)
 8 @ContextConfiguration("/spring-beans.xml")
 9 public class SimpleConsumer {
10     @Test
11     public void test() {
12         Hold.hold();
13     }
14     
15     public void onMessageFromQueue(String message) {
16         System.out.println("从queue收到消息:" + message);
17     }
18     
19     public void onMessageFromTopic(String message) {
20         System.out.println("从Topic收到消息:" + message);
21     }
22 }

5.pom.xml

这里仅记录添加的依赖,完整的内容可以参考pom.xml
 1 <!-- import org.apache.activemq.pool.PooledConnectionFactory using in spring-beans.xml -->
 2 <dependency>
 3     <groupId>org.apache.activemq</groupId>
 4     <artifactId>activemq-pool</artifactId>
 5     <version>5.13.2</version>
 6 </dependency>
 7 <!-- import for unit test -->
 8 <dependency>
 9     <groupId>junit</groupId>
10     <artifactId>junit</artifactId>
11     <version>4.12</version>
12 </dependency>
13 <!-- import spring support -->
14 <dependency>
15     <groupId>org.springframework</groupId>
16     <artifactId>spring-context</artifactId>
17     <version>4.2.4.RELEASE</version>
18 </dependency>
19 <dependency>
20     <groupId>org.springframework</groupId>
21     <artifactId>spring-jms</artifactId>
22     <version>4.2.4.RELEASE</version>
23 </dependency>
24 <dependency>
25     <groupId>org.springframework</groupId>
26     <artifactId>spring-test</artifactId>
27     <version>4.2.4.RELEASE</version>
28 </dependency>

测试


  1. 启动ActiveMQ
  2. 以JUnit的方式运行SimpleConsumer.java
    技术分享
  3. 以JUnit的方式运行SimpleProducer.java
  4. 在SimpleConsumer的控制台可以看到接收到的消息
    技术分享

附录


参考文章

  1. Apache-ActiveMQ整合Spring
    使用JmsTemplate发送消息,基于bean的消息接收配置,JUnit集成Spring的单元测试
  2. ActiveMQ 使用示例
    基于listener-container的消息接收配置,使用PooledConnectionFactory配置连接池
  3. Spring Support
    ActiveMQ官方文档





以上是关于基于Spring支持的通信的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud Schema Registry

在tablayout片段之间进行通信[重复]

与另一个片段通信的片段接口

带你手写基于 Spring 的可插拔式 RPC 框架通信协议模块

我去!你竟然不知道Spring MVC中有现成的WebSocket组件...

无法通过接口获取与片段通信的活动