Netty服务端是如何一步一步启动的?

Posted 占小狼的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty服务端是如何一步一步启动的?相关的知识,希望对你有一定的参考价值。

占小狼 转载请注明原创出处,谢谢!

Netty系列文章

1、

本文主要分析Netty服务端的启动过程

Netty是基于Nio实现的,所以也离不开selector、serverSocketChannel、socketChannel和selectKey等,只不过Netty把这些实现都封装在了底层。

从上篇文章的示例代码中可以看出,服务端逻辑的一切从ServerBootstrap开始。

ServerBootstrap实例中需要两个NioEventLoopGroup实例,按照职责划分成boss和work,有着不同的分工: 

1、boss负责请求的accept 

2、work负责请求的read、write

NioEventLoopGroup

NioEventLoopGroup主要管理eventLoop的生命周期。 eventLoop是什么?姑且把它看成是内部的一个处理线程,数量默认是处理器个数的两倍。

Netty服务端是如何一步一步启动的?

NioEventLoopGroup构造方法:

 
   
   
 
  1. public NioEventLoopGroup() {  

  2.    this(0);  

  3. }  

  4.   

  5. public NioEventLoopGroup(int nThreads) {  

  6.    this(nThreads, null); 

  7. }  

  8.   

  9. public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {  

  10.    this(nThreads, threadFactory, SelectorProvider.provider());  

  11. }  

  12.   

  13. public NioEventLoopGroup(  

  14.             int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {  

  15.    super(nThreads, threadFactory, selectorProvider);  

  16. }  

MultithreadEventLoopGroup是NioEventLoopGroup的父类,构造方法:

 
   
   
 
  1. protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {  

  2.    super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);  

  3. }  

其中 DEFAULTEVENTLOOP_THREADS 为处理器数量的两倍。

MultithreadEventExecutorGroup是核心,管理eventLoop的生命周期,先看看其中几个变量。 

1、children:EventExecutor数组,保存eventLoop。 

2、chooser:从children中选取一个eventLoop的策略。

构造方法:

 
   
   
 
  1. protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {

  2.    if (nThreads <= 0) {

  3.        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));

  4.    }

  5.    if (threadFactory == null) {

  6.        threadFactory = newDefaultThreadFactory();

  7.    }

  8.    children = new SingleThreadEventExecutor[nThreads];

  9.    if (isPowerOfTwo(children.length)) {

  10.        chooser = new PowerOfTwoEventExecutorChooser();

  11.    } else {

  12.        chooser = new GenericEventExecutorChooser();

  13.    }

  14.    for (int i = 0; i < nThreads; i ++) {

  15.        boolean success = false;

  16.        try {

  17.            children[i] = newChild(threadFactory, args);

  18.            success = true;

  19.        } catch (Exception e) {

  20.            // TODO: Think about if this is a good exception type

  21.            throw new IllegalStateException("failed to create a child event loop", e);

  22.        } finally {

  23.            if (!success) {

  24.                for (int j = 0; j < i; j ++) {

  25.                    children[j].shutdownGracefully();

  26.                }

  27.                for (int j = 0; j < i; j ++) {

  28.                    EventExecutor e = children[j];

  29.                    try {

  30.                        while (!e.isTerminated()) {

  31.                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);

  32.                        }

  33.                    } catch (InterruptedException interrupted) {

  34.                        Thread.currentThread().interrupt();

  35.                        break;

  36.                    }

  37.                }

  38.            }

  39.        }

  40.    }

  41.    final FutureListener<Object> terminationListener = new FutureListener<Object>() {

  42.        @Override

  43.        public void operationComplete(Future<Object> future) throws Exception {

  44.            if (terminatedChildren.incrementAndGet() == children.length) {

  45.                terminationFuture.setSuccess(null);

  46.            }

  47.        }

  48.    };

  49.    for (EventExecutor e: children) {

  50.        e.terminationFuture().addListener(terminationListener);

  51.    }

  52. }

  53.  protected EventExecutor newChild(  

  54.             ThreadFactory threadFactory, Object... args) throws Exception {  

  55.       return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);  

  56. }  

1、根据数组的大小,采用不同策略初始化chooser,如果大小为2的幂次方,则采用PowerOfTwoEventExecutorChooser,否则使用GenericEventExecutorChooser。

其中判断一个数是否是2的幂次方的方法,觉得很赞。

 
   
   
 
  1. private static boolean isPowerOfTwo(int val) {

  2.      return (val & -val) == val;

  3. }

2、newChild方法重载,初始化EventExecutor时,实际执行的是NioEventLoopGroup中的newChild方法,所以children元素的实际类型为NioEventLoop。

接下去看看NioEventLoop类。

NioEventLoop

每个eventLoop会维护一个selector和taskQueue,负责处理客户端请求和内部任务,如ServerSocketChannel注册和ServerSocket绑定等。

Netty服务端是如何一步一步启动的?

NioEventLoop构造方法:

 
   
   
 
  1.  NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {  

  2.       super(parent, threadFactory, false);  

  3.       if (selectorProvider == null) {  

  4.           throw new NullPointerException("selectorProvider");  

  5.       }  

  6.       provider = selectorProvider;  

  7.       selector = openSelector();  

  8. }  

当看到 selector = openSelector() 时,有没有觉得亲切了许多,这里先不管 selector,看看SingleThreadEventLoop类。

SingleThreadEventLoop是NioEventLoop的父类,构造方法:

 
   
   
 
  1. protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {

  2.    super(parent, threadFactory, addTaskWakesUp);

  3. }

啥事都没做...

继续看SingleThreadEventLoop的父类SingleThreadEventExecutor

从类名上可以看出,这是一个只有一个线程的线程池, 先看看其中的几个变量: 1、state:线程池当前的状态; 2、taskQueue:存放任务的队列; 3、thread:线程池维护的唯一线程; 4、scheduledTaskQueue:定义在其父类AbstractScheduledEventExecutor中,用以保存延迟执行的任务。 

... 构造方法:

 
   
   
 
  1. protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {

  2.    if (threadFactory == null) {

  3.        throw new NullPointerException("threadFactory");

  4.    }

  5.    this.parent = parent;

  6.    this.addTaskWakesUp = addTaskWakesUp;

  7.    thread = threadFactory.newThread(new Runnable() {

  8.        @Override

  9.        public void run() {

  10.            boolean success = false;

  11.            updateLastExecutionTime();

  12.            try {

  13.                SingleThreadEventExecutor.this.run();

  14.                success = true;

  15.            } catch (Throwable t) {

  16.                logger.warn("Unexpected exception from an event executor: ", t);

  17.            } finally {

  18.                for (;;) {

  19.                    int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);

  20.                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(

  21.                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {

  22.                        break;

  23.                    }

  24.                }

  25.                // Check if confirmShutdown() was called at the end of the loop.

  26.                if (success && gracefulShutdownStartTime == 0) {

  27.                    logger.error(

  28.                            "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +

  29.                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +

  30.                            "before run() implementation terminates.");

  31.                }

  32.                try {

  33.                    // Run all remaining tasks and shutdown hooks.

  34.                    for (;;) {

  35.                        if (confirmShutdown()) {

  36.                            break;

  37.                        }

  38.                    }

  39.                } finally {

  40.                    try {

  41.                        cleanup();

  42.                    } finally {

  43.                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);

  44.                        threadLock.release();

  45.                        if (!taskQueue.isEmpty()) {

  46.                            logger.warn(

  47.                                    "An event executor terminated with " +

  48.                                    "non-empty task queue (" + taskQueue.size() + ')');

  49.                        }

  50.                        terminationFuture.setSuccess(null);

  51.                    }

  52.                }

  53.            }

  54.        }

  55.    });

  56.    threadProperties = new DefaultThreadProperties(thread);

  57.    taskQueue = newTaskQueue();

  58. }

代码很长,内容很简单: 

1、初始化一个线程,并在线程内部执行NioEventLoop类的run方法,当然这个线程不会立刻执行。 

2、使用LinkedBlockingQueue类初始化taskQueue。

到目前为止,相关的处理线程已经初始化完成。

ServerBootstrap

通过serverBootstrap.bind(port)启动服务,过程如下:

 
   
   
 
  1. /**

  2. * Create a new {@link Channel} and bind it.

  3. */

  4. public ChannelFuture bind() {

  5.    validate();

  6.    SocketAddress localAddress = this.localAddress;

  7.    if (localAddress == null) {

  8.       throw new IllegalStateException("localAddress not set");

  9.    }

  10.    return doBind(localAddress);

  11. }

Netty服务端是如何一步一步启动的?

doBind实现如下

 
   
   
 
  1. private ChannelFuture doBind(final SocketAddress localAddress) {

  2.    final ChannelFuture regFuture = initAndRegister();

  3.    final Channel channel = regFuture.channel();

  4.    if (regFuture.cause() != null) {

  5.        return regFuture;

  6.    }

  7.    if (regFuture.isDone()) {

  8.        // At this point we know that the registration was complete and successful.

  9.        ChannelPromise promise = channel.newPromise();

  10.        doBind0(regFuture, channel, localAddress, promise);

  11.        return promise;

  12.    } else {

  13.        // Registration future is almost always fulfilled already, but just in case it's not.

  14.        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);

  15.        regFuture.addListener(new ChannelFutureListener() {

  16.            @Override

  17.            public void operationComplete(ChannelFuture future) throws Exception {

  18.                Throwable cause = future.cause();

  19.                if (cause != null) {

  20.                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an

  21.                    // IllegalStateException once we try to access the EventLoop of the Channel.

  22.                    promise.setFailure(cause);

  23.                } else {

  24.                    // Registration was successful, so set the correct executor to use.

  25.                    // See https://github.com/netty/netty/issues/2586

  26.                    promise.executor = channel.eventLoop();

  27.                }

  28.                doBind0(regFuture, channel, localAddress, promise);

  29.            }

  30.        });

  31.        return promise;

  32.    }

  33. }

1、方法initAndRegister返回一个ChannelFuture实例regFuture,通过regFuture可以判断initAndRegister执行结果。 

2、如果regFuture.isDone()为true,说明initAndRegister已经执行完,则直接执行doBind0进行socket绑定。 

3、否则regFuture添加一个ChannelFutureListener监听,当initAndRegister执行完成时,调用operationComplete方法并执行doBind0进行socket绑定。

所以只有当initAndRegister操作结束之后才能进行bind操作。

initAndRegister实现

 
   
   
 
  1. final ChannelFuture initAndRegister() {

  2.    final Channel channel = channelFactory().newChannel();

  3.    try {

  4.        init(channel);

  5.    } catch (Throwable t) {

  6.        channel.unsafe().closeForcibly();

  7.        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor

  8.        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);

  9.    }

  10.    ChannelFuture regFuture = group().register(channel);

  11.    if (regFuture.cause() != null) {

  12.        if (channel.isRegistered()) {

  13.            channel.close();

  14.        } else {

  15.            channel.unsafe().closeForcibly();

  16.        }

  17.    }

  18.    return regFuture;

  19. }

1、负责创建服务端的NioserverSocketChannel实例 

2、为NioServerSocketChannel的pipeline添加handler 

3、注册NioServerSocketChannel到selector

大部分的过程和NIO中类似。

NioServerSocketChannel

对Nio的ServerSocketChannel和SelectionKey进行了封装。

构造方法:

 
   
   
 
  1. public NioServerSocketChannel() {

  2.    this(newSocket(DEFAULT_SELECTOR_PROVIDER));

  3. }

  4. private static ServerSocketChannel newSocket(SelectorProvider provider) {

  5.    try {

  6.        return provider.openServerSocketChannel();

  7.    } catch (IOException e) {

  8.        throw new ChannelException(

  9.                "Failed to open a server socket.", e);

  10.    }

  11. }

  12. public NioServerSocketChannel(ServerSocketChannel channel) {

  13.    super(null, channel, SelectionKey.OP_ACCEPT);

  14.    config = new NioServerSocketChannelConfig(this, javaChannel().socket());

  15. }

1、方法newSocket利用 provider.openServerSocketChannel() 生成Nio中的ServerSocketChannel对象。 

2、设置SelectionKey.OP_ACCEPT事件。

AbstractNioMessageChannel构造方法

 
   
   
 
  1. protected  AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {

  2.    super(parent, ch, readInterestOp);

  3. }

啥也没做...

AbstractNioChannel构造方法

 
   
   
 
  1. protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {

  2.    super(parent);

  3.    this.ch = ch;

  4.    this.readInterestOp = readInterestOp;

  5.    try {

  6.        ch.configureBlocking(false);

  7.    } catch (IOException e) {

  8.        try {

  9.            ch.close();

  10.        } catch (IOException e2) {

  11.            if (logger.isWarnEnabled()) {

  12.                logger.warn(

  13.                        "Failed to close a partially initialized socket.", e2);

  14.            }

  15.        }

  16.        throw new ChannelException("Failed to enter non-blocking mode.", e);

  17.    }

  18. }

设置当前ServerSocketChannel为非阻塞通道。

AbstractChannel构造方法

 
   
   
 
  1. protected AbstractChannel(Channel parent) {

  2.    this.parent = parent;

  3.    unsafe = newUnsafe();

  4.    pipeline = new DefaultChannelPipeline(this);

  5. }

1、初始化unsafe,这里的Unsafe并非是jdk中底层Unsafe类,用来负责底层的connect、register、read和write等操作。 

2、初始化pipeline,每个Channel都有自己的pipeline,当有请求事件发生时,pipeline负责调用相应的hander进行处理。

unsafe和pipeline的具体实现原理会在后续进行分析。


回到ServerBootstrap的init(Channel channel)方法,添加handler到channel的pipeline中。

 
   
   
 
  1. void init(Channel channel) throws Exception {

  2.    final Map<ChannelOption<?>, Object> options = options();

  3.    synchronized (options) {

  4.        channel.config().setOptions(options);

  5.    }

  6.    final Map<AttributeKey<?>, Object> attrs = attrs();

  7.    synchronized (attrs) {

  8.        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {

  9.            @SuppressWarnings("unchecked")

  10.            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();

  11.            channel.attr(key).set(e.getValue());

  12.        }

  13.    }

  14.    ChannelPipeline p = channel.pipeline();

  15.    final EventLoopGroup currentChildGroup = childGroup;

  16.    final ChannelHandler currentChildHandler = childHandler;

  17.    final Entry<ChannelOption<?>, Object>[] currentChildOptions;

  18.    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;

  19.    synchronized (childOptions) {

  20.        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));

  21.    }

  22.    synchronized (childAttrs) {

  23.        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));

  24.    }

  25.    p.addLast(new ChannelInitializer<Channel>() {

  26.        @Override

  27.        public void initChannel(Channel ch) throws Exception {

  28.            ChannelPipeline pipeline = ch.pipeline();

  29.            ChannelHandler handler = handler();

  30.            if (handler != null) {

  31.                pipeline.addLast(handler);

  32.            }

  33.            pipeline.addLast(new ServerBootstrapAcceptor(

  34.                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

  35.        }

  36.    });

  37. }

1、设置channel的options和attrs。 

2、在pipeline中添加一个ChannelInitializer对象。


init执行完,需要把当前channel注册到EventLoopGroup。 其实最终目的是为了实现Nio中把ServerSocket注册到selector上,这样就可以实现client请求的监听了。看看Netty中是如何实现的:

 
   
   
 
  1. public ChannelFuture register(Channel channel, ChannelPromise promise) {

  2.    return next().register(channel, promise);

  3. }

  4. public EventLoop next() {

  5.    return (EventLoop) super.next();

  6. }

  7. public EventExecutor next() {

  8.    return children[Math.abs(childIndex.getAndIncrement() % children.length)];

  9. }

因为EventLoopGroup中维护了多个eventLoop,next方法会调用chooser策略找到下一个eventLoop,并执行eventLoop的register方法进行注册。

 
   
   
 
  1. public ChannelFuture register(final Channel channel, final ChannelPromise promise) {

  2.    ...

  3.    channel.unsafe().register(this, promise);

  4.    return promise;

  5. }

channel.unsafe()是什么? NioServerSocketChannel初始化时,会创建一个NioMessageUnsafe实例,用于实现底层的register、read、write等操作。

 
   
   
 
  1. eventLoop.execute(new Runnable() {

  2.   @Override

  3.   public void run() {

  4.      register0(promise);

  5.   }

  6. });

  7. private void register0(ChannelPromise promise) {

  8.    try {

  9.        if (!ensureOpen(promise)) {

  10.            return;

  11.        }

  12.        Runnable postRegisterTask = doRegister();

  13.        registered = true;

  14.        promise.setSuccess();

  15.        pipeline.fireChannelRegistered();

  16.        if (postRegisterTask != null) {

  17.            postRegisterTask.run();

  18.        }

  19.        if (isActive()) {

  20.            pipeline.fireChannelActive();

  21.        }

  22.    } catch (Throwable t) {

  23.        // Close the channel directly to avoid FD leak.

  24.        closeForcibly();

  25.        if (!promise.tryFailure(t)) {

  26.        }

  27.        closeFuture.setClosed();

  28.    }

  29. }

  30. public void execute(Runnable task) {

  31.    if (task == null) {

  32.        throw new NullPointerException("task");

  33.    }

  34.    boolean inEventLoop = inEventLoop();

  35.    if (inEventLoop) {

  36.        addTask(task);

  37.    } else {

  38.        startThread();

  39.        addTask(task);

  40.        if (isShutdown() && removeTask(task)) {

  41.            reject();

  42.        }

  43.    }

  44.    if (!addTaskWakesUp) {

  45.        wakeup(inEventLoop);

  46.    }

  47. }

1、register0方法提交到eventLoop线程池中执行,这个时候会启动eventLoop中的线程。 

2、方法doRegister()才是最终Nio中的注册方法,方法javaChannel()获取ServerSocketChannel。

 
   
   
 
  1. protected Runnable doRegister() throws Exception {

  2.    boolean selected = false;

  3.    for (;;) {

  4.        try {

  5.            selectionKey = javaChannel().register(eventLoop().selector, 0, this);

  6.            return null;

  7.        } catch (CancelledKeyException e) {

  8.            if (!selected) {

  9.                // Force the Selector to select now  as the "canceled" SelectionKey may still be

  10.                // cached and not removed because no Select.select(..) operation was called yet.

  11.                eventLoop().selectNow();

  12.                selected = true;

  13.            } else {

  14.                // We forced a select operation on the selector before but the SelectionKey is still cached

  15.                // for whatever reason. JDK bug ?

  16.                throw e;

  17.            }

  18.        }

  19.    }

  20. }

ServerSocketChannel注册完之后,通知pipeline执行fireChannelRegistered方法,pipeline中维护了handler链表,通过遍历链表,执行InBound类型handler的channelRegistered方法,最终执行init中添加的ChannelInitializer handler。

 
   
   
 
  1. public final void channelRegistered(ChannelHandlerContext ctx)

  2.        throws Exception {

  3.    boolean removed = false;

  4.    boolean success = false;

  5.    try {

  6.        initChannel((C) ctx.channel());

  7.        ctx.pipeline().remove(this);

  8.        removed = true;

  9.        ctx.fireChannelRegistered();

  10.        success = true;

  11.    } catch (Throwable t) {

  12.        logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);

  13.    } finally {

  14.        if (!removed) {

  15.            ctx.pipeline().remove(this);

  16.        }

  17.        if (!success) {

  18.            ctx.close();

  19.        }

  20.    }

  21. }

1、initChannel方法最终把ServerBootstrapAcceptor添加到ServerSocketChannel的pipeline,负责accept客户端请求。 

2、在pipeline中删除对应的handler。 

3、触发fireChannelRegistered方法,可以自定义handler的channelRegistered方法。

到目前为止,ServerSocketChannel完成了初始化并注册到seletor上,启动线程执行selector.select()方法准备接受客户端请求。

细心的同学已经发现,ServerSocketChannel的socket还未绑定到指定端口,那么这一块Netty是如何实现的? Netty把注册操作放到eventLoop中执行。

 
   
   
 
  1. private static void doBind0(

  2.        final ChannelFuture regFuture,

  3.        final Channel channel,

  4.        final SocketAddress localAddress,

  5.        final ChannelPromise promise) {

  6.    channel.eventLoop().execute(new Runnable() {

  7.        @Override

  8.        public void run() {

  9.            if (regFuture.isSuccess()) {

  10.                channel.bind(localAddress, promise)

  11. .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

  12.            } else {

  13.                promise.setFailure(regFuture.cause());

  14.            }

  15.        }

  16.    });

  17. }

  18. public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {

  19.    return pipeline.bind(localAddress, promise);

  20. }

  21. @Override

  22. public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {

  23.    return tail.bind(localAddress, promise);

  24. }

  25. @Override

  26. public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {

  27.    if (localAddress == null) {

  28.        throw new NullPointerException("localAddress");

  29.    }

  30.    validatePromise(promise, false);

  31.    return findContextOutbound().invokeBind(localAddress, promise);

  32. }

  33. private ChannelFuture invokeBind(final SocketAddress localAddress, final ChannelPromise promise) {

  34.    EventExecutor executor = executor();

  35.    if (executor.inEventLoop()) {

  36.        invokeBind0(localAddress, promise);

  37.    } else {

  38.        executor.execute(new Runnable() {

  39.            @Override

  40.            public void run() {

  41.                invokeBind0(localAddress, promise);

  42.            }

  43.        });

  44.    }

  45.    return promise;

  46. }

  47. private void invokeBind0(SocketAddress localAddress, ChannelPromise promise) {

  48.    try {

  49.        ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);

  50.    } catch (Throwable t) {

  51.        notifyOutboundHandlerException(t, promise);

  52.    }

  53. }

  54. @Override

  55. public void bind(

  56.        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)

  57.        throws Exception {

  58.    unsafe.bind(localAddress, promise);

  59. }

最终由unsafe实现端口的bind操作。

 
   
   
 
  1. public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {

  2.        if (!ensureOpen(promise)) {

  3.            return;

  4.        }

  5.        try {

  6.            boolean wasActive = isActive();

  7.            ...        

  8.            doBind(localAddress);

  9.            promise.setSuccess();

  10.            if (!wasActive && isActive()) {

  11.                pipeline.fireChannelActive();

  12.            }

  13.        } catch (Throwable t) {

  14.            promise.setFailure(t);

  15.            closeIfClosed();

  16.        }

  17.    }

  18. protected void doBind(SocketAddress localAddress) throws Exception {

  19.    javaChannel().socket().bind(localAddress, config.getBacklog());

  20. }

bind完成后,且ServerSocketChannel也已经注册完成,则触发pipeline的fireChannelActive方法,所以在这里可以自定义fireChannelActive方法,默认执行tail的fireChannelActive。

 
   
   
 
  1. @Override

  2. public ChannelPipeline fireChannelActive() {

  3.    head.fireChannelActive();

  4.    if (channel.config().isAutoRead()) {

  5.        channel.read();

  6.    }

  7.    return this;

  8. }

channel.read()方法会触发pipeline的行为:

 
   
   
 
  1.  @Override

  2. public Channel read() {

  3.    pipeline.read();

  4.    return this;

  5. }

  6. @Override

  7. public ChannelPipeline read() {

  8.    tail.read();

  9.    return this;

  10. }

  11. @Override

  12. public ChannelHandlerContext read() {

  13.    findContextOutbound().invokeRead();

  14.    return this;

  15. }

  16. private void invokeRead() {

  17.    EventExecutor executor = executor();

  18.    if (executor.inEventLoop()) {

  19.        invokeRead0();

  20.    } else {

  21.        Runnable task = invokeRead0Task;

  22.        if (task == null) {

  23.            invokeRead0Task = task = new Runnable() {

  24.                @Override

  25.                public void run() {

  26.                    invokeRead0();

  27.                }

  28.            };

  29.        }

  30.        executor.execute(task);

  31.    }

  32. }

  33. private void invokeRead0() {

  34.    try {

  35.        ((ChannelOutboundHandler) handler()).read(this);

  36.    } catch (Throwable t) {

  37.        notifyHandlerException(t);

  38.    }

  39. }

最终会在pipeline中找到handler执行read方法,默认是head。

至此为止,server已经启动完成。


END,我是占小狼。

Netty服务端是如何一步一步启动的?

2018

HAPPY  NEW  YEAR

每一次的思考

都想与你分享

点击阅读原文,查看更多

以上是关于Netty服务端是如何一步一步启动的?的主要内容,如果未能解决你的问题,请参考以下文章

WCF 一步一步 发布 WCF服务 到 IIS (图)

一步一步搭建Svn服务之TortoiseSVN基本操作

一步一步:如何进行 Xdebug 故障排除连接到客户端 IDE

一步一步,一步 从代码到,打包成为手机App,上传至nginx服务器 (Vue项目)

如何用IDEA一步一步开发WebService服务器端

一步一步搭建springCloud