Spring整合JMS异步消息
Posted 云水之路
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring整合JMS异步消息相关的知识,希望对你有一定的参考价值。
Spring整合JMS异步消息
在应用程序之间通信的消息,可分为同步消息和异步消息两种。前者就是当请求的程序端发出请求后,一直处于等待状态(阻塞),直到接收请求方反馈正确的结果后,请求方才能继续往下执行。而异步消息,则请求的程序端发出请求后,则可以继续向下执行,不需要阻塞当前流程,很多时候大大提高了用户的即时体验。当然,选择同步还是异步形式的通信,全凭应用的场景而做出合理的选择,比如:针对用户的即时操作要求较高的功能,则建议采用异步通信;而例如工作流或对即时性要求不高的功能实现,则可采用同步方式通信。
l 异步消息简介
l 消息模型简介
l JMS异步消息
一、异步消息简介
让我们先看下同步消息的流程,如下所示:
如上图,同步消息就是当客户端请求发出后,一直处于等待请求响应服务的反馈,否则会一直等待(阻塞)下去,待服务响应方返回内容后,客户端才能继续按程序流往下执行。接下来,再对比下异步消息,它的消息流程模型与同步很相似,如下所示:
如上图,异步消息不同于同步消息的地方,主要就是当客户端发出请求到响应服务后,会继续按照程序流往下执行运作,此时,客户端不会被阻塞,也就是用户感觉不到被阻塞的体验。那么,当响应服务处理逻辑完成后,怎么才能找到是谁调取的它?一般我们会在调取的时候,传递一个绑定的回调标识进程,该进程会一直驻留等待,直到接收到服务响应的反馈才会被销毁,当然,也可以通过消息中间件,如:ActiveMQ或RabbitMQ等,先将请求消息放入中间件,待服务提供方完成处理后,从消息中间件中获取对应消息返还给客户端即可。
二、消息模型简介
上面介绍了同步及异步消息的流程及区别。接下来,我们一起看下现在通用的两种消息模型,分别是点对点模型(队列)和发布/订阅模型(主题)。
1、点对点模型
该模型中,每一个消息都有一个发送者和一个接收者,当消息代理得到消息时,它将其放入队列中,当接收者从队列中读取下一条消息时,消息会从队列中取出,并投递给接收者,此时对应的消息会从队列中删除,所以保证了消息只有一个接收者,如下所示:
2、发布/订阅模型
在发布-订阅消息模型中,消息会发送一个主题,而订阅了这个主题的所有接收者都会接收到此消息的副本。与队列类似,多个接收者可以监听同一个主题,不同的是消息不再只投递给一个接收者,所有订阅主题的接收者都可以收到消息副本,如下所示:
三、JMS异步消息
JMS,即为Java MessageService的简写,它是一个Java标准,定义了使用消息代理的通用API,类似于JDBC为数据库提供通用的接口一样。不过在Spring中提供了基于模版抽象实现JMS功能的支持,该模版就是JmsTemplate,使用它可以很方便地在消息生产方发送消息队列和主题消息,而接收方也能很方便地接收到消息。另外,Spring还提供了消息驱动POJO的概念,它是一个简单的java对象,其能够以异步的方式响应消息队列或主题上的消息。
1、准备工作
Maven配置:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.2.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
<scope>compile</scope>
</dependency>
A、在Spring中搭建一消息代理角色。
这里我们使用ActiveMQ作为消息代理角色,它是一个队列容器,可以很好地接收、存储、传递及管理消息,关于它的具体介绍,会在后续专题中介绍,这里主要介绍如何将其与Spring进行整合和使用。
首先,我们到ActiveMQ官网下载最新的软件包,地址:
然后,解压下载的二进制软件包,并切换到其下的bin目录下,执行activemq start 命令启动这个消息代理,供后续发送消息到该队列中做准备,如果未启动,则连接发送消息时,会报出未连接队列错误。
B、建立一个连接工厂,通过它与消息代理进行直接交互。
接下来,我们需要一个与消息代理队列交互的连接工厂,这里我选用ActiveMQ自身提供的连接工厂工具ActiveMQConnectionFactory ,它可以很好地与Spring结合,如下Java Config配置(与XML类似):
@Bean
public ActiveMQConnectionFactory connectionFactory()
ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory();
mqConnectionFactory.setBrokerURL("tcp://localhost:61616");
mqConnectionFactory.setUserName("root");
mqConnectionFactory.setPassword("123456");
return mqConnectionFactory;
先建立一个ActiveMQConnectionFactory 实例,然后指定访问消息代理的地址:setBrokerURL ,而ActiveMQ默认的端口是61616。一般情况,消息代理组件都会有账号存在,所以需要设置访问的用户名和密码进行授权访问:setUserName 和setPassword 。
C、声明消息目的地:队列和主题
这里需声明一个ActiveMQ的点对点消息队列(ActiveMQQueue),和一个发布/订阅队列(ActiveMQTopic),因为它们指定了消息发送的队列类型和地址,具体配置如下:
ActiveMQQueue 配置:
@Bean
public ActiveMQQueue queue()
ActiveMQQueue mqQueue = new ActiveMQQueue();
mqQueue.setPhysicalName("channel.test.queue");
return mqQueue;
ActiveMQTopic 配置:
@Bean
public ActiveMQTopic topic()
ActiveMQTopic mqTopic = new ActiveMQTopic();
mqTopic.setPhysicalName("channel.test.topic");
return mqTopic;
如上所示,不论哪种类型的队列,我们都需要为其配置对应的Physical Name地址,因为它们是目的队列的引用地址。
D、自定义消息驱动,可以异步的接收消息
为了可以在接收端异步的接收消息,我们需要实现基于EJB的消息驱动模型,为了测试队列及主题两种方式的消息分发,这里建立了两个消息驱动,具体如下:
@Bean
public CMessageHandler messageHandler()
return new CMessageHandler();
@Bean
public CMessageHandler2 messageHandler2()
return new CMessageHandler2();
CMessageHandler.java:
public class CMessageHandler
public void handleMessage(CMessage message)
System.out.println(message.getType() + ":" + message.getData());
CMessageHandler2.java:
public class CMessageHandler2
public void handleMessage(CMessage message)
System.out.println(message.getType() + ":" + message.getData());
需要注意的是这两个消息驱动POJO,在后面需要注入到消息监听适配器中,以实现异步接收消息的目的。
E、声明消息监听适配器,以实现异步接收消息
如上一步,我们已经自定义了两个消息驱动POJO模型,那么对应的也需要通过MessageListenerAdapter 监听方式,声明两个消息监听适配器,分别监听上面两个消息驱动,具体如下:
@Bean
public MessageListenerAdapter messageListenerAdapter()
MessageListenerAdapter mlAdapter = new MessageListenerAdapter();
mlAdapter.setDelegate(messageHandler());
mlAdapter.setDefaultListenerMethod("handleMessage");
return mlAdapter;
@Bean
public MessageListenerAdapter messageListenerAdapter2()
MessageListenerAdapter mlAdapter = new MessageListenerAdapter();
mlAdapter.setDelegate(messageHandler2());
mlAdapter.setDefaultListenerMethod("handleMessage");
return mlAdapter;
如上所示,通过setDelegate 指定了对应的两个消息驱动POJO模型,然后使用setDefaultListenerMethod 指定了监听的启动方法。
F、声明消息监听适配器容器,注入和管理监听适配器
接下来,我们需要针对队列(Queue)和主题(Topic),分别声明对应的两个监听适配器容器,具体如下:
@Bean
public DefaultMessageListenerContainer queueMessageListenerAdapterContainer()
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setDestination(queue());
container.setMessageListener(messageListenerAdapter());
return container;
@Bean
public DefaultMessageListenerContainer queueMessageListenerAdapterContainer2()
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setDestination(queue());
container.setMessageListener(messageListenerAdapter2());
return container;
@Bean
public DefaultMessageListenerContainer topicMessageListenerAdapterContainer()
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setDestination(topic());
container.setMessageListener(messageListenerAdapter());
return container;
@Bean
public DefaultMessageListenerContainer topicMessageListenerAdapterContainer2()
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setDestination(topic());
container.setMessageListener(messageListenerAdapter2());
return container;
如上所示,我们声明了两种类型的监听容器,分别为Queue和Topic,而每种容器分别声明两个监听容器,对应上面的两个监听适配器。
这里通过DefaultMessageListenerContainer 作为默认监听容器,使用setConnectionFactory 指定连接工厂,使用setDestination 绑定消息发送的目的地队列,并使用setMessageListener 设置对应的监听适配器。
G、使用Spring提供的JmsTemplate模版
在Spring中,已经提供了方便操作消息代理的工具JmsTemplate,通过它可以设置消息转发的模式(点对点或是发布/订阅),指定默认发送的消息目的地队列,及连接工厂。当然,为了保证传递的消息能够被正确的转换,我们可以指定消息转换器,如json格式转换,可以使用MappingJackson2MessageConverter ,可以这样配置:
@Bean
public MappingJackson2MessageConverter messageConverter()
return new MappingJackson2MessageConverter();
那么,下面罗列下两个JmsTemplate的配置,具体细节可以查看代码中的注释说明,具体如下:
@Bean
public JmsTemplate jmsQueueTemplate() // 声明点对点队列JmsTemplate
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(connectionFactory()); // 指定连接消息代理工厂
jmsTemplate.setMessageConverter(messageConverter()); // 指定默认的消息转换器
jmsTemplate.setDefaultDestinationName("channel.test.queue"); // 指定默认消息地址,针对目的地址不变情况
jmsTemplate.setPubSubDomain(false); // 关闭发布/订阅模式
return jmsTemplate;
@Bean
public JmsTemplate jmsTopicTemplate() // 声明发布订阅队列JmsTemplate
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(connectionFactory()); // 指定连接消息代理工厂
jmsTemplate.setMessageConverter(messageConverter()); // 指定默认的消息转换器
jmsTemplate.setDefaultDestinationName("channel.test.topic"); // 指定默认消息地址,针对目的地址不变情况
jmsTemplate.setPubSubDomain(true); // 开启发布/订阅模式
return jmsTemplate;
2、发送消息
A、建立一个消息业务服务
CMessageService.java:
public interface CMessageService
public void sendMessage(CMessage message);
CMessageServiceImpl.java:
@Service("CMessageService")
public class CMessageServiceImpl implements CMessageService
@Autowired
JmsTemplate jmsQueueTemplate;
@Autowired
JmsTemplate jmsTopicTemplate;
public void sendMessage(final CMessage message)
JmsTemplate jmsTemplate = null;
if(message.getType().equals("queue"))
jmsTemplate = jmsQueueTemplate;
else if(message.getType().equals("topic"))
jmsTemplate = jmsTopicTemplate;
jmsTemplate.send(new MessageCreator() // 缺省目的地地址
public Message createMessage(Session session) throws JMSException
return session.createObjectMessage(message); // 创建消息
);
B、建立一个发送消息控制器
MessageAction.java:
@RestController
@RequestMapping("/message")
public class MessageAction
@Autowired
CMessageService messageService;
@RequestMapping("/show")
public ModelAndView sendPage()
return new ModelAndView("test/jms_api");
@RequestMapping(value="/sendMessage")
public void sendMessage(HttpServletRequest request) throws Exception
String type = request.getParameter("type");
String data = request.getParameter("data");
CMessage message = new CMessage(); // 消息内容
message.setType(type);
message.setData(data);
messageService.sendMessage(message); // 发送消息
C、建立一个发送的jsp页面
jms_api.jsp(js部分):
<script type="text/javascript">
// 发送queue消息
function sendQueueMessage()
var data = $.trim($("#taQueue").val());
$.ajax(
url:'/xxx/message/sendMessage',
data:type:'queue',data:data,
type:"get",
dataType:'json',
contentType:'application/json',
success:function(result)
);
// 发送topic消息
function sendTopicMessage()
var data = $.trim($("#taTopic").val());
$.ajax(
url:'/xxx/message/sendMessage',
data:type:'topic',data:data,
type:"get",
dataType:'json',
contentType:'application/json',
success:function(result)
);
</script>
具体的效果如下:
3、接收消息
接收消息,则是通过上面声明的消息驱动POJO来处理的,其通过注入到消息监听器中,当有消息到队列的时候,监听器会即时从队列中获取消息,并将其传递给对应的消息驱动POJO来处理,并且是以异步的方式处理的消息。
A、点击发送Queue消息:
B、点击发送Topic消息:
如上所示,同样声明了两个监听者角色,Queue点对点模式时,接收者只收到一条消息,而Topic发布/订阅模式时,则注册订阅了这个主题的,都可以接收该主题消息,所以有两个接收者都收到消息。
由于作者水平有限,请在评论或(QQ: 245389109)不吝发言讨论,谢谢。
以上是关于Spring整合JMS异步消息的主要内容,如果未能解决你的问题,请参考以下文章