牛了Netty之EventLoop线程
Posted 像写诗一样写代码
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了牛了Netty之EventLoop线程相关的知识,希望对你有一定的参考价值。
在使用Netty时,我们老生常谈的是避免在Netty的IO线程中做耗费时间的业务操作和一些长时间的阻塞。本文将通过对Netty中核心类的剖析,帮助读者理解这个问题以及其中的EventLoop设计,从而在使用Netty时更加得心应手。
1
EventLoop是什么
用笔者的理解结合EventLoop维基百科的说明,总结如下
EventLoop是计算机科学中的常用设计模型,其等待事件的就绪并且对事件进行调度分发。EventLoop通常会阻塞等待直到事件就绪。然后调用相关的处理程序。EventLoop通常和Reactor模型一起结合使用。EventLoop并且事件产生源通常和EventLoop不在同一个线程
EventLoop整个是一个大循环,用伪代码来解释其基本运作机制:
1while not shotdown:
2 message = get_next_message()
3 process_message(message)
EventLoop在计算机系统设计中应用广泛,常用于网络库,例如本文重点介绍的Netty、JS的并发模型也建立在EventLoop之上。
后文将主要介绍EventLoop在Netty中的实现,帮助大家理解Netty底层的运作机制中核心一环。
2
Netty中的EventLoop使用
以EventLoop接口中的doc解释开局
Will handle all the I/O operations for a Channel once registered.
One EventLoop instance will usually handle more than one Channel
but this may depend on implementation details and internals.
其中说到EventLoop通常为多个channel服务,监听并处理这些注册上来的channel的就绪的IO操作。其实在Netty中,不仅是这些channel的IO操作,包括和channel关联的的定时事件也会交由对应的EventLoop进行执行。这一点后文会分析到。借用<<Netty In Action>
一旦一个 Channel 被分配给一个 EventLoop,该Channel的整个生命周期中都使用这个EventLoop。这种做法带来最显而易见的好处是避免了多个线程操作同一个Channel和该Channel上资源。从而免除很多线程安全问题,实现局部无锁,在消除了锁带来的性能影响之外,也无需开发人员去关心其上的并发问题,降低了使用网络通讯库的开发门槛。
Netty中的EventLoop有多种实现,有基于select的NioEventLoop
、基于Epoll的EpollEventLoop
以及macOS下的KQueueEventLoop
等。本文以NioEventLoop
举例,其的继承关系如下:
SingleThreadEventExecutor
类中定义了Thread和taskQueue任务队列。该Thread就是该Eventloop绑定的Thread。通过该Thread执行该EventLoop上的所有Channel的IO操作、任务操作。
除此之外,可以看到SingleThreadEventExecutor
类还往上继承了AbstractScheduledEventExecutor
类,该抽象类中通过优先级队列提供了EventLoop对于定时任务的支持,定时任务典型的典型使用场景就是Nettty原生提供的对于心跳的支持类IdleStateHandler
,当然扯远了,这并不是本文的重点,之后会再写篇文章详细解剖该类的打开方式以及原理。
NioEventLoop
的run
方法是Netty Reactor事件模型的主体,其本质就是一个大循环,如下所示:
1 @Override
2 protected void run() {
3 for (;;) {
4 try {
5 // 根据当前的selectStrategy进行判断当前是否要进入select环节或者跳过
6 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
7 case SelectStrategy.CONTINUE:
8 continue;
9 case SelectStrategy.SELECT:
10 select(wakenUp.getAndSet(false)); // 阻塞等待
11 default:
12 }
13
14 cancelledKeys = 0;
15 needsToSelectAgain = false;
16 final int ioRatio = this.ioRatio;
17 if (ioRatio == 100) { // ioRatio表示执行IO事件的时间比率,如果设置了100,则会优先执行完所有的IO操作, // 再去
18 try {
19 processSelectedKeys(); //处理IO事件
20 } finally {
21 runAllTasks(); //执行非IO事件
22 }
23 } else {
24 final long iostartTime = System.nanoTime();
25 try {
26 processSelectedKeys();
27 } finally {
28 final long ioTime = System.nanoTime() - ioStartTime; //计算执行IO事件花费的时间
29 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); //根据比例计算执行非IO事件所应该的时间
30 }
31 }
32 } catch (Throwable t) {
33 handleLoopException(t);
34 }
35 ... 省略
36 }
37 }
NioEventLoop通过一个线程干了所有Channel的IO事件和定时事件。这就是文章开头说到的为什么Netty中的handler中或者定时事件中不建议运行长时间的业务逻辑。
作为一个大轮询,当然也不能让他一直没事干也跑着。这会浪费大量的CPU资源。最好的方法就是没事干的时候就释放CPU资源让CPU去做别的事,有事干的时候就立马能响应过来恢复到一线。select(wakenUp.getAndSet(false));
干的就是这事。
1 private void select(boolean oldWakenUp) throws IOException {
2 Selector selector = this.selector;
3 try {
4 int selectCnt = 0;
5 long currentTimeNanos = System.nanoTime();
6 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); // ①
7
8 for (;;) {
9 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; // ②
10 if (timeoutMillis <= 0) {
11 if (selectCnt == 0) {
12 // 立即
13 selector.selectNow();
14 selectCnt = 1;
15 }
16 break;
17 }
18 if (hasTasks() && wakenUp.compareAndSet(false, true)) { // ③
19 selector.selectNow();
20 selectCnt = 1;
21 break;
22 }
23
24 int selectedKeys = selector.select(timeoutMillis); // ④
25 selectCnt ++;
26
27 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
28 //⑤
29 break;
30 }
31
32 //省略无关代码
33
34 }
标号①:计算selectDeadLineNanos
其根据delayNanos
计算得到当前EventLoop的最近将要执行的定时任务的时间作为本次select的截止时间。
标号②:计算当前距离截止时间的毫秒数,其中加上了500000
,可以说预留了0.5ms的时间去缓冲其他的代码,从而提升定时任务的实时性。
标号③:会再次进行检查当前是否有等待执行的任务,此时如果不在select阻塞调用之前判断,则会出现提交的任务长时间得不到执行。
标号④:调用带timeoutMillis
参数的select方法,进行限期等待IO事件的就绪。
标号⑤:如果满足有就绪的IO事件、select阻塞过程被用户线程唤醒(一般是用户在非本EventLoop线程提交了新的任务,为了避免某个定时任务延迟很长导致timeoutMillis时间过大,允许提交新任务时立即唤醒select)、有待执行的任务、有就绪的定时任务则会理解跳出本次select,进入正式的IO处理和事件处理流程。
说完了EventLoop的循环,我们在文章开头处介绍EventLoop的时候,也说到EventLoop线程和提交就绪事件的事件源通常不在一个线程,IO事件的话很好理解,操作系统通过中断机制通知内核进而通知到我们用户空间阻塞在select
等上的系统调用,那非IO事件的情况下,Netty是如何处理这些事件的入队和通知EventLoop?
如下图所示,当程序在channel上调用execute其实是交给了该channel托管的EventLoop线程执行了,当前线程调用线程不同行为也有所不同。如下图所示:
以在channel上调用write
进行写数据为例,当我们调用write
或者writeAndFlush
时,为了达到串行化无锁操作,Netty在执行方法时会判断当前线程是否就是该channel绑定的EventLoop
线程。
具体代码可以在io.netty.channel.AbstractChannelHandlerContext#write
中找到,
1 private void write(Object msg, boolean flush, ChannelPromise promise) {
2 if (executor.inEventLoop()) {
3 if (flush) {
4 next.invokeWriteAndFlush(m, promise);
5 } else {
6 next.invokeWrite(m, promise);
7 }
8 } else {
9 final AbstractWriteTask task;
10 if (flush) {
11 task = WriteAndFlushTask.newInstance(next, m, promise);
12 } else {
13 task = WriteTask.newInstance(next, m, promise);
14 }
15 if (!safeExecute(executor, task, promise, m)) {
16 task.cancel();
17 }
18 }
19 }
如果是,则会继续执行,进行写操作。(虽然这个过程也不一定立刻写到Socket内核缓冲区的)。
如果不是,则会通过
safeExecute
方法将该write操作当做任务交给EventLoop
来处理,safeExecute
方法会直接调用NioEventLoop
的父类execute
方法,将该任务提交到待执行的taskQueue
中,并且唤醒EventLoop要求执行。
这部分逻辑在NioEventLoop
的父类SingleThreadEventExecutor#execute
中
1 @Override
2 public void execute(Runnable task) {
3 ....
4 boolean inEventLoop = inEventLoop();
5 addTask(task);
6 if (!inEventLoop) {
7 startThread(); // 如果该EventLoop绑定线程没启动
8 if (isShutdown() && removeTask(task)) {
9 reject();
10 }
11 }
12 if (!addTaskWakesUp && wakesUpForTask(task)) {
13 // 该方法会立即调用selector.wakeup()方法
14 wakeup(inEventLoop);
15 }
16 }
startThread
会在第一次调用EventLoop的execute方法时进行初始化,通过CAS的方式保证只会初始化一次。
并且在满足addTaskWakesUp
参数为false时(目前只有在BIO的情况下,该参数为true)以及传入的task类型不属于NonWakeupRunnable
的情况下,都会去调用wakeup唤醒selector,从而推动EventLoop执行。
1private void startThread() {
2 if (state == ST_NOT_STARTED) {
3 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
4 try {
5 doStartThread();
6 } catch (Throwable cause) {
7 STATE_UPDATER.set(this, ST_NOT_STARTED);
8 PlatformDependent.throwException(cause);
9 }
10 }
11 }
12}
doStartThread
方法中就进行调用了我们EventLoop核心的run方法,从而驱动EventLoop这个精密的机器开始工作。
3
总结
总的流程说下来。NioEventLoop
作为reactor的核心,其主要工作可以浓缩成如下伪代码,即通过一个线程和任务队列在EvenLoop理念下把Netty和NIO结合到极致。
1while True:
2 wait_event_ready() // 等待任意事件就绪,并且能被外界唤醒
3 process_io_events(io_events) // 处理就绪的IO事件
4 process_task_and_schedule_task(other_events) //处理该EventLoop托管的一般任务和就绪的定时任务
篇幅原因,后续笔者将单独分析本文中隐藏的两个细节,即Netty中原生心跳支持处理类IdleStateHandler
和netty中对write
的异步写过程的分析和笔者线上在其上遇到的坑,欢迎关注和在看~
《EventLoop是什么》https://www.ruanyifeng.com/blog/2013/10/event_loop.html
《EventLoop维基百科》https://en.wikipedia.org/wiki/Event_loop
《NodeJS EventLoop》https://blog.insiderattack.net/event-loop-and-the-big-picture-nodejs-event-loop-part-1-1cb67a182810
《concurrency-vs-event-loop-vs-event-loop-concurrency》https://medium.com/@tigranbs/concurrency-vs-event-loop-vs-event-loop-concurrency-eb542ad4067b
《Netty In Action》https://livebook.manning.com/book/netty-in-action/chapter-7/74
本文由“壹伴编辑器”提供技术支持
以上是关于牛了Netty之EventLoop线程的主要内容,如果未能解决你的问题,请参考以下文章