Netty核心组件之NioEventLoop(一)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty核心组件之NioEventLoop(一)相关的知识,希望对你有一定的参考价值。

参考技术A

在接下来几篇文章,我会通过Netty的源码深入讲解NioEventLoop的实现机制。
特别说明:基于4.1.52版本的源码

先来看下NioEventLoop的类关系图和重要的属性,对其有一个整体的感知,便于后面详细分析。

首先来看NioEventLoop的构造函数

默认情况下,会创建MPSC,即多生产者单消费者的队列,这里最终会用到JCTools库,这里不过多介绍,感兴趣的可以自己去了解。

如果设置了优化开关(默认优化选项是开启的),则通过反射的方式从Selector中获取selectedKeys和publicSelectedKeys,将这两个成员设置为可写,通过反射,使用Netty构造的selectedKeySet将原生JDK的selectedKeys替换掉。
我们知道使用Java原生NIO接口时,需要先调Selector的select方法,再调selectedKeys方法才可以获得有IO事件准备好的SelectionKey集合。这里优化过后,只通过一步select调用,就可以从selectedKeySet获得需要的SelectionKey集合。
另外,原生Java的SelectionKey集合是一个HashSet,这里优化过后的SelectedSelectionKeySet底层是一个数组,效率更高。

EventLoop的职责可以用下面这张图形象的表示

下面详细解析:

每次循环,都会检测任务队列和IO事件,如果任务队列中没有任务,则直接返回SelectStrategy.SELECT;如果任务队列中有任务,则会调用非阻塞的 selectNow 检测有IO事件准备好的Channel数。

nextScheduledTaskDeadlineNanos 方法返回下一个将要被执行的定时任务的截止时间

NioEventLoop的定时任务队列是一个优先级队列,队列中存储的是ScheduledFutureTask对象

通过ScheduledFutureTask的 compareTo 方法可以看出,优先级队列中的元素是以任务的截止时间来排序的,队首元素的截止时间最小,当截止时间相同时,以任务ID排序,ID小的排在前面。

当定时任务ScheduledFutureTask执行后,会根据 periodNanos 的取值决定是否要将任务重新放回队列。从netty的注释可以清晰看到:

看下ScheduledFutureTask的 run 方法

当任务的执行时间还未到,则判断任务是否已经取消,如果已取消则移除任务,否则重新加入队列。对于只执行一次的任务,执行完了不会再放回队列。其他的任务,则根据 periodNanos 的类型,重新计算截止时间,重新放回队列,等待下次调度。
定时任务的优先级队列到此介绍完毕,接着看NioEventLoop的 run 方法

在调用 select 之前,再次调用 hasTasks() 判断从上次调用该方法到目前为止是否有任务加入,多做了一层防护,因为调用 select 时,可能会阻塞,这时,如果任务队列中有任务就会长时间得不到执行,所以须小心谨慎。
如果任务队列中还是没有任务,则会调用 select 方法。在这个方法中会根据入参 deadlineNanos 来选择调用NIO的哪个select方法:

到这里,可能有人要问了: 在上面的方法中,如果调用了Java NIO的无参的 select 方法,就会进入阻塞,除非检测到Channel的IO事件,那么在检测到IO事件之前,加入到任务队列中的任务怎么得到执行呢?

好,你想,在检测到IO事件之前,可以退出阻塞的方法是什么?对,调用 wakeup 方法。那么我们来搜一下NioEventLoop中有调用Selector的 wakeup 方法的地方吗:

还真搜到了,再看一下这个方法被调用的地方

看到SingleThreadEventExecutor的 execute 方法了吗,就是说在调 execute 方法,向EventLoop提交任务时,会将EventLoop线程从Java NIO的select阻塞中唤醒。

到这里,NioEventLoop的run方法的职责之一:检测Channel的IO事件就讲解完毕。

至于IO事件的处理以及任务队列中任务的处理会在后面的文章中解析,敬请期待。

在本文中,对Netty的NioEventLoop进行了深入的解读,并且详细讲解了它的三大职责之一:检测Channel的IO事件的机制。
NioEventLoop是Netty最核心的概念,内部运行机制很复杂,在接下来的两篇文章中会继续分析。

netty源码之NioEventLoop

目录

一、初始化

1、NioEventLoopGroup最终都是调用父类MultithreadEventLoopGroup的构造器

2、通过父类MultiThreadEventExecutorGroup构造器创建NioEventLoopGroup实例,内部维护了一个SingleThreadEventExecutor类型的数组,通过newChild()方法进行实例化

3、newChild()方法实例化的是NioEventLoop类型的EventLoop对象

4、NioEventLoop 打开一个selector

二、EventLoop 与 Channel的关系

1、客户端在通过bootstrap启动时会创建一个Channel实例并进行初始化,在初始化过程中会绑定一个NioEventLoop

2、NioEventLoopGroup调用父类的register()方法将获取一个EventLoop与通道绑定

3、最终调用了 AbstractChannel#AbstractUnsafe.register 后完成了 Channel 和 EventLoop 的关联,将获取的EventLoop值赋值给AbstractChannel内的一个eventLoop属性

4、调用AbstractChannel的register0()方法最终完成通道的注册


一、初始化

    ServerBootstrap b = new ServerBootstrap();
    
    EventLoopGroup bossGroup = new NioEventLoopGroup();    //创建多线程线程池
    
    EventLoopGroup workGroup = new NioEventLoopGroup();    //创建多线程线程池
    
    b.group(bossGroup,workGroup); 
    

服务端的acceptor ,处理IO事件分两种:监听请求建立连接 和 处理连接 。

在reactor多线程模型下 , 两种事件都是有同一个线程处理。

在reactor主从线程模型下 , bossGroup负责处理监听连接事件 , workerGroup负责处理连接

1、NioEventLoopGroup最终都是调用父类MultithreadEventLoopGroup的构造器

    protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }

2、通过父类MultiThreadEventExecutorGroup构造器创建NioEventLoopGroup实例,内部维护了一个SingleThreadEventExecutor类型的数组,通过newChild()方法进行实例化

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            // 创建一个线程池
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            // 创建一个NioEventLoop数组
            children = new EventExecutor[nThreads];
    
            for (int i = 0; i < nThreads; i ++) {
                children[i] = newChild(executor, args);
            }
            // 生成一个线程选择器
            chooser = chooserFactory.newChooser(children);
    }

3、newChild()方法实例化的是NioEventLoop类型的EventLoop对象

NioEventLoop类继承了SingleThreadEventExecutor类 , 该类种有一个thread属性 ,用以绑定本地线程

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
    
    
    

4、NioEventLoop 打开一个selector

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

二、EventLoop 与 Channel的关系

1、客户端在通过bootstrap启动时会创建一个Channel实例并进行初始化,在初始化过程中会绑定一个NioEventLoop

        -> AbstractBootstrap.initAndRegister()
        -> channelFactory.newChannel()  
        -> group().register(channel) 
        -> MultiThreadEventLoopGroup.regiser()

2、NioEventLoopGroup调用父类的register()方法将获取一个EventLoop与通道绑定

    public ChannelFuture register(Channel channel) {
        return this.next().register(channel);
    }

3、最终调用了 AbstractChannel#AbstractUnsafe.register 后完成了 Channel 和 EventLoop 的关联,将获取的EventLoop值赋值给AbstractChannel内的一个eventLoop属性

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        // 删除条件检查.
        ...
        AbstractChannel.this.eventLoop = eventLoop;
    
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new OneTimeTask() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                ...
            }
        }
    }

4、调用AbstractChannel的register0()方法最终完成通道的注册

    private void register0(ChannelPromise promise) {
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (firstRegistration && isActive()) {
            pipeline.fireChannelActive();
        }
    }
    
    
    protected void doRegister() throws Exception {
        // 省略错误处理
        selectionKey = javaChannel().register(eventLoop().selector, 0, this);
    }


 

以上是关于Netty核心组件之NioEventLoop(一)的主要内容,如果未能解决你的问题,请参考以下文章

Netty的NioEventLoop和NioEventLoopGroup是什么关系?

Netty学习系列-EventLoop

netty源码之NioEventLoop

netty源码之NioEventLoop

Netty源码分析之NioEventLoop执行流程

Netty4.XNetty源码分析之NioEventLoop