牛了Netty之EventLoop线程

Posted 像写诗一样写代码

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了牛了Netty之EventLoop线程相关的知识,希望对你有一定的参考价值。

在使用Netty时,我们老生常谈的是避免在Netty的IO线程中做耗费时间的业务操作和一些长时间的阻塞。本文将通过对Netty中核心类的剖析,帮助读者理解这个问题以及其中的EventLoop设计,从而在使用Netty时更加得心应手。


1

EventLoop是什么


用笔者的理解结合EventLoop维基百科的说明,总结如下

EventLoop是计算机科学中的常用设计模型,其等待事件的就绪并且对事件进行调度分发。EventLoop通常会阻塞等待直到事件就绪。然后调用相关的处理程序。EventLoop通常和Reactor模型一起结合使用。EventLoop并且事件产生源通常和EventLoop不在同一个线程

EventLoop整个是一个大循环,用伪代码来解释其基本运作机制:

1while not shotdown:
2    message = get_next_message()
3    process_message(message)

EventLoop在计算机系统设计中应用广泛,常用于网络库,例如本文重点介绍的Netty、JS的并发模型也建立在EventLoop之上。

后文将主要介绍EventLoop在Netty中的实现,帮助大家理解Netty底层的运作机制中核心一环。



2

Netty中的EventLoop使用


以EventLoop接口中的doc解释开局

Will handle all the I/O operations for a Channel once registered. One EventLoop instance will usually handle more than one Channel but this may depend on implementation details and internals.

其中说到EventLoop通常为多个channel服务,监听并处理这些注册上来的channel的就绪的IO操作。其实在Netty中,不仅是这些channel的IO操作,包括和channel关联的的定时事件也会交由对应的EventLoop进行执行。这一点后文会分析到。借用<<Netty In Action> >书中的图片来阐述这种关系。



一旦一个 Channel 被分配给一个 EventLoop,该Channel的整个生命周期中都使用这个EventLoop。这种做法带来最显而易见的好处是避免了多个线程操作同一个Channel和该Channel上资源。从而免除很多线程安全问题,实现局部无锁,在消除了锁带来的性能影响之外,也无需开发人员去关心其上的并发问题,降低了使用网络通讯库的开发门槛。

Netty中的EventLoop有多种实现,有基于select的NioEventLoop、基于Epoll的EpollEventLoop以及macOS下的KQueueEventLoop等。本文以NioEventLoop举例,其的继承关系如下:

牛了Netty之EventLoop线程

SingleThreadEventExecutor类中定义了Thread和taskQueue任务队列。该Thread就是该Eventloop绑定的Thread。通过该Thread执行该EventLoop上的所有Channel的IO操作、任务操作。

除此之外,可以看到SingleThreadEventExecutor类还往上继承了AbstractScheduledEventExecutor类,该抽象类中通过优先级队列提供了EventLoop对于定时任务的支持,定时任务典型的典型使用场景就是Nettty原生提供的对于心跳的支持类IdleStateHandler,当然扯远了,这并不是本文的重点,之后会再写篇文章详细解剖该类的打开方式以及原理。

NioEventLooprun方法是Netty Reactor事件模型的主体,其本质就是一个大循环,如下所示:

 1 @Override
2    protected void run() {
3        for (;;) {
4            try {
5                 // 根据当前的selectStrategy进行判断当前是否要进入select环节或者跳过
6                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
7                    case SelectStrategy.CONTINUE:
8                        continue;
9                    case SelectStrategy.SELECT:
10                        select(wakenUp.getAndSet(false)); // 阻塞等待
11                    default:
12                }
13
14                cancelledKeys = 0;
15                needsToSelectAgain = false;
16                final int ioRatio = this.ioRatio;
17                if (ioRatio == 100) {  // ioRatio表示执行IO事件的时间比率,如果设置了100,则会优先执行完所有的IO操作,                                         //  再去
18                    try {
19                        processSelectedKeys(); //处理IO事件
20                    } finally {
21                        runAllTasks(); //执行非IO事件
22                    }
23                } else {
24                    final long iostartTime = System.nanoTime();
25                    try {
26                        processSelectedKeys();
27                    } finally {
28                        final long ioTime = System.nanoTime() - ioStartTime; //计算执行IO事件花费的时间
29                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio); //根据比例计算执行非IO事件所应该的时间
30                    }
31                }
32            } catch (Throwable t) {
33                handleLoopException(t);
34            }
35            ... 省略
36        }
37    }

NioEventLoop通过一个线程干了所有Channel的IO事件和定时事件。这就是文章开头说到的为什么Netty中的handler中或者定时事件中不建议运行长时间的业务逻辑。

作为一个大轮询,当然也不能让他一直没事干也跑着。这会浪费大量的CPU资源。最好的方法就是没事干的时候就释放CPU资源让CPU去做别的事,有事干的时候就立马能响应过来恢复到一线。select(wakenUp.getAndSet(false));干的就是这事。

 1   private void select(boolean oldWakenUp) throws IOException {
2        Selector selector = this.selector;
3        try {
4            int selectCnt = 0;
5            long currentTimeNanos = System.nanoTime();
6            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); // ①
7
8            for (;;) {
9                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L// ②
10                if (timeoutMillis <= 0) {
11                    if (selectCnt == 0) {
12                        // 立即
13                        selector.selectNow();
14                        selectCnt = 1;
15                    }
16                    break;
17                }
18                if (hasTasks() && wakenUp.compareAndSet(falsetrue)) { // ③
19                    selector.selectNow();
20                    selectCnt = 1;
21                    break;
22                }
23
24                int selectedKeys = selector.select(timeoutMillis); // ④
25                selectCnt ++;
26
27                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
28                   //⑤
29                    break;
30                }
31
32              //省略无关代码
33
34        }

标号①:计算selectDeadLineNanos其根据delayNanos计算得到当前EventLoop的最近将要执行的定时任务的时间作为本次select的截止时间。

标号②:计算当前距离截止时间的毫秒数,其中加上了500000,可以说预留了0.5ms的时间去缓冲其他的代码,从而提升定时任务的实时性。

标号③:会再次进行检查当前是否有等待执行的任务,此时如果不在select阻塞调用之前判断,则会出现提交的任务长时间得不到执行。

标号④:调用带timeoutMillis参数的select方法,进行限期等待IO事件的就绪。

标号⑤:如果满足有就绪的IO事件、select阻塞过程被用户线程唤醒(一般是用户在非本EventLoop线程提交了新的任务,为了避免某个定时任务延迟很长导致timeoutMillis时间过大,允许提交新任务时立即唤醒select)、有待执行的任务、有就绪的定时任务则会理解跳出本次select,进入正式的IO处理和事件处理流程。


说完了EventLoop的循环,我们在文章开头处介绍EventLoop的时候,也说到EventLoop线程和提交就绪事件的事件源通常不在一个线程,IO事件的话很好理解,操作系统通过中断机制通知内核进而通知到我们用户空间阻塞在select等上的系统调用,那非IO事件的情况下,Netty是如何处理这些事件的入队和通知EventLoop?

如下图所示,当程序在channel上调用execute其实是交给了该channel托管的EventLoop线程执行了,当前线程调用线程不同行为也有所不同。如下图所示:



以在channel上调用write进行写数据为例,当我们调用write或者writeAndFlush时,为了达到串行化无锁操作,Netty在执行方法时会判断当前线程是否就是该channel绑定的EventLoop线程。
具体代码可以在io.netty.channel.AbstractChannelHandlerContext#write中找到,

 1  private void write(Object msg, boolean flush, ChannelPromise promise) {
2        if (executor.inEventLoop()) {
3            if (flush) {
4                next.invokeWriteAndFlush(m, promise);
5            } else {
6                next.invokeWrite(m, promise);
7            }
8        } else {
9            final AbstractWriteTask task;
10            if (flush) {
11                task = WriteAndFlushTask.newInstance(next, m, promise);
12            }  else {
13                task = WriteTask.newInstance(next, m, promise);
14            }
15            if (!safeExecute(executor, task, promise, m)) {
16                task.cancel();
17            }
18        }
19    }


  1. 如果是,则会继续执行,进行写操作。(虽然这个过程也不一定立刻写到Socket内核缓冲区的)。

  2. 如果不是,则会通过safeExecute方法将该write操作当做任务交给EventLoop来处理,safeExecute方法会直接调用NioEventLoop的父类execute方法,将该任务提交到待执行的taskQueue中,并且唤醒EventLoop要求执行

这部分逻辑在NioEventLoop的父类SingleThreadEventExecutor#execute

 1 @Override
2 public void execute(Runnable task) {
3        ....
4        boolean inEventLoop = inEventLoop();
5        addTask(task);
6        if (!inEventLoop) { 
7            startThread(); // 如果该EventLoop绑定线程没启动
8            if (isShutdown() && removeTask(task)) {
9                reject();
10            }
11        }
12        if (!addTaskWakesUp && wakesUpForTask(task)) {
13            // 该方法会立即调用selector.wakeup()方法
14            wakeup(inEventLoop);
15        }
16 }

startThread会在第一次调用EventLoop的execute方法时进行初始化,通过CAS的方式保证只会初始化一次。
并且在满足addTaskWakesUp参数为false时(目前只有在BIO的情况下,该参数为true)以及传入的task类型不属于NonWakeupRunnable的情况下,都会去调用wakeup唤醒selector,从而推动EventLoop执行。

 1private void startThread() {
2    if (state == ST_NOT_STARTED) {
3        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
4            try {
5                doStartThread();
6            } catch (Throwable cause) {
7                STATE_UPDATER.set(this, ST_NOT_STARTED);
8                PlatformDependent.throwException(cause);
9            }
10        }
11    }
12}

doStartThread方法中就进行调用了我们EventLoop核心的run方法,从而驱动EventLoop这个精密的机器开始工作。



3

总结


总的流程说下来。NioEventLoop作为reactor的核心,其主要工作可以浓缩成如下伪代码,即通过一个线程和任务队列在EvenLoop理念下把Netty和NIO结合到极致。

1while True:
2    wait_event_ready()         // 等待任意事件就绪,并且能被外界唤醒
3    process_io_events(io_events)    // 处理就绪的IO事件
4    process_task_and_schedule_task(other_events) //处理该EventLoop托管的一般任务和就绪的定时任务


篇幅原因,后续笔者将单独分析本文中隐藏的两个细节,即Netty中原生心跳支持处理类IdleStateHandler和netty中对write异步写过程的分析和笔者线上在其上遇到的坑,欢迎关注和在看~



推荐阅读


《EventLoop是什么》https://www.ruanyifeng.com/blog/2013/10/event_loop.html

《EventLoop维基百科》https://en.wikipedia.org/wiki/Event_loop
《NodeJS EventLoop》https://blog.insiderattack.net/event-loop-and-the-big-picture-nodejs-event-loop-part-1-1cb67a182810
《concurrency-vs-event-loop-vs-event-loop-concurrency》https://medium.com/@tigranbs/concurrency-vs-event-loop-vs-event-loop-concurrency-eb542ad4067b
《Netty In Action》https://livebook.manning.com/book/netty-in-action/chapter-7/74





本文由“壹伴编辑器”提供技术支持





精彩推荐






以上是关于牛了Netty之EventLoop线程的主要内容,如果未能解决你的问题,请参考以下文章

6. Netty源码分析之EventLoop与EventLoopGroup

Netty的EventLoop和线程模型

Netty实战七之EventLoop和线程模型

Netty源码EventLoop

Netty实战-EventLoop和线程模型

Netty的常用API