WebSocket通信原理和在Tomcat中实现源码详解(万字爆肝)

Posted 徐同学呀

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WebSocket通信原理和在Tomcat中实现源码详解(万字爆肝)相关的知识,希望对你有一定的参考价值。

首发CSDN:徐同学呀,原创不易,转载请注明源链接。我是徐同学,用心输出高质量文章,希望对你有所帮助。 本篇基于Tomcat10.0.6。建议收藏起来慢慢看。

一、前言

WebSocket是一种全双工通信协议,即客户端可以向服务端发送请求,服务端也可以主动向客户端推送数据。这样的特点,使得它在一些实时性要求比较高的场景效果斐然(比如微信朋友圈实时通知、在线协同编辑等)。主流浏览器以及一些常见服务端通信框架(TomcatnettyundertowwebLogic等)都对WebSocket进行了技术支持。那么,WebSocket具体是什么?为什么会出现WebSocket?如何做到全双工通信?解决了什么问题?

二、什么是WebSocket

1、HTTP/1.1的缺陷

HTTP/1.1最初是为网络中超文本资源(html),请求-响应传输而设计的,后来支持了传输更多类型的资源,如图片、视频等,但都没有改变它单向的请求-响应模式。

随着互联网的日益壮大,HTTP/1.1功能使用上已体现捉襟见肘的疲态。虽然可以通过某些方式满足需求(如AjaxComet),但是性能上还是局限于HTTP/1.1,那么HTTP/1.1有哪些缺陷呢:

  • 请求-响应模式,只能客户端发送请求给服务端,服务端才可以发送响应数据给客户端。
  • 传输数据为文本格式,且请求/响应头部冗长重复。

(为了区分HTTP/1.1HTTP/1.2,下面描述中,HTTP均代表HTTP/1.1

2、WebSocket发展历史

(1)背景

WebSocket出现之前,主要通过长轮询和HTTP长连接实现实时数据更新,这种方式有个统称叫CometTomcat8.5之前有对Comet基于流的HTTP长连接做支持,后来因为WebSocket的成熟和标准化,以及Comet自身依然是基于HTTP,在性能消耗和瓶颈上无法跳脱HTTP,就把Comet废弃了。

还有一个SPDY技术,也对HTTP进行了改进,多路复用流、服务器推送等,后来演化成HTTP/2.0,因为适用场景和解决的问题不同,暂不对HTTP/2.0做过多解释,不过对于HTTP/2.0WebSocketTomcat实现中都是作为协议升级来处理的。

CometSPDY的原理不是本篇重点,没有展开讲解,感兴趣的同学可自行百度)

(2)历史

在这种背景下,HTML5制定了WebSocket

  • 筹备阶段,WebSocket被划分为HTML5标准的一部分,2008年6月,Michael Carter进行了一系列讨论,最终形成了称为WebSocket的协议。
  • 2009年12月,Google Chrome 4是第一个提供标准支持的浏览器,默认情况下启用了WebSocket
  • 2010年2月,WebSocket协议的开发从W3CWHATWG小组转移到IETF(TheInternet Engineering Task Force),并在Ian Hickson的指导下进行了两次修订。
  • 2011年,IETFWebSocket协议标准化为RFC 6455起,大多数Web浏览器都在实现支持WebSocket协议的客户端API。此外,已经开发了许多实现WebSocket协议的Java库。
  • 2013年,发布JSR356标准,Java API for WebSocket。

(为什么要去了解WebSocket的发展历史和背景呢?个人认为可以更好的理解某个技术实现的演变历程,比如Tomcat,早期有Comet没有WebSocket时,Tomcat就对Comet做了支持,后来有WebSocket了,但是还没出JSR356标准,Tomcat就对Websocket做了支持,自定义API,再后来有了JSR356Tomcat立马紧跟潮流,废弃自定义的API,实现JSR356那一套,这就使得在Tomcat7使用WebSocket的同学,想升为Tomcat8(其实Tomcat7.0.47之后就是JSR356标准了),发现WebSocket接入方式变了,而且一些细节也变了。)

3、WebSocket握手和双向通信

(1)定义

WebSocket全双工通信协议,在客户端和服务端建立连接后,可以持续双向通信,和HTTP同属于应用层协议,并且都依赖于传输层的TCP/IP协议。

虽然WebSocket有别于HTTP,是一种新协议,但是RFC 6455中规定:

it is designed to work over HTTP ports 80 and 443 as well as to support HTTP proxies and intermediaries.

  • WebSocket通过HTTP端口80和443进行工作,并支持HTTP代理和中介,从而使其与HTTP协议兼容。
  • 为了实现兼容性,WebSocket握手使用HTTP Upgrade头从HTTP协议更改为WebSocket协议。
  • Websocket使用wswss的统一资源标志符(URI),分别对应明文和加密连接。

(2)握手(建立连接)

在双向通信之前,必须通过握手建立连接。Websocket通过 HTTP/1.1 协议的101状态码进行握手,首先客户端(如浏览器)发出带有特殊消息头(UpgradeConnection)的请求到服务器,服务器判断是否支持升级,支持则返回响应状态码101,表示协议升级成功,对于WebSocket就是握手成功。

客户端请求示例:

GET /test HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: tFGdnEL/5fXMS9yKwBjllg==
Origin: http://example.com
Sec-WebSocket-Protocol: v10.stomp, v11.stomp, v12.stomp
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Version: 13
  • Connection必须设置Upgrade,表示客户端希望连接升级。
  • Upgrade: websocket表明协议升级为websocket
  • Sec-WebSocket-Key字段内记录着握手过程中必不可少的键值,由客户端(浏览器)生成,可以尽量避免普通HTTP请求被误认为Websocket协议。
  • Sec-WebSocket-Version 表示支持的Websocket版本。RFC6455要求使用的版本是13。
  • Origin字段是必须的。如果缺少origin字段,WebSocket服务器需要回复HTTP 403 状态码(禁止访问),通过Origin可以做安全校验。

服务端响应示例:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: HaA6EjhHRejpHyuO0yBnY4J4n3A=
Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15
Sec-WebSocket-Protocol: v12.stomp

Sec-WebSocket-Accept的字段值是由握手请求中的Sec-WebSocket-Key的字段值生成的。成功握手确立WebSocket连接之后,通信时不再使用HTTP的数据帧,而采用WebSocket独立的数据帧。

(3)消息帧

WebSocket使用二进制消息帧作为双向通信的媒介。何为消息帧?发送方将每个应用程序消息拆分为一个或多个帧,通过网络将它们传输到目的地,并重新组装解析出一个完整消息。

有别于HTTP/1.1文本消息格式(冗长的消息头和分隔符等),WebSocket消息帧规定一定的格式,以二进制传输,更加短小精悍。二者相同之处就是都是基于TCP/IP流式协议(没有规定消息边界)。

如下是消息帧的基本结构图:

  • FIN: 1 bit,表示该帧是否为消息的最后一帧。1-是,0-否。
  • RSV1,RSV2,RSV3: 1 bit each,预留(3位),扩展的预留标志。一般情况为0,除非协商的扩展定义为非零值。如果接收到非零值且不为协商扩展定义,接收端必须使连接失败。
  • Opcode: 4 bits,定义消息帧的操作类型,如果接收到一个未知Opcode,接收端必须使连接失败。(0x0-延续帧,0x1-文本帧,0x2-二进制帧,0x8-关闭帧,0x9-PING帧,0xA-PONG帧(在接收到PING帧时,终端必须发送一个PONG帧响应,除非它已经接收到关闭帧),0x3-0x7保留给未来的非控制帧,0xB-F保留给未来的控制帧)
  • Mask: 1 bit,表示该帧是否为隐藏的,即被加密保护的。1-是,0-否。Mask=1时,必须传一个Masking-key,用于解除隐藏(客户端发送消息给服务器端,Mask必须为1)。
  • Payload length: 7 bits, 7+16 bits, or 7+64 bits,有效载荷数据的长度(扩展数据长度+应用数据长度,扩展数据长度可以为0)。

if 0-125, that is the payload length. If 126, the following 2 bytes interpreted as a 16-bit unsigned integer are the payload length. If 127, the following 8 bytes interpreted as a 64-bit unsigned integer (the most significant bit MUST be 0) are the payload length.

  • Masking-key: 0 or 4 bytes,用于解除帧隐藏(加密)的key,Mask=1时不为空,Mask=0时不用传。
  • Payload data: (x+y) bytes,有效载荷数据包括扩展数据(x bytes)和应用数据(y bytes)。有效载荷数据是用户真正要传输的数据。

这样的二进制消息帧设计,与HTTP协议相比,WebSocket协议可以提供约500:1的流量减少和3:1的延迟减少。

(4)挥手(关闭连接)

挥手相对于握手要简单很多,客户端和服务器端任何一方都可以通过发送关闭帧来发起挥手请求。发送关闭帧的一方,之后不再发送任何数据给对方;接收到关闭帧的一方,如果之前没有发送过关闭帧,则必须发送一个关闭帧作为响应。关闭帧中可以携带关闭原因。

在发送和接收一个关闭帧消息之后,就认为WebSocket连接已关闭,且必须关闭底层TCP连接。

除了通过关闭握手来关闭连接外,WebSocket连接也可能在另一方离开或底层TCP连接关闭时突然关闭。

4、WebSocket优点

  • 较少的控制开销。在连接建立后,服务器和客户端之间交换数据时,用于协议控制的数据包头部相对于HTTP请求每次都要携带完整的头部,显著减少。

  • 更强的实时性。由于协议是全双工的,所以服务器可以随时主动给客户端下发数据。相对于HTTP请求需要等待客户端发起请求服务端才能响应,延迟明显更少。

  • 保持连接状态。与HTTP不同的是,Websocket需要先建立连接,这就使得其成为一种有状态的协议,之后通信时可以省略部分状态信息。而HTTP请求可能需要在每个请求都携带状态信息(如身份认证等)。

  • 更好的二进制支持。Websocket定义了二进制帧,相对HTTP,可以更轻松地处理二进制内容。

  • 支持扩展。Websocket定义了扩展,用户可以扩展协议、实现部分自定义的子协议。

  • 更好的压缩效果。相对于HTTP压缩,Websocket在适当的扩展支持下,可以沿用之前内容的上下文,在传递类似的数据时,可以显著提高压缩率。

三、Java API for WebSocket(JSR356)

JSR356Java EE7时归为Java EE标准的一部分(后来Java EE更名为Jakarta EE,世上再无Java EE,以下统一称Jakarta EE),所有兼容Jakarta EE的应用服务器,都必须遵循JSR356标准的WebSocket协议API。

根据JSR356规定, 建立WebSocket连接的服务器端和客户端,两端对称,可以互相通信,差异性较小,抽象成API,就是一个个Endpoint(端点),只不过服务器端的叫ServerEndpoint,客户端的叫ClientEndpoint。客户端向服务端发送WebSocket握手请求,建立连接后就创建一个ServerEndpoint对象。(这里的EndpointTomcat连接器里的AbstractEndpoint名称上有点像,但是两个毫不相干的东西,就像周杰伦和周杰的关系。)

ServerEndpointClientEndpoint在API上差异也很小,有相同的生命周期事件(OnOpenOnCloseOnErrorOnMessage),不同之处是ServerEndpoint作为服务器端点,可以指定一个URI路径供客户端连接,ClientEndpoint没有。

1、服务端API

服务器端的Endpoint有两种实现方式,一种是注解方式@ServerEndpoint,一种是继承抽象类Endpoint

(1)注解方式@ServerEndpoint

首先看看@ServerEndpoint有哪些要素:

  • value,可以指定一个URI路径标识一个Endpoint
  • subprotocols,用户在WebSocket协议下自定义扩展一些子协议。
  • decoders,用户可以自定义一些消息解码器,比如通信的消息是一个对象,接收到消息可以自动解码封装成消息对象。
  • encoders,有解码器就有编码器,定义解码器和编码器的好处是可以规范使用层消息的传输。
  • configuratorServerEndpoint配置类,主要提供ServerEndpoint对象的创建方式扩展(如果使用TomcatWebSocket实现,默认是反射创建ServerEndpoint对象)。

@ServerEndpoint可以注解到任何类上,但是想实现服务端的完整功能,还需要配合几个生命周期的注解使用,这些生命周期注解只能注解在方法上:

  • @OnOpen 建立连接时触发。
  • @OnClose 关闭连接时触发。
  • @OnError 发生异常时触发。
  • @OnMessage 接收到消息时触发。

(2)继承抽象类Endpoint

继承抽象类Endpoint,重写几个生命周期方法。

怎么没有onMessage方法,实现onMessage还需要继承实现一个接口jakarta.websocket.MessageHandlerMessageHandler接口又分为PartialWhole,实现的MessageHandler需要在onOpen触发时注册到jakarta.websocket.Session中。

继承抽象类Endpoint的方式相对于注解方式要麻烦的多,除了继承Endpoint和实现接口MessageHandler外,还必须实现一个jakarta.websocket.server.ServerApplicationConfig来管理Endpoint,比如给Endpoint分配URI路径。

encodersdecodersconfigurator等配置信息由jakarta.websocket.server.ServerEndpointConfig管理,默认实现jakarta.websocket.server.DefaultServerEndpointConfig

所以如果使用 Java 版WebSocket服务器端实现首推注解方式。

2、客户端API

对于客户端API,也是有注解方式和继承抽象类Endpoint方式。

  • 注解方式,只需要将@ServerEndpoint换成@ClientEndpoint
  • 继承抽象类Endpoint方式,需要一个jakarta.websocket.ClientEndpointConfig来管理encodersdecodersconfigurator等配置信息,默认实现jakarta.websocket.DefaultClientEndpointConfig

3、上下文Session

WebSocket是一个有状态的连接,建立连接后的通信都是通过jakarta.websocket.Session保持状态,一个连接一个Session,每一个Session有一个唯一标识Id。

Session的主要职责涉及:

  • 基础信息管理(request信息(getRequestURIgetRequestParameterMapgetPathParameters等)、协议版本getProtocolVersion、子协议getNegotiatedSubprotocol等)。
  • 连接管理(状态判断isOpen、接收消息的MessageHandler、发送消息的异步远程端点RemoteEndpoint.Async和同步远程端点RemoteEndpoint.Basic等)。

4、HandshakeRequest 和 HandshakeResponse

HandshakeRequestHandshakeResponse了解即可,这两个接口主要用于WebScoket握手升级过程中握手请求响应的封装,如果只是单纯使用WebSocket,不会接触到这两个接口。

(1)HandshakeRequest

(2)HandshakeResponse

Sec-WebSocket-Accept根据客户端传的Sec-WebSocket-Key生成,如下是Tomcat10.0.6 WebSocket源码实现中生成Sec-WebSocket-Accept的算法:

private static String getWebSocketAccept(String key) {
    byte[] digest = ConcurrentMessageDigest.digestSHA1(
            key.getBytes(StandardCharsets.ISO_8859_1), WS_ACCEPT);
    return Base64.encodeBase64String(digest);
}

5、WebSocketContainer

jakarta.websocket.WebSocketContainer顾名思义,就是WebSocket的容器,集大成者。其主要职责包括但不限于connectToServer,客户端连接服务器端,基于浏览器的WebSocket客户端连接服务器端,由浏览器支持,但是基于Java版的WebSocket客户端就可以通过WebSocketContainer#connectToServer向服务端发起连接请求。


四、WebSocket基于Tomcat应用

(如下使用的是javax.websocket包,未使用最新的jakarta.websocket,主要是测试项目基于SpringBoot+Tomcat9.x的,Java API for WebSocket版本需要保持一致。)

1、服务器端实现

(1)@ServerEndpoint注解方式

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

@ServerEndpoint(value = "/ws/test/{userId}", encoders = {MessageEncoder.class}, decoders = {MessageDecoder.class}, configurator = MyServerConfigurator.class)
public class WebSocketServerEndpoint {
    private Session session;
    private String userId;
    @OnOpen
    public void OnOpen(Session session, @PathParam(value = "userId") String userId) {
        this.session = session;
        this.userId = userId;
        // 建立连接后,将连接存到一个map里
        endpointMap.put(userId, this);
        Message message = new Message(0, "connected, hello " + userId);
        sendMsg(message);
    }

    @OnClose
    public void OnClose() {
        // 关闭连接时触发,从map中删除连接
        endpointMap.remove(userId);
        System.out.println("server closed...");
    }

    @OnMessage
    public void onMessage(Message message) {
        System.out.println("server recive message=" + message.toString());
    }

    @OnError
    public void onError(Throwable t) throws Throwable {
        this.session.close(new CloseReason(CloseReason.CloseCodes.CLOSED_ABNORMALLY, "系统异常"));
        t.printStackTrace();
    }
    
    /**
     * 群发
     * @param data
     */
    public void sendAllMsg(Message data) {
        for (WebSocketServerEndpoint value : endpointMap.values()) {
            value.sendMsgAsync(data);
        }
    }

    /**
     * 推送消息给指定 userId
     * @param data
     * @param userId
     */
    public void sendMsg(Message data, String userId) {
        WebSocketServerEndpoint endpoint = endpointMap.get(userId);
        if (endpoint == null) {
            System.out.println("not conected to " + userId);
            return;
        }
        endpoint.sendMsgAsync(data);
    }

    private void sendMsg(Message data) {
        try {
            this.session.getBasicRemote().sendObject(data);
        } catch (IOException ioException) {
            ioException.printStackTrace();
        } catch (EncodeException e) {
            e.printStackTrace();
        }
    }

    private void sendMsgAsync(Message data) {
        this.session.getAsyncRemote().sendObject(data);
    }
    // 存储建立连接的Endpoint
    private static ConcurrentHashMap<String, WebSocketServerEndpoint> endpointMap = new ConcurrentHashMap<String, WebSocketServerEndpoint>();
}

每一个客户端与服务器端建立连接后,都会生成一个WebSocketServerEndpoint,可以通过一个Map将其与userId对应存起来,为后续群发广播和单独推送消息给某个客户端提供便利。

注意:@ServerEndpointencodersdecodersconfigurator等配置信息在实际使用中可以不定义,如果项目简单,完全可以用默认的。

如果通信消息被封装成一个对象,如示例的Message(因为源码过于简单就不展示了,属性主要有codemsgdata),就必须提供编码器和解码器。也可以在每次发送消息时硬编码转为字符串,在接收到消息时转为Message。有了编码器和解码器,显得比较规范,转为字符串由编码器做,字符串转为对象由解码器做,但也使得架构变复杂了,视项目需求而定。

Configurator的用处就是自定义Endpoint对象创建方式,默认Tomcat提供的是通过反射。WebScoket是每个连接都会创建一个Endpoint对象,如果连接比较多,很频繁,通过反射创建,用后即毁,可能不是一个好主意,所以可以搞一个对象池,用过回收,用时先从对象池中拿,有就重置,省去实例化分配内存等消耗过程。

如果使用SpringBoot内置TomcatundertowNetty等,接入WebSocket时除了加@ServerEndpoint还需要加一个@Component,再给Spring注册一个ServerEndpointExporter类,这样,服务端Endpoint就交由Spring去扫描注册了。

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter serverEndpointExporter = new ServerEndpointExporter();
        return serverEndpointExporter;
    }
}

外置Tomcat就不需要这么麻烦,Tomcat会默认扫描classpath下带有@ServerEndpoint注解的类。(SpringBoot接入Websocket后续会单独出文章讲解,也挺有意思的)

(2)继承抽象类Endpoint方式

import javax.websocket.*;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

public class WebSocketServerEndpoint extends Endpoint {
    private Session session;
    private String userId;

    @Override
    public void onOpen(Session session, EndpointConfig endpointConfig) {
        this.session = session;
        this.userId = session.getPathParameters().get("userId");
        session.addMessageHandler(new MessageHandler());
        endpointMap.put(userId, this);
        Message message = new Message(0, "connected, hello " + userId);
        sendMsg(message);
    }

    @Override
    public void onClose(Session session, CloseReason closeReason) {
        endpointMap.remove(userId);
    }

    @Override
    public void onError(Session session, Throwable throwable) {
        throwable.printStackTrace();
    }
    
    /**
     * 群发
     * @param data
     */
    public void sendAllMsg(Message data) {
        for (WebSocketServerEndpoint value : endpointMap.values()) {
            value.sendMsgAsync(data);
        }
    }

    /**
     * 推送消息给指定 userId
     * @param data
     * @param userId
     */
    public void sendMsg(Message data, String userId) {
        WebSocketServerEndpoint endpoint = endpointMap.get(userId);
        if (endpoint == null) {
            System.out.println("not conected to " + userId);
            return;
        }
        endpoint.sendMsgAsync(data);
    }

    private void sendMsg(Message data) {
        try {
            this.session.getBasicRemote().sendObject(data);
        } catch (IOException ioException) {
            ioException.printStackTrace();
        } catch (EncodeException e) {
            e.printStackTrace();
        }
    }

    private void sendMsgAsync(Message data) {
        this.session.getAsyncRemote().sendObject(data);
    }

    private class MessageHandler implements javax.websocket.MessageHandler.Whole<Message> {

        @Override
        public void onMessage(Message message) {
            System.out.println("server recive message=" + message.toString());
        }
    }

    private static ConcurrentHashMap<String, WebSocketServerEndpoint> endpointMap = new ConcurrentHashMap<String, WebSocketServerEndpoint>();

}

继承抽象类Endpoint方式比加注解@ServerEndpoint方式麻烦的很,主要是需要自己实现MessageHandlerServerApplicationConfig@ServerEndpoint的话都是使用默认的,原理上差不多,只是注解更自动化,更简洁。

MessageHandler做的事情,一个@OnMessage就搞定了,ServerApplicationConfig做的URI映射、decodersencodersconfigurator等,一个@ServerEndpoint就可以了。

import javax.websocket.Decoder;
import javax.websocket.Encoder;
import javax.websocket.Endpoint;
import javax.websocket.server.ServerApplicationConfig;
import javax.websocket.server.ServerEndpointConfig;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MyServerApplicationConfig implements ServerApplicationConfig {
    @Override
    public Set<ServerEndpointConfig> getEndpointConfigs(Set<Class<? extends Endpoint>> set) {
        Set<ServerEndpointConfig> result = new HashSet<ServerEndpointConfig>();
        List<Class<? extends Decoder>> decoderList = new ArrayList<Class<? extends Decoder>>();
        decoderList.add(MessageDecoder.class);
        List<Class<? extends Encoder>> encoderList = new ArrayList<Class<? extends Encoder>>();
        encoderList.add(MessageEncoder.class);

        if (set.contains(WebSocketServerEndpoint3.class)) {
            ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder
                    .create(WebSocketServerEndpoint3.class, "/ws/test3")
                    .decoders(decoderList)
                    .encoders(encoderList)
                    .configurator(new MyServerConfigurator())
                    .build();
            result.add(serverEndpointConfig);
        }
        return result;
    }

    @Override
    public Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> set) {
        return set;
    }
}

如果使用SpringBoot内置Tomcat,则不需要ServerApplicationConfig了,但是需要给Spring注册一个ServerEndpointConfig

@Bean
public ServerEndpointConfig serverEndpointConfig(以上是关于WebSocket通信原理和在Tomcat中实现源码详解(万字爆肝)的主要内容,如果未能解决你的问题,请参考以下文章

利用webSocket使网页和服务器通信

WebSocket通信协议基础原理与潜在安全威胁

WebSocket通信协议基础原理与潜在安全威胁

tomcat websocket servlet监听端口

WebSocket 实战

Java后端WebSocket的Tomcat实现