Netty源码EventLoop

Posted 程序猿阿越

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty源码EventLoop相关的知识,希望对你有一定的参考价值。

前言

Netty串行化设计避免了线程竞争问题,核心在于EventLoop的设计,将每个需要同步的对象,都关联一个对应的EventLoop线程,专门用于处理这个对象产生的任务。本章深入理解EventLoop。

一方面从继承结构上深入理解NioEventLoop:

  • Netty如何通过JDK的线程池抽象实现自己的Future和Promise(AbstractEventExecutor)。

  • 三个任务队列:定时任务队列、普通任务队列、尾部任务队列。

  • 如何将线程与EventLoop绑定(SingleThreadEventExecutor)。

另一方面,学习NioEventLoop的职责和执行:

  • 针对JDKSelector的性能优化。

  • 解决JDK空轮询BUG。

  • 事件循环执行流程select->processSelectionKey->runAllTasks。

一、EventLoop继承结构

Netty源码(十一)EventLoop
NioEventLoop.png

1、抽象

首先从抽象的接口看,顶层是EventExecutorGroup。

  • 负责提供EventExecutor(s),通过next方法选择下一个EventExecutor(竟然是EventExecutorGroup的子接口)。

  • 负责管理EventExecutor(s)的生命周期,提供关闭所有EventExecutor服务。

public interface EventExecutorGroup extends ScheduledExecutorServiceIterable<EventExecutor{
    // 当所有EventExecutor都正在关闭或已经被关闭时,返回true
    boolean isShuttingDown();
        // 优雅关闭所有EventExecutor
    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
    // 返回一个被当前Group管理的EventExecutor
    EventExecutor next();
}

EventExecutor继续扩展EventExecutorGroup接口

  • 特殊的EventExecutorGroup,next方法只会返回自身。

  • 判断 给定线程/当前线程 是否与EventLoop处于同一个线程中。

  • 构造与当前EventExecutor绑定的Promise与Future实例。

public interface EventExecutor extends EventExecutorGroup {
    // 返回自身
    @Override
    EventExecutor next();
        // 判断 给定线程/当前线程 是否与EventLoop处于同一个线程中
    boolean inEventLoop();
    boolean inEventLoop(Thread thread);
        // 构造与当前EventExecutor绑定的Promise与Future实例
    <V> Promise<V> newPromise();
    <V> ProgressivePromise<V> newProgressivePromise();
    <V> Future<V> newSucceededFuture(V result);
    <V> Future<V> newFailedFuture(Throwable cause);
}

EventLoopGroup与EventExecutor平级,扩展EventExecutorGroup接口,是一个特殊的EventExecutorGroup,支持注册Channel。

public interface EventLoopGroup extends EventExecutorGroup {
    ChannelFuture register(Channel channel);
    ChannelFuture register(ChannelPromise promise);
}

结合EventExecutor和EventLoopGroup,不难看出Channel<->EventLoop<->Thread的绑定关系。

OrderedEventExecutor标记接口,标记EventExecutor会有序处理所有task。

/**
 * Marker interface for {@link EventExecutor}s that will process all submitted tasks in an ordered / serial fashion.
 */

public interface OrderedEventExecutor extends EventExecutor {
}

EventLoop也可以认为是一个标记接口,继承了EventLoopGroup接口,可以注册Channel并处理Channel上的所有IO操作,一个EventLoop实例往往管理多个Channel。

/**
 * Will handle all the I/O operations for a {@link Channel} once registered.
 * One {@link EventLoop} instance will usually handle more than one {@link Channel} but this may depend on implementation details and internals.
 */

public interface EventLoop extends OrderedEventExecutorEventLoopGroup {
    @Override
    EventLoopGroup parent();
}

2、实现

AbstractEventExecutor

AbstractEventExecutor是顶层抽象类(除去JDK),实现一些通用方法,这里贴出来部分。

public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    private final EventExecutorGroup parent;
    protected AbstractEventExecutor(EventExecutorGroup parent) {
        this.parent = parent;
    }
    @Override
    public EventExecutorGroup parent() {
        return parent;
    }
    @Override
    public EventExecutor next() {
        return this;
    }
    @Override
    public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }
    @Override
    public Future<?> shutdownGracefully() {
        return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }
    @Override
    public <V> Promise<V> newPromise() {
        return new DefaultPromise<V>(this);
    }
}

AbstractEventExecutor还继承了JDK的AbstractExecutorService线程池抽象类,主要是为了使用JDK提供的线程池服务的一套抽象体系,同时接入自己的Future&Promise体系。submit方法适配了JDK的submit方法,返回继承JDKFuture的NettyFuture;newTaskFor方法,在JDK中是返回FutureTask,而在Netty中是Netty自己的PromiseTask。

// 父类AbstractExecutorService接口定义返回JDK的Future;
// 这里接口定义返回为Netty的Future(因为是JDKFuture的子类)。
@Override
public <T> Future<T> submit(Runnable task, T result) {
    return (Future<T>) super.submit(task, result);
}
// 重写JDK的AbstractExecutorService#newTaskFor方法,为的是接入Netty自己的PromiseTask
@Override
protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  return new PromiseTask<T>(this, runnable, value);
}

PromiseTask

简单看一下PromiseTask,继承Netty的DefaultPromise,实现JDK的RunnableFuture,属于一个适配类。

class PromiseTask<Vextends DefaultPromise<Vimplements RunnableFuture<V{
    // 一个Callable或一个Runnable
    private Object task;
}

一方面,PromiseTask禁用了所有设置Promise结果的方法。

@Override
public final Promise<V> setFailure(Throwable cause) {
    throw new IllegalStateException();
}
@Override
public final Promise<V> setSuccess(V result) {
  throw new IllegalStateException();
}
@Override
public final boolean tryFailure(Throwable cause) {
  return false;
}
// ... 

另一方面实现JDK的RunnableFuture,为了可以提交到JDK的线程池抽象服务,保证只有真正执行任务时才能设置Promise结果。

@Override
public void run() {
    try {
        // super.setUncancellable Promise设置不可取消
        if (setUncancellableInternal()) {
            // 执行任务
            V result = runTask();
            // super.setSuccess Promise设置执行成功
            setSuccessInternal(result);
        }
    } catch (Throwable e) {
        // super.setFailure Promise设置执行失败
        setFailureInternal(e);
    }
}
final V runTask() throws Exception {
  final Object task = this.task;
  if (task instanceof Callable) {
    return ((Callable<V>) task).call();
  }
  ((Runnable) task).run();
  return null;
}

AbstractScheduledEventExecutor

AbstractScheduledEventExecutor在AbstractEventExecutor的基础上实现了定时/延迟任务执行。

public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
    // 优先级队列,存储延迟任务,根据任务的下次执行时间从小到大排列
    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
    // 下一个任务的id
    long nextTaskId;
}

schedule提交任务,如果是当前EventLoop线程,直接放入scheduledTaskQueue,否则调用子类实现的execute方法(放入普通任务队列)。

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
  return schedule(new ScheduledFutureTask<Void>(
    this,
    command,
    deadlineNanos(unit.toNanos(delay))));
}
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
        scheduleFromEventLoop(task);
    } else {
        // ...
        execute(task);
    }
    return task;
}
final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
  scheduledTaskQueue().add(task.setId(++nextTaskId));
}

pollScheduledTask获取任务,要求执行线程必须是EventLoop线程。这个方法由子类调用,也就是说是子类控制消费优先级队列中的task。

protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop();
    // 取出优先级队列第一个任务
    ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
    // 如果这个任务到了执行的时间,则从队列中移除并返回
    if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
        return null;
    }
    scheduledTaskQueue.remove();
    scheduledTask.setConsumed();
    return scheduledTask;
}

SingleThreadEventExecutor

SingleThreadEventExecutor实现了单线程的EventExecutor的execute方法,并将线程与EventLoop进行了绑定。先来看一下成员变量。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    // 状态字段原子更新器
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
    // 普通任务队列
    private final Queue<Runnable> taskQueue;
    // 绑定线程
    private volatile Thread thread;
    // JDKExecutor
    private final Executor executor;
    // 队列最大长度 默认Integer.MAX_VALUE
    private final int maxPendingTasks;
    // 拒绝策略 抛出RejectedExecutionException异常
    private final RejectedExecutionHandler rejectedExecutionHandler;
    // 上次执行时间
    private long lastExecutionTime;
    // 状态
    private volatile int state = ST_NOT_STARTED;
}
  • SingleThreadEventExecutor维护了普通任务队列,队列长度Integer.MAX_VALUE可认为无界。

  • 通过JDKExecutor执行任务并绑定线程至thread成员变量。

  • 内部通过state成员变量控制SingleThreadEventExecutor的行为。

一方面,SingleThreadEventExecutor维护了任务队列,给子类提供获取/查询/新增任务的辅助方法,屏蔽了内部的任务队列。比如下面获取Task的pollTask方法。

protected Runnable pollTask() {
    assert inEventLoop();
    return pollTaskFrom(taskQueue);
}

protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
    for (;;) {
        Runnable task = taskQueue.poll();
        if (task != WAKEUP_TASK) { // 一个特殊的Runnable,对于NioEventLoop可以忽略
            return task;
        }
    }
}

再比如hasTasks方法查询是否存在任务。

protected boolean hasTasks() {
    assert inEventLoop();
    return !taskQueue.isEmpty();
}

另一方面,SingleThreadEventExecutor实现了JDK Executor的execute方法。

@Override
public void execute(Runnable task) {
    // 第二个参数,默认是true,忽略
    execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
private void execute(Runnable task, boolean immediate) {
  boolean inEventLoop = inEventLoop();
  // 将task加入任务队列
  addTask(task);
  if (!inEventLoop) {
    // 如果当前线程的不是EventLoop绑定的线程,尝试开启这个EventLoop对应的线程
    startThread();
    if (isShutdown()) {
      // ... 执行拒绝策略
    }
  }
  // 唤醒EventLoop
  // 重点看NioEventLoop的wakeup方法,会唤醒java.nio.channels.Selector#wakeup
  if (!addTaskWakesUp && immediate) {
    wakeup(inEventLoop);
  }
}

重点看startThread方法,首先CAS更新state状态成功后,执行doStartThread方法。

private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
          doStartThread();
          // ...
        }
    }
}

最终doStartThread将执行线程与EventLoop(SingleThreadEventExecutor)做了绑定,具体如何消费任务队列由子类的run方法实现。这里的executor可以认为是个new Thread(new Runnable()).start(),只是提供了线程工厂,方便线程命名。

// 绑定线程
private volatile Thread thread;
private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // 设置EventLoop对应线程是executor中的线程
            thread = Thread.currentThread();
            // run抽象方法,需要子类实现
            SingleThreadEventExecutor.this.run();
            // ...
        }
    });
}

所以SingleThreadEventExecutor也实现了重要的inEventLoop方法,判断入参线程(往往是当前线程)与绑定线程是否一致。

// 绑定线程
private volatile Thread thread;
@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

SingleThreadEventLoop

SingleThreadEventLoop实现了EventLoopGroup需要实现的register抽象方法,同时又引入了一个任务队列---尾部任务队列。

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
    // 第三个任务队列,尾部任务队列,默认长度Integer.MAX_VALUE
    private final Queue<Runnable> tailTasks;

    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
    // 调用unsafe注册
    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
    // 执行尾部任务
    @Override
    protected void afterRunningAllTasks() {
        runAllTasksFrom(tailTasks);
    }
    // 重写父类hasTasks方法,当普通任务队列和尾部任务队列都为空才返回false
    @Override
    protected boolean hasTasks() {
        return super.hasTasks() || !tailTasks.isEmpty();
    }
}

二、NioEventLoop

NioEventLoop在run方法中做了很多事情:

  • 轮询Channel上发生的事件,并分发给Unsafe处理。

  • 执行task,包括定时任务、普通任务、尾部任务。

  • 检测JDK NIO空轮询bug并重构Selector。

  • 针对JDK NIO的性能优化。

public final class NioEventLoop extends SingleThreadEventLoop {
    // JDK NIO空轮询BUG的检测阈值。默认512
    private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
    // 被select策略使用
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };
    // JDK SelectorImpl
    private Selector selector;
    // Netty SelectedSelectionKeySetSelector
    private Selector unwrappedSelector;
    // 存储SelectionKey 继承AbstractSet
    private SelectedSelectionKeySet selectedKeys;
    // 创建JDK Selector
    private final SelectorProvider provider;
    // 一个状态标记
    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
    // select策略
    private final SelectStrategy selectStrategy;
    // ioRatio
    private volatile int ioRatio = 50;
}

1、一个性能优化

NioEventLoop中有一个特殊的成员变量:SelectedSelectionKeySet selectedKeys,SelectedSelectionKeySet继承了AbstractSet是个用于存储SelectionKey的Set。

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey{
    SelectionKey[] keys;
    int size;
}

为什么会有这样一个特殊的Set,作用是什么?在NioEventLoop的构造方法中找到了答案。

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory queueFactory) {
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

关注openSelector方法。

private SelectorTuple openSelector() {
    // 创建JDK Selector
    final Selector unwrappedSelector;
    try {
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }
    // 默认false,开启优化
    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }

    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                return Class.forName(
                        "sun.nio.ch.SelectorImpl",
                        false,
                        PlatformDependent.getSystemClassLoader());
            } catch (Throwable cause) {
                return cause;
            }
        }
    });

    if (!(maybeSelectorImplClass instanceof Class) ||
        !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
        if (maybeSelectorImplClass instanceof Throwable) {
            Throwable t = (Throwable) maybeSelectorImplClass;
        }
        return new SelectorTuple(unwrappedSelector);
    }

    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    // 通过反射,把Netty的SelectedSelectionKeySet强行放到JDK的SelectorImpl的selectedKeys和publicSelectedKeys属性上
    // 后续JDK的Selector.select时直接会把SelectionKey放到SelectedSelectionKeySet这个Set中
    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                    long publicSelectedKeysFieldOffset =
                            PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                    if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                        PlatformDependent.putObject(
                                unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                        PlatformDependent.putObject(
                                unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                        return null;
                    }
                }
                Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                if (cause != null) {
                    return cause;
                }
                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                if (cause != null) {
                    return cause;
                }
                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });
    // 如果没能成功注入,降级使用JDK原生Selector
    if (maybeException instanceof Exception) {
        selectedKeys = null;
        Exception e = (Exception) maybeException;
        return new SelectorTuple(unwrappedSelector);
    }
    // 成功注入,使用SelectedSelectionKeySetSelector代理原生Selector
    selectedKeys = selectedKeySet;
    return new SelectorTuple(unwrappedSelector,
                             new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

上面这一大串代码,都是为了做性能优化,目的是将Netty自定义的SelectedSelectionKeySet注入JDK的SelectorImpl的成员变量selectedKeys中。当执行JDK的Selector.select方法时,对应的SelectionKey会放入selectedKeys中,如果这里通过反射偷换了集合对象,元素会加入到Netty自定义的SelectedSelectionKeySet集合中。

public abstract class SelectorImpl extends AbstractSelector {
    protected Set<SelectionKey> selectedKeys = new HashSet();
}

用自定义的Set实现替换原来的HashSet,优势在于Netty的使用场景+底层数据结构优化。由于Netty对于SelectionKey只需要遍历操作,不需要类似于contains等依赖哈希表的方法,所以更合适的数据结构是数组。

一方面add方法不会有哈希冲突。

@Override
public boolean add(SelectionKey o) {
    if (o == null) {
        return false;
    }

    keys[size++] = o;
    if (size == keys.length) {
        increaseCapacity();
    }

    return true;
}

另一方面可以通过数组直接遍历获取SelectionKey。

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // ...
    }
}

据说这样的性能提升1-2%。

Netty源码(十一)EventLoop
NioEventLoop性能优化.png

2、轮询

NioEventLoop最重要的功能就是轮询Channel上是否有IO事件发生,并将IO事件丢给Unsafe进行处理。把run方法拆分为几步。

Netty源码(十一)EventLoop
EventLoop做了什么.png
  • 根据策略,执行select

  • 处理selectionKey

  • 根据ioRatio执行任务

protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            // 1. 执行策略,执行select
            int strategy;
            try {
                //  hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.BUSY_WAIT:
                case SelectStrategy.SELECT:
                    // 下一个scheduledTask任务的执行时间
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE;
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            // 如果curDeadlineNanos=NONE会无限阻塞在selector.select上等待唤醒
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                default:
                }
            } catch (IOException e) {
                // ...
                continue;
            }

            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                  if (strategy > 0) {
                    processSelectedKeys();
                  }
                  ranTasks = runAllTasks();
            } else if (strategy > 0) {
                // select返回大于0
                final long iostartTime = System.nanoTime();
                  // 2. 处理selectionKey
                   processSelectedKeys();
                // 3. 执行任务
                  final long ioTime = System.nanoTime() - ioStartTime;
                ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            } else {
                // 3. select返回为0,执行任务
                ranTasks = runAllTasks(0);
            }
            if (ranTasks || strategy > 0) {
                selectCnt = 0;
            }
            // 如果selectCnt>512认为发生JDKNIO空轮询bug,重构Selector
            else if (unexpectedSelectorWakeup(selectCnt)) {
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // ... 关闭
    }
}

select

是否执行Select,如何执行Select是根据任务队列情况来决定的。

Netty源码(十一)EventLoop
NioEventLoopSelect.png

首先根据策略,确定执行select的方式。

strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());

DefaultSelectStrategy是默认SelectStrategy实现,如果hasTasks方法返回true表示有任务需要执行时,会执行IntSupplier的get方法返回策略,否则会返回SELECT策略。

final class DefaultSelectStrategy implements SelectStrategy {
    static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
    private DefaultSelectStrategy() { }
    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
}

hasTasks方法是SingleThreadEventLoop提供的,当尾部任务队列或普通任务队列有任务时返回true,否则返回false。

// 重写父类hasTasks方法,当普通任务队列和尾部任务队列都为空才返回false
@Override
protected boolean hasTasks() {
    return super.hasTasks() || !tailTasks.isEmpty();
}

IntSupplier这里取的实现类是NioEventLoop#selectNowSupplier,会立即执行一次select。

// 被select策略使用
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};
int selectNow() throws IOException {
  return selector.selectNow();
}

执行策略得到strategy,如果strategy大于等于0表示执行过一次selectNow,其他策略枚举分为三类,都是小于0的int类型:

/**
 * Indicates a blocking select should follow.
 */

int SELECT = -1;
/**
 * Indicates the IO loop should be retried, no blocking select to follow directly.
 */

int CONTINUE = -2;
/**
 * Indicates the IO loop to poll for new events without blocking.
 */

int BUSY_WAIT = -3;

因为默认策略没有涉及CONTINUE和BUSY_WAIT,直接看SELECT策略的实现。首先nextScheduledTaskDeadlineNanos获取下一个定时任务的deadline时间:如果没有定时任务设置为NONE;否则设置为下一个定时任务的执行时间。

private static final long NONE = Long.MAX_VALUE;

// 下一个scheduledTask任务的执行时间
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
    curDeadlineNanos = NONE;
}
nextWakeupNanos.set(curDeadlineNanos);
try {
    if (!hasTasks()) {
        // 如果curDeadlineNanos=NONE会阻塞在selector.select上等待唤醒
        strategy = select(curDeadlineNanos);
    }
finally {
    nextWakeupNanos.lazySet(AWAKE);
}

再次确认没有尾部任务或普通任务,则会执行select操作。

private int select(long deadlineNanos) throws IOException {
  if (deadlineNanos == NONE) { // 阻塞在select上
    return selector.select();
  }
  // 计算定时任务执行时间 - 当前时间
  long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
  return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

看到这里发现,并不是每次轮询都会执行select操作。如果第一次策略判断时,hasTasks返回无任务,第二次执行SELECT策略时hasTasks返回有任务,则不会执行select操作。

处理selectionKey

先不管ioRatio如何决定三类任务的执行,当执行完select之后,需要处理SelectionKey。

if (strategy > 0) {
    processSelectedKeys();
}

processSelectedKeys分为两种逻辑,当selectedKeys成员变量不为空,表示走了HashSet优化(见上面一小节);否则使用原生JDK的处理方式。一般情况走前面一个优化逻辑。

// 存储SelectionKey 继承AbstractSet
private SelectedSelectionKeySet selectedKeys;
private void processSelectedKeys() {
    if (selectedKeys != null) {
        // 构造时,成功将SelectedSelectionKeySet注入JDK的SelectorImpl的selectedKeys中
        processSelectedKeysOptimized();
    } else {
        // 上面那一步失败的情况,使用原生JDK里的selectedKeys
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

 private void processSelectedKeysOptimized() {
   for (int i = 0; i < selectedKeys.size; ++i) {
     final SelectionKey k = selectedKeys.keys[i];
     selectedKeys.keys[i] = null;
     final Object a = k.attachment();

     if (a instanceof AbstractNioChannel) {
       processSelectedKey(k, (AbstractNioChannel) a);
     }
     // ...
   }
 }

在注册Channel到Selector上时,传入的附件attachment是Channel自己,进入processSelectedKey方法。这里根据不同的事件调用Unsafe的不同方法处理,这部分等后续章节再看。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // ...
        return;
    }
    try {
        int readyOps = k.readyOps();
        // CONNECT事件,调用finishConnect
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        // WRITE事件,将buffer中的内容写入对应Channel
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        // READ或ACCEPT事件,调用unsafe的read方法
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

执行任务

在处理完成selectionKey上发生的事件之后,根据ioRatio和事件数量(strategy),选择不同的策略执行任务。

if (ioRatio == 100) {
    // ...
    ranTasks = runAllTasks();
else if (strategy > 0) {
    // ...
      ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
else {
    // ...
    ranTasks = runAllTasks(0);
}

根据上面继承结构的分析,NioEventLoop需要处理三类任务:定时任务、普通任务、尾部任务。

当ioRatio配置为100时,会执行所有task。无参runAllTasks方法会循环将定时任务拉取到普通任务队列,执行完所有定时和普通任务之后,执行尾部任务。不过ioRatio默认配置为50,这种情况可以忽略。

protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;
    do {
        // 1. 拉取定时任务队列中的任务到普通任务队列
        fetchedAll = fetchFromScheduledTaskQueue();
        // 2. 执行普通任务
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
    if (ranAtLeastOne) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    // 3. 执行尾部任务
    afterRunningAllTasks();
    return ranAtLeastOne;
}

strategy>0,表示有selectionKey对应事件被激活,此时会根据ioRatio决定到底要执行多少时间的任务,ioRatio越大执行task的时间越短;

strategy=0,表示没有selectionKey对应事件被激活,会尽量少执行任务,不会超过64个定时+普通任务。

protected boolean runAllTasks(long timeoutNanos) {
    // 1. 将定时任务从定时任务队列移动到普通任务队列
    fetchFromScheduledTaskQueue();
    // 2. 获取普通任务
    Runnable task = pollTask();
    if (task == null) {
        // 如果前面两个队列都没任务,执行尾部任务
        afterRunningAllTasks();
        return false;
    }
    final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
    long runTasks = 0;
    long lastExecutionTime;
    // 3. 循环执行普通任务(包含一开始获取的定时任务),直至deadline或普通任务队列为空
    for (;;) {
        safeExecute(task);
        runTasks ++;
        // 每64次循环才执行一次超时检测,因为nanoTime性能损耗...
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                // 超时结束循环
                break;
            }
        }
        task = pollTask();
        // 普通任务执行完了,结束循环
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    // 4. 执行所有尾部任务
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

3、解决JDK空轮询bug

在run方法的轮询主流程中,有个局部变量selectCnt表示Channel没发生IO事件且没执行任何task的空轮询次数,当selectCnt达到512之后,会触发重构Selector,目的是解决JDK空轮询bug导致CPU100%。

protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
                        // ...
            selectCnt++;
            // 如果selectCnt>512认为发生JDKNIO空轮询bug,重构Selector
            else if (unexpectedSelectorWakeup(selectCnt)) {
                selectCnt = 0;
            }
            // ...
    }
}
private boolean unexpectedSelectorWakeup(int selectCnt) {
    if (Thread.interrupted()) {
        return true;
    }
    // selectCnt > 512 认为发生了JDK空轮询BUG
    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        rebuildSelector();
        return true;
    }
    return false;
}

/**
* Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work around the infamous epoll 100% CPU bug.
*/

public void rebuildSelector() {
  // EventLoop串行化执行,如果是run主流程,直接执行
  if (!inEventLoop()) {
    execute(new Runnable() {
      @Override
      public void run() {
        rebuildSelector0();
      }
    });
    return;
  }
  rebuildSelector0();
}

rebuildSelector0方法构造新的Selector,并将老Selector上的Channel重新注册到新Selector上。

private void rebuildSelector0() {
    final Selector oldSelector = selector;
    // 构造新的Selector,和构造方法里的一样
    final SelectorTuple newSelectorTuple = openSelector();
    // 注册Channel到新的Selector上
    for (SelectionKey key: oldSelector.keys()) {
        Object a = key.attachment();
        try {
            // 如果key无效了 或 key已经注册到新的selector上了 跳过
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }
            // 注册key到新的Selector上
            int interestOps = key.interestOps();
            key.cancel();
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceof AbstractNioChannel) {
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
        } catch (Exception e) {
        }
    }
    // 更新成员变量selector为新的selector
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;

    // 关闭老的selector
    oldSelector.close();
}

总结

  • AbstractEventExecutor继承JDK的AbstractExecutorService抽象线程池服务,接入自己的Future体系。

  • SingleThreadEventExecutor将EventLoop与线程绑定,实现无锁串行化执行。

  • NioEventLoop的父类分别掌控了三个任务队列:定时任务队列、普通任务队列、尾部任务队列。

  • NioEventLoop通过反射注入自定义Set对JDK的Selector做了一个性能优化。

  • NioEventLoop事件循环的执行流程是:select->processSelectionKey->runAllTasks,其中会根据策略执行select和runAllTasks。

  • NioEventLoop通过重构Selector的方式解决了JDK空轮询的BUG。

以上是关于Netty源码EventLoop的主要内容,如果未能解决你的问题,请参考以下文章

netty源码:4 事件调度层:为什么 EventLoop 是 Netty 的精髓?

6. Netty源码分析之EventLoop与EventLoopGroup

Netty源码深度解析-EventLoopEventLoop的构造

Netty4.XNetty源码分析之NioEventLoopGroup

Netty的EventLoop和线程模型

Netty实战七之EventLoop和线程模型