[Netty 4.x] Netty Source Code Analysis: NioEventLoop

Posted by 程序员小毛驴


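Inheritance hierarchy: NioEventLoop extends SingleThreadEventLoop, which in turn extends SingleThreadEventExecutor.

Initializing NioEventLoop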

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    super(parent, threadFactory, false);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    provider = selectorProvider;
    selector = openSelector();
}

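1. The superclass constructor creates the taskQueue. The default in SingleThreadEventExecutor is a LinkedBlockingQueue (depending on the Netty version, NioEventLoop may override newTaskQueue() with a different queue type).

2. openSelector(): Netty is built on NIO, so it cannot do without a Selector.

3. DISABLE_KEYSET_OPTIMIZATION: controls whether the selectedKeys set in sun.nio.ch.SelectorImpl is optimized. Unless configured otherwise, the optimization is enabled.

4. What the optimization does: through reflection, Netty installs its own selectedKeySet into the two fields selectedKeys and publicSelectedKeys of sun.nio.ch.SelectorImpl. Both fields are originally HashSets, and a HashSet is backed by an array of buckets with linked lists. The replacement consists of two arrays, A and B, each with an initial size of 1024, which avoids the cost of HashSet resizing in the common case. Iteration is the other reason: processing selectedKeys means walking every element, and a plain array is the cheapest structure to walk.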

private Selector openSelector() {
    final Selector selector;
    try {
        selector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    if (DISABLE_KEYSET_OPTIMIZATION) {
        return selector;
    }

    try {
        SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Class<?> selectorImplClass =
                Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());

        // Ensure the current selector implementation is what we can instrument.
        if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
            return selector;
        }

        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

        selectedKeysField.setAccessible(true);
        publicSelectedKeysField.setAccessible(true);

        selectedKeysField.set(selector, selectedKeySet);
        publicSelectedKeysField.set(selector, selectedKeySet);

        selectedKeys = selectedKeySet;
        logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
    } catch (Throwable t) {
        selectedKeys = null;
        logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
    }

    return selector;
}
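
To make the two-array idea from item 4 concrete, here is a minimal sketch of an array-backed set with two swap buffers. It is not Netty's SelectedSelectionKeySet: the class name ArrayKeySet, its generic element type, and the method names are assumptions made for illustration. It only shows why adding a key is a plain array append and why the consumer can walk a null-terminated array instead of a hash table.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;

/** Simplified, hypothetical sketch of an array-backed "set" with two swap buffers. */
final class ArrayKeySet<E> extends AbstractSet<E> {
    private Object[] keysA = new Object[1024];
    private Object[] keysB = new Object[1024];
    private int sizeA;
    private int sizeB;
    private boolean usingA = true;

    /** Appends without hashing or duplicate checks; the active array doubles only when it fills up. */
    @Override
    public boolean add(E key) {
        if (key == null) {
            return false;
        }
        if (usingA) {
            keysA[sizeA++] = key;
            if (sizeA == keysA.length) {
                keysA = Arrays.copyOf(keysA, keysA.length << 1);
            }
        } else {
            keysB[sizeB++] = key;
            if (sizeB == keysB.length) {
                keysB = Arrays.copyOf(keysB, keysB.length << 1);
            }
        }
        return true;
    }

    /**
     * Hands the filled buffer to the consumer as a null-terminated array
     * and makes the other buffer the target of subsequent add() calls.
     */
    Object[] flip() {
        if (usingA) {
            usingA = false;
            keysA[sizeA] = null; // terminator; add() always leaves one free slot
            sizeB = 0;
            return keysA;
        }
        usingA = true;
        keysB[sizeB] = null;
        sizeA = 0;
        return keysB;
    }

    /** Number of keys in the buffer that add() currently writes to. */
    @Override
    public int size() {
        return usingA ? sizeA : sizeB;
    }

    @Override
    public boolean contains(Object o) {
        return false; // lookups are not needed by the consumer in this sketch
    }

    @Override
    public boolean remove(Object o) {
        return false;
    }

    @Override
    public Iterator<E> iterator() {
        throw new UnsupportedOperationException("walk the array returned by flip() instead");
    }
}
```

A consumer would call flip() once per loop iteration and read entries until it reaches the first null, while keys selected in the meantime land in the other buffer.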

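Starting NioEventLoop

As covered in the previous article, NioEventLoop owns a single thread. When that thread starts it calls NioEventLoop's run() method, and the loop then repeats one cycle indefinitely: select -> processSelectedKeys (I/O tasks) -> runAllTasks (non-I/O tasks)

  • I/O tasks: events that are ready on a SelectionKey, such as accept, connect, read, and write

  • Non-I/O tasks: tasks added to the taskQueue, such as bind and channelActive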

@Override
protected void run() {
    for (;;) {
        boolean oldWakenUp = wakenUp.getAndSet(false);
        try {
            // If non-I/O tasks are pending, poll without blocking so they can run promptly.
            if (hasTasks()) {
                selectNow();
            } else {
                select(oldWakenUp);

                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                // I/O tasks
                processSelectedKeys();
                // Non-I/O tasks
                runAllTasks();
            } else {
                // Bound the time spent on non-I/O tasks relative to the time spent on I/O.
                final long ioStartTime = System.nanoTime();
                // I/O tasks
                processSelectedKeys();

                final long ioTime = System.nanoTime() - ioStartTime;
                // Non-I/O tasks
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}

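1. wakenUp: decides whether selector.wakeup() is called. It is called only when wakenUp is true, which keeps wake-up overhead down, because Selector.wakeup() is an expensive operation.

2. hasTasks(): checks whether any non-I/O tasks are pending. If so, the loop calls the non-blocking selectNow() so that selection returns immediately; otherwise it calls the blocking select(), where timeoutMillis is how long the call may block.

3. ioRatio: controls how execution time is split between the two kinds of task, so you can use it to limit the time spent on non-I/O tasks. The default is 50, which gives non-I/O tasks the same amount of time as I/O tasks. When it is 100, run() applies no time limit and simply runs all pending tasks. Tune the value for your own workload.

4. processSelectedKeys(): handles I/O events

5. runAllTasks(): handles non-I/O tasks

6. isShuttingDown(): checks whether state has been marked as ST_SHUTTING_DOWN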
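
Two of the items above are easy to try in isolation: the wakenUp guard (item 1) and the ioRatio time budget (item 3). The sketch below is only an illustration. WakeupGuard and IoRatioBudget are hypothetical names, the expensive wake-up is passed in as a Runnable (for example selector::wakeup) rather than tied to a real Selector, and the compareAndSet guard is an assumption about how producers would set the flag; only the budget expression is copied from run().

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: many producer calls trigger at most one real wake-up per loop iteration. */
final class WakeupGuard {
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    private final Runnable expensiveWakeup; // e.g. selector::wakeup

    WakeupGuard(Runnable expensiveWakeup) {
        this.expensiveWakeup = expensiveWakeup;
    }

    /** Called by producer threads after they enqueue a task. */
    void wakeup() {
        // Only the caller that flips false -> true pays for the real wake-up.
        if (wakenUp.compareAndSet(false, true)) {
            expensiveWakeup.run();
        }
    }

    /** Called by the loop thread at the top of each iteration; returns the previous flag value. */
    boolean beginIteration() {
        return wakenUp.getAndSet(false);
    }
}

/** Hypothetical helper wrapping the expression ioTime * (100 - ioRatio) / ioRatio from run(). */
final class IoRatioBudget {
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio < 1 || ioRatio > 99) {
            // run() treats ioRatio == 100 as a separate branch with no time limit.
            throw new IllegalArgumentException("ioRatio must be in [1, 99]: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```

With 2 ms spent on I/O, an ioRatio of 50 yields a 2 ms budget for non-I/O tasks, 80 yields 0.5 ms, and 20 yields 8 ms.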

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }
            if (Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                        selectCnt);

                rebuildSelector();
                selector = this.selector;

                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
        }
    }
}
// SingleThreadEventExecutor
protected long delayNanos(long currentTimeNanos) {
    ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
    if (scheduledTask == null) {
        return SCHEDULE_PURGE_INTERVAL;
    }

    return scheduledTask.delayNanos(currentTimeNanos);
}

// ScheduledFutureTask
public long delayNanos(long currentTimeNanos) {
    return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
}

public long deadlineNanos() {
    return deadlineNanos;
}
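
The timeout handed to selector.select() in the select() method above is the remaining delay rounded to the nearest millisecond, which is what the added 500000L does. The small sketch below isolates that arithmetic. SelectTimeout is a hypothetical helper name; the expression itself is copied from select().

```java
/** Hypothetical helper isolating the timeout arithmetic used in select(). */
final class SelectTimeout {
    /**
     * Rounds the remaining time until the deadline to the nearest millisecond.
     * A result <= 0 means less than half a millisecond is left, so the caller
     * should poll with selectNow() instead of blocking.
     */
    static long timeoutMillis(long selectDeadLineNanos, long currentTimeNanos) {
        return (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    }

    public static void main(String[] args) {
        long now = 0L;
        System.out.println(timeoutMillis(now + 499_999L, now));       // under 0.5 ms remaining
        System.out.println(timeoutMillis(now + 500_000L, now));       // exactly 0.5 ms remaining
        System.out.println(timeoutMillis(now + 1_400_000L, now));     // 1.4 ms remaining
        System.out.println(timeoutMillis(now + 1_000_000_000L, now)); // the 1 s default when nothing is scheduled
    }
}
```

The four calls return 0, 1, 1, and 1000 respectively, so anything below half a millisecond is treated as "due now".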

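1. delayNanos(currentTimeNanos): the parent class SingleThreadEventExecutor keeps a queue of delayed (scheduled) tasks. delayNanos looks at that queue and returns how long it is until the earliest pending non-I/O task is due.
* If the queue is empty, it returns 1 second.
* If the queue has a task and the remaining time (selectDeadLineNanos - currentTimeNanos) is less than 500000L nanoseconds (0.5 ms), select() calls selectNow() and returns right away; otherwise it performs a blocking select.

2. After select returns, the loop exits immediately in any of the following cases: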

if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
    // - Selected something,
    // - waken up by user, or
    // - the task queue has a pending task.
    // - a scheduled task is ready for processing
    break;
}
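  1. Selected something: select found ready channels (selectedKeys != 0)
  2. Waken up by user: the loop was woken up explicitly
  3. The task queue has a pending task: a new task arrived in the task queue
  4. A scheduled task is ready for processing: a task in the delayed queue has reached its deadline

3. selectCnt: counts how many times in a row select() has returned prematurely, that is, how often the loop has spun without doing work. This is Netty's workaround for the notorious NIO bug in which Selector.select() keeps returning with nothing selected and drives the CPU to 100%. When the count reaches a threshold (512 by default, configurable through the system property io.netty.selectorAutoRebuildThreshold), Netty builds a new Selector, moves the Channels registered on the old Selector over to it, closes the old Selector, and uses the new one in its place. See the rebuildSelector() method below for details.

4. rebuildSelector(): performs the rebuild described in item 3.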
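
The counting rule from item 3 can be looked at separately from the selector itself. The sketch below is a simplified model, not Netty's code. PrematureReturnCounter and its method names are hypothetical, and the threshold is hard-coded to the default of 512 mentioned above instead of being read from the system property. A select call counts as premature when it returns before its timeout has elapsed.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the premature-return counting described above. */
final class PrematureReturnCounter {
    static final int REBUILD_THRESHOLD = 512; // default cited in the article

    private int selectCnt;

    /**
     * Records one select() call that returned with nothing to do.
     *
     * @param beforeNanos   System.nanoTime() taken before the call
     * @param afterNanos    System.nanoTime() taken after the call
     * @param timeoutMillis timeout that was passed to select()
     * @return true if the caller should rebuild the selector
     */
    boolean onEmptySelect(long beforeNanos, long afterNanos, long timeoutMillis) {
        selectCnt++;
        if (afterNanos - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= beforeNanos) {
            // The full timeout elapsed, so this was an ordinary timeout, not a spin.
            selectCnt = 1;
            return false;
        }
        if (selectCnt >= REBUILD_THRESHOLD) {
            // Too many premature returns in a row: ask for a rebuild and start counting again.
            selectCnt = 1;
            return true;
        }
        return false;
    }

    int count() {
        return selectCnt;
    }
}
```

A single ordinary timeout resets the count, so only an unbroken run of premature returns reaches the threshold.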

public void rebuildSelector() {
    if (!inEventLoop()) {
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector();
            }
        });
        return;
    }

    final Selector oldSelector = selector;
    final Selector newSelector;

    if (oldSelector == null) {
        return;
    }

    try {
        newSelector = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }

    // Register all channels to the new Selector.
    int nChannels = 0;
    for (;;) {
        try {
            for (SelectionKey key: oldSelector.keys()) {
                Object a = key.attachment();
                try {
                    if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                        continue;
                    }

                    int interestOps = key.interestOps();
                    key.cancel();
                    SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                    if (a instanceof AbstractNioChannel) {
                        // Update SelectionKey
                        ((AbstractNioChannel) a).selectionKey = newKey;
                    }
                    nChannels ++;
                } catch (Exception e) {
                    logger.warn("Failed to re-register a Channel to the new Selector.", e);
                    if (a instanceof AbstractNioChannel) {
                        AbstractNioChannel ch = (AbstractNioChannel) a;
                        ch.unsafe().close(ch.unsafe().voidPromise());
                    } else {
                        @SuppressWarnings("unchecked")
                        NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                        invokeChannelUnregistered(task, key, e);
                    }
                }
            }
        } catch (ConcurrentModificationException e) {
            // Probably due to concurrent modification of the key set.
            continue;
        }

        break;
    }

    selector = newSelector;

    try {
        // time to close the old selector as everything else is registered to the new one
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }

    logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
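
The core of the migration, cancelling each key on the old Selector and registering its channel on the new one with the same interest ops and attachment, can be reproduced with plain java.nio. The sketch below leaves out everything Netty-specific (event-loop hand-off, AbstractNioChannel bookkeeping, error recovery). SelectorMigration, migrate, and demo are hypothetical names used only for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;

/** Hypothetical, stripped-down version of the re-registration step. */
final class SelectorMigration {
    /** Moves every valid registration from oldSelector to newSelector and returns how many were moved. */
    static int migrate(Selector oldSelector, Selector newSelector) throws IOException {
        int moved = 0;
        // Iterate over a snapshot so the walk is independent of later changes to the key set.
        for (SelectionKey key : new ArrayList<>(oldSelector.keys())) {
            SelectableChannel channel = key.channel();
            if (!key.isValid() || channel.keyFor(newSelector) != null) {
                continue; // already cancelled, or already registered on the new selector
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel();
            channel.register(newSelector, interestOps, attachment);
            moved++;
        }
        return moved;
    }

    /** Registers one server channel on a selector, migrates it, and reports whether the state carried over. */
    static boolean demo() {
        try (Selector oldSelector = Selector.open();
             Selector newSelector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.register(oldSelector, SelectionKey.OP_ACCEPT, "attachment");

            int moved = migrate(oldSelector, newSelector);

            SelectionKey newKey = server.keyFor(newSelector);
            return moved == 1
                    && newKey != null
                    && newKey.interestOps() == SelectionKey.OP_ACCEPT
                    && "attachment".equals(newKey.attachment());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("migration preserved registration: " + demo());
    }
}
```

Cancelling the old key first matters because the channel should end up owned by exactly one selector; registering with a different Selector right after the cancel is allowed, since the restriction on cancelled keys applies only to re-registering with the same one.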

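Author: 小毛驴, a game developer
Dream: world peace
GitHub: https://liulongling.github.io/
CSDN: http://blog.csdn.net/liulongling
If you find any mistakes, please bear with me; corrections are welcome.
Articles on this blog that are not marked as reposts belong to the author, 小毛驴. Reposting is welcome, but unless the author agrees otherwise, this notice must be kept and a link to the original must be shown prominently on the article page; otherwise the author reserves the right to pursue legal liability.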
