Netty-源码分析WebSocketClient客户端
Posted 征服.刘华强
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty-源码分析WebSocketClient客户端相关的知识,希望对你有一定的参考价值。
WebSocketClient客户端使用Netty实现的源码分析
EventLoopGroup workerGroup = new NioEventLoopGroup();
try
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NiosocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>()
@Override
public void initChannel(SocketChannel ch) throws Exception
//处理http请求的编解码器
ch.pipeline().addLast("http-codec", new HttpClientCodec());
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(655360));
ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
ch.pipeline().addLast("ws", new WebSocketClientProtocolHandler(URI.create("ws://192.168.80.110:8080/websocket"),
WebSocketVersion.V08, "", true, new DefaultHttpHeaders(), Integer.MAX_VALUE));
ch.pipeline().addLast("handler", new WsClientHandler());
);
ChannelFuture f = b.connect("192.168.80.110", 8080).sync();
f.channel().closeFuture().sync();
finally
workerGroup.shutdownGracefully();
最核心的地方在于这个Handler。
ch.pipeline().addLast("ws", new WebSocketClientProtocolHandler(URI.create("ws://192.168.80.110:8080/websocket"),
WebSocketVersion.V08, "", true, new DefaultHttpHeaders(), Integer.MAX_VALUE));
WebSocketClientProtocolHandler关键代码片段如下:
@Override
public void handlerAdded(ChannelHandlerContext ctx)
ChannelPipeline cp = ctx.pipeline();
if (cp.get(WebSocketClientProtocolHandshakeHandler.class) == null)
// Add the WebSocketClientProtocolHandshakeHandler before this one.
ctx.pipeline().addBefore(ctx.name(), WebSocketClientProtocolHandshakeHandler.class.getName(),
new WebSocketClientProtocolHandshakeHandler(handshaker, handshakeTimeoutMillis));
if (cp.get(Utf8FrameValidator.class) == null)
// Add the UFT8 checking before this one.
ctx.pipeline().addBefore(ctx.name(), Utf8FrameValidator.class.getName(),
new Utf8FrameValidator());
在handlerAdded事件中,动态的挂载了WebSocketClientProtocolHandshakeHandler到ppLine中。
WebSocketClientProtocolHandshakeHandler关键代码
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception
super.channelActive(ctx);
handshaker.handshake(ctx.channel()).addListener(new ChannelFutureListener()
@Override
public void operationComplete(ChannelFuture future) throws Exception
if (!future.isSuccess())
handshakePromise.tryFailure(future.cause());
ctx.fireExceptionCaught(future.cause());
else
ctx.fireUserEventTriggered(
WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_ISSUED);
);
applyHandshakeTimeout();
在channelActive事件(tcp连接建立后触发),会发送http握手请求,升级websocket协议。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (!(msg instanceof FullHttpResponse))
ctx.fireChannelRead(msg);
return;
FullHttpResponse response = (FullHttpResponse) msg;
try
if (!handshaker.isHandshakeComplete())
handshaker.finishHandshake(ctx.channel(), response);
handshakePromise.trySuccess();
ctx.fireUserEventTriggered(
WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE);
ctx.pipeline().remove(this);
return;
throw new IllegalStateException("WebSocketClientHandshaker should have been non finished yet");
finally
response.release();
在上面方法中,读取服务器端返回的Http响应,完成ws握手,移除当前Handler。
public final ChannelFuture handshake(Channel channel, final ChannelPromise promise)
ChannelPipeline pipeline = channel.pipeline();
HttpResponseDecoder decoder = pipeline.get(HttpResponseDecoder.class);
if (decoder == null)
HttpClientCodec codec = pipeline.get(HttpClientCodec.class);
if (codec == null)
promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
"an HttpResponseDecoder or HttpClientCodec"));
return promise;
FullHttpRequest request = newHandshakeRequest();
channel.writeAndFlush(request).addListener(new ChannelFutureListener()
@Override
public void operationComplete(ChannelFuture future)
if (future.isSuccess())
ChannelPipeline p = future.channel().pipeline();
ChannelHandlerContext ctx = p.context(HttpRequestEncoder.class);
if (ctx == null)
ctx = p.context(HttpClientCodec.class);
if (ctx == null)
promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
"an HttpRequestEncoder or HttpClientCodec"));
return;
p.addAfter(ctx.name(), "ws-encoder", newWebSocketEncoder());
promise.setSuccess();
else
promise.setFailure(future.cause());
);
return promise;
发送http请求到服务器端(ws客户端握手请求)
挂载websocket编码器。
@Override
protected FullHttpRequest newHandshakeRequest()
URI wsURL = uri();
// Get 16 bit nonce and base 64 encode it
byte[] nonce = WebSocketUtil.randomBytes(16);
String key = WebSocketUtil.base64(nonce);
String acceptSeed = key + MAGIC_GUID;
byte[] sha1 = WebSocketUtil.sha1(acceptSeed.getBytes(CharsetUtil.US_ASCII));
expectedChallengeResponseString = WebSocketUtil.base64(sha1);
if (logger.isDebugEnabled())
logger.debug(
"WebSocket version 08 client handshake key: , expected response: ",
key, expectedChallengeResponseString);
// Format request
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, upgradeUrl(wsURL),
Unpooled.EMPTY_BUFFER);
HttpHeaders headers = request.headers();
if (customHeaders != null)
headers.add(customHeaders);
headers.set(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET)
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE)
.set(HttpHeaderNames.SEC_WEBSOCKET_KEY, key)
.set(HttpHeaderNames.HOST, websocketHostValue(wsURL));
if (!headers.contains(HttpHeaderNames.SEC_WEBSOCKET_ORIGIN))
headers.set(HttpHeaderNames.SEC_WEBSOCKET_ORIGIN, websocketOriginValue(wsURL));
String expectedSubprotocol = expectedSubprotocol();
if (expectedSubprotocol != null && !expectedSubprotocol.isEmpty())
headers.set(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, expectedSubprotocol);
headers.set(HttpHeaderNames.SEC_WEBSOCKET_VERSION, "8");
return request;
构造握手的http请求信息。
QQ群:212320390
以上是关于Netty-源码分析WebSocketClient客户端的主要内容,如果未能解决你的问题,请参考以下文章