分布式专题|都说netty入门很难,那是因为你没有看我的文章!

Posted 乐哉开讲

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式专题|都说netty入门很难,那是因为你没有看我的文章!相关的知识,希望对你有一定的参考价值。


在写代码之前,我们先看下netty的线程模型,这比那固定格式的代码将会更有趣,看完线程模型,你就知道netty写的那几段固定代码的意义了。

线程模型图

分布式专题|都说netty入门很难,那是因为你没有看我的文章!
在这里插入图片描述

这个线程模型图里面大概包含了这几个组件:bossGroup,workGroup,selectot(accept),selector(读写),pipline,NiosocketChannel,NioServerSocketChannel;

  • bossgroup,workgroup

    在netty中,处理客户端的请求会被注册在两类selector上,这两类selector分别对应两个线程池bossGroup和workgroup,bossGroup主要处理客户端与服务端建立连接注册的selector;workgroup看名字也知道了,是用来干活的线程池,它主要负责处理客户端读事件的selector逻辑;在创建netty的第一行代码中,就是创建这两个线程池,一般情况下bossgroup会设置成一个线程,workgroup会设置多个线程,默认不写的话,netty会获取当前服务器中的cpu核数*2作为默认创建的线程数量。

  • selector(accepet),selector(读写)

    selector和NIO中的selector是同一种组件,不过在netty中会分为两种类型的selector:专门处理连接事件的selector和专门处理读写事件的selector;但是在NIO中处理这些事件都是使用的同一个selector,NIO中通过遍历key的方式,来判断是连接事件还是读写事件,然后交给后端线程处理的逻辑;

  • NioServerSocketChannel

    这是服务端启动之后创建的一个channel,然后会把这个channel注册到selector中,并添加自己感兴趣的accept的事件,后续所有客户端发起的连接都会被该channel监听到。具体用来做什么,我们会结合下个组件介绍

  • NioSocketChannel

    客户端在发起连接请求之后,服务端会通过调用NioServerSocketChannel的accepet方法,生成一个NioSocketChannel,接着会从workGroup中挑选一个eventLoop,然后把channel注册到该eventLoop线程的selector上,并添加感兴趣的读事件;后续客户端与服务端所有的读写操作都会在该channel中进行。

  • pipline

    pipline是一个实现了职责链模式的管道处理器,在初始化之后,会添加一些处理器,例如:编码器、解码器、业务逻辑处理器,select在得到客户端发送过来的数据后,会把数据丢到这个管道里面,然后从头到尾依次执行这些处理器;如果是服务端把数据发往客户端,会从尾部到头部依次执行处理器,但是从服务端发数据到客户端,只会执行出站处理器;客户端发送数据到服务端,只会执行入站处理器。

介绍完这些基本组件之后,我们对netty的线程模型应该有了初步的认识,现在我们大概梳理下netty的整个处理过程:

流程讲解

  1. 服务端初始化时,会创建两个线程组bossGroup,workGoup;

  2. 创建一个NioServerSocketChannel 注册到bossGroup中eventLoop的selector上面,添加自己感兴趣的accept事件, 并监听指定端口;

  3. client1发起连接请求,在服务端会产生一个accept事件,通过遍历selector中的key得到accept事件;

  4. 服务端的NioServerSocketChannel通过accept方法进行阻塞(其实该事件已经来了,不需要阻塞),返回一个客户端的channel1(NioSocketChannel);

  5. 获得了chnnel1之后,服务端会从workgroup挑选一个eventloop1,并将channel1注册到该eventloop1的selector1上面,并添加感兴趣的读事件;这时候已经初始化好了该通道中的pipline1,并将所有的处理器都添加到了pipline1中;

  6. 这个时候又新加入一个client2发起连接,会执行同样的操作,最终将chnnel2注册到另外一个eventloop2里面的selector2上面,并添加感兴趣的读事件;这时候已经初始化好了该通道中的pipline2,并将所有的处理器都添加到了pipline2中;

  7. 如果client1发送数据到服务端,服务端生成的selector1会监听到该事件(读事件),读取通道中的数据,并将数据交给pipline1中,执行后续逻辑处理;

  8. 如果client2发送数据到服务端,服务端生成的selector2会监听到该事件(读事件),读取通道中的数据,并将数据交给pipline2中,执行后续逻辑处理;

快速上手

前面已经将netty的基本组成和其线程模型大概说了下,现在我们演示下如何使用netty进行开发:代码已经放到码云:穿云箭

添加依赖

   <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha1</version>
   </dependency>

服务端代码

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {
    public static void main(String[] args) {
        // 创建 处理连接请求的线程组 1个
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 创建工作组线程 默认为 cpu核数*2 个
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            
            serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //在pipline中添加自定义的handle处理器
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("netty server start");
            // 绑定9000 端口号 sync指的是 创建完端口监听后,才执行后续操作
            ChannelFuture cf = serverBootstrap.bind(9000).sync();
            // 添加监听器 
            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    System.out.println("服务启动完成");
                }
            });
            // 注册chnnel的关闭事件,sync是只有当关闭事件发生后才结束该线程,否则一直阻塞
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

创建自定义的处理器,写我们自己的业务逻辑

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf msg1 = (ByteBuf) msg;
        System.out.println(String.format("收到客户端(%s)消息:%s", ctx.channel().remoteAddress().toString(), msg1.toString(CharsetUtil.UTF_8)));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloClient", CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(String.format("有新的客户端连接:%s", ctx.channel().remoteAddress().toString()));
    }
}
# 这里的ChannelInboundHandlerAdapter已经被废弃了,大家后续可以继承SimpleChannelInboundHandler,支持传入泛型,然后配合解码器使用,这里只是做个简单的演示。

客户端代码

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class NettyClient {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(bossGroup).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyClientHandler());
                        }

                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            System.out.println(String.format("有新的客户端连接:%s", ctx.channel().remoteAddress().toString()));
                        }
                    });
            System.out.println("netty client start");
            ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
            cf.addListener((ChannelFutureListener) channelFuture -> System.out.println("客户端启动完成"));

            String msg = "";
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            do {
                try {
                    msg = br.readLine();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                ByteBuf buf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
                cf.channel().writeAndFlush(buf);
            } while (!msg.equals("end"));
            System.out.println("您已退出");
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
        }

    }
}

创客户端自定义处理器

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf msg1 = (ByteBuf) msg;
        System.out.println(String.format("收到服户端(%s)消息:%s", ctx.channel().remoteAddress().toString(), msg1.toString(CharsetUtil.UTF_8)));
    }

}

netty相关面试知识拓展

什么是拆包和粘包

名词解释

客户端与服务端建立了TCP/UDP连接,如果连接中限制了发送数据的报文大小,此时 将要发送的数据大于这个限制,就会产生拆包现象;截取后的数据包会等待下次发送数据的时候一起发送,如果这个时候这部分数据和其他数据包一起发到服务端,又会产生粘包的现象;分布式专题|都说netty入门很难,那是因为你没有看我的文章!

解决方案

  • 自己定义数据发送的数据格式,包括数据长度和数据内容两个,通过长度来判断数据有没有结束
  • 使用定长解码器实现
  • 使用指定开始符和结束符实现

解释下什么是零拷贝

说零拷贝之前,我们需要引入一个名词“直接内存”,我们知道java代码都运行在jvm虚拟机中,分配的内存数据都是在jvm中分配的,如果想直接访问jvm之外的内存数据,那就叫直接内存访问;在netty中,直接使用直接内存进行socket进行读写。不需要将数据拷贝到jvm中的缓冲区中,而是将数据直接发送到socket中,不需要再执行中间的拷贝操作;分布式专题|都说netty入门很难,那是因为你没有看我的文章!分布式专题|都说netty入门很难,那是因为你没有看我的文章!


分布式专题|都说netty入门很难,那是因为你没有看我的文章!
认真读一本书