死磕Netty-----服务端启动过程分析

Posted Java技术驿站

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了死磕Netty-----服务端启动过程分析相关的知识,希望对你有一定的参考价值。

原文出处http://cmsblogs.com/ 『chenssy』
转载请注明原创出处,谢谢!

上篇博客(),了解了 Netty 的核心组件及其设计,但是这些都是零散的,不成体系。那么 Netty 是如何利用这些组件构建成一个高性能的异步通信框架。通过这篇博客可以初步了解。

下面先来一段 Netty 服务端的代码:

 
   
   
 
  1. public class NettyServer {

  2.    public void bind(int port){

  3.        // 创建EventLoopGroup

  4.        EventLoopGroup bossGroup = new NioEventLoopGroup();        //创建BOSS线程组 用于服务端接受客户端的连接

  5.        EventLoopGroup workerGroup = new NioEventLoopGroup();      //创建WORK线程组 用于进行SocketChannel的网络读写

  6.        try {

  7.            // 创建ServerBootStrap实例

  8.            // ServerBootstrap 用于启动NIO服务端的辅助启动类,目的是降低服务端的开发复杂度

  9.            ServerBootstrap b = new ServerBootstrap();

  10.            // 绑定Reactor线程池

  11.            b.group(bossGroup, workerGroup)

  12.                    // 设置并绑定服务端Channel

  13.                    // 指定所使用的NIO传输的Channel

  14.                    .channel(NioserverSocketChannel.class)

  15.                    .option(ChannelOption.SO_BACKLOG, 1024)

  16.                    .handler(new LoggingServerHandler())

  17.                    .childHandler(new ChannelInitializer(){

  18.                        @Override

  19.                        protected void initChannel(Channel ch) throws Exception {

  20.                            //do something

  21.                        }

  22.                    });

  23.            // 绑定端口,同步等待成功

  24.            ChannelFuture future = b.bind(port).sync();

  25.            // 等待服务端监听端口关闭

  26.            future.channel().closeFuture().sync();

  27.        } catch (InterruptedException e) {

  28.            e.printStackTrace();

  29.        } finally {

  30.            // 优雅地关闭

  31.            bossGroup.shutdownGracefully();

  32.            workerGroup.shutdownGracefully();

  33.        }

  34.    }

  35.    private class LoggingServerHandler extends ChannelInboundHandlerAdapter{

  36.        @Override

  37.        public void channelActive(ChannelHandlerContext ctx) throws Exception {

  38.            System.out.println("loggin-channelActive");

  39.        }

  40.        @Override

  41.        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

  42.            System.out.println("loggin-channelRegistered");

  43.        }

  44.        @Override

  45.        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

  46.            System.out.println("loggin-handlerAdded");

  47.        }

  48.    }

  49.    public static void main(String[] args){

  50.            new NettyServer().bind(8899);

  51.    }

  52. }

上面代码为 Netty 服务器端的完整代码,在整个服务端代码中会涉及如下几个核心类。

ServerBootstrap

ServerBootstrap 为 Netty 服务端的启动辅助类,它提供了一系列的方法用于设置服务端启动相关的参数。

Channel

Channel 为 Netty 网络操作抽象类,它定义了一组功能,其提供的 API 大大降低了直接使用 Socket 类的复杂性。当然它也不仅仅只是包括了网络 IO 操作的基本功能,还包括一些与 Netty 框架相关的功能,包括获取该 Channel 的 EventLoop 等等。

EventLoopGroup

EventLoopGroup 为 Netty 的 Reactor 线程池,它实际上就是 EventLoop 的容器,而 EventLoop 为 Netty 的核心抽象类,它的主要职责是处理所有注册到本线程多路复用器 Selector 上的 Channel。

ChannelHandler

ChannelHandler 作为 Netty 的主要组件,它主要负责 I/O 事件或者 I/O 操作进行拦截和处理,它可以选择性地拦截和处理自己感觉兴趣的事件,也可以透传和终止事件的传递。

ChannelPipeline

ChannelPipeline 是 ChannelHandler 链的容器,它负责 ChannelHandler 的管理和事件拦截与调度。每当新建一个 Channel 都会分配一个新的 ChannelPepeline,同时这种关联是永久性的。

以上是简要介绍,详细介绍请参考(

服务端创建流程

Netty 服务端创建的时序图,如下(摘自《Netty权威指南(第二版)》)

主要步骤为:

  1. 创建 ServerBootstrap 实例

  2. 设置并绑定 Reactor 线程池

  3. 设置并绑定服务端 Channel

  4. 创建并初始化 ChannelPipeline

  5. 添加并设置 ChannelHandler

  6. 绑定并启动监听端口

服务端源码分析

1、创建两个EventLoopGroup

 
   
   
 
  1.        EventLoopGroup bossGroup = new NioEventLoopGroup();

  2.        EventLoopGroup workerGroup = new NioEventLoopGroup();

bossGroup 为 BOSS 线程组,用于服务端接受客户端的连接, workerGroup 为 worker 线程组,用于进行 SocketChannel 的网络读写。当然也可以创建一个并共享。

2、创建ServerBootstrap实例

 
   
   
 
  1. ServerBootstrap b = new ServerBootstrap();

ServerBootStrap为Netty服务端的启动引导类,用于帮助用户快速配置、启动服务端服务。提供的方法如下:

方法名称 方法描述
group 设置 ServerBootstrap 要用的 EventLoopGroup
channel 设置将要被实例化的 ServerChannel 类
option 实例化的 ServerChannel 的配置项
childHandler 设置并添加 ChannelHandler
bind 绑定 ServerChannel

ServerBootStrap底层采用装饰者模式。

关于 ServerBootStrap 我们后续做详细分析。

3、设置并绑定Reactor线程池

调用 group() 方法,为 ServerBootstrap 实例设置并绑定 Reactor 线程池。

 
   
   
 
  1. b.group(bossGroup, workerGroup)

EventLoopGroup 为 Netty 线程池,它实际上就是 EventLoop 的数组容器。EventLoop 的职责是处理所有注册到本线程多路复用器 Selector 上的 Channel,Selector 的轮询操作由绑定的 EventLoop 线程 run 方法驱动,在一个循环体内循环执行。通俗点讲就是一个死循环,不断的检测 I/O 事件、处理 I/O 事件。

这里设置了两个group,这个其实有点儿像我们工作一样。需要两类型的工人,一个老板(bossGroup),一个工人(workerGroup),老板负责从外面接活,工人则负责死命干活(尼玛,和我上家公司一模一样)。所以这里 bossGroup 的作用就是不断地接收新的连接,接收之后就丢给 workerGroup 来处理,workerGroup 负责干活就行(负责客户端连接的 IO 操作)。

源码如下:

 
   
   
 
  1.    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {

  2.        super.group(parentGroup);        // 绑定boosGroup

  3.        if (childGroup == null) {

  4.            throw new NullPointerException("childGroup");

  5.        }

  6.        if (this.childGroup != null) {

  7.            throw new IllegalStateException("childGroup set already");

  8.        }

  9.        this.childGroup = childGroup;    // 绑定workerGroup

  10.        return this;

  11.    }

其中父 EventLoopGroup 传递到父类的构造函数中:

 
   
   
 
  1.    public B group(EventLoopGroup group) {

  2.        if (group == null) {

  3.            throw new NullPointerException("group");

  4.        }

  5.        if (this.group != null) {

  6.            throw new IllegalStateException("group set already");

  7.        }

  8.        this.group = group;

  9.        return (B) this;

  10.    }

4、设置并绑定服务端Channel绑定线程池后,则需要设置 channel 类型,服务端用的是 NioServerSocketChannel 。

 
   
   
 
  1. .channel(NioServerSocketChannel.class)

调用 ServerBootstrap.channel 方法用于设置服务端使用的 Channel,传递一个 NioServerSocketChannel Class对象,Netty通过工厂类,利用反射创建NioServerSocketChannel 对象,如下:

 
   
   
 
  1.    public B channel(Class<? extends C> channelClass) {

  2.        if (channelClass == null) {

  3.            throw new NullPointerException("channelClass");

  4.        }

  5.        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));

  6.    }

channelFactory() 用于设置 Channel 工厂的:

 
   
   
 
  1.    public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {

  2.        return channelFactory((ChannelFactory<C>) channelFactory);

  3.    }

  4.    public B channelFactory(ChannelFactory<? extends C> channelFactory) {

  5.        if (channelFactory == null) {

  6.            throw new NullPointerException("channelFactory");

  7.        }

  8.        if (this.channelFactory != null) {

  9.            throw new IllegalStateException("channelFactory set already");

  10.        }

  11.        this.channelFactory = channelFactory;

  12.        return (B) this;

  13.    }

这里传递的是 ReflectiveChannelFactory,其源代码如下:

 
   
   
 
  1. public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

  2.    private final Class<? extends T> clazz;

  3.    public ReflectiveChannelFactory(Class<? extends T> clazz) {

  4.        if (clazz == null) {

  5.            throw new NullPointerException("clazz");

  6.        }

  7.        this.clazz = clazz;

  8.    }

  9.    //需要创建 channel 的时候,该方法将被调用

  10.    @Override

  11.    public T newChannel() {

  12.        try {

  13.            // 反射创建对应 channel

  14.            return clazz.newInstance();

  15.        } catch (Throwable t) {

  16.            throw new ChannelException("Unable to create Channel from class " + clazz, t);

  17.        }

  18.    }

  19.    @Override

  20.    public String toString() {

  21.        return StringUtil.simpleClassName(clazz) + ".class";

  22.    }

  23. }

确定服务端的 Channel(NioServerSocketChannel)后,调用 option()方法设置 Channel 参数,作为服务端,主要是设置TCP的backlog参数,如下:

 
   
   
 
  1. .option(ChannelOption.SO_BACKLOG, 1024)

option()源码如下:

 
   
   
 
  1.    public <T> B option(ChannelOption<T> option, T value) {

  2.        if (option == null) {

  3.            throw new NullPointerException("option");

  4.        }

  5.        if (value == null) {

  6.            synchronized (options) {

  7.                options.remove(option);

  8.            }

  9.        } else {

  10.            synchronized (options) {

  11.                options.put(option, value);

  12.            }

  13.        }

  14.        return (B) this;

  15.    }

  16.    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

五、添加并设置ChannelHandler

设置完 Channel 参数后,用户可以为启动辅助类和其父类分别指定 Handler。

 
   
   
 
  1. .handler(new LoggingServerHandler())

  2. .childHandler(new ChannelInitializer(){

  3.    //省略代码

  4. })

这两个 Handler 不一样,前者( handler())设置的 Handler 是服务端 NioServerSocketChannel的,后者( childHandler())设置的 Handler 是属于每一个新建的 NioSocketChannel 的。跟踪源代码会发现两种所处的类不一样,handler 位于 AbstractBootstrap 中,childHandler 位于 ServerBootstrap 中,如下:

 
   
   
 
  1.    // AbstractBootstrap

  2.    public B handler(ChannelHandler handler) {

  3.        if (handler == null) {

  4.            throw new NullPointerException("handler");

  5.        }

  6.        this.handler = handler;

  7.        return (B) this;

  8.    }

  9.    // ServerBootstrap

  10.    public ServerBootstrap childHandler(ChannelHandler childHandler) {

  11.        if (childHandler == null) {

  12.            throw new NullPointerException("childHandler");

  13.        }

  14.        this.childHandler = childHandler;

  15.        return this;

  16.    }

ServerBootstrap 中的 Handler 是 NioServerSocketChannel 使用的,所有连接该监听端口的客户端都会执行它,父类 AbstractBootstrap 中的 Handler 是一个工厂类,它为每一个新接入的客户端都创建一个新的 Handler。如下图(《Netty权威指南(第二版)》):

六、绑定端口,启动服务

服务端最后一步,绑定端口并启动服务,如下:

 
   
   
 
  1. ChannelFuture future = b.bind(port).sync();

调用 ServerBootstrap 的 bind() 方法进行端口绑定:

 
   
   
 
  1.    public ChannelFuture bind(int inetPort) {

  2.        return bind(new InetSocketAddress(inetPort));

  3.    }

  4.    public ChannelFuture bind(SocketAddress localAddress) {

  5.        validate();

  6.        if (localAddress == null) {

  7.            throw new NullPointerException("localAddress");

  8.        }

  9.        return doBind(localAddress);

  10.    }    

首先调用 validate() 方法进行参数校验,然后调用 doBind() 方法:

 
   
   
 
  1.    private ChannelFuture doBind(final SocketAddress localAddress) {

  2.        // 初始化并注册一个Channel

  3.        final ChannelFuture regFuture = initAndRegister();

  4.        final Channel channel = regFuture.channel();

  5.        if (regFuture.cause() != null) {

  6.            return regFuture;

  7.        }

  8.        // 注册成功

  9.        if (regFuture.isDone()) {

  10.            // At this point we know that the registration was complete and successful.

  11.            ChannelPromise promise = channel.newPromise();

  12.            // 调用doBind0绑定

  13.            doBind0(regFuture, channel, localAddress, promise);

  14.            return promise;

  15.        } else {

  16.            // Registration future is almost always fulfilled already, but just in case it's not.

  17.            final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);

  18.            regFuture.addListener(new ChannelFutureListener() {

  19.                @Override

  20.                public void operationComplete(ChannelFuture future) throws Exception {

  21.                    Throwable cause = future.cause();

  22.                    if (cause != null) {

  23.                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an

  24.                        // IllegalStateException once we try to access the EventLoop of the Channel.

  25.                        promise.setFailure(cause);

  26.                    } else {

  27.                        // Registration was successful, so set the correct executor to use.

  28.                        // See https://github.com/netty/netty/issues/2586

  29.                        promise.registered();

  30.                        doBind0(regFuture, channel, localAddress, promise);

  31.                    }

  32.                }

  33.            });

  34.            return promise;

  35.        }

  36.    }

该方法涉及内容较多,我们分解来看,如下:

  1. 首先通过 initAndRegister() 得到一个 ChannelFuture 对象 regFuture;

  2. 根据得到的 regFuture 对象判断该对象是否抛出异常 ( regFuture.cause()),如果是,直接返回;

  3. 根据 regFuture.isDone()判断 initAndRegister()是否执行完毕,如果执行完成,则调用 doBind0

  4. 若 initAndRegister() 没有执行完毕,则向 regFuture 对象添加一个 ChannelFutureListener 监听,当 initAndRegister() 执行完毕后会调用 operationComplete(),在 operationComplete() 中依然会判断 ChannelFuture 是否抛出异常,如果没有则调用 doBind0进行绑定。

按照上面的步骤我们一步一步来剖析 doBind() 方法。

initAndRegister()

执行 initAndRegister() 会得到一个 ChannelFuture 对象 regFuture,代码如下:

 
   
   
 
  1.    final ChannelFuture initAndRegister() {

  2.        Channel channel = null;

  3.        try {

  4.            // 新建一个Channel

  5.            channel = channelFactory.newChannel();

  6.            // 初始化Channel

  7.            init(channel);

  8.        } catch (Throwable t) {

  9.            if (channel != null) {

  10.                channel.unsafe().closeForcibly();

  11.            }

  12.            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);

  13.        }

  14.        // /向EventLoopGroup中注册一个channel

  15.        ChannelFuture regFuture = config().group().register(channel);

  16.        if (regFuture.cause() != null) {

  17.            if (channel.isRegistered()) {

  18.                channel.close();

  19.            } else {

  20.                channel.unsafe().closeForcibly();

  21.            }

  22.        }

  23.        return regFuture;

  24.    }

首先调用 newChannel() 新建一个Channel,这里是NioServerSocketChannel,还记前面 4、设置并绑定服务端Channel( .channel(NioServerSocketChannel.class))中 设置的Channel工厂类么?在这里派上用处了。在上面提到了通过反射的机制我们可以得到一个 NioServerSocketChannel 类的实例。那么 NioServerSocketChannel 到底是一个什么东西呢?如下图:

上图是 NioServerSocketChannel 的继承体系结构图, NioServerSocketChannel 在构造函数中会依靠父类来完成一项一项的初始化工作。先看 NioServerSocketChannel 构造函数。

 
   
   
 
  1.    public NioServerSocketChannel() {

  2.        this(newSocket(DEFAULT_SELECTOR_PROVIDER));

  3.    }

newSocket() 方法较为简单,它是利用 SelectorProvider.openServerSocketChannel(),产生一个 ServerSocketChannel 对象。

 
   
   
 
  1.    public NioServerSocketChannel(ServerSocketChannel channel) {

  2.        super(null, channel, SelectionKey.OP_ACCEPT);

  3.        config = new NioServerSocketChannelConfig(this, javaChannel().socket());

  4.    }

该构造函数首先是调用父类的构造方法,然后设置 config属性。父类构造方法如下:

 
   
   
 
  1.    // AbstractNioMessageChannel

  2.    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {

  3.        super(parent, ch, readInterestOp);

  4.    }

  5.    // AbstractNioChannel

  6.    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {

  7.        super(parent);

  8.        this.ch = ch;

  9.        this.readInterestOp = readInterestOp;

  10.        try {

  11.            ch.configureBlocking(false);

  12.        } catch (IOException e) {

  13.            try {

  14.                ch.close();

  15.            } catch (IOException e2) {

  16.                if (logger.isWarnEnabled()) {

  17.                    logger.warn(

  18.                            "Failed to close a partially initialized socket.", e2);

  19.                }

  20.            }

  21.            throw new ChannelException("Failed to enter non-blocking mode.", e);

  22.        }

  23.    }

  24.    // AbstractChannel

  25.    protected AbstractChannel(Channel parent) {

  26.        this.parent = parent;

  27.        id = newId();

  28.        unsafe = newUnsafe();

  29.        pipeline = newChannelPipeline();

  30.    }

通过 super() ,一层一层往上,直到 AbstractChannel。我们从最上层解析。

  • AbstractChannel 设置了 unsafe ( unsafe=newUnsafe())和 pipeline( pipeline=newChannelPipeline());

  • AbstractNioChannel 将当前 ServerSocketChannel 设置成了非阻塞( ch.configureBlocking(false);),同时设置SelectionKey.OP_ACCEPT事件( this.readInterestOp=readInterestOp; readInterestOp 值由 NioServerSocketChannel 中传递);

  • NioServerSocketChannel 设置 config属性( config=newNioServerSocketChannelConfig(this,javaChannel().socket()))。

所以 channel=channelFactory.newChannel() 通过反射机制产生了 NioServerSocketChannel 类实例。同时该实例设置了NioMessageUnsafe、DefaultChannelPipeline、非阻塞、SelectionKey.OP_ACCEPT事件 和 NioServerSocketChannelConfig 属性。

看完了 channelFactory.newChannel();,我们再看 init()

 
   
   
 
  1.    void init(Channel channel) throws Exception {

  2.         // 设置配置的option参数

  3.        final Map<ChannelOption<?>, Object> options = options0();

  4.        synchronized (options) {

  5.            channel.config().setOptions(options);

  6.        }

  7.        final Map<AttributeKey<?>, Object> attrs = attrs0();

  8.        synchronized (attrs) {

  9.            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {

  10.                @SuppressWarnings("unchecked")

  11.                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();

  12.                channel.attr(key).set(e.getValue());

  13.            }

  14.        }

  15.        // 获取绑定的pipeline

  16.        ChannelPipeline p = channel.pipeline();

  17.        // 准备child用到的4个part

  18.        final EventLoopGroup currentChildGroup = childGroup;

  19.        final ChannelHandler currentChildHandler = childHandler;

  20.        final Entry<ChannelOption<?>, Object>[] currentChildOptions;

  21.        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;

  22.        synchronized (childOptions) {

  23.            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));

  24.        }

  25.        synchronized (childAttrs) {

  26.            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));

  27.        }

  28.        // 为NioServerSocketChannel的pipeline添加一个初始化Handler,

  29.        // 当NioServerSocketChannel在EventLoop注册成功时,该handler的init方法将被调用

  30.        p.addLast(new ChannelInitializer<Channel>() {

  31.            @Override

  32.            public void initChannel(Channel ch) throws Exception {

  33.                final ChannelPipeline pipeline = ch.pipeline();

  34.                ChannelHandler handler = config.handler();

  35.                //如果用户配置过Handler

  36.                if (handler != null) {

  37.                    pipeline.addLast(handler);

  38.                }

  39.                ch.eventLoop().execute(new Runnable() {

  40.                    @Override

  41.                    public void run() {

  42.                        // 为NioServerSocketChannel的pipeline添加ServerBootstrapAcceptor处理器

  43.                        // 该Handler主要用来将新创建的NioSocketChannel注册到EventLoopGroup中

  44.                        pipeline.addLast(new ServerBootstrapAcceptor(

  45.                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

  46.                    }

  47.                });

  48.            }

  49.        });

  50.    }

其实整个过程可以分为三个步骤:

  1. 设置 Channel 的 option 和 attr;

  2. 获取绑定的 pipeline,然后为 NioServerSocketChanne l绑定的 pipeline 添加 Handler;

  3. 将用于服务端注册的 Handler ServerBootstrapAcceptor 添加到 ChannelPipeline 中。ServerBootstrapAcceptor 为一个接入器,专门接受新请求,把新的请求扔给某个事件循环器。

至此初始化部分已经结束,我们再看注册部分,

 
   
   
 
  1.        // /向EventLoopGroup中注册一个channel

  2.        ChannelFuture regFuture = config().group().register(channel);

注册方法的调用位于 initAndRegister() 方法中。注意这里的 group() 返回的是前面的 boss NioEvenLoopGroup,它继承 MultithreadEventLoopGroup,调用的 register(),也是 MultithreadEventLoopGroup 中的。如下:

 
   
   
 
  1.    public ChannelFuture register(Channel channel) {

  2.        return next().register(channel);

  3.    }

调用 next() 方法从 EventLoopGroup 中获取下一个 EventLoop,调用 register() 方法注册:

 
   
   
 
  1.    public ChannelFuture register(Channel channel) {

  2.        return register(new DefaultChannelPromise(channel, this));

  3.    }

将Channel和EventLoop封装成一个DefaultChannelPromise对象,然后调用register()方法。DefaultChannelPromis为ChannelPromise的默认实现,而ChannelPromisee继承Future,具备异步执行结构,绑定Channel,所以又具备了监听的能力,故而ChannelPromis是Netty异步执行的核心接口。

 
   
   
 
  1.    public ChannelFuture register(ChannelPromise promise) {

  2.        ObjectUtil.checkNotNull(promise, "promise");

  3.        promise.channel().unsafe().register(this, promise);

  4.        return promise;

  5.    }

首先获取 channel 的 unsafe 对象,该 unsafe 对象就是在之前设置过得。然后调用 register() 方法,如下:

 
   
   
 
  1.        public final void register(EventLoop eventLoop, final ChannelPromise promise) {

  2.            if (eventLoop == null) {

  3.                throw new NullPointerException("eventLoop");

  4.            }

  5.            if (isRegistered()) {

  6.                promise.setFailure(new IllegalStateException("registered to an event loop already"));

  7.                return;

  8.            }

  9.            if (!isCompatible(eventLoop)) {

  10.                promise.setFailure(

  11.                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));

  12.                return;

  13.            }

  14.            AbstractChannel.this.eventLoop = eventLoop;

  15.            // 必须要保证注册是由该EventLoop发起的

  16.            if (eventLoop.inEventLoop()) {

  17.                register0(promise);        // 注册

  18.            } else {

  19.                // 如果不是单独封装成一个task异步执行

  20.                try {

  21.                    eventLoop.execute(new Runnable() {

  22.                        @Override

  23.                        public void run() {

  24.                            register0(promise);

  25.                        }

  26.                    });

  27.                } catch (Throwable t) {

  28.                    logger.warn(

  29.                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",

  30.                            AbstractChannel.this, t);

  31.                    closeForcibly();

  32.                    closeFuture.setClosed();

  33.                    safeSetFailure(promise, t);

  34.                }

  35.            }

  36.        }

过程如下:

  1. 首先通过 isRegistered() 判断该 Channel 是否已经注册到 EventLoop 中;

  2. 通过 eventLoop.inEventLoop() 来判断当前线程是否为该 EventLoop 自身发起的,如果是,则调用 register0() 直接注册;

  3. 如果不是,说明该 EventLoop 中的线程此时没有执行权,则需要新建一个线程,单独封装一个 Task,而该 Task 的主要任务则是执行 register0()

无论当前 EventLoop 的线程是否拥有执行权,最终都会要执行 register0(),如下:

 
   
   
 
  1.        private void register0(ChannelPromise promise) {

  2.            try {

  3.                // 确保 Channel 处于 open

  4.                if (!promise.setUncancellable() || !ensureOpen(promise)) {

  5.                    return;

  6.                }

  7.                boolean firstRegistration = neverRegistered;

  8.                // 真正的注册动作

  9.                doRegister();

  10.                neverRegistered = false;

  11.                registered = true;        

  12.                pipeline.invokeHandlerAddedIfNeeded();    

  13.                safeSetSuccess(promise);        //设置注册结果为成功

  14.                pipeline.fireChannelRegistered();

  15.                if (isActive()) {

  16.                    //如果是首次注册,发起 pipeline 的 fireChannelActive

  17.                    if (firstRegistration) {

  18.                        pipeline.fireChannelActive();

  19.                    } else if (config().isAutoRead()) {

  20.                        beginRead();

  21.                    }

  22.                }

  23.            } catch (Throwable t) {

  24.                closeForcibly();

  25.                closeFuture.setClosed();

  26.                safeSetFailure(promise, t);

  27.            }

  28.        }

如果 Channel 处于 open 状态,则调用 doRegister() 方法完成注册,然后将注册结果设置为成功。最后判断如果是首次注册且处于激活状态,则发起 pipeline 的 fireChannelActive()

 
   
   
 
  1.    protected void doRegister() throws Exception {

  2.        boolean selected = false;

  3.        for (;;) {

  4.            try {

  5.                // 注册到NIOEventLoop的Selector上

  6.                selectionKey = javaChannel().register(eventLoop().selector, 0, this);

  7.                return;

  8.            } catch (CancelledKeyException e) {

  9.                if (!selected) {

  10.                    eventLoop().selectNow();

  11.                    selected = true;

  12.                } else {

  13.                    throw e;

  14.                }

  15.            }

  16.        }

  17.    }

这里注册时 ops 设置的是 0,也就是说 ServerSocketChannel 仅仅只是表示了注册成功,还不能监听任何网络操作,这样做的目的是(摘自《Netty权威指南(第二版)》):

  1. 注册方式是多态的,它既可以被 NIOServerSocketChannel 用来监听客户端的连接接入,也可以注册 SocketChannel 用来监听网络读或者写操作。

  2. 通过 SelectionKey.interestOps(intops) 方法可以方便地修改监听操作位。所以,此处注册需要获取 SelectionKey 并给 AbstractNIOChannel 的成员变量 selectionKey 赋值。

由于这里 ops 设置为 0,所以还不能监听读写事件。调用 doRegister()后,然后调用 pipeline.invokeHandlerAddedIfNeeded();,这个时候控制台会出现 loggin-handlerAdded,内部如何调用,我们在剖析 pipeline 时再做详细分析。然后将注册结果设置为成功( safeSetSuccess(promise))。调用 pipeline.fireChannelRegistered(); 这个时候控制台会打印 loggin-channelRegistered。这里简单分析下该方法。

 
   
   
 
  1.    public final ChannelPipeline fireChannelRegistered() {

  2.        AbstractChannelHandlerContext.invokeChannelRegistered(head);

  3.        return this;

  4.    }

  5.    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {

  6.        EventExecutor executor = next.executor();

  7.        if (executor.inEventLoop()) {

  8.            next.invokeChannelRegistered();

  9.        } else {

  10.            executor.execute(new Runnable() {

  11.                @Override

  12.                public void run() {

  13.                    next.invokeChannelRegistered();

  14.                }

  15.            });

  16.        }

  17.    }

pipeline 维护着 handle 链表,事件会在 NioServerSocketChannel 的 pipeline 中传播。最终都会调用 next.invokeChannelRegistered(),如下:

 
   
   
 
  1.    private void invokeChannelRegistered() {

  2.        if (invokeHandler()) {

  3.            try {

  4.                ((ChannelInboundHandler) handler()).channelRegistered(this);

  5.            } catch (Throwable t) {

  6.                notifyHandlerException(t);

  7.            }

  8.        } else {

  9.            fireChannelRegistered();

  10.        }

  11.    }

invokeChannelRegistered() 会调用我们在前面设置的 handler (还记得签名的 handler(newLoggingServerHandler() )么)的 channelRegistered(),这个时候控制台应该会打印 loggin-channelRegistered

到这里 initAndRegister()(finalChannelFutureregFuture=initAndRegister();)就分析完毕了,该方法主要做如下三件事:

  1. 通过反射产生了一个 NioServerSocketChannle 对象;

  2. 调用 init(channel)完成初始化工作;

  3. 将NioServerSocketChannel进行了注册。

initAndRegister()篇幅较长,分析完毕了,我们再返回到 doBind(finalSocketAddresslocalAddress)。在 doBind(finalSocketAddresslocalAddress) 中如果 initAndRegister()执行完成,则 regFuture.isDone() 则为 true,执行 doBind0()。如果没有执行完成,则会注册一个监听 ChannelFutureListener,当 initAndRegister() 完成后,会调用该监听的 operationComplete()方法,最终目的还是执行 doBind0()。故而我们下面分析 doBind0()到底做了些什么。源码如下:

 
   
   
 
  1.    private static void doBind0(

  2.            final ChannelFuture regFuture, final Channel channel,

  3.            final SocketAddress localAddress, final ChannelPromise promise) {

  4.        channel.eventLoop().execute(new Runnable() {

  5.            @Override

  6.            public void run() {

  7.                if (regFuture.isSuccess()) {

  8.                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

  9.                } else {

  10.                    promise.setFailure(regFuture.cause());

  11.                }

  12.            }

  13.        });

  14.    }

doBind0() 较为简单,首先new 一个线程 task,然后将该任务提交到 NioEventLoop 中进行处理,我们先看 execute()

 
   
   
 
  1.  public void execute(Runnable task) {

  2.        if (task == null) {

  3.            throw new NullPointerException("task");

  4.        }

  5.        boolean inEventLoop = inEventLoop();

  6.        if (inEventLoop) {

  7.            addTask(task);

  8.        } else {

  9.            startThread();

  10.            addTask(task);

  11.            if (isShutdown() && removeTask(task)) {

  12.                reject();

  13.            }

  14.        }

  15.        if (!addTaskWakesUp && wakesUpForTask(task)) {

  16.            wakeup(inEventLoop);

  17.        }

  18.    }

调用 inEventLoop() 判断当前线程是否为该 NioEventLoop 所关联的线程,如果是,则调用 addTask()将任务 task 添加到队列中,如果不是,则先启动线程,在调用 addTask() 将任务 task 添加到队列中。 addTask() 如下:

 
   
   
 
  1.    protected void addTask(Runnable task) {

  2.        if (task == null) {

  3.            throw new NullPointerException("task");

  4.        }

  5.        if (!offerTask(task)) {

  6.            reject(task);

  7.        }

  8.    }

offerTask()添加到队列中:

 
   
   
 
  1.    final boolean offerTask(Runnable task) {

  2.        if (isShutdown()) {

  3.            reject();

  4.        }

  5.        return taskQueue.offer(task);

  6.    }

task 添加到任务队列 taskQueue成功后,执行任务会调用如下方法:

 
   
   
 
  1. channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

channel 首先调用 bind() 完成 channel 与端口的绑定,如下:

 
   
   
 
  1.    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {

  2.        return pipeline.bind(localAddress, promise);

  3.    }

  4.    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {

  5.        return tail.bind(localAddress, promise);

  6.    }

tail 在 DefaultChannelPipeline 中定义: finalAbstractChannelHandlerContexttail; 有 tail 就会有 head ,在 DefaultChannelPipeline 中维护这一个 AbstractChannelHandlerContext 节点的双向链表,该链表是实现 Pipeline 机制的关键,更多详情会在 ChannelPipeline 中做详细说明。 bind() 最终会调用 DefaultChannelPipeline 的 bind() 方法。如下:

 
   
   
 
  1.    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {

  2.        if (localAddress == null) {

  3.            throw new NullPointerException("localAddress");

  4.        }

  5.        if (!validatePromise(promise, false)) {

  6.            // cancelled

  7.            return promise;

  8.        }

  9.        final AbstractChannelHandlerContext next = findContextOutbound();

  10.        EventExecutor executor = next.executor();

  11.        if (executor.inEventLoop()) {

  12.            next.invokeBind(localAddress, promise);

  13.        } else {

  14.            safeExecute(executor, new Runnable() {

  15.                @Override

  16.                public void run() {

  17.                    next.invokeBind(localAddress, promise);

  18.                }

  19.            }, promise, null);

  20.        }

  21.        return promise;

  22.    }

首先对 localAddress 、 promise 进行校验,符合规范则调用 findContextOutbound() ,该方法用于在 pipeline 中获取 AbstractChannelHandlerContext 双向链表中的一个节点,如下:

 
   
   
 
  1.    private AbstractChannelHandlerContext findContextOutbound() {

  2.        AbstractChannelHandlerContext ctx = this;

  3.        do {

  4.            ctx = ctx.prev;

  5.        } while (!ctx.outbound);

  6.        return ctx;

  7.    }

从该方法可以看出,所获取的节点是从 tail 开始遍历,获取第一个节点属性 outbound 为 true 的节点。其实该节点是 AbstractChannelHandlerContext 双向链表的 head 节点。获取该节点后,调用 invokeBind(),如下:

 
   
   
 
  1.    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {

  2.        if (invokeHandler()) {

  3.            try {

  4.                ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);

  5.            } catch (Throwable t) {

  6.                notifyOutboundHandlerException(t, promise);

  7.            }

  8.        } else {

  9.            bind(localAddress, promise);

  10.        }

  11.    }

handler() 返回的是 HeadContext 对象,然后调用其 bind(),如下:

 
   
   
 
  1.        public void bind(

  2.                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)

  3.                throws Exception {

  4.            unsafe.bind(localAddress, promise);

  5.        }

unsafe 定义在 HeadContext 中,在构造函数中初始化( unsafe=pipeline.channel().unsafe();),调用 bind() 如下:

 
   
   
 
  1.        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {

  2.            assertEventLoop();

  3.            if (!promise.setUncancellable() || !ensureOpen(promise)) {

  4.                return;

  5.            }

  6.            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&

  7.                localAddress instanceof InetSocketAddress &&

  8.                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&

  9.                !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {

  10.                logger.warn(

  11.                        "A non-root user can't receive a broadcast packet if the socket " +

  12.                        "is not bound to a wildcard address; binding to a non-wildcard " +

  13.                        "address (" + localAddress + ") anyway as requested.");

  14.            }

  15.            boolean wasActive = isActive();

  16.            try {

  17.                // 最核心方法

  18.                doBind(localAddress);

  19.            } catch (Throwable t) {

  20.                safeSetFailure(promise, t);

  21.                closeIfClosed();

  22.                return;

  23.            }

  24.            if (!wasActive && isActive()) {

  25.                invokeLater(new Runnable() {

  26.                    @Override

  27.                    public void run() {

  28.                        pipeline.fireChannelActive();

  29.                    }

  30.                });

  31.            }

  32.            safeSetSuccess(promise);

  33.        }

内部调用 doBind() ,该方法为绑定中最核心的方法,位于 NioServerSocketChannel 中,如下:

 
   
   
 
  1.    protected void doBind(SocketAddress localAddress) throws Exception {

  2.        if (PlatformDependent.javaVersion() >= 7) {

  3.            javaChannel().bind(localAddress, config.getBacklog());

  4.        } else {

  5.            javaChannel().socket().bind(localAddress, config.getBacklog());

  6.        }

  7.    }

javaChannel()返回的是 NioServerSocketChannel 实例初始化时所产生的 Java NIO ServerSocketChannel 实例(ServerSocketChannelImple实例),然后调用其 bind(),如下:

 
   
   
 
  1.    public ServerSocketChannel bind(SocketAddress var1, int var2) throws IOException {

  2.        Object var3 = this.lock;

  3.        synchronized(this.lock) {

  4.            if(!this.isOpen()) {

  5.                throw new ClosedChannelException();

  6.            } else if(this.isBound()) {

  7.                throw new AlreadyBoundException();

  8.            } else {

  9.                InetSocketAddress var4 = var1 == null?new InetSocketAddress(0):Net.checkAddress(var1);

  10.                SecurityManager var5 = System.getSecurityManager();

  11.                if(var5 != null) {

  12.                    var5.checkListen(var4.getPort());

  13.                }

  14.                NetHooks.beforeTcpBind(this.fd, var4.getAddress(), var4.getPort());

  15.                Net.bind(this.fd, var4.getAddress(), var4.getPort());

  16.                Net.listen(this.fd, var2 < 1?50:var2);

  17.                Object var6 = this.stateLock;

  18.                synchronized(this.stateLock) {

  19.                    this.localAddress = Net.localAddress(this.fd);

  20.                }

  21.                return this;

  22.            }

  23.        }

  24.    }

该方法属于 Java NIO 层次的,该方法涉及到服务端端口的绑定,端口的监听,这些内容在后续的 Channel 时做详细介绍。

到这里就真正完成了服务端端口的绑定。

这篇博客比较长,大体上从源码层次稍微解读了 Netty 服务端的启动过程,当中涉及到 Netty 的各个核心组件,只能笼统来描述服务端的启动过程,具体的细节部分还需要后续做详细分析,而且其中有多个点还是懵懵懂懂,相信在后面对 Netty 的分析过程会一一解答。

谢谢阅读,祝好!!!

参考资料

  • 《Netty权威指南(第二版)》

  • 《Netty IN ACTION》

  • Netty源码分析:服务端启动全过程(篇幅很长)


以上是关于死磕Netty-----服务端启动过程分析的主要内容,如果未能解决你的问题,请参考以下文章

[Netty源码] 服务端启动过程

netty服务端启动--ServerBootstrap源码解析

Netty源码分析之服务端启动

说说Netty服务端启动流程

netty源码分析之服务端启动全

Netty Server端启动分析EventLoopGroup和 EventLoop分析