死磕Netty源码之Reactor线程模型

Posted 纯洁的明依

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了死磕Netty源码之Reactor线程模型相关的知识,希望对你有一定的参考价值。

       `Netty`中最核心的思想就是`Reactor`线程,对应`Netty`中的代码就是`NioEventLoop`。

`NioEventLoop`是通过`NioEventLoopGroup`进行维护的,

       所以在介绍`NioEventLoop`前我们先介绍一下`NioEventLoopGroup`

NioEventLoopGroup`创建

     `NioEventLoopGroup`在客户端/服务端初始化时创建

EventLoopGroup bossGroup = new NioEventLoopGroup();

     以下是`NioEventLoopGroup`继承关系图:

       顶层接口是`Executor`可知`EventLoopGroup`支持执行一个异步任务,

`ScheduledExecutorService`看名字可知子类将支持任务的调度执行,

接下来我们继续跟进`EventLoopGroup`的构造方法,通过调用链发现它最终将调用到`MultithreadEventLoopGroup`的构造方法。

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {// ...if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); }
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {boolean success = false; children[i] = newChild(executor, args); success = true; }
chooser = chooserFactory.newChooser(children);}
 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());            

 

此处传递进来的`executor`为`null`,所以`MultithreadEventExecutorGroup`构造方法主要完成如下功能

- 创建线程执行器`ThreadPerTaskExecutor`

- 创建`NioEventLoop`数组

- 初始化`NioEventLoop`数组

- 初始化线程选择器

在调用构造函数时传递的参数`ThreadFactory`为`DefaultThreadFactory`实例。

public final class ThreadPerTaskExecutor implements Executor {    private final ThreadFactory threadFactory;public ThreadPerTaskExecutor(ThreadFactory threadFactory) {if (threadFactory == null) {throw new NullPointerException("threadFactory"); }this.threadFactory = threadFactory;    }@Overridepublic void execute(Runnable command) { threadFactory.newThread(command).start(); }}

`execute()`方法的作用是新建线程并执行任务,`Netty`中的默认`NIO`线程都是由`DefaultThreadFactory`创建。

public Thread newThread(Runnable r) { Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());try {if (t.isDaemon() != daemon) { t.setDaemon(daemon);        }if (t.getPriority() != priority) { t.setPriority(priority); } } catch (Exception ignored) {
}return t;}

`Netty`对线程进行了一层封装及一些属性设置,这些参数是在`DefaultThreadFactory`的构造方法中被初始化的。

public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {// ... prefix = poolName + '-' + poolId.incrementAndGet() + '-';this.daemon = daemon;this.priority = priority;this.threadGroup = threadGroup;}

       我们重点关注新建线程的线程名`prefix + nextId.incrementAndGet()`到底是什么?跟踪代码发现`prefix`的规则是`poolName`和`poolId`(自增)通过`-`连接起来的(此处的`poolName`为`nioEventLoopGroup`)。所以`Netty`新建`NIO`线程默认名称为`nioEventLoopGroup-nioEventLoopGroupId-自增ID`,如`nioEventLoopGroup-2-1`。


创建`NioEventLoop`数组:

children = new EventExecutor[nThreads];

      关于`nThreads`如果用户显示指定`nThreads`数量那就按照用户指定的设置,否则这个值将是`CPU`核数的两倍。

      由于我们在创建`NioEventLoopGroup`时未传递任何参数,所以此处的`nThreads`为2倍的`CPU`核数。

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads",  NettyRuntime.availableProcessors() * 2));

`children`是`EventExecutor`数组,但是这里数组中的每个元素其实都是`NioEventLoop`实例。

初始化`NioEventLoop`数组:

`children`数组的初始化是在以下代码中完成的。

children[i] = newChild(executor, args);

我们跟进`newChild()`它最后调用的是`NioEventLoopGroup`的`newChild`方法。

protected EventLoop newChild(Executor executor, Object... args) throws Exception {return new NioEventLoop(this, executor, (SelectorProvider)  args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);}

`newChild`方法最后调用了`NioEventLoop`的构造方法,以下是`NioEve

`NioEventLoop`的构造方法代码如下:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);provider = selectorProvider;final SelectorTuple selectorTuple = openSelector();selector = selectorTuple.selector;unwrappedSelector = selectorTuple.unwrappedSelector;selectStrategy = strategy;}

可以看到这里打开了一个`Selector`,也就是说每一个`NioEventLoop`都与一个`Selector`绑定.

初始化线程选择器:

初始化线程选择器在如下代码中完成:

chooser = chooserFactory.newChooser(children);

继续跟进newChooser方法,代码如下:

public EventExecutorChooser newChooser(EventExecutor[] executors) {if (isPowerOfTwo(executors.length)) {return new PowerOfTwoEventExecutorChooser(executors); } else {return new GenericEventExecutorChooser(executors); }}

我们可以发现`Netty`通过判断线程个数`nThreads`是否为2的幂次方来选择`chooser`:

private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {@Overridepublic EventExecutor next() {// 利用2的N次方法的特点,使用&比求余更快return children[childIndex.getAndIncrement() & children.length - 1]; }}private final class GenericEventExecutorChooser implements EventExecutorChooser {@Overridepublic EventExecutor next() {// 使用求余方式return children[Math.abs(childIndex.getAndIncrement() % children.length)]; }}

至此完成了`NioEventLoopGroup`的创建,并在`NioEventLoopGroup`创建过程中完成`NioEventLoop`初始化工作.


NioEventLoop启动:

`NioEventLoop`的`run`方法是`Reactor`线程的主体,在第一次添加任务的时候被启动.

public void execute(Runnable task) {// ...
boolean inEventLoop = inEventLoop();if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); }
// ...}

外部线程在往任务队列里面添加任务的时候执行`startThread()`,代码如下

private void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { doStartThread(); } }}

在`startThread()`方法中`Netty`会判断`Reactor`线程有没有被启动,如果还未启动则先通过`CAS`方式将启动状态`STATE_UPDATER`的值设置为`ST_START`然后启动线程(`CAS`确保下次有新任务执行的时候再调用这个方法不会再次去启动线程),接下来分析`doStartThread()`.

private vprivate void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { doStartThread(); } }}oid doStartThread() { ... executor.execute(new Runnable() {@Overridepublic void run() { thread = Thread.currentThread(); ... SingleThreadEventExecutor.this.run(); ... } }}

在这里`Netty`没有使用传统的线程创建方式来执行`run`方法,而是通过一个线程执行器`executor`来执行,其实是因为`executor`底层对线程做了一层优化,此处的`executor`就是上文中介绍到的`ThreadPerTaskExecutor`,它在每次执行`execute`方法的时候都会通过`DefaultThreadFactory`创建一个`FastThreadLocalThread`线程,而这个线程就是`Netty`中的`Reactor`线程实体.

至此`NioEventLoop`启动完毕,在下一篇将介绍`NioEventLoop`的执行过程.

以上是关于死磕Netty源码之Reactor线程模型的主要内容,如果未能解决你的问题,请参考以下文章

Reactor(死磕2)

Netty之Reactor线程模型概述

#yyds干货盘点# Netty源码分析之Reactor线程模型详解

netty服务端Chanel注册与线程模型详解(附带源码分析)

Netty框架之Reactor线程模型

Netty精粹之基于EventLoop机制的高效线程模型