死磕Netty源码之Reactor线程模型详解NioEventLoop的执行
Posted 纯洁的明依
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了死磕Netty源码之Reactor线程模型详解NioEventLoop的执行相关的知识,希望对你有一定的参考价值。
`NioEventLoop`的执行在`run`方法中完成,代码如下
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
processSelectedKeys();
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
} catch (Throwable t) {
handleLoopException(t);
}
}
}
`NioEventLoop`执行过程大致可以分为如下三个步骤:
1.轮询检测IO事件
2.处理产生IO事件
3.处理异步任务队列
轮询检测`IO`事件:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
`wakenUp`表示是否应该唤醒正在阻塞的`select`操作,可以看到`Netty`在进行一次新轮询之前都会将`wakeUp`设置成`false`标志新的一轮轮询的开始。
接下来我们来看一下具体的`select`操作,它可以分为以下三个部分
定时任务截止事时间快到了,中断本次轮询。
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
....
}
我们可以看到`NioEventLoop`中`Reactor`线程的`select`操作也是一个`for`循环,
在`for`循环第一步中如果发现当前的定时任务队列中有任务的截止事件快到了`<=0.5ms`就跳出循环
。此外跳出之前如果发现目前为止还没有进行过`select`操作那么就会调用一次`selectNow()`,
该方法会立即返回不会阻塞。`Netty`里面定时任务队列是按照延迟时间从小到大进行排序。
protected long delayNanos(long currentTimeNanos) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
return scheduledTask.delayNanos(currentTimeNanos);
}
`delayNanos`方法取出的是第一个定时任务的延迟时间,如果没有任务默认值为1秒
轮询过程中发现有任务加入或被唤醒,中断本次轮询。
for (;;) {
// 1.定时任务截至事时间快到了,中断本次轮询
...
// 2.轮询过程中发现有任务加入,中断本次轮询
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
....
}
判断任务队列是否为空或者是否被唤醒,若不为空或被唤醒就执行一次非阻塞`select`操作,跳出循环立即返回
阻塞式`select`操作:
for (;;) {
// 1.定时任务截至事时间快到了,中断本次轮询
...
// 2.轮询过程中发现有任务加入,中断本次轮询
...
// 3.阻塞式select操作
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
....
}
执行到这一步说明`Netty`任务队列里面队列为空,并且所有定时任务延迟时间还未到`大于0.5ms`,于是在这里进行一次阻塞式`select`操作,截止到第一个定时任务的截止时间。如果第一个定时任务的延迟非常长(比如一个小时)那么线程有可能一直阻塞在`select`操作,但是只要在这段时间内有新任务加入该阻塞就会被释放
外部线程调用`execute`方法添加任务。
public void execute(Runnable task) {
...
// inEventLoop为false
wakeup(inEventLoop);
...
}
调用`wakeup`方法唤醒`selector`阻塞
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
可以看到在外部线程添加任务的时候,会调用`wakeup`方法来唤醒`selector.select(timeoutMillis)`
阻塞`select`操作结束之后`Netty`又做了一系列的状态判断来决定是否中断本次轮询,中断本次轮询的条件有
轮询到IO事件
oldWakenUp参数为true
任务队列里面有任务hasTasks
第一个定时任务即将要被执行
用户主动唤醒 => wakenUp.get()
避免`JDK`空轮询`BUG`
long currentTimeNanos = System.nanoTime();
for (;;) {
// 1.定时任务截止事时间快到了,中断本次轮询
...
// 2.轮询过程中发现有任务加入,中断本次轮询
...
// 3.阻塞式select操作
selector.select(timeoutMillis);
// 4.解决jdk的nio bug
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
rebuildSelector();
selector = this.selector;
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
...
}
`Netty`每次进行`selector.select(timeoutMillis)`之前记录开始时间`currentTimeNanos`,
在`select`之后记录一下结束时间,判断`select`操作是否至少持续`timeoutMillis`秒
time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos
可转换为
time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
如果持续的时间大于等于`timeoutMillis`说明就是一次有效的轮询重置`selectCnt`标志,
否则表明该阻塞方法并没有阻塞这么长时间,可能触发了`JDK`的空轮询`BUG`,
当空轮询的次数超过一个阀值的时候(默认是`512`)就开始重建`selector`。接下来分析一下`Netty`的`rebuildSelector`过程
public void rebuildSelector() {
final Selector oldSelector = selector;
final Selector newSelector;
newSelector = openSelector();
int nChannels = 0;
try {
for (;;) {
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
}
break;
}
} catch (ConcurrentModificationException e) {
continue;
}
selector = newSelector;
oldSelector.close();
}
将之前注册到老的`selector`上的`channel`重新转移到新的`selector`上。
它是通过`openSelector()`方法创建一个新的`selector`然后执行一个死循环,
只要执行过程中出现过一次并发修改`selectionKeys`异常就重新开始转移,具体的转移步骤为:
1.拿到有效的key
2.取消该key在旧的selector上的事件注册
3.将该key对应的channel注册到新的selector上
4.重新绑定channel和新的key的关系
转移完成之后就可以将原有的`selector`废弃,后面所有的轮询都是在新的`selector`进行
处理产生`IO`事件:
处理`IO`事件的过程是在`processSelectedKeys()`中完成
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
这里的`selectedKeys`是`SelectedSelectionKeySet`对象的实例,它是在`NioEventLoop`的构造方法中调用的`openSelector`时初始化的
private Selector NioEventLoop.openSelector() {
//...
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
// selectorImplClass -> sun.nio.ch.SelectorImpl
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
//...
selectedKeys = selectedKeySet;
}
通过反射将`selectedKeys`与`sun.nio.ch.SelectorImpl`中的两个`field`绑定。
这两个`field`其实是两个`HashSet`。`selector`在调用`select()`方法的时候如果有`IO`事件发生
,就会往里面的两个`field`中塞相应的`selectionKey`,相当于往一个`hashSet`中`add`元素,`Netty`
通过反射将`JDK`中的两个`field`替换掉,接下来我们看一下`Netty`自定义`SelectedSelectionKeySet`的`add`方法做了哪些优化?
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
private SelectionKey[] keysA;
private int keysASize;
private SelectionKey[] keysB;
private int keysBSize;
private boolean isA = true;
SelectedSelectionKeySet() {
keysA = new SelectionKey[1024];
keysB = keysA.clone();
}
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
if (isA) {
int size = keysASize;
keysA[size ++] = o;
keysASize = size;
if (size == keysA.length) {
doubleCapacityA();
}
} else {
int size = keysBSize;
keysB[size ++] = o;
keysBSize = size;
if (size == keysB.length) {
doubleCapacityB();
}
}
return true;
}
private void doubleCapacityA() {
SelectionKey[] newKeysA = new SelectionKey[keysA.length << 1];
System.arraycopy(keysA, 0, newKeysA, 0, keysASize);
keysA = newKeysA;
}
private void doubleCapacityB() {
SelectionKey[] newKeysB = new SelectionKey[keysB.length << 1];
System.arraycopy(keysB, 0, newKeysB, 0, keysBSize);
keysB = newKeysB;
}
SelectionKey[] flip() {
if (isA) {
isA = false;
keysA[keysASize] = null;
keysBSize = 0;
return keysA;
} else {
isA = true;
keysB[keysBSize] = null;
keysASize = 0;
return keysB;
}
}
public int size() {
if (isA) {
return keysASize;
} else {
return keysBSize;
}
}
public boolean remove(Object o) {
return false;
}
public boolean contains(Object o) {
return false;
}
public Iterator<SelectionKey> iterator() {
throw new UnsupportedOperationException();
}
}
该类继承了`AbstractSet`说明该类可以当作一个`set`来用,但是底层使用两个数组来交替使用,在`add`方法中判断当前使用哪个数组,找到对应的数组,然后经历下面三个步骤
1.将SelectionKey塞到该数组的逻辑尾部
2.更新该数组的逻辑长度+1
3.如果该数组的逻辑长度等于数组的物理长度,就将该数组扩容
待程序跑过一段时间,等数组的长度足够长每次在轮询到`NIO`事件的时候,`Netty`只需要`O(1)`的时间复杂度就能将`SelectionKey`塞到`set`中去,而`JDK`底层使用的`hashSet`需要`O(lgn)`的时间复杂度,接下来我们继续跟进`processSelectedKeysOptimized`方法
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
// 1.取出IO事件以及对应的channel
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
selectedKeys[i] = null;
final Object a = k.attachment();
// 2.处理该channel
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
// 3.判断是否该再来次轮询
if (needsToSelectAgain) {
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
}
selectAgain();
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
取出`IO`事件以及对应的`Netty Channel`类:
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
selectedKeys[i] = null;
final Object a = k.attachment();
这里其实也能体会到优化过的`SelectedSelectionKeySet`的好处,遍历的时候遍历的是数组相对`JDK`原生的`HashSet`效率有所提高。拿到当前`SelectionKey`之后将`selectedKeys[i]`置为`null`,这里解释一下这么做的理由:想象一下这种场景假设一个`NioEventLoop`平均每次轮询出`N`个`IO`事件高峰期轮询出`3N`个事件,那么`selectedKeys`的物理长度要大于等于`3N`,如果每次处理这些`key`不设置`selectedKeys[i]`为空,高峰期一过这些存在数组尾部的`selectedKeys[i]`对应的`SelectionKey`将一直无法被回收,`SelectionKey`对应的对象可能不大,但是它的`attachment`可能很大,这样一来这些元素是`GC Root`可达的很容易造成`GC`不掉,内存泄漏就发生了
处理该`Channel`
processSelectedKey(k, (AbstractNioChannel) a);
接下来分析一下这里的`attachment`对象是啥玩意,我们回顾一下`Channel`注册的过程
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
`javaChannel()`返回`Netty`类`SelectableChannel`对应的`JDK`底层`channel`对象。
protected SelectableChannel javaChannel() {
return ch;
}
查看`SelectableChannel`的`register`方法,不难推断出`Netty`轮询注册机制其实是将`SelectableChannel`对象注册到`JDK`类`Selctor`对象上去,并且将`AbstractNioChannel`类作为一个`attachment`附属上,这样在`JDK`轮询出某条`SelectableChannel`有`IO`事件发生时,就可以直接取出`AbstractNioChannel`进行后续操作,关于`processSelectedKey(SelectionKey k, AbstractNioChannel ch)`将在下一篇中详细介绍。
判断是否需要重建`selector`:
if (needsToSelectAgain) {
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
}
selectAgain();
selectedKeys = this.selectedKeys.flip();
i = -1;
}
每次在抓到`IO`事件之后都会将`needsToSelectAgain`重置为`false`,那么什么时候`needsToSelectAgain`会重新被设置成`true`呢?这里的`needsToSelectAgain`对象通过开发工具可以很方便的找到被引用的地方
void cancel(SelectionKey key) {
key.cancel();
cancelledKeys ++;
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
needsToSelectAgain = true;
}
}
继续查看`cancel`函数被调用的地方
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}
在`Channel`从`selector`上移除时调用`cancel`函数将`key`取消,当被去掉的`key`到达`CLEANUP_INTERVAL`的时候设置`needsToSelectAgain`为`true`。每满`256`次就会进入到`if`的代码块,首先将`selectedKeys`的内部数组全部清空方便被`JVM`垃圾回收,然后重新调用`selectAgain`重新填装一下`selectionKey`。`Netty`这么做目的我想应该是每隔256次`channel`断线,重新清理一下`selectionKey`保证现存的`selectionKey`及时有效
处理异步任务队列:
我们取三种典型的`Task`使用场景来分析
用户自定义普通任务:
ctx.channel().eventLoop().execute(new Runnable() {
public void run() {
//...
}
});
我们跟进`execute`方法,看重点
public void execute(Runnable task) {
//...
addTask(task);
//...
}
`execute`方法调用`addTask`方法
protected void addTask(Runnable task) {
// ...
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
// ...
return taskQueue.offer(task);
}
跟到`offerTask`方法基本上`task`就落地了,`Netty`内部使用一个`taskQueue`将`task`保存起来
private final Queue<Runnable> taskQueue;
taskQueue = newTaskQueue(this.maxPendingTasks);
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(maxPendingTasks);
}
`taskQueue`在`SingleThreadEventExecutor`构造方法中被初始化,我们发现`taskQueue`在`NioEventLoop`中默认实现是`MPSC`队列(`MPSC`队列即多生产者单消费者队列),`Netty`使用`MPSC`方便的将外部线程的`task`聚集在`Reactor`线程内部用单线程来串行执行。我们可以借鉴`Netty`的任务执行模式来处理类似多线程数据上报,定时聚合的应用。本节讨论任务场景中所有代码执行都是在`Reactor`线程中的,所以所有调用`inEventLoop()`的地方都返回`true`,既然都是在`reactor`线程中执行那么其实这里的`MPSC`队列其实没有发挥真正的作用,`MPSC`大显身手的地方其实在第二种场景。
非当前`Reactor`线程调用`Channel`的各种方法:
channel.write(...)
它会调用`AbstractChannelHandlerContext`的`write`方法
private void write(Object msg, boolean flush, ChannelPromise promise) {
// ...
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
外部线程在调用`write`的时候`executor.inEventLoop()`会返回`false`,
直接进入到`else`分支将`write`封装成一个`WriteTask`(这里仅仅是`write`而没有`flush`因此`flush`参数为`false`),
然后调用`safeExecute`方法。
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
// ...
executor.execute(runnable);
// ...
}
接下来的调用链就进入到第一种场景了,但是和第一种场景有个明显的区别就是,第一种场景的调用链的发起线程是`Reactor`线程,
第二种场景的调用链的发起线程是用户线程,用户线程可能会有很多个。显然多个线程并发写`taskQueue`可能出现线程同步问题,
于是这种场景下`Netty`的`MPSC`就有了用武之地。
用户自定义定时任务:
ctx.channel().eventLoop().schedule(new Runnable() {
public void run() {
}
}, 60, TimeUnit.SECONDS);
第三种场景就是定时任务逻辑了,用的最多的便是如上方法:在一定时间之后执行任务,我们跟进`schedule`方法
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
// ...
return schedule(new ScheduledFutureTask<Void>(this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
通过`ScheduledFutureTask`将用户自定义任务再次包装成一个`Netty`内部的任务
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
在执行定时任务前`Netty`会先判断当前是否在内部线程进行调用,如果是在内部线程则直接将任务添加进队列,如果是在外部线程调用`schedule`,`Netty`将添加定时任务的逻辑封装成一个普通的`task`,这个`task`的任务是添加[添加定时任务]的任务而不是添加定时任务,其实也就是第二种场景这样对`PriorityQueue`的访问就变成单线程即只有`Reactor`线程,确保线程安全
`scheduledTaskQueue()`方法会返回一个优先级队列,然后调用`add`方法将定时任务加入到队列中去
Queue<ScheduledFutureTask >> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask >>();
}
return scheduledTaskQueue;
}
接下来我们分析一下优先级队列中定时任务的比较规则,代码如下
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
return 1;
}
}
两个定时任务的比较是先比较任务的截止时间,截止时间相同的情况下再比较`ID`(即任务添加的顺序),若`ID`再相同抛异常,这样在执行定时任务的时候就能保证最近截止时间的任务先执行。下面我们再来看下`Netty`是如何来保证各种定时任务的执行的。
`Netty`里面的定时任务分以下三种:
1.若干时间后执行一次
2.每隔一段时间执行一次
3.每次执行结束,隔一定时间再执行一次
`Netty`使用一个`periodNanos`来区分这三种情况
public void run() {
if (periodNanos == 0) {
V result = task.call();
setSuccessInternal(result);
} else {
task.call();
long p = periodNanos;
if (p > 0) {
deadlineNanos += p;
} else {
deadlineNanos = nanoTime() - p;
}
scheduledTaskQueue.add(this);
}
}
`if(periodNanos == 0)`对应若干时间后执行一次的定时任务类型执行完了该任务就结束了,否则进入到`else`代码块先执行任务然后再区分是哪种类型的任务,`periodNanos`大于`0`表示是以固定频率执行某个任务和任务的持续时间无关,然后设置该任务的下一次截止时间为本次的截止时间加上间隔时间`periodNanos`,否则就是每次任务执行完毕之后间隔多长时间之后再次执行,截止时间为当前时间加上间隔时间(`-p`就表示加上一个正的间隔时间),最后将当前任务对象再次加入到队列实现任务的定时执行。
任务的调度:
在了解了Netty内部的任务添加机制后,我们回到处理异步任务队列方法中
runAllTasks(long timeoutNanos);
这行代码表示了尽量在一定的时间内将所有的任务都取出来`run`一遍
,`timeoutNanos`表示该方法最多执行这么长时间(这里有个`ioRatio`的变量意思是`IO`任务所占的比重(默认是`50`),也就是说`IO`任务和非`IO`任务所分配的时间是`1:1`)。因为如果`Reactor`线程在此停留的时间过长,那么将积攒许多的`IO`事件无法处理最终导致大量客户端请求阻塞,因此默认情况下`Netty`将控制内部队列的执行时间。
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
//...
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);
runTasks ++;
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
这段代码便是`Reactor`执行`task`的所有逻辑,可以拆解成下面几个步骤
1.从scheduledTaskQueue转移定时任务到taskQueue(mpsc queue)
2.计算本次任务循环的截止时间
3.执行任务
4.收尾
从`scheduledTaskQueue`转移定时任务到`taskQueue(mpsc queue)`
首先调用`fetchFromScheduledTaskQueue()`方法,将到期的定时任务转移到`mpsc queue`里面。
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
`NioEventLoop`会维护两个任务队列(一个定时任务队列一个正常任务队列),这里的逻辑就是把定时任务队列中已经到执行时间的任务取出来放到正常的任务队列中去,来看下`pollScheduledTask`这个方法
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
return scheduledTask;
}
return null;
}
可以看到每次`pollScheduledTask`的时候,只有在当前任务的截止时间已经到了才会取出来
计算本次任务循环的截止时间:
Runnable task = pollTask();
// ...
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
这一步将取出第一个任务用`Reactor`线程传入的超时时间`timeoutNanos`来计算出当前任务循环`deadline`,并且使用`runTasks`,`lastExecutionTime`来时刻记录任务的状态
循环执行任务:
for (;;) {
safeExecute(task);
runTasks ++;
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
这一步便是`Netty`里面执行所有任务的核心代码了。首先调用`safeExecute`来确保任务安全执行忽略任何异常
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
然后将已运行任务`runTasks`加一,每隔`0x3F`任务(每执行完`64`个任务)判断当前时间是否超过本次`reactor`任务循环的截止时间了,如果超过那就`break`掉,如果没有超过那就继续执行。可以看到`Netty`对性能的优化考虑地相当的周到,假设`Netty`任务队列里面如果有海量小任务,如果每次都要执行完任务都要判断一下是否到截止时间,那么效率是比较低下的。
收尾
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
收尾工作很简单,调用一下`afterRunningAllTasks`方法
protected void afterRunningAllTasks() {
runAllTasksFrom(tailTasks);
}
`NioEventLoop`可以通过父类`SingleThreadEventLoop`的`executeAfterEventLoopIteration`方法向`tailTasks`中添加收尾任务,
比如你想统计一下一次执行一次任务循环花了多长时间就可以调用此方法
public final void executeAfterEventLoopIteration(Runnable task) {
// ...
if (!tailTasks.offer(task)) {
reject(task);
}
// ...
}
至此`Reactor`线程模型源码分析完毕,在下一篇中将介绍新连接的接入过程。
以上是关于死磕Netty源码之Reactor线程模型详解NioEventLoop的执行的主要内容,如果未能解决你的问题,请参考以下文章
#yyds干货盘点# Netty源码分析之Reactor线程模型详解