死磕Netty源码之Reactor线程模型
Posted 纯洁的明依
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了死磕Netty源码之Reactor线程模型相关的知识,希望对你有一定的参考价值。
`Netty`中最核心的思想就是`Reactor`线程,对应`Netty`中的代码就是`NioEventLoop`。
`NioEventLoop`是通过`NioEventLoopGroup`进行维护的,
所以在介绍`NioEventLoop`前我们先介绍一下`NioEventLoopGroup`
NioEventLoopGroup`创建
`NioEventLoopGroup`在客户端/服务端初始化时创建
EventLoopGroup bossGroup = new NioEventLoopGroup();
以下是`NioEventLoopGroup`继承关系图:
顶层接口是`Executor`可知`EventLoopGroup`支持执行一个异步任务,
`ScheduledExecutorService`看名字可知子类将支持任务的调度执行,
接下来我们继续跟进`EventLoopGroup`的构造方法,通过调用链发现它最终将调用到`MultithreadEventLoopGroup`的构造方法。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
// ...
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
children[i] = newChild(executor, args);
success = true;
}
chooser = chooserFactory.newChooser(children);
}
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
此处传递进来的`executor`为`null`,所以`MultithreadEventExecutorGroup`构造方法主要完成如下功能
- 创建线程执行器`ThreadPerTaskExecutor`
- 创建`NioEventLoop`数组
- 初始化`NioEventLoop`数组
- 初始化线程选择器
在调用构造函数时传递的参数`ThreadFactory`为`DefaultThreadFactory`实例。
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
`execute()`方法的作用是新建线程并执行任务,`Netty`中的默认`NIO`线程都是由`DefaultThreadFactory`创建。
public Thread newThread(Runnable r) {
Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
}
return t;
}
`Netty`对线程进行了一层封装及一些属性设置,这些参数是在`DefaultThreadFactory`的构造方法中被初始化的。
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
// ...
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}
我们重点关注新建线程的线程名`prefix + nextId.incrementAndGet()`到底是什么?跟踪代码发现`prefix`的规则是`poolName`和`poolId`(自增)通过`-`连接起来的(此处的`poolName`为`nioEventLoopGroup`)。所以`Netty`新建`NIO`线程默认名称为`nioEventLoopGroup-nioEventLoopGroupId-自增ID`,如`nioEventLoopGroup-2-1`。
创建`NioEventLoop`数组:
children = new EventExecutor[nThreads];
关于`nThreads`如果用户显示指定`nThreads`数量那就按照用户指定的设置,否则这个值将是`CPU`核数的两倍。
由于我们在创建`NioEventLoopGroup`时未传递任何参数,所以此处的`nThreads`为2倍的`CPU`核数。
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
DEFAULT_EVENT_LOOP_THREADS = Math.max(1,
SystemPropertyUtil.getInt("io.netty.eventLoopThreads",
NettyRuntime.availableProcessors() * 2));
`children`是`EventExecutor`数组,但是这里数组中的每个元素其实都是`NioEventLoop`实例。
初始化`NioEventLoop`数组:
`children`数组的初始化是在以下代码中完成的。
children[i] = newChild(executor, args);
我们跟进`newChild()`它最后调用的是`NioEventLoopGroup`的`newChild`方法。
protected EventLoop newChild(Executor executor, Object... args)
throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider)
args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(),
(RejectedExecutionHandler) args[2]);
}
`newChild`方法最后调用了`NioEventLoop`的构造方法,以下是`NioEve
`NioEventLoop`的构造方法代码如下:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
可以看到这里打开了一个`Selector`,也就是说每一个`NioEventLoop`都与一个`Selector`绑定.
初始化线程选择器:
初始化线程选择器在如下代码中完成:
chooser = chooserFactory.newChooser(children);
继续跟进newChooser方法,代码如下:
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
我们可以发现`Netty`通过判断线程个数`nThreads`是否为2的幂次方来选择`chooser`:
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
// 利用2的N次方法的特点,使用&比求余更快
return children[childIndex.getAndIncrement() & children.length - 1];
}
}
private final class GenericEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
// 使用求余方式
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
}
至此完成了`NioEventLoopGroup`的创建,并在`NioEventLoopGroup`创建过程中完成`NioEventLoop`初始化工作.
NioEventLoop启动:
`NioEventLoop`的`run`方法是`Reactor`线程的主体,在第一次添加任务的时候被启动.
public void execute(Runnable task) {
// ...
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
}
// ...
}
外部线程在往任务队列里面添加任务的时候执行`startThread()`,代码如下
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
在`startThread()`方法中`Netty`会判断`Reactor`线程有没有被启动,如果还未启动则先通过`CAS`方式将启动状态`STATE_UPDATER`的值设置为`ST_START`然后启动线程(`CAS`确保下次有新任务执行的时候再调用这个方法不会再次去启动线程),接下来分析`doStartThread()`.
private vprivate void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}oid doStartThread() {
...
executor.execute(new Runnable() {
public void run() {
thread = Thread.currentThread();
...
SingleThreadEventExecutor.this.run();
...
}
}
}
在这里`Netty`没有使用传统的线程创建方式来执行`run`方法,而是通过一个线程执行器`executor`来执行,其实是因为`executor`底层对线程做了一层优化,此处的`executor`就是上文中介绍到的`ThreadPerTaskExecutor`,它在每次执行`execute`方法的时候都会通过`DefaultThreadFactory`创建一个`FastThreadLocalThread`线程,而这个线程就是`Netty`中的`Reactor`线程实体.
至此`NioEventLoop`启动完毕,在下一篇将介绍`NioEventLoop`的执行过程.
以上是关于死磕Netty源码之Reactor线程模型的主要内容,如果未能解决你的问题,请参考以下文章
#yyds干货盘点# Netty源码分析之Reactor线程模型详解