聊聊Netty那些事儿之从内核角度看IO模型

Posted ImportNew

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了聊聊Netty那些事儿之从内核角度看IO模型相关的知识,希望对你有一定的参考价值。

今天我们来聊聊Netty的那些事儿,我们都知道Netty是一个高性能异步事件驱动的网络框架。

它的设计异常优雅简洁,扩展性高,稳定性强。拥有非常详细完整的用户文档。

同时内置了很多非常有用的模块基本上做到了开箱即用,用户只需要编写短短几行代码,就可以快速构建出一个具有高吞吐低延时更少的资源消耗高性能(非必要的内存拷贝最小化)等特征的高并发网络应用程序。

本文我们来探讨下支持Netty具有高吞吐低延时特征的基石----netty的网络IO模型

由Netty的网络IO模型开始,我们来正式揭开本系列Netty源码解析的序幕:

网络包收发过程.png
  • 网络数据帧通过网络传输到达网卡时,网卡会将网络数据帧通过DMA的方式放到环形缓冲区RingBuffer中。
  • RingBuffer是网卡在启动的时候分配和初始化环形缓冲队列。当RingBuffer满的时候,新来的数据包就会被丢弃。我们可以通过ifconfig命令查看网卡收发数据包的情况。其中overruns数据项表示当RingBuffer满时,被丢弃的数据包。如果发现出现丢包情况,可以通过ethtool命令来增大RingBuffer长度。

  • DMA操作完成时,网卡会向CPU发起一个硬中断,告诉CPU有网络数据到达。CPU调用网卡驱动注册的硬中断响应程序。网卡硬中断响应程序会为网络数据帧创建内核数据结构sk_buffer,并将网络数据帧拷贝sk_buffer中。然后发起软中断请求,通知内核有新的网络数据帧到达。
  • sk_buff缓冲区,是一个维护网络帧结构的双向链表,链表中的每一个元素都是一个网络帧。虽然 TCP/IP 协议栈分了好几层,但上下不同层之间的传递,实际上只需要操作这个数据结构中的指针,而无需进行数据复制

  • 内核线程ksoftirqd发现有软中断请求到来,随后调用网卡驱动注册的poll函数poll函数sk_buffer中的网络数据包送到内核协议栈中注册的ip_rcv函数中。
  • 每个CPU会绑定一个ksoftirqd内核线程专门用来处理软中断响应。2个 CPU 时,就会有 ksoftirqd/0ksoftirqd/1这两个内核线程。

    这里有个事情需要注意下: 网卡接收到数据后,当DMA拷贝完成时,向CPU发出硬中断,这时哪个CPU上响应了这个硬中断,那么在网卡硬中断响应程序中发出的软中断请求也会在这个CPU绑定的ksoftirqd线程中响应。所以如果发现Linux软中断,CPU消耗都集中在一个核上的话,那么就需要调整硬中断的CPU亲和性,来将硬中断打散不通的CPU核上去。

  • ip_rcv函数中也就是上图中的网络层取出数据包的IP头,判断该数据包下一跳的走向,如果数据包是发送给本机的,则取出传输层的协议类型(TCP或者UDP),并去掉数据包的IP头,将数据包交给上图中得传输层处理。
  • 传输层的处理函数:TCP协议对应内核协议栈中注册的tcp_rcv函数UDP协议对应内核协议栈中注册的udp_rcv函数

  • 当我们采用的是TCP协议时,数据包到达传输层时,会在内核协议栈中的tcp_rcv函数处理,在tcp_rcv函数中去掉TCP头,根据四元组(源IP,源端口,目的IP,目的端口)查找对应的Socket,如果找到对应的Socket则将网络数据包中的传输数据拷贝到Socket中的接收缓冲区中。如果没有找到,则发送一个目标不可达icmp包。

  • 内核在接收网络数据包时所做的工作我们就介绍完了,现在我们把视角放到应用层,当我们程序通过系统调用read读取Socket接收缓冲区中的数据时,如果接收缓冲区中没有数据,那么应用程序就会在系统调用上阻塞,直到Socket接收缓冲区有数据,然后CPU内核空间(Socket接收缓冲区)的数据拷贝用户空间,最后系统调用read返回,应用程序读取数据。

  • 从内核处理网络数据包接收的整个过程来看,内核帮我们做了非常之多的工作,最终我们的应用程序才能读取到网络数据。

    随着而来的也带来了很多的性能开销,结合前面介绍的网络数据包接收过程我们来看下网络数据包接收的过程中都有哪些性能开销:

  • 应用程序通过系统调用用户态转为内核态的开销以及系统调用返回时从内核态转为用户态的开销。
  • 网络数据从内核空间通过CPU拷贝用户空间的开销。
  • 内核线程ksoftirqd响应软中断的开销。
  • CPU响应硬中断的开销。
  • DMA拷贝网络数据包到内存中的开销。
  • 网络包发送过程.png
  • 当我们在应用程序中调用send系统调用发送数据时,由于是系统调用所以线程会发生一次用户态到内核态的转换,在内核中首先根据fd将真正的Socket找出,这个Socket对象中记录着各种协议栈的函数地址,然后构造struct msghdr对象,将用户需要发送的数据全部封装在这个struct msghdr结构体中。

  • 调用内核协议栈函数inet_sendmsg,发送流程进入内核协议栈处理。在进入到内核协议栈之后,内核会找到Socket上的具体协议的发送函数。

  • 比如:我们使用的是TCP协议,对应的TCP协议发送函数是tcp_sendmsg,如果是UDP协议的话,对应的发送函数为udp_sendmsg

  • TCP协议的发送函数tcp_sendmsg中,创建内核数据结构sk_buffer,将struct msghdr结构体中的发送数据拷贝sk_buffer中。调用tcp_write_queue_tail函数获取Socket发送队列中的队尾元素,将新创建的sk_buffer添加到Socket发送队列的尾部。
  • Socket的发送队列是由sk_buffer组成的一个双向链表

    发送流程走到这里,用户要发送的数据总算是从用户空间拷贝到了内核中,这时虽然发送数据已经拷贝到了内核Socket中的发送队列中,但并不代表内核会开始发送,因为TCP协议流量控制拥塞控制,用户要发送的数据包并不一定会立马被发送出去,需要符合TCP协议的发送条件。如果没有达到发送条件,那么本次send系统调用就会直接返回。

  • 如果符合发送条件,则开始调用tcp_write_xmit内核函数。在这个函数中,会循环获取Socket发送队列中待发送的sk_buffer,然后进行拥塞控制以及滑动窗口的管理

  • 将从Socket发送队列中获取到的sk_buffer重新拷贝一份,设置sk_buffer副本中的TCP HEADER

  • sk_buffer 内部其实包含了网络协议中所有的 header。在设置 TCP HEADER的时候,只是把指针指向 sk_buffer的合适位置。后面再设置 IP HEADER的时候,在把指针移动一下就行,避免频繁的内存申请和拷贝,效率很高。

    sk_buffer.png

    为什么不直接使用Socket发送队列中的sk_buffer而是需要拷贝一份呢?因为TCP协议是支持丢包重传的,在没有收到对端的ACK之前,这个sk_buffer是不能删除的。内核每次调用网卡发送数据的时候,实际上传递的是sk_buffer拷贝副本,当网卡把数据发送出去后,sk_buffer拷贝副本会被释放。当收到对端的ACK之后,Socket发送队列中的sk_buffer才会被真正删除。

  • 当设置完TCP头后,内核协议栈传输层的事情就做完了,下面通过调用ip_queue_xmit内核函数,正式来到内核协议栈网络层的处理。

    通过route命令可以查看本机路由配置。

    如果你使用 iptables配置了一些规则,那么这里将检测是否命中规则。如果你设置了非常复杂的 netfilter 规则,在这个函数里将会导致你的线程 CPU 开销极大增加

  • sk_buffer中的指针移动到IP头位置上,设置IP头

  • 执行netfilters过滤。过滤通过之后,如果数据大于 MTU的话,则执行分片。

  • 检查Socket中是否有缓存路由表,如果没有的话,则查找路由项,并缓存到Socket中。接着在把路由表设置到sk_buffer中。
  • 内核协议栈网络层的事情处理完后,现在发送流程进入了到了邻居子系统邻居子系统位于内核协议栈中的网络层网络接口层之间,用于发送ARP请求获取MAC地址,然后将sk_buffer中的指针移动到MAC头位置,填充MAC头

  • 经过邻居子系统的处理,现在sk_buffer中已经封装了一个完整的数据帧,随后内核将sk_buffer交给网络设备子系统进行处理。网络设备子系统主要做以下几项事情:

  • 选择发送队列(RingBuffer)。因为网卡拥有多个发送队列,所以在发送前需要选择一个发送队列。
  • sk_buffer添加到发送队列中。
  • 循环从发送队列(RingBuffer)中取出sk_buffer,调用内核函数sch_direct_xmit发送数据,其中会调用网卡驱动程序来发送数据。
  • 以上过程全部是用户线程的内核态在执行,占用的CPU时间是系统态时间(sy),当分配给用户线程的CPU quota用完的时候,会触发NET_TX_SOFTIRQ类型的软中断,内核线程ksoftirqd会响应这个软中断,并执行NET_TX_SOFTIRQ类型的软中断注册的回调函数net_tx_action,在回调函数中会执行到驱动程序函数 dev_hard_start_xmit来发送数据。

    注意:当触发NET_TX_SOFTIRQ软中断来发送数据时,后边消耗的 CPU 就都显示在 si这里了,不会消耗用户进程的系统态时间(sy)了。

    从这里可以看到网络包的发送过程和接受过程是不同的,在介绍网络包的接受过程时,我们提到是通过触发NET_RX_SOFTIRQ类型的软中断在内核线程ksoftirqd中执行内核网络协议栈接受数据。而在网络数据包的发送过程中是用户线程的内核态在执行内核网络协议栈,只有当线程的CPU quota用尽时,才触发NET_TX_SOFTIRQ软中断来发送数据。

    在整个网络包的发送和接受过程中,NET_TX_SOFTIRQ类型的软中断只会在发送网络包时并且当用户线程的CPU quota用尽时,才会触发。剩下的接受过程中触发的软中断类型以及发送完数据触发的软中断类型均为NET_RX_SOFTIRQ。所以这就是你在服务器上查看 /proc/softirqs,一般 NET_RX都要比 NET_TX大很多的的原因。

  • 现在发送流程终于到了网卡真实发送数据的阶段,前边我们讲到无论是用户线程的内核态还是触发NET_TX_SOFTIRQ类型的软中断在发送数据的时候最终会调用到网卡的驱动程序函数dev_hard_start_xmit来发送数据。在网卡驱动程序函数dev_hard_start_xmit中会将sk_buffer映射到网卡可访问的内存 DMA 区域,最终网卡驱动程序通过DMA的方式将数据帧通过物理网卡发送出去。

  • 当数据发送完毕后,还有最后一项重要的工作,就是清理工作。数据发送完毕后,网卡设备会向CPU发送一个硬中断,CPU调用网卡驱动程序注册的硬中断响应程序,在硬中断响应中触发NET_RX_SOFTIRQ类型的软中断,在软中断的回调函数igb_poll中清理释放 sk_buffer,清理网卡发送队列(RingBuffer),解除 DMA 映射。

  • 无论硬中断是因为有数据要接收,还是说发送完成通知,从硬中断触发的软中断都是 NET_RX_SOFTIRQ

    这里释放清理的只是sk_buffer的副本,真正的sk_buffer现在还是存放在Socket的发送队列中。前面在传输层处理的时候我们提到过,因为传输层需要保证可靠性,所以 sk_buffer其实还没有删除。它得等收到对方的 ACK 之后才会真正删除。

    前边我们提到了在网络包接收过程中涉及到的性能开销,现在介绍完了网络包的发送过程,我们来看下在数据包发送过程中的性能开销:

  • 和接收数据一样,应用程序在调用系统调用send的时候会从用户态转为内核态以及发送完数据后,系统调用返回时从内核态转为用户态的开销。

  • 用户线程内核态CPU quota用尽时触发NET_TX_SOFTIRQ类型软中断,内核响应软中断的开销。

  • 网卡发送完数据,向CPU发送硬中断,CPU响应硬中断的开销。以及在硬中断中发送NET_RX_SOFTIRQ软中断执行具体的内存清理动作。内核响应软中断的开销。

  • 内存拷贝的开销。我们来回顾下在数据包发送的过程中都发生了哪些内存拷贝:

  • 在内核协议栈的传输层中,TCP协议对应的发送函数tcp_sendmsg会申请sk_buffer,将用户要发送的数据拷贝sk_buffer中。
  • 在发送流程从传输层到网络层的时候,会拷贝一个sk_buffer副本出来,将这个sk_buffer副本向下传递。原始sk_buffer保留在Socket发送队列中,等待网络对端ACK,对端ACK后删除Socket发送队列中的sk_buffer。对端没有发送ACK,则重新从Socket发送队列中发送,实现TCP协议的可靠传输。
  • 在网络层,如果发现要发送的数据大于MTU,则会进行分片操作,申请额外的sk_buffer,并将原来的sk_buffer拷贝到多个小的sk_buffer中。
  • 在我们聊完网络数据的接收和发送过程后,我们来谈下IO中特别容易混淆的概念:阻塞与同步非阻塞与异步

    网上各种博文还有各种书籍中有大量的关于这两个概念的解释,但是笔者觉得还是不够形象化,只是对概念的生硬解释,如果硬套概念的话,其实感觉阻塞与同步非阻塞与异步还是没啥区别,时间长了,还是比较模糊容易混淆。

    所以笔者在这里尝试换一种更加形象化,更加容易理解记忆的方式来清晰地解释下什么是阻塞与非阻塞,什么是同步与异步

    经过前边对网络数据包接收流程的介绍,在这里我们可以将整个流程总结为两个阶段:

    数据接收阶段.png
  • 数据准备阶段: 在这个阶段,网络数据包到达网卡,通过DMA的方式将数据包拷贝到内存中,然后经过硬中断,软中断,接着通过内核线程ksoftirqd经过内核协议栈的处理,最终将数据发送到内核Socket的接收缓冲区中。

  • 数据拷贝阶段: 当数据到达内核Socket的接收缓冲区中时,此时数据存在于内核空间中,需要将数据拷贝用户空间中,才能够被应用程序读取。

  • 阻塞与非阻塞的区别主要发生在第一阶段:数据准备阶段

    当应用程序发起系统调用read时,线程从用户态转为内核态,读取内核Socket的接收缓冲区中的网络数据。

    同步异步主要的区别发生在第二阶段:数据拷贝阶段

    前边我们提到在数据拷贝阶段主要是将数据从内核空间拷贝到用户空间。然后应用程序才可以读取数据。

    当内核Socket的接收缓冲区有数据到达时,进入第二阶段。

    改善了原来Linux native AIO的一些性能问题。性能相比Epoll以及之前原生的AIO提高了不少,值得关注。

    异步IO.png

    在进行网络IO操作时,用什么样的IO模型来读写数据将在很大程度上决定了网络框架的IO性能。所以IO模型的选择是构建一个高性能网络框架的基础。

    在《UNIX 网络编程》一书中介绍了五种IO模型:阻塞IO,非阻塞IO,IO多路复用,信号驱动IO,异步IO,每一种IO模型的出现都是对前一种的升级优化。

    下面我们就来分别介绍下这五种IO模型各自都解决了什么问题,适用于哪些场景,各自的优缺点是什么?

    阻塞IO.png

    经过前一小节对阻塞这个概念的介绍,相信大家可以很容易理解阻塞IO的概念和过程。

    既然这小节我们谈的是IO,那么下边我们来看下在阻塞IO模型下,网络数据的读写过程。

    阻塞IO模型.png

    由于阻塞IO的读写特点,所以导致在阻塞IO模型下,每个请求都需要被一个独立的线程处理。一个线程在同一时刻只能与一个连接绑定。来一个请求,服务端就需要创建一个线程用来处理请求。

    当客户端请求的并发量突然增大时,服务端在一瞬间就会创建出大量的线程,而创建线程是需要系统资源开销的,这样一来就会一瞬间占用大量的系统资源。

    如果客户端创建好连接后,但是一直不发数据,通常大部分情况下,网络连接也并不总是有数据可读,那么在空闲的这段时间内,服务端线程就会一直处于阻塞状态,无法干其他的事情。CPU也无法得到充分的发挥,同时还会导致大量线程切换的开销

    基于以上阻塞IO模型的特点,该模型只适用于连接数少并发度低的业务场景。

    比如公司内部的一些管理系统,通常请求数在100个左右,使用阻塞IO模型还是非常适合的。而且性能还不输NIO。

    该模型在C10K之前,是普遍被采用的一种IO模型。

    阻塞IO模型最大的问题就是一个线程只能处理一个连接,如果这个连接上没有数据的话,那么这个线程就只能阻塞在系统IO调用上,不能干其他的事情。这对系统资源来说,是一种极大的浪费。同时大量的线程上下文切换,也是一个巨大的系统开销。

    所以为了解决这个问题,我们就需要用尽可能少的线程去处理更多的连接。网络IO模型的演变也是根据这个需求来一步一步演进的。

    基于这个需求,第一种解决方案非阻塞IO就出现了。我们在上一小节中介绍了非阻塞的概念,现在我们来看下网络读写操作在非阻塞IO下的特点:

    非阻塞IO.png
    EAGAIN错误,这个阶段用户线程不会阻塞,也不会让出CPU,而是会继续轮训直到Socket接收缓冲区中有数据为止。

  • Socket接收缓冲区中有数据,用户线程在内核态会将内核空间中的数据拷贝到用户空间注意这个数据拷贝阶段,应用程序是阻塞的,当数据拷贝完成,系统调用返回。

  • 非阻塞IO模型.png

    基于以上非阻塞IO的特点,我们就不必像阻塞IO那样为每个请求分配一个线程去处理连接上的读写了。

    我们可以利用一个线程或者很少的线程,去不断地轮询每个Socket的接收缓冲区是否有数据到达,如果没有数据,不必阻塞线程,而是接着去轮询下一个Socket接收缓冲区,直到轮询到数据后,处理连接上的读写,或者交给业务线程池去处理,轮询线程则继续轮询其他的Socket接收缓冲区。

    这样一个非阻塞IO模型就实现了我们在本小节开始提出的需求:我们需要用尽可能少的线程去处理更多的连接

    虽然非阻塞IO模型阻塞IO模型相比,减少了很大一部分的资源消耗和系统开销。

    但是它仍然有很大的性能问题,因为在非阻塞IO模型下,需要用户线程去不断地发起系统调用去轮训Socket接收缓冲区,这就需要用户线程不断地从用户态切换到内核态内核态切换到用户态。随着并发量的增大,这个上下文切换的开销也是巨大的。

    所以单纯的非阻塞IO模型还是无法适用于高并发的场景。只能适用于C10K以下的场景。

    非阻塞IO这一小节的开头,我们提到网络IO模型的演变都是围绕着---如何用尽可能少的线程去处理更多的连接这个核心需求开始展开的。

    本小节我们来谈谈IO多路复用模型,那么什么是多路?,什么又是复用呢?

    我们还是以这个核心需求来对这两个概念展开阐述:

  • 多路:我们的核心需求是要用尽可能少的线程来处理尽可能多的连接,这里的多路指的就是我们需要处理的众多连接。

  • 复用:核心需求要求我们使用尽可能少的线程尽可能少的系统开销去处理尽可能多的连接(多路),那么这里的复用指的就是用有限的资源,比如用一个线程或者固定数量的线程去处理众多连接上的读写事件。换句话说,在阻塞IO模型中一个连接就需要分配一个独立的线程去专门处理这个连接上的读写,到了IO多路复用模型中,多个连接可以复用这一个独立的线程去处理这多个连接上的读写。

  • 好了,IO多路复用模型的概念解释清楚了,那么问题的关键是我们如何去实现这个复用,也就是如何让一个独立的线程去处理众多连接上的读写事件呢?

    这个问题其实在非阻塞IO模型中已经给出了它的答案,在非阻塞IO模型中,利用非阻塞的系统IO调用去不断的轮询众多连接的Socket接收缓冲区看是否有数据到来,如果有则处理,如果没有则继续轮询下一个Socket。这样就达到了用一个线程去处理众多连接上的读写事件了。

    但是非阻塞IO模型最大的问题就是需要不断的发起系统调用去轮询各个Socket中的接收缓冲区是否有数据到来,频繁系统调用随之带来了大量的上下文切换开销。随着并发量的提升,这样也会导致非常严重的性能问题。

    那么如何避免频繁的系统调用同时又可以实现我们的核心需求呢?

    这就需要操作系统的内核来支持这样的操作,我们可以把频繁的轮询操作交给操作系统内核来替我们完成,这样就避免了在用户空间频繁的去使用系统调用来轮询所带来的性能开销。

    正如我们所想,操作系统内核也确实为我们提供了这样的功能实现,下面我们来一起看下操作系统对IO多路复用模型的实现。

    select是操作系统内核提供给我们使用的一个系统调用,它解决了在非阻塞IO模型中需要不断的发起系统IO调用去轮询各个连接上的Socket接收缓冲区所带来的用户空间内核空间不断切换的系统开销

    select系统调用将轮询的操作交给了内核来帮助我们完成,从而避免了在用户空间不断的发起轮询所带来的的系统性能开销。

    select.png
  • 首先用户线程在发起select系统调用的时候会阻塞select系统调用上。此时,用户线程从用户态切换到了内核态完成了一次上下文切换

  • 用户线程将需要监听的Socket对应的文件描述符fd数组通过select系统调用传递给内核。此时,用户线程将用户空间中的文件描述符fd数组拷贝内核空间

  • 这里的文件描述符数组其实是一个BitMapBitMap下标为文件描述符fd,下标对应的值为:1表示该fd上有读写事件,0表示该fd上没有读写事件。

    fd数组BitMap.png

    文件描述符fd其实就是一个整数值,在Linux中一切皆文件,Socket也是一个文件。描述进程所有信息的数据结构task_struct中有一个属性struct files_struct *files,它最终指向了一个数组,数组里存放了进程打开的所有文件列表,文件信息封装在struct file结构体中,这个数组存放的类型就是struct file结构体,数组的下标则是我们常说的文件描述符fd

  • 当用户线程调用完select后开始进入阻塞状态内核开始轮询遍历fd数组,查看fd对应的Socket接收缓冲区中是否有数据到来。如果有数据到来,则将fd对应BitMap的值设置为1。如果没有数据到来,则保持值为0
  • 注意这里内核会修改原始的fd数组!!

  • 内核遍历一遍fd数组后,如果发现有些fd上有IO数据到来,则将修改后的fd数组返回给用户线程。此时,会将fd数组从内核空间拷贝到用户空间

  • 当内核将修改后的fd数组返回给用户线程后,用户线程解除阻塞,由用户线程开始遍历fd数组然后找出fd数组中值为1Socket文件描述符。最后对这些Socket发起系统调用读取数据。

  • select不会告诉用户线程具体哪些fd上有IO数据到来,只是在IO活跃fd上打上标记,将打好标记的完整fd数组返回给用户线程,所以用户线程还需要遍历fd数组找出具体哪些fd上有IO数据到来。

  • 由于内核在遍历的过程中已经修改了fd数组,所以在用户线程遍历完fd数组后获取到IO就绪Socket后,就需要重置fd数组,并重新调用select传入重置后的fd数组,让内核发起新的一轮遍历轮询。
  • select传递给内核监听的文件描述符集合中数值最大的文件描述符+1,目的是用于限定内核遍历范围。比如:select监听的文件描述符集合为0,1,2,3,4,那么maxfdp1的值为5

  • fd_set *readset:可读事件感兴趣的文件描述符集合。

  • fd_set *writeset:可写事件感兴趣的文件描述符集合。

  • fd_set *exceptset:可写事件感兴趣的文件描述符集合。

  • 这里的fd_set就是我们前边提到的文件描述符数组,是一个BitMap结构。

  • const struct timeval *timeout:select系统调用超时时间,在这段时间内,内核如果没有发现有IO就绪的文件描述符,就直接返回。
  • 上小节提到,在内核遍历完fd数组后,发现有IO就绪fd,则会将该fd对应的BitMap中的值设置为1,并将修改后的fd数组,返回给用户线程。

    在用户线程中需要重新遍历fd数组,找出IO就绪fd出来,然后发起真正的读写调用。

    下面介绍下在用户线程中重新遍历fd数组的过程中,我们需要用到的API

  • void FD_ZERO(fd_set *fdset):清空指定的文件描述符集合,即让fd_set中不在包含任何文件描述符。

  • void FD_SET(int fd, fd_set *fdset):将一个给定的文件描述符加入集合之中。

  • 每次调用select之前都要通过FD_ZEROFD_SET重新设置文件描述符,因为文件描述符集合会在内核被修改

  • int FD_ISSET(int fd, fd_set *fdset):检查集合中指定的文件描述符是否可以读写。用户线程遍历文件描述符集合,调用该方法检查相应的文件描述符是否IO就绪

  • void FD_CLR(int fd, fd_set *fdset):将一个给定的文件描述符从集合中删除

  • poll相当于是改进版的select,但是工作原理基本和select没有本质的区别。

    通过上边对select,poll核心原理的介绍,我们看到select,poll的性能瓶颈主要体现在下面三个地方:

  • 因为内核不会保存我们要监听的socket集合,所以在每次调用select,poll的时候都需要传入,传出全量的socket文件描述符集合。这导致了大量的文件描述符在用户空间内核空间频繁的来回复制。

  • 由于内核不会通知具体IO就绪socket,只是在这些IO就绪的socket上打好标记,所以当select系统调用返回时,在用户空间还是需要完整遍历一遍socket文件描述符集合来获取具体IO就绪socket

  • 内核空间中也是通过遍历的方式来得到IO就绪socket

  • 下面我们来看下epoll是如何解决这些问题的。在介绍epoll的核心原理之前,我们需要介绍下理解epoll工作过程所需要的一些核心基础知识。

    tcp_prot。并把它们分别设置到socket->opssock->sk_prot上。

    这里可以回看下本小节开头的《Socket内核结构图》捋一下他们之间的关系。

    socket相关的操作接口定义在inet_stream_ops函数集合中,负责对上给用户提供接口。而socket与内核协议栈之间的操作接口定义在struct sock中的sk_prot指针上,这里指向tcp_prot协议操作函数集合。

    对象中的sk_data_ready 函数指针设置为 sock_def_readable,在Socket数据就绪的时候内核会回调该函数。

  • struct sock中的等待队列中存放的是系统IO调用发生阻塞的进程fd,以及相应的回调函数记住这个地方,后边介绍epoll的时候我们还会提到!

    1. struct filestruct socketstruct sock这些核心的内核对象创建好之后,最后就是把socket对象对应的struct file放到进程打开的文件列表fd_array中。随后系统调用accept返回socket的文件描述符fd给用户程序。
    由于红黑树在查找插入删除等综合性能方面是最优的,所以epoll内部使用一颗红黑树来管理海量的Socket连接。

    select数组管理连接,poll链表管理连接。

    网上有大量的关于这两种模式的讲解,大部分讲的比较模糊,感觉只是强行从概念上进行描述,看完让人难以理解。所以在这里,笔者想结合上边epoll的工作过程,再次对这两种模式做下自己的解读,力求清晰的解释出这两种工作模式的异同。

    经过上边对epoll工作过程的详细解读,我们知道,当我们监听的socket上有数据到来时,软中断会执行epoll的回调函数ep_poll_callback,在回调函数中会将epoll中描述socket信息的数据结构epitem插入到epoll中的就绪队列rdllist中。随后用户进程从epoll的等待队列中被唤醒,epoll_waitIO就绪socket返回给用户进程,随即epoll_wait会清空rdllist

    水平触发边缘触发最关键的区别就在于当socket中的接收缓冲区还有数据可读时。epoll_wait是否会清空rdllist

  • 水平触发:在这种模式下,用户线程调用epoll_wait获取到IO就绪的socket后,对Socket进行系统IO调用读取数据,假设socket中的数据只读了一部分没有全部读完,这时再次调用epoll_waitepoll_wait会检查这些Socket中的接收缓冲区是否还有数据可读,如果还有数据可读,就将socket重新放回rdllist。所以当socket上的IO没有被处理完时,再次调用epoll_wait依然可以获得这些socket,用户进程可以接着处理socket上的IO事件。

  • 边缘触发: 在这种模式下,epoll_wait就会直接清空rdllist,不管socket上是否还有数据可读。所以在边缘触发模式下,当你没有来得及处理socket接收缓冲区的剩下可读数据时,再次调用epoll_wait,因为这时rdlist已经被清空了,socket不会再次从epoll_wait中返回,所以用户进程就不会再次获得这个socket了,也就无法在对它进行IO处理了。除非,这个socket上有新的IO数据到达,根据epoll的工作过程,该socket会被再次放入rdllist中。

  • 如果你在边缘触发模式下,处理了部分socket上的数据,那么想要处理剩下部分的数据,就只能等到这个socket上再次有网络数据到达。

    Netty中实现的EpollSocketChannel默认的就是边缘触发模式。JDKNIO默认是水平触发模式。

    配合线程池,再加上 CPU、内存和网络接口的性能和容量提升。大部分情况下,C100K很自然就可以达到。

    甚至C1000K的解决方法,本质上还是构建在 epoll多路复用 I/O 模型上。只不过,除了 I/O 模型之外,还需要从应用程序到 Linux 内核、再到 CPU、内存和网络等各个层次的深度优化,特别是需要借助硬件,来卸载那些原来通过软件处理的大量功能(去掉大量的中断响应开销以及内核协议栈处理的开销)。

    信号驱动IO.png

    大家对这个装备肯定不会陌生,当我们去一些美食城吃饭的时候,点完餐付了钱,老板会给我们一个信号器。然后我们带着这个信号器可以去找餐桌,或者干些其他的事情。当信号器亮了的时候,这时代表饭餐已经做好,我们可以去窗口取餐了。

    这个典型的生活场景和我们要介绍的信号驱动IO模型就很像。

    信号驱动IO模型下,用户进程操作通过系统调用 sigaction 函数发起一个 IO 请求,在对应的socket注册一个信号回调,此时不阻塞用户进程,进程会继续工作。当内核数据就绪时,内核就为该进程生成一个 SIGIO 信号,通过信号回调通知进程进行相关 IO 操作。

    这里需要注意的是:信号驱动式 IO 模型依然是同步IO,因为它虽然可以在等待数据的时候不被阻塞,也不会频繁的轮询,但是当数据就绪,内核信号通知后,用户进程依然要自己去读取数据,在数据拷贝阶段发生阻塞。

    信号驱动 IO模型 相比于前三种 IO 模型,实现了在等待数据就绪时,进程不被阻塞,主循环可以继续工作,所以理论上性能更佳。

    但是实际上,使用TCP协议通信时,信号驱动IO模型几乎不会被采用。原因如下:

  • 信号IO 在大量 IO 操作时可能会因为信号队列溢出导致没法通知
  • SIGIO 信号是一种 Unix 信号,信号没有附加信息,如果一个信号源有多种产生信号的原因,信号接收者就无法确定究竟发生了什么。而 TCP socket 生产的信号事件有七种之多,这样应用程序收到 SIGIO,根本无从区分处理。
  • 信号驱动IO模型可以用在 UDP通信上,因为UDP 只有一个数据请求事件,这也就意味着在正常情况下 UDP 进程只要捕获 SIGIO 信号,就调用 read 系统调用读取到达的数据。如果出现异常,就返回一个异常错误。


    这里插句题外话,大家觉不觉得阻塞IO模型在生活中的例子就像是我们在食堂排队打饭。你自己需要排队去打饭同时打饭师傅在配菜的过程中你需要等待。

    阻塞IO.png

    IO多路复用模型就像是我们在饭店门口排队等待叫号。叫号器就好比select,poll,epoll可以统一管理全部顾客的吃饭就绪事件,客户好比是socket连接,谁可以去吃饭了,叫号器就通知谁。

    IO多路复用.png

    ##异步IO(AIO)

    以上介绍的四种IO模型均为同步IO,它们都会阻塞在第二阶段数据拷贝阶段

    通过在前边小节《同步与异步》中的介绍,相信大家很容易就会理解异步IO模型,在异步IO模型下,IO操作在数据准备阶段数据拷贝阶段均是由内核来完成,不会对应用程序造成任何阻塞。应用进程只需要在指定的数组中引用数据即可。

    异步 IO信号驱动 IO 的主要区别在于:信号驱动 IO 由内核通知何时可以开始一个 IO 操作,而异步 IO由内核通知 IO 操作何时已经完成

    举个生活中的例子:异步IO模型就像我们去一个高档饭店里的包间吃饭,我们只需要坐在包间里面,点完餐(类比异步IO调用)之后,我们就什么也不需要管,该喝酒喝酒,该聊天聊天,饭餐做好后服务员(类比内核)会自己给我们送到包间(类比用户空间)来。整个过程没有任何阻塞。

    异步IO.png

    异步IO的系统调用需要操作系统内核来支持,目前只有Window中的IOCP实现了非常成熟的异步IO机制

    Linux系统对异步IO机制实现的不够成熟,且与NIO的性能相比提升也不明显。

    但Linux kernel 在5.1版本由Facebook的大神Jens Axboe引入了新的异步IO库io_uring 改善了原来Linux native AIO的一些性能问题。性能相比Epoll以及之前原生的AIO提高了不少,值得关注。

    再加上信号驱动IO模型不适用TCP协议,所以目前大部分采用的还是IO多路复用模型

    在前边内容的介绍中,我们详述了网络数据包的接收和发送过程,并通过介绍5种IO模型了解了内核是如何读取网络数据并通知给用户线程的。

    前边的内容都是以内核空间的视角来剖析网络数据的收发模型,本小节我们站在用户空间的视角来看下如果对网络数据进行收发。

    相对内核来讲,用户空间的IO线程模型相对就简单一些。这些用户空间IO线程模型都是在讨论当多线程一起配合工作时谁负责接收连接,谁负责响应IO 读写、谁负责计算、谁负责发送和接收,仅仅是用户IO线程的不同分工模式罢了。

    Reactor是利用NIOIO线程进行不同的分工:

  • 使用前边我们提到的IO多路复用模型比如select,poll,epoll,kqueue,进行IO事件的注册和监听。
  • 将监听到就绪的IO事件分发dispatch到各个具体的处理Handler中进行相应的IO事件处理
  • 通过IO多路复用技术就可以不断的监听IO事件,不断的分发dispatch,就像一个反应堆一样,看起来像不断的产生IO事件,因此我们称这种模式为Reactor模型。

    下面我们来看下Reactor模型的三种分类:

    Proactor是基于AIOIO线程进行分工的一种模型。前边我们介绍了异步IO模型,它是操作系统内核支持的一种全异步编程模型,在数据准备阶段数据拷贝阶段全程无阻塞。

    ProactorIO线程模型IO事件的监听IO操作的执行IO结果的dispatch统统交给内核来做。

    proactor.png

    Proactor模型组件介绍:

  • completion handler 为用户程序定义的异步IO操作回调函数,在异步IO操作完成时会被内核回调并通知IO结果。

  • Completion Event Queue 异步IO操作完成后,会产生对应的IO完成事件,将IO完成事件放入该队列中。

  • Asynchronous Operation Processor 负责异步IO的执行。执行完成后产生IO完成事件放入Completion Event Queue 队列中。

  • Proactor 是一个事件循环派发器,负责从Completion Event Queue中获取IO完成事件,并回调与IO完成事件关联的completion handler

  • Initiator 初始化异步操作(asynchronous operation)并通过Asynchronous Operation Processorcompletion handlerproactor注册到内核。

  • Proactor模型执行过程:

  • 用户线程发起aio_read,并告诉内核用户空间中的读缓冲区地址,以便内核完成IO操作将结果放入用户空间的读缓冲区,用户线程直接可以读取结果(无任何阻塞)。

  • Initiator 初始化aio_read异步读取操作(asynchronous operation),并将completion handler注册到内核。

  • Proactor中我们关心的IO完成事件:内核已经帮我们读好数据并放入我们指定的读缓冲区,用户线程可以直接读取。在Reactor中我们关心的是IO就绪事件:数据已经到来,但是需要用户线程自己去读取。

  • 此时用户线程就可以做其他事情了,无需等待IO结果。而内核与此同时开始异步执行IO操作。当IO操作完成时会产生一个completion event事件,将这个IO完成事件放入completion event queue中。

  • Proactorcompletion event queue中取出completion event,并回调与IO完成事件关联的completion handler

  • completion handler中完成业务逻辑处理。

  • Reactor是基于NIO实现的一种IO线程模型Proactor是基于AIO实现的IO线程模型

  • Reactor关心的是IO就绪事件Proactor关心的是IO完成事件

  • Proactor中,用户程序需要向内核传递用户空间的读缓冲区地址Reactor则不需要。这也就导致了在Proactor中每个并发操作都要求有独立的缓存区,在内存上有一定的开销。

  • Proactor 的实现逻辑复杂,编码成本较 Reactor要高很多。

  • Proactor 在处理高耗时 IO时的性能要高于 Reactor,但对于低耗时 IO的执行效率提升并不明显

  • 在我们介绍完网络数据包在内核中的收发过程以及五种IO模型和两种IO线程模型后,现在我们来看下netty中的IO模型是什么样的。

    在我们介绍Reactor IO线程模型的时候提到有三种Reactor模型单Reactor单线程单Reactor多线程主从Reactor多线程

    这三种Reactor模型netty中都是支持的,但是我们常用的是主从Reactor多线程模型

    而我们之前介绍的三种Reactor只是一种模型,是一种设计思想。实际上各种网络框架在实现中并不是严格按照模型来实现的,会有一些小的不同,但大体设计思想上是一样的。

    下面我们来看下netty中的主从Reactor多线程模型是什么样子的?

    netty中的reactor.png
  • Reactornetty中是以group的形式出现的,netty中将Reactor分为两组,一组是MainReactorGroup也就是我们在编码中常常看到的EventLoopGroup bossGroup,另一组是SubReactorGroup也就是我们在编码中常常看到的EventLoopGroup workerGroup

  • MainReactorGroup中通常只有一个Reactor,专门负责做最重要的事情,也就是监听连接accept事件。当有连接事件产生时,在对应的处理handler acceptor中创建初始化相应的NiosocketChannel(代表一个Socket连接)。然后以负载均衡的方式在SubReactorGroup中选取一个Reactor,注册上去,监听Read事件

  • MainReactorGroup中只有一个Reactor的原因是,通常我们服务端程序只会绑定监听一个端口,如果要绑定监听多个端口,就会配置多个Reactor

  • SubReactorGroup中有多个Reactor,具体Reactor的个数可以由系统参数 -D io.netty.eventLoopThreads指定。默认的Reactor的个数为CPU核数 * 2SubReactorGroup中的Reactor主要负责监听读写事件,每一个Reactor负责监听一组socket连接。将全量的连接分摊在多个Reactor中。

  • 一个Reactor分配一个IO线程,这个IO线程负责从Reactor中获取IO就绪事件,执行IO调用获取IO数据,执行PipeLine

  • Socket连接在创建后就被固定的分配给一个Reactor,所以一个Socket连接也只会被一个固定的IO线程执行,每个Socket连接分配一个独立的PipeLine实例,用来编排这个Socket连接上的IO处理逻辑。这种无锁串行化的设计的目的是为了防止多线程并发执行同一个socket连接上的IO逻辑处理,防止出现线程安全问题。同时使系统吞吐量达到最大化

    由于每个Reactor中只有一个IO线程,这个IO线程既要执行IO活跃Socket连接对应的PipeLine中的ChannelHandler,又要从Reactor中获取IO就绪事件,执行IO调用。所以PipeLineChannelHandler中执行的逻辑不能耗时太长,尽量将耗时的业务逻辑处理放入单独的业务线程池中处理,否则会影响其他连接的IO读写,从而近一步影响整个服务程序的IO吞吐

  • IO请求在业务线程中完成相应的业务逻辑处理后,在业务线程中利用持有的ChannelHandlerContext引用将响应数据在PipeLine中反向传播,最终写回给客户端。
  • netty中的IO模型我们介绍完了,下面我们来简单介绍下在netty中是如何支持前边提到的三种Reactor模型的。

    本文是一篇信息量比较大的文章,用了25张图,22336个字从内核如何处理网络数据包的收发过程开始展开,随后又在内核角度介绍了经常容易混淆的阻塞与非阻塞同步与异步的概念。以这个作为铺垫,我们通过一个C10K的问题,引出了五种IO模型,随后在IO多路复用中以技术演进的形式介绍了select,poll,epoll的原理和它们综合的对比。最后我们介绍了两种IO线程模型以及netty中的Reactor模型


    - EOF -

    推荐阅读  点击标题可跳转

    18张图,图解SpringBoot解析yml全流程

    起飞,会了这4个 Intellij IDEA 调试魔法,阅读源码都简单了!

    还在用策略模式解决 if-else?Map+函数式接口方法才是YYDS!



    看完本文有收获?请转发分享给更多人

    关注「ImportNew」,提升Java技能

    点赞和在看就是最大的支持❤️

    聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)

    聊聊Netty那些事儿之Reactor在Netty中的实现-创建篇


    本系列Netty源码解析文章基于 4.1.56.Final版本

    在上篇文章《聊聊Netty那些事儿之从内核角度看IO模型》中我们花了大量的篇幅来从内核角度详细讲述了五种IO模型的演进过程以及ReactorIO线程模型的底层基石IO多路复用技术在内核中的实现原理。

    最后我们引出了netty中使用的主从Reactor IO线程模型。

    通过上篇文章的介绍,我们已经清楚了在IO调用的过程中内核帮我们搞了哪些事情,那么俗话说的好内核领进门,修行在netty,netty在用户空间又帮我们搞了哪些事情?

    那么从本文开始,笔者将从源码角度来带大家看下上图中的Reactor IO线程模型在Netty中是如何实现的。

    本文作为Reactor在Netty中实现系列文章中的开篇文章,笔者先来为大家介绍Reactor的骨架是如何创建出来的。

    在上篇文章中我们提到Netty采用的是主从Reactor多线程的模型,但是它在实现上又与Doug Lea在Scalable IO in Java论文中提到的经典主从Reactor多线程模型有所差异。

    经典主从Reactor多线程模型

    Netty中的Reactor是以Group的形式出现的,主从Reactor在Netty中就是主从Reactor组,每个Reactor Group中会有多个Reactor用来执行具体的IO任务。当然在netty中Reactor不只用来执行IO任务,这个我们后面再说。

    • Main Reactor Group中的Reactor数量取决于服务端要监听的端口个数,通常我们的服务端程序只会监听一个端口,所以Main Reactor Group只会有一个Main Reactor线程来处理最重要的事情:绑定端口地址接收客户端连接为客户端创建对应的SocketChannel将客户端SocketChannel分配给一个固定的Sub Reactor
    • Sub Reactor Group里有多个Reactor线程,Reactor线程的个数可以通过系统参数-D io.netty.eventLoopThreads指定。默认的Reactor的个数为CPU核数 * 2Sub Reactor线程主要用来轮询客户端SocketChannel上的IO就绪事件处理IO就绪事件执行异步任务

    一个客户端SocketChannel只能分配给一个固定的Sub Reactor。一个Sub Reactor负责处理多个客户端SocketChannel,这样可以将服务端承载的全量客户端连接分摊到多个Sub Reactor中处理,同时也能保证客户端SocketChannel上的IO处理的线程安全性

    由于文章篇幅的关系,作为Reactor在netty中实现的第一篇我们主要来介绍主从Reactor Group的创建流程,骨架脉络先搭好。

    下面我们来看一段Netty服务端代码的编写模板,从代码模板的流程中我们来解析下主从Reactor的创建流程以及在这个过程中所涉及到的Netty核心类。

    Netty服务端代码模板

    /**
     * Echoes back any received data from a client.
     */
    public final class EchoServer 
        static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    
        public static void main(String[] args) throws Exception 
            // Configure the server.
            //创建主从Reactor线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            final EchoServerHandler serverHandler = new EchoServerHandler();
            try 
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)//配置主从Reactor
                 .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型
                 .option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项
                 .handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler
                 .childHandler(new ChannelInitializer<SocketChannel>() //设置从Reactor中注册channel的pipeline
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception 
                         ChannelPipeline p = ch.pipeline();
                         //p.addLast(new LoggingHandler(LogLevel.INFO));
                         p.addLast(serverHandler);
                     
                 );
    
                // Start the server. 绑定端口启动服务,开始监听accept事件
                ChannelFuture f = b.bind(PORT).sync();
                // Wait until the server socket is closed.
                f.channel().closeFuture().sync();
             finally 
                // Shut down all event loops to terminate all threads.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            
        
    
    
    1. 首先我们要创建Netty最核心的部分 -> 创建主从Reactor Group,在Netty中EventLoopGroup就是Reactor Group的实现类。对应的EventLoop就是Reactor的实现类。
      //创建主从Reactor线程组
      EventLoopGroup bossGroup = new NioEventLoopGroup(1);
      EventLoopGroup workerGroup = new NioEventLoopGroup();
    
    1. 创建用于IO处理ChannelHandler,实现相应IO事件的回调函数,编写对应的IO处理逻辑。注意这里只是简单示例哈,详细的IO事件处理,笔者会单独开一篇文章专门讲述。
    final EchoServerHandler serverHandler = new EchoServerHandler();
    
    /**
     * Handler implementation for the echo server.
     */
    @Sharable
    public class EchoServerHandler extends ChannelInboundHandlerAdapter 
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) 
            ................省略IO处理逻辑................
            ctx.write(msg);
        
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) 
            
            ctx.flush();
        
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
            // Close the connection when an exception is raised.
            cause.printStackTrace();
            ctx.close();
        
    
    
    1. 创建ServerBootstrapNetty服务端启动类,并在启动类中配置启动Netty服务端所需要的一些必备信息。

      在上篇文章介绍Socket内核结构小节中我们提到,在编写服务端网络程序时,我们首先要创建一个Socket用于listen和bind端口地址,我们把这个叫做监听Socket,这里对应的就是NioServerSocketChannel.class。当客户端连接完成三次握手,系统调用accept函数会基于监听Socket创建出来一个新的Socket专门用于与客户端之间的网络通信我们称为客户端连接Socket,这里对应的就是NioSocketChannel.class

      netty有两种Channel类型:一种是服务端用于监听绑定端口地址的NioServerSocketChannel,一种是用于客户端通信的NioSocketChannel。每种Channel类型实例都会对应一个PipeLine用于编排对应channel实例上的IO事件处理逻辑。PipeLine中组织的就是ChannelHandler用于编写特定的IO处理逻辑。

      注意serverBootstrap.handler设置的是服务端NioServerSocketChannel PipeLine中的ChannelHandler

      ServerBootstrap启动类方法带有child前缀的均是设置客户端NioSocketChannel属性的。

      ChannelInitializer是用于当SocketChannel成功注册到绑定的Reactor上后,用于初始化该SocketChannelPipeline。它的initChannel方法会在注册成功后执行。这里只是捎带提一下,让大家有个初步印象,后面我会专门介绍。

      • serverBootstrap.childHandler(ChannelHandler childHandler)用于设置客户端NioSocketChannel中对应Pipieline中的ChannelHandler。我们通常配置的编码解码器就是在这里。
      • serverBootstrap.option(ChannelOption.SO_BACKLOG, 100)设置服务端ServerSocketChannel中的SocketOption。关于SocketOption的选项我们后边的文章再聊,本文主要聚焦在Netty Main Reactor Group的创建及工作流程。
      • serverBootstrap.handler(....)设置服务端NioServerSocketChannel中对应Pipieline中的ChannelHandler
      • 通过serverBootstrap.group(bossGroup, workerGroup)为Netty服务端配置主从Reactor Group实例。
      • 通过serverBootstrap.channel(NioServerSocketChannel.class)配置Netty服务端的ServerSocketChannel用于绑定端口地址以及创建客户端SocketChannel。Netty中的NioServerSocketChannel.class就是对JDK NIO中ServerSocketChannel的封装。而用于表示客户端连接NioSocketChannel是对JDK NIO SocketChannel封装。
    2. ChannelFuture f = serverBootstrap.bind(PORT).sync()这一步会是下篇文章要重点分析的主题Main Reactor Group的启动,绑定端口地址,开始监听客户端连接事件(OP_ACCEPT)。本文我们只关注创建流程。

    3. f.channel().closeFuture().sync()等待服务端NioServerSocketChannel关闭。Netty服务端到这里正式启动,并准备好接受客户端连接的准备。

    4. shutdownGracefully优雅关闭主从Reactor线程组里的所有Reactor线程

    Netty对IO模型的支持

    在上篇文章中我们介绍了五种IO模型,Netty中支持BIO,NIO,AIO以及多种操作系统下的IO多路复用技术实现。

    在Netty中切换这几种IO模型也是非常的方便,下面我们来看下Netty如何对这几种IO模型进行支持的。

    首先我们介绍下几个与IO模型相关的重要接口:

    EventLoop

    EventLoop就是Netty中的Reactor,可以说它就是Netty的引擎,负责Channel上IO就绪事件的监听IO就绪事件的处理异步任务的执行驱动着整个Netty的运转。

    不同IO模型下,EventLoop有着不同的实现,我们只需要切换不同的实现类就可以完成对NettyIO模型的切换。

    BIONIOAIO
    ThreadPerChannelEventLoopNioEventLoopAioEventLoop

    NIO模型下Netty会自动根据操作系统以及版本的不同选择对应的IO多路复用技术实现。比如Linux 2.6版本以上用的是Epoll,2.6版本以下用的是Poll,Mac下采用的是Kqueue

    其中Linux kernel 在5.1版本引入的异步IO库io_uring正在netty中孵化。

    EventLoopGroup

    Netty中的Reactor是以Group的形式出现的,EventLoopGroup正是Reactor组的接口定义,负责管理Reactor,Netty中的Channel就是通过EventLoopGroup注册到具体的Reactor上的。

    Netty的IO线程模型是主从Reactor多线程模型主从Reactor线程组在Netty源码中对应的其实就是两个EventLoopGroup实例。

    不同的IO模型也有对应的实现:

    BIONIOAIO
    ThreadPerChannelEventLoopGroupNioEventLoopGroupAioEventLoopGroup

    ServerSocketChannel

    用于Netty服务端使用的ServerSocketChannel,对应于上篇文章提到的监听Socket,负责绑定监听端口地址,接收客户端连接并创建用于与客户端通信的SocketChannel

    不同的IO模型下的实现:

    BIONIOAIO
    OioServerSocketChannelNioServerSocketChannelAioServerSocketChannel

    SocketChannel

    用于与客户端通信的SocketChannel,对应于上篇文章提到的客户端连接Socket,当客户端完成三次握手后,由系统调用accept函数根据监听Socket创建。

    不同的IO模型下的实现:

    BIONIOAIO
    OioSocketChannelNioSocketChannelAioSocketChannel

    我们看到在不同IO模型的实现中,Netty这些围绕IO模型的核心类只是前缀的不同:

    • BIO对应的前缀为Oio表示old io,现在已经废弃不推荐使用。
    • NIO对应的前缀为Nio,正是Netty推荐也是我们常用的非阻塞IO模型
    • AIO对应的前缀为Aio,由于Linux下的异步IO机制实现的并不成熟,性能提升表现上也不明显,现已被删除。

    我们只需要将IO模型的这些核心接口对应的实现类前缀改为对应IO模型的前缀,就可以轻松在Netty中完成对IO模型的切换。

    多种NIO的实现

    CommonLinuxMac
    NioEventLoopGroupEpollEventLoopGroupKQueueEventLoopGroup
    NioEventLoopEpollEventLoopKQueueEventLoop
    NioServerSocketChannelEpollServerSocketChannelKQueueServerSocketChannel
    NioSocketChannelEpollSocketChannelKQueueSocketChannel

    我们通常在使用NIO模型的时候会使用Common列下的这些IO模型核心类,Common类也会根据操作系统的不同自动选择JDK在对应平台下的IO多路复用技术的实现。

    而Netty自身也根据操作系统的不同提供了自己对IO多路复用技术的实现,比JDK的实现性能更优。比如:

    • JDK的 NIO 默认实现是水平触发,Netty 是边缘触发(默认)和水平触发可切换。。
    • Netty 实现的垃圾回收更少、性能更好。

    我们编写Netty服务端程序的时候也可以根据操作系统的不同,采用Netty自身的实现来进一步优化程序。做法也很简单,直接将上图中红框里的实现类替换成Netty的自身实现类即可完成切换。


    经过以上对Netty服务端代码编写模板以及IO模型相关核心类的简单介绍,我们对Netty的创建流程有了一个简单粗略的总体认识,下面我们来深入剖析下创建流程过程中的每一个步骤以及这个过程中涉及到的核心类实现。

    以下源码解析部分我们均采用Common列NIO相关的实现进行解析。

    创建主从Reactor线程组

    在Netty服务端程序编写模板的开始,我们首先会创建两个Reactor线程组:

    • 一个是主Reactor线程组bossGroup用于监听客户端连接,创建客户端连接NioSocketChannel,并将创建好的客户端连接NioSocketChannel注册到从Reactor线程组中一个固定的Reactor上。
    • 一个是从Reactor线程组workerGroupworkerGroup中的Reactor负责监听绑定在其上的客户端连接NioSocketChannel上的IO就绪事件,并处理IO就绪事件执行异步任务
      //创建主从Reactor线程组
      EventLoopGroup bossGroup = new NioEventLoopGroup(1);
      EventLoopGroup workerGroup = new NioEventLoopGroup();
    

    Netty中Reactor线程组的实现类为NioEventLoopGroup,在创建bossGroupworkerGroup的时候用到了NioEventLoopGroup的两个构造函数:

    • nThreads参数的构造函数public NioEventLoopGroup(int nThreads)
    • 不带nThreads参数的默认构造函数public NioEventLoopGroup()
    public class NioEventLoopGroup extends MultithreadEventLoopGroup 
    
        /**
         * Create a new instance using the default number of threads, the default @link ThreadFactory and
         * the @link SelectorProvider which is returned by @link SelectorProvider#provider().
         */
        public NioEventLoopGroup() 
            this(0);
        
    
        /**
         * Create a new instance using the specified number of threads, @link ThreadFactory and the
         * @link SelectorProvider which is returned by @link SelectorProvider#provider().
         */
        public NioEventLoopGroup(int nThreads) 
            this(nThreads, (Executor) null);
        
    
        ......................省略...........................
    
    

    nThreads参数表示当前要创建的Reactor线程组内包含多少个Reactor线程。不指定nThreads参数的话采用默认的Reactor线程个数,用0表示。

    最终会调用到构造函数

        public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory) 
            super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
        
    

    下面简单介绍下构造函数中这几个参数的作用,后面我们在讲解本文主线的过程中还会提及这几个参数,到时在详细介绍,这里只是让大家有个初步印象,不必做过多的纠缠。

    • Executor executor:负责启动Reactor线程进而Reactor才可以开始工作。

    Reactor线程组NioEventLoopGroup负责创建Reactor线程,在创建的时候会将executor传入。

    • RejectedExecutionHandler: 当向Reactor添加异步任务添加失败时,采用的拒绝策略。Reactor的任务不只是监听IO活跃事件和IO任务的处理,还包括对异步任务的处理。这里大家只需有个这样的概念,后面笔者会专门详细介绍。
    • SelectorProvider selectorProvider: Reactor中的IO模型为IO多路复用模型,对应于JDK NIO中的实现为java.nio.channels.Selector(就是我们上篇文章中提到的select,poll,epoll),每个Reator中都包含一个Selector,用于轮询注册在该Reactor上的所有Channel上的IO事件SelectorProvider就是用来创建Selector的。
    • SelectStrategyFactory selectStrategyFactory: Reactor最重要的事情就是轮询注册其上的Channel上的IO就绪事件,这里的SelectStrategyFactory用于指定轮询策略,默认为DefaultSelectStrategyFactory.INSTANCE

    Executor是执行器的意思,即负责执行传入的task任务,具体如何执行该任务,则会有不同的实现,可以是同步执行,或者异步单线程执行,或者异步线程池执行,再或者是异步定时执,等等…

    最终会将这些参数交给NioEventLoopGroup的父类构造器,下面我们来看下NioEventLoopGroup类的继承结构:

    NioEventLoopGroup类的继承结构乍一看比较复杂,大家不要慌,笔者会随着主线的深入慢慢地介绍这些父类接口,我们现在重点关注Mutithread前缀的类。

    我们知道NioEventLoopGroup是Netty中的Reactor线程组的实现,既然是线程组那么肯定是负责管理和创建多个Reactor线程的,所以Mutithread前缀的类定义的行为自然是对Reactor线程组内多个Reactor线程的创建和管理工作。

    MultithreadEventLoopGroup

    public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup 
    
        private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
        //默认Reactor个数
        private static final int DEFAULT_EVENT_LOOP_THREADS;
    
        static 
            DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
            if (logger.isDebugEnabled()) 
                logger.debug("-Dio.netty.eventLoopThreads: ", DEFAULT_EVENT_LOOP_THREADS);
            
        
    
        /**
         * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
         */
        protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) 
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
        
    
        ...................省略.....................
    
    

    MultithreadEventLoopGroup类主要的功能就是用来确定Reactor线程组Reactor的个数。

    默认的Reactor的个数存放于字段DEFAULT_EVENT_LOOP_THREADS中。

    static 静态代码块中我们可以看出默认Reactor的个数的获取逻辑:

    • 可以通过系统变量 -D io.netty.eventLoopThreads"指定。
    • 如果不指定,那么默认的就是NettyRuntime.availableProcessors() * 2

    nThread参数设置为0采用默认设置时,Reactor线程组内的Reactor个数则设置为DEFAULT_EVENT_LOOP_THREADS

    MultithreadEventExecutorGroup

    MultithreadEventExecutorGroup这里就是本小节的核心,主要用来定义创建和管理Reactor的行为。

    public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup 
    
        //Reactor线程组中的Reactor集合
        private final EventExecutor[] children;
        private final Set<EventExecutor> readonlyChildren;
        //从Reactor group中选择一个特定的Reactor的选择策略 用于channel注册绑定到一个固定的Reactor上
        private final EventExecutorChooserFactory.EventExecutorChooser chooser;
    
        /**
         * Create a new instance.
         *
         * @param nThreads          the number of threads that will be used by this instance.
         * @param executor          the Executor to use, or @code null if the default should be used.
         * @param args              arguments which will passed to each @link #newChild(Executor, Object...) call
         */
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) 
            this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
        
    
        ............................省略................................
    
    

    首先介绍一个新的构造器参数EventExecutorChooserFactory chooserFactory。当客户端连接完成三次握手后,Main Reactor会创建客户端连接NioSocketChannel,并将其绑定到Sub Reactor Group中的一个固定Reactor,那么具体要绑定到哪个具体的Sub Reactor上呢?这个绑定策略就是由chooserFactory来创建的。默认为DefaultEventExecutorChooserFactory

    下面就是本小节的主题Reactor线程组的创建过程:

        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) 
            if (nThreads <= 0) 
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            
    
            if (executor == null) 
                //用于创建Reactor线程
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            
    
            children = new EventExecutor[nThreads];
            //循环创建reaactor group中的Reactor
            for (int i = 0; i < nThreads; i ++) 
                boolean success = false;
                try 
                    //创建reactor
                    children[i] = newChild(executor, args);
                    success = true;
                 catch (Exception e) 
                    throw new IllegalStateException("failed to create a child event loop", e);
                 finally 
                         ................省略................
                    
                
            
            //创建channel到Reactor的绑定策略
            chooser = chooserFactory.newChooser(children);
    
             ................省略................
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        
    

    1. 创建用于启动Reactor线程的executor

    在Netty Reactor Group中的单个ReactorIO线程模型为上篇文章提到的单Reactor单线程模型,一个Reactor线程负责轮询注册其上的所有Channel中的IO就绪事件,处理IO事件,执行Netty中的异步任务等工作。正是这个Reactor线程驱动着整个Netty的运转,可谓是Netty的核心引擎。

    而这里的executor就是负责启动Reactor线程的,从创建源码中我们可以看到executor的类型为ThreadPerTaskExecutor

    ThreadPerTaskExecutor

    public final class ThreadPerTaskExecutor implements Executor 
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) 
            this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
        
    
        @Override
        public void execute(Runnable command) 
            threadFactory.newThread(command).start();
        
    
    

    我们看到ThreadPerTaskExecutor做的事情很简单,从它的命名前缀ThreadPerTask我们就可以猜出它的工作方式,就是来一个任务就创建一个线程执行。而创建的这个线程正是netty的核心引擎Reactor线程。

    Reactor线程启动的时候,Netty会将Reactor线程要做的事情封装成Runnable,丢给exexutor启动。

    Reactor线程的核心就是一个死循环不停的轮询IO就绪事件,处理IO事件,执行异步任务。一刻也不停歇,堪称996典范

    这里向大家先卖个关子,"Reactor线程是何时启动的呢??"

    2. 创建Reactor

    Reactor线程组NioEventLoopGroup包含多个Reactor,存放于private final EventExecutor[] children数组中。

    所以下面的事情就是创建nThreadReactor,并存放于EventExecutor[] children字段中,

    我们来看下用于创建ReactornewChild(executor, args)方法:

    newChild

    newChild方法是MultithreadEventExecutorGroup中的一个抽象方法,提供给具体子类实现。

    protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
    

    这里我们解析的是NioEventLoopGroup,我们来看下newChild在该类中的实现:

    public class NioEventLoopGroup extends MultithreadEventLoopGroup 
        @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception 
            EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
        
    
    

    前边提到的众多构造器参数,这里会通过可变参数Object... args传入到Reactor类NioEventLoop的构造器中。

    这里介绍下新的参数EventLoopTaskQueueFactory queueFactory,前边提到Netty中的Reactor主要工作是轮询注册其上的所有Channel上的IO就绪事件,处理IO就绪事件。除了这些主要的工作外,Netty为了极致的压榨Reactor的性能,还会让它做一些异步任务的执行工作。既然要执行异步任务,那么Reactor中就需要一个队列来保存任务。

    这里的EventLoopTaskQueueFactory就是用来创建这样的一个队列来保存Reactor中待执行的异步任务。

    可以把Reactor理解成为一个单线程的线程池类似JDK中的SingleThreadExecutor,仅用一个线程来执行轮询IO就绪事件处理IO就绪事件执行异步任务。同时待执行的异步任务保存在Reactor里的taskQueue中。

    NioEventLoop

    public final class NioEventLoop extends SingleThreadEventLoop 
        //用于创建JDK NIO Selector,ServerSocketChannel
        private final SelectorProvider provider;
        //Selector轮询策略 决定什么时候轮询,什么时候处理IO事件,什么时候执行异步任务
        private final SelectStrategy selectStrategy;
        /**
         * The NIO @link Selector.
         */
        private Selector selector;
        private Selector unwrappedSelector;
    
        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                     EventLoopTaskQueueFactory queueFactory) 
            super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                    rejectedExecutionHandler);
            this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
            this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
            final SelectorTuple selectorTuple = openSelector();
            this.selector = selectorTuple.selector;
            this.unwrappedSelector = selectorTuple.unwrappedSelector;
        
    
    

    这里就正式开始了Reactor的创建过程,我们知道Reactor的核心是采用的IO多路复用模型来对客户端连接上的IO事件进行监听,所以最重要的事情是创建Selector(JDK NIO 中IO多路复用技术的实现)。

    可以把Selector理解为我们上篇文章介绍的Select,poll,epoll,它是JDK NIO对操作系统内核提供的这些IO多路复用技术的封装。

    openSelector

    openSelectorNioEventLoop类中用于创建IO多路复用Selector,并对创建出来的JDK NIO 原生的Selector进行性能优化。

    首先会通过SelectorProvider#openSelector创建JDK NIO原生的Selector

     private SelectorTuple openSelector() 
            final Selector unwrappedSelector;
            try 
                //通过JDK NIO SelectorProvider创建Selector
                unwrappedSelector = provider.openSelector();
             catch (IOException e) 
                throw new ChannelException("failed to open a new selector", e);
            
    
            ..................省略.............
    
    

    SelectorProvider会根据操作系统的不同选择JDK在不同操作系统版本下的对应Selector的实现。Linux下会选择Epoll,Mac下会选择Kqueue

    下面我们就来看下SelectorProvider是如何做到自动适配不同操作系统下IO多路复用实现的

    SelectorProvider

        public NioEventLoopGroup(ThreadFactory threadFactory) 
            this(0, threadFactory, SelectorProvider.provider());
        
    

    SelectorProvider是在前面介绍的NioEventLoopGroup类构造函数中通过调用SelectorProvider.provider()被加载,并通过NioEventLoopGroup#newChild方法中的可变长参数Object... args传递到NioEventLoop中的private final SelectorProvider provider字段中。

    SelectorProvider的加载过程:

    public abstract class SelectorProvider 
    
        public static SelectorProvider provider() 
            synchronized (lock) 
                if (provider != null)
                    return provider;
                return AccessController.doPrivileged(
                    new PrivilegedAction<SelectorProvider>() 
                        public SelectorProvider run() 
                                if (loadProviderFromProperty())
                                    return provider;
                                if (loadProviderAsService())
                                    return provider;
                                provider = sun.nio.ch.DefaultSelectorProvider.create();
                                return provider;
                            
                        );
            
        
    
    

    SelectorProvider加载源码中我们可以看出,SelectorProvider的加载方式有三种,优先级如下:

    1. 通过系统变量-D java.nio.channels.spi.SelectorProvider指定SelectorProvider的自定义实现类全限定名。通过应用程序类加载器(Application Classloader)加载。
        private static boolean loadProviderFromProperty() 
            String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
            if (cn == null)
                return false;
            try 
                Class<?> c = Class.forName(cn, true,
                                           ClassLoader.getSystemClassLoader());
                provider = (SelectorProvider)c.newInstance();
                return true;
             
            .................省略.............
        
    
    1. 通过SPI方式加载。在工程目录META-INF/services下定义名为java.nio.channels.spi.SelectorProviderSPI文件,文件中第一个定义的SelectorProvider实现类全限定名就会被加载。
        private static boolean loadProviderAsService() 
    
            ServiceLoader<SelectorProvider> sl =
                ServiceLoader.load(SelectorProvider.class,
                                   ClassLoader.getSystemClassLoader());
            Iterator<SelectorProvider> i = sl.iterator();
            for (;;) 
                try 
                    if (!i.hasNext())
                        return false;
                    provider = i.next();
                    return true;
                 catch (ServiceConfigurationError sce) 
                    if (sce.getCause() instanceof SecurityException) 
                        // Ignore the security exception, try the next provider
                        continue;
                    
                    throw sce;
                
            
        
    
    1. 如果以上两种方式均未被定义,那么就采用SelectorProvider系统默认实现sun.nio.ch.DefaultSelectorProvider。笔者当前使用的操作系统是MacOS,从源码中我们可以看到自动适配了KQueue实现。
    public class DefaultSelectorProvider 
        private DefaultSelectorProvider() 
        
    
        public static SelectorProvider create() 
            return new KQueueSelectorProvider();
        
    
    

    不同操作系统中JDK对于DefaultSelectorProvider会有所不同,Linux内核版本2.6以上对应的Epoll,Linux内核版本2.6以下对应的Poll,MacOS对应的是KQueue

    下面我们接着回到io.netty.channel.nio.NioEventLoop#openSelector的主线上来。

    Netty对JDK NIO 原生Selector的优化

    首先在NioEventLoop中有一个Selector优化开关DISABLE_KEY_SET_OPTIMIZATION,通过系统变量-D io.netty.noKeySetOptimization指定,默认是开启的,表示需要对JDK NIO原生Selector进行优化。

    public final class NioEventLoop extends SingleThreadEventLoop 
       //Selector优化开关 默认开启 为了遍历的效率 会对Selector中的SelectedKeys进行数据结构优化
        private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
                SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
    
    

    如果优化开关DISABLE_KEY_SET_OPTIMIZATION是关闭的,那么直接返回JDK NIO原生的Selector

    private SelectorTuple openSelector() 
            ..........SelectorProvider创建JDK NIO  原生Selector..............
    
            if (DISABLE_KEY_SET_OPTIMIZATION) 
                //JDK NIO原生Selector ,Selector优化开关 默认开启需要对Selector进行优化
                return new SelectorTuple(unwrappedSelector);
            
    
    

    下面为Netty对JDK NIO原生的Selector的优化过程:

    1. 获取JDK NIO原生Selector的抽象实现类sun.nio.ch.SelectorImplJDK NIO原生Selector的实现均继承于该抽象类。用于判断由SelectorProvider创建出来的Selector是否为JDK默认实现SelectorProvider第三种加载方式)。因为SelectorProvider可以是自定义加载,所以它创建出来的Selector并不一定是JDK NIO 原生的。
           Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() 
                @Override
                public Object run() 
                    try 
                        return Class.forName(
                                "sun.nio.ch.SelectorImpl",
                                false,
                                PlatformDependent.getSystemClassLoader());
                     catch (Throwable cause) 
                        return cause;
                    
                
            );
    

    JDK NIO Selector的抽象类sun.nio.ch.SelectorImpl

    public abstract class SelectorImpl extends AbstractSelector 
    
        // The set of keys with data ready for an operation
        // //IO就绪的SelectionKey(里面包裹着channel)
        protected Set<SelectionKey> selectedKeys;
    
        // The set of keys registered with this Selector
        //注册在该Selector上的所有SelectionKey(里面包裹着channel)
        protected HashSet<SelectionKey> keys;
    
        // Public views of the key sets
        //用于向调用线程返回的keys,不可变
        private Set<SelectionKey> publicKeys;             // Immutable
        //当有IO就绪的SelectionKey时,向调用线程返回。只可删除其中元素,不可增加
        private Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition
    
        protected SelectorImpl(SelectorProvider sp) 
            super(sp);
            keys = new HashSet<SelectionKey>();
            selectedKeys = new HashSet<SelectionKey>();
            if (Util.atBugLevel("1.4")) 
                publicKeys = keys;
                publicSelectedKeys = selectedKeys;
             else 
                //不可变
                publicKeys = Collections.unmodifiableSet(keys);
                //只可删除其中元素,不可增加
                publicSelectedKeys = Util.ungrowableSet(selectedKeys);
            
        
    
    

    这里笔者来简单介绍下JDK NIO中的Selector中这几个字段的含义,我们可以和上篇文章讲到的epoll在内核中的结构做类比,方便大家后续的理解:

    • Set<SelectionKey> selectedKeys 类似于我们上篇文章讲解Epoll时提到的就绪队列eventpoll->rdllistSelector这里大家可以理解为EpollSelector会将自己监听到的IO就绪Channel放到selectedKeys中。

    这里的SelectionKey暂且可以理解为ChannelSelector中的表示,类比上图中epitem结构里的epoll_event,封装IO就绪Socket的信息。其实SelectionKey里包含的信息不止是Channel还有很多IO相关的信息。后面我们在详细介绍。

    • HashSet<SelectionKey> keys:这里存放的是所有注册到该Selector上的Channel。类比epoll中的红黑树结构rb_root

    SelectionKeyChannel注册到Selector中后生成。

    • Set<SelectionKey> publicSelectedKeys 相当于是selectedKeys的视图,用于向外部线程返回IO就绪SelectionKey。这个集合在外部线程中只能做删除操作不可增加元素,并且不是线程安全的
    • Set<SelectionKey> publicKeys相当于keys的不可变视图,用于向外部线程返回所有注册在该Selector上的SelectionKey

    这里需要重点关注抽象类sun.nio.ch.SelectorImpl中的selectedKeyspublicSelectedKeys这两个字段,注意它们的类型都是HashSet,一会优化的就是这里!!!!

    1. 判断由SelectorProvider创建出来的Selector是否是JDK NIO原生的Selector实现。因为Netty优化针对的是JDK NIO 原生Selector。判断标准为sun.nio.ch.SelectorImpl类是否为SelectorProvider创建出Selector的父类。如果不是则直接返回。不在继续下面的优化过程。
            //判断是否可以对Selector进行优化,这里主要针对JDK NIO原生Selector的实现类进行优化,因为SelectorProvider可以加载的是自定义Selector实现
            //如果SelectorProvider创建的Selector不是JDK原生sun.nio.ch.SelectorImpl的实现类,那么无法进行优化,直接返回
            if (!(maybeSelectorImplClass instanceof Class) ||
                !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) 
                if (maybeSelectorImplClass instanceof Throwable) 
                    Throwable t = (Throwable) maybeSelectorImplClass;
                    logger.trace("failed to instrument a special java.util.Set into: ", unwrappedSelector, t);
                
                return new SelectorTuple(unwrappedSelector);
            
    

    通过前面对SelectorProvider的介绍我们知道,这里通过provider.openSelector()创建出来的Selector实现类为KQueueSelectorImpl类,它继承实现了sun.nio.ch.SelectorImpl,所以它是JDK NIO 原生的Selector实现

    class KQueueSelectorImpl extends SelectorImpl 
    
    
    
    1. 创建SelectedSelectionKeySet通过反射替换掉sun.nio.ch.SelectorImpl类selectedKeyspublicSelectedKeys的默认HashSet实现。

    为什么要用SelectedSelectionKeySet替换掉原来的HashSet呢??

    因为这里涉及到对HashSet类型sun.nio.ch.SelectorImpl#selectedKeys集合的两种操作:

    • 插入操作: 通过前边对sun.nio.ch.SelectorImpl类中字段的介绍我们知道,在Selector监听到IO就绪SelectionKey后,会将IO就绪SelectionKey插入sun.nio.ch.SelectorImpl#selectedKeys集合中,这时Reactor线程会从java.nio.channels.Selector#select(long)阻塞调用中返回(类似上篇文章提到的epoll_wait)。
    • 遍历操作:Reactor线程返回后,会从Selector中获取IO就绪SelectionKey集合(也就是sun.nio.ch.SelectorImpl#selectedKeys),Reactor线程遍历selectedKeys,获取IO就绪SocketChannel,并处理SocketChannel上的IO事件

    我们都知道HashSet底层数据结构是一个哈希表,由于Hash冲突这种情况的存在,所以导致对哈希表进行插入遍历操作的性能不如对数组进行插入遍历操作的性能好。

    还有一个重要原因是,数组可以利用CPU缓存的优势来提高遍历的效率。后面笔者会有一篇专门的文章来讲述利用CPU缓存行如何为我们带来性能优势。

    所以Netty为了优化对sun.nio.ch.SelectorImpl#selectedKeys集合的插入,遍历性能,自己用数组这种数据结构实现了SelectedSelectionKeySet,用它来替换原来的HashSet实现。

    SelectedSelectionKeySet

    • 初始化SelectionKey[] keys数组大小为1024,当数组容量不够时,扩容为原来的两倍大小。
    • 通过数组尾部指针size,在向数组插入元素的时候可以直接定位到插入位置keys[size++]。操作一步到位,不用像哈希表那样还需要解决Hash冲突
    • 对数组的遍历操作也是如丝般顺滑,CPU直接可以在缓存行中遍历读取数组元素无需访问内存。比HashSet的迭代器java.util.HashMap.KeyIterator 遍历方式性能不知高到哪里去了。
    final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> 
    
        //采用数组替换到JDK中的HashSet,这样add操作和遍历操作效率更高,不需要考虑hash冲突
        SelectionKey[] keys;
        //数组尾部指针
        int size;
    
        SelectedSelectionKeySet() 
            keys = new SelectionKey[1024];
        
    
        /**
         * 数组的添加效率高于 HashSet 因为不需要考虑hash冲突
         * */
        @Override
        public boolean add(SelectionKey o) 
            if (o == null) 
                return false;
            
            //时间复杂度O(1)
            keys[size++] = o;
            if (size == keys.length) 
                //扩容为原来的两倍大小
                increaseCapacity();
            
    
            return true;
        
    
        private void increaseCapacity() 
            SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
            System.arraycopy(keys, 0, newKeys, 0, size);
            keys = newKeys;
        
    
        /**
         * 采用数组的遍历效率 高于 HashSet
         * */
        @Override
        public Iterator<SelectionKey> iterator() 
            return new Iterator<SelectionKey>() 
                private int idx;
    
                @Override
                public boolean hasNext() 
                    return idx < size;
                
    
                @Override
                public SelectionKey next() 
                    if (!hasNext()) 
                        throw new NoSuchElementException();
                    
                    return keys[idx++];
                
    
                @Override
                public void remove() 
                    throw new UnsupportedOperationException();
                
            ;
        
    
    

    看到这里不禁感叹,从各种小的细节可以看出Netty对性能的优化简直淋漓尽致,对性能的追求令人发指。细节真的是魔鬼。

    1. Netty通过反射的方式用SelectedSelectionKeySet替换掉sun.nio.ch.SelectorImpl#selectedKeyssun.nio.ch.SelectorImpl#publicSelectedKeys这两个集合中原来HashSet的实现。