Netty源码EventLoop
Posted 程序猿阿越
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty源码EventLoop相关的知识,希望对你有一定的参考价值。
前言
Netty串行化设计避免了线程竞争问题,核心在于EventLoop的设计,将每个需要同步的对象,都关联一个对应的EventLoop线程,专门用于处理这个对象产生的任务。本章深入理解EventLoop。
一方面从继承结构上深入理解NioEventLoop:
Netty如何通过JDK的线程池抽象实现自己的Future和Promise(AbstractEventExecutor)。
三个任务队列:定时任务队列、普通任务队列、尾部任务队列。
如何将线程与EventLoop绑定(SingleThreadEventExecutor)。
另一方面,学习NioEventLoop的职责和执行:
针对JDKSelector的性能优化。
解决JDK空轮询BUG。
事件循环执行流程select->processSelectionKey->runAllTasks。
一、EventLoop继承结构
1、抽象
首先从抽象的接口看,顶层是EventExecutorGroup。
负责提供EventExecutor(s),通过next方法选择下一个EventExecutor(竟然是EventExecutorGroup的子接口)。
负责管理EventExecutor(s)的生命周期,提供关闭所有EventExecutor服务。
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
// 当所有EventExecutor都正在关闭或已经被关闭时,返回true
boolean isShuttingDown();
// 优雅关闭所有EventExecutor
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
// 返回一个被当前Group管理的EventExecutor
EventExecutor next();
}
EventExecutor继续扩展EventExecutorGroup接口
特殊的EventExecutorGroup,next方法只会返回自身。
判断 给定线程/当前线程 是否与EventLoop处于同一个线程中。
构造与当前EventExecutor绑定的Promise与Future实例。
public interface EventExecutor extends EventExecutorGroup {
// 返回自身
@Override
EventExecutor next();
// 判断 给定线程/当前线程 是否与EventLoop处于同一个线程中
boolean inEventLoop();
boolean inEventLoop(Thread thread);
// 构造与当前EventExecutor绑定的Promise与Future实例
<V> Promise<V> newPromise();
<V> ProgressivePromise<V> newProgressivePromise();
<V> Future<V> newSucceededFuture(V result);
<V> Future<V> newFailedFuture(Throwable cause);
}
EventLoopGroup与EventExecutor平级,扩展EventExecutorGroup接口,是一个特殊的EventExecutorGroup,支持注册Channel。
public interface EventLoopGroup extends EventExecutorGroup {
ChannelFuture register(Channel channel);
ChannelFuture register(ChannelPromise promise);
}
结合EventExecutor和EventLoopGroup,不难看出Channel<->EventLoop<->Thread的绑定关系。
OrderedEventExecutor标记接口,标记EventExecutor会有序处理所有task。
/**
* Marker interface for {@link EventExecutor}s that will process all submitted tasks in an ordered / serial fashion.
*/
public interface OrderedEventExecutor extends EventExecutor {
}
EventLoop也可以认为是一个标记接口,继承了EventLoopGroup接口,可以注册Channel并处理Channel上的所有IO操作,一个EventLoop实例往往管理多个Channel。
/**
* Will handle all the I/O operations for a {@link Channel} once registered.
* One {@link EventLoop} instance will usually handle more than one {@link Channel} but this may depend on implementation details and internals.
*/
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
}
2、实现
AbstractEventExecutor
AbstractEventExecutor是顶层抽象类(除去JDK),实现一些通用方法,这里贴出来部分。
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
private final EventExecutorGroup parent;
protected AbstractEventExecutor(EventExecutorGroup parent) {
this.parent = parent;
}
@Override
public EventExecutorGroup parent() {
return parent;
}
@Override
public EventExecutor next() {
return this;
}
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}
@Override
public <V> Promise<V> newPromise() {
return new DefaultPromise<V>(this);
}
}
AbstractEventExecutor还继承了JDK的AbstractExecutorService线程池抽象类,主要是为了使用JDK提供的线程池服务的一套抽象体系,同时接入自己的Future&Promise体系。submit方法适配了JDK的submit方法,返回继承JDKFuture的NettyFuture;newTaskFor方法,在JDK中是返回FutureTask,而在Netty中是Netty自己的PromiseTask。
// 父类AbstractExecutorService接口定义返回JDK的Future;
// 这里接口定义返回为Netty的Future(因为是JDKFuture的子类)。
@Override
public <T> Future<T> submit(Runnable task, T result) {
return (Future<T>) super.submit(task, result);
}
// 重写JDK的AbstractExecutorService#newTaskFor方法,为的是接入Netty自己的PromiseTask
@Override
protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new PromiseTask<T>(this, runnable, value);
}
PromiseTask
简单看一下PromiseTask,继承Netty的DefaultPromise,实现JDK的RunnableFuture,属于一个适配类。
class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
// 一个Callable或一个Runnable
private Object task;
}
一方面,PromiseTask禁用了所有设置Promise结果的方法。
@Override
public final Promise<V> setFailure(Throwable cause) {
throw new IllegalStateException();
}
@Override
public final Promise<V> setSuccess(V result) {
throw new IllegalStateException();
}
@Override
public final boolean tryFailure(Throwable cause) {
return false;
}
// ...
另一方面实现JDK的RunnableFuture,为了可以提交到JDK的线程池抽象服务,保证只有真正执行任务时才能设置Promise结果。
@Override
public void run() {
try {
// super.setUncancellable Promise设置不可取消
if (setUncancellableInternal()) {
// 执行任务
V result = runTask();
// super.setSuccess Promise设置执行成功
setSuccessInternal(result);
}
} catch (Throwable e) {
// super.setFailure Promise设置执行失败
setFailureInternal(e);
}
}
final V runTask() throws Exception {
final Object task = this.task;
if (task instanceof Callable) {
return ((Callable<V>) task).call();
}
((Runnable) task).run();
return null;
}
AbstractScheduledEventExecutor
AbstractScheduledEventExecutor在AbstractEventExecutor的基础上实现了定时/延迟任务执行。
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
// 优先级队列,存储延迟任务,根据任务的下次执行时间从小到大排列
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
// 下一个任务的id
long nextTaskId;
}
schedule提交任务,如果是当前EventLoop线程,直接放入scheduledTaskQueue,否则调用子类实现的execute方法(放入普通任务队列)。
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return schedule(new ScheduledFutureTask<Void>(
this,
command,
deadlineNanos(unit.toNanos(delay))));
}
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduleFromEventLoop(task);
} else {
// ...
execute(task);
}
return task;
}
final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
scheduledTaskQueue().add(task.setId(++nextTaskId));
}
pollScheduledTask获取任务,要求执行线程必须是EventLoop线程。这个方法由子类调用,也就是说是子类控制消费优先级队列中的task。
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
// 取出优先级队列第一个任务
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
// 如果这个任务到了执行的时间,则从队列中移除并返回
if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
return null;
}
scheduledTaskQueue.remove();
scheduledTask.setConsumed();
return scheduledTask;
}
SingleThreadEventExecutor
SingleThreadEventExecutor实现了单线程的EventExecutor的execute方法,并将线程与EventLoop进行了绑定。先来看一下成员变量。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
// 状态字段原子更新器
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
// 普通任务队列
private final Queue<Runnable> taskQueue;
// 绑定线程
private volatile Thread thread;
// JDKExecutor
private final Executor executor;
// 队列最大长度 默认Integer.MAX_VALUE
private final int maxPendingTasks;
// 拒绝策略 抛出RejectedExecutionException异常
private final RejectedExecutionHandler rejectedExecutionHandler;
// 上次执行时间
private long lastExecutionTime;
// 状态
private volatile int state = ST_NOT_STARTED;
}
SingleThreadEventExecutor维护了普通任务队列,队列长度Integer.MAX_VALUE可认为无界。
通过JDKExecutor执行任务并绑定线程至thread成员变量。
内部通过state成员变量控制SingleThreadEventExecutor的行为。
一方面,SingleThreadEventExecutor维护了任务队列,给子类提供获取/查询/新增任务的辅助方法,屏蔽了内部的任务队列。比如下面获取Task的pollTask方法。
protected Runnable pollTask() {
assert inEventLoop();
return pollTaskFrom(taskQueue);
}
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
for (;;) {
Runnable task = taskQueue.poll();
if (task != WAKEUP_TASK) { // 一个特殊的Runnable,对于NioEventLoop可以忽略
return task;
}
}
}
再比如hasTasks方法查询是否存在任务。
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}
另一方面,SingleThreadEventExecutor实现了JDK Executor的execute方法。
@Override
public void execute(Runnable task) {
// 第二个参数,默认是true,忽略
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
// 将task加入任务队列
addTask(task);
if (!inEventLoop) {
// 如果当前线程的不是EventLoop绑定的线程,尝试开启这个EventLoop对应的线程
startThread();
if (isShutdown()) {
// ... 执行拒绝策略
}
}
// 唤醒EventLoop
// 重点看NioEventLoop的wakeup方法,会唤醒java.nio.channels.Selector#wakeup
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
重点看startThread方法,首先CAS更新state状态成功后,执行doStartThread方法。
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
// ...
}
}
}
最终doStartThread将执行线程与EventLoop(SingleThreadEventExecutor)做了绑定,具体如何消费任务队列由子类的run方法实现。这里的executor可以认为是个new Thread(new Runnable()).start(),只是提供了线程工厂,方便线程命名。
// 绑定线程
private volatile Thread thread;
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
// 设置EventLoop对应线程是executor中的线程
thread = Thread.currentThread();
// run抽象方法,需要子类实现
SingleThreadEventExecutor.this.run();
// ...
}
});
}
所以SingleThreadEventExecutor也实现了重要的inEventLoop方法,判断入参线程(往往是当前线程)与绑定线程是否一致。
// 绑定线程
private volatile Thread thread;
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
SingleThreadEventLoop
SingleThreadEventLoop实现了EventLoopGroup需要实现的register抽象方法,同时又引入了一个任务队列---尾部任务队列。
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
// 第三个任务队列,尾部任务队列,默认长度Integer.MAX_VALUE
private final Queue<Runnable> tailTasks;
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
// 调用unsafe注册
@Override
public ChannelFuture register(final ChannelPromise promise) {
promise.channel().unsafe().register(this, promise);
return promise;
}
// 执行尾部任务
@Override
protected void afterRunningAllTasks() {
runAllTasksFrom(tailTasks);
}
// 重写父类hasTasks方法,当普通任务队列和尾部任务队列都为空才返回false
@Override
protected boolean hasTasks() {
return super.hasTasks() || !tailTasks.isEmpty();
}
}
二、NioEventLoop
NioEventLoop在run方法中做了很多事情:
轮询Channel上发生的事件,并分发给Unsafe处理。
执行task,包括定时任务、普通任务、尾部任务。
检测JDK NIO空轮询bug并重构Selector。
针对JDK NIO的性能优化。
public final class NioEventLoop extends SingleThreadEventLoop {
// JDK NIO空轮询BUG的检测阈值。默认512
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
// 被select策略使用
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
// JDK SelectorImpl
private Selector selector;
// Netty SelectedSelectionKeySetSelector
private Selector unwrappedSelector;
// 存储SelectionKey 继承AbstractSet
private SelectedSelectionKeySet selectedKeys;
// 创建JDK Selector
private final SelectorProvider provider;
// 一个状态标记
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
// select策略
private final SelectStrategy selectStrategy;
// ioRatio
private volatile int ioRatio = 50;
}
1、一个性能优化
NioEventLoop中有一个特殊的成员变量:SelectedSelectionKeySet selectedKeys,SelectedSelectionKeySet继承了AbstractSet是个用于存储SelectionKey的Set。
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;
}
为什么会有这样一个特殊的Set,作用是什么?在NioEventLoop的构造方法中找到了答案。
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
关注openSelector方法。
private SelectorTuple openSelector() {
// 创建JDK Selector
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// 默认false,开启优化
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
if (!(maybeSelectorImplClass instanceof Class) ||
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
}
return new SelectorTuple(unwrappedSelector);
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
// 通过反射,把Netty的SelectedSelectionKeySet强行放到JDK的SelectorImpl的selectedKeys和publicSelectedKeys属性上
// 后续JDK的Selector.select时直接会把SelectionKey放到SelectedSelectionKeySet这个Set中
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
}
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
// 如果没能成功注入,降级使用JDK原生Selector
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
return new SelectorTuple(unwrappedSelector);
}
// 成功注入,使用SelectedSelectionKeySetSelector代理原生Selector
selectedKeys = selectedKeySet;
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
上面这一大串代码,都是为了做性能优化,目的是将Netty自定义的SelectedSelectionKeySet注入JDK的SelectorImpl的成员变量selectedKeys中。当执行JDK的Selector.select方法时,对应的SelectionKey会放入selectedKeys中,如果这里通过反射偷换了集合对象,元素会加入到Netty自定义的SelectedSelectionKeySet集合中。
public abstract class SelectorImpl extends AbstractSelector {
protected Set<SelectionKey> selectedKeys = new HashSet();
}
用自定义的Set实现替换原来的HashSet,优势在于Netty的使用场景+底层数据结构优化。由于Netty对于SelectionKey只需要遍历操作,不需要类似于contains等依赖哈希表的方法,所以更合适的数据结构是数组。
一方面add方法不会有哈希冲突。
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}
return true;
}
另一方面可以通过数组直接遍历获取SelectionKey。
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// ...
}
}
据说这样的性能提升1-2%。
2、轮询
NioEventLoop最重要的功能就是轮询Channel上是否有IO事件发生,并将IO事件丢给Unsafe进行处理。把run方法拆分为几步。
根据策略,执行select
处理selectionKey
根据ioRatio执行任务
protected void run() {
int selectCnt = 0;
for (;;) {
try {
// 1. 执行策略,执行select
int strategy;
try {
// hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
// 下一个scheduledTask任务的执行时间
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE;
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
// 如果curDeadlineNanos=NONE会无限阻塞在selector.select上等待唤醒
strategy = select(curDeadlineNanos);
}
} finally {
nextWakeupNanos.lazySet(AWAKE);
}
default:
}
} catch (IOException e) {
// ...
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
if (strategy > 0) {
processSelectedKeys();
}
ranTasks = runAllTasks();
} else if (strategy > 0) {
// select返回大于0
final long iostartTime = System.nanoTime();
// 2. 处理selectionKey
processSelectedKeys();
// 3. 执行任务
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
} else {
// 3. select返回为0,执行任务
ranTasks = runAllTasks(0);
}
if (ranTasks || strategy > 0) {
selectCnt = 0;
}
// 如果selectCnt>512认为发生JDKNIO空轮询bug,重构Selector
else if (unexpectedSelectorWakeup(selectCnt)) {
selectCnt = 0;
}
} catch (CancelledKeyException e) {
} catch (Throwable t) {
handleLoopException(t);
}
// ... 关闭
}
}
select
是否执行Select,如何执行Select是根据任务队列情况来决定的。
首先根据策略,确定执行select的方式。
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
DefaultSelectStrategy是默认SelectStrategy实现,如果hasTasks方法返回true表示有任务需要执行时,会执行IntSupplier的get方法返回策略,否则会返回SELECT策略。
final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() { }
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}
hasTasks方法是SingleThreadEventLoop提供的,当尾部任务队列或普通任务队列有任务时返回true,否则返回false。
// 重写父类hasTasks方法,当普通任务队列和尾部任务队列都为空才返回false
@Override
protected boolean hasTasks() {
return super.hasTasks() || !tailTasks.isEmpty();
}
IntSupplier这里取的实现类是NioEventLoop#selectNowSupplier,会立即执行一次select。
// 被select策略使用
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
int selectNow() throws IOException {
return selector.selectNow();
}
执行策略得到strategy,如果strategy大于等于0表示执行过一次selectNow,其他策略枚举分为三类,都是小于0的int类型:
/**
* Indicates a blocking select should follow.
*/
int SELECT = -1;
/**
* Indicates the IO loop should be retried, no blocking select to follow directly.
*/
int CONTINUE = -2;
/**
* Indicates the IO loop to poll for new events without blocking.
*/
int BUSY_WAIT = -3;
因为默认策略没有涉及CONTINUE和BUSY_WAIT,直接看SELECT策略的实现。首先nextScheduledTaskDeadlineNanos获取下一个定时任务的deadline时间:如果没有定时任务设置为NONE;否则设置为下一个定时任务的执行时间。
private static final long NONE = Long.MAX_VALUE;
// 下一个scheduledTask任务的执行时间
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE;
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
// 如果curDeadlineNanos=NONE会阻塞在selector.select上等待唤醒
strategy = select(curDeadlineNanos);
}
} finally {
nextWakeupNanos.lazySet(AWAKE);
}
再次确认没有尾部任务或普通任务,则会执行select操作。
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) { // 阻塞在select上
return selector.select();
}
// 计算定时任务执行时间 - 当前时间
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
看到这里发现,并不是每次轮询都会执行select操作。如果第一次策略判断时,hasTasks返回无任务,第二次执行SELECT策略时hasTasks返回有任务,则不会执行select操作。
处理selectionKey
先不管ioRatio如何决定三类任务的执行,当执行完select之后,需要处理SelectionKey。
if (strategy > 0) {
processSelectedKeys();
}
processSelectedKeys分为两种逻辑,当selectedKeys成员变量不为空,表示走了HashSet优化(见上面一小节);否则使用原生JDK的处理方式。一般情况走前面一个优化逻辑。
// 存储SelectionKey 继承AbstractSet
private SelectedSelectionKeySet selectedKeys;
private void processSelectedKeys() {
if (selectedKeys != null) {
// 构造时,成功将SelectedSelectionKeySet注入JDK的SelectorImpl的selectedKeys中
processSelectedKeysOptimized();
} else {
// 上面那一步失败的情况,使用原生JDK里的selectedKeys
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
}
// ...
}
}
在注册Channel到Selector上时,传入的附件attachment是Channel自己,进入processSelectedKey方法。这里根据不同的事件调用Unsafe的不同方法处理,这部分等后续章节再看。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// ...
return;
}
try {
int readyOps = k.readyOps();
// CONNECT事件,调用finishConnect
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// WRITE事件,将buffer中的内容写入对应Channel
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// READ或ACCEPT事件,调用unsafe的read方法
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
执行任务
在处理完成selectionKey上发生的事件之后,根据ioRatio和事件数量(strategy),选择不同的策略执行任务。
if (ioRatio == 100) {
// ...
ranTasks = runAllTasks();
} else if (strategy > 0) {
// ...
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
} else {
// ...
ranTasks = runAllTasks(0);
}
根据上面继承结构的分析,NioEventLoop需要处理三类任务:定时任务、普通任务、尾部任务。
当ioRatio配置为100时,会执行所有task。无参runAllTasks方法会循环将定时任务拉取到普通任务队列,执行完所有定时和普通任务之后,执行尾部任务。不过ioRatio默认配置为50,这种情况可以忽略。
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
// 1. 拉取定时任务队列中的任务到普通任务队列
fetchedAll = fetchFromScheduledTaskQueue();
// 2. 执行普通任务
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
// 3. 执行尾部任务
afterRunningAllTasks();
return ranAtLeastOne;
}
strategy>0,表示有selectionKey对应事件被激活,此时会根据ioRatio决定到底要执行多少时间的任务,ioRatio越大执行task的时间越短;
strategy=0,表示没有selectionKey对应事件被激活,会尽量少执行任务,不会超过64个定时+普通任务。
protected boolean runAllTasks(long timeoutNanos) {
// 1. 将定时任务从定时任务队列移动到普通任务队列
fetchFromScheduledTaskQueue();
// 2. 获取普通任务
Runnable task = pollTask();
if (task == null) {
// 如果前面两个队列都没任务,执行尾部任务
afterRunningAllTasks();
return false;
}
final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
// 3. 循环执行普通任务(包含一开始获取的定时任务),直至deadline或普通任务队列为空
for (;;) {
safeExecute(task);
runTasks ++;
// 每64次循环才执行一次超时检测,因为nanoTime性能损耗...
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
// 超时结束循环
break;
}
}
task = pollTask();
// 普通任务执行完了,结束循环
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
// 4. 执行所有尾部任务
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
3、解决JDK空轮询bug
在run方法的轮询主流程中,有个局部变量selectCnt表示Channel没发生IO事件且没执行任何task的空轮询次数,当selectCnt达到512之后,会触发重构Selector,目的是解决JDK空轮询bug导致CPU100%。
protected void run() {
int selectCnt = 0;
for (;;) {
try {
// ...
selectCnt++;
// 如果selectCnt>512认为发生JDKNIO空轮询bug,重构Selector
else if (unexpectedSelectorWakeup(selectCnt)) {
selectCnt = 0;
}
// ...
}
}
private boolean unexpectedSelectorWakeup(int selectCnt) {
if (Thread.interrupted()) {
return true;
}
// selectCnt > 512 认为发生了JDK空轮询BUG
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
rebuildSelector();
return true;
}
return false;
}
/**
* Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work around the infamous epoll 100% CPU bug.
*/
public void rebuildSelector() {
// EventLoop串行化执行,如果是run主流程,直接执行
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector0();
}
});
return;
}
rebuildSelector0();
}
rebuildSelector0方法构造新的Selector,并将老Selector上的Channel重新注册到新Selector上。
private void rebuildSelector0() {
final Selector oldSelector = selector;
// 构造新的Selector,和构造方法里的一样
final SelectorTuple newSelectorTuple = openSelector();
// 注册Channel到新的Selector上
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
// 如果key无效了 或 key已经注册到新的selector上了 跳过
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
// 注册key到新的Selector上
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
((AbstractNioChannel) a).selectionKey = newKey;
}
} catch (Exception e) {
}
}
// 更新成员变量selector为新的selector
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
// 关闭老的selector
oldSelector.close();
}
总结
AbstractEventExecutor继承JDK的AbstractExecutorService抽象线程池服务,接入自己的Future体系。
SingleThreadEventExecutor将EventLoop与线程绑定,实现无锁串行化执行。
NioEventLoop的父类分别掌控了三个任务队列:定时任务队列、普通任务队列、尾部任务队列。
NioEventLoop通过反射注入自定义Set对JDK的Selector做了一个性能优化。
NioEventLoop事件循环的执行流程是:select->processSelectionKey->runAllTasks,其中会根据策略执行select和runAllTasks。
NioEventLoop通过重构Selector的方式解决了JDK空轮询的BUG。
以上是关于Netty源码EventLoop的主要内容,如果未能解决你的问题,请参考以下文章
netty源码:4 事件调度层:为什么 EventLoop 是 Netty 的精髓?
6. Netty源码分析之EventLoop与EventLoopGroup
Netty源码深度解析-EventLoopEventLoop的构造