Netty 核心源码解读 —— ServerBootstrap 篇
Posted 松然聊技术
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty 核心源码解读 —— ServerBootstrap 篇相关的知识,希望对你有一定的参考价值。
本文我们就开始对 ServerBootstrap 进行源码解读(4.1.51.Final-SNAPSHOT),为什么是 ServerBootstrap,记得在用 Netty 做第一个项目的时候,写的第一行 Code 就是 new ServerBootstrap()
,ServerBootstrap 是 Netty Server 的启动类,所以从它开始了解 Netty 是最合适的。
ServerBootstrap
private final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZ_GROUP_SIZE);
private final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZ_THREAD_SIZE);
public void init() throws Exception {
// Server 服务启动
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioserverSocketChannel.class);
bootstrap.childHandler(new ServerChannelInitializer(serverConfig));
// 可选参数
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
// 绑定接口,同步等待成功
ChannelFuture future = bootstrap.bind(port).sync();
ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
}
});
}
这是我在做 TCP 网关时写的 Netty Server 的代码片段(https://github.com/SongranZhang/tcp-gateway/blob/master/src/main/java/com/linkedkeeper/tcp/connector/tcp/server/TcpServer.java),可以看到,Netty Server 的初始化首先是通过 ServerBootstrap 的无参构造函数创建一个对象,接着是这个对象的一串链式调用 bootstrap.group().channel().childHandler().childOption()
,而服务启动的真正触发点是这段 bootstrap.bind(port).sync()
,下面我们就逐一来分析下这里的每个方法。
首先是 group()
方法。
## ServerBootstrap.java
private volatile EventLoopGroup childGroup;
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
这里 workerGroup 赋值给了 ServerBootstrap 的 childGroup,bossGroup 赋值给了父类 AbstractBootstrap 的 group。
volatile EventLoopGroup group;
public B group(EventLoopGroup group) {
ObjectUtil.checkNotNull(group, "group");
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return self();
}
接下来是 channel()
方法。
## AbstractBootstrap.java
private volatile ChannelFactory extends C> channelFactory;
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
public B channelFactory(ChannelFactory extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
这里 NioServerSocketChannel.class
通过 ReflectiveChannelFactory 进行了实例化,然后赋值给了 AbstractBootstrap 的 channelFactory。
接下来是 childHandler()
方法。
## ServerBootstrap.java
private volatile ChannelHandler childHandler;
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}
这里是对 ServerBootstrap 的 childHandler 赋值。
最后是 childOption()
方法。
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
ObjectUtil.checkNotNull(childOption, "childOption");
synchronized (childOptions) {
if (value == null) {
childOptions.remove(childOption);
} else {
childOptions.put(childOption, value);
}
}
return this;
}
这里 childOptions 维护了 TCP 的参数设置。
简言之 bootstrap.group().channel().childHandler().childOption()
就是在构建 Netty Server 的各种参数,下面再来看 bootstrap.bind(port).sync()
。
首先是 bind()
方法。
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
public B validate() {
if (group == null) {
throw new IllegalStateException("group not set");
}
if (channelFactory == null) {
throw new IllegalStateException("channel or channelFactory not set");
}
return self();
}
这里的 validate()
方法对 AbstractBootstrap 的 group 和 channelFactory 进行非空校验,之后调用 doBind()
方法。
## AbstractBootstrap.java
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
首先看一下 initAndRegister()
方法。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
这里 channelFactory.newChannel()
调用的是 ReflectiveChannelFactory 的 newChannel 方法。
## ReflectiveChannelFactory
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
这里 constructor.newInstance()
是 NioServerSocketChannel.class
的一个实例。得到 channel 后,调用 init(channel)
进行初始化,一是给 options 和 attrs 赋值,二是构建 pipeline。
## ServerBootstrap.java
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption >, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey >, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
回到 initAndRegister()
方法中,init(channel)
之后是 register(channel)
,该方法在 NioEventLoopGroup 的父类 MultithreadEventLoopGroup 中实现,我们在解读 NioEventLoop 源码时再分析。
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
看完 initAndRegister()
,再回到 doBind()
接着看 doBind0()
。
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
这里 regFuture.isSuccess()
会执行 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
,否者执行 promise.setFailure(regFuture.cause());
,这里的 promise 可以认为是一种特殊的 Future 对象。bind 是在 ChannelPipeline 里进行绑定的,我们在解读 ChannelPipeline 源码时再分析。
最后看一下 bootstrap.bind(serverPort).sync()
中的 sync()
,bootstrap.bind(serverPort)
返回的是 ChannelFuture,所以 sync()
是调用 DefaultChannelPromise 的方法。
## DefaultChannelPromise
public ChannelPromise sync() throws InterruptedException {
super.sync();
return this;
}
这里 super.sync();
调用了父类的方法。
## DefaultPromise
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
public Promise<V> await() throws InterruptedException {
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
checkDeadLock();
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
return this;
}
这里 while(!isDone())
会进入循环,调用 sync()
后线程会被阻塞住。
总结
本篇也是写了好久,本文介绍了 ServerBootstrap,它是构建 Netty Server 的主要实现类,ServerBootstrap 里主要是对各种属性进行赋值,并创建 Channel 和 ChannelPipeline,最后绑定本地端口开始监听 IO 事件。在后续的文章里,我会继续与大家讨论 Netty 的 EventLoop,还请大家多多关注我的个人博客或公账号。
以上是关于Netty 核心源码解读 —— ServerBootstrap 篇的主要内容,如果未能解决你的问题,请参考以下文章
ElasticSearchEs 源码之 Netty4HttpServerTransport 源码解读