Netty源码分析-NioEventLoop
Posted 征服.刘华强
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty源码分析-NioEventLoop相关的知识,希望对你有一定的参考价值。
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.nio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopException;
import io.netty.channel.EventLoopTaskQueueFactory;
import io.netty.channel.SelectStrategy;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.IntSupplier;
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ReflectionUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.channels.spi.SelectorProvider;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
/**
* @link SingleThreadEventLoop implementation which register the @link Channel's to a
* @link Selector and so does the multi-plexing of these in the event loop.
*
*/
public final class NioEventLoop extends SingleThreadEventLoop
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
//是否禁止优化JDK原生的selector
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
//默认值512
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
//调用JDK的selector.selectNow()方法,返回IO事件的数量
private final IntSupplier selectNowSupplier = new IntSupplier()
@Override
public int get() throws Exception
return selectNow();
;
// Workaround for JDK NIO bug.
// 修复JDK空轮询的BUG
// See:
// - http://bugs.sun.com/view_bug.do?bug_id=6427854
// - https://github.com/netty/netty/issues/203
static
final String key = "sun.nio.ch.bugLevel";
final String bugLevel = SystemPropertyUtil.get(key);
if (bugLevel == null)
try
AccessController.doPrivileged(new PrivilegedAction<Void>()
@Override
public Void run()
System.setProperty(key, "");
return null;
);
catch (final SecurityException e)
logger.debug("Unable to get/set System Property: " + key, e);
//默认值512
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS)
selectorAutoRebuildThreshold = 0;
//默认值512
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
if (logger.isDebugEnabled())
logger.debug("-Dio.netty.noKeySetOptimization: ", DISABLE_KEY_SET_OPTIMIZATION);
logger.debug("-Dio.netty.selectorAutoRebuildThreshold: ", SELECTOR_AUTO_REBUILD_THRESHOLD);
/**
* The NIO @link Selector.
*/
//优化后的事件选择器
private Selector selector;
//JDK-NIO的事件选择器
private Selector unwrappedSelector;
//优化了Set接口,内部使用数组
private SelectedSelectionKeySet selectedKeys;
//NIO的selector创建器
private final SelectorProvider provider;
private static final long AWAKE = -1L;
private static final long NONE = Long.MAX_VALUE;
// nextWakeupNanos is:
// AWAKE when EL is awake
// NONE when EL is waiting with no wakeup scheduled
// other value T when EL is waiting with wakeup scheduled at time T
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
//默认实现类DefaultSelectStrategy
private final SelectStrategy selectStrategy;
//IO使用比率
private volatile int ioRatio = 50;
private int cancelledKeys;
private boolean needsToSelectAgain;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory)
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
//优化后的对象
this.selector = selectorTuple.selector;
//JDK原生对象
this.unwrappedSelector = selectorTuple.unwrappedSelector;
private static Queue<Runnable> newTaskQueue(
EventLoopTaskQueueFactory queueFactory)
if (queueFactory == null)
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
private static final class SelectorTuple
final Selector unwrappedSelector;
final Selector selector;
SelectorTuple(Selector unwrappedSelector)
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
SelectorTuple(Selector unwrappedSelector, Selector selector)
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
private SelectorTuple openSelector()
final Selector unwrappedSelector;
try
//JDK原生Selector
unwrappedSelector = provider.openSelector();
catch (IOException e)
throw new ChannelException("failed to open a new selector", e);
//禁止优化JDK原生Selector
if (DISABLE_KEY_SET_OPTIMIZATION)
//封装为SelectorTuple对象,内部原生JDK的Selector
return new SelectorTuple(unwrappedSelector);
//反射获取JDK原生Selector的实现类sun.nio.ch.SelectorImpl
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>()
@Override
public Object run()
try
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
catch (Throwable cause)
return cause;
);
//不是Class类型,说明上面方法出现异常
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
//不是unwrappedSelector的类型
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass()))
if (maybeSelectorImplClass instanceof Throwable)
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: ", unwrappedSelector, t);
//如果出现异常则不进行优化
return new SelectorTuple(unwrappedSelector);
//转换为Class类型
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
//优化了Set接口,内部使用数组
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>()
@Override
public Object run()
try
//反射获取selectedKeys成员变量,原来默认使用HashSet<SelectionKey>
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
//反射获取publicSelectedKeys成员变量,原来默认使用HashSet<SelectionKey>
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
//根据JDK版本,使用不用API替换俩个成员变量
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe())
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1)
//把原来的HashSet<SelectionKey>替换为数组实现的SelectedSelectionKeySet
//数组的实现比HashSet效率高一些
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
// We could not retrieve the offset, lets try reflection as last-resort.
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null)
return cause;
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null)
return cause;
//把原来的HashSet<SelectionKey>替换为数组实现的SelectedSelectionKeySet
//数组的实现比HashSet效率高一些
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
catch (NoSuchFieldException e)
return e;
catch (IllegalAccessException e)
return e;
);
//如果maybeException是异常则不进行优化
if (maybeException instanceof Exception)
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: ", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: ", unwrappedSelector);
//进行优化
//unwrappedSelector把内部的HasSet变成了数组的实现
//SelectedSelectionKeySetSelector代理了JDK原生的NIO-Selector
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
/**
* Returns the @link SelectorProvider used by this @link NioEventLoop to obtain the @link Selector.
*/
public SelectorProvider selectorProvider()
return provider;
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks)
return newTaskQueue0(maxPendingTasks);
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks)
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
/**
* Registers an arbitrary @link SelectableChannel, not necessarily created by Netty, to the @link Selector
* of this event loop. Once the specified @link SelectableChannel is registered, the specified @code task will
* be executed by this event loop when the @link SelectableChannel is ready.
*/
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task)
ObjectUtil.checkNotNull(ch, "ch");
if (interestOps == 0)
throw new IllegalArgumentException("interestOps must be non-zero.");
//检查感兴趣的事件是否是channel能够注册的。
if ((interestOps & ~ch.validOps()) != 0)
throw new IllegalArgumentException(
"invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
ObjectUtil.checkNotNull(task, "task");
//如果已经关闭
if (isShutdown())
throw new IllegalStateException("event loop shut down");
//把Channel注册到selector上面,必须是EventLoop线程
if (inEventLoop())
register0(ch, interestOps, task);
else
try
// Offload to the EventLoop as otherwise java.nio.channels.spi.AbstractSelectableChannel.register
// may block for a long time while trying to obtain an internal lock that may be hold while selecting.
submit(new Runnable()
@Override
public void run()
register0(ch, interestOps, task);
).sync();
catch (InterruptedException ignore)
// Even if interrupted we did schedule it so just mark the Thread as interrupted.
Thread.currentThread().interrupt();
private void register0(SelectableChannel ch, int interestOps, NioTask<?> task)
try
//把JDK的channel注册到selector上面
ch.register(unwrappedSelector, interestOps, task);
catch (Exception e)
throw new EventLoopException("failed to register a channel", e);
/**
* Returns the percentage of the desired amount of time spent for I/O in the event loop.
*/
public int getIoRatio()
return ioRatio;
/**
* Sets the percentage of the desired amount of time spent for I/O in the event loop. Value range from 1-100.
* The default value is @code 50, which means the event loop will try to spend the same amount of time for I/O
* as for non-I/O tasks. The lower the number the more time can be spent on non-I/O tasks. If value set to
* @code 100, this feature will be disabled and event loop will not attempt to balance I/O and non-I/O tasks.
*/
public void setIoRatio(int ioRatio)
if (ioRatio <= 0 || ioRatio > 100)
throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
this.ioRatio = ioRatio;
/**
* Replaces the current @link Selector of this event loop with newly created @link Selectors to work
* around the infamous epoll 100% CPU bug.
*/
public void rebuildSelector()
if (!inEventLoop())
execute(new Runnable()
@Override
public void run()
rebuildSelector0();
);
return;
rebuildSelector0();
@Override
public int registeredChannels()
return selector.keys().size() - cancelledKeys;
//解决JDK空轮训的bug
//当bug发生后,把selector上的channel全部转移到新的channel上面
private void rebuildSelector0()
//优化后的selector,所有的channel都注册到它上面
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null)
return;
try
//重新创建Seletor
newSelectorTuple = openSelector();
catch (Exception e)
logger.warn("Failed to create a new Selector.", e);
return;
// Register all channels to the new Selector.
int nChannels = 0;
//循环所有旧selector上面的key,每一个key代表一个channel
for (SelectionKey key: oldSelector.keys())
Object a = key.attachment();
try
//key无效 或者 key所属的channel已经绑定在新的selector上都不需要处理
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null)
continue;
//把key取消掉
int interestOps = key.interestOps();
key.cancel();
//重新注册channel,把channel注册到新的selector上
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
//更新对象的成员变量
if (a instanceof AbstractNioChannel)
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
nChannels ++;
catch (Exception e)
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel)
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
else
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
//更新成员变量
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try
//关闭旧的selector
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
catch (Throwable t)
if (logger.isWarnEnabled())
logger.warn("Failed to close the old Selector.", t);
if (logger.isInfoEnabled())
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
@Override
protected void run()
int selectCnt = 0;
for (;;)
try
int strategy;
try
// hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
// selectSupplier.get() = selector.selectNow();
// 如果存在任务已不阻塞的方式查询IO事件的数量,否则返回SelectStrategy.SELECT
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy)
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
//如果返回了SelectStrategy.SELECT说明队列里没有任务
case SelectStrategy.SELECT:
//查询优先级队列中最近到期的任务时间
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L)
//优先级队列也没有任务
curDeadlineNanos = NONE; // nothing on the calendar
//设置值
nextWakeupNanos.set(curDeadlineNanos);
try
//再次判断是否具有任务
if (!hasTasks())
//阻塞固定时间
strategy = select(curDeadlineNanos);
finally
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
// fall through
default:
catch (IOException e)
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
//IO执行比率为100,则先执行IO任务,然后执行全部的其它任务
if (ioRatio == 100)
try
//如果IO事件数量大于0则先处理IO事件
if (strategy > 0)
processSelectedKeys();
finally
//然后实行全部任务
// Ensure we always run tasks.
ranTasks = runAllTasks();
else if (strategy > 0)
//记录IO事件的起始时间
final long iostartTime = System.nanoTime();
try
//如果IO事件数量大于0则先处理IO事件
processSelectedKeys();
finally
// Ensure we always run tasks.
//ioTime= 如果存在IO事件,则ioTime代表了底层socketIO事件的执行时间
final long ioTime = System.nanoTime() - ioStartTime;
//计算普通任务的可以执行的时间,比如ioRatio=80%,那么任务能执行的时间就是20%
//(100 - ioRatio) / ioRatio = 1/4
//也就是说如果socket-IO执行了20秒,那么任务最多执行5秒。
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
else
//如果没有IO任务,则传入0。那么普通任务最多执行64个。
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
if (ranTasks || strategy > 0)
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled())
logger.debug("Selector.select() returned prematurely times in a row for Selector .",
selectCnt - 1, selector);
//如果具备任务,或者具备IO事件
selectCnt = 0; //则把计数器归0
//否则的话就是空轮训,相当于线程自动唤醒
else if (unexpectedSelectorWakeup(selectCnt)) // Unexpected wakeup (unusual case)
selectCnt = 0;
catch (CancelledKeyException e)
// Harmless exception - log anyway
if (logger.isDebugEnabled())
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?",
selector, e);
catch (Throwable t)
handleLoopException(t);
// Always handle shutdown even if the loop processing threw an exception.
try
if (isShuttingDown())
closeAll();
if (confirmShutdown())
return;
catch (Throwable t)
handleLoopException(t);
// returns true if selectCnt should be reset
private boolean unexpectedSelectorWakeup(int selectCnt)
//是否是因为线程中断被唤醒的
if (Thread.interrupted())
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled())
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
return true;
//默认是512,如果空轮询512次则说明发生了JDK-NIO的BUG
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD)
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn("Selector.select() returned prematurely times in a row; rebuilding Selector .",
selectCnt, selector);
//修复BUG,重新创建selector
rebuildSelector();
return true;
return false;
private static void handleLoopException(Throwable t)
logger.warn("Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try
Thread.sleep(1000);
catch (InterruptedException e)
// Ignore.
private void processSelectedKeys()
if (selectedKeys != null)
processSelectedKeysOptimized();
else
processSelectedKeysPlain(selector.selectedKeys());
@Override
protected void cleanup()
try
selector.close();
catch (IOException e)
logger.warn("Failed to close a selector.", e);
void cancel(SelectionKey key)
key.cancel();
cancelledKeys ++;
if (cancelledKeys >= CLEANUP_INTERVAL)
cancelledKeys = 0;
needsToSelectAgain = true;
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys)
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty())
return;
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;)
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel)
processSelectedKey(k, (AbstractNioChannel) a);
else
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
if (!i.hasNext())
break;
if (needsToSelectAgain)
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty())
break;
else
i = selectedKeys.iterator();
private void processSelectedKeysOptimized()
//循环这一批IO事件上的key
for (int i = 0; i < selectedKeys.size; ++i)
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
//拿到key关联的对象
final Object a = k.attachment();
//强制类型转换
if (a instanceof AbstractNioChannel)
processSelectedKey(k, (AbstractNioChannel) a);
else
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
if (needsToSelectAgain)
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch)
//拿到Unsafe对象操作底层Socket
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//验证key的有效性
if (!k.isValid())
final EventLoop eventLoop;
try
eventLoop = ch.eventLoop();
catch (Throwable ignored)
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this)
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
try
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
//底层事件为Socket客户端连接到服务器端的事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0)
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
//通过unsafe调用
unsafe.finishConnect();
//可以写的事件,调用底层输出字节流
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0)
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
//可读事件或者连接事件,调用底层socket处理
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)
unsafe.read();
catch (CancelledKeyException ignored)
unsafe.close(unsafe.voidPromise());
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task)
int state = 0;
try
task.channelReady(k.channel(), k);
state = 1;
catch (Exception e)
k.cancel();
invokeChannelUnregistered(task, k, e);
state = 2;
finally
switch (state)
case 0:
k.cancel();
invokeChannelUnregistered(task, k, null);
break;
case 1:
if (!k.isValid()) // Cancelled by channelReady()
invokeChannelUnregistered(task, k, null);
break;
private void closeAll()
selectAgain();
Set<SelectionKey> keys = selector.keys();
Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
for (SelectionKey k: keys)
Object a = k.attachment();
if (a instanceof AbstractNioChannel)
channels.add((AbstractNioChannel) a);
else
k.cancel();
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, k, null);
for (AbstractNioChannel ch: channels)
ch.unsafe().close(ch.unsafe().voidPromise());
private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause)
try
task.channelUnregistered(k.channel(), cause);
catch (Exception e)
logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
@Override
protected void wakeup(boolean inEventLoop)
if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE)
selector.wakeup();
@Override
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos)
// Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
return deadlineNanos < nextWakeupNanos.get();
@Override
protected boolean afterScheduledTaskSubmitted(long deadlineNanos)
// Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
return deadlineNanos < nextWakeupNanos.get();
Selector unwrappedSelector()
return unwrappedSelector;
int selectNow() throws IOException
return selector.selectNow();
private int select(long deadlineNanos) throws IOException
if (deadlineNanos == NONE)
//阻塞查询,不会自动唤醒,需要其它线程唤醒
return selector.select();
// Timeout will only be 0 if deadline is within 5 microsecs
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
//因为优先级队列有即将到期的任务,所以阻塞固定时间后自动唤醒.
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
private void selectAgain()
needsToSelectAgain = false;
try
selector.selectNow();
catch (Throwable t)
logger.warn("Failed to update SelectionKeys.", t);
以上是关于Netty源码分析-NioEventLoop的主要内容,如果未能解决你的问题,请参考以下文章
Netty4.XNetty源码分析之NioEventLoop
Netty源码分析 NioEventLoop的rebuildSelector方法解决Nio中select方法导致cpu100%的BUG