聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)
Posted 热爱编程的大忽悠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)相关的知识,希望对你有一定的参考价值。
聊聊Netty那些事儿之Reactor在Netty中的实现-创建篇
本系列Netty源码解析文章基于 4.1.56.Final版本
在上篇文章《聊聊Netty那些事儿之从内核角度看IO模型》中我们花了大量的篇幅来从内核角度详细讲述了五种IO模型
的演进过程以及ReactorIO线程模型
的底层基石IO多路复用技术在内核中的实现原理。
最后我们引出了netty中使用的主从Reactor IO线程模型。
通过上篇文章的介绍,我们已经清楚了在IO调用的过程中内核帮我们搞了哪些事情,那么俗话说的好内核领进门,修行在netty
,netty在用户空间又帮我们搞了哪些事情?
那么从本文开始,笔者将从源码角度来带大家看下上图中的Reactor IO线程模型
在Netty中是如何实现的。
本文作为Reactor在Netty中实现系列文章中的开篇文章,笔者先来为大家介绍Reactor的骨架是如何创建出来的。
在上篇文章中我们提到Netty采用的是主从Reactor多线程
的模型,但是它在实现上又与Doug Lea在Scalable IO in Java论文中提到的经典主从Reactor多线程模型
有所差异。
经典主从Reactor多线程模型
Netty中的Reactor
是以Group
的形式出现的,主从Reactor
在Netty中就是主从Reactor组
,每个Reactor Group
中会有多个Reactor
用来执行具体的IO任务
。当然在netty中Reactor
不只用来执行IO任务
,这个我们后面再说。
Main Reactor Group
中的Reactor
数量取决于服务端要监听的端口个数,通常我们的服务端程序只会监听一个端口,所以Main Reactor Group
只会有一个Main Reactor
线程来处理最重要的事情:绑定端口地址
,接收客户端连接
,为客户端创建对应的SocketChannel
,将客户端SocketChannel分配给一个固定的Sub Reactor
。Sub Reactor Group
里有多个Reactor
线程,Reactor
线程的个数可以通过系统参数-D io.netty.eventLoopThreads
指定。默认的Reactor
的个数为CPU核数 * 2
。Sub Reactor
线程主要用来轮询客户端SocketChannel上的IO就绪事件
,处理IO就绪事件
,执行异步任务
。
一个
客户端SocketChannel
只能分配给一个固定的Sub Reactor
。一个Sub Reactor
负责处理多个客户端SocketChannel
,这样可以将服务端承载的全量客户端连接
分摊到多个Sub Reactor
中处理,同时也能保证客户端SocketChannel上的IO处理的线程安全性
。
由于文章篇幅的关系,作为Reactor在netty中实现的第一篇我们主要来介绍主从Reactor Group
的创建流程,骨架脉络先搭好。
下面我们来看一段Netty服务端代码的编写模板,从代码模板的流程中我们来解析下主从Reactor的创建流程以及在这个过程中所涉及到的Netty核心类。
Netty服务端代码模板
/**
* Echoes back any received data from a client.
*/
public final class EchoServer
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception
// Configure the server.
//创建主从Reactor线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主从Reactor
.channel(NioserverSocketChannel.class)//配置主Reactor中的channel类型
.option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项
.handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer<SocketChannel>() //设置从Reactor中注册channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
);
// Start the server. 绑定端口启动服务,开始监听accept事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
finally
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
- 首先我们要创建Netty最核心的部分 ->
创建主从Reactor Group
,在Netty中EventLoopGroup
就是Reactor Group
的实现类。对应的EventLoop
就是Reactor
的实现类。
//创建主从Reactor线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
- 创建用于
IO处理
的ChannelHandler
,实现相应IO事件
的回调函数,编写对应的IO处理
逻辑。注意这里只是简单示例哈,详细的IO事件处理,笔者会单独开一篇文章专门讲述。
final EchoServerHandler serverHandler = new EchoServerHandler();
/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
................省略IO处理逻辑................
ctx.write(msg);
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
ctx.flush();
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
-
创建
ServerBootstrap
Netty服务端启动类,并在启动类中配置启动Netty服务端所需要的一些必备信息。在上篇文章介绍
Socket内核结构
小节中我们提到,在编写服务端网络程序时,我们首先要创建一个Socket
用于listen和bind
端口地址,我们把这个叫做监听Socket
,这里对应的就是NioServerSocketChannel.class
。当客户端连接完成三次握手,系统调用accept
函数会基于监听Socket
创建出来一个新的Socket
专门用于与客户端之间的网络通信我们称为客户端连接Socket
,这里对应的就是NioSocketChannel.class
netty有两种
Channel类型
:一种是服务端用于监听绑定端口地址的NioServerSocketChannel
,一种是用于客户端通信的NioSocketChannel
。每种Channel类型实例
都会对应一个PipeLine
用于编排对应channel实例
上的IO事件处理逻辑。PipeLine
中组织的就是ChannelHandler
用于编写特定的IO处理逻辑。注意
serverBootstrap.handler
设置的是服务端NioServerSocketChannel PipeLine
中的ChannelHandler
。ServerBootstrap
启动类方法带有child
前缀的均是设置客户端NioSocketChannel
属性的。ChannelInitializer
是用于当SocketChannel
成功注册到绑定的Reactor
上后,用于初始化该SocketChannel
的Pipeline
。它的initChannel
方法会在注册成功后执行。这里只是捎带提一下,让大家有个初步印象,后面我会专门介绍。 -
serverBootstrap.childHandler(ChannelHandler childHandler)
用于设置客户端NioSocketChannel
中对应Pipieline
中的ChannelHandler
。我们通常配置的编码解码器就是在这里。serverBootstrap.option(ChannelOption.SO_BACKLOG, 100)
设置服务端ServerSocketChannel
中的SocketOption
。关于SocketOption
的选项我们后边的文章再聊,本文主要聚焦在NettyMain Reactor Group
的创建及工作流程。serverBootstrap.handler(....)
设置服务端NioServerSocketChannel
中对应Pipieline
中的ChannelHandler
。- 通过
serverBootstrap.group(bossGroup, workerGroup)
为Netty服务端配置主从Reactor Group
实例。 - 通过
serverBootstrap.channel(NioServerSocketChannel.class)
配置Netty服务端的ServerSocketChannel
用于绑定端口地址
以及创建客户端SocketChannel
。Netty中的NioServerSocketChannel.class
就是对JDK NIO中ServerSocketChannel
的封装。而用于表示客户端连接
的NioSocketChannel
是对JDK NIOSocketChannel
封装。
-
ChannelFuture f = serverBootstrap.bind(PORT).sync()
这一步会是下篇文章要重点分析的主题Main Reactor Group
的启动,绑定端口地址,开始监听客户端连接事件(OP_ACCEPT
)。本文我们只关注创建流程。 -
f.channel().closeFuture().sync()
等待服务端NioServerSocketChannel
关闭。Netty服务端到这里正式启动,并准备好接受客户端连接的准备。 -
shutdownGracefully
优雅关闭主从Reactor线程组
里的所有Reactor线程
。
Netty对IO模型的支持
在上篇文章中我们介绍了五种IO模型
,Netty中支持BIO
,NIO
,AIO
以及多种操作系统下的IO多路复用技术
实现。
在Netty中切换这几种IO模型
也是非常的方便,下面我们来看下Netty如何对这几种IO模型进行支持的。
首先我们介绍下几个与IO模型
相关的重要接口:
EventLoop
EventLoop
就是Netty中的Reactor
,可以说它就是Netty的引擎,负责Channel上IO就绪事件的监听
,IO就绪事件的处理
,异步任务的执行
驱动着整个Netty的运转。
不同IO模型
下,EventLoop
有着不同的实现,我们只需要切换不同的实现类就可以完成对NettyIO模型
的切换。
BIO | NIO | AIO |
---|---|---|
ThreadPerChannelEventLoop | NioEventLoop | AioEventLoop |
在NIO模型
下Netty会自动
根据操作系统以及版本的不同选择对应的IO多路复用技术实现
。比如Linux 2.6版本以上用的是Epoll
,2.6版本以下用的是Poll
,Mac下采用的是Kqueue
。
其中Linux kernel 在5.1版本引入的异步IO库io_uring正在netty中孵化。
EventLoopGroup
Netty中的Reactor
是以Group
的形式出现的,EventLoopGroup
正是Reactor组
的接口定义,负责管理Reactor
,Netty中的Channel
就是通过EventLoopGroup
注册到具体的Reactor
上的。
Netty的IO线程模型是主从Reactor多线程模型
,主从Reactor线程组
在Netty源码中对应的其实就是两个EventLoopGroup
实例。
不同的IO模型
也有对应的实现:
BIO | NIO | AIO |
---|---|---|
ThreadPerChannelEventLoopGroup | NioEventLoopGroup | AioEventLoopGroup |
ServerSocketChannel
用于Netty服务端使用的ServerSocketChannel
,对应于上篇文章提到的监听Socket
,负责绑定监听端口地址,接收客户端连接并创建用于与客户端通信的SocketChannel
。
不同的IO模型
下的实现:
BIO | NIO | AIO |
---|---|---|
OioServerSocketChannel | NioServerSocketChannel | AioServerSocketChannel |
SocketChannel
用于与客户端通信的SocketChannel
,对应于上篇文章提到的客户端连接Socket
,当客户端完成三次握手后,由系统调用accept
函数根据监听Socket
创建。
不同的IO模型
下的实现:
BIO | NIO | AIO |
---|---|---|
OioSocketChannel | NioSocketChannel | AioSocketChannel |
我们看到在不同IO模型
的实现中,Netty这些围绕IO模型
的核心类只是前缀的不同:
- BIO对应的前缀为
Oio
表示old io
,现在已经废弃不推荐使用。 - NIO对应的前缀为
Nio
,正是Netty推荐也是我们常用的非阻塞IO模型
。 - AIO对应的前缀为
Aio
,由于Linux下的异步IO
机制实现的并不成熟,性能提升表现上也不明显,现已被删除。
我们只需要将IO模型
的这些核心接口对应的实现类前缀
改为对应IO模型
的前缀,就可以轻松在Netty中完成对IO模型
的切换。
多种NIO的实现
Common | Linux | Mac |
---|---|---|
NioEventLoopGroup | EpollEventLoopGroup | KQueueEventLoopGroup |
NioEventLoop | EpollEventLoop | KQueueEventLoop |
NioServerSocketChannel | EpollServerSocketChannel | KQueueServerSocketChannel |
NioSocketChannel | EpollSocketChannel | KQueueSocketChannel |
我们通常在使用NIO模型
的时候会使用Common列
下的这些IO模型
核心类,Common类
也会根据操作系统的不同自动选择JDK
在对应平台下的IO多路复用技术
的实现。
而Netty自身也根据操作系统的不同提供了自己对IO多路复用技术
的实现,比JDK
的实现性能更优。比如:
JDK
的 NIO默认
实现是水平触发
,Netty 是边缘触发(默认)
和水平触发可切换。。- Netty 实现的垃圾回收更少、性能更好。
我们编写Netty服务端程序的时候也可以根据操作系统的不同,采用Netty自身的实现来进一步优化程序。做法也很简单,直接将上图中红框里的实现类替换成Netty的自身实现类即可完成切换。
经过以上对Netty服务端代码编写模板以及IO模型
相关核心类的简单介绍,我们对Netty的创建流程有了一个简单粗略的总体认识,下面我们来深入剖析下创建流程过程中的每一个步骤以及这个过程中涉及到的核心类实现。
以下源码解析部分我们均采用Common列
下NIO
相关的实现进行解析。
创建主从Reactor线程组
在Netty服务端程序编写模板的开始,我们首先会创建两个Reactor线程组:
- 一个是主Reactor线程组
bossGroup
用于监听客户端连接,创建客户端连接NioSocketChannel
,并将创建好的客户端连接NioSocketChannel
注册到从Reactor线程组中一个固定的Reactor
上。 - 一个是从Reactor线程组
workerGroup
,workerGroup
中的Reactor
负责监听绑定在其上的客户端连接NioSocketChannel
上的IO就绪事件
,并处理IO就绪事件
,执行异步任务
。
//创建主从Reactor线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
Netty中Reactor线程组的实现类为NioEventLoopGroup
,在创建bossGroup
和workerGroup
的时候用到了NioEventLoopGroup
的两个构造函数:
- 带
nThreads
参数的构造函数public NioEventLoopGroup(int nThreads)
。 - 不带
nThreads
参数的默认
构造函数public NioEventLoopGroup()
public class NioEventLoopGroup extends MultithreadEventLoopGroup
/**
* Create a new instance using the default number of threads, the default @link ThreadFactory and
* the @link SelectorProvider which is returned by @link SelectorProvider#provider().
*/
public NioEventLoopGroup()
this(0);
/**
* Create a new instance using the specified number of threads, @link ThreadFactory and the
* @link SelectorProvider which is returned by @link SelectorProvider#provider().
*/
public NioEventLoopGroup(int nThreads)
this(nThreads, (Executor) null);
......................省略...........................
nThreads
参数表示当前要创建的Reactor线程组
内包含多少个Reactor线程
。不指定nThreads
参数的话采用默认的Reactor线程
个数,用0
表示。
最终会调用到构造函数
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory)
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
下面简单介绍下构造函数中这几个参数的作用,后面我们在讲解本文主线的过程中还会提及这几个参数,到时在详细介绍,这里只是让大家有个初步印象,不必做过多的纠缠。
Executor executor:
负责启动Reactor线程
进而Reactor才可以开始工作。
Reactor线程组
NioEventLoopGroup
负责创建Reactor线程
,在创建的时候会将executor
传入。
RejectedExecutionHandler:
当向Reactor
添加异步任务添加失败时,采用的拒绝策略。Reactor的任务不只是监听IO活跃事件和IO任务的处理,还包括对异步任务的处理。这里大家只需有个这样的概念,后面笔者会专门详细介绍。SelectorProvider selectorProvider:
Reactor中的IO模型为IO多路复用模型
,对应于JDK NIO中的实现为java.nio.channels.Selector
(就是我们上篇文章中提到的select,poll,epoll
),每个Reator中都包含一个Selector
,用于轮询
注册在该Reactor上的所有Channel
上的IO事件
。SelectorProvider
就是用来创建Selector
的。SelectStrategyFactory selectStrategyFactory:
Reactor最重要的事情就是轮询
注册其上的Channel
上的IO就绪事件
,这里的SelectStrategyFactory
用于指定轮询策略
,默认为DefaultSelectStrategyFactory.INSTANCE
。
Executor是执行器的意思,即负责执行传入的task任务,具体如何执行该任务,则会有不同的实现,可以是同步执行,或者异步单线程执行,或者异步线程池执行,再或者是异步定时执,等等…
最终会将这些参数交给NioEventLoopGroup
的父类构造器,下面我们来看下NioEventLoopGroup类
的继承结构:
NioEventLoopGroup类
的继承结构乍一看比较复杂,大家不要慌,笔者会随着主线的深入慢慢地介绍这些父类接口,我们现在重点关注Mutithread
前缀的类。
我们知道NioEventLoopGroup
是Netty中的Reactor线程组
的实现,既然是线程组那么肯定是负责管理和创建多个Reactor线程的
,所以Mutithread
前缀的类定义的行为自然是对Reactor线程组
内多个Reactor线程
的创建和管理工作。
MultithreadEventLoopGroup
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
//默认Reactor个数
private static final int DEFAULT_EVENT_LOOP_THREADS;
static
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled())
logger.debug("-Dio.netty.eventLoopThreads: ", DEFAULT_EVENT_LOOP_THREADS);
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args)
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
...................省略.....................
MultithreadEventLoopGroup类
主要的功能就是用来确定Reactor线程组
内Reactor
的个数。
默认的Reactor
的个数存放于字段DEFAULT_EVENT_LOOP_THREADS
中。
从static
静态代码块中我们可以看出默认Reactor
的个数的获取逻辑:
- 可以通过系统变量
-D io.netty.eventLoopThreads"
指定。 - 如果不指定,那么默认的就是
NettyRuntime.availableProcessors() * 2
当nThread
参数设置为0
采用默认设置时,Reactor线程组
内的Reactor
个数则设置为DEFAULT_EVENT_LOOP_THREADS
。
MultithreadEventExecutorGroup
MultithreadEventExecutorGroup
这里就是本小节的核心,主要用来定义创建和管理Reactor
的行为。
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup
//Reactor线程组中的Reactor集合
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
//从Reactor group中选择一个特定的Reactor的选择策略 用于channel注册绑定到一个固定的Reactor上
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or @code null if the default should be used.
* @param args arguments which will passed to each @link #newChild(Executor, Object...) call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args)
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
............................省略................................
首先介绍一个新的构造器参数EventExecutorChooserFactory chooserFactory
。当客户端连接完成三次握手后,Main Reactor
会创建客户端连接NioSocketChannel
,并将其绑定到Sub Reactor Group
中的一个固定Reactor
,那么具体要绑定到哪个具体的Sub Reactor
上呢?这个绑定策略就是由chooserFactory
来创建的。默认为DefaultEventExecutorChooserFactory
。
下面就是本小节的主题Reactor线程组
的创建过程:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args)
if (nThreads <= 0)
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
if (executor == null)
//用于创建Reactor线程
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
children = new EventExecutor[nThreads];
//循环创建reaactor group中的Reactor
for (int i = 0; i < nThreads; i ++)
boolean success = false;
try
//创建reactor
children[i] = newChild(executor, args);
success = true;
catch (Exception e)
throw new IllegalStateException("failed to create a child event loop", e);
finally
................省略................
//创建channel到Reactor的绑定策略
chooser = chooserFactory.newChooser(children);
................省略................
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
1. 创建用于启动Reactor线程的executor
在Netty Reactor Group中的单个Reactor
的IO线程模型
为上篇文章提到的单Reactor单线程模型
,一个Reactor线程
负责轮询
注册其上的所有Channel
中的IO就绪事件
,处理IO事件,执行Netty中的异步任务等工作。正是这个Reactor线程
驱动着整个Netty的运转,可谓是Netty的核心引擎。
而这里的executor
就是负责启动Reactor线程
的,从创建源码中我们可以看到executor
的类型为ThreadPerTaskExecutor
。
ThreadPerTaskExecutor
public final class ThreadPerTaskExecutor implements Executor
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory)
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
@Override
public void execute(Runnable command)
threadFactory.newThread(command).start();
我们看到ThreadPerTaskExecutor
做的事情很简单,从它的命名前缀ThreadPerTask
我们就可以猜出它的工作方式,就是来一个任务就创建一个线程执行。而创建的这个线程正是netty的核心引擎Reactor线程。
在Reactor线程
启动的时候,Netty会将Reactor线程
要做的事情封装成Runnable
,丢给exexutor
启动。
而Reactor线程
的核心就是一个死循环
不停的轮询
IO就绪事件,处理IO事件,执行异步任务。一刻也不停歇,堪称996典范
。
这里向大家先卖个关子,"Reactor线程是何时启动的呢??"
2. 创建Reactor
Reactor线程组NioEventLoopGroup
包含多个Reactor
,存放于private final EventExecutor[] children
数组中。
所以下面的事情就是创建nThread
个Reactor
,并存放于EventExecutor[] children
字段中,
我们来看下用于创建Reactor
的newChild(executor, args)
方法:
newChild
newChild
方法是MultithreadEventExecutorGroup
中的一个抽象方法,提供给具体子类实现。
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
这里我们解析的是NioEventLoopGroup
,我们来看下newChild
在该类中的实现:
public class NioEventLoopGroup extends MultithreadEventLoopGroup
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
前边提到的众多构造器参数,这里会通过可变参数Object... args
传入到Reactor类NioEventLoop
的构造器中。
这里介绍下新的参数EventLoopTaskQueueFactory queueFactory
,前边提到Netty中的Reactor
主要工作是轮询
注册其上的所有Channel
上的IO就绪事件
,处理IO就绪事件
。除了这些主要的工作外,Netty为了极致的压榨Reactor
的性能,还会让它做一些异步任务的执行工作。既然要执行异步任务,那么Reactor
中就需要一个队列
来保存任务。
这里的EventLoopTaskQueueFactory
就是用来创建这样的一个队列来保存Reactor
中待执行的异步任务。
可以把Reactor
理解成为一个单线程的线程池
,类似
于JDK
中的SingleThreadExecutor
,仅用一个线程来执行轮询IO就绪事件
,处理IO就绪事件
,执行异步任务
。同时待执行的异步任务保存在Reactor
里的taskQueue
中。
NioEventLoop
public final class NioEventLoop extends SingleThreadEventLoop
//用于创建JDK NIO Selector,ServerSocketChannel
private final SelectorProvider provider;
//Selector轮询策略 决定什么时候轮询,什么时候处理IO事件,什么时候执行异步任务
private final SelectStrategy selectStrategy;
/**
* The NIO @link Selector.
*/
private Selector selector;
private Selector unwrappedSelector;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory)
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
这里就正式开始了Reactor
的创建过程,我们知道Reactor
的核心是采用的IO多路复用模型
来对客户端连接上的IO事件
进行监听
,所以最重要的事情是创建Selector
(JDK NIO 中IO多路复用技术的实现
)。
可以把
Selector
理解为我们上篇文章介绍的Select,poll,epoll
,它是JDK NIO
对操作系统内核提供的这些IO多路复用技术
的封装。
openSelector
openSelector
是NioEventLoop类
中用于创建IO多路复用
的Selector
,并对创建出来的JDK NIO
原生的Selector
进行性能优化。
首先会通过SelectorProvider#openSelector
创建JDK NIO原生的Selector
。
private SelectorTuple openSelector()
final Selector unwrappedSelector;
try
//通过JDK NIO SelectorProvider创建Selector
unwrappedSelector = provider.openSelector();
catch (IOException e)
throw new ChannelException("failed to open a new selector", e);
..................省略.............
SelectorProvider
会根据操作系统的不同选择JDK在不同操作系统版本下的对应Selector
的实现。Linux下会选择Epoll
,Mac下会选择Kqueue
。
下面我们就来看下SelectorProvider
是如何做到自动适配不同操作系统下IO多路复用
实现的
SelectorProvider
public NioEventLoopGroup(ThreadFactory threadFactory)
this(0, threadFactory, SelectorProvider.provider());
SelectorProvider
是在前面介绍的NioEventLoopGroup类
构造函数中通过调用SelectorProvider.provider()
被加载,并通过NioEventLoopGroup#newChild
方法中的可变长参数Object... args
传递到NioEventLoop
中的private final SelectorProvider provider
字段中。
SelectorProvider的加载过程:
public abstract class SelectorProvider
public static SelectorProvider provider()
synchronized (lock)
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>()
public SelectorProvider run()
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
);
从SelectorProvider
加载源码中我们可以看出,SelectorProvider
的加载方式有三种,优先级如下:
- 通过系统变量
-D java.nio.channels.spi.SelectorProvider
指定SelectorProvider
的自定义实现类全限定名
。通过应用程序类加载器(Application Classloader)
加载。
private static boolean loadProviderFromProperty()
String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
if (cn == null)
return false;
try
Class<?> c = Class.forName(cn, true,
ClassLoader.getSystemClassLoader());
provider = (SelectorProvider)c.newInstance();
return true;
.................省略.............
- 通过
SPI
方式加载。在工程目录META-INF/services
下定义名为java.nio.channels.spi.SelectorProvider
的SPI文件
,文件中第一个定义的SelectorProvider
实现类全限定名就会被加载。
private static boolean loadProviderAsService()
ServiceLoader<SelectorProvider> sl =
ServiceLoader.load(SelectorProvider.class,
ClassLoader.getSystemClassLoader());
Iterator<SelectorProvider> i = sl.iterator();
for (;;)
try
if (!i.hasNext())
return false;
provider = i.next();
return true;
catch (ServiceConfigurationError sce)
if (sce.getCause() instanceof SecurityException)
// Ignore the security exception, try the next provider
continue;
throw sce;
- 如果以上两种方式均未被定义,那么就采用
SelectorProvider
系统默认实现sun.nio.ch.DefaultSelectorProvider
。笔者当前使用的操作系统是MacOS
,从源码中我们可以看到自动适配了KQueue
实现。
public class DefaultSelectorProvider
private DefaultSelectorProvider()
public static SelectorProvider create()
return new KQueueSelectorProvider();
不同操作系统中JDK对于
DefaultSelectorProvider
会有所不同,Linux内核版本2.6以上对应的Epoll
,Linux内核版本2.6以下对应的Poll
,MacOS对应的是KQueue
。
下面我们接着回到io.netty.channel.nio.NioEventLoop#openSelector
的主线上来。
Netty对JDK NIO 原生Selector的优化
首先在NioEventLoop
中有一个Selector优化开关DISABLE_KEY_SET_OPTIMIZATION
,通过系统变量-D io.netty.noKeySetOptimization
指定,默认是开启的,表示需要对JDK NIO原生Selector
进行优化。
public final class NioEventLoop extends SingleThreadEventLoop
//Selector优化开关 默认开启 为了遍历的效率 会对Selector中的SelectedKeys进行数据结构优化
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
如果优化开关DISABLE_KEY_SET_OPTIMIZATION
是关闭的,那么直接返回JDK NIO原生的Selector
。
private SelectorTuple openSelector()
..........SelectorProvider创建JDK NIO 原生Selector..............
if (DISABLE_KEY_SET_OPTIMIZATION)
//JDK NIO原生Selector ,Selector优化开关 默认开启需要对Selector进行优化
return new SelectorTuple(unwrappedSelector);
下面为Netty对JDK NIO原生的Selector
的优化过程:
- 获取
JDK NIO原生Selector
的抽象实现类sun.nio.ch.SelectorImpl
。JDK NIO原生Selector
的实现均继承于该抽象类。用于判断由SelectorProvider
创建出来的Selector
是否为JDK默认实现
(SelectorProvider
第三种加载方式)。因为SelectorProvider
可以是自定义加载,所以它创建出来的Selector
并不一定是JDK NIO 原生的。
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>()
@Override
public Object run()
try
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
catch (Throwable cause)
return cause;
);
JDK NIO Selector的抽象类sun.nio.ch.SelectorImpl
public abstract class SelectorImpl extends AbstractSelector
// The set of keys with data ready for an operation
// //IO就绪的SelectionKey(里面包裹着channel)
protected Set<SelectionKey> selectedKeys;
// The set of keys registered with this Selector
//注册在该Selector上的所有SelectionKey(里面包裹着channel)
protected HashSet<SelectionKey> keys;
// Public views of the key sets
//用于向调用线程返回的keys,不可变
private Set<SelectionKey> publicKeys; // Immutable
//当有IO就绪的SelectionKey时,向调用线程返回。只可删除其中元素,不可增加
private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition
protected SelectorImpl(SelectorProvider sp)
super(sp);
keys = new HashSet<SelectionKey>();
selectedKeys = new HashSet<SelectionKey>();
if (Util.atBugLevel("1.4"))
publicKeys = keys;
publicSelectedKeys = selectedKeys;
else
//不可变
publicKeys = Collections.unmodifiableSet(keys);
//只可删除其中元素,不可增加
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
这里笔者来简单介绍下JDK NIO中的Selector
中这几个字段的含义,我们可以和上篇文章讲到的epoll在内核中的结构做类比,方便大家后续的理解:
Set<SelectionKey> selectedKeys
类似于我们上篇文章讲解Epoll
时提到的就绪队列eventpoll->rdllist
,Selector
这里大家可以理解为Epoll
。Selector
会将自己监听到的IO就绪
的Channel
放到selectedKeys
中。
这里的
SelectionKey
暂且可以理解为Channel
在Selector
中的表示,类比上图中epitem结构
里的epoll_event
,封装IO就绪Socket的信息。其实SelectionKey
里包含的信息不止是Channel
还有很多IO相关的信息。后面我们在详细介绍。
HashSet<SelectionKey> keys:
这里存放的是所有注册到该Selector
上的Channel
。类比epoll中的红黑树结构rb_root
SelectionKey
在Channel
注册到Selector
中后生成。
Set<SelectionKey> publicSelectedKeys
相当于是selectedKeys
的视图,用于向外部线程返回IO就绪
的SelectionKey
。这个集合在外部线程中只能做删除操作不可增加元素
,并且不是线程安全的
。Set<SelectionKey> publicKeys
相当于keys
的不可变视图,用于向外部线程返回所有注册在该Selector
上的SelectionKey
这里需要重点关注
抽象类sun.nio.ch.SelectorImpl
中的selectedKeys
和publicSelectedKeys
这两个字段,注意它们的类型都是HashSet
,一会优化的就是这里!!!!
- 判断由
SelectorProvider
创建出来的Selector
是否是JDK NIO原生的Selector
实现。因为Netty优化针对的是JDK NIO 原生Selector
。判断标准为sun.nio.ch.SelectorImpl
类是否为SelectorProvider
创建出Selector
的父类。如果不是则直接返回。不在继续下面的优化过程。
//判断是否可以对Selector进行优化,这里主要针对JDK NIO原生Selector的实现类进行优化,因为SelectorProvider可以加载的是自定义Selector实现
//如果SelectorProvider创建的Selector不是JDK原生sun.nio.ch.SelectorImpl的实现类,那么无法进行优化,直接返回
if (!(maybeSelectorImplClass instanceof Class) ||
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass()))
if (maybeSelectorImplClass instanceof Throwable)
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: ", unwrappedSelector, t);
return new SelectorTuple(unwrappedSelector);
通过前面对SelectorProvider
的介绍我们知道,这里通过provider.openSelector()
创建出来的Selector
实现类为KQueueSelectorImpl类
,它继承实现了sun.nio.ch.SelectorImpl
,所以它是JDK NIO 原生的Selector
实现
class KQueueSelectorImpl extends SelectorImpl
- 创建
SelectedSelectionKeySet
通过反射替换掉sun.nio.ch.SelectorImpl类
中selectedKeys
和publicSelectedKeys
的默认HashSet
实现。
为什么要用SelectedSelectionKeySet
替换掉原来的HashSet
呢??
因为这里涉及到对HashSet类型
的sun.nio.ch.SelectorImpl#selectedKeys
集合的两种操作:
- 插入操作: 通过前边对
sun.nio.ch.SelectorImpl类
中字段的介绍我们知道,在Selector
监听到IO就绪
的SelectionKey
后,会将IO就绪
的SelectionKey
插入sun.nio.ch.SelectorImpl#selectedKeys
集合中,这时Reactor线程
会从java.nio.channels.Selector#select(long)
阻塞调用中返回(类似上篇文章提到的epoll_wait
)。 - 遍历操作:
Reactor线程
返回后,会从Selector
中获取IO就绪
的SelectionKey
集合(也就是sun.nio.ch.SelectorImpl#selectedKeys
),Reactor线程
遍历selectedKeys
,获取IO就绪
的SocketChannel
,并处理SocketChannel
上的IO事件
。
我们都知道HashSet
底层数据结构是一个哈希表
,由于Hash冲突
这种情况的存在,所以导致对哈希表
进行插入
和遍历
操作的性能不如对数组
进行插入
和遍历
操作的性能好。
还有一个重要原因是,数组可以利用CPU缓存的优势来提高遍历的效率。后面笔者会有一篇专门的文章来讲述利用CPU缓存行如何为我们带来性能优势。
所以Netty为了优化对sun.nio.ch.SelectorImpl#selectedKeys
集合的插入,遍历
性能,自己用数组
这种数据结构实现了SelectedSelectionKeySet
,用它来替换原来的HashSet
实现。
SelectedSelectionKeySet
- 初始化
SelectionKey[] keys
数组大小为1024
,当数组容量不够时,扩容为原来的两倍大小。 - 通过数组尾部指针
size
,在向数组插入元素的时候可以直接定位到插入位置keys[size++]
。操作一步到位,不用像哈希表
那样还需要解决Hash冲突
。 - 对数组的遍历操作也是如丝般顺滑,CPU直接可以在缓存行中遍历读取数组元素无需访问内存。比
HashSet
的迭代器java.util.HashMap.KeyIterator
遍历方式性能不知高到哪里去了。
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey>
//采用数组替换到JDK中的HashSet,这样add操作和遍历操作效率更高,不需要考虑hash冲突
SelectionKey[] keys;
//数组尾部指针
int size;
SelectedSelectionKeySet()
keys = new SelectionKey[1024];
/**
* 数组的添加效率高于 HashSet 因为不需要考虑hash冲突
* */
@Override
public boolean add(SelectionKey o)
if (o == null)
return false;
//时间复杂度O(1)
keys[size++] = o;
if (size == keys.length)
//扩容为原来的两倍大小
increaseCapacity();
return true;
private void increaseCapacity()
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
/**
* 采用数组的遍历效率 高于 HashSet
* */
@Override
public Iterator<SelectionKey> iterator()
return new Iterator<SelectionKey>()
private int idx;
@Override
public boolean hasNext()
return idx < size;
@Override
public SelectionKey next()
if (!hasNext())
throw new NoSuchElementException();
return keys[idx++];
@Override
public void remove()
throw new UnsupportedOperationException();
;
看到这里不禁感叹,从各种小的细节可以看出Netty对性能的优化简直淋漓尽致,对性能的追求令人发指。细节真的是魔鬼。
- Netty通过反射的方式用
SelectedSelectionKeySet
替换掉sun.nio.ch.SelectorImpl#selectedKeys
,sun.nio.ch.SelectorImpl#publicSelectedKeys
这两个集合中原来HashSet
的实现。
- 反射获取
sun
以上是关于聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)的主要内容,如果未能解决你的问题,请参考以下文章