WebSocket协议+Nginx动态负载均衡(史上最全)
Posted 架构师-尼恩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了WebSocket协议+Nginx动态负载均衡(史上最全)相关的知识,希望对你有一定的参考价值。
文章很长,建议收藏起来慢慢读!疯狂创客圈总目录 语雀版 | 总目录 码云版| 总目录 博客园版 为您奉上珍贵的学习资源 :
-
免费赠送 经典图书:《Java高并发核心编程(卷1)》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
-
免费赠送 经典图书:《Java高并发核心编程(卷2)》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
-
免费赠送 经典图书:《Netty Zookeeper Redis 高并发实战》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
-
免费赠送 经典图书:《SpringCloud Nginx高并发核心编程》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
-
免费赠送 资源宝库: Java 必备 百度网盘资源大合集 价值>10000元 加尼恩领取
SpringCloud 微服务 精彩博文 | |
---|---|
nacos 实战(史上最全) | sentinel (史上最全+入门教程) |
SpringCloud gateway (史上最全) | 分库分表sharding-jdbc底层原理与实操(史上最全,5W字长文,吐血推荐) |
WebSocket协议+nginx动态负载均衡(史上最全)
html5 拥有众多引人注目的新特性,如 Canvas、本地存储、多媒体编程接口、WebSocket 等等。
其中,WebSocket 的出现使得浏览器提供对 Socket 的支持成为可能,从而在浏览器和服务器之间提供了一个基于 TCP 连接的双向通道。
使用 WebSocket,web开发人员可以很方便地构建实时 web 应用。
背景
以前,很多网站使用轮询实现推送技术。轮询是在特定的的时间间隔(比如1秒),由浏览器对服务器发出HTTP request,然后由服务器返回最新的数据给浏览器。轮询的缺点很明显,浏览器需要不断的向服务器发出请求,然而HTTP请求的header是非常长的,而实际传输的数据可能很小,这就造成了带宽和服务器资源的浪费。
Comet使用了AJAX改进了轮询,可以实现双向通信。但是Comet依然需要发出请求,而且在Comet中,普遍采用了长链接,这也会大量消耗服务器带宽和资源。
于是,WebSocket协议应运而生。
然后修改 Hosts, 添加, 比如 ws.repo
, 指向 127.0.0.1
然后是 Nginx 配置:
map $http_upgrade $connection_upgrade
default upgrade;
'' close;
server
listen 80;
server_name ws.repo;
location /
proxy_pass http://127.0.0.1:3000/;
proxy_redirect off;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
Reload Nginx 然后从浏览器控制台尝试链接, OK
new WebSocket('ws://ws.repo/')
或者通过 Upstream 的写法:
map $http_upgrade $connection_upgrade
default upgrade;
'' close;
upstream ws_server
server 127.0.0.1:3000;
server
listen 80;
server_name ws.repo;
location /
proxy_pass http://ws_server/;
proxy_redirect off;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
WebSocket 先是通过 HTTP 建立连接,
然后通过 101 状态码, 表示切换协议, 在配置里是 Upgrade
WebSocket协议
浏览器通过 javascript 向服务器发出建立 WebSocket 连接的请求,连接建立以后,客户端和服务器通过 TCP 连接直接交换数据。WebSocket 连接本质上是一个 TCP 连接。
WebSocket在数据传输的稳定性和数据传输量的大小方面,具有很大的性能优势。Websocket.org 比较了轮询和WebSocket的性能优势:
从上图可以看出,WebSocket具有很大的性能优势,流量和负载增大的情况下,优势更加明显。
WebSocket 协议分析
WebSocket 协议解决了浏览器和服务器之间的全双工通信问题。在WebSocket出现之前,浏览器如果需要从服务器及时获得更新,则需要不停的对服务器主动发起请求,也就是 Web 中常用的 poll 技术。这样的操作非常低效,这是因为每发起一次新的 HTTP 请求,就需要单独开启一个新的 TCP 链接,同时 HTTP 协议本身也是一种开销非常大的协议。为了解决这些问题,所以出现了 WebSocket 协议。WebSocket 使得浏览器和服务器之间能通过一个持久的 TCP 链接就能完成数据的双向通信。关于 WebSocket 的 RFC 提案,可以参看 RFC6455。
WebSocket 和 HTTP 协议一般情况下都工作在浏览器中,但 WebSocket 是一种完全不同于 HTTP 的协议。尽管,浏览器需要通过 HTTP 协议的 GET 请求,将 HTTP 协议升级为 WebSocket 协议。升级的过程被称为 握手(handshake)。当浏览器和服务器成功握手后,则可以开始根据 WebSocket 定义的通信帧格式开始通信了。像其他各种协议一样,WebSocket 协议的通信帧也分为控制数据帧和普通数据帧,前者用于控制 WebSocket 链接状态,后者用于承载数据。下面我们将一一分析 WebSocket 协议的握手过程以及通信帧格式。
WebSocket 协议的握手过程
握手的过程也就是将 HTTP 协议升级为 WebSocket 协议的过程。前面我们说过,握手开始首先由浏览器端发送一个 GET 请求开发,该请求的 HTTP 头部信息如下:
Connection: Upgrade
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Key: lGrvj+i7B76RB3YYbScQ9g==
Sec-WebSocket-Version: 13
Upgrade: websocket
当服务器端,成功验证了以上信息后,则会返回一个形如以下信息的响应:
Connection: upgrade
Sec-WebSocket-Accept: nImJE2gpj1XLtrOb+5cBMJn7bNQ=
Upgrade: websocket
可以看到,浏览器发送的 HTTP 请求中,增加了一些新的字段,其作用如下所示:
- Upgrade: 规定必需的字段,其值必需为 websocket, 如果不是则握手失败;
- Connection: 规定必需的字段,值必需为 Upgrade, 如果不是则握手失败;
- Sec-WebSocket-Key: 必需字段,一个随机的字符串;
- Sec-WebSocket-Version: 必需字段,代表了 WebSocket 协议版本,值必需是 13, 否则握手失败;
返回的响应中,如果握手成功会返回状态码为 101 的 HTTP 响应。同时其他字段说明如下:
- Upgrade: 规定必需的字段,其值必需为 websocket, 如果不是则握手失败;
- Connection: 规定必需的字段,值必需为 Upgrade, 如果不是则握手失败;
- Sec-WebSocket-Accept: 规定必需的字段,该字段的值是通过固定字符串258EAFA5-E914-47DA-95CA-C5AB0DC85B11加上请求中Sec-WebSocket-Key字段的值,然后再对其结果通过 SHA1 哈希算法求出的结果。
当浏览器和服务器端成功握手后,就可以传送数据了,传送数据是按照 WebSocket 协议的数据格式生成的。
WebSocket 协议数据帧
数据帧的定义类似于 TCP/IP 协议的格式定义,具体看下图:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
以上这张图,一行代表 32 bit (位) ,也就是 4 bytes。总体上包含两份,帧头部和数据内容。每个从 WebSocket 链接中接收到的数据帧,都要按照以上格式进行解析,这样才能知道该数据帧是用于控制的还是用于传送数据的。
WebSocket与HTTP的关系
相比HTTP长连接,WebSocket有以下特点:
1)是真正的全双工方式,建立连接后客户端与服务器端是完全平等的,可以互相主动请求。而HTTP长连接基于HTTP,是传统的客户端对服务器发起请求的模式。
2)HTTP长连接中,每次数据交换除了真正的数据部分外,服务器和客户端还要大量交换HTTP header,信息交换效率很低。Websocket协议通过第一个request建立了TCP连接之后,之后交换的数据都不需要发送 HTTP header就能交换数据,这显然和原有的HTTP协议有区别所以它需要对服务器和客户端都进行升级才能实现(主流浏览器都已支持HTML5)。此外还有 multiplexing、不同的URL可以复用同一个WebSocket连接等功能。这些都是HTTP长连接不能做到的。
WebSocket与Http相同点
-
都是一样基于TCP的,都是可靠性传输协议。
-
都是应用层协议。
WebSocket与Http不同点
- WebSocket是双向通信协议,模拟Socket协议,可以双向发送或接受信息。HTTP是单向的。
- WebSocket是需要浏览器和服务器握手进行建立连接的。而http是浏览器发起向服务器的连接,服务器预先并不知道这个连接。
传统HTTP客户端与服务器请求响应模式如下图所示:
WebSocket模式客户端与服务器请求响应模式如下图:
上图对比可以看出,相对于传统HTTP每次请求-应答都需要客户端与服务端建立连接的模式,WebSocket是类似Socket的TCP长连接通讯模式。一旦WebSocket连接建立后,后续数据都以帧序列的形式传输。在客户端断开WebSocket连接或Server端中断连接前,不需要客户端和服务端重新发起连接请求。
WebSocket与Http联系
传统的http通讯模式是:客户端发起请求,服务端接收请求并作出响应。
WebSocket在建立握手时,数据是通过HTTP传输的。
第一步,建立连接,客户端使用http报文的格式发起协议升级的请求,服务端响应协议升级。
但是建立之后,在真正传输时候是不需要HTTP协议的。而websocket协议复用了http的握手通道,具体指的是,客户端通过HTTP请求与WebSocket服务端协商升级协议。
第二步,交换数据,客户端与服务端可以使用websocket协议进行双向通讯。
在WebSocket中,只需要服务器和浏览器通过HTTP协议进行一个握手的动作,然后单独建立一条TCP的通信通道进行数据的传送。
WebSocket连接的过程是:
1)客户端发起http请求,经过3次握手后,建立起TCP连接;
http请求里存放WebSocket支持的版本号等信息,如:Upgrade、Connection、WebSocket-Version等;
2)服务器收到客户端的握手请求后,同样采用HTTP协议回馈数据;
3)客户端收到连接成功的消息后,开始借助于TCP传输信道进行全双工通信。
websocket协议报文
客户端请求
在客户端,new WebSocket实例化一个新的WebSocket客户端对象,
请求类似 ws://yourdomain:port/ws 的服务端WebSocket URL,
客户端WebSocket对象会自动解析并识别为WebSocket请求,并连接服务端端口,执行双方握手过程,客户端发送数据格式类似:
GET /ws HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: example.com
Origin: http://localhost:8080
Sec-WebSocket-Key: sN9cRrP/n9NdMgdcy2VJFQ==
Sec-WebSocket-Version: 13
可以看到,客户端发起的WebSocket连接报文类似传统HTTP报文,
Upgrade:websocket参数值表明这是WebSocket类型请求,
Sec-WebSocket-Key是WebSocket客户端发送的一个 base64编码的密文,要求服务端必须返回一个对应加密的Sec-WebSocket-Accept应答,否则客户端会抛出Error during WebSocket handshake错误,并关闭连接。
服务器回应
服务端收到报文后返回的数据格式类似:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: fFBooB7FAkLlXgRSz0BT3v4hq5s=
Sec-WebSocket-Origin: null
Sec-WebSocket-Location: ws://example.com/
HTTP/1.1 101 Switching Protocols
表示服务端接受WebSocket协议的客户端连接,
Sec-WebSocket-Accept
的值是服务端采用与客户端一致的密钥计算出来后返回客户端的,
客户端过来的 Sec-WebSocket-Key是随机的,服务器端会用这些数据来构造出一个SHA-1的信息摘要。把Sec-WebSocket-Key
加上一个魔幻字符串,使用 SHA-1 加密,之后进行 BASE-64编码,将结果作为 Sec-WebSocket-Accept
头的值,返回给客户端。
经过这样的请求-响应处理后,两端的WebSocket连接握手成功, 后续就可以进行TCP通讯了。
在开发方面,WebSocket API 也十分简单:只需要实例化 WebSocket,创建连接,然后服务端和客户端就可以相互发送和响应消息。在WebSocket 实现及案例分析部分可以看到详细的 WebSocket API 及代码实现。
浏览器兼容性
最新的主流浏览器对WebSocket支持良好:
- Chrome 4+
- Firefox 4+
- Internet Explorer 10+
- Opera 10+
- Safari 5+
客户端案例
JavaScript客户端
WebSocket协议本质上是一个基于TCP的协议,为了建立一个WebSocket连接,浏览器需要向服务器发起一个HTTP请求,这个请求和普通的HTTP请求不同,它包含了一些附加头信息,服务器解析这些附加头信息后产生应答信息返回给客户端,客户端和服务端的WebSocket连接就建立起来了,双方可以通过连接通道自由的传递信息,并且这个连接会持续存在直到客户端或服务端某一方主动关闭连接。
function webSocket()
if("WebSocket" in window)
console.log("您的浏览器支持WebSocket");
var ws = new WebSocket("ws://localhost:8080"); //创建WebSocket连接
//...
else
console.log("您的浏览器不支持WebSocket");
客户端支持WebSocket的浏览器中,在创建socket后,可以通过onopen、onmessage、onclose和onerror四个事件对socket进行响应。
浏览器通过Javascript向服务器发出建立WebSocket连接的请求,连接建立后,客户端和服务器就可以通过TCP连接直接交换数据。当你获取WebSocket连接后,可以通多send()方法向服务器发送数据,可以通过onmessage事件接收服务器返回的数据。
var ws = new WebSocket("ws://localhost:8080");
//申请一个WebSocket对象,参数是服务端地址,同http协议使用http://开头一样,WebSocket协议的url使用ws://开头,另外安全的WebSocket协议使用wss://开头
ws.onopen = function()
//当WebSocket创建成功时,触发onopen事件
console.log("open");
ws.send("hello"); //将消息发送到服务端
ws.onmessage = function(e)
//当客户端收到服务端发来的消息时,触发onmessage事件,参数e.data包含server传递过来的数据
console.log(e.data);
ws.onclose = function(e)
//当客户端收到服务端发送的关闭连接请求时,触发onclose事件
console.log("close");
ws.onerror = function(e)
//如果出现连接、处理、接收、发送数据失败的时候触发onerror事件
console.log(error);
WebSocket的所有操作都是采用事件的方式触发的,这样不会阻塞UI,是的UI有更快的响应时间,有更好的用户体验。
WebSocke的方法
WebSocke的属性
Socket.IO客户端
Socket.IO是一个封装了WebSocket的JavaScript模块。
因为完全使用JavaScript编写,所以在每个浏览器和移动设备中都可以方便地通过Socket.IO使用WebSocket。
服务器端
var io = require('socket.io').listen(80);
io.sockets.on('connection', function (socket)
socket.emit('news', hello: 'world' );
socket.on('my other event', function (data)
console.log(data);
);
);
客户端
var socket = io.connect('http://localhost');
socket.on('news', function (data)
console.log(data);
socket.emit('my other event', my: 'data' );
);
netty客户端模块
package com.crazymaker.springcloud.websocket.client;
import com.crazymaker.springcloud.common.constants.SessionConstants;
import com.crazymaker.springcloud.common.util.JsonUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NiosocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
/**
* 基于websocket的netty客户端
*/
public class WebSocketMockClient
private static String account = "1860000000";
// static String uriString = "ws://127.0.0.1:9999/push";
static String uriString = "ws://cdh2:9999/push";
static String token = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiIxIiwic2lkIjoiNGFiMzVkNDMtZWNhZC00ZDhkLTkwN2MtZjA4NTIxYjU2ODVkIiwiZXhwIjoxNjQ5MzI2NDA4LCJpYXQiOjE2NDkyOTQwMDh9.cN6QTW__p3-RznkU4TqUo1sFIz2Ww_piWFTOvFJ7QoGqcq93ynNsE7RTMgGGYpX3Dpe6W_3vaWmJsHdzt8hme3kxwfKPnZfUF3hUwYCCU4WvXpQjwCFH1W_FSMZjZT2tvyPAmP75_4NDbTJ6sAw1hPVoEKIiGVkO0Aml_CixgqTY0UIyY0nCcz8T1yGkR5wPMhIyxQKPSjWU0UfyPovzIfwSKePfxnqgF42-_BA_YnrVL2qS9pNtTrtm-Bd2LNp5XLbOg-1mWCrHBl7DrYsBj9Q5hMSgy2cJxteyOz2gmfj4HiGeE_KCQO5ZcIChBkOJ9JV5HrzQ8xjGGoPtIReRiA";
public static void main(String[] args) throws Exception
//netty基本操作,线程组
EventLoopGroup group = new NioEventLoopGroup();
//netty基本操作,启动类
Bootstrap boot = new Bootstrap();
boot.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.group(group)
.handler(new LoggingHandler(LogLevel.INFO))
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>()
protected void initChannel(SocketChannel socketChannel) throws Exception
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("http-codec", new HttpClientCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(1024 * 1024 * 10));
pipeline.addLast("ws-handler", new WebSocketClientHandler());
);
//websocke连接的地址,/hello是因为在服务端的websockethandler设置的
URI websocketURI = new URI(uriString);
HttpHeaders httpHeaders = new DefaultHttpHeaders();
httpHeaders.set(SessionConstants.AUTHORIZATION_HEAD, token);
httpHeaders.set(SessionConstants.APP_ACCOUNT, account);
//进行握手
WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI, WebSocketVersion.V13, (String) null, true, httpHeaders);
//客户端与服务端连接的通道,final修饰表示只会有一个
final Channel channel = boot.connect(websocketURI.getHost(), websocketURI.getPort()).sync().channel();
WebSocketClientHandler handler = (WebSocketClientHandler) channel.pipeline().get("ws-handler");
handler.setHandshaker(handshaker);
handshaker.handshake(channel);
//阻塞等待是否握手成功
handler.handshakeFuture().sync();
System.out.println("握手成功");
//给服务端发送的内容,如果客户端与服务端连接成功后,可以多次掉用这个方法发送消息
sengMessage(channel);
public static void sengMessage(Channel channel)
Map<String, String> map = new HashMap<>();
map.put("type", "msg");
map.put("msg", "你好,我是 " + account);
//发送的内容,是一个文本格式的内容
String putMessage = JsonUtil.pojoToJson(map);
TextWebSocketFrame frame = new TextWebSocketFrame(putMessage);
channel.writeAndFlush(frame).addListener(new ChannelFutureListener()
public void operationComplete(ChannelFuture channelFuture) throws Exception
if (channelFuture.isSuccess())
System.out.println("消息发送成功,发送的消息是:" + putMessage);
else
System.out.println("消息发送失败 " + channelFuture.cause().getMessage());
);
handler
package com.crazymaker.springcloud.websocket.client;
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object>
//握手的状态信息
WebSocketClientHandshaker handshaker;
//netty自带的异步处理
ChannelPromise handshakeFuture;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception
System.out.println("当前握手的状态"+this.handshaker.isHandshakeComplete());
Channel channel = ctx.channel();
FullHttpResponse response;
//进行握手操作
if (!this.handshaker.isHandshakeComplete())
try
response = (FullHttpResponse)msg;
//握手协议返回,设置结束握手
this.handshaker.finishHandshake(channel, response);
//设置成功
this.handshakeFuture.setSuccess();
System.out.println("服务端的消息"+response.headers());
catch (WebSocketHandshakeException var7)
FullHttpResponse res = (FullHttpResponse)msg;
String errorMsg = String.format("握手失败,status:%s,reason:%s", res.status(), res.content().toString(CharsetUtil.UTF_8));
this.handshakeFuture.setFailure(new Exception(errorMsg));
else if (msg instanceof FullHttpResponse)
response = (FullHttpResponse)msg;
throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
else
//接收服务端的消息
WebSocketFrame frame = (WebSocketFrame)msg;
//文本信息
if (frame instanceof TextWebSocketFrame)
TextWebSocketFrame textFrame = (TextWebSocketFrame)frame;
System.out.println("客户端接收的消息是:"+textFrame.text());
//二进制信息
if (frame instanceof BinaryWebSocketFrame)
BinaryWebSocketFrame binFrame = (BinaryWebSocketFrame)frame;
System.out.println("BinaryWebSocketFrame");
//ping信息
if (frame instanceof PongWebSocketFrame)
System.out.println("WebSocket Client received pong");
//关闭消息
if (frame instanceof CloseWebSocketFrame)
System.out.println("receive close frame");
channel.close();
/**
* Handler活跃状态,表示连接成功
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
System.out.println("与服务端连接成功");
/**
* 非活跃状态,没有连接远程主机的时候。
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
System.out.println("主机关闭");
/**
* 异常处理
* @param ctx
* @param cause
* @throws Exception
*/
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
System.out.println("连接异常:"+cause.getMessage());
ctx.close();
public void handlerAdded(ChannelHandlerContext ctx)
this.handshakeFuture = ctx.newPromise();
public WebSocketClientHandshaker getHandshaker()
return handshaker;
public void setHandshaker(WebSocketClientHandshaker handshaker)
this.handshaker = handshaker;
public ChannelPromise getHandshakeFuture()
return handshakeFuture;
public void setHandshakeFuture(ChannelPromise handshakeFuture)
this.handshakeFuture = handshakeFuture;
public ChannelFuture handshakeFuture()
return this.handshakeFuture;
Netty中Websocket握手安全验证
在使用Netty开发Websocket服务时,通常需要解析来自客户端请求的URL、Headers等等相关内容,并做相关检查或处理。
这里将讨论两种实现方法。
方法一:基于HandshakeComplete事件进行安全验证
特点:使用简单、校验在握手成功之后、失败信息可以通过Websocket发送回客户端。
下面的代码展示了如何监听自定义事件。
通过抛出异常可以终止链接,同时可以利用ctx
向客户端以Websocket协议返回错误信息。
private final class ServerHandler extends SimpleChannelInboundHandler<DeviceDataPacket>
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete)
// 在此处获取URL、Headers等信息并做校验,通过throw异常来中断链接。
//比如:通过url中的参数,来检验
String requestUri = handshakeComplete.requestUri();
requestUri = URLDecoder.decode(requestUri, "UTF-8");
log.info("HANDSHAKE_COMPLETE,ID->,URI->", channel.id().asLongText(), requestUri);
String socketKey = requestUri.substring(requestUri.lastIndexOf(dataKey) + dataKey.length());
对key进行校验
super.userEventTriggered(ctx, evt);
验证案例
package com.crazymaker.springcloud.websocket.netty;
import com.crazymaker.springcloud.common.dto.UserDTO;
import com.crazymaker.springcloud.websocket.netty.event.SecurityCheckCompleteEvent;
import com.crazymaker.springcloud.websocket.processer.RpcProcesser;
import com.crazymaker.springcloud.websocket.session.ServerSession;
import com.crazymaker.springcloud.websocket.session.SessionMap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
/**
* Created by 尼恩 @ 疯狂创客圈
* <p>
* WebSocket 帧:WebSocket 以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧
*/
@Slf4j
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception
//增加消息的引用计数(保留消息),并将他写到 ChannelGroup 中所有已经连接的客户端
ServerSession session = ServerSession.getSession(ctx);
String result = RpcProcesser.inst().onMessage(msg.text(), session);
if (result != null)
SessionMap.getSingleton().sendMsg(ctx, result);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception
//是否握手成功,升级为 Websocket 协议
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE)
// 握手成功,移除 HttpRequestHandler,因此将不会接收到任何消息
// 并把握手成功的 Channel 加入到 ChannelGroup 中
doAuth(....)
else if (evt instanceof IdleStateEvent)
IdleStateEvent stateEvent = (IdleStateEvent) evt;
if (stateEvent.state() == IdleState.READER_IDLE)
ServerSession session = ServerSession.getSession(ctx);
String ack = RpcProcesser.inst().onIdleTooLong(session);
SessionMap.getSingleton().closeSessionAfterAck(session, ack);
public void doAuth(ChannelHandlerContext ctx, Object msg) throws Exception
if (msg instanceof FullHttpMessage)
//extracts token information from headers
HttpHeaders headers = ((FullHttpMessage) msg).headers();
String token = Objects.requireNonNull(headers.get(SessionConstants.AUTHORIZATION_HEAD));
//extracts account information from headers
String account = Objects.requireNonNull(headers.get(SessionConstants.APP_ACCOUNT));
if (null == token || null == account)
// 参数校验、设置响应
String content = "请登陆之后,再发起websocket连接!!!";
closeUnauthChannelAfterWrite(ctx, content);
return;
Payload<String> payload = null;
// 在此处获取URL、Headers等信息并做校验,通过throw异常来中断链接。
try
payload = AuthUtils.decodeRsaToken(token);
catch (Exception e)
// 解码异常、设置响应
String content = "请登陆之后,再发起websocket连接!!!";
closeUnauthChannelAfterWrite(ctx, content);
return;
if (null == payload)
// 解码异常、设置响应
String content = "请登陆之后,再发起websocket连接!!!";
closeUnauthChannelAfterWrite(ctx, content);
return;
String appId = payload.getId();
SecurityCheckCompleteEvent complete = new SecurityCheckCompleteEvent(token,appId, account);
ctx.channel().attr(SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY).set(complete);
ctx.fireUserEventTriggered(complete);
WebSocketServerProtocolHandshakeHandler源码分析
一般地,我们将netty内置的WebSocketServerProtocolHandler
作为Websocket协议的主要处理器。
通过研究其代码我们了解到在本处理器被添加到Pipline
后handlerAdded
方法将会被调用。
此方法经过简单的检查后将WebSocketHandshakeHandler
添加到了本处理器之前,用于处理握手相关业务。
我们都知道Websocket协议在握手时是通过HTTP(S)协议进行的,那么这个WebSocketHandshakeHandler
应该就是处理HTTP相关的数据的吧?
package io.netty.handler.codec.http.websocketx;
public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler
@Override
public void handlerAdded(ChannelHandlerContext ctx)
ChannelPipeline cp = ctx.pipeline();
if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null)
// Add the WebSocketHandshakeHandler before this one.
cp.addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),
new WebSocketServerProtocolHandshakeHandler(serverConfig));
//...
我们来看看WebSocketServerProtocolHandshakeHandler
都做了什么操作。
channelRead
方法会尝试接收一个FullHttpRequest
对象,表示来自客户端的HTTP请求,随后服务器将会进行握手相关操作,此处省略了握手大部分代码,感兴趣的同学可以自行阅读。
可以注意到,在确认握手成功后,channelRead
将会调用两次fireUserEventTriggered
,此方法将会触发自定义事件。
其他(在此处理器之后)的处理器会触发userEventTriggered
方法。
其中一个方法传入了WebSocketServerProtocolHandler
对象,此对象保存了HTTP请求相关信息。
那么解决方案逐渐浮出水面,通过监听自定义事件即可实现检查握手的HTTP请求。
package io.netty.handler.codec.http.websocketx;
/**
* Handles the HTTP handshake (the HTTP Upgrade request) for @link WebSocketServerProtocolHandler.
*/
class WebSocketServerProtocolHandshakeHandler extends ChannelInboundHandlerAdapter
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception
final FullHttpRequest req = (FullHttpRequest) msg;
if (isNotWebSocketPath(req))
ctx.fireChannelRead(msg);
return;
try
//...
if (!future.isSuccess())
else
localHandshakePromise.trySuccess();
// Kept for compatibility
ctx.fireUserEventTriggered(
WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);
ctx.fireUserEventTriggered(
new WebSocketServerProtocolHandler.HandshakeComplete(
req.uri(), req.headers(), handshaker.selectedSubprotocol()));
finally
req.release();
说明: 以上源码比较复杂,具体的解读,请参见19章视频
方法一的流水线装配
附上Channel初始化代码作为参考。
private final class ServerInitializer extends ChannelInitializer<SocketChannel>
@Override
protected void initChannel(SocketChannel ch)
ch.pipeline()
.addLast("http-codec", new HttpServerCodec())
.addLast("chunked-write", new ChunkedWriteHandler())
.addLast("http-aggregator", new HttpObjectAggregator(8192))
.addLast("log-handler", new LoggingHandler(LogLevel.WARN))
.addLast("ws-server-handler", new WebSocketServerProtocolHandler(endpointUri.getPath()))
.addLast("server-handler", new ServerHandler());
方法一的问题:
方法一中,ws握手已经完成,所以虽然这种方案简单的过分,但是效率并不高,耗费服务端资源(都握手了又给人家踢了)。
方法二:在ws握手之前,进行安全检查处理器
特点:使用相对复杂、校验在握手成功之前、失败信息可以通过HTTP返回客户端。
解决方案
编写一个入站处理器,接收FullHttpMessage
消息,在Websocket处理器之前检测拦截请求信息。
下面的例子主要做了四件事情:
- 从HTTP请求中提取关心的数据
- 安全检查
- 将结果和其他数据绑定在Channel
- 触发安全检查完毕自定义事件
package com.crazymaker.springcloud.websocket.netty.handler;
import com.crazymaker.springcloud.base.auth.AuthUtils;
import com.crazymaker.springcloud.base.auth.Payload;
import com.crazymaker.springcloud.common.constants.SessionConstants;
import com.crazymaker.springcloud.websocket.netty.event.SecurityCheckCompleteEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import static com.crazymaker.springcloud.netty.util.HttpUtil.closeUnauthChannelAfterWrite;
@Slf4j
public class AuthCheckHandler extends ChannelInboundHandlerAdapter
public static final AttributeKey SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY =
AttributeKey.valueOf("SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY");
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (msg instanceof FullHttpMessage)
//extracts token information from headers
HttpHeaders headers = ((FullHttpMessage) msg).headers();
String token = Objects.requireNonNull(headers.get(SessionConstants.AUTHORIZATION_HEAD));
//extracts account information from headers
String account = Objects.requireNonNull(headers.get(SessionConstants.APP_ACCOUNT));
if (null == token || null == account)
// 参数校验、设置响应
String content = "请登陆之后,再发起websocket连接!!!";
closeUnauthChannelAfterWrite(ctx, content);
return;
Payload<String> payload = null;
// 在此处获取URL、Headers等信息并做校验,通过throw异常来中断链接。
try
payload = AuthUtils.decodeRsaToken(token);
catch (Exception e)
// 解码异常、设置响应
String content = "请登陆之后,再发起websocket连接!!!";
closeUnauthChannelAfterWrite(ctx, content);
return;
if (null == payload)
// 解码异常、设置响应
String content = "请登陆之后,再发起websocket连接!!!";
closeUnauthChannelAfterWrite(ctx, content);
return;
String appId = payload.getId();
SecurityCheckCompleteEvent complete = new SecurityCheckCompleteEvent(token,appId, account);
ctx.channel().attr(SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY).set(complete);
ctx.fireUserEventTriggered(complete);
//other protocols
super.channelRead(ctx, msg);
在业务逻辑处理器中,可以通过组合自定义的安全检查事件和Websocket握手完成事件。
方法二流水线装配
附上Channel初始化代码作为参考。
package com.crazymaker.springcloud.websocket.netty;
import com.crazymaker.springcloud.standard.context.SpringContextUtil;
import com.crazymaker.springcloud.websocket.netty.handler.AuthCheckHandler;
import com.crazymaker.springcloud.websocket.netty.handler.ServerExceptionHandler;
import com.crazymaker.springcloud.websocket.netty.handler.TextWebSocketFrameHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
/**
* Netty 服务
* Created by 尼恩 @ 疯狂创客圈
*/
@Component
@Slf4j
public class WebSocketServer implements ApplicationContextAware
@Value("$tunnel.websocket.port")
private int websocketPort;
@Value("$websocket.register.gateway")
private String websocketRegisterGateway;
private final EventLoopGroup group = new NioEventLoopGroup();
private Channel channel;
/**
* 停止即时通讯服务器
*/
public void stopServer()
if (channel != null)
channel.close();
group.shutdownGracefully();
/**
* 启动即时通讯服务器
*/
public void startServer(int port)
ChannelFuture channelFuture = null;
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ChatServerInitializer());
InetSocketAddress address = new InetSocketAddress(port);
channelFuture = bootstrap.bind(address);
// channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
// 返回与当前Java应用程序关联的运行时对象
Runtime.getRuntime().addShutdownHook(new Thread()
@Override
public void run()
stopServer();
);
log.info("\\n----------------------------------------------------------\\n\\t" +
"Nett WebSocket 服务 is running! Access Port:\\n\\t", websocketPort);
channelFuture.channel().closeFuture().syncUninterruptibly();
/**
* 内部类
*/
class ChatServerInitializer extends ChannelInitializer<Channel>
private static final int READ_IDLE_TIME_OUT = 600; // 读超时 s
private static final int WRITE_IDLE_TIME_OUT = 0;// 写超时
private static final int ALL_IDLE_TIME_OUT = 0; // 所有超时
@Override
protected void initChannel(Channel ch) throws Exception
ChannelPipeline pipeline = ch.pipeline();
// Netty自己的http解码器和编码器,报文级别 HTTP请求的解码和编码
pipeline.addLast(new HttpServerCodec());
// ChunkedWriteHandler 是用于大数据的分区传输
// 主要用于处理大数据流,比如一个1G大小的文件如果你直接传输肯定会撑暴jvm内存的;
// 增加之后就不用考虑这个问题了
pipeline.addLast(new ChunkedWriteHandler());
// HttpObjectAggregator 是完全的解析Http消息体请求用的
// 把多个消息转换为一个单一的完全FullHttpRequest或是FullHttpResponse,
// 原因是HTTP解码器会在每个HTTP消息中生成多个消息对象HttpRequest/HttpResponse,HttpContent,LastHttpContent
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
pipeline.addLast(new AuthCheckHandler());
// WebSocket数据压缩
pipeline.addLast(new WebSocketServerCompressionHandler());
// WebSocketServerProtocolHandler是配置websocket的监听地址/协议包长度限制
pipeline.addLast(new WebSocketServerProtocolHandler("/push", null, true, 10 * 1024));
//当连接在60秒内没有接收到消息时,就会触发一个 IdleStateEvent 事件,
// 此事件被 HeartbeatHandler 的 userEventTriggered 方法处理到
pipeline.addLast(new IdleStateHandler(READ_IDLE_TIME_OUT, WRITE_IDLE_TIME_OUT, ALL_IDLE_TIME_OUT, TimeUnit.SECONDS));
//WebSocketServerHandler、TextWebSocketFrameHandler 是自定义逻辑处理器,
pipeline.addLast(new TextWebSocketFrameHandler());
pipeline.addLast(new ServerExceptionHandler());
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
SpringContextUtil.setContext(applicationContext);
new Thread(() ->
startServer(websocketPort);
).start();
方法一与方法二的对比
上述两种方式分别在握手完成后和握手之前拦截检查;实现复杂度和性能略有不同,可以通过具体业务需求选择合适的方法。
Netty增强了责任链模式,使用userEvent
传递自定义事件使得各个处理器之间减少耦合,更专注于业务。
但是、相比于流动于各个处理器之间的"主线"数据来说,userEvent
传递的"支线"数据往往不受关注。
通过阅读Netty内置的各种处理器源码,探索其产生的事件,同时在开发过程中加以善用,可以减少冗余代码。
另外在开发自定义的业务逻辑时,应该积极利用userEvent
传递事件数据,降低各模块之间代码耦合。
Netty的WebSocket开发常见问题
1、proxy_http_version 1.1,为什么使用http1.1协议?
proxy_http_version 设置代理使用的HTTP协议版本。
proxy_http_version 默认使用的版本是1.0,而1.0版本默认是短链接,如果换成长链接,需要和 keepalive连接时一起使用。
http1.0没有加keepalive选型,后端服务会返回101错误,然后断开连接。
所以,默认情况下,1.0版本,显然不合适ws协议
proxy_http_version 1.1版本默认为长链接,推荐在使用
传统HTTP客户端与服务器请求响应模式如下图所示:
WebSocket模式客户端与服务器请求响应模式如下图:
上图对比可以看出,相对于传统HTTP每次请求-应答都需要客户端与服务端建立连接的模式,WebSocket是类似Socket的TCP长连接通讯模式。一旦WebSocket连接建立后,后续数据都以帧序列的形式传输。在客户端断开WebSocket连接或Server端中断连接前,不需要客户端和服务端重新发起连接请求。
2、为什么HTTP Upgrade的时候,需要Connection: upgrade
HTTP的Upgrade协议头
HTTP的Upgrade协议头机制用于将连接从HTTP连接升级到WebSocket连接,
但是,Upgrade机制使用了Upgrade协议头和Connection协议头;
结论是:
为了让Nginx可以将来自客户端的Upgrade请求发送到后端服务器,Upgrade和Connection的头信息必须被显式的设置。
也就是说:
WebSocket等协议的Upgrade请求,需要同时带上Connection和Upgrade头部。
如果是仅仅Upgrade的话,Connection头部不就是多余的设计了么?
具体原因,这里慢慢道来.
一个典型的WebSocket升级请求如下:
GET /chat HTTP/1.1
Host: example.com:8000
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Connection的起源
最开始,在HTTP/1.0出现没多久,
以上是关于WebSocket协议+Nginx动态负载均衡(史上最全)的主要内容,如果未能解决你的问题,请参考以下文章
nginx/httpd + tomcat及负载均衡tomcat