Netty 从成神到升仙系列 三Netty 凭什么成为国内最流行的网络通信框架?

Posted 爱敲代码的小黄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty 从成神到升仙系列 三Netty 凭什么成为国内最流行的网络通信框架?相关的知识,希望对你有一定的参考价值。

  • 👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,Java领域新星创作者。
  • 📝个人公众号:爱敲代码的小黄(回复 “技术书籍” 可获千本电子书籍)
  • 📕系列专栏:Java设计模式、数据结构和算法、Kafka从入门到成神、Kafka从成神到升仙、操作系统从入门到成神
  • 📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2022计划中:以梦为马,扬帆起航,2022追梦人

文章目录

Netty 源码

一、源码构成

二、Netty对三种I/O的支持


首先,我们看下这几种 I/O 的实现。

BIO(阻塞IO):食堂排队打饭模式,持队在窗口,打好才走

NIO(非阻塞IO):点单、等待被叫模式,等待被叫,好了自己去端

AIO(异步IO):包厢模式,点单后菜直接被端上桌。

BIO的实现:

// 如果是BIO的话
EventLoopGroup bossGroup = new OioEventLoopGroup();
EventLoopGroup workerGroup = new OioEventLoopGroup();
b.group(bossGroup, workerGroup).channel(OioserverSocketChannel.class)

其余实现也如上所示,我们 Linux 下的环境基本都是 NIO 的 I/O 实现

1、三种模式如何切换

我们可以看到,上面我们 Netty 实际支持三种 I/O 模式,那么这三种 I/O 是怎么样进行的切换呢?

方法的执行是在:

b.group(bossGroup, workerGroup)
// 主要在下面这一行!!!
.channel(NioServerSocketChannel.class)

我们观察一下,.channel 方法做了什么

public B channel(Class<? extends C> channelClass) 
    	  // 反射工厂的实现
        return channelFactory(new ReflectiveChannelFactory<C>(  
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    

public ReflectiveChannelFactory(Class<? extends T> clazz) 
        // 校验当前传入的clazz是否为空
        ObjectUtil.checkNotNull(clazz, "clazz");
        try 
            // 通过反射获取对应类的无参构造器
            this.constructor = clazz.getConstructor();
        
    
// 上面获取完无参构造器之后,这里会通过无参构造器调用 newInstance 得到一个类的实例
public T newChannel() 
    try 
        //反射创建channel
        return constructor.newInstance();
     

简单描述一下,通过 泛型+反射+工厂 实现 IO 模式切换

三、Netty 如何支持 Reactor 模型

Reactor 模型通过注册监听事件的方式,将阻塞模式修改为非阻塞模式,大大的提升了效率。


真正去执行的是 EventLoop,而我们的 EventLoopGroup 则是多个 EventLoop 的集合

1. EventLoop 是线程嘛

首先,我们看一下 EventLoop 的继承关系:

可以明显的看出,我们的 EventLoop 最终继承 ScheduledExecutorService 方法,也就是我们经常经常使用的线程池中的定时线程。

2. 如何实现三种模式的 Reactor 模型

我们知道,一般 Reactor 模型,有三种:

2.1 单线程 Reactor 模式


EventLoopGroup bossGroup = new NioEventLoopGroup(1); //有参的构造方法,创建1个线程

使用上述代码即可创建单线程的 Reactor 模型。

但具体如何实现,我们去看下源码:

// nThreads 传递的入参
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) 
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    

在源码中明确表明了,如果当前你传入的入参为 **0(不传入参)**时,当前线程为 DEFAULT_EVENT_LOOP_THREADS,如果你传递了入参,则按照你入参的个数申请线程。

PS:

DEFAULT_EVENT_LOOP_THREADS 的大小在源码中定义如下:

DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

这里的 NettyRuntime.availableProcessors() 最终调用的方法为:Runtime.getRuntime().availableProcessors())

有兴趣的读者可以自己去看下自己电脑的线程,博主的线程如下:

  • IDEA的线程
  • 鲁大师的面板

2.2 多线程 Reactor 模式


如果我们传递无入参的话,创建的就是多线程的 Reactor 模式

这种相对于单线程来说,其实提升并不是很大,简单而言就是多个线程的堆积。

比如

  • 原来一个线程完成接受、读取、解码、计算、加码、发送整个流程
  • 现在多个线程一起完成接受、读取、解码、计算、加码、发送整个流程

相当于复制了几份而已

2.3 主从 Reactor 模式


而我们的主从 Reactor 模式则打破了原有的架构,采用了一个新的架构进行数据的接受和发送

我们创建两个线程组,一个线程组负责接受客户端的消息,另外一个线程组负责读取、解码、计算、加码、发送整个流程。

这样,我们每个线程都有自己要做的事情并且由于接受客户端的消息很快,我们的 mainReactor 线程组会比原来接受更多的客户端消息

2.3.1 主从模式如何实现

Netty 中只需要指定两个 EventLoopGroup 即可,如下:

EventLoopGroup bossGroup = new NioEventLoopGroup();  //无参的构造方法: 线程组,多个线程
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)

我们的 bossGroup 就是负责接受数据的线程组,而我们的 workerGroup 就是负责处理数据的线程组

2.3.2 主从模式源码实现
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) 
    // 这里会设置父线程组(接受数据的)
    super.group(parentGroup);
    ObjectUtil.checkNotNull(childGroup, "childGroup");
    this.childGroup = childGroup;
    return this;

// 类的全路径:io.netty.bootstrap.AbstractBootstrap
volatile EventLoopGroup group;
public B group(EventLoopGroup group) 
    ObjectUtil.checkNotNull(group, "group");
    this.group = group;
    return self();

经过上述操作,我们传入的 bossGroup 已经被赋值到 io.netty.bootstrap.AbstractBootstrap.group 了。

我们看上述的图片,可以看出来,这个 bossGroup 主要做的是接受客户端的数据连接,但口说无凭,源码中在哪实现了呢

主要在于 bind() 这个方法:

private ChannelFuture doBind(final SocketAddress localAddress) 
    final ChannelFuture regFuture = initAndRegister();//这个方法比较重要

final ChannelFuture initAndRegister() 
    // 在这里的config().group()拿出我们的 EventLoopGroup 进行注册
    ChannelFuture regFuture = config().group().register(channel);
    return regFuture;

而这里的 EventLoopGroup 我们可以看到,他正是我们之前设置的 bossGroup,也就是 io.netty.bootstrap.AbstractBootstrap.group 这个变量

另外,我们的从模式解决读事件的方法,可以直接去 io.netty.bootstrap.ServerBootstrap.childGroup 这个变量去找

3. Netty 给 Channel 分配 NIOEventLoop 的规则

我们上面可以看到,一个 EventLoopGroup 包括多个 EventLoop,那么我们处理数据,Netty 是如何分配这些 EventLoop 的呢?

在源码 io.netty.util.concurrent.MultithreadEventExecutorGroup.next() 方法中

@Override
public EventExecutor next() 
    return chooser.next(); //chooser是一个选择器,一般的实现会使用策略模式

利用 chooser 选择器,选择不同的策略执行不同的 EventLoop

// 选择器的选择
public EventExecutorChooser newChooser(EventExecutor[] executors) 
    //根据待绑定的executor是否是2的幂次方,做出不同的选择
    // 这里的二次方后面会讲到
    if (isPowerOfTwo(executors.length)) 
        return new PowerOfTwoEventExecutorChooser(executors);
     else 
        return new GenericEventExecutorChooser(executors);
    

我们点进去发现,这里的策略一共有两种

  • GenericEventExecutorChooser:取模轮询
  • PowerOfTwoEventExecutorChooser:幂等运算(数组长度必须是2的幂次方

3.1 GenericEventExecutorChooser

这里其实没什么好说的,就简单的取模、

private static final class GenericEventExecutorChooser implements EventExecutorChooser 
    // 原子递增
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) 
        this.executors = executors;
    

    @Override
    public EventExecutor next() 
        //递增、取模,取正值,不然可能是负数,另外:有个非常小的缺点,当idx累加成最大值后,有短暂的不公平:
        //1,2,3,4,5,6,7,0,7 7 7 
        // 当我们的 idx.getAndIncrement() 到达最大值 Integer.MAX_VALUE 时,会不再增加,也就一直是
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    

3.2 PowerOfTwoEventExecutorChooser

这里利用的是 & 的知识点

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser 
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) 
        this.executors = executors;
    

    @Override
    //executors总数必须是2的幂次方(2,4,8...等)才会用,&运算效率更高,同时当idx累加成最大值之后,相比较通用的方式(GenericEventExecutorChooser),更公平
    public EventExecutor next() 
        return executors[idx.getAndIncrement() & executors.length - 1];
    

读者如果阅读过 HashMap 的源码,在 put 的过程中,采用了 & 的方式去进行数组的确定。

public V put(K key, V value) 
    return putVal(hash(key), key, value, false, true);


final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) 
    Node<K,V>[] tab; Node<K,V> p; int n, i;
    if ((tab = table) == null || (n = tab.length) == 0)
        n = (tab = resize()).length;
    
    if ((p = tab[i = (n - 1) & hash]) == null)
        tab[i] = newNode(hash, key, value, null)
    

其中,tab[i = (n - 1) & hash] 利用 (n - 1) & hash 将当前的数组均匀的分配到每个数组上,其实效果和我们上面的取模是一样的。

主要是性能上的提高

  • 对于取模运算来说,我们计算机需要去进行不断的取余计算,如果数据过大,其实会耗费一些性能。
  • 但对于 & 运算,直接操作二进制,这种效率较高

这里就不介绍为什么 (n - 1) & hash 等价于 hash % n 了,有兴趣的读者可以翻一下以前介绍 HashMap 的文章

4. Netty 如何保证跨平台性

我们知道,对于不同的平台,Netty 具有不同的实现,如下:

那这种是如何实现的呢,毕竟我们使用的都是同一套代码

当我们创建 EventLoopGroup 时,我们会有这么一个实现:

public NioEventLoopGroup(int nThreads, Executor executor) 
    //默认selector,最终实现类似:https://github.com/frohoff/jdk8u-jdk/blob/master/src/macosx/classes/sun/nio/ch/DefaultSelectorProvider.java
    //basic flow: 1 java.nio.channels.spi.SelectorProvider 2 META-INF/services 3 default
    this(nThreads, executor, SelectorProvider.provider());

这里面的 SelectorProvider.provider() 就是我们所选择的实现

  • 最终的实现是在我们下载的JDK里面,所以不同平台的JDK默认有不同的实现
public static SelectorProvider provider() 
    // 防止多个线程并发访问
    synchronized (lock) 
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() 
                public SelectorProvider run() 
                    	 // 这里是通过SPI去查询文件是否配置
                       // String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
                        if (loadProviderFromProperty())
                            return provider;
                    	// META-INF/services
                    // ServiceLoader<SelectorProvider> sl =
            ServiceLoader.load(SelectorProvider.class,
                               ClassLoader.getSystemClassLoader());
                        if (loadProviderAsService())
                            return provider;
                    	// 默认会用下面这个
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    
                );
    

// 类的路径:java.nio.channels.spi.SelectorProvider.DefaultSelectorProvider
public static SelectorProvider create() 
    return new WindowsSelectorProvider();

// 博主安装的是 Window 的JDK版本,如果是MAC的,下面的将会是这个样子
public static SelectorProvider create() 
    return new sun.nio.ch.KQueueSelectorProvider();

四、Netty 对粘包和半包的支持

1. 什么是粘包、半包


我们的客户端向服务端发送 ABC DED,但我们的服务端收到的消息却不一样,具体情况如下:

  • 没有问题的情况下:服务端收到 ABC DEF
  • 一次收到了:服务端收到 ABCDEF (TCP粘包)
  • 多次收到:第一次 AB,第二次 CD,第三次 EF(TCP拆包)

我们这里实际测试一下,有的读者可能还是不太清楚:

首先,我们的服务端核心代码如下:

public class EchoServerHandler extends ChannelInboundHandlerAdapter 

    private AtomicInteger counter = new AtomicInteger(0);

    /*** 服务端读取到网络数据后的处理*/
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
        ByteBuf in = (ByteBuf)msg;
        String request = in.toString(CharsetUtil.UTF_8);
        System.out.println("Server Accept["+request
                +"] and the counter is:"+counter.incrementAndGet());
        String resp = "Hello,"+request+". Welcome to Netty World!"
                + System.getProperty("line.separator");
        ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
    


当服务端收到客户端的消息时,会执行 channelRead0 这个方法,我使用了一个 counter 原子变量来记录服务端当前收到的消息数量。

客户端核心代码:

/**
 * 类说明:粘包/半包问题展示
 */
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> 

    private AtomicInteger counter = new AtomicInteger(0);

    /*** 客户端读取到网络数据后的处理*/
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception 
        System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)
                +"] and the counter is:"+counter.incrementAndGet());
    

    /*** 客户端被通知channel活跃后,做事*/
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception 
        ByteBuf msg = null;
        String request = "ABC,DEF,GHI,JKL,MNO"
                + System.getProperty("line.separator");
        //发送100次
        for(int i=0;i<100;i++)
            msg = Unpooled.buffer(request.length());
            msg.writeBytes(request.getBytes());
            ctx.writeAndFlush(msg);
        
    

我们可以看到,我们的客户端会发送 100ABC,DEF,GHI,JKL,MNO 的字符串

正常来说,我们的服务端会收到 100 条消息,我们运行一下看看实际情况

服务端的截图:

  • 收到的第一条消息:
  • 收到的第二条消息

这正是出现的粘包、半包问题

2. 为什么会出现这个问题?

当我们的客户端向服务端发送数据时,并不是立即发送

主要在于:socket会有缓存区,当缓存区达到一定的条件时,才会进行发送至服务端。

当然,本质还是在与:对于TCP来说,TCP是一个流式的协议,消息无边界

3. 如何避免粘包、半包

既然出现问题的原因在于消息无边界,那么只要我们找到当前客户端发送消息的边界就可以了。

3.1 短连接(不推荐)

当我们的客户端和服务端建立连接后,客户端发送一次消息就立即断开,这样我们的消息仅仅只发一次,服务端也只会收到一次。

这种一般不太实用,总不能我客户端每一次都要和服务端重新建立连接吧。

3.2 固定长度(不推荐)

我们客户端和服务端发送消息规定一个长度,比如我现在想发送 ABCDEF 100 次,那么我规定的长度就是 6,一次6个字符为一次调用。

Netty 对长度的支持为:FixedLengthFrameDecoder,自动为你分割服务端当前收到的消息

但这种比较浪费空间,比如你当前的长度设置成 100,但最后一次消息分割只有 10,浪费掉了 90 的空间。

3.3 分隔符(推荐)

我们客户端和服务端发送消息规定一个换行符,比如我现在想发送 ABCDEF 100 次,那么我们每一次发送后面都加一个 换行符

Netty 对换行符的支持为:LineBasedFrameDecoder,自动为你分割服务端当前收到的消息。

当然,如果你不想局限于 换行符,也可以使用自定义的:DelimiterBasedFrameDecoder

但这种的缺点在于有的分隔符需要进行转义,代码写起来较为复杂。

3.4 消息头和消息体(推荐)

参考我们的 HTTP 请求
分别以下几方面:

  • 魔数:4 个字节的魔数
  • 版本:当前的版本号
  • 序列化方式:JDK或者JSON
  • 指令类型:
  • 请求序号:唯一性
  • 对齐填充
  • 获取内容的字节数组
  • 正文长度
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception 
        // 1. 4 个字节的魔数
        out.writeBytes(new byte[]1, 2, 3, 4);
        // 2. 1 字节的版本
        out.writeByte(1);
        // 3. 1 字节的序列化方式:0 JDK  1 JSON
        out.writeByte(0);
        // 4. 1 指令类型
        out.writeByte(msg.getMessageType(以上是关于Netty 从成神到升仙系列 三Netty 凭什么成为国内最流行的网络通信框架?的主要内容,如果未能解决你的问题,请参考以下文章

Spring从成神到升仙系列 一2023年再不会动态代理,就要被淘汰了

Spring从成神到升仙系列 四从源码分析 Spring 事务的来龙去脉

Kafka从成神到升仙系列 四你真的了解 Kafka 的缓存池机制嘛

Kafka从成神到升仙系列 五面试官问我 Kafka 生产者的网络架构,我直接开始从源码背起.......

2023年再不会 IOC 源码,就要被淘汰了

回首2022,展望2023