netty源码之启动流程

Posted better_hui

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty源码之启动流程相关的知识,希望对你有一定的参考价值。

一、前言

 

our thread 就是我们的主线程, boss thread 就是netty的 bossGroup线程 , 也就是reactor的主线程

二、启动

启动的本质

启动的实质是什么?实际上我们只需要找到java nio 的那些代码就可以了

1、打开一个selector

2、打开一个channel

3、绑定一个端口

4、将channel 注册到selector 并标明感兴趣的事件

netty启动三部曲

1、实例化引导程序 ServerBootstrap

2、配置引导程序参数 : group / channel / option / childOption / handler / childHandler

3、绑定

关键步骤

 

selector创建

查看NioEventLoopGroup通过创建一个子的NioEventLoopGroup,然后子的NioEventLoopGroupNioEventLoop,并完成selector的创建

serverSocketChannel创建

AbstractBootstrap.initAndRegister

channelFactory.newChannel()

init(channel) , 初始化channel , 初始化options / handlers

config.group.register(channel) 请注意 这里并不是真正的绑定,我们暂且叫它预绑定 , 因为他注册了0事件

绑定与注册监听

NioserverSocketChannel
 protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            this.javaChannel().bind(localAddress, this.config.getBacklog());
        } else {
            this.javaChannel().socket().bind(localAddress, this.config.getBacklog());
        }
​
    }

三、代码跟踪

private ChannelFuture doBind(final SocketAddress localAddress) {
        // channel注册、初始化、绑定到EventLoopGroup上
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
​
        // regFuture是异步的,到此处不一定会完成
        if (regFuture.isDone()) {//注册成功获取回调
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            // 完成对端口绑定
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            // 如果没有完成注册操作会将注册任务封装成一个task,加入到regFuture的Listener上
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
​
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
​

initAndRegister方法

1、initAndRegister中的init方法

@Override
    void init(Channel channel) {
        // 初始化option
        setChannelOptions(channel, newOptionsArray(), logger);
        // 初始化属性
        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
​
        // 构建一个pipeline
        ChannelPipeline p = channel.pipeline();
​
        final EventLoopGroup currentChildGroup = childGroup;// 全局变量childGroup改名为currentChildGroup
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
        }
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
​
        // ChannelInitializer完成将我们需要的handler加入到pipeline中
        // 当将所有的handler加入到pipeline完成以后,ChannelInitializer结束,从pipeline中移除
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
​
                // new ServerBootstrapAcceptor()对象时将currentChildGroup作为参数传入
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

2、initAndRegister中的register()

next方法我们之前已经讲过了两种选择器

跟进register()的代码

@Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
​
            AbstractChannel.this.eventLoop = eventLoop;
​
            // 判断当前线程是否是NioEvenetLoop中的线程
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    // 如果当前线程不在NioEvenetLoop中,将register0任务丢到eventLoop去执行
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

继续跟进register0方法

@Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 使用java的jdk的Nio变成,将channel注册到selector上
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

doBind0方法实现

doBind0中也是讲bind任务放到eventLoop中

跟进bind源码,他会让pipeline去实现bind方法,我们直接进入pipelinde的head

@Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();
​
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
​
            // See: https://github.com/netty/netty/issues/576
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
                // Warn a user about the fact that a non-root user can't receive a
                // broadcast packet on *nix if the socket is bound on non-wildcard address.
                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }
​
            // 查看当前channel是否已经活跃
            boolean wasActive = isActive();
            try {
                // 完成bind
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }
​
            // bind完成以后完成激活channel
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        // 激活channel 
                        pipeline.fireChannelActive();
                    }
                });
            }
​
            safeSetSuccess(promise);
        }
​

1、jdk的bind方法

2、激活channel

不断跟进readIfIsAutoRead(),发现使用的selectKey的ops操作。

以上是关于netty源码之启动流程的主要内容,如果未能解决你的问题,请参考以下文章

Netty源码分析之NioEventLoop执行流程

Netty源码之——ChannelPipeline

Netty核心源码分析

Spark源码分析之SparkSubmit的流程

netty源码之读取数据

netty源码之读取数据