netty实现WebSocket协议
Posted 快乐崇拜234
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty实现WebSocket协议相关的知识,希望对你有一定的参考价值。
文章目录
注:更多netty相关文章请访问博主专栏: netty专栏
WebSocket协议
一般web应用都是使用的HTTP协议。HTTP协议有以下特点:
- 支持
客户端-服务端
模式 - 使用简单:只需要知道服务端URL,携带参数发送请求即可
- 支持多种传输数据类型,由消息头中content-type标识
- 无状态,使得HTTP服务轻量级
HTTP协议也存在一些缺点:
- 半双工通信:同一时刻,数据只能往同一方向传输。比如向服务器发送消息时,服务器此时不可以向客户端发送消息。(不过目前HTTP2已经支持了全双工通信)
- HTTP消息冗长:HTTP消息包含消息头,消息体,换行符等。且大多采用文本传输。所以HTTP消息会有很多冗余消息并且消息占用字节数大,消耗过多的带宽
- 针对服务器推送的黑客攻击,如长时间轮询
因此在一些场景使用WebSocket来实现更加妥当,比如需要全双工通信的需求(聊天室、消息推送)。WebSocket有以下特点:
- 底层采用单一TCP链接,全双工通信
- 对代理、防火墙、路由器透明
- 无头部消息,cookie,身份验证
- 无安全开销
- 通过ping/pong消息保持心跳
WebSocket稳定性很高,它实质是一个TCP链接,且支持全双工。WebSocket是当下必学知识点。
服务端开发
package com.example;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioserverSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedFile;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import javax.activation.MimetypesFileTypeMap;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;
/**
* netty http 文件下载 服务器
*/
public class MyWebSocketServer
int port;
public MyWebSocketServer(int port)
this.port = port;
public void start()
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup work = new NioEventLoopGroup();
try
bootstrap.group(boss, work)
.handler(new LoggingHandler(LogLevel.DEBUG))
.channel(NioServerSocketChannel.class)
.childHandler(new WebSocketServerInitializer());
ChannelFuture f = bootstrap.bind(new InetSocketAddress(port)).sync();
System.out.println("http server started . port : " + port);
f.channel().closeFuture().sync();
catch (Exception e)
e.printStackTrace();
finally
boss.shutdownGracefully();
work.shutdownGracefully();
public static void main(String[] args)
MyWebSocketServer server = new MyWebSocketServer(8080);// 8081为启动端口
server.start();
class WebSocketServerInitializer extends ChannelInitializer<SocketChannel>
@Override
protected void initChannel(SocketChannel channel)
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpServerCodec())// http 编解器
// http 消息聚合器 512*1024为接收的最大contentlength
.addLast("httpAggregator", new HttpObjectAggregator(512 * 1024))
// 支持异步发送大的码流(大的文件传输),但不占用过多的内存,防止java内存溢出
.addLast("http-chunked", new ChunkedWriteHandler())
.addLast(new WebSocketRequestHandler());// 请求处理器
class WebSocketRequestHandler extends SimpleChannelInboundHandler<Object>
private WebSocketServerHandshaker handshaker;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception
System.out.println("收到消息:" + msg);
if (msg instanceof FullHttpRequest)
//以http请求形式接入,但是走的是websocket
handleHttpRequest(ctx, (FullHttpRequest) msg);
else if (msg instanceof WebSocketFrame)
//处理websocket客户端的消息
handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
ctx.flush();
/*
对WebSocket请求进行处理
*/
private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame)
// 判断是否关闭链路的指令
if (frame instanceof CloseWebSocketFrame)
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
// 判断是否ping消息,如果是,则构造pong消息返回。用于心跳检测
if (frame instanceof PingWebSocketFrame)
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
// 本例程仅支持文本消息,不支持二进制消息
if (!(frame instanceof TextWebSocketFrame))
System.out.println("本例程仅支持文本消息,不支持二进制消息");
throw new UnsupportedOperationException(
String.format("%s frame types not supported", frame.getClass().getName()));
//处理客户端请求并返回应答消息
String request = ((TextWebSocketFrame) frame).text();
System.out.println("服务端收到:" + request);
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
JSONObject jsonObject = new JSONObject();
jsonObject.put("time",format.format(new Date()));
jsonObject.put("channelId",ctx.channel().id().asShortText());
jsonObject.put("request",request);
TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString());
// 返回【谁发的发给谁】
ctx.channel().writeAndFlush(tws);
/**
* 唯一的一次http请求。
* 该方法用于处理websocket握手请求
*/
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req)
//如果HTTP解码失败,返回异常。要求Upgrade为websocket,过滤掉get/Post
if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade"))))
//若不是websocket方式,则创建BAD_REQUEST(400)的req,返回给客户端
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
// 构造握手响应返回,本机测试
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
"ws://localhost:8080/websocket", null, false);
//通过工厂来创建WebSocketServerHandshaker实例
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null)
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
else
/*
通过WebSocketServerHandshaker来构建握手响应消息返回给客户端。
同时将WebSocket相关编解码类添加到ChannelPipeline中,该功能需要阅读handshake的源码。
*/
handshaker.handshake(ctx.channel(), req);
/**
* 拒绝不合法的请求,并返回错误信息
*/
private static void sendHttpResponse(ChannelHandlerContext ctx,
FullHttpRequest req, DefaultFullHttpResponse res)
// 返回应答给客户端
if (res.status().code() != 200)
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
HttpUtil.setContentLength(res, res.content().readableBytes());
ChannelFuture f = ctx.channel().writeAndFlush(res);
// 如果是非Keep-Alive,关闭连接
if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200)
f.addListener(ChannelFutureListener.CLOSE);
注释都已经写好,这里需要注意的几点是:
- netty的websocket协议是在HTTP协议基础之上完成的,要使用WebSocket协议,需要将HTTP请求头中添加
Upgrade:WebSocket
- WebSocket相关的编解码是在
handshaker.handshake(ctx.channel(), req);
中添加进去的。handshaker是WebSocketServerHandshaker的对象。handshake方法中建握手响应消息返回给客户端。同时将WebSocket相关编解码类添加到ChannelPipeline中。
handshake源码:
public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
HttpHeaders responseHeaders, final ChannelPromise promise)
if (logger.isDebugEnabled())
logger.debug(" WebSocket version server handshake", channel, version());
//构造握手响应
FullHttpResponse response = newHandshakeResponse(req, responseHeaders);
//下面将channelpipeline中的HttpObjectAggregator和HttpContentCompressor移除,并且添加WebSocket编解码器newWebSocketEncoder和newWebsocketDecoder
ChannelPipeline p = channel.pipeline();
if (p.get(HttpObjectAggregator.class) != null)
p.remove(HttpObjectAggregator.class);
if (p.get(HttpContentCompressor.class) != null)
p.remove(HttpContentCompressor.class);
ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);
final String encoderName;
if (ctx == null)
// this means the user use an HttpServerCodec
ctx = p.context(HttpServerCodec.class);
if (ctx == null)
promise.setFailure(
new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
return promise;
p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
encoderName = ctx.name();
else
p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());
encoderName = p.context(HttpResponseEncoder.class).name();
p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());
//将response消息返回给客户端
channel.writeAndFlush(response).addListener(new ChannelFutureListener()
@Override
public void operationComplete(ChannelFuture future) throws Exception
if (future.isSuccess())
ChannelPipeline p = future.channel().pipeline();
p.remove(encoderName);
promise.setSuccess();
else
promise.setFailure(future.cause());
);
return promise;
客户端
客户端采用html实现,支持WebSocket的浏览器都可以,如果不支持会提示ERROR:您的浏览器不支持WebSocket!!
。
客户端的功能是想服务器发送请求消息,服务器返回服务器时间以及channelID。
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>my websocket client</title>
</head>
<body>
<textarea id="msgBoxs"></textarea><br>
待发送消息`:<input type="text" id="msg">
<input type="button" id="sendBtn" onclick="send()" value="发送">
<script type="application/javascript">
var socket ;
if(!window.WebSocket)
window.WebSocket = window.MozWebSocket;
if(window.WebSocket)
var msgBoxs = document.getElementById("msgBoxs")
var msgBox = document.getElementById("msg")
socket = new WebSocket("ws://localhost:8080/websocket")
socket.onopen = function (evt)
console.log("Connection open ...");
socket.send("Hello WebSocket!");
socket.onmessage = function (evt)
console.log("Received Message: ", evt.data)
msgBoxs.value = msgBoxs.value + "\\n" + evt.data
socket.onclose = function (evt)
console.log<以上是关于netty实现WebSocket协议的主要内容,如果未能解决你的问题,请参考以下文章
netty系列之:使用netty搭建websocket服务器
Netty框架之协议应用一(redis客户端简单实现以及使用websocket实现弹幕功能)