Spring WebSocket

Posted 阳君

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring WebSocket相关的知识,希望对你有一定的参考价值。

1 WebSocket 介绍

WebSocket 是 html5 开始提供的一种在 TCP 上进行的套接字全双工通讯协议,可以实现客户端与服务器端的异步通信,服务器的推送功能。WebSocket 和 Http 的区别如下所示。

WebSocket 是基于 TCP 通信,浏览器通过 javascript 向服务器发出建立 WebSocket 连接的请求,首次的请求是使用的 HTTP 连接,连接之后浏览器和服务器之间就形成了一条快速通道。两者之间就可以通过 TCP 通道进行数据交互。

WebSocket 传输的数据格式比较轻量,可以发送纯文本,也可以直接发送二进制数据。和传统的 Http 长连接相比,性能开销小,通信高效。如图所示 和 http 一样它有有加密方式 ws(不加密) 和 wss(加密)。

WebSocket 是 HTML5 增加的通信协议,并不是所有的浏览器都支持,目前可支持的版本如下所示。

2 Spring WebSocket

在Spring 中开发 WebSocket 框架主要基于 spring-websocket 和 spring-messaging 库。

在 WebSocket 中,主要包含如下信息。

  1. Message消息,包含消息头和负载。消息头中包含,信息id、优先级等;负载是传输的信息,可以放任何数据。
  2. Channel 是一个管道,服务器生产一个 Message 放入 Channel,消费者通过Subscribe(订阅)从 Channel 消费一个 Message。Channel 上可以加设拦截器,拦截非法请求。
  3. EndPoint 是服务器的接入点,客户端通过这个接入点建立 WebSocket 通信。

在浏览器我们并不是直接使用原生的的 WebSocket 协议,而是使用更高级的 SockJS 和 StompJS。

2.1 SockJS

为了应对许多浏览器不支持 WebSocket 协议的问题,Spring 对备选协议 SockJS 提供了支持。

SockJS 的请求格式如下:

http://host:port/myApp/myEndpoint/server-id/session-id/transport

2.2 Stomp

SockJS 是 WebSocket 的备选方案,但它同样是一种偏底层的协议,Spring 建议我们使用它们的高级协议 STOMP

STOMP 是一种简单的面向文本的消息传递协议,其前身是 TTMP 协议(一个简单的基于文本的协议),专为消息中间件设计,如 ActiveMQ。它提供了一个可互操作的连接格式,允许 STOMP 客户端与任意 STOMP 消息代理(Broker)进行交互。虽然 STOMP 是一种面向文本的协议,但消息的有效负载可以是文本或二进制文件。

同 HTTP 在 TCP 套接字上添加请求-响应模型层一样,STOMP 在 WebSocket 之上提供了一个基于帧的线路格式层,用来定义消息语义。

STOMP 帧由命令,一个或多个头信息以及负载所组成。如下就是一个发送数据的 STOMP 帧:

SEND
destination:/queue/trade
content-type:application/json
content-length:44

"action":"BUY","ticker":"MMM","shares",44^@

3 实战

3.1 JSP页面

WebSocket 的 javascript 开发使用 SockJS 和 Stomp。

<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<html lang="en">
<head>
    <title>WebSocket</title>
    <script src="$pageContext.request.contextPath/js/jquery.js"></script>
    <script src="$pageContext.request.contextPath/js/sockjs.js"></script>
    <script src="$pageContext.request.contextPath/js/stomp.js"></script>
    <script type="text/javascript">
        $(document).ready(function () 
            setConnected(false);
        );

        var stompClient = null;

        // 连接
        function connect() 
            var userId = $("#userId").val();
            var socket = new SockJS("/stomp");
            stompClient = Stomp.over(socket);
            stompClient.connect('userId': userId, function (frame) 
                setConnected(true);
                console.log('连接成功: ' + frame);
                showGreeting("连接成功");
                // 广播
                stompClient.subscribe('/topic/message', function (greeting) 
                    console.log('subscribe: ' + greeting);
                    showGreeting(JSON.parse(greeting.body).content);
                );
                // 一对一通信
                stompClient.subscribe('/user/' + userId + '/message', function (greeting) 
                    console.log('subscribe: ' + greeting);
                    showGreeting(JSON.parse(greeting.body).content);
                );
            , function (error) 
                console.log('connect error: ' + error);
                alert(error);
            );
        

        function disconnect() 
            if (stompClient != null) 
                stompClient.disconnect(function () 
                    setConnected(false);
                    console.log("断开");
                );
            
        

        function send() 
            if (stompClient.connected) 
                var body = JSON.stringify(
                    'userId': $("#send_userId").val(),
                    'content': $("#send_content").val()
                );
                stompClient.send("/app/stomp/send", 'type': 'text', body);
                console.log("send: " + body);
             else 
                setConnected(false);
            
        

        // 显示信息
        function showGreeting(message) 
            var response = document.getElementById('response');
            var p = document.createElement('p');
            p.style.wordWrap = 'break-word';
            p.appendChild(document.createTextNode(message));
            response.appendChild(p);
        

        function setConnected(connected) 
            $("#connect").prop("disabled", connected);
            $("#disconnect").prop("disabled", !connected);
            if (connected) 
                $("#conversationDiv").show();
             else 
                $("#conversationDiv").hide();
            
            $("#response").html('');
        
    </script>
</head>
<body>
<div>
    <div>
        <label>登录</label><br/>
        <input type="text" id="userId" placeholder="输入账号" value="937447974"/>
        <button id="connect" onclick="connect();">连接</button>
        <button id="disconnect" disabled="disabled" onclick="disconnect();">断开</button>
    </div>
    <div id="conversationDiv">
        <label>发送消息</label>
        <br/>
        <input type="text" id="send_userId" placeholder="对方账号"/>
        <input type="text" id="send_content" placeholder="输入信息"/>
        <button onclick="send();">发送</button>
        <br/>
        <label>控制台</label><br/>
        <p id="response"></p>
    </div>
</div>
</body>
</html>

可以看到客户端开发只需要建立连接,添加订阅,发送消息和断开连接等相关功能。

3.2 WebSocket 配置

在 xml 中 配置 SpringMVC 的相关配置。WebSocket 配置使用 Java 的方式开发,这有利于模块独立化。

package com.websocket.config;

/**
 * WebsocketConfig.java
 * Websocket 配置
 * Created by 阳君 on 2017/10/20.
 * Copyright © 2017年 websocket. All rights reserved.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer 

    @Autowired
    private InboundChannelInterceptor inboundChannelInterceptor;

    // 连接站点配置
    public void registerStompEndpoints(StompEndpointRegistry registry) 
        registry.addEndpoint("/stomp") // stomp 连接点
                .addInterceptors(new AuthHandshakeInterceptor()) // 拦截器
                .setAllowedOrigins("*")
                .withSockJS();
    

    // 消息传输参数配置
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) 
        super.configureWebSocketTransport(registration);
        registration.setSendTimeLimit(15 * 1000)    // 超时时间
                .setSendBufferSizeLimit(512 * 1024) // 缓存空间
                .setMessageSizeLimit(128 * 1024);   // 消息大小
    

    // 输入通道配置
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) 
        super.configureClientInboundChannel(registration);
        registration.interceptors(this.inboundChannelInterceptor);// 设置拦截器
        registration.taskExecutor()    // 线程信息
                .corePoolSize(400)     // 核心线程池
                .maxPoolSize(800)      // 最多线程池数
                .keepAliveSeconds(60); // 超过核心线程数后,空闲线程超时60秒则杀死
    

    // 输出通道配置
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) 
        super.configureClientOutboundChannel(registration);
    

    // 配置消息转化器
    @Override
    public boolean configureMessageConverters(List<MessageConverter> messageConverters) 
        return super.configureMessageConverters(messageConverters);
    

    // 配置消息代理
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) 
        super.configureMessageBroker(registry);
        registry.enableSimpleBroker("/topic", "/user"); // 推送消息前缀
        registry.setApplicationDestinationPrefixes("/app") // 应用请求前缀
                .setUserDestinationPrefix("/user/");  // 推送用户前缀
    

WebSocketConfig 有丰富的配置,我们可以定义很多的操作。

3.3 拦截器

3.3.1 握手拦截

WebSocket 首次连接时,会使用 Http 的连接方式,默认使用了 OriginHandshakeInterceptor 拦截,它主要做域名拦截,我们可以通过继承它做其他连接处理。

package com.websocket.config;

/**
 * AuthHandshakeInterceptor.java
 * 握手拦截器
 * Created by 阳君 on 2017/10/21.
 * Copyright © 2017年 websocket. All rights reserved.
 */
public class AuthHandshakeInterceptor extends OriginHandshakeInterceptor 

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler
            wsHandler, Map<String, Object> attributes) throws Exception 
        System.out.println("beforeHandshake");
        return super.beforeHandshake(request, response, wsHandler, attributes);
    


    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
                               @Nullable Exception ex) 
        System.out.println("afterHandshake");
        super.afterHandshake(request, response, wsHandler, ex);
    

这个类中的方法,只会在连接时执行一次,数据传输过程中会走通道拦截。

3.3.2 通道拦截

WebSockt 是通过 Channel 传输数据,通过继承 ChannelInterceptorAdapter 可做到对输入和输出数据的拦截处理。

package com.websocket.config;

/**
 * InboundChannelInterceptor.java
 * 输入通道拦截器
 * Created by 阳君 on 2017/10/21.
 * Copyright © 2017年 websocket. All rights reserved.
 */
@Component
public class InboundChannelInterceptor extends ChannelInterceptorAdapter 

    @Autowired
    private WebSocketService webSocketService;

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) 
        System.out.println("preSend:" + message.getHeaders());
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        StompCommand stompCommand = accessor.getCommand();
        if (StompCommand.CONNECT.equals(stompCommand)) 
            String userId = accessor.getFirstNativeHeader("userId");
            String simpSessionId = accessor.getHeader("simpSessionId").toString();
            this.webSocketService.connect(simpSessionId, userId);
         else if (StompCommand.DISCONNECT.equals(stompCommand)) 
            this.webSocketService.disconnect(accessor.getHeader("simpSessionId").toString());
        
        return super.preSend(message, channel);
    

    @Override
    public void postSend(Message<?> message, MessageChannel channel, boolean sent) 
        System.out.println("postSend输入数据处理后");
        super.postSend(message, channel, sent);
    

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, @Nullable Exception ex) 
        super.afterSendCompletion(message, channel, sent, ex);
        System.out.println("afterSendCompletion");
    

输入通道的拦截可以自定义相关命令的实现,如连接和断开。

3.4 控制器

控制器写法和 SpringMVC 类似,主要是使用 @MessageMapping 做匹配处理。

package com.websocket.controller;

/**
 * WebSocketController.java
 * <p>
 * Created by 阳君 on 2017/10/20.
 * Copyright © 2017年 websocket. All rights reserved.
 */
@MessageMapping("/stomp")
@RestController
public class WebSocketController 
    // @SendTo("/topic/message") 广播发送给 /topic/message 订阅客户端
    // @SendToUser("1/message") 发送给指定订阅用户

    @Autowired
    public WebSocketService webSocketService;

    @MessageMapping("/send")
    public void sendBroadcast(@Headers Map<String, Object> headers, @Header String simpSessionId, TextMessage
            textMessage) 
        TextMessage sendTM = new TextMessage(this.webSocketService.getPool().get(simpSessionId) + ": " + textMessage
                .getContent());
        String userId = textMessage.getUserId();
        if (StringUtils.isEmpty(userId)) 
            this.webSocketService.send(sendTM);
         else 
            this.webSocketService.sendToUser(userId, sendTM);
        
    

@SendTo 和 @SendToUser 可以做返回数据处理,但实际开发中,多数会自定义返回操作。

3.5 业务层

业务层 WebSocketService 实现了登录、断开、群发和定点发送消息的功能。

package com.websocket.service;

/**
 * WebSocketService.java
 * 服务层
 * Created by 阳君 on 2017/10/22.
 * Copyright © 2017年 websocket. All rights reserved.
 */
@Service
public class WebSocketService 

    private Map<String, String> pool; // 连接池
    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @PostConstruct
    void init() 
        this.pool = new HashMap<String, String>();
    

    public Map<String, String> getPool() 
        return pool;
    

    /**
     * 连接
     *
     * @param simpSessionId 会话id
     * @param userId        用户id
     */
    public void connect(String simpSessionId, String userId) 
        if (!(StringUtils.isEmpty(simpSessionId) || StringUtils.isEmpty(userId))) 
            this.pool.put(simpSessionId, userId);
            TextMessage textMessage = new TextMessage(userId + "上线,当前在线人数:" + this.pool.size());
            this.simpMessagingTemplate.convertAndSend("/topic/message", textMessage);
        
    


    /**
     * 断开连接
     *
     * @param simpSessionId 会话id
     */
    public void disconnect(String simpSessionId) 
        String userId = this.pool.get(simpSessionId);
        if (userId != null) 
            this.pool.remove(simpSessionId);
            TextMessage textMessage = new TextMessage(userId + "离线,当前在线人数:" + this.pool.size());
            this.send(textMessage);
        
    

    /**
     * 给所有用户发送消息
     *
     * @param payload 消息
     * @throws MessagingException
     */
    public void send(Object payload) throws MessagingException 
        this.simpMessagingTemplate.convertAndSend("/topic/message", payload);
    

    /**
     * 给指定用户发消息
     *
     * @param userId  用户id
     * @param payload 消息
     * @throws MessagingException
     */
    public void sendToUser(String userId, Object payload) throws MessagingException 
        this.simpMessagingTemplate.convertAndSendToUser(userId, "/message", payload);
    

WebSocketService 是一个简单的实现,数据的回调使用了 SimpMessagingTemplate 。

3.6 数据模型

传输的数据模型使用 TextMessage 完成。

package com.websocket.po;

import org.springframework.lang.Nullable;

public class TextMessage 

    @Nullable
    private String userId; // 用户id
    @Nullable
    private String content; // 内容

    public TextMessage() 
    

    public TextMessage(String content) 
        this.content = content;
    

    @Nullable
    public String getUserId() 
        return userId;
    

    @Nullable
    public String getContent() 
        return content;
    

为了避免客户端传空,引起解析报错,需在字段上添加 @Nullable。

 


Appendix

Sample Code

Java

Servlet-based WebSocket Support

Spring Integeration学习

理解JMS规范中的持久订阅和非持久订阅

WebSocket应用安全问题分析

WebSocket 教程

WebSocket+SockJs+STMOP

STOMP协议详解

Revision History

时间描述
2017-10-26博文完成

CSDN:http://blog.csdn.net/y550918116j

GitHub:https://github.com/937447974

以上是关于Spring WebSocket的主要内容,如果未能解决你的问题,请参考以下文章

如何在Spring中配置Websocket

java WebSocket的实现以及Spring WebSocket

Spring 4.0 WebSocket 示例应用程序:无法建立 websocket 连接

spring-webSocket

WebSocket简介

Spring 4 WebSocket 配置端口