ActiveMQ 与 WebSocket 的结合推送方案+Spring Websocket Stomp

Posted 汪小哥

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ 与 WebSocket 的结合推送方案+Spring Websocket Stomp相关的知识,希望对你有一定的参考价值。

一、 背景

对目前前后端分离的开发开发的大环境下, 前端使用 vue 进行项目的开发, 后端不在使用以前的 jsp 的开发方式进行开发, 因此造成了对于前端推送方案的选型问题, 在项目的开发过程针对于前后端的开发时间和效率等综合考虑进行了一个技术的选型,其中有过多种的方案的考虑。

二、 各种推送方案的比较

1. 各种推送方案的简单介绍

Ajax 轮询:

轮询:缺点,糟糕的用户体验;对服务器压力很大,并造成带宽的极大浪费。

DWR:

DWR 是一个用于改善 web 页面与 Java 类交互的远程服务器端 Ajax 开源框架,可以帮助开发人员开发包含 AJAX 技术的网站。它可以允许在浏览器里的代码使用运行在 WEB 服务器上的 JAVA 函数,就像它就在浏览器里一样。 将 java 代码转为 js 文件,引入 js 文件路径不能更改,原理也是轮询。

Activemq 结合 Ajax:

ActiveMQ 支持 Ajax,利用 ActiveMQ 的“发布/订阅”的特性,来创建高度实时的 web 应用, 需要结合 Amq.js+JQuery 进行处理发布和订阅消息。参考文章官方使用说明可以简单的了解如何使用。

Netty:

基于原生 NIO 实现的高并发框架,配合 websocket 实现消息推送, netty 会单独开一个 websocket 端口处理请求,并不会占用中间件的连接数,而且一个线程可以处理几万个链接。

Spring 对于 Websocket 的支持:

websocket 是 html5 新增加特性之一,目的是浏览器与服务端建立全双工的通信方式,解决 http 请求-响应带来过多的资源消耗,同时对特殊场景应用提供了全新的实现方式, 比如聊天、股票交易、游戏等对对实时性要求较高的行业领域。

ActiveMQ 对于 Websocket 的支持:

可以通过 stomp 协议简单的处理发布订阅这样的消息模型并且建立在 Websocket 的基础上进行。 STOMP 即 Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,它提供了一个可互操作的连
接格式,允许 STOMP 客户端与任意 STOMP 消息代理(Broker)进行交互。参考文章STOMP Over WebSocket

2. 推送方案的选型

对于上述的各种推送的方案都曾经有过考虑和分析, 最终选取了最后一种方案”ActiveMQ 对于Websocket 的支持”.

DWR 和 Ajax

轮询采用同样的技术原理轮询,性能上首先淘汰掉了这两个方案

Activemq 结合 Ajax

在使用上来说依赖于 amq.js,这个 js 文件还依赖一些基于公共javascript 框架: jquery.js、 amq_jquery_adapter.js。因此,使用 amq.js 时,,必须先引入 jquery 库文件和适配器库文件 amq_jquery_adapter.js,其次这个方案的实现原理也是页
面轮询的方式, 就目前我们前端开发的特点上来说,使用的是 vue 不再提倡使用 JQuery 这样的类库,如果采用这样的方式非常的不利于前端的处理, 所以排除。

采用 Websocket

最后的三种方案其实都是采用 Websocket 的方式进行处理, Spring 对于 Websocket 的支持这种方案的使用可以参考Spring WebSocket 实现消息推送,这种方案目前在部门中没有看到过使用不成熟,需要自己去手动维护连接用户的信息并且针对不同的推送信息需要对于客户端进行区分到底是谁需要当前的这个推送的请求.Netty 的这个方案在之前有过使用, 代码量庞大并且当前的开发人员对于这个模块并不是很熟悉且与 spring 对于websocket 的支持有同样的毛病, 需要代码手动维护区分当前的推送需要针对哪些用户或者哪些页面的请求, 前端开发也是蛮复杂的,迁移成本巨大。 这里有一点需要说明,其实 spring也是支持 Stomp 协议Spring+STOMP 实现 WebSocket 广播订阅、权限认证、一对一通讯的,之前对于这方面不太了解,spring 使用本地维护 session 信息支持 Stomp 协议,同时也是支持与 ActiveMQ 集成,但是需要 ActiveMQ 支持 ws 协议;Spring Boot系列十七 Spring Boot 集成 websocket,使用RabbitMQ做为消息代理

spring websocket 配置类

/**
 * Spring websocket 接入使用
 *
 * @author: wangji
 * @date: 2018/04/13 15:37
 */
@EnableWebSocketMessageBroker
public class WebSocketMessageBrokerConfig extends AbstractWebSocketMessageBrokerConfigurer 


    @Resource(name = "taskExecutor")
    private ThreadPoolTaskExecutor threadPool;

    /**
     * 连接的端点,客户端建立连接时需要连接这里配置的端点
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) 
        registry.addEndpoint("/client")
                .setAllowedOrigins("*")
                .setHandshakeHandler(new DefaultHandshakeHandler())
                .addInterceptors(new HandshakeInterceptor() 
                    /**
                     * 添加权限校验是否登录
                     * 不进行校验可以使用默认的 HttpSessionHandshakeInterceptor
                     *
                     * @param serverHttpRequest
                     * @param serverHttpResponse
                     * @param webSocketHandler
                     * @param map
                     * @return
                     * @throws Exception
                     * @see org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor
                     */
                    @Override
                    public boolean beforeHandshake(ServerHttpRequest serverHttpRequest
                            , ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler
                            , Map<String, Object> map) throws Exception 
                        boolean result = true;
                        if (!StringUtils.isNotEmpty(LoginInfoUtil.getRequesetUserId())) 
                            result = false;
                        
                        return result;
                    

                    @Override
                    public void afterHandshake(ServerHttpRequest serverHttpRequest
                            , ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) 
                    
                );
    

    /**
     * 使用参考
     *
     * @link https://blog.csdn.net/elonpage/article/details/78446695?locationNum=5&fps=1
     * @see org.springframework.messaging.simp.SimpMessageSendingOperations#convertAndSend(Object, Object) 发送消息(/topic/xxxx,"消息")
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) 
        registry.setApplicationDestinationPrefixes("/app");
        registry.setUserDestinationPrefix("/user");
        registry.enableSimpleBroker("/topic", "/queue");
    

    /**
     * 输入通道参数设置设置多线程(接收消息通道)
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) 
        //线程信息
        registration.taskExecutor(threadPool);
    

    /**
     * 输出通道参数配置多线程(发送给给前端的推送通道)
     */
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) 
        //线程信息
        registration.taskExecutor(threadPool);
    


发送工具类

**
 * stomp websocket协议手动发送通知到前端
 *
 * @author: wangji
 * @date: 2018/04/16 10:00
 */
@Component
public class StompWebsocketNotifier implements InitializingBean 

    private static final Logger log = LoggerFactory.getLogger(StompWebsocketNotifier.class);

    private static final String TOPIC_PREFIX = "/topic/";

    @Resource
    private ApplicationContext applicationContext;

    private static SimpMessageSendingOperations simpMessageSendingOperations;

    @Override
    public void afterPropertiesSet() throws Exception 
        simpMessageSendingOperations = applicationContext.getBean(SimpMessageSendingOperations.class);
    

    /**
     * 推送消息通过 websocket到前端
     *
     * @param topicName 队列的名称
     * @param message   发送消息的内容
     * @return
     */
    public static boolean sendToWebSocketTopic(String topicName, String message) 
        boolean result = true;
        if (StringUtils.isNotEmpty(topicName) && StringUtils.isNotEmpty(message)) 
            try 
                simpMessageSendingOperations.convertAndSend(TOPIC_PREFIX + topicName, message);
             catch (MessagingException e) 
                result = false;
                log.error(PrettyLogger.toMessage("push websocket error", "topicName"
                        , "message"), topicName, message, e);
            
        
        return result;
    

配置线程池

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" destroy-method="shutdown">
        <!-- 核心线程数  -->
        <property name="corePoolSize" value="10" />
        <!-- 最大线程数 -->
        <property name="maxPoolSize" value="50" />
        <!-- 队列最大长度 >=mainExecutor.maxSize -->
        <property name="queueCapacity" value="1000" />
        <!-- 线程池维护线程所允许的空闲时间 -->
        <property name="keepAliveSeconds" value="300" />
        <!-- 线程池对拒绝任务(无线程可用)的处理策略 -->
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
        <property name="threadGroup" value="taskExecutor-Pool"/>
        <property name="threadNamePrefix" value="taskExecutor-Pool"/>
        <property name="waitForTasksToCompleteOnShutdown" value="true"/>
    </bean>

三、 ActiveMQ与WebSocket的结合的使用

ActiveMQ结合Websocket的使用主要集中在前端,后端由于ActiveMQ历史沉淀了许多优秀的ActiveMQ使用的封装的类库,直接调用非常的方便。针对不同的topic队列可以根据前端页面的需求进行相应的订,如下是一个简单的使用Demo参考STOMP Over WebSocket

1. 简单使用

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <script src="stomp.min.js"></script>
    <script>
        var url = "ws://10.33.30.255:61614";
        var client = Stomp.client(url);
        // client will send heartbeats every 20000ms
        client.heartbeat.outgoing = 20000;
        client.heartbeat.incoming = 0;
        client.connect("", "", function (frame) 
            // upon connection, subscribe to the destination
            var sub = client.subscribe("/topic/testTopic", function (message) 
                console.log("receive: " + message.body);
            );
        );
    </script>
</head>
<body>

</body>
</html>

2. Vue组件封装

根据vue的特点,对于推送请求框架进行了简单的封装,各个页面可以简单的使用组件,需要下面的两个封装的文件
Message-push.vue

<template>
  <!--@input: URI port  destination 详情见下方props-->
  <!--@output:message-get 从服务器端得到的消息-->
  <span>
  </span>
</template>
<script>
  import Stomp from 'stompjs';
  export default 
    name: 'messagePush',
    props: 
      URI: // 连接着服务端的WebSocket的代理的地址  类似于服务器的ip地址
        type: String,
        default: '10.11.165.16'
      ,
      port: 
        type: String,
        default: '61614'
      ,
      URL: String, // 带协议的连接地址,传入这个属性会覆盖上面的两个属性
      destination: // 订阅消息的目的地,也就是消息的来源地址 类似于api中的地址;可以同时接收多个消息
        type: [String, Array],
        default: ''
      
    ,
    data () 
      return 
        client: null
      ;
    ,
    beforeDestroy() 
      this.destroy();
    ,
    mounted() 
      this.init();
    ,
    methods: 
      init() 
        if (window.WebSocket) 
          /*
           * * 步骤创建STOMP客户端------连接服务端
           * */
          // STOMP javascript 客户端会使用ws://的URL与STOMP 服务端进行交互。
          let url = `ws://$this.URI:$this.port`;
          if (this.URL !== '') url = this.URL;
          // 为了创建一个STOMP客户端js对象,你需要使用Stomp.client(url),而这个URL连接着服务端的WebSocket的代理:
          this.client = Stomp.client(url);
          this.client.heartbeat.incoming = 0;
          this.client.heartbeat.outgoing = 1000 * 60;
          this.client.debug = null;
          // Stomp 客户端建立后,必须调用它的connect()方法去连接
          this.client.connect('', '', () => 
            // 为了在浏览器中接收消息,STOMP客户端必须先订阅一个目的地destination。
            // body 是字符串,请使用JSON.parse()去转换JSON对象
            let desList = [];
            if (typeof this.destination === 'string') 
              desList.push(this.destination);
             else 
              desList = this.destination;
            
            // 遍历每一个地址,订阅消息
            for (let des of desList) 
              ((destination) => 
                this.client.subscribe(destination, (message) => 
                  let _data = null;
                  if (message.body) 
                    // 解析接受到的数据
                    _data = JSON.parse(message.body);
                   else 
                    // 接受到空消息
                    _data = '';
                  
                  this.$emit('message-get', _data, destination);
                );
              )(des);
            
          );
         else 
          this.$message(
            message: `你的浏览器版本太低了,不支持Websocket,会导致你接收不到实时数据!'
                  您可以更换一个支持websockets浏览器,如:IE11 或者 chrome 或firefox!`,
            type: 'warning'
          );
        
      ,
      destroy() 
        // 断开连接时,调用disconnect方法
        this.client.disconnect();
        this.client = null;
      
    
  ;
</script>
<style lang='less' scoped>

</style>

messager.vue

<template>
  <message-push :destination="destination" v-if="start" :URL="url" @message-get="onMessageGet"></message-push>
</template>

<script>
  import messagePush from './message-push.vue';
  import commonService from 'service/common.service';
  import ErrorHandleMixin from 'utils/mixin';
  export default 
    props: 
      destination: 
        type: [String, Array],
        default: ''
      
    ,
    mixins: [ErrorHandleMixin],
    components: 
      messagePush
    ,
    data() 
      return 
        start: false,
        url: ''
      ;
    ,
    methods: 
      getWsInfo() 
        commonService.getWsInfo().then(res => 
          this.url = res.mqAddr;
          if (this.url) this.start = true;
        ).catch(this._handleError);
      ,
      onMessageGet(data, topic) 
        this.$emit('message-get', data, topic);
      
    ,
    mounted() 
      this.getWsInfo();
      this.$emit('init');
    
  ;
</script>

3. 组件使用

只需要传入需要订阅的队列的名称和处理的地址即可,如下

<messager destination="/topic/XXXTopic" @message-get="msgPushHandler"></messager>

四、 总结

对于不同的推送方案的了解,在不断的咨询相关的同时和查找资料的阅读相应的官方文档下进行了了解,感谢开源,让我们能够利用优秀的方案不用重复的造轮子。

以上是关于ActiveMQ 与 WebSocket 的结合推送方案+Spring Websocket Stomp的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ中拉模式与推模式

activemq的配置与结合spring使用

如何通过javascript 使用 MQTT

gin建立websocket服务

ActiveMQ(06):ActiveMQ结合Spring开发

Spring+Stomp+ActiveMq实现websocket长连接