Netty4.XNetty源码分析之NioEventLoop
Posted 程序员小毛驴
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty4.XNetty源码分析之NioEventLoop相关的知识,希望对你有一定的参考价值。
继承关系:
NioEventLoop初始化
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider)
super(parent, threadFactory, false);
if (selectorProvider == null)
throw new NullPointerException("selectorProvider");
provider = selectorProvider;
selector = openSelector();
1、调用父类方法构造一个taskQueue,它是一个LinkedBlockingQueue
2、openSelector(): Netty是基于Nio实现的,所以也离不开selector。
3、DISABLE_KEYSET_OPTIMIZATION: 判断是否需要对sun.nio.ch.SelectorImpl中的selectedKeys进行优化, 不做配置的话默认需要优化.
4、主要优化在哪: Netty通过反射将selectedKeySet与sun.nio.ch.SelectorImpl中的两个field selectedKeys和publicSelectedKeys绑定,大家知道SelectorImpl原来的selectedKeys和publicSelectedKeys数据结构是HashSet,而HashSet的数据结构是数组+链表,新的数据结构是由2个数组A、B组成,初始大小是1024,避免了HashSet扩容带来的性能问题。除了扩容外,遍历效率也是一个原因,对于需要遍历selectedKeys的全部元素, 数组效率无疑是最高的。
private Selector openSelector()
final Selector selector;
try
selector = provider.openSelector();
catch (IOException e)
throw new ChannelException("failed to open a new selector", e);
if (DISABLE_KEYSET_OPTIMIZATION)
return selector;
try
SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Class<?> selectorImplClass =
Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
// Ensure the current selector implementation is what we can instrument.
if (!selectorImplClass.isAssignableFrom(selector.getClass()))
return selector;
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
selectedKeys = selectedKeySet;
logger.trace("Instrumented an optimized java.util.Set into: ", selector);
catch (Throwable t)
selectedKeys = null;
logger.trace("Failed to instrument an optimized java.util.Set into: ", selector, t);
return selector;
NioEventLoop的启动
在上一遍讲过NioEventLoop中维护了一个线程,线程启动时会调用NioEventLoop的run方法,loop会不断循环一个过程:select -> processSelectedKeys(IO任务) -> runAllTasks(非IO任务)
I/O任务: 即selectionKey中ready的事件,如accept、connect、read、write等
非IO任务: 添加到taskQueue中的任务,如bind、channelActive等
@Override
protected void run()
for (;;)
boolean oldWakenUp = wakenUp.getAndSet(false);
try
// 判断是否有非IO任务,如果有立刻返回
if (hasTasks())
selectNow();
else
select(oldWakenUp);
if (wakenUp.get())
selector.wakeup();
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100)
// IO任务
processSelectedKeys();
// 非IO任务
runAllTasks();
else
// 用以控制IO任务与非IO任务的运行时间比
final long iostartTime = System.nanoTime();
// IO任务
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
// 非IO任务
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
if (isShuttingDown())
closeAll();
if (confirmShutdown())
break;
catch (Throwable t)
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try
Thread.sleep(1000);
catch (InterruptedException e)
// Ignore.
1、wakenUp: 用来决定是否调用selector.wakeup(),只有当wakenUp未true时才会调用,目的是为了减少wake-up的负载,因为Selector.wakeup()是一个昂贵的操作。
2、hasTask(): 判断是否有非IO任务,如果有的话,选择调用非阻塞的selectNow()让select立即返回, 否则以阻塞的方式调用select.timeoutMillis是阻塞时间
3、ioRatio: 控制两种任务的执行时间,你可以通过它来限制非IO任务的执行时间, 默认值是50, 表示允许非IO任务获得和IO任务相同的执行时间,这个值根据自己的具体场景来设置.
4、processSelectedKeys(): 处理IO事件
5、runAllTasks(): 处理非IO任务
6、isShuttingDown(): 检查state是否被标记为ST_SHUTTING_DOWN
private void select(boolean oldWakenUp) throws IOException
Selector selector = this.selector;
try
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;)
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0)
if (selectCnt == 0)
selector.selectNow();
selectCnt = 1;
break;
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks())
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
if (Thread.interrupted())
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled())
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
selectCnt = 1;
break;
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos)
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD)
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely times in a row; rebuilding selector.",
selectCnt);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
currentTimeNanos = time;
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS)
if (logger.isDebugEnabled())
logger.debug("Selector.select() returned prematurely times in a row.", selectCnt - 1);
catch (CancelledKeyException e)
if (logger.isDebugEnabled())
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
protected long delayNanos(long currentTimeNanos)
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null)
return SCHEDULE_PURGE_INTERVAL;
return scheduledTask.delayNanos(currentTimeNanos);
public long delayNanos(long currentTimeNanos)
return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
public long deadlineNanos()
return deadlineNanos;
1、delayNanos(currentTimeNanos): 在父类SingleThreadEventExecutor中有一个延迟执行任务的队列,delayNanos就是去这个延迟队列里看是否有非IO任务未执行
* 如果没有则返回1秒钟。
* 如果延迟队列里有任务并且最终的计算出来的时间(selectDeadLineNanos - currentTimeNanos)小于500000L纳秒,就调用selectNow()直接返回,反之执行阻塞的select
2、select如果遇到以下几种情况会立即返回
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks())
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
- Selected something 如果select到了就绪连接(selectedKeys > 0)
- waken up by user 被用户唤醒了
- the task queue has a pending task.任务队列来了一个新任务
- a scheduled task is ready for processing 延迟队列里面有个预约任务需要到期执行
3、selectCnt: 记录select空转的次数(selectCnt),该方法解决了Nio中臭名昭著selector的select方法导致cpu100%的BUG,当空转的次数超过了512(定义一个阀值,这个阀值默认是512,可以在应用层通过设置系统属性io.netty.selectorAutoRebuildThreshold传入),Netty会重新构建新的Selector,将老Selector上注册的Channel转移到新建的Selector上,关闭老Selector,用新的Selector代替老Selector。详细看下面rebuildSelector()方法
4、rebuildSelector(): 就是上面说过得。
public void rebuildSelector()
if (!inEventLoop())
execute(new Runnable()
@Override
public void run()
rebuildSelector();
);
return;
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null)
return;
try
newSelector = openSelector();
catch (Exception e)
logger.warn("Failed to create a new Selector.", e);
return;
// Register all channels to the new Selector.
int nChannels = 0;
for (;;)
try
for (SelectionKey key: oldSelector.keys())
Object a = key.attachment();
try
if (!key.isValid() || key.channel().keyFor(newSelector) != null)
continue;
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
if (a instanceof AbstractNioChannel)
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
nChannels ++;
catch (Exception e)
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel)
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
else
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
catch (ConcurrentModificationException e)
// Probably due to concurrent modification of the key set.
continue;
break;
selector = newSelector;
try
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
catch (Throwable t)
if (logger.isWarnEnabled())
logger.warn("Failed to close the old Selector.", t);
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
作者:小毛驴,一个游戏人
梦想:世界和平
github主页:https://liulongling.github.io/
csdn主页:http://blog.csdn.net/liulongling
若有错误之处,请多多谅解并欢迎批评指正。
本博客中未标明转载的文章归作者小毛驴所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
以上是关于Netty4.XNetty源码分析之NioEventLoop的主要内容,如果未能解决你的问题,请参考以下文章