一文读懂Netty线程模型分析

Posted 春丸

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一文读懂Netty线程模型分析相关的知识,希望对你有一定的参考价值。

转载自: https://hengyoush.github.io/netty/2019/08/03/netty-threadmodel-analysis.html

概述

本文通过Netty源码分析Netty的线程模型.

我们先来概述每个类和接口的作用:


  1. EventExecutorGroup: 负责通过 next方法提供 EventExecutor, 除此之外, 还提供了一系列方法用于管理 EventExecutor的生命周期.



  2. EventExecutor: 一种特殊的 EventExecutorGroup, 它的 next方法返回它自己. 它提供了 inEventLoop方法判断一个线程是否在 eventLoop中.



  3. EventLoopGroup: EventExecutor的子接口, 可以注册一个Channel到 EventLoopGroup上, 并且可通过 next方法产生 EventLoop.



  4. OrderedEventExecutor: 标记接口, 表示使用顺序执行处理提交的task( EventExecutorsubmit提交的task)



  5. EventLoop: 提供parent方法用于返回所属的 EventLoopGroup.



  6. AbstractEventExecutor: 对ExecutorService的基本实现, 不支持定时调度功能(即: ScheduledExecutorService接口功能)



  7. AbstractScheduledEventExecutor: 继承自 AbstractEventExecutor, 增加定时调度功能, 内含一个优先级队列.



  8. SingleThreadEventExecutor: 继承自 AbstractScheduledEventExecutor, 内部含有一个task队列, 用于存放外部执行请求.



  9. SingleThreadEventLoop: 继承自 SingleThreadEventExecutor, 实现了 EventLoop接口, 增加了注册channel的功能.



  10. NioEventLoop: 继承自 SingleThreadEventLoop, 实现了 SingleThreadEventExecutor的run方法, 内含一个事件循环, 使用NIO selector执行IO任务和非IO任务.



  11. AbstractEventExecutorGroup: 使用next方法委托给子EventExecutor执行相关方法.



  12. MultithreadEventExecutorGroup: 主要含有 EventExecutorChooser, 是next方法具体执行的策略



  13. MultithreadEventLoopGroup: 主要增加注册Channel的功能



  14. NioEventLoopGroup: 实现newChild方法, 构造子EventLoop, 即NioEventLoop.


Reactor模式简介

一文读懂Netty线程模型分析

下面解释一下这张图片里几个组件的作用.


  1. Handle: 可以是任意的资源, Selector在这些资源上等待事件发生. 例如一个电典型的客户端/服务端程序, 一个服务端保持着多个客户端的连接, 这种连接即Handle, Selector在这些连接上等待读写等事件的发生.



  2. Selecotr: 经典的Reacotr模式中, 它叫做 SynchronousEventDemultiplexer(同步事件分离器). 它在一个handle_set上等待事件的发生.



  3. Initiation Dispatcher: 定义了一系列管理Event Handler的接口, 包括注册, 注销Event Handler.一般来说, Selector在返回后会通知Initiation Dispatcher ,然后Initiation Dispatcher回调应用指定的Handler.



  4. Event Handler: 定义了用于事件发生时的回调方法, 来处理这些事件.


Reactor模式流程

在此我们将以最经典的Reactor模型来阐述Reactor模式的工作流程.

  1. MainReactor: 对应于Initiation Dispatcher, 它只有一个Handler, 即Acceptor作为注册的Event Handler.在MainReactor包含一个Event Loop, 该Loop调用select方法, , 当有新的连接请求到来, 调用Acceptor的回调方法进行处理.

  2. Acceptor: 初始化时, 注册到MainReactor中, 并且指定监听的端口(监听的端口即Handle). Acceptor处理连接事件的逻辑是: 将新的连接注册到SubReacotr上, 并且指定监听事件类型是可读/可写等, 这样该连接的后续事件都将由SubReactor进行处理.

  3. SubReactor: 负责分配读写事件给注册到该SubReactor的Handler, 一个SubReactor可同时管理多个连接(Handle).

  4. ThreadPool: 负责处理非IO任务.

Netty的线程模型

如上所示: BossEventLoopGroup相当于MainReactor, 它用来处理新的连接请求, 一旦有新的连接请求到来, BossEventLoopGroup相当于MainReactor会将这个请求分配给ServerBootstrapAcceptor, ServerBootstrapAcceptor将会把这个新的连接注册到ChildEventLoopGroup中的一个EventLoop中, 后续的读写事件都由这个EventLoop进行处理.

关键源码分析

一. ServerBootstrapAcceptor作用分析

ServerBootstrap#bind(int inetPort)
- doBind(localAddress)
- initAndRegister()
- ServerBootstrap#init(channel)

如下是 ServerBootstrap#init(channel)中关于Acceptor创建的代码:

 
   
   
 
  1. void init(Channel channel) throws Exception {

  2. // 省略部分代码

  3. ...


  4. p.addLast(new ChannelInitializer<Channel>() {

  5. @Override

  6. public void initChannel(final Channel ch) throws Exception {

  7. final ChannelPipeline pipeline = ch.pipeline();

  8. ChannelHandler handler = config.handler();

  9. if (handler != null) {

  10. pipeline.addLast(handler);

  11. }


  12. ch.eventLoop().execute(new Runnable() {

  13. @Override

  14. public void run() {

  15. pipeline.addLast(new ServerBootstrapAcceptor(

  16. ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

  17. }

  18. });

  19. }

  20. });

  21. }

init方法参数的channel即即将注册到BossEventLoop中到channel, 负责监听本地端口, 此时在init方法内还在进行channel以及channel对应到pipeline到构造.此处即为channel的pipeline添加了一个 ServerBootstrapAcceptor.

ServerBootstrapAcceptorChannelInboundHandlerAdapter的子类, 即它是处理入站事件的处理器.

我们依次解释一下构造 ServerBootstrapAcceptor所需的参数:

  1. ch: 即ServerSocketChannel

  2. currentChildGroup: ServerBootstrap中的currentChildGroup, 在构造ServerBootstrap中的currentChildGroup时传入

  3. currentChildOptions: 子Channel的Option

  4. currentChildAttrs: 子Channel的AttributeKey

下面查看 ServerBootstrapAcceptor的源码, 可以想到 ServerBootstrapAcceptor既然是Acceptor那么它一定有将新的连接请求注册到childEventLoopGroup中的逻辑.

 
   
   
 
  1. // ServerBootstrapAcceptor#channelRead

  2. public void channelRead(ChannelHandlerContext ctx, Object msg) {

  3. final Channel child = (Channel) msg;

  4. // childHandler

  5. child.pipeline().addLast(childHandler);


  6. setChannelOptions(child, childOptions, logger);


  7. for (Entry<AttributeKey<?>, Object> e: childAttrs) {

  8. child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());

  9. }


  10. try {

  11. childGroup.register(child).addListener(new ChannelFutureListener() {

  12. @Override

  13. public void operationComplete(ChannelFuture future) throws Exception {

  14. if (!future.isSuccess()) {

  15. forceClose(child, future.cause());

  16. }

  17. }

  18. });

  19. } catch (Throwable t) {

  20. forceClose(child, t);

  21. }

  22. }

处理逻辑如下:

  1. 将msg强制转型为Channel(关于这里为什么可以正常转型我们在其他文章中详述), 然后添加子Channel对应的Handler(很可能是我们配置bootstrap时的ChannelInitializer)

  2. 设置Channel的Option和Attr

  3. 最关键到来了: 将子Channel注册到childGroup上.

二. ChildGroup的注册Channel流程

MultithreadEventLoopGroup#register(Channel channel)
-> SingleThreadEventLoop#register(Channel channel)
-> register(finalChannelPromisepromise)
-> Unsafe#register(EventLoop eventLoop, final ChannelPromise promise)
-> register0(ChannelPromisepromise)
-> AbstractNioUnsafe#doRegister()

 
   
   
 
  1. // MultithreadEventLoopGroup#register

  2. @Override

  3. public ChannelFuture register(Channel channel) {

  4. return next().register(channel);

  5. }

next方法以近似于一种轮训的方式获取group中的下一个EventLoop, 进行实际的注册.

 
   
   
 
  1. public ChannelFuture register(Channel channel) {

  2. return register(new DefaultChannelPromise(channel, this));

  3. }


  4. public ChannelFuture register(final ChannelPromise promise) {

  5. promise.channel().unsafe().register(this, promise);

  6. return promise;

  7. }

可以看到最终还是调用了Unsafe类的register方法.

 
   
   
 
  1. private void register0(ChannelPromise promise) {

  2. try {

  3. if (!promise.setUncancellable() || !ensureOpen(promise)) {

  4. return;

  5. }

  6. boolean firstRegistration = neverRegistered;

  7. doRegister();

  8. neverRegistered = false;

  9. registered = true;


  10. pipeline.invokeHandlerAddedIfNeeded();


  11. safeSetSuccess(promise);

  12. pipeline.fireChannelRegistered();

  13. if (isActive()) {

  14. if (firstRegistration) {

  15. pipeline.fireChannelActive();

  16. } else if (config().isAutoRead()) {

  17. beginRead();

  18. }

  19. }

  20. } catch (Throwable t) {

  21. closeForcibly();

  22. closeFuture.setClosed();

  23. safeSetFailure(promise, t);

  24. }

  25. }

上述代码的逻辑如下:

  1. 首先确保Channel没有close

  2. 调用doRegister

  3. 调用 pipeline.invokeHandlerAddedIfNeeded(), 因为在channel注册完成之前可能有handler加入到pipeline中, 我们需要等待注册完毕才触发 HandlerAddedI.

  4. 只有第一次注册时才触发channel active事件.

下面我我们看 doRegister的逻辑, doRegister在AbstractUnsafe中是一个空实现, 我们看 AbstractNioUnsafe的实现如下:

 
   
   
 
  1. protected void doRegister() throws Exception {

  2. boolean selected = false;

  3. for (;;) {

  4. try {

  5. selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

  6. return;

  7. } catch (CancelledKeyException e) {

  8. if (!selected) {

  9. eventLoop().selectNow();

  10. selected = true;

  11. } else {

  12. throw e;

  13. }

  14. }

  15. }

  16. }

实际上就是调用了javaChannel的register方法.
注意这里的异常处理, 这里catch了 CancelledKeyException, 这里有可能发生该异常的原因是该channel已经注册在selector上了,但是当前channle对应的selectionKey已经取消了, 如果再次调用register方法的话会造成此异常. 所以此处再次selectNow, 因为selectionKey的cancel方法会将当前key放入selector的cancel-set中,只有再一次select才会从selector中移除.


以上是关于一文读懂Netty线程模型分析的主要内容,如果未能解决你的问题,请参考以下文章

Netty之Reactor线程模型概述

一文读懂高性能网络编程中的I/O模型

Java 并发编程一文读懂线程协程守护线程

一文读懂JDK源码:ThreadPoolExecutor

一文读懂Redis中的多路复用模型

一文读懂Linux任务间调度原理和整个执行过程