Netty-源码分析WebSocketClient客户端

Posted 征服.刘华强

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty-源码分析WebSocketClient客户端相关的知识,希望对你有一定的参考价值。

WebSocketClient客户端使用Netty实现的源码分析

EventLoopGroup workerGroup = new NioEventLoopGroup();
        try 
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NiosocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() 
                @Override
                public void initChannel(SocketChannel ch) throws Exception 
                    //处理http请求的编解码器
                    ch.pipeline().addLast("http-codec", new HttpClientCodec());
                    ch.pipeline().addLast("aggregator", new HttpObjectAggregator(655360));
                    ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                    ch.pipeline().addLast("ws", new WebSocketClientProtocolHandler(URI.create("ws://192.168.80.110:8080/websocket"),
                            WebSocketVersion.V08, "", true, new DefaultHttpHeaders(), Integer.MAX_VALUE));
                    ch.pipeline().addLast("handler", new WsClientHandler());
                
            );
            ChannelFuture f = b.connect("192.168.80.110", 8080).sync();
            f.channel().closeFuture().sync();
         finally 
            workerGroup.shutdownGracefully();
        

最核心的地方在于这个Handler。

 ch.pipeline().addLast("ws", new WebSocketClientProtocolHandler(URI.create("ws://192.168.80.110:8080/websocket"),
                            WebSocketVersion.V08, "", true, new DefaultHttpHeaders(), Integer.MAX_VALUE));
WebSocketClientProtocolHandler关键代码片段如下:
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) 
        ChannelPipeline cp = ctx.pipeline();
        if (cp.get(WebSocketClientProtocolHandshakeHandler.class) == null) 
            // Add the WebSocketClientProtocolHandshakeHandler before this one.
            ctx.pipeline().addBefore(ctx.name(), WebSocketClientProtocolHandshakeHandler.class.getName(),
                                     new WebSocketClientProtocolHandshakeHandler(handshaker, handshakeTimeoutMillis));
        
        if (cp.get(Utf8FrameValidator.class) == null) 
            // Add the UFT8 checking before this one.
            ctx.pipeline().addBefore(ctx.name(), Utf8FrameValidator.class.getName(),
                    new Utf8FrameValidator());
        
    

在handlerAdded事件中,动态的挂载了WebSocketClientProtocolHandshakeHandler到ppLine中。

 

 

WebSocketClientProtocolHandshakeHandler关键代码
@Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception 
        super.channelActive(ctx);
        handshaker.handshake(ctx.channel()).addListener(new ChannelFutureListener() 
            @Override
            public void operationComplete(ChannelFuture future) throws Exception 
                if (!future.isSuccess()) 
                    handshakePromise.tryFailure(future.cause());
                    ctx.fireExceptionCaught(future.cause());
                 else 
                    ctx.fireUserEventTriggered(
                            WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_ISSUED);
                
            
        );
        applyHandshakeTimeout();
    

在channelActive事件(tcp连接建立后触发),会发送http握手请求,升级websocket协议。

 @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
        if (!(msg instanceof FullHttpResponse)) 
            ctx.fireChannelRead(msg);
            return;
        

        FullHttpResponse response = (FullHttpResponse) msg;
        try 
            if (!handshaker.isHandshakeComplete()) 
                handshaker.finishHandshake(ctx.channel(), response);
                handshakePromise.trySuccess();
                ctx.fireUserEventTriggered(
                        WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE);
                ctx.pipeline().remove(this);
                return;
            
            throw new IllegalStateException("WebSocketClientHandshaker should have been non finished yet");
         finally 
            response.release();
        
    

在上面方法中,读取服务器端返回的Http响应,完成ws握手,移除当前Handler。

 

public final ChannelFuture handshake(Channel channel, final ChannelPromise promise) 
        ChannelPipeline pipeline = channel.pipeline();
        HttpResponseDecoder decoder = pipeline.get(HttpResponseDecoder.class);
        if (decoder == null) 
            HttpClientCodec codec = pipeline.get(HttpClientCodec.class);
            if (codec == null) 
               promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
                       "an HttpResponseDecoder or HttpClientCodec"));
               return promise;
            
        

        FullHttpRequest request = newHandshakeRequest();

        channel.writeAndFlush(request).addListener(new ChannelFutureListener() 
            @Override
            public void operationComplete(ChannelFuture future) 
                if (future.isSuccess()) 
                    ChannelPipeline p = future.channel().pipeline();
                    ChannelHandlerContext ctx = p.context(HttpRequestEncoder.class);
                    if (ctx == null) 
                        ctx = p.context(HttpClientCodec.class);
                    
                    if (ctx == null) 
                        promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
                                "an HttpRequestEncoder or HttpClientCodec"));
                        return;
                    
                    p.addAfter(ctx.name(), "ws-encoder", newWebSocketEncoder());

                    promise.setSuccess();
                 else 
                    promise.setFailure(future.cause());
                
            
        );
        return promise;
    

发送http请求到服务器端(ws客户端握手请求)

挂载websocket编码器。

 

   @Override
    protected FullHttpRequest newHandshakeRequest() 
        URI wsURL = uri();

        // Get 16 bit nonce and base 64 encode it
        byte[] nonce = WebSocketUtil.randomBytes(16);
        String key = WebSocketUtil.base64(nonce);

        String acceptSeed = key + MAGIC_GUID;
        byte[] sha1 = WebSocketUtil.sha1(acceptSeed.getBytes(CharsetUtil.US_ASCII));
        expectedChallengeResponseString = WebSocketUtil.base64(sha1);

        if (logger.isDebugEnabled()) 
            logger.debug(
                    "WebSocket version 08 client handshake key: , expected response: ",
                    key, expectedChallengeResponseString);
        

        // Format request
        FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, upgradeUrl(wsURL),
                Unpooled.EMPTY_BUFFER);
        HttpHeaders headers = request.headers();

        if (customHeaders != null) 
            headers.add(customHeaders);
        

        headers.set(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET)
               .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE)
               .set(HttpHeaderNames.SEC_WEBSOCKET_KEY, key)
               .set(HttpHeaderNames.HOST, websocketHostValue(wsURL));

        if (!headers.contains(HttpHeaderNames.SEC_WEBSOCKET_ORIGIN)) 
            headers.set(HttpHeaderNames.SEC_WEBSOCKET_ORIGIN, websocketOriginValue(wsURL));
        

        String expectedSubprotocol = expectedSubprotocol();
        if (expectedSubprotocol != null && !expectedSubprotocol.isEmpty()) 
            headers.set(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, expectedSubprotocol);
        

        headers.set(HttpHeaderNames.SEC_WEBSOCKET_VERSION, "8");
        return request;
    

构造握手的http请求信息。

 

QQ群:212320390

以上是关于Netty-源码分析WebSocketClient客户端的主要内容,如果未能解决你的问题,请参考以下文章

Netty源码分析(七) PoolChunk

源码分析Netty4专栏

源码分析Netty4专栏

Netty-源码分析LineBasedFrameDecoder

Netty源码分析:read

Netty源码分析:read