A Full Walkthrough of Netty's Design and Practice

Posted by 有山先生

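1. Background

The previous article, 深入理解NIO多路复用 (Understanding NIO Multiplexing in Depth), showed that the kernel detects socket events through event notification plus interrupts, so user space can serve all socket requests with a single thread at O(1) time complexity. That I/O model looks ideal, but once the number of connections, and especially of active connections, grows to something like 100,000 or more, a single thread can struggle. Netty lets you assign thread pools to handle socket events, which spreads the load that one thread would otherwise carry. It also wraps the common plumbing, which makes business code much easier to write. This article analyzes how Netty's thread pools call one another, that is, the reactor model, to get straight to Netty's core design idea. It also reworks the NIO server code into the simplest possible Netty server demo, and closes with some common Netty questions.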

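2. Netty Basic Concepts

Generic Netty code:

Create two thread pools, one for ServerSocket events and one for Socket events, and specify that the methods of the custom class ServerHandler run when an event occurs on the ServerSocket or on a Socket:

Netty business code:

ServerHandler defines the callbacks: channelRead is called to process data when the server receives data from a client, and channelRegistered is called when a socket or serverSocket is registered with a selector:

The Netty architecture for the code above is shown below:

As the architecture diagram shows, NioEventLoopGroup and the pipeline are the two most important concepts. The following sections analyze how both are implemented by walking through Netty's workflow.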

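3. Netty Workflow

3.1 Creating the bossGroup and workerGroup objects

As shown below, the NioEventLoopGroup behind bossGroup creates 1 NioEventLoop, and workerGroup creates 10. Each NioEventLoop holds its own Selector (multiplexer) and its own thread. The bossGroup selector is where the serverSocketChannel is registered, and the workerGroup selectors are where socketChannels are registered. Each thread handles the events that occur on the sockets registered with its selector.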

NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(10);

NioEventLoopGroup is initialized from subclass to superclass in this order: NioEventLoopGroup -> MultithreadEventLoopGroup -> MultithreadEventExecutorGroup.

3.1.1 Creating the SelectorProvider

SelectorProvider is the factory for multiplexing Selectors; it creates the concrete Selector implementation. NioEventLoopGroup obtains a SelectorProvider during initialization:

public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

SelectorProvider calls the create method of sun.nio.ch.DefaultSelectorProvider, found in rt.jar (JDK 8 and earlier), to create the SelectorProvider implementation:

public abstract class SelectorProvider {
    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }
}

The DefaultSelectorProvider in rt.jar differs between JDK builds for different operating systems. On macOS, for example, create returns a KQueueSelectorProvider object:

public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {
    }

    public static SelectorProvider create() {
        return new KQueueSelectorProvider();
    }
}

In the rt.jar shipped for Linux, create returns an EPollSelectorProvider object:

public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {
    }

    public static SelectorProvider create() {
        String var0 = (String)AccessController.doPrivileged(new GetPropertyAction("os.name"));
        if (var0.equals("SunOS")) {
            return createProvider("sun.nio.ch.DevPollSelectorProvider");
        } else {
            return (SelectorProvider)(var0.equals("Linux") ? createProvider("sun.nio.ch.EPollSelectorProvider") : new PollSelectorProvider());
        }
    }
}

EPollSelectorProvider creates an EPollSelectorImpl object through its openSelector method:

public class EPollSelectorProvider extends SelectorProviderImpl {
    public EPollSelectorProvider() {
    }

    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }
}

At the lowest level, EPollSelectorImpl wraps the epoll system calls such as epoll_create and epoll_ctl to provide multiplexing.
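To see which provider a given machine actually uses, the lookup described above can be reproduced with the public JDK API alone. The following is a minimal sketch, not Netty code; the class name SelectorProviderDemo and its helper methods are made up for illustration, and the printed provider class depends on the operating system and JDK version.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical demo class: inspects the platform's default SelectorProvider.
class SelectorProviderDemo {
    // Class name of the system-wide default provider (varies by OS and JDK).
    static String providerName() {
        return SelectorProvider.provider().getClass().getName();
    }

    // Opens a selector through the provider, reports whether it is open, then closes it.
    static boolean canOpenSelector() {
        try (Selector selector = SelectorProvider.provider().openSelector()) {
            return selector.isOpen();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("provider: " + providerName());
        System.out.println("selector opened: " + canOpenSelector());
    }
}
```

Each call to openSelector returns a new selector, which is why every NioEventLoop can own a separate one.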

3.1.2 Creating the thread pool

With the SelectorProvider in place, the thread executor (Executor) can be created. How each thread is created is defined by DefaultThreadFactory, and the Executor obtains its threads directly from that factory:

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
        // Create the thread executor
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        // omitted
    }

    // Create the thread factory
    protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass());
    }
}

The thread factory is initialized as follows:

public class DefaultThreadFactory implements ThreadFactory {
    public DefaultThreadFactory(Class<?> poolType) {
        this(poolType, false, Thread.NORM_PRIORITY);
    }

    public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
        this(toPoolName(poolType), daemon, priority);
    }

    public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
        ObjectUtil.checkNotNull(poolName, "poolName");

        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException(
                    "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
        }
        // Use a common prefix for all thread names
        prefix = poolName + '-' + poolId.incrementAndGet() + '-';
        this.daemon = daemon;
        this.priority = priority;
        this.threadGroup = threadGroup;
    }

    // newThread can be called to create a thread directly
    public Thread newThread(Runnable r) {
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
        try {
            if (t.isDaemon() != daemon) {
                t.setDaemon(daemon);
            }

            if (t.getPriority() != priority) {
                t.setPriority(priority);
            }
        } catch (Exception ignored) {
            // Doesn't matter even if failed to set.
        }
        return t;
    }
}

The thread-name prefix is defined:

Threads created afterwards use that prefix in their names:

When execute is called on ThreadPerTaskExecutor, it creates and starts a new thread straight from the thread factory:

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
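The combination of a prefix-naming thread factory and a thread-per-task executor can be tried without Netty. The sketch below rebuilds both pieces on top of java.util.concurrent; NamedThreadFactory and PerTaskExecutor are hypothetical stand-ins for DefaultThreadFactory and ThreadPerTaskExecutor and leave out the daemon, priority, and FastThreadLocal handling shown above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class ThreadPerTaskDemo {
    // Hypothetical stand-in for DefaultThreadFactory: thread name = prefix + counter.
    static final class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger nextId = new AtomicInteger();

        NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, prefix + nextId.incrementAndGet());
        }
    }

    // Same shape as ThreadPerTaskExecutor: every execute() call starts a new thread.
    static final class PerTaskExecutor implements Executor {
        private final ThreadFactory factory;

        PerTaskExecutor(ThreadFactory factory) {
            this.factory = factory;
        }

        @Override
        public void execute(Runnable command) {
            factory.newThread(command).start();
        }
    }

    // Runs one task and returns the name of the thread that executed it.
    static String runOnce(Executor executor) {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name.get();
    }

    public static void main(String[] args) {
        Executor executor = new PerTaskExecutor(new NamedThreadFactory("demoLoop-1-"));
        System.out.println(runOnce(executor)); // demoLoop-1-1
        System.out.println(runOnce(executor)); // demoLoop-1-2
    }
}
```

Nothing is pooled or reused here: each submitted task gets its own thread. That fits the way the executor is used later, where each NioEventLoop submits a single long-running loop task.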

3.1.3 Wrapping the thread pool and the Selector

With its two key dependencies, the SelectorProvider and the Executor, in place, the NioEventLoop can be constructed:

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
        // Create the thread executor
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];
        // Create the NioEventLoops: 1 for bossGroup, 10 for workerGroup
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // Create a NioEventLoop
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // omitted
            }
        }
        chooser = chooserFactory.newChooser(children);

        // omitted
    }

    // Implemented by NioEventLoopGroup to create a NioEventLoop
    protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
}

NioEventLoopGroup implements newChild and creates the NioEventLoop object:

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }
}

Inside NioEventLoop, openSelector() creates the selector, which on Linux is an EPollSelectorImpl object.

public final class NioEventLoop extends SingleThreadEventLoop {
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            // Create the EPollSelectorImpl object
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
        // omitted
        return new SelectorTuple(unwrappedSelector);
    }
}

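3.2 NioEventLoopGroup summary

A NioEventLoopGroup contains multiple NioEventLoops. Each NioEventLoop holds its own Selector and its own thread; the bossGroup selector is used to register the serverSocketChannel, and the workerGroup selectors are used to register socketChannels. For now the thread is represented by a ThreadPerTaskExecutor: calling ThreadPerTaskExecutor#execute creates the thread dedicated to that NioEventLoop.

3.3 Creating the ServerBootstrap startup object

ServerBootstrap is the bootstrap class. Parameters such as the NioEventLoopGroups are passed to it, and it is responsible for starting the Netty server.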

3.3.1 Specifying the ServerSocketChannel implementation

NioServerSocketChannel is specified as Netty's ServerSocketChannel implementation:

serverBootstrap.channel(NioServerSocketChannel.class);

The NioServerSocketChannel constructor creates the ServerSocketChannel object through the EPollSelectorProvider:

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {
    // On Linux, DEFAULT_SELECTOR_PROVIDER is the EPollSelectorProvider object
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            // openServerSocketChannel() is implemented by SelectorProviderImpl, the parent class of EPollSelectorProvider
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
}

NioServerSocketChannel puts the ServerSocketChannel into non-blocking mode through the constructor of its ancestor class AbstractNioChannel:

public abstract class AbstractNioChannel extends AbstractChannel {
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                logger.warn(
                            "Failed to close a partially initialized socket.", e2);
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
}
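Stripped of the Netty wrappers, the two constructors above boil down to a few plain JDK NIO calls: open a server channel through the provider, switch it to non-blocking mode, and record OP_ACCEPT as the interest set. The sketch below is an assumption-labelled illustration with a made-up class name. Unlike Netty, which only stores readInterestOp at this point and registers later, it registers with a selector immediately to keep the example short.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical demo class using only JDK NIO, no Netty types.
class NonBlockingServerChannelDemo {
    // Returns true if the channel ended up non-blocking and registered for OP_ACCEPT.
    static boolean openAndRegister() {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector();
             ServerSocketChannel ch = provider.openServerSocketChannel()) {
            // A channel must be non-blocking before it can be registered with a selector.
            ch.configureBlocking(false);
            SelectionKey key = ch.register(selector, SelectionKey.OP_ACCEPT);
            return !ch.isBlocking()
                    && key.isValid()
                    && key.interestOps() == SelectionKey.OP_ACCEPT;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("non-blocking and registered: " + openAndRegister());
    }
}
```

Registering a channel that is still in blocking mode throws IllegalBlockingModeException, which is one reason the non-blocking switch happens in the constructor, before any registration is attempted.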

AbstractChannel, an ancestor of NioServerSocketChannel, creates the Unsafe and the Pipeline for the ServerSocketChannel; both are covered later:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

    protected abstract AbstractUnsafe newUnsafe();

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
}

3.3.2 Configuring handlers

A handler defines what should run when an event occurs on a socket.

serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
                    .handler(new ChannelInitializer<ServerSocketChannel>() {
                        @Override
                        protected void initChannel(ServerSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    })
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    });

AbstractBootstrap, the parent class of ServerBootstrap, stores the handler for the ServerSocketChannel:

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    public B handler(ChannelHandler handler) {
        this.handler = ObjectUtil.checkNotNull(handler, "handler");
        return self();
    }
}

ServerBootstrap stores the childHandler for SocketChannels:

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
        return this;
    }
}

3.4 Starting the Netty server

The Netty server is started through ServerBootstrap#bind:

ChannelFuture future = serverBootstrap.bind(8080).sync();

3.4.1 Creating the ServerSocketChannel

bind calls doBind in AbstractBootstrap, the parent class of ServerBootstrap, which starts creating the ServerSocketChannel through AbstractBootstrap#initAndRegister:

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        // omitted
    }

    // Create the ServerSocketChannel
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            // omitted
        }
        // omitted
    }
}

In AbstractBootstrap#initAndRegister above, channelFactory#newChannel invokes the NioServerSocketChannel constructor, which in turn creates the ServerSocketChannel and puts it into non-blocking mode.
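The step from a Class object passed to channel(...) to a constructor call inside newChannel() can be illustrated with plain reflection. This is a sketch under the assumption that the factory simply invokes the no-argument constructor of the configured class. ReflectiveFactory and FakeServerChannel are invented for the example and are not Netty types.

```java
import java.lang.reflect.Constructor;

class ReflectiveFactoryDemo {
    // Hypothetical channel type; its no-arg constructor does the setup work,
    // just as the NioServerSocketChannel constructor does in the text above.
    static class FakeServerChannel {
        final boolean blocking;

        FakeServerChannel() {
            this.blocking = false; // pretend configureBlocking(false) ran here
        }
    }

    // Hypothetical factory: remembers the constructor once, calls it per newChannel().
    static final class ReflectiveFactory<T> {
        private final Constructor<? extends T> constructor;

        ReflectiveFactory(Class<? extends T> clazz) {
            try {
                this.constructor = clazz.getDeclaredConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(clazz + " has no no-arg constructor", e);
            }
        }

        T newChannel() {
            try {
                return constructor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("unable to create channel", e);
            }
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<FakeServerChannel> factory =
                new ReflectiveFactory<>(FakeServerChannel.class);
        FakeServerChannel a = factory.newChannel();
        FakeServerChannel b = factory.newChannel();
        System.out.println("distinct instances: " + (a != b) + ", blocking: " + a.blocking);
    }
}
```

Passing a class rather than an instance lets the bootstrap defer construction until bind is called, and create a fresh channel each time.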

3.4.2 Initializing the pipeline

Once the NioServerSocketChannel has been created, init adds the handler defined in the main program to the NioServerSocketChannel's pipeline:

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    void init(Channel channel) {
        setChannelOptions(channel, newOptionsArray(), logger);
        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
        }
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
}

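3.4.3 Inserting elements into DefaultChannelPipeline

The pipeline is based on the chain-of-responsibility design pattern, which builds a chain of processing objects for a request. It decouples issuing a request from handling it: the handlers on the chain process the request, and the client only has to send the request into the chain without caring how it is handled or passed along.

When a client sends a request, the server calls the Inbound Handlers one after another; when it responds, the server calls the Outbound Handlers one after another, as shown below:

When the ServerSocketChannel is created, a DefaultChannelPipeline object is created along with it; that pipeline belongs exclusively to the ServerSocketChannel.

As the code below shows, DefaultChannelPipeline is a linked list. Each call to addLast wraps the handler in a DefaultChannelHandlerContext and appends it at the end of the list, just before the tail node: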

public class DefaultChannelPipeline implements ChannelPipeline {
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            // Run callHandlerAdded0
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
}

How handlers are invoked along the pipeline's chain is covered later.
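The linked-list insertion performed by addLast0 can be isolated in a few lines. The sketch below is a simplified model, not Netty's implementation: MiniPipeline and Node are hypothetical names, nodes carry only a name instead of a handler, and the synchronization and handlerAdded callbacks are left out.

```java
import java.util.ArrayList;
import java.util.List;

class MiniPipeline {
    // Hypothetical stand-in for AbstractChannelHandlerContext: one node of the list.
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    // Sentinel nodes, so inserting never has to deal with an empty list.
    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    // The same four pointer updates as addLast0: insert just before the tail sentinel.
    MiniPipeline addLast(String name) {
        Node newCtx = new Node(name);
        Node prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
        return this;
    }

    // Walks from head to tail and collects the node names in order.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }

    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline()
                .addLast("ServerHandler")
                .addLast("ServerBootstrapAcceptor");
        System.out.println(p.names()); // [head, ServerHandler, ServerBootstrapAcceptor, tail]
    }
}
```

Because head and tail always exist, appending costs the same four assignments regardless of how many handlers are already in the list.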

3.4.4 Adding handlers to the ServerSocketChannel's pipeline

In section 3.4.2, init adds one handler to the ServerSocketChannel's pipeline through addLast:

void init(Channel channel) {
    // omitted
    p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
}

This handler is one that Netty defines itself. It overrides ChannelInitializer's initChannel method, which is called when the ServerSocketChannel is initialized and adds the user-defined ServerHandler to the pipeline. It then adds Netty's own ServerBootstrapAcceptor to the pipeline; ServerBootstrapAcceptor is responsible for registering the SocketChannels accepted by the ServerSocketChannel with a Selector:

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);

            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
    }
}

3.4.5 Preparing to register the ServerSocketChannel with the selector

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // Create the ServerSocketChannel
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // Register the ServerSocketChannel with the selector
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
}

config().group() above returns bossGroup, so registration goes through the register method of MultithreadEventLoopGroup, the parent class of NioEventLoopGroup:

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    public ChannelFuture register(Channel channel) {
        // Register the ServerSocketChannel
        return next().register(channel);
    }
}

The next method picks one NioEventLoop out of the NioEventLoopGroup; a NioEventLoop contains a Selector and a thread. MultithreadEventExecutorGroup uses the DefaultEventExecutorChooserFactory factory to create a chooser object, which decides which NioEventLoop runs a given task. Note that bossGroup has only one NioEventLoop, so that one is always picked. Only for workerGroup does the chooser rotate round-robin among the 10 NioEventLoops.

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

    public EventExecutor next() {
        return chooser.next();
    }
}

As shown below, when the number of threads in the NioEventLoopGroup is a power of two, PowerOfTwoEventExecutorChooser is used; otherwise GenericEventExecutorChooser is used. PowerOfTwoEventExecutorChooser computes the next NioEventLoop with a bitwise AND, while GenericEventExecutorChooser uses a modulo operation, so the power-of-two variant is the cheaper of the two:

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            // Bitwise AND
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicLong idx = new AtomicLong();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            // Modulo arithmetic
            return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}
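The two index computations are easy to compare side by side. The sketch below copies the arithmetic from the listing above into standalone helpers; ChooserDemo and its method names are made up for illustration, and the executors array is replaced by a plain length.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class ChooserDemo {
    // Same test as in the listing: a power of two has exactly one bit set.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // Round-robin index via bit mask; only correct when length is a power of two.
    static int maskIndex(int counter, int length) {
        return counter & (length - 1);
    }

    // Round-robin index via modulo; works for any positive length.
    static int moduloIndex(long counter, int length) {
        return (int) Math.abs(counter % length);
    }

    // Simulates n consecutive next() calls and returns the chosen indices.
    static int[] firstPicks(int length, int n) {
        AtomicInteger idx = new AtomicInteger();
        int[] picks = new int[n];
        for (int i = 0; i < n; i++) {
            int counter = idx.getAndIncrement();
            picks[i] = isPowerOfTwo(length)
                    ? maskIndex(counter, length)
                    : moduloIndex(counter, length);
        }
        return picks;
    }

    public static void main(String[] args) {
        // 4 loops: power of two, so the mask path is taken.
        System.out.println(Arrays.toString(firstPicks(4, 6))); // [0, 1, 2, 3, 0, 1]
        // 10 loops, as in workerGroup above: not a power of two, so modulo is used.
        System.out.println(Arrays.toString(firstPicks(10, 12)));
    }
}
```

With the workerGroup size of 10 used in this article, the generic modulo chooser is the one selected; a size such as 8 or 16 would select the mask-based chooser. The mask also keeps the index non-negative after the int counter overflows, since counter & (length - 1) can never be negative.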

Once a NioEventLoop has been picked, the register method of its parent class SingleThreadEventLoop is called:

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}

Here promise.channel() is the NioServerSocketChannel. Calling the unsafe method of its ancestor class AbstractChannel returns the unsafe member:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    private final Unsafe unsafe;

    public Unsafe unsafe() {
        return unsafe;
    }
}

The Unsafe member is created by newUnsafe when the NioServerSocketChannel is initialized:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    // Abstract method, implemented by subclasses
    protected abstract AbstractUnsafe newUnsafe();
}

newUnsafe is implemented by the subclass AbstractNioMessageChannel:

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    protected AbstractNioUnsafe newUnsafe() {
        return new NioMessageUnsafe();
    }

    // Definition of NioMessageUnsafe. It overrides read(); later read operations on the ServerSocketChannel run this method
    private final class NioMessageUnsafe extends AbstractNioUnsafe {

        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
}

Because NioMessageUnsafe extends AbstractNioUnsafe, the register method that runs is the one defined in AbstractUnsafe, the parent of AbstractNioUnsafe and an inner class of AbstractChannel. register makes sure the EventLoop thread is running, starting it on the spot if necessary, and runs register0 on that thread. register0 performs the registration and adds the handlers, as the following sections explain step by step:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected abstract class AbstractUnsafe implements Unsafe {
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;
            // If the caller is already running on the eventLoop thread, run register0 directly
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    // Otherwise submit register0 as a task; execute() starts the eventLoop thread first if it is not running yet
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

        private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // Perform the registration
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                // Add the handlers
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }
}

3.4.6 启动NioEventLoop线程

NioEventLoop线程为慢启动,当需要执行task时,才启动线程。为了保证ServerSocketChannel注册到Selector是在子线程中执行的,在调用register时,会判断NioEventLoop是否启动,如下:

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    // If the eventLoop thread has not started yet, execute() starts it first;
                    // register0 then runs on that thread
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }

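SingleThreadEventExecutor, the parent class of NioEventLoop, keeps a thread field. The field is null until the loop thread has started, and the thread passed in here is the main thread, so at this point inEventLoop returns false: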

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    private volatile Thread thread;

    // The argument is Thread.currentThread()
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }
}

The else branch therefore runs. It wraps register0 in a Runnable task (not a thread) and passes it to NioEventLoop#execute:

                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });

NioEventLoop inherits execute from its parent class SingleThreadEventExecutor. The method first adds the registration task to the task queue and then starts the thread:

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        // First add the registration task to the task queue
        addTask(task);
        if (!inEventLoop) {
            // Then start the thread
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }
}
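The "enqueue first, then start the thread once" order can be tried out with the JDK alone. The sketch below is a simplified model and not Netty's code: the class LazyLoopSketch, its AtomicBoolean start flag and the daemon thread are all assumptions made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of a lazily started single-thread event loop.
class LazyLoopSketch {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private volatile Thread thread; // stays null until the loop thread runs

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    boolean isStarted() {
        return started.get();
    }

    void execute(Runnable task) {
        tasks.add(task); // 1. enqueue the task first
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::runLoop, "loop-sketch");
            t.setDaemon(true); // do not keep the JVM alive in this demo
            t.start();         // 2. start the thread, at most once
        }
    }

    private void runLoop() {
        thread = Thread.currentThread(); // from now on inEventLoop() is true on this thread
        try {
            for (;;) {
                tasks.take().run(); // a real loop would also select() on sockets
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs one task on the loop and reports whether the task saw inEventLoop() == true.
    static boolean demo(LazyLoopSketch loop) {
        AtomicBoolean seenInLoop = new AtomicBoolean(false);
        CountDownLatch done = new CountDownLatch(1);
        loop.execute(() -> {
            seenInLoop.set(loop.inEventLoop());
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seenInLoop.get();
    }
}
```

Called from the main thread, inEventLoop() is false both before and after the first execute, while the task itself observes true because it runs on the loop thread.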

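The thread is started through the Executor field, which is the ThreadPerTaskExecutor supplied when the NioEventLoop was created. As analyzed above, every call to ThreadPerTaskExecutor#execute runs threadFactory.newThread(command).start() to start a new thread. The code that runs inside that thread is: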

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    private final Executor executor;

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // We are now on the new thread: store it in the thread field so that
                // later inEventLoop() calls can tell whether the caller is the loop thread
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    // Run SingleThreadEventExecutor#run
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    // omitted
                }
            }
        });
    }
}
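To make the "one new thread per execute call" behaviour concrete, here is a minimal JDK-only stand-in. PerTaskExecutorSketch and runAndCaptureThread are hypothetical names for this illustration; only the threadFactory.newThread(command).start() line mirrors what the text above describes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for an executor that starts one new thread per submitted task.
class PerTaskExecutorSketch implements Executor {
    private final ThreadFactory threadFactory;

    PerTaskExecutorSketch(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }

    // Submits one task and returns the thread that actually ran it.
    static Thread runAndCaptureThread(Executor executor) {
        AtomicReference<Thread> captured = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            captured.set(Thread.currentThread()); // same idea as "thread = Thread.currentThread()"
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return captured.get();
    }
}
```

Two submissions end up on two different threads, neither of which is the caller. This is why the event loop only calls the executor once and keeps the resulting thread for its whole lifetime.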

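The run method is implemented by NioEventLoop and does all the work of the loop. It is an infinite loop built around two methods: processSelectedKeys handles the I/O events of the selected keys, and runAllTasks runs the queued tasks: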

public final class NioEventLoop extends SingleThreadEventLoop {
    protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            // If tasks are pending, do not block in select; otherwise block waiting for socket events
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Lets rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }

                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                // ioRatio is the percentage of loop time meant for I/O (processSelectedKeys) versus tasks.
                // With ioRatio == 100 there is no time budget: all queued tasks are run
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    // With ioRatio < 100, tasks get a time budget of ioTime * (100 - ioRatio) / ioRatio,
                    // which equals the time spent on I/O when ioRatio is 50
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    // No keys were selected (strategy <= 0): run only the minimum number of tasks
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }

                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}

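Because the ServerSocketChannel has not yet been registered with the Selector at this point, the Selector cannot report any events, so the loop goes straight to running the queued register0 task.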
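The time budget passed to runAllTasks in the loop above is plain integer arithmetic, ioTime * (100 - ioRatio) / ioRatio, and is easy to check by hand. The helper below only reproduces that formula; the class and method names are made up for this example.

```java
// Hypothetical helper reproducing the time-budget arithmetic from the event loop.
class IoRatioBudgetSketch {
    // ioTimeNanos: time just spent handling selected keys; ioRatio: percentage in (0, 100).
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in (0, 100) for the timed branch");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // 8 ms of I/O work
        for (int ratio : new int[] {20, 50, 80}) {
            System.out.println("ioRatio=" + ratio + " -> task budget (ns): " + taskBudgetNanos(ioTime, ratio));
        }
    }
}
```

For 8 ms of I/O work, a ratio of 50 gives tasks the same 8 ms, a ratio of 80 leaves them 2 ms, and a ratio of 20 allows 32 ms.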

3.4.7 Registering the ServerSocketChannel with the selector

As shown above, the first thing the NioEventLoop thread's loop does is run the register0 task:

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // Register the channel with the Selector
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

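register0 delegates the registration to doRegister, which registers the ServerSocketChannel with the Selector. Note that the interest ops argument is 0, so no event type is selected yet, and the channel object itself is passed as the attachment: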

public abstract class AbstractNioChannel extends AbstractChannel {
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
}
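The same registration can be reproduced with plain JDK NIO. The sketch below registers an unbound ServerSocketChannel with interest ops 0 and an attachment, then switches on OP_ACCEPT afterwards. Doing the second step right away is an assumption made to keep the demo short; the class name and the returned int array are likewise only for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class RegisterWithZeroOpsSketch {
    // Returns {interest ops right after register, interest ops after enabling accept,
    //          1 if the attachment is the object we passed in, else 0}.
    static int[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);         // required before register()
            Object attachment = new Object();        // plays the role of the channel object
            SelectionKey key = server.register(selector, 0, attachment);
            int before = key.interestOps();          // 0: registered, but nothing is selected yet
            key.interestOps(SelectionKey.OP_ACCEPT); // interest is switched on in a later step
            int after = key.interestOps();
            return new int[] {before, after, key.attachment() == attachment ? 1 : 0};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Registering with 0 first separates "the selector knows this channel" from "the selector reports events for it", so the handlers can be set up before any accept event is delivered.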

3.4.8 Running the initChannel method in the ServerSocketChannel's pipeline

When register0 reaches pipeline.invokeHandlerAddedIfNeeded(), the call ends up in DefaultChannelPipeline.callHandlerAddedForAllHandlers(), which runs the callbacks for the handlers queued in the pendingHandlerCallbackHead linked list:

public class DefaultChannelPipeline implements ChannelPipeline {
    private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;

            // This Channel itself was registered.
            registered = true;

            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            // Null out so it can be GCed.
            this.pendingHandlerCallbackHead = null;
        }

        // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
        // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
        // the EventLoop.
        PendingHandlerCallback task = pendingHandlerCallbackHead;
        while (task != null) {
            task.execute();
            task = task.next;
        }
    }
}

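These handlers were queued earlier. When the NioServerSocketChannel was initialized, its init method was called. It adds a ChannelInitializer implemented by Netty, which is responsible for adding the user-defined handler and the ServerBootstrapAcceptor handler to the pipeline: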

p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }

        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

Inside addLast, callHandlerCallbackLater appends this ChannelInitializer to pendingHandlerCallbackHead:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            // Queue the ChannelInitializer handler in pendingHandlerCallbackHead
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

callHandlerCallbackLater appends the ChannelInitializer to the tail of the linked list:

    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;

        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        if (pending == null) {
            pendingHandlerCallbackHead = task;
        } else {
            // Find the tail of the linked-list.
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }
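The pattern behind pendingHandlerCallbackHead, "remember callbacks while unregistered, then drain them once in insertion order", fits in a few lines. The following is a hypothetical miniature and not Netty's implementation: PendingCallbackSketch, addHandler and onRegistered are names invented for this sketch, and a string log replaces real handler callbacks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of "queue handlerAdded callbacks until the channel is registered".
class PendingCallbackSketch {
    private static final class Pending {
        final Runnable callback;
        Pending next;

        Pending(Runnable callback) {
            this.callback = callback;
        }
    }

    private Pending head;
    private boolean registered;
    final List<String> log = new ArrayList<>();

    synchronized void addHandler(String name) {
        Runnable added = () -> log.add("handlerAdded:" + name);
        if (registered) {
            added.run(); // already registered: call back immediately
            return;
        }
        Pending task = new Pending(added);
        if (head == null) {
            head = task;
        } else {
            Pending p = head;
            while (p.next != null) { // find the tail of the linked list
                p = p.next;
            }
            p.next = task;
        }
    }

    void onRegistered() {
        Pending task;
        synchronized (this) {
            registered = true;
            task = head;
            head = null; // drop the list so it can be collected
        }
        // Run the callbacks outside the lock, in insertion order.
        for (; task != null; task = task.next) {
            task.callback.run();
        }
    }
}
```

Handlers added before onRegistered() produce no callback until registration happens; handlers added afterwards are called back at once.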

Finally, handlerAdded invokes the handler's initChannel method:

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            // This should always be true with our current DefaultChannelPipeline implementation.
            // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
            // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
            // will be added in the expected order.
            if (initChannel(ctx)) {

                // We are done with init the Channel, removing the initializer now.
                removeState(ctx);
            }
        }
    }
}

The initChannel method in turn adds handlers to the pipeline:

                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    // This is the handler configured in the main method
                    pipeline.addLast(handler);
                }
                // On the current event loop thread, queue a task that adds a ServerBootstrapAcceptor to the pipeline
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });

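Before this ChannelInitializer's method has finished, it has therefore already added another, user-defined handler, which is itself a ChannelInitializer: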

serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
                    .handler(new ChannelInitializer<ServerSocketChannel>() {
                        @Override
                        protected void initChannel(ServerSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    })

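The next handlerAdded call runs this initChannel method, which adds the ServerHandler.

In short, the single call to pipeline.invokeHandlerAddedIfNeeded() runs Netty's own ChannelInitializer and then the ChannelInitializer defined in the main method, in that order. Each ChannelInitializer adds the handlers that process data, and those handlers end up in the pipeline.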

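3.4.9 Running channelRegistered on the handlers in the pipeline

After the initChannel methods have run, register0 finally calls pipeline.fireChannelRegistered() to run the channelRegistered method of the handlers. fireChannelRegistered ends up in AbstractChannelHandlerContext#invokeChannelRegistered, which starts executing the channelRegistered implementations of the handlers in the pipeline: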

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelRegistered();
        }
    }
}

The pipeline contains the user-defined ServerHandler class, which implements channelRegistered, so that method is guaranteed to run:

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
        super.channelRegistered(ctx);
    }
}

When execution reaches super.channelRegistered(ctx), it looks for the next handler and, as shown below, calls invokeChannelRegistered on it:

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
        return this;
    }
}

findContextInbound is the key method here. It walks the linked list of contexts to find the next handler that handles this inbound event:

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.next;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
        return ctx;
    }
}

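So, by ending each channelRegistered method with super.channelRegistered(ctx), the channelRegistered methods of all handlers in the chain of responsibility get executed. The same holds for other callbacks such as channelActive and channelRead: adding the corresponding call propagates the event through the whole chain.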
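The propagation rule, "the event reaches the next handler only if the current one passes it on", can be shown with a tiny linked chain. This is a simplified model with made-up types (InboundChainSketch, Context, Handler); unlike findContextInbound it does not skip contexts by event mask.

```java
import java.util.ArrayList;
import java.util.List;

class InboundChainSketch {
    interface Handler {
        void channelRegistered(Context ctx);
    }

    static final class Context {
        final String name;
        final Handler handler;
        Context next;

        Context(String name, Handler handler) {
            this.name = name;
            this.handler = handler;
        }

        // Pass the event on to the next handler, if there is one.
        void fireChannelRegistered() {
            if (next != null) {
                next.handler.channelRegistered(next);
            }
        }
    }

    // Builds the chain A -> B -> C and returns the names of the handlers that were invoked.
    static List<String> run(boolean middlePropagates) {
        List<String> log = new ArrayList<>();
        Context a = new Context("A", ctx -> { log.add(ctx.name); ctx.fireChannelRegistered(); });
        Context b = new Context("B", ctx -> {
            log.add(ctx.name);
            if (middlePropagates) {
                ctx.fireChannelRegistered(); // plays the role of super.channelRegistered(ctx)
            }
        });
        Context c = new Context("C", ctx -> { log.add(ctx.name); ctx.fireChannelRegistered(); });
        a.next = b;
        b.next = c;
        a.handler.channelRegistered(a);
        return log;
    }
}
```

With every handler propagating, all three run in order. If B leaves out the call, C never sees the event, which is the usual reason a later handler appears to be "skipped".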

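3.4.10 Binding the port

Once the ServerSocketChannel has been registered with the Selector, and the user-defined handler and the all-important ServerBootstrapAcceptor handler have been added to the pipeline, the ServerSocketChannel can be bound to its port and start serving: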

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            // Bind the port
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        }
        // The branch for a registration that has not completed yet is omitted in this excerpt
    }
}
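The ordering guarantee, bind only after registration has finished, can be modelled with a CompletableFuture chained on a single-thread executor. This is an analogy built on JDK classes, not Netty's ChannelFuture API; BindAfterRegisterSketch and the string log are assumptions for the demo.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BindAfterRegisterSketch {
    // Returns the order in which the two steps ran.
    static List<String> demo() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService loop = Executors.newSingleThreadExecutor(); // stands in for the event loop thread
        try {
            // "Registration" runs asynchronously on the loop thread.
            CompletableFuture<Void> regFuture = CompletableFuture.runAsync(() -> log.add("register"), loop);
            // "Bind" is chained, so it can only run once registration has completed.
            CompletableFuture<Void> bindFuture = regFuture.thenRunAsync(() -> log.add("bind"), loop);
            bindFuture.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
        return log;
    }
}
```

Chaining the second step on the first future gives the same order whether or not registration had already completed when the caller asked for the bind.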

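3.4.11 ServerSocketChannel accepting client requests

After the register task and the bind have completed, the NioEventLoop thread keeps running its infinite loop. Whenever the task queue is empty it blocks in select. The call returns when the timeout expires or when a socket event occurs, and in the latter case processSelectedKeys handles the event: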

protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                }
                // catch (IOException e) block omitted

                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            // Handle the socket events
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }
                // remainder of the loop body omitted
            }
        }
    }

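NioEventLoop's select() method blocks in the select() method of EPollSelectorImpl (the Selector implementation used on Linux) until a socket event occurs, and it sets a timeout on the blocking call: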

public final class NioEventLoop extends SingleThreadEventLoop 

    private S
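A blocking select with a timeout, and the wakeup that cuts it short, can both be observed with a bare JDK Selector. The sketch below is an illustration under assumptions: no channel is registered, the 50 ms timeout is arbitrary, and SelectTimeoutSketch is a hypothetical class name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

class SelectTimeoutSketch {
    // Returns {ready count after a timed select, ready count after wakeup() followed by select()}.
    static int[] demo() {
        try (Selector selector = Selector.open()) {
            int timedOut = selector.select(50);  // nothing registered: returns 0 once the timeout expires
            selector.wakeup();                   // makes the next selection operation return immediately
            int afterWakeup = selector.select(); // would otherwise block indefinitely
            return new int[] {timedOut, afterWakeup};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Both calls report 0 ready keys. The second one shows how a thread that adds a task can get a blocked loop thread out of select.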
