从Netty EventLoop实现上可以学到什么
Posted TopCoder
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从Netty EventLoop实现上可以学到什么相关的知识,希望对你有一定的参考价值。
编者注:本文主要讨论Netty NioEventLoop原理及实践,关于Netty NioEventLoop,首先要知道NioEventLoop是什么,为什么它会是Netty核心Reactor处理器,实现原理是什么,进而再讨论Netty对其的实现及使用上我们可以学到哪些。
EventLoop是一个Reactor模型的事件处理器,一个EventLoop对应一个线程,其内部会维护一个selector和taskQueue,负责处理客户端请求和内部任务,内部任务如ServerSocketChannel注册、ServerSocket绑定和延时任务处理等操作。
EventLoop是由事件驱动的,比如IO事件和任务等,IO事件即selectionKey中ready的事件,如accept、connect、read、write
等,由processSelectedKeys方法触发。处理完请求时间之后,会处理内部添加到taskQueue中的任务,如register0、bind0
等任务,由runAllTasks方法触发。注意NioEventLoop在Linux中默认底层是基于epoll机制。
上图是EventLoop的核心流程图,如果从Netty整体视角看EventLoop的事件流转,下图来的更直观:
注意:bossGroup和WorkerGroup中的NioEventLoop流程是一致的,只不过前者处理Accept事件之后将连接注册到后者,由后者处理该连接上后续的读写事件。
大致了解了NioEventLoop之后,不知道有没有小伙伴有这样的疑问,为什么Netty要这样实现呢,这种实现方案对于我们后续开发如何借鉴呢?关于这些疑问,本文最后讨论哈 :)
EventLoop实现原理
EventLoop是一个Reactor模型的事件处理器,一个EventLoop对应一个线程,其内部会维护一个selector和taskQueue,负责处理IO事件和内部任务。IO事件和内部任务执行时间百分比通过ioRatio来调节,ioRatio表示执行IO时间所占百分比。任务包括普通任务和已经到时的延迟任务,延迟任务存放到一个优先级队列PriorityQueue中,执行任务前从PriorityQueue读取所有到时的task,然后添加到taskQueue中,最后统一执行task。
事件处理机制
EventLoop是由事件驱动的,比如IO事件即selectionKey中ready的事件,如accept、connect、read、write
等,处理的核心逻辑主要是在NioEventLoop.run
方法中,流程如下:
1protected void run() {
2 for (;;) {
3 /* 如果hasTasks,则调用selector.selectNow(),非阻塞方式获取channel事件,没有channel事件时可能返回为0。这里用非阻塞方式是为了尽快获取连接事件,然后处理连接事件和内部任务。*/
4 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
5 case SelectStrategy.CONTINUE:
6 continue;
7 case SelectStrategy.SELECT:
8 select(wakenUp.getAndSet(false));
9 if (wakenUp.get()) {
10 selector.wakeup();
11 }
12 default:
13 }
14
15 cancelledKeys = 0;
16 needsToSelectAgain = false;
17 /* ioRatio调节连接事件和内部任务执行事件百分比
18 * ioRatio越大,连接事件处理占用百分比越大 */
19 final int ioRatio = this.ioRatio;
20 if (ioRatio == 100) {
21 try {
22 processSelectedKeys();
23 } finally {
24 runAllTasks();
25 }
26 } else {
27 final long iostartTime = System.nanoTime();
28 try {
29 processSelectedKeys();
30 } finally {
31 final long ioTime = System.nanoTime() - ioStartTime;
32 runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
33 }
34 }
35 }
36}
从代码上,在执行select()
前有一个hasTasks()
的操作,这个hasTasks()
方法判断当前taskQueue是否有元素。如果taskQueue中有元素,执行 selectNow() 方法,最终执行selector.selectNow()
,该方法会立即返回,保证了EventLoop在有任务执行时不会因为IO事件迟迟不来造成延后处理,这里优先处理IO事件,然后再处理任务。
如果当前taskQueue没有任务时,就会执行select(wakenUp.getAndSet(false))
方法,代码如下:
1/* 这个方法解决了Nio中臭名昭著的bug:selector的select方法导致空轮询 cpu100% */
2private void select(boolean oldWakenUp) throws IOException {
3 Selector selector = this.selector;
4 try {
5 int selectCnt = 0;
6 long currentTimeNanos = System.nanoTime();
7
8 /* delayNanos(currentTimeNanos):计算延迟任务队列中第一个任务的到期执行时间(即最晚还能延迟多长时间执行),默认返回1s。每个SingleThreadEventExecutor都持有一个延迟执行任务的优先队列PriorityQueue,启动线程时,往队列中加入一个任务。*/
9 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
10 for (;;) {
11 /* 如果延迟任务队列中第一个任务的最晚还能延迟执行的时间小于500000纳秒,且selectCnt == 0(selectCnt 用来记录selector.select方法的执行次数和标识是否执行过selector.selectNow()),则执行selector.selectNow()方法并立即返回。*/
12 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
13 if (timeoutMillis <= 0) {
14 if (selectCnt == 0) {
15 selector.selectNow();
16 selectCnt = 1;
17 }
18 break;
19 }
20
21 if (hasTasks() && wakenUp.compareAndSet(false, true)) {
22 selector.selectNow();
23 selectCnt = 1;
24 break;
25 }
26
27 // 超时阻塞select
28 int selectedKeys = selector.select(timeoutMillis);
29 selectCnt ++;
30 System.out.println(selectCnt);
31
32 // 有事件到来 | 被唤醒 | 有内部任务 | 有定时任务时,会返回
33 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
34 break;
35 }
36
37 long time = System.nanoTime();
38 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
39 // 阻塞超时后没有事件到来,重置selectCnt
40 selectCnt = 1;
41 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
42 selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
43 // Selector重建
44 rebuildSelector();
45 selector = this.selector;
46 // Select again to populate selectedKeys.
47 selector.selectNow();
48 selectCnt = 1;
49 break;
50 }
51 currentTimeNanos = time;
52 }
53 } catch (CancelledKeyException e) {
54 // Harmless exception - log anyway
55 }
56}
当java NIO bug触发时,进行Selector重建,rebuildSelector过程如下:
通过方法openSelector创建一个新的selector。
将old selector的selectionKey执行cancel。
将old selector的channel重新注册到新的selector中。
Netty的连接处理就是IO事件的处理,IO事件包括读事件、ACCEPT事件、写事件和OP_CONNECT事件:
ACCEPT事件:连接建立好之后将该连接的channel注册到workGroup中某个NIOEventLoop的selector中;
READ事件:从channel中读取数据,存放到byteBuf中,触发后续的ChannelHandler来处理数据;
WRITE事件:正常情况下一般是不会注册写事件的,如果Socket发送缓冲区中没有空闲内存时,在写入会导致阻塞,此时可以注册写事件,当有空闲内存(或者可用字节数大于等于其低水位标记)时,再响应写事件,并触发对应回调。
CONNECT事件:该事件是client触发的,由主动建立连接这一侧触发的。
任务处理机制
任务处理也就是处理内部任务,这里也包括延时任务,延时任务到时后会移动到taskQueue然后被执行。任务处理是在IO事件处理之后进行的,IO事件和内部任务执行时间百分比可以通过ioRatio来调节,ioRatio表示执行IO时间所占百分比。
1/* timeoutNanos:任务执行花费最长耗时/
2protected boolean runAllTasks(long timeoutNanos) {
3 // 把scheduledTaskQueue中已经超过延迟执行时间的任务移到taskQueue中等待被执行。
4 fetchFromScheduledTaskQueue();
5
6 // 非阻塞方式pollTask
7 Runnable task = pollTask();
8 if (task == null) {
9 afterRunningAllTasks();
10 return false;
11 }
12
13 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
14 long runTasks = 0;
15 long lastExecutionTime;
16 for (;;) {
17 // 执行task
18 safeExecute(task);
19 runTasks ++;
20 // 依次从taskQueue任务task执行,每执行64个任务,进行耗时检查。
21 // 如果已执行时间超过预先设定的执行时间,则停止执行非IO任务,避免非IO任务太多,影响IO任务的执行。
22 if ((runTasks & 0x3F) == 0) {
23 lastExecutionTime = ScheduledFutureTask.nanoTime();
24 if (lastExecutionTime >= deadline) {
25 break;
26 }
27 }
28
29 task = pollTask();
30 if (task == null) {
31 lastExecutionTime = ScheduledFutureTask.nanoTime();
32 break;
33 }
34 }
35 afterRunningAllTasks();
36 this.lastExecutionTime = lastExecutionTime;
37 return true;
38}
39
注意,任务的处理过程中有个执行一定量任务后的执行时间耗时检查动作,这里是为了避免任务的处理时间过长,影响Netty网络IO的处理效率,毕竟Netty是要处理大量网络IO的。
对于NioEventLoop实现的思考
EventLoop是一个Reactor模型的事件处理器,一个EventLoop对应一个线程,其内部会维护一个selector和taskQueue,负责处理网络IO请求和内部任务,这里的selector和taskQueue是线程内部的。
Netty的BossGroup和WorkerGroup可能包含多个EventLoop,BossGroup接收到请求之后轮询交给WorkerGroup中的其中一个线程(对应一个NioEventLoop)来处理,也就是连接之间的处理是线程独立的,这也就是NioEventLoop流程的无锁化设计。
从EventLoop“无锁化”设计和常见的锁机制对比来看,要实现线程并发安全,有两种实现策略:
数据隔离:数据隔离就是数据产生后就提交给不同的线程来处理,线程内部一般有一个数据容器来保存待处理的数据,这里的提交动作需要保证是安全的,比如Netty的BossGroup将建立好的连接注册到WorkerGroup时,是由内核来保证线程安全的(比如Linux就是通过epoll_ctl方法,该方法是线程安全的);
数据分配:数据产生后统一放在数据容器中,由数据消费线程自己来获取数据进行处理,这里的获取动作需要保证是安全的,一般通过锁机制来保护,比如Java线程池中线程从阻塞队列中获取任务进行执行,就是由阻塞队列保证线程安全。
对于数据隔离和数据分配来说,二者都有优缺点及适用场景。对于数据隔离来说,一般“锁”交互少成本较低,并且其隔离性较好,线程内部如果有新数据产生还继续由该线程来处理,但是可能造成数据负载不均衡;对于数据分配来说,“锁”交互较多,但是由于数据处理线程都是从同一数据容器消费数据,所以不会出现数据处理负载不均衡问题。
如果想实现类似EventLoop中单个线程对应一个处理队列的方案,可以使用只配置一个线程的Java线程池,达到类似的实现效果。
推荐阅读
以上是关于从Netty EventLoop实现上可以学到什么的主要内容,如果未能解决你的问题,请参考以下文章