本人对于netty框架的一些理解,怎么与网站上的websock建立连接

Posted kangniuniu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了本人对于netty框架的一些理解,怎么与网站上的websock建立连接相关的知识,希望对你有一定的参考价值。

在Netty的里面有一个Boss,他开了一家公司(开启一个服务端口)对外提供业务服务,它手下有一群做事情的workers。Boss一直对外宣传自己公司提供的业务,并且接受(accept)有需要的客户(client),当一位客户找到Boss说需要他公司提供的业务,Boss便会为这位客户安排一个worker,这个worker全程为这位客户服务(read/write)。如果公司业务繁忙,一个worker可能会为多个客户进行服务。这就是Netty里面Boss和worker之间的关系。下面看看Netty是如何让Boss和Worker进行协助的。


private EventLoopGroup boss = new NioEventLoopGroup();
private EventLoopGroup work = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap()
        .group(boss, work)
        .channel(NioserverSocketChannel.class)
        .localAddress(new InetSocketAddress(nettyPort))
        //保持长连接
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childHandler(new HeartbeatInitializer());
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) 
    log.info("启动 Netty 成功");


上诉代码初始化了一条netty的服务,那么如何初始话的写在类HeartbeatInitializer里面

public class HeartbeatInitializer extends ChannelInitializer<Channel> 
    @Override
    protected void initChannel(Channel ch) throws Exception 
        ch.pipeline()
                //五秒没有收到消息 将IdleStateHandler 添加到 ChannelPipeline 拦截器中
                .addLast(new IdleStateHandler(5, 0, 0))

                // HttpServerCodec:将请求和应答消息解码为HTTP消息
                .addLast("http-codec",new HttpServerCodec())
                // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息
                .addLast("aggregator",new HttpObjectAggregator(65536))
                // ChunkedWriteHandler:向客户端发送html5文件
                .addLast("http-chunked",new ChunkedWriteHandler())

                .addLast(new HeartBeatSimpleHandle());
    

上述文件设置了 拦截器,解码和解码合并,还有响应,最后一个new HeartBeatSimpleHandle()用来处理请求

public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<Object> 

    private WebSocketServerHandshaker handShaker;

    /**
     * 取消绑定
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception 

        NettySocketHolder.remove((NioSocketChannel) ctx.channel());
    

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception 
        super.userEventTriggered(ctx, evt);
    

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception 
        // 传统的HTTP接入
        if (msg instanceof FullHttpRequest) 
            FullHttpRequest req = (FullHttpRequest) msg;
            handleHttpRequest(ctx, req);
            //获取url后置参数
            String uri=req.uri();
            QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri);
            Map<String, List<String>> parameters = queryStringDecoder.parameters();
            Integer userId = Integer.valueOf(parameters.get("userId").get(0));
            // 存储当前登录ctx
            if(NettySocketHolder.get((long)userId) == null)
                NettySocketHolder.put((long)userId, (NioSocketChannel) ctx.channel());
            
            // WebSocket接入
         else if (msg instanceof WebSocketFrame) 
            if ("live".equals(ctx.channel().attr(AttributeKey.valueOf("type")).get())) 
                handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
            
            log.info("收到msg=,id=", msg);
            //保存客户端与 Channel 之间的关系
        
    

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) 
        // 如果HTTP解码失败,返回HTTP异常
        if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) 
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        
        //获取url后置参数
        HttpMethod method=req.method();
        String uri=req.uri();
        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri);
        Map<String, List<String>> parameters = queryStringDecoder.parameters();
        if(method==HttpMethod.GET&&"/websocket".equals(uri))
            //...处理
            ctx.channel().attr(AttributeKey.valueOf("type")).set("live");
        
        // 构造握手响应返回,本机测试
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                "ws://"+req.headers().get(HttpHeaderNames.HOST)+uri, null, false);
        handShaker = wsFactory.newHandshaker(req);
        if (handShaker == null) 
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
         else 
            handShaker.handshake(ctx.channel(), req);
        
    

    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) 
        // 返回应答给客户端
        if (res.status().code() != 200) 
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        
        // 如果是非Keep-Alive,关闭连接
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) 
            f.addListener(ChannelFutureListener.CLOSE);
        
    

    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) 
        // 判断是否关闭链路的指令
        if (frame instanceof CloseWebSocketFrame) 
            handShaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
        
    

HeartBeatSimpleHandle 用与处理管道(Channel)的读取工作,将管道储存在NettySocketHolder里面,用的时候取出。WebSocketServerHandshaker用于响应客户端的响应

  var websocket;

    $(function()
        $.ajax(
            type: "POST",
            url: "/login",
            data: 
                userId: 2,
                account: "135541",
                userName: "123"
            ,
            contentType: "application/x-www-form-urlencoded; charset=utf-8",
            dataType: "json",
            success: function (result) 
                websoketCannel(result.data.userId, result.data.account, result.data.userName);
            
        );
    );

    function websoketCannel(userId, account, userName)

        //如果浏览器支持WebSocket
        if(window.WebSocket)
            websocket = new WebSocket("ws://localhost:1212/websocket?userId="+ userId +"&account="+ account +"&userName="+ userName +"");  //获得WebSocket对象

            //当有消息过来的时候触发
            websocket.onmessage = function(event)
                var data = JSON.parse(event.data);
                $("#message").text(data.msg);
            ;

            //连接关闭的时候触发
            websocket.onclose = function(event)
                console.log("断开连接");
            ;

            //连接打开的时候触发
            websocket.onopen = function(event)
                console.log("建立连接");
            
        else
            alert("浏览器不支持WebSocket");
        
    

    function sendMsg(msg)  //发送消息
        if(window.WebSocket)
            if(websocket.readyState == WebSocket.OPEN)  //如果WebSocket是打开状态
                websocket.send(msg); //send()发送消息
            
        else
            return;
        
    

上面代码是客户端如何连netty

 

以上是关于本人对于netty框架的一些理解,怎么与网站上的websock建立连接的主要内容,如果未能解决你的问题,请参考以下文章

记一次Netty堆外内存泄露排查过程

浅析Netty

Netty原理浅析

全流程分析Netty设计思路与实践

高性能底层怎么运作?一文帮你吃透Netty架构原理

TCP/IP的底层队列是如何实现的?