本人对于netty框架的一些理解,怎么与网站上的websock建立连接
Posted kangniuniu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了本人对于netty框架的一些理解,怎么与网站上的websock建立连接相关的知识,希望对你有一定的参考价值。
在Netty的里面有一个Boss,他开了一家公司(开启一个服务端口)对外提供业务服务,它手下有一群做事情的workers。Boss一直对外宣传自己公司提供的业务,并且接受(accept)有需要的客户(client),当一位客户找到Boss说需要他公司提供的业务,Boss便会为这位客户安排一个worker,这个worker全程为这位客户服务(read/write)。如果公司业务繁忙,一个worker可能会为多个客户进行服务。这就是Netty里面Boss和worker之间的关系。下面看看Netty是如何让Boss和Worker进行协助的。
private EventLoopGroup boss = new NioEventLoopGroup(); private EventLoopGroup work = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap() .group(boss, work) .channel(NioserverSocketChannel.class) .localAddress(new InetSocketAddress(nettyPort)) //保持长连接 .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new HeartbeatInitializer()); ChannelFuture future = bootstrap.bind().sync(); if (future.isSuccess()) log.info("启动 Netty 成功");
上诉代码初始化了一条netty的服务,那么如何初始话的写在类HeartbeatInitializer里面
public class HeartbeatInitializer extends ChannelInitializer<Channel> @Override protected void initChannel(Channel ch) throws Exception ch.pipeline() //五秒没有收到消息 将IdleStateHandler 添加到 ChannelPipeline 拦截器中 .addLast(new IdleStateHandler(5, 0, 0)) // HttpServerCodec:将请求和应答消息解码为HTTP消息 .addLast("http-codec",new HttpServerCodec()) // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息 .addLast("aggregator",new HttpObjectAggregator(65536)) // ChunkedWriteHandler:向客户端发送html5文件 .addLast("http-chunked",new ChunkedWriteHandler()) .addLast(new HeartBeatSimpleHandle());
上述文件设置了 拦截器,解码和解码合并,还有响应,最后一个new HeartBeatSimpleHandle()用来处理请求
public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<Object> private WebSocketServerHandshaker handShaker; /** * 取消绑定 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception NettySocketHolder.remove((NioSocketChannel) ctx.channel()); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception super.userEventTriggered(ctx, evt); @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception // 传统的HTTP接入 if (msg instanceof FullHttpRequest) FullHttpRequest req = (FullHttpRequest) msg; handleHttpRequest(ctx, req); //获取url后置参数 String uri=req.uri(); QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri); Map<String, List<String>> parameters = queryStringDecoder.parameters(); Integer userId = Integer.valueOf(parameters.get("userId").get(0)); // 存储当前登录ctx if(NettySocketHolder.get((long)userId) == null) NettySocketHolder.put((long)userId, (NioSocketChannel) ctx.channel()); // WebSocket接入 else if (msg instanceof WebSocketFrame) if ("live".equals(ctx.channel().attr(AttributeKey.valueOf("type")).get())) handlerWebSocketFrame(ctx, (WebSocketFrame) msg); log.info("收到msg=,id=", msg); //保存客户端与 Channel 之间的关系 private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) // 如果HTTP解码失败,返回HTTP异常 if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; //获取url后置参数 HttpMethod method=req.method(); String uri=req.uri(); QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri); Map<String, List<String>> parameters = queryStringDecoder.parameters(); if(method==HttpMethod.GET&&"/websocket".equals(uri)) //...处理 ctx.channel().attr(AttributeKey.valueOf("type")).set("live"); // 构造握手响应返回,本机测试 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( "ws://"+req.headers().get(HttpHeaderNames.HOST)+uri, null, false); handShaker = wsFactory.newHandshaker(req); if (handShaker == null) WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); else handShaker.handshake(ctx.channel(), req); private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) // 返回应答给客户端 if (res.status().code() != 200) ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); // 如果是非Keep-Alive,关闭连接 ChannelFuture f = ctx.channel().writeAndFlush(res); if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) f.addListener(ChannelFutureListener.CLOSE); private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) // 判断是否关闭链路的指令 if (frame instanceof CloseWebSocketFrame) handShaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
HeartBeatSimpleHandle 用与处理管道(Channel)的读取工作,将管道储存在NettySocketHolder里面,用的时候取出。WebSocketServerHandshaker用于响应客户端的响应
var websocket; $(function() $.ajax( type: "POST", url: "/login", data: userId: 2, account: "135541", userName: "123" , contentType: "application/x-www-form-urlencoded; charset=utf-8", dataType: "json", success: function (result) websoketCannel(result.data.userId, result.data.account, result.data.userName); ); ); function websoketCannel(userId, account, userName) //如果浏览器支持WebSocket if(window.WebSocket) websocket = new WebSocket("ws://localhost:1212/websocket?userId="+ userId +"&account="+ account +"&userName="+ userName +""); //获得WebSocket对象 //当有消息过来的时候触发 websocket.onmessage = function(event) var data = JSON.parse(event.data); $("#message").text(data.msg); ; //连接关闭的时候触发 websocket.onclose = function(event) console.log("断开连接"); ; //连接打开的时候触发 websocket.onopen = function(event) console.log("建立连接"); else alert("浏览器不支持WebSocket"); function sendMsg(msg) //发送消息 if(window.WebSocket) if(websocket.readyState == WebSocket.OPEN) //如果WebSocket是打开状态 websocket.send(msg); //send()发送消息 else return;
上面代码是客户端如何连netty
以上是关于本人对于netty框架的一些理解,怎么与网站上的websock建立连接的主要内容,如果未能解决你的问题,请参考以下文章