详细图解Netty Reactor启动全流程 | 万字长文 | 多图预警
Posted 热爱编程的大忽悠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了详细图解Netty Reactor启动全流程 | 万字长文 | 多图预警相关的知识,希望对你有一定的参考价值。
详细图解Netty Reactor启动全流程 | 万字长文 | 多图预警
本系列Netty源码解析文章基于 4.1.56.Final版本
大家第一眼看到这幅流程图,是不是脑瓜子嗡嗡的呢?
大家先不要惊慌,问题不大,本文笔者的目的就是要让大家清晰的理解这幅流程图,从而深刻的理解Netty Reactor的启动全流程,包括其中涉及到的各种代码设计实现细节。
在上篇文章《聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)》中我们详细介绍了Netty服务端核心引擎组件主从Reactor组模型 NioEventLoopGroup
以及Reactor模型 NioEventLoop
的创建过程。最终我们得到了netty Reactor模型的运行骨架如下:
现在Netty服务端程序的骨架是搭建好了,本文我们就基于这个骨架来深入剖析下Netty服务端的启动过程。
我们继续回到上篇文章提到的Netty服务端代码模板中,在创建完主从Reactor线程组:bossGroup
,workerGroup
后,接下来就开始配置Netty服务端的启动辅助类ServerBootstrap
了。
public final class EchoServer
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception
// Configure the server.
//创建主从Reactor线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主从Reactor
.channel(NioserverSocketChannel.class)//配置主Reactor中的channel类型
.option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项
.handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer<SocketChannel>() //设置从Reactor中注册channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
);
// Start the server. 绑定端口启动服务,开始监听accept事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
finally
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
在上篇文章中我们对代码模板中涉及到ServerBootstrap
的一些配置方法做了简单的介绍,大家如果忘记的话,可以在返回去回顾一下。
ServerBootstrap类
其实没有什么特别的逻辑,主要是对Netty启动过程中需要用到的一些核心信息进行配置管理,比如:
- Netty的核心引擎组件
主从Reactor线程组:bossGroup,workerGroup
。通过ServerBootstrap#group方法
配置。 - Netty服务端使用到的Channel类型:
NioServerSocketChannel
,通过ServerBootstrap#channel方法
配置。以及配置NioServerSocketChannel
时用到的SocketOption
。SocketOption
用于设置底层JDK NIO Socket的一些选项。通过ServerBootstrap#option方法
进行配置。
主ReactorGroup中的MainReactor管理的Channel类型为
NioServerSocketChannel
,如图所示主要用来监听端口,接收客户端连接,为客户端创建初始化NioSocketChannel
,然后采用round-robin
轮询的方式从图中从ReactorGroup中选择一个SubReactor与该客户端NioSocketChannel
进行绑定。
从ReactorGroup中的SubReactor管理的Channel类型为
NioSocketChannel
,它是netty中定义客户端连接的一个模型,每个连接对应一个。如图所示SubReactor负责监听处理绑定在其上的所有NioSocketChannel
上的IO事件。
- 保存服务端
NioServerSocketChannel
和客户端NioSocketChannel
对应pipeline
中指定的ChannelHandler
。用于后续Channel向Reactor注册成功之后,初始化Channel里的pipeline。
不管是服务端用到的
NioServerSocketChannel
还是客户端用到的NioSocketChannel
,每个Channel实例
都会有一个Pipeline
,Pipeline
中有多个ChannelHandler
用于编排处理对应Channel
上感兴趣的IO事件
。
ServerBootstrap
结构中包含了netty服务端程序启动的所有配置信息,在我们介绍启动流程之前,先来看下ServerBootstrap
的源码结构:
ServerBootstrap
ServerBootstrap
的继承结构比较简单,继承层次的职责分工也比较明确。
ServerBootstrap
主要负责对主从Reactor线程组
相关的配置进行管理,其中带child前缀的配置方法
是对从Reactor线程组
的相关配置管理。从Reactor线程组
中的Sub Reactor
负责管理的客户端NioSocketChannel
相关配置存储在ServerBootstrap
结构中。
父类AbstractBootstrap
则是主要负责对主Reactor线程组
相关的配置进行管理,以及主Reactor线程组
中的Main Reactor
负责处理的服务端ServerSocketChannel
相关的配置管理。
1. 配置主从Reactor线程组
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主从Reactor
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>
//Main Reactor线程组
volatile EventLoopGroup group;
//Sub Reactor线程组
private volatile EventLoopGroup childGroup;
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup)
//父类管理主Reactor线程组
super.group(parentGroup);
if (this.childGroup != null)
throw new IllegalStateException("childGroup set already");
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
2. 配置服务端ServerSocketChannel
ServerBootstrap b = new ServerBootstrap();
b.channel(NioServerSocketChannel.class);
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>
//用于创建ServerSocketChannel ReflectiveChannelFactory
private volatile ChannelFactory<? extends C> channelFactory;
public B channel(Class<? extends C> channelClass)
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory)
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null)
throw new IllegalStateException("channelFactory set already");
this.channelFactory = channelFactory;
return self();
在向ServerBootstrap
配置服务端ServerSocketChannel
的channel
方法中,其实是创建了一个ChannelFactory
工厂实例ReflectiveChannelFactory
,在Netty服务端启动的过程中,会通过这个ChannelFactory
去创建相应的Channel
实例。
我们可以通过这个方法来配置netty的IO模型,下面为ServerSocketChannel
在不同IO模型下的实现:
BIO | NIO | AIO |
---|---|---|
OioServerSocketChannel | NioServerSocketChannel | AioServerSocketChannel |
EventLoopGroup
Reactor线程组在不同IO模型下的实现:
BIO | NIO | AIO |
---|---|---|
ThreadPerChannelEventLoopGroup | NioEventLoopGroup | AioEventLoopGroup |
我们只需要将IO模型
的这些核心接口对应的实现类前缀
改为对应IO模型
的前缀,就可以轻松在Netty中完成对IO模型
的切换。
2.1 ReflectiveChannelFactory
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T>
//NioServerSocketChannelde 构造器
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz)
ObjectUtil.checkNotNull(clazz, "clazz");
try
//反射获取NioServerSocketChannel的构造器
this.constructor = clazz.getConstructor();
catch (NoSuchMethodException e)
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
@Override
public T newChannel()
try
//创建NioServerSocketChannel实例
return constructor.newInstance();
catch (Throwable t)
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
从类的签名我们可以看出,这个工厂类是通过泛型
加反射
的方式来创建对应的Channel
实例。
- 泛型参数
T extends Channel
表示的是要通过工厂类创建的Channel类型
,这里我们初始化的是NioServerSocketChannel
。 - 在
ReflectiveChannelFactory
的构造器中通过反射
的方式获取NioServerSocketChannel
的构造器。 - 在
newChannel
方法中通过构造器反射创建NioServerSocketChannel
实例。
注意这时只是配置阶段,NioServerSocketChannel
此时并未被创建。它是在启动的时候才会被创建出来。
3. 为NioServerSocketChannel配置ChannelOption
ServerBootstrap b = new ServerBootstrap();
//设置被MainReactor管理的NioServerSocketChannel的Socket选项
b.option(ChannelOption.SO_BACKLOG, 100)
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable
//serverSocketChannel中的ChannelOption配置
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
public <T> B option(ChannelOption<T> option, T value)
ObjectUtil.checkNotNull(option, "option");
synchronized (options)
if (value == null)
options.remove(option);
else
options.put(option, value);
return self();
无论是服务端的NioServerSocketChannel
还是客户端的NioSocketChannel
它们的相关底层Socket选项ChannelOption
配置全部存放于一个Map
类型的数据结构中。
由于客户端NioSocketChannel
是由从Reactor线程组
中的Sub Reactor
来负责处理,所以涉及到客户端NioSocketChannel
所有的方法和配置全部是以child
前缀开头。
ServerBootstrap b = new ServerBootstrap();
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>
//客户端SocketChannel对应的ChannelOption配置
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value)
ObjectUtil.checkNotNull(childOption, "childOption");
synchronized (childOptions)
if (value == null)
childOptions.remove(childOption);
else
childOptions.put(childOption, value);
return this;
相关的底层Socket选项,netty全部枚举在ChannelOption类中,笔者这里就不一一列举了,在本系列后续相关的文章中,笔者还会为大家详细的介绍这些参数的作用。
public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>>
..................省略..............
public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");
public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");
public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");
public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");
public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");
public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");
..................省略..............
4. 为服务端NioServerSocketChannel中的Pipeline配置ChannelHandler
//serverSocketChannel中pipeline里的handler(主要是acceptor)
private volatile ChannelHandler handler;
public B handler(ChannelHandler handler)
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
向NioServerSocketChannel
中的Pipeline
添加ChannelHandler
分为两种方式:
显式添加:
显式添加的方式是由用户在main线程中通过ServerBootstrap#handler
的方式添加。如果需要添加多个ChannelHandler
,则可以通过ChannelInitializer
向pipeline
中进行添加。
关于
ChannelInitializer
后面笔者会有详细介绍,这里大家只需要知道ChannelInitializer
是一种特殊的ChannelHandler
,用于初始化pipeline
。适用于向pipeline中添加多个ChannelHandler的场景。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主从Reactor
.channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型
.handler(new ChannelInitializer<NioServerSocketChannel>()
@Override
protected void initChannel(NioServerSocketChannel ch) throws Exception
ChannelPipeline p = ch.pipeline();
p.addLast(channelhandler1)
.addLast(channelHandler2)
......
.addLast(channelHandler3);
)
隐式添加:
隐式添加主要添加的就是主ReactorGroup
的核心组件也就是下图中的acceptor
,Netty中的实现为ServerBootstrapAcceptor
,本质上也是一种ChannelHandler
,主要负责在客户端连接建立好后,初始化客户端NioSocketChannel
,在从Reactor线程组中
选取一个Sub Reactor
,将客户端NioSocketChannel
注册到Sub Reactor
中的selector
上。
隐式添加
ServerBootstrapAcceptor
是由Netty框架在启动的时候负责添加,用户无需关心。
在本例中,NioServerSocketChannel
的PipeLine
中只有两个ChannelHandler
,一个由用户在外部显式添加的LoggingHandler
,另一个是由Netty框架隐式添加的ServerBootstrapAcceptor
。
其实我们在实际项目使用的过程中,不会向netty服务端
NioServerSocketChannel
添加额外的ChannelHandler,NioServerSocketChannel
只需要专心做好自己最重要的本职工作接收客户端连接就好了。这里额外添加一个LoggingHandler
只是为了向大家展示ServerBootstrap
的配置方法。
5. 为客户端NioSocketChannel中的Pipeline配置ChannelHandler
final EchoServerHandler serverHandler = new EchoServerHandler();
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() //设置从Reactor中注册channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
);
//socketChannel中pipeline中的处理handler
private volatile ChannelHandler childHandler;
public ServerBootstrap childHandler(ChannelHandler childHandler)
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
向客户端NioSocketChannel
中的Pipeline
里添加ChannelHandler
完全是由用户自己控制显式添加,添加的数量不受限制。
由于在Netty的IO线程模型
中,是由单个Sub Reactor线程
负责执行客户端NioSocketChannel
中的Pipeline
,一个Sub Reactor线程
负责处理多个NioSocketChannel
上的IO事件
,如果Pipeline
中的ChannelHandler
添加的太多,就会影响Sub Reactor线程
执行其他NioSocketChannel
上的Pipeline
,从而降低IO处理效率
,降低吞吐量。
所以Pipeline
中的ChannelHandler
不易添加过多,并且不能再ChannelHandler
中执行耗时的业务处理任务。
在我们通过ServerBootstrap
配置netty服务端启动信息的时候,无论是向服务端NioServerSocketChannel
的pipeline中添加ChannelHandler,还是向客户端NioSocketChannel
的pipeline中添加ChannelHandler,当涉及到多个ChannelHandler添加的时候,我们都会用到ChannelInitializer
,那么这个ChannelInitializer
究竟是何方圣神,为什么要这样做呢?我们接着往下看~~
ChannelInitializer
首先ChannelInitializer
它继承于ChannelHandler
,它自己本身就是一个ChannelHandler,所以它可以添加到childHandler
中。
其他的父类大家这里可以不用管,后面文章中笔者会一一为大家详细介绍。
那为什么不直接添加ChannelHandler
而是选择用ChannelInitializer
呢?
这里主要有两点原因:
- 前边我们提到,客户端
NioSocketChannel
是在服务端accept连接后,在服务端NioServerSocketChannel
中被创建出来的。但是此时我们正处于配置ServerBootStrap
阶段,服务端还没有启动,更没有客户端连接上来,此时客户端NioSocketChannel
还没有被创建出来,所以也就没办法向客户端NioSocketChannel
的pipeline中添加ChannelHandler
。 - 客户端
NioSocketChannel
中Pipeline
里可以添加任意多个ChannelHandler
,但是Netty框架无法预知用户到底需要添加多少个ChannelHandler
,所以Netty框架提供了回调函数ChannelInitializer#initChannel
,使用户可以自定义ChannelHandler
的添加行为。
当客户端NioSocketChannel
注册到对应的Sub Reactor
上后,紧接着就会初始化NioSocketChannel
中的Pipeline
,此时Netty框架会回调ChannelInitializer#initChannel
执行用户自定义的添加逻辑。
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception
//当channelRegister事件发生时,调用initChannel初始化pipeline
if (initChannel(ctx))
.................省略...............
else
.................省略...............
private boolean initChannel(ChannelHandlerContext ctx) throws Exception
if (initMap.add(ctx)) // Guard against re-entrance.
try
//此时客户单NioSocketChannel已经创建并初始化好了
initChannel((C) ctx.channel());
catch (Throwable cause)
.................省略...............
finally
.................省略...............
return true;
return false;
protected abstract void initChannel(C ch) throws Exception;
.................省略...............
这里由netty框架回调的ChannelInitializer#initChannel方法
正是我们自定义的添加逻辑。
final EchoServerHandler serverHandler = new EchoServerHandler();
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() //设置从Reactor中注册channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
);
到此为止,Netty服务端启动所需要的必要配置信息,已经全部存入ServerBootStrap
启动辅助类中。
接下来要做的事情就是服务端的启动了。
// Start the server. 绑定端口启动服务,开始监听accept事件
ChannelFuture f = serverBootStrap.bind(PORT).sync();
Netty服务端的启动
经过前面的铺垫终于来到了本文的核心内容----Netty服务端的启动过程。
如代码模板中的示例所示,Netty服务端的启动过程封装在io.netty.bootstrap.AbstractBootstrap#bind(int)
函数中。
接下来我们看一下Netty服务端在启动过程中究竟干了哪些事情?
大家看到这副启动流程图先不要慌,接下来的内容笔者会带大家各个击破它,在文章的最后保证让大家看懂这副流程图。
我们先来从netty服务端启动的入口函数开始我们今天的源码解析旅程:
public ChannelFuture bind(int inetPort)
return bind(new InetSocketAddress(inetPort));
public ChannelFuture bind(SocketAddress localAddress)
//校验Netty核心组件是否配置齐全
validate();
//服务端开始启动,绑定端口地址,接收客户端连接
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
private ChannelFuture doBind(final SocketAddress localAddress)
//异步创建,初始化,注册ServerSocketChannel到main reactor上
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null)
return regFuture;
if (regFuture.isDone())
........serverSocketChannel向Main Reactor注册成功后开始绑定端口....,
else
//如果此时注册操作没有完成,则向regFuture添加operationComplete回调函数,注册成功后回调。
regFuture.addListener(new ChannelFutureListener()
@Override
public void operationComplete(ChannelFuture future) throws Exception
........serverSocketChannel向Main Reactor注册成功后开始绑定端口....,
);
return promise;
Netty服务端的启动流程总体如下:
- 创建服务端
NioServerSocketChannel
并初始化。 - 将服务端
NioServerSocketChannel
注册到主Reactor线程组
中。 - 注册成功后,开始初始化
NioServerSocketChannel
中的pipeline,然后在pipeline中触发channelRegister事件。 - 随后由
NioServerSocketChannel
绑定端口地址。 - 绑定端口地址成功后,向
NioServerSocketChannel
对应的Pipeline
中触发传播ChannelActive事件
,在ChannelActive事件回调
中向Main Reactor
注册OP_ACCEPT事件
,开始等待客户端连接。服务端启动完成。
当netty服务端启动成功之后,最终我们会得到如下结构的阵型,开始枕戈待旦,准备接收客户端的连接,Reactor开始运转。
接下来,我们就来看下Netty源码是如何实现以上步骤的~~
1. initAndRegister
final ChannelFuture initAndRegister()
Channel channel = null;
try
//创建NioServerSocketChannel
//ReflectiveChannelFactory通过泛型,反射,工厂的方式灵活创建不同类型的channel
channel = channelFactory.newChannel();
//初始化NioServerSocketChannel
init(channel);
catch (Throwable t)
..............省略.................
//向MainReactor注册ServerSocketChannel
ChannelFuture regFuture = config().group().register(channel);
..............省略.................
return regFuture;
从函数命名中我们可以看出,这个函数主要做的事情就是首先创建NioServerSocketChannel
,并对NioServerSocketChannel
进行初始化,最后将NioServerSocketChannel
注册到Main Reactor
中。
1.1 创建NioServerSocketChannel
还记得我们在介绍ServerBootstrap
启动辅助类配置服务端ServerSocketChannel
类型的时候提到的工厂类ReflectiveChannelFactory
吗?
因为当时我们在配置ServerBootstrap
启动辅助类的时候,还没到启动阶段,而配置阶段并不是创建具体ServerSocketChannel
的时机。
所以Netty通过工厂模式
将要创建的ServerSocketChannel
的类型(通过泛型指定)以及 创建的过程(封装在newChannel函数中
)统统先封装在工厂类ReflectiveChannelFactory
中。
ReflectiveChannelFactory`通过`泛型`,`反射`,`工厂`的方式`灵活`创建不同类型的`channel
等待创建时机来临,我们调用保存在ServerBootstrap
中的channelFactory
直接进行创建。
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T>
private final Constructor<? extends T> constructor;
@Override
public T newChannel()
try
return constructor.newInstance();
catch (Throwable t)
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
下面我们来看下NioServerSocketChannel
的构建过程:
1.1.1 NioServerSocketChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel
//SelectorProvider(用于创建Selector和Selectable Channels)
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel()
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
//创建JDK NIO ServerSocketChannel
private static ServerSocketChannel newSocket(SelectorProvider provider)
try
return provider.openServerSocketChannel();
catch (IOException e)
throw new ChannelException(
"Failed to open a server socket.", e);
//ServerSocketChannel相关的配置
private final ServerSocketChannelConfig config;
public NioServerSocketChannel(ServerSocketChannel channel)
//父类AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要监听的事件OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
//DefaultChannelConfig中设置用于Channel接收数据用的buffer->AdaptiveRecvByteBufAllocator
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
- 首先调用
newSocket
创建JDK NIO 原生ServerSocketChannel
,这里调用了SelectorProvider#openServerSocketChannel
来创建JDK NIO 原生ServerSocketChannel
,我们在上篇文章《聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)》中详细的介绍了SelectorProvider
相关内容,当时是用SelectorProvider
来创建Reactor
中的Selector
。大家还记得吗?? - 通过父类构造器设置
NioServerSocketChannel
感兴趣的IO事件
,这里设置的是SelectionKey.OP_ACCEPT
事件。并将JDK NIO 原生ServerSocketChannel
封装起来。 - 创建
Channel
的配置类NioServerSocketChannelConfig
,在配置类中封装了对Channel底层
的一些配置行为,以及JDK中的ServerSocket
。以及创建NioServerSocketChannel
接收数据用的Buffer
分配器AdaptiveRecvByteBufAllocator
。
NioServerSocketChannelConfig
没什么重要的东西,我们这里也不必深究,它就是管理NioServerSocketChannel
相关的配置,这里唯一需要大家注意的是这个用于Channel
接收数据用的Buffer分配器
AdaptiveRecvByteBufAllocator,我们后面在介绍Netty如何接收连接的时候还会提到。
NioServerSocketChannel
的整体构建过程介绍完了,现在我们来按照继承层次再回过头来看下NioServerSocketChannel
的层次构建,来看下每一层都创建了什么,封装了什么,这些信息都是Channel
的核心信息,所以有必要了解一下。
在NioServerSocketChannel
的创建过程中,我们主要关注继承结构图中红框标注的三个类,其他的我们占时先不用管。
其中AbstractNioMessageChannel类
主要是对NioServerSocketChannel
底层读写行为的封装和定义,比如accept接收客户端连接。这个我们后续会介绍到,这里我们并不展开。
这里是Netty自己构建的一套Channel体系,而java net库提供的Channel体系无关
1.1.2 AbstractNioChannel
public abstract class AbstractNioChannel extends AbstractChannel
//JDK NIO原生Selectable Channel
private final SelectableChannel ch;
// Channel监听事件集合 这里是SelectionKey.OP_ACCEPT事件
protected final int readInterestOp;
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp)
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try
//设置Channel为非阻塞 配合IO多路复用模型
ch.configureBlocking(false);
catch (IOException e)
.............省略................
- 封装由
SelectorProvider
创建出来的JDK NIO原生ServerSocketChannel
。 - 封装
Channel
在创建时指定感兴趣的IO事件
,对于NioServerSocketChannel
来说感兴趣的IO事件
为OP_ACCEPT事件
。 - 设置JDK NIO原生
ServerSocketChannel
为非阻塞模式, 配合IO多路复用模型。
1.1.3 AbstractChannel
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel
//channel是由创建层次的,比如ServerSocketChannel 是 SocketChannel的 parent
private final Channel parent;
//channel全局唯一ID machineId+processId+sequence+timestamp+random
private final ChannelId id;
//unsafe用于封装对底层socket的相关操作
private final Unsafe unsafe;
//为channel分配独立的pipeline用于IO事件编排
private final DefaultChannelPipeline pipeline;
protected AbstractChannel(Channel parent)
this.parent = parent;
//channel全局唯一ID machineId+processId+sequence+timestamp+random
id = newId();
//unsafe用于定义实现对Channel的底层操作
unsafe = newUnsafe();
//为channel分配独立的pipeline用于IO事件编排
pipeline = newChannelPipeline();
- Netty中的
Channel创建
是有层次的,这里的parent属性
用来保存上一级的Channel
,比如这里的NioServerSocketChannel
是顶级Channel
,所以它的parent = null
。客户端NioSocketChannel
是由NioServerSocketChannel
创建的,所以它的parent = NioServerSocketChannel
。 - 为
Channel
分配全局唯一的ChannelId
。ChannelId
由机器Id(machineId
),进程Id(processId
),序列号(sequence
),时间戳(timestamp
),随机数(random
)构成
private DefaultChannelId()
data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];
int i = 0;
// machineId
System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);
i += MACHINE_ID.length;
// processId
i = writeInt(i, PROCESS_ID);
// sequence
i = writeInt(i, nextSequence.getAndIncrement());
// timestamp (kind of)
i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());
// random
int random = PlatformDependent.threadLocalRandom().nextInt();
i = writeInt(i, random);
assert i == data.length;
hashCode = Arrays.hashCode(data);
- 创建
NioServerSocketChannel
的底层操作类Unsafe
。这里创建的是io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe
。
Unsafe
为Channel接口
的一个内部接口,用于定义实现对Channel底层的各种操作,Unsafe接口
定义的操作行为只能由Netty框架的Reactor线程
调用,用户线程禁止调用。
注意与java底层的Unsafe进行区分
interface Unsafe
//分配接收数据用的Buffer
RecvByteBufAllocator.Handle recvBufAllocHandle();
//服务端绑定的端口地址
SocketAddress localAddress();
//远端地址
SocketAddress remoteAddress();
//channel向Reactor注册
void register(EventLoop eventLoop, ChannelPromise promise);
//服务端绑定端口地址
void bind(SocketAddress localAddress, ChannelPromise promise);
//客户端连接服务端
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
//关闭channle
void close(ChannelPromise promise);
//读数据
void beginRead();
//写数据
void write(Object msg, ChannelPromise promise);
- 为
NioServerSocketChannel
分配独立的pipeline
用于IO事件编排。pipeline
其实是一个ChannelHandlerContext
类型的双向链表。头结点HeadContext
,尾结点TailContext
。ChannelHandlerContext
中包装着ChannelHandler
。
ChannelHandlerContext
保存 ChannelHandler上下文信息,用于事件传播。后面笔者会单独开一篇文章介绍,这里我们还是聚焦于启动主线。
这里只是为了让大家简单理解pipeline
的一个大致的结构,后面会写一篇文章专门详细讲解pipeline
。
protected DefaultChannelPipeline(Channel channel)
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
到了这里NioServerSocketChannel
就创建完毕了,我们来回顾下它到底包含了哪些核心信息。
1.2 初始化NioServerSocketChannel
void init(Channel channel)
//向NioServerSocketChannelConfig设置ServerSocketChannelOption
setChannelOptions(channel, newOptionsArray(), logger);
//向netty自定义的NioServerSocketChannel设置attributes
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
//获取从Reactor线程组
final EventLoopGroup currentChildGroup = childGroup;
//获取用于初始化客户端NioSocketChannel的ChannelInitializer
final ChannelHandler currentChildHandler = childHandler;
//获取用户配置的客户端SocketChannel的channelOption以及attributes
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions)
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
//向NioServerSocketChannel中的pipeline添加初始化ChannelHandler的逻辑
p.addLast(new ChannelInitializer<Channel>()
@Override
public void initChannel(final Channel ch)
final ChannelPipeline pipeline = ch.pipeline();
//ServerBootstrap中用户指定的channelHandler
ChannelHandler handler = config.handler();
if (handler != null)
//LoggingHandler
pipeline.addLast(handler);
//添加用于接收客户端连接的acceptor
ch.eventLoop().execute(new Runnable()
@Override
public void run()
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
);
);
- 向
NioServerSocketChannelConfig
设置ServerSocketChannelOption
。 - 向netty自定义的
NioServerSocketChannel
设置ChannelAttributes
Netty自定义的SocketChannel
类型均继承AttributeMap
接口以及DefaultAttributeMap
类,正是它们定义了ChannelAttributes
。用于向Channel
添加用户自定义的一些信息。
这个
ChannelAttributes
的用处大有可为,Netty后边的许多特性都是依靠这个ChannelAttributes
来实现的。这里先卖个关子,大家可以自己先想一下可以用这个ChannelAttributes
做哪些事情?
- 获取从Reactor线程组
childGroup
,以及用于初始化客户端NioSocketChannel
的ChannelInitializer
,ChannelOption
,ChannelAttributes
,这些信息均是由用户在启动的时候向ServerBootstrap
添加的客户端NioServerChannel
配置信息。这里用这些信息来初始化ServerBootstrapAcceptor
。因为后续会在ServerBootstrapAcceptor
中接收客户端连接以及创建NioServerChannel
。 - 向
NioServerSocketChannel
中的pipeline
添加用于初始化pipeline
的ChannelInitializer
。
问题来了,这里为什么不干脆直接将ChannelHandler
添加到pipeline
中,而是又使用到了ChannelInitializer
呢?
其实原因有两点:
- 为了保证
线程安全
地初始化pipeline
,所以初始化的动作需要由Reactor线程
进行,而当前线程是用户程序
的启动Main线程
并不是
Reactor线程。这里不能立即初始化。 - 初始化
Channel
中pipeline
的动作,需要等到Channel
注册到对应的Reactor
中才可以进行初始化,当前只是创建好了NioServerSocketChannel
,但并未注册到Main Reactor
上。
初始化
NioServerSocketChannel
中pipeline
的时机是:当NioServerSocketChannel
注册到Main Reactor
之后,绑定端口地址之前。
前边在介绍
ServerBootstrap
配置childHandler
时也用到了ChannelInitializer
,还记得吗??
问题又来了,大家注意下ChannelInitializer#initChannel
方法,在该初始化回调方法中,添加LoggingHandler是直接向pipeline中添加,而添加Acceptor为什么不是直接添加而是封装成异步任务呢?
这里先给大家卖个关子,笔者会在后续流程中为大家解答~~~~~
此时NioServerSocketChannel
中的pipeline
结构如下图所示: