Netty核心组件之NioEventLoop(一)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty核心组件之NioEventLoop(一)相关的知识,希望对你有一定的参考价值。
参考技术A 在接下来几篇文章,我会通过Netty的源码深入讲解NioEventLoop的实现机制。
特别说明:基于4.1.52版本的源码
先来看下NioEventLoop的类关系图和重要的属性,对其有一个整体的感知,便于后面详细分析。
首先来看NioEventLoop的构造函数
默认情况下,会创建MPSC,即多生产者单消费者的队列,这里最终会用到JCTools库,这里不过多介绍,感兴趣的可以自己去了解。
如果设置了优化开关(默认优化选项是开启的),则通过反射的方式从Selector中获取selectedKeys和publicSelectedKeys,将这两个成员设置为可写,通过反射,使用Netty构造的selectedKeySet将原生JDK的selectedKeys替换掉。
我们知道使用Java原生NIO接口时,需要先调Selector的select方法,再调selectedKeys方法才可以获得有IO事件准备好的SelectionKey集合。这里优化过后,只通过一步select调用,就可以从selectedKeySet获得需要的SelectionKey集合。
另外,原生Java的SelectionKey集合是一个HashSet,这里优化过后的SelectedSelectionKeySet底层是一个数组,效率更高。
EventLoop的职责可以用下面这张图形象的表示
下面详细解析:
每次循环,都会检测任务队列和IO事件,如果任务队列中没有任务,则直接返回SelectStrategy.SELECT;如果任务队列中有任务,则会调用非阻塞的 selectNow 检测有IO事件准备好的Channel数。
nextScheduledTaskDeadlineNanos 方法返回下一个将要被执行的定时任务的截止时间
NioEventLoop的定时任务队列是一个优先级队列,队列中存储的是ScheduledFutureTask对象
通过ScheduledFutureTask的 compareTo 方法可以看出,优先级队列中的元素是以任务的截止时间来排序的,队首元素的截止时间最小,当截止时间相同时,以任务ID排序,ID小的排在前面。
当定时任务ScheduledFutureTask执行后,会根据 periodNanos 的取值决定是否要将任务重新放回队列。从netty的注释可以清晰看到:
看下ScheduledFutureTask的 run 方法
当任务的执行时间还未到,则判断任务是否已经取消,如果已取消则移除任务,否则重新加入队列。对于只执行一次的任务,执行完了不会再放回队列。其他的任务,则根据 periodNanos 的类型,重新计算截止时间,重新放回队列,等待下次调度。
定时任务的优先级队列到此介绍完毕,接着看NioEventLoop的 run 方法
在调用 select 之前,再次调用 hasTasks() 判断从上次调用该方法到目前为止是否有任务加入,多做了一层防护,因为调用 select 时,可能会阻塞,这时,如果任务队列中有任务就会长时间得不到执行,所以须小心谨慎。
如果任务队列中还是没有任务,则会调用 select 方法。在这个方法中会根据入参 deadlineNanos 来选择调用NIO的哪个select方法:
到这里,可能有人要问了: 在上面的方法中,如果调用了Java NIO的无参的 select 方法,就会进入阻塞,除非检测到Channel的IO事件,那么在检测到IO事件之前,加入到任务队列中的任务怎么得到执行呢?
好,你想,在检测到IO事件之前,可以退出阻塞的方法是什么?对,调用 wakeup 方法。那么我们来搜一下NioEventLoop中有调用Selector的 wakeup 方法的地方吗:
还真搜到了,再看一下这个方法被调用的地方
看到SingleThreadEventExecutor的 execute 方法了吗,就是说在调 execute 方法,向EventLoop提交任务时,会将EventLoop线程从Java NIO的select阻塞中唤醒。
到这里,NioEventLoop的run方法的职责之一:检测Channel的IO事件就讲解完毕。
至于IO事件的处理以及任务队列中任务的处理会在后面的文章中解析,敬请期待。
在本文中,对Netty的NioEventLoop进行了深入的解读,并且详细讲解了它的三大职责之一:检测Channel的IO事件的机制。
NioEventLoop是Netty最核心的概念,内部运行机制很复杂,在接下来的两篇文章中会继续分析。
netty源码之NioEventLoop
目录
1、NioEventLoopGroup最终都是调用父类MultithreadEventLoopGroup的构造器
3、newChild()方法实例化的是NioEventLoop类型的EventLoop对象
1、客户端在通过bootstrap启动时会创建一个Channel实例并进行初始化,在初始化过程中会绑定一个NioEventLoop
2、NioEventLoopGroup调用父类的register()方法将获取一个EventLoop与通道绑定
4、调用AbstractChannel的register0()方法最终完成通道的注册
一、初始化
ServerBootstrap b = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(); //创建多线程线程池
EventLoopGroup workGroup = new NioEventLoopGroup(); //创建多线程线程池
b.group(bossGroup,workGroup);
服务端的acceptor ,处理IO事件分两种:监听请求建立连接 和 处理连接 。
在reactor多线程模型下 , 两种事件都是有同一个线程处理。
在reactor主从线程模型下 , bossGroup负责处理监听连接事件 , workerGroup负责处理连接
1、NioEventLoopGroup最终都是调用父类MultithreadEventLoopGroup的构造器
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
2、通过父类MultiThreadEventExecutorGroup构造器创建NioEventLoopGroup实例,内部维护了一个SingleThreadEventExecutor类型的数组,通过newChild()方法进行实例化
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 创建一个线程池
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
// 创建一个NioEventLoop数组
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
children[i] = newChild(executor, args);
}
// 生成一个线程选择器
chooser = chooserFactory.newChooser(children);
}
3、newChild()方法实例化的是NioEventLoop类型的EventLoop对象
NioEventLoop类继承了SingleThreadEventExecutor类 , 该类种有一个thread属性 ,用以绑定本地线程
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
4、NioEventLoop 打开一个selector
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
二、EventLoop 与 Channel的关系
1、客户端在通过bootstrap启动时会创建一个Channel实例并进行初始化,在初始化过程中会绑定一个NioEventLoop
-> AbstractBootstrap.initAndRegister()
-> channelFactory.newChannel()
-> group().register(channel)
-> MultiThreadEventLoopGroup.regiser()
2、NioEventLoopGroup调用父类的register()方法将获取一个EventLoop与通道绑定
public ChannelFuture register(Channel channel) {
return this.next().register(channel);
}
3、最终调用了 AbstractChannel#AbstractUnsafe.register 后完成了 Channel 和 EventLoop 的关联,将获取的EventLoop值赋值给AbstractChannel内的一个eventLoop属性
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 删除条件检查.
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
4、调用AbstractChannel的register0()方法最终完成通道的注册
private void register0(ChannelPromise promise) {
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
}
protected void doRegister() throws Exception {
// 省略错误处理
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}
以上是关于Netty核心组件之NioEventLoop(一)的主要内容,如果未能解决你的问题,请参考以下文章