Spring整合JMS异步消息

Posted 云水之路

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring整合JMS异步消息相关的知识,希望对你有一定的参考价值。

Spring整合JMS异步消息

  

    在应用程序之间通信的消息,可分为同步消息和异步消息两种。前者就是当请求的程序端发出请求后,一直处于等待状态(阻塞),直到接收请求方反馈正确的结果后,请求方才能继续往下执行。而异步消息,则请求的程序端发出请求后,则可以继续向下执行,不需要阻塞当前流程,很多时候大大提高了用户的即时体验。当然,选择同步还是异步形式的通信,全凭应用的场景而做出合理的选择,比如:针对用户的即时操作要求较高的功能,则建议采用异步通信;而例如工作流或对即时性要求不高的功能实现,则可采用同步方式通信。

 

 

l  异步消息简介

l  消息模型简介

l  JMS异步消息

 

 

一、异步消息简介

让我们先看下同步消息的流程,如下所示:

如上图,同步消息就是当客户端请求发出后,一直处于等待请求响应服务的反馈,否则会一直等待(阻塞)下去,待服务响应方返回内容后,客户端才能继续按程序流往下执行。接下来,再对比下异步消息,它的消息流程模型与同步很相似,如下所示:

如上图,异步消息不同于同步消息的地方,主要就是当客户端发出请求到响应服务后,会继续按照程序流往下执行运作,此时,客户端不会被阻塞,也就是用户感觉不到被阻塞的体验。那么,当响应服务处理逻辑完成后,怎么才能找到是谁调取的它?一般我们会在调取的时候,传递一个绑定的回调标识进程,该进程会一直驻留等待,直到接收到服务响应的反馈才会被销毁,当然,也可以通过消息中间件,如:ActiveMQ或RabbitMQ等,先将请求消息放入中间件,待服务提供方完成处理后,从消息中间件中获取对应消息返还给客户端即可。

 

 

二、消息模型简介

上面介绍了同步及异步消息的流程及区别。接下来,我们一起看下现在通用的两种消息模型,分别是点对点模型(队列)和发布/订阅模型(主题)。

 

1、点对点模型

该模型中,每一个消息都有一个发送者和一个接收者,当消息代理得到消息时,它将其放入队列中,当接收者从队列中读取下一条消息时,消息会从队列中取出,并投递给接收者,此时对应的消息会从队列中删除,所以保证了消息只有一个接收者,如下所示:

 

 

2、发布/订阅模型

在发布-订阅消息模型中,消息会发送一个主题,而订阅了这个主题的所有接收者都会接收到此消息的副本。与队列类似,多个接收者可以监听同一个主题,不同的是消息不再只投递给一个接收者,所有订阅主题的接收者都可以收到消息副本,如下所示:

 

 

 

三、JMS异步消息

JMS,即为Java MessageService的简写,它是一个Java标准,定义了使用消息代理的通用API,类似于JDBC为数据库提供通用的接口一样。不过在Spring中提供了基于模版抽象实现JMS功能的支持,该模版就是JmsTemplate,使用它可以很方便地在消息生产方发送消息队列和主题消息,而接收方也能很方便地接收到消息。另外,Spring还提供了消息驱动POJO的概念,它是一个简单的java对象,其能够以异步的方式响应消息队列或主题上的消息。

 

1、准备工作

 

Maven配置:

     

 <dependency> 

       <groupId>org.springframework</groupId> 

       <artifactId>spring-jms</artifactId> 

       <version>4.3.2.RELEASE</version> 

       <scope>compile</scope> 

 </dependency> 

 <dependency> 

       <groupId>org.apache.activemq</groupId> 

       <artifactId>activemq-all</artifactId> 

       <version>5.9.0</version> 

       <scope>compile</scope> 

 </dependency>

 

 

A、在Spring中搭建一消息代理角色。

这里我们使用ActiveMQ作为消息代理角色,它是一个队列容器,可以很好地接收、存储、传递及管理消息,关于它的具体介绍,会在后续专题中介绍,这里主要介绍如何将其与Spring进行整合和使用。

首先,我们到ActiveMQ官网下载最新的软件包,地址:

http://activemq.apache.org/

 

然后,解压下载的二进制软件包,并切换到其下的bin目录下,执行activemq  start 命令启动这个消息代理,供后续发送消息到该队列中做准备,如果未启动,则连接发送消息时,会报出未连接队列错误。

 

B、建立一个连接工厂,通过它与消息代理进行直接交互。

接下来,我们需要一个与消息代理队列交互的连接工厂,这里我选用ActiveMQ自身提供的连接工厂工具ActiveMQConnectionFactory ,它可以很好地与Spring结合,如下Java Config配置(与XML类似):

@Bean

public ActiveMQConnectionFactory connectionFactory()     

       ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory();

       mqConnectionFactory.setBrokerURL("tcp://localhost:61616"); 

       mqConnectionFactory.setUserName("root"); 

       mqConnectionFactory.setPassword("123456");

       return mqConnectionFactory;

先建立一个ActiveMQConnectionFactory 实例,然后指定访问消息代理的地址:setBrokerURL ,而ActiveMQ默认的端口是61616。一般情况,消息代理组件都会有账号存在,所以需要设置访问的用户名和密码进行授权访问:setUserName 和setPassword 。

 

C、声明消息目的地:队列和主题

这里需声明一个ActiveMQ的点对点消息队列(ActiveMQQueue),和一个发布/订阅队列(ActiveMQTopic),因为它们指定了消息发送的队列类型和地址,具体配置如下:

ActiveMQQueue 配置:

 

@Bean

public ActiveMQQueue queue()

    ActiveMQQueue mqQueue = new ActiveMQQueue();

    mqQueue.setPhysicalName("channel.test.queue");  

    return mqQueue;

 

ActiveMQTopic 配置:

@Bean

public ActiveMQTopic topic()

    ActiveMQTopic mqTopic = new ActiveMQTopic();

    mqTopic.setPhysicalName("channel.test.topic");  

    return mqTopic;

 

如上所示,不论哪种类型的队列,我们都需要为其配置对应的Physical Name地址,因为它们是目的队列的引用地址。

 

 

D、自定义消息驱动,可以异步的接收消息

为了可以在接收端异步的接收消息,我们需要实现基于EJB的消息驱动模型,为了测试队列及主题两种方式的消息分发,这里建立了两个消息驱动,具体如下:

    @Bean

    public CMessageHandler messageHandler()       

       return new CMessageHandler();

   

   

    @Bean

    public CMessageHandler2 messageHandler2()  

       return new CMessageHandler2();

   

 

CMessageHandler.java:

public class CMessageHandler

    public void handleMessage(CMessage message)

       System.out.println(message.getType() + ":" + message.getData());

   

 

CMessageHandler2.java:

public class CMessageHandler2

    public void handleMessage(CMessage message)

       System.out.println(message.getType() + ":" + message.getData());

   

 

需要注意的是这两个消息驱动POJO,在后面需要注入到消息监听适配器中,以实现异步接收消息的目的。

 

E、声明消息监听适配器,以实现异步接收消息

如上一步,我们已经自定义了两个消息驱动POJO模型,那么对应的也需要通过MessageListenerAdapter 监听方式,声明两个消息监听适配器,分别监听上面两个消息驱动,具体如下:

@Bean

public MessageListenerAdapter messageListenerAdapter()

       MessageListenerAdapter  mlAdapter = new MessageListenerAdapter();

       mlAdapter.setDelegate(messageHandler());

       mlAdapter.setDefaultListenerMethod("handleMessage");

       return mlAdapter;

   

   

@Bean

public MessageListenerAdapter messageListenerAdapter2()

       MessageListenerAdapter  mlAdapter = new MessageListenerAdapter();

       mlAdapter.setDelegate(messageHandler2());

       mlAdapter.setDefaultListenerMethod("handleMessage");

       return mlAdapter;

   

 

如上所示,通过setDelegate 指定了对应的两个消息驱动POJO模型,然后使用setDefaultListenerMethod 指定了监听的启动方法。

 

F、声明消息监听适配器容器,注入和管理监听适配器

接下来,我们需要针对队列(Queue)和主题(Topic),分别声明对应的两个监听适配器容器,具体如下:

@Bean

    public DefaultMessageListenerContainer queueMessageListenerAdapterContainer()

       DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();

       container.setConnectionFactory(connectionFactory());

       container.setDestination(queue());

       container.setMessageListener(messageListenerAdapter());

       return container;

   

   

    @Bean

    public DefaultMessageListenerContainer queueMessageListenerAdapterContainer2()   

       DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();

       container.setConnectionFactory(connectionFactory());

       container.setDestination(queue());

       container.setMessageListener(messageListenerAdapter2());

       return container;

   

   

    @Bean

    public DefaultMessageListenerContainer topicMessageListenerAdapterContainer()

       DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();

       container.setConnectionFactory(connectionFactory());

       container.setDestination(topic());

       container.setMessageListener(messageListenerAdapter());

       return container;

   

   

    @Bean

    public DefaultMessageListenerContainer topicMessageListenerAdapterContainer2()

       DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();

       container.setConnectionFactory(connectionFactory());

       container.setDestination(topic());

       container.setMessageListener(messageListenerAdapter2());

       return container;

   

如上所示,我们声明了两种类型的监听容器,分别为Queue和Topic,而每种容器分别声明两个监听容器,对应上面的两个监听适配器。

这里通过DefaultMessageListenerContainer 作为默认监听容器,使用setConnectionFactory 指定连接工厂,使用setDestination 绑定消息发送的目的地队列,并使用setMessageListener 设置对应的监听适配器。

 

G、使用Spring提供的JmsTemplate模版

在Spring中,已经提供了方便操作消息代理的工具JmsTemplate,通过它可以设置消息转发的模式(点对点或是发布/订阅),指定默认发送的消息目的地队列,及连接工厂。当然,为了保证传递的消息能够被正确的转换,我们可以指定消息转换器,如json格式转换,可以使用MappingJackson2MessageConverter ,可以这样配置:

@Bean

    public MappingJackson2MessageConverter messageConverter()   

       return new MappingJackson2MessageConverter();

   

 

那么,下面罗列下两个JmsTemplate的配置,具体细节可以查看代码中的注释说明,具体如下:

@Bean

public JmsTemplate jmsQueueTemplate()       // 声明点对点队列JmsTemplate

       JmsTemplate jmsTemplate = new JmsTemplate();

       jmsTemplate.setConnectionFactory(connectionFactory());  // 指定连接消息代理工厂

       jmsTemplate.setMessageConverter(messageConverter());    // 指定默认的消息转换器

       jmsTemplate.setDefaultDestinationName("channel.test.queue");       // 指定默认消息地址,针对目的地址不变情况

       jmsTemplate.setPubSubDomain(false);      // 关闭发布/订阅模式

       return jmsTemplate;

   

   

@Bean

public JmsTemplate jmsTopicTemplate()       // 声明发布订阅队列JmsTemplate

       JmsTemplate jmsTemplate = new JmsTemplate();

       jmsTemplate.setConnectionFactory(connectionFactory());  // 指定连接消息代理工厂

        jmsTemplate.setMessageConverter(messageConverter());    // 指定默认的消息转换器

       jmsTemplate.setDefaultDestinationName("channel.test.topic");       // 指定默认消息地址,针对目的地址不变情况

       jmsTemplate.setPubSubDomain(true);    // 开启发布/订阅模式

       return jmsTemplate;

   

 

 

2、发送消息

A、建立一个消息业务服务

CMessageService.java:

public interface CMessageService

    public void sendMessage(CMessage message);

 

CMessageServiceImpl.java:

@Service("CMessageService")

public class CMessageServiceImpl implements CMessageService

    @Autowired

    JmsTemplate jmsQueueTemplate;

   

    @Autowired

    JmsTemplate jmsTopicTemplate;

   

    public void sendMessage(final CMessage message)

       JmsTemplate jmsTemplate = null;

       if(message.getType().equals("queue"))

           jmsTemplate = jmsQueueTemplate;

       else if(message.getType().equals("topic"))

           jmsTemplate = jmsTopicTemplate;

      

      

       jmsTemplate.send(new MessageCreator()       // 缺省目的地地址

           public Message createMessage(Session session) throws JMSException

              return session.createObjectMessage(message);    // 创建消息

          

       );

   

 

 

B、建立一个发送消息控制器

MessageAction.java:

@RestController

@RequestMapping("/message")

public class MessageAction

   @Autowired

   CMessageService messageService;

  

   @RequestMapping("/show")

   public ModelAndView sendPage()

      return new ModelAndView("test/jms_api");

  

  

   @RequestMapping(value="/sendMessage")

   public void sendMessage(HttpServletRequest request) throws Exception

      String type = request.getParameter("type");

      String data = request.getParameter("data");

     

      CMessage message = new CMessage();  // 消息内容

      message.setType(type);

      message.setData(data);

      messageService.sendMessage(message);      // 发送消息

  

 

C、建立一个发送的jsp页面

jms_api.jsp(js部分):

<script type="text/javascript">

           // 发送queue消息

           function sendQueueMessage()

               var data = $.trim($("#taQueue").val());

              $.ajax(

                      url:'/xxx/message/sendMessage',

                      data:type:'queue',data:data,

                      type:"get",

                      dataType:'json',

                      contentType:'application/json',

                      success:function(result)

                     

              );

          

          

           // 发送topic消息

           function sendTopicMessage()

              var data = $.trim($("#taTopic").val());

              $.ajax(

                      url:'/xxx/message/sendMessage',

                      data:type:'topic',data:data,

                      type:"get",

                      dataType:'json',

                      contentType:'application/json',

                      success:function(result)

                     

              );

          

       </script>

 

具体的效果如下:

 

 

3、接收消息

接收消息,则是通过上面声明的消息驱动POJO来处理的,其通过注入到消息监听器中,当有消息到队列的时候,监听器会即时从队列中获取消息,并将其传递给对应的消息驱动POJO来处理,并且是以异步的方式处理的消息。

 

A、点击发送Queue消息:

 

 

B、点击发送Topic消息:

 

 

如上所示,同样声明了两个监听者角色,Queue点对点模式时,接收者只收到一条消息,而Topic发布/订阅模式时,则注册订阅了这个主题的,都可以接收该主题消息,所以有两个接收者都收到消息。

 

 

 

 

由于作者水平有限,请在评论或(QQ: 245389109)不吝发言讨论,谢谢。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

以上是关于Spring整合JMS异步消息的主要内容,如果未能解决你的问题,请参考以下文章

Spring整合JMS(消息中间件)

Spring整合JMS(消息中间件)

什么是jms?jms定定义了哪些不同的消息正文格式

Spring整合JMS——基于ActiveMQ实现

Spring整合activeMQ消息队列

Spring整合Weblogic jms实战