netty实现WebSocket协议

Posted 快乐崇拜234

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty实现WebSocket协议相关的知识,希望对你有一定的参考价值。

文章目录

注:更多netty相关文章请访问博主专栏: netty专栏

WebSocket协议

一般web应用都是使用的HTTP协议。HTTP协议有以下特点:

  • 支持客户端-服务端模式
  • 使用简单:只需要知道服务端URL,携带参数发送请求即可
  • 支持多种传输数据类型,由消息头中content-type标识
  • 无状态,使得HTTP服务轻量级

HTTP协议也存在一些缺点:

  • 半双工通信:同一时刻,数据只能往同一方向传输。比如向服务器发送消息时,服务器此时不可以向客户端发送消息。(不过目前HTTP2已经支持了全双工通信)
  • HTTP消息冗长:HTTP消息包含消息头,消息体,换行符等。且大多采用文本传输。所以HTTP消息会有很多冗余消息并且消息占用字节数大,消耗过多的带宽
  • 针对服务器推送的黑客攻击,如长时间轮询

因此在一些场景使用WebSocket来实现更加妥当,比如需要全双工通信的需求(聊天室、消息推送)。WebSocket有以下特点:

  • 底层采用单一TCP链接,全双工通信
  • 对代理、防火墙、路由器透明
  • 无头部消息,cookie,身份验证
  • 无安全开销
  • 通过ping/pong消息保持心跳

WebSocket稳定性很高,它实质是一个TCP链接,且支持全双工。WebSocket是当下必学知识点。

服务端开发

package com.example;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioserverSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedFile;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;

import javax.activation.MimetypesFileTypeMap;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;

/**
 * netty  http 文件下载 服务器
 */
public class MyWebSocketServer 

    int port;

    public MyWebSocketServer(int port) 
        this.port = port;
    

    public void start() 
        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup();
        try 
            bootstrap.group(boss, work)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new WebSocketServerInitializer());

            ChannelFuture f = bootstrap.bind(new InetSocketAddress(port)).sync();
            System.out.println("http server started . port : " + port);
            f.channel().closeFuture().sync();
         catch (Exception e) 
            e.printStackTrace();
         finally 
            boss.shutdownGracefully();
            work.shutdownGracefully();
        
    

    public static void main(String[] args) 
        MyWebSocketServer server = new MyWebSocketServer(8080);// 8081为启动端口
        server.start();
    


class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> 

    @Override
    protected void initChannel(SocketChannel channel) 
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast(new HttpServerCodec())// http 编解器
                // http 消息聚合器  512*1024为接收的最大contentlength
                .addLast("httpAggregator", new HttpObjectAggregator(512 * 1024))
                // 支持异步发送大的码流(大的文件传输),但不占用过多的内存,防止java内存溢出
                .addLast("http-chunked", new ChunkedWriteHandler())
                .addLast(new WebSocketRequestHandler());// 请求处理器
    


class WebSocketRequestHandler extends SimpleChannelInboundHandler<Object> 

    private WebSocketServerHandshaker handshaker;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception 
        System.out.println("收到消息:" + msg);
        if (msg instanceof FullHttpRequest) 
            //以http请求形式接入,但是走的是websocket
            handleHttpRequest(ctx, (FullHttpRequest) msg);
         else if (msg instanceof WebSocketFrame) 
            //处理websocket客户端的消息
            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
        
    

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception 
        ctx.flush();
    

    /*
    对WebSocket请求进行处理
     */
    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) 
        // 判断是否关闭链路的指令
        if (frame instanceof CloseWebSocketFrame) 
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        
        // 判断是否ping消息,如果是,则构造pong消息返回。用于心跳检测
        if (frame instanceof PingWebSocketFrame) 
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        

        // 本例程仅支持文本消息,不支持二进制消息
        if (!(frame instanceof TextWebSocketFrame)) 
            System.out.println("本例程仅支持文本消息,不支持二进制消息");
            throw new UnsupportedOperationException(
                    String.format("%s frame types not supported", frame.getClass().getName()));
        

        //处理客户端请求并返回应答消息
        String request = ((TextWebSocketFrame) frame).text();
        System.out.println("服务端收到:" + request);
        
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("time",format.format(new Date()));
        jsonObject.put("channelId",ctx.channel().id().asShortText());
        jsonObject.put("request",request);

        TextWebSocketFrame tws = new TextWebSocketFrame(jsonObject.toJSONString());

        // 返回【谁发的发给谁】
         ctx.channel().writeAndFlush(tws);
    

    /**
     * 唯一的一次http请求。
     * 该方法用于处理websocket握手请求
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) 
        //如果HTTP解码失败,返回异常。要求Upgrade为websocket,过滤掉get/Post
        if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) 
            //若不是websocket方式,则创建BAD_REQUEST(400)的req,返回给客户端
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
                    HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        

//        构造握手响应返回,本机测试
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                "ws://localhost:8080/websocket", null, false);
        //通过工厂来创建WebSocketServerHandshaker实例
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) 
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
         else 
            /*
            通过WebSocketServerHandshaker来构建握手响应消息返回给客户端。
            同时将WebSocket相关编解码类添加到ChannelPipeline中,该功能需要阅读handshake的源码。
             */
            handshaker.handshake(ctx.channel(), req);
        
    

    /**
     * 拒绝不合法的请求,并返回错误信息
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx,
                                         FullHttpRequest req, DefaultFullHttpResponse res) 
        // 返回应答给客户端
        if (res.status().code() != 200) 
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            HttpUtil.setContentLength(res, res.content().readableBytes());
        

        ChannelFuture f = ctx.channel().writeAndFlush(res);

        // 如果是非Keep-Alive,关闭连接
        if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) 
            f.addListener(ChannelFutureListener.CLOSE);
        
    

注释都已经写好,这里需要注意的几点是:

  • netty的websocket协议是在HTTP协议基础之上完成的,要使用WebSocket协议,需要将HTTP请求头中添加Upgrade:WebSocket
  • WebSocket相关的编解码是在handshaker.handshake(ctx.channel(), req);中添加进去的。handshaker是WebSocketServerHandshaker的对象。handshake方法中建握手响应消息返回给客户端。同时将WebSocket相关编解码类添加到ChannelPipeline中。

handshake源码:

public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
                                    HttpHeaders responseHeaders, final ChannelPromise promise) 

    if (logger.isDebugEnabled()) 
        logger.debug(" WebSocket version  server handshake", channel, version());
    
    //构造握手响应
    FullHttpResponse response = newHandshakeResponse(req, responseHeaders);
//下面将channelpipeline中的HttpObjectAggregator和HttpContentCompressor移除,并且添加WebSocket编解码器newWebSocketEncoder和newWebsocketDecoder
    ChannelPipeline p = channel.pipeline();
    if (p.get(HttpObjectAggregator.class) != null) 
        p.remove(HttpObjectAggregator.class);
    
    if (p.get(HttpContentCompressor.class) != null) 
        p.remove(HttpContentCompressor.class);
    
    ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);
    final String encoderName;
    if (ctx == null) 
        // this means the user use an HttpServerCodec
        ctx = p.context(HttpServerCodec.class);
        if (ctx == null) 
            promise.setFailure(
                    new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
            return promise;
        
        p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
        p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
        encoderName = ctx.name();
     else 
        p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());

        encoderName = p.context(HttpResponseEncoder.class).name();
        p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());
    
//将response消息返回给客户端
    channel.writeAndFlush(response).addListener(new ChannelFutureListener() 
        @Override
        public void operationComplete(ChannelFuture future) throws Exception 
            if (future.isSuccess()) 
                ChannelPipeline p = future.channel().pipeline();
                p.remove(encoderName);
                promise.setSuccess();
             else 
                promise.setFailure(future.cause());
            
        
    );
    return promise;

客户端

客户端采用html实现,支持WebSocket的浏览器都可以,如果不支持会提示ERROR:您的浏览器不支持WebSocket!!
客户端的功能是想服务器发送请求消息,服务器返回服务器时间以及channelID。

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>my websocket client</title>
</head>
<body>
<textarea id="msgBoxs"></textarea><br>
待发送消息`:<input type="text" id="msg">
<input type="button" id="sendBtn" onclick="send()" value="发送">
<script type="application/javascript">
    var socket ;
    if(!window.WebSocket)
        window.WebSocket = window.MozWebSocket;
    

    if(window.WebSocket)
        var msgBoxs = document.getElementById("msgBoxs")
        var msgBox = document.getElementById("msg")

        socket = new WebSocket("ws://localhost:8080/websocket")
        socket.onopen = function (evt) 
            console.log("Connection open ...");
            socket.send("Hello WebSocket!");
        

        socket.onmessage = function (evt) 
            console.log("Received Message: ", evt.data)
            msgBoxs.value =  msgBoxs.value + "\\n" + evt.data
        

        socket.onclose = function (evt) 
            console.log<

以上是关于netty实现WebSocket协议的主要内容,如果未能解决你的问题,请参考以下文章

netty系列之:使用netty搭建websocket服务器

netty实现websocket请求实战

WebSocket协议探究:MQTT子协议

Netty框架之协议应用一(redis客户端简单实现以及使用websocket实现弹幕功能)

Netty框架之协议应用一(redis客户端简单实现以及使用websocket实现弹幕功能)

Netty框架之协议应用一(redis客户端简单实现以及使用websocket实现弹幕功能)