Netty IO事件与任务处理
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty IO事件与任务处理相关的知识,希望对你有一定的参考价值。
参考技术A 在实例化NioEventLoopGroup时,默认会创建2倍CPU核心数的NioEventLoop。对于bossGroup来说,虽然会创建这么多NioEventLoop,但是如果只绑定一个端口进行事件监听,实际上只会用到一个NioEventLoop,也就是说只有一个线程在循环处理事件与任务。NioEventLoopGroup UML图:
NioEventLoopGroup无参构造方法最终会调到下面的构造方法:
NioEventLoopGroup构造方法中会调用其父类MultithreadEventLoopGroup的构造方法,该构造方法会初始化默认的线程数量,常量DEFAULT_EVENT_LOOP_THREADS值为2倍CPU核心数:
MultithreadEventLoopGroup构造方法中会调用其父类MultithreadEventExecutorGroup的构造方法,该构造方法会初始化线程执行选择器工厂,常量DefaultEventExecutorChooserFactory.INSTANCE值为EventExecutorChooserFactory:
最终会调用MultithreadEventExecutorGroup如下构造方法,在该方法中会循环创建NioEventLoop:
线程执行器ThreadPerTaskExecutor#execute方法内部使用ThreadFactory来创建并启动线程,其中ThreadFactory就是调用其构造方法传入的DefaultThreadFactory,DefaultThreadFactory#newThread方法会创建线程,并设置线程属性,如线程名称等:
调用NioEventLoopGroup#newChild方法进行NioEventLoop的创建:
方法内部会调用NioEventLoop的构造方法进行创建:
该构造方法中有两个比较重要的操作:一是调用父类构造方法创建任务队列等,二是调用openSelector方法创建IO多路复用器,我们先看openSelector方法:
该方法会创建Java Selector,并通过反射替换其对应的selectedKeys、publicSelectedKeys属性。这里Netty对原生Selector数据结构进行了优化,由原本HashSet实现的数据结构替换为了Netty基于数组实现的数据结构:SelectedSelectionKeySet。
接下来继续跟进,该构造方法中会调用父类的构造方法,对父类属性进行初始化,先看下NioEventLoop UML图:
NioEventLoop父类为SingleThreadEventLoop,在其父类构造方法中会创建一个MpscQueue类型的队列:
SingleThreadEventLoop构造方法中会调用其父类SingleThreadEventExecutor的构造方法:
服务端NioEventLoop中的线程启动是在channel注册时触发的,我们再来回顾下:
NioEventLoop#execute方法实现在其父类SingleThreadEventExecutor中:
execute方法会主要分为三步,第一步:调用startThread方法创建并启动线程;第二步:将任务添加到任务队列中;第三步:唤醒线程执行任务。先看下startThread方法:
startThread方法最终会调用doStartThread方法来启动线程:
调用executor#execute方法创建并启动线程,这里的executor类型为:ThreadPerTaskExecutor,是实例化NioEventLoopGroup时,在其父类MultithreadEventExecutorGroup中创建的,ThreadPerTaskExecutor#execute方法内部通过线程工厂创建并启动线程。线程启动后主要做两件事情,一:保存创建的线程,二:处理IO事件与异步任务(后面会讲到)。
线程启动后,会调用addTask方法,将任务放到任务队列中,等待线程处理:
addTask方法会尝试将任务放入taskQueue中,如果放入失败则会触发拒绝策略。
任务添加到taskQueue中后,会调用wakeup方法唤醒线程,因为此时线程可能因为没有任务而进入到阻塞状态,使用wakeup方法可以将线程从阻塞中唤醒,处理任务。
NioEventLoop中的线程启动后,会一直循环处理IO事件与异步任务:
该方法会调用其子类NioEventLoop的run方法,看下NioEventLoop的run方法:
IO事件与任务处理主要分为三步,第一:检测IO事件,第二:处理IO事件,第三:处理普通任务与定时任务。
通过SelectStrategy#calculateStrategy方法计算走什么策略,SelectStrategy是在创建NioEventLoop时通过IO多路复用器策略工厂DefaultSelectStrategyFactory进行创建的:
hasTasks值为true,标识taskQueue或者tailQueue中有任务,则调用IntSupplier#get方法,该方法内部会调用Selector#selectNow方法,selectNow方法是一个非阻塞方法,不管有没有IO事件都会立即返回。如果任务队列中没有任务,则直接返回SelectStrategy.SELECT。
如果SelectStrategy#calculateStrategy方法返回SelectStrategy.SELECT,则会尝试将wakenUp属性设置为false,并调用select方法。因为Selector#wakeup方法是一个比较耗时的操作,而用户线程和IO线程都有可能操作该属性,因此使用原子操作防止多个线程重复唤醒。接着看下select方法:
processSelectedKeys方法,用来处理所有IO事件:
Netty默认会开启对Selector的优化,所以会进入processSelectedKeysOptimized方法处理IO事件:
最终会调用processSelectedKey方法进行IO事件的处理:
方法内部会调用Unsafe类的方法进行处理,以OP_ACCEPT事件为例,我们看下其read方法,OP_ACCEPT事件的read方法是在AbstractNioMessageChannel类的内部类NioMessageUnsafe中:
NioMessageUnsafe#read方法主要做了两件事,一是:调用NioserverSocketChannel#doReadMessages方法处理事件,二是:调用ChannelPipeline发送channelRead、channelReadComplete事件。
NioServerSocketChannel#doReadMessages方法中会调用Java的ServerSocketChannel方法建立连接:
到此IO事件处理流程就结束了,真正的事件处理还是由不同的Unsafe类调用对应的channel中的方法来进行处理。
runAllTasks方法用来处理普通任务与定时任务:
runAllTasks方法还是比较容易理解,方法主要做了两件事,一是:将可调度的定时任务从scheduledTaskQueue队列放入到taskQueue队列中,然后循环取出taskQueue中的任务,执行其run方法。
netty之NioEventLoop事件循环处理
参考技术A NioEventLoop的事件循环处理,就是在一个死循环中处理IO事件和队列里的任务,并且可以根据策略来平衡这两者之间的执行比例。首先,先来看下selectStrategy,netty中只有一个默认实现
这个策略,若是当前有任务,那么返回selectNow()方法的返回值,若是没有任务,则返回SelectStrategy.SELECT(-1)。
因此接下来的swtich语句块中只会有一种情况,就是值为-1时,表示没有任务。但是并不是就进入无限的阻塞状态select()方法中,还会判断队列是否有定时任务要执行,若有,则计算到下一次定时任务的时间间隔,并传给select()方法中,表示超时时间,这个是为了防止一直在select等待,而没有及时的执行定时任务。
这个超时时间还会设置到原子变量nextWakeupNanos中,这样应用程序就可以通过nextWakeupNanos获取到下一次线程唤醒的时间。当线程唤醒后,程序finally会执行nextWakeupNanos.lazySet(AWAKE),表示线程目前是唤醒状态。这个变量的主要作用是当线程阻塞在select方法时,而此时又有任务提交给这个NioEventLoop执行时
唤醒selector时,会先判断inEventLoop,因为若是inEventLoop,就是目前的任务正在被NioEventLoop的线程执行,并没有阻塞在selector的select方法,还有会对nextWakeupNanos的值设置为AWAKE唤醒状态,若该变量值之前就是唤醒的,那么也不会唤醒selector。
现在,把流程又回到刚刚的事件循环run方法中,当select方法返回后,要执行selectKeys和任务时,会先判断ioRatio这个参数,这个表示的是在当前循环中处理IO事件的时间与任务的比例
在每次的循环最后,会判断NioEventLoop是否shutdown了,若关闭了,则将Selector上的key都cancel,并关闭channel。
以上是关于Netty IO事件与任务处理的主要内容,如果未能解决你的问题,请参考以下文章