netty代码解析

Posted v4ki5mqu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty代码解析相关的知识,希望对你有一定的参考价值。

boot.bind:大致流程:boot.bind会在bossLoop注册一条serverChannel(关注accept事件),然后开启一个线程不停的selector,当有事件发生的时候,会产生一个channel和客户端通信,把这个channel注册到workLoop上,workLoop开启一个线程不停的selector。特别的,一个channel只能注册到一个loop上,一个loop可以接受多个channel。

通道pipeline的生成:在bind/connect过程中都会进入initAndRegister,会调用channelFactory.newChannel方法调用NioserverSocketChannel的构造方法,在构造方法里会通过provider创建出底层的nioSocketChannel。构建完成后会调用init(channel),会在channel的pipeline中添加ChannelInitializer,ChannelInitializer的作用就是initChannel时,将handler添加进去。如果是服务端,会额外的添加一个ServerBootstrapAcceptor(作用就是收到客户端连接时,将通道注册到workerGroup)。newChannel、init(channel)之后channel已经被创建出来,并且添加了一个ChannelInitializer。在init(channel)后会调用group.register(channel)。group会根据chooser选择出一个loop注册channel,会将channel的loop指定为选择出的loop(通道绑定loop),然后会把register0(promise)作为任务提交到loop。loop会开启一个线程(只会开启一个),这个线程负责调用SingleThreadEventExecutor.this.run();这个run中会根据select策略、是否有任务进行select,然后调用updateSelectKeys,(unsafe.read()--->pipeline.fireChannelRead--->pipeline处理。),然后runAllTasks,保证所有提交任务都会执行。register0(promise)中会进行调用doRegister()JDK 底层的操作将Channel 注册到 Selector上,然后会调用pipeline.invokeHandlerAddedIfNeeded(),将pipeline构造完成(用户的handler会添加进去,ChannelInitializer会移除)。

服务端的 boot.handler和boot.childHandler区别:handler对serverChannel生效,也就是accpet的时候。childHandler是对客户端连接之后,服务端生成一个channel和客户端通信,child中的handler对这个channel生效。

参考:https://www.cnblogs.com/yuand...

// 源码解析
初始化eventLoopGroup:
最终调用到MultithreadEventExecutorGroup.MultithreadEventExecutorGroup()

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
    // 这个就是NioEventLoop执行任务的执行器,NioEventLoop绑定的线程由它产生。
    // 并且这里构造的线程是FastThreadLocalThread
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    // 构建EventLoop数组
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i++) {
        boolean success = false;
        try {
            // 初始化EventLoop,这个就是工作线程了
            // 主要是赋值provider seletor
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) { // 如果一个失败了 全部关闭
                for (int j = 0; j < i; j++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j++) { 
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) { // 失败了 死循环中断
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    // 提交一个任务到线程池的时候,线程池需要选择(choose)其中的一个线程来执行这个任务 选择器
    chooser = chooserFactory.newChooser(children);

    // 中断监听
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e : children) {
        e.terminationFuture().addListener(terminationListener);
    }
    // 生成一个只读的set
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

newChild()最终调用到NioEventLoopGroup.newChild() 然后调用的NioEventLoop.NioEventLoop()

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    // 设置selector的Provider
    provider = selectorProvider;
    // 根据provider获取selector 这个就是nio的selector,负责轮询的
    selector = openSelector();
    // selector.select()时候的策略
    selectStrategy = strategy;
}

super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);调用的是SingleThreadEventExecutor.SingleThreadEventExecutor()

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    // 设置添加任务是否唤醒
    this.addTaskWakesUp = addTaskWakesUp;
    // 最大任务数
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    // 任务执行器
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    // 任务队列
    taskQueue = newTaskQueue(this.maxPendingTasks);
    // 拒绝策略
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

eventLoopGroup初始化:

nThreads:线程池中的线程数,也就是 NioEventLoop 的实例数量,默认地 nThreads 会被设置为 CPU 核心数 *2。
executor:给NioEventLoop使用的。NioEventLoop运行任务的线程就是由它创造执行的。
chooserFactory:提交一个任务到线程池的时候,线程池需要选择(choose)其中的一个线程来执行这个任务,这个就是用来实现选择策略的。
selectorProvider:实例化 JDK 的 Selector,每个线程池都持有一个 selectorProvider 实例。
selectStrategyFactory:NioEventLoop在select是参考的策略。
rejectedExecutionHandler:线程池拒绝策略。
NioEventLoop的构建(构建过程中调用newChild):在eventLoopGroup初始化过程中,会根据nThreads生成对应数量的loop,调用newChid()构建loop,会调用NioEventLoop的构造函数生成loop,构建时会根据系统生成provider,然后产生对应的selector,也就是每个loop都有selector。特别的:每个loop都是一个线程,这个线程的主要工作就是从taskQueue中拉去任务运行,loop中的线程是由loopGroup.executor产生的。bind/connect本质上就是提交一个任务到loop的taskQueue。当有新的连接accept到服务端,服务端的ServerBootstrapAcceptor会把新开的channel注册到workGroup上(根据chooser策略选择一个loop),就是提交任务到loop的taskQueue上。

绑定过程

private ChannelFuture doBind(final SocketAddress localAddress) {
   // 初始化并注册通道
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // 进行socket绑定。
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it\'s not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

initAndRegister方法:创建、初始化channel

    final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 创建channel
        channel = channelFactory.newChannel();
        // 初始化channel
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    // 向group注册channel
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

newChannel方法:创建channel,根据配置的channel工厂利用反射创建channel,设置了opt,最终使用provider.openServerSocketChannel()/provider.openSocketChannel()生成channel
init方法:设置channl属性,设置pipeline

 void init(Channel channel) throws Exception {
    // options设置
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        channel.config().setOptions(options);
    }
    // 属性设置
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
   // 设置pipeline
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
    // 这里添加了ChannelInitializer,这里添加了ServerBootstrapAcceptor,这个会将新连接的channel注册到work group去
    p.addLast(new ChannelInitializer<Channel>() {
        @Override 
        public void initChannel(Channel ch) throws Exception { // 通道注册时就会调用
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            // 其中的handler是用户配置的监听连接的handlers,有可能是ChannelInitializer
            // 为了保证用户的handlers全部能够再ServerBootstrapAcceptor之前被加载,
            // 必须要延后加载ServerBootstapAcceptor
            ch.eventLoop().execute(new Runnable() { // 通道注册完毕了,开始监听事件了
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

register方法:向group注册channel

    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        if (eventLoop == null) {
            throw new NullPointerException("eventLoop");
        }
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }
        // 通道和eventLoop绑定
        AbstractChannel.this.eventLoop = eventLoop;
        // 如果是自己线程 直接register0
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                // 提交一个任务到eventLoop
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }

register0 方法:

     private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            // 真正注册的地方
            doRegister();
            neverRegistered = false;
            registered = true;

            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            // 触发用户定义的handlerAdded(...)监听
            pipeline.invokeHandlerAddedIfNeeded();

            safeSetSuccess(promise);
            // 传播一个register事件放入pipeline 所有关心 register 事件的 handler会触发对应函数
            pipeline.fireChannelRegistered();
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            // 第一次注册不会进入此逻辑 还没有绑定socket激活
            // bind完端口才会激活active。端口都没绑定channel是没法激活的
            // 在通道中传播active事件
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }  

invokeHandlerAddedIfNeeded方法:第一次注册就调用callHandlerAddedForAllHandlers

    final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop();
    if (firstRegistration) {
        firstRegistration = false;
        // We are now registered to the EventLoop. It\'s time to call the callbacks for the ChannelHandlers,
        // that were added before the registration was done.
        callHandlerAddedForAllHandlers();
    }
}

callHandlerAddedForAllHandlers方法:

    private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;

        // This Channel itself was registered.
        registered = true;

        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // Null out so it can be GC\'ed.
        this.pendingHandlerCallbackHead = null;
    }

    // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
    // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
    // the EventLoop.
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    // while循环 遍历所有的handler
    while (task != null) {
        task.execute();
        task = task.next;
    }
}

最终进入

  private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
    // 触发所有hander的handlerAdded方法
    // 如果是ChannelInitializer最终会调用initChannel(Server bind过程中就添加的ChannelInitializer的initChannel会在这里触发)
        ctx.handler().handlerAdded(ctx);
        ctx.setAddComplete();
    }
    ....
    }

fireChannelRegistered函数:

public final ChannelPipeline fireChannelRegistered() {
// 传递的是head
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}

invokeChannelRegistered函数:

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
// 还是交给channel对应的eventLoop中的线程执行
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
} 

invokeChannelRegistered方法:

 private void invokeChannelRegistered() {
 //    确保hander添加完毕
    if (invokeHandler()) {
        try {
            // ChannelInboundHandler类型的hander才会触发
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered();
    }
}

进入head的channelRegistered方法:

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }

fireChannelRegistered 方法:

 public ChannelHandlerContext fireChannelRegistered() {
 //findContextInbound 寻找head的下一个inboundHandler 触发register事件
    invokeChannelRegistered(findContextInbound());
    return this;
}
  

doRegister方法:真正绑定socket的地方

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // 将通道绑定到eventLoop的selector,
            // 这里注册时 ops 设置的是 0,也就是说 ServerSocketChannel 仅仅只是表示了注册成功,还不能监听任何网络操作
            // 注册方式是多态的,它既可以被 NIOServerSocketChannel 用来监听客户端的连接接入,也可以注册 SocketChannel 用来监听网络读或者写操作。
            // 通过 SelectionKey.interestOps(int ops) 方法可以方便地修改监听操作位。
            // 所以,此处注册只需要获取 SelectionKey 并给 AbstractNIOChannel 的成员变量 selectionKey 赋值
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

initAndRegister之后是doBind0方法:进行socket绑定,这里正式开始监听

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    // 提交词任务到channel的eventLoop中去
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) { // channel注册成功了
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

bind方法:最终进入的是

public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (!validatePromise(promise, false)) {
        // cancelled
        return promise;
    }
    // findContextOutbound 从tail节点往前走,寻找outBound类型的handler
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

invokeBind方法:

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
                // 一般的handler没有重写bind方法,最终会走到head里面
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        bind(localAddress, promise);
    }
}

head的bind方法:

    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
            // 真正绑定的地方
        unsafe.bind(localAddress, promise);
    }

AbstractChannel.bind方法

    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        assertEventLoop();
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        // See: https://github.com/netty/netty/issues/576
        if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
            localAddress instanceof InetSocketAddress &&
            !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
            !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
            // Warn a user about the fact that a non-root user can\'t receive a
            // broadcast packet on *nix if the socket is bound on non-wildcard address.
            logger.warn(
                    "A non-root user can\'t receive a broadcast packet if the socket " +
                    "is not bound to a wildcard address; binding to a non-wildcard " +
                    "address (" + localAddress + ") anyway as requested.");
        }
        boolean wasActive = isActive();
        try {
            doBind(localAddress);
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
        // dobind之后,通道绑定了socket,已经被激活了
        if (!wasActive && isActive()) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireChannelActive();
                }
            });
        }
        safeSetSuccess(promise);
    }

doBind:最终绑定进入的是NioServerSocketChannel.bind:底层socket和channel绑定
private void doBind0(SocketAddress localAddress) throws Exception {

    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress);
    } else {
        javaChannel().socket().bind(localAddress);
    }
}

绑定之后会调用pipeline.fireChannelActive():在通道中传播通道激活的事件
在head之中的channelActive方法会调用readIfIsAutoRead,
这个会在通道中传播read事件,每个handler都会调用read事件,最终会调用head的read方法,修改channel的感兴趣事件.此事件在初始化的时候就设定好了

//参考 https://blog.csdn.net/u013828...

//
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

final NioUnsafe unsafe = ch.unsafe();
//检查该SelectionKey是否有效,如果无效,则关闭channel
if (!k.isValid()) {
    // close the channel if the key is not valid anymore
    unsafe.close(unsafe.voidPromise());
    return;
}

try {
    int readyOps = k.readyOps();
    // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
    // to a spin loop
    // 如果准备好READ或ACCEPT则触发unsafe.read() ,检查是否为0,如上面的源码英文注释所说:解决JDK可能会产生死循环的一个bug。
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
        if (!ch.isOpen()) {//如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件
            // Connection already closed - no need to handle write.
            return;
        }
    }
    // 如果准备好了WRITE则将缓冲区中的数据发送出去,如果缓冲区中数据都发送完成,则清除之前关注的OP_WRITE标记
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
        ch.unsafe().forceFlush();
    }
    // 如果是OP_CONNECT,则需要移除OP_CONNECT否则Selector.select(timeout)将立即返回不会有任何阻塞,这样可能会出现cpu 100%
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
        // See https://github.com/netty/netty/issues/924
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps(ops);

        unsafe.finishConnect();
    }
} catch (CancelledKeyException ignored) {
    unsafe.close(unsafe.voidPromise());
}

}
//该方法主要是对SelectionKey k进行了检查,有如下几种不同的情况

1)OP_ACCEPT,接受客户端连接
2)OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取。
3)OP_WRITE, 可写事件, 即上层可以向 Channel 写入数据。
4)OP_CONNECT, 连接建立事件, 即 TCP 连接已经建立, Channel 处于 active 状态。

// read过程:unsafe.read();在channel构造时初始化了unsafe,channel为 NioServerSocketChannel,则unsafe为NioServerSocketChannel的属性为NioMessageUnsafe
// 主要是 doReadMessages(readBuf) -->pipeline.fireChannelRead(readBuf.get(i))-->pipeline.fireChannelReadComplete()

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    do {
    // readBuf 是NioMessageUnsafe的类属性 List<Object>

        int localRead = doReadMessages(readBuf);
        if (localRead == 0) {
            break;
        }
        if (localRead < 0) {
            closed = true;
            break;
        }
    } while (allocHandle.continueReading());
    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    pipeline.fireChannelReadComplete();
}

doReadMessages函数:服务端是NioServerSocketChannel.doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {

    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
        // 新增了一个NioSocketChannel 最终并且为这个NioSocketChannel设置了readOpt
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);
        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }
    return 0;
} 

new NioSocketChannel(this, ch)最终调用:

  protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
  // 初始化的是OP_READ
    super(parent, ch, SelectionKey.OP_READ);
}

pipeline.fireChannelRead(readBuf.get(i))会触发pipeline上所有inbound的channelRead函数,也会调用ServerBootstrapAcceptor.channelRead,服务端的channel初始化时,在pipeline中,已经自动添加了一个pipeline处理器 ServerBootstrapAcceptor。
ServerBootstrapAcceptor.channelRead函数:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // 这个强制转化是因为前面doReadMessages函数放入的就是这个
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: " + child, t);
        }
    }

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
    // 注册通道到childGroup
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

// write过程
AbstractChannelHandlerContext.write方法:
从tail开始找到pipeline中的第一个outbound的handler,然后调用 invokeWrite(m, promise),最终进入head调用unsafe.write(msg, promise);

     public final void write(Object msg, ChannelPromise promise) {
        assertEventLoop();
        // ChannelOutboundBuffer内部维护了一个Entry链表,并使用Entry封装msg。链表维护了三个指针
        // 1.flushedEntry 指针表示第一个被写到操作系统Socket缓冲区中的节点
        // 2.unFlushedEntry 指针表示第一个未被写入到操作系统Socket缓冲区中的节点
        // 3.tailEntry指针表示ChannelOutboundBuffer缓冲区的最后一个节点
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            // If the outboundBuffer is null we know the channel was closed and so
            // need to fail the future right away. If it is not null the handling of the rest
            // will be done in flush0()
            // See https://github.com/netty/netty/issues/2362
            safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
            // release message now to prevent resource-leak
            ReferenceCountUtil.release(msg);
            return;
        }

        int size;
        try {
            // 将待写入的对象过滤,把非ByteBuf对象和FileRegion过滤,把所有的非直接内存转换成直接内存DirectBuffer
            msg = filterOutboundMessage(msg);
            // 计算消息大小
            size = pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            ReferenceCountUtil.release(msg);
            return;
        }
        // 放入链表
        outboundBuffer.addMessage(msg, size, promise);
    }

// flush过程
最终进入到head调用unsafe.flush

public final void flush() {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    // 确认要发送的entry
    outboundBuffer.addFlush();
    // 发送数据
    flush0();
}

flush0方法:

    protected final void flush0() {
        // Flush immediately only when there\'s no pending flush.
        // If there\'s a pending flush operation, event loop will call forceFlush() later,
        // and thus there\'s no need to call it now.
        if (isFlushPending()) { // 如果注册了write事件,认为是socket暂时不可写。Socket绝大部分情况下是可以写的,只有缓存区满了或者网络原因才不可写
                                // 等内核主动告知可写的时候再写入,由selector触发flush操作
            return;
        }
        super.flush0();
    }

最终进入doWrite方法:
NioSocketChannel#doWrite

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    for (;;) {
        int size = in.size();
        if (size == 0) {
            // All written so clear OP_WRITE
            clearOpWrite();
            break;
        }
        long writtenBytes = 0;
        boolean done = false;
        boolean setOpWrite = false;

        // Ensure the pending writes are made of ByteBufs only.
        ByteBuffer[] nioBuffers = in.nioBuffers();
        int nioBufferCnt = in.nioBufferCount();
        long expectedWrittenBytes = in.nioBufferSize();
        SocketChannel ch = javaChannel();

        // Always us nioBuffers() to workaround data-corruption.
        // See https://github.com/netty/netty/issues/2761
        switch (nioBufferCnt) { //根据bytebuf的数量调用不同函数
            case 0:
                // We have something else beside ByteBuffers to write so fallback to normal writes.
                super.doWrite(in);
                return;
            case 1:
                // Only one ByteBuf so use non-gathering write
                ByteBuffer nioBuffer = nioBuffers[0];
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    final int localWrittenBytes = ch.write(nioBuffer);
                    if (localWrittenBytes == 0) {
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
            default:
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes == 0) {
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
        }

        // Release the fully written buffers, and update the indexes of the partially written buffer.
        in.removeBytes(writtenBytes);

        if (!done) {
            // Did not write all buffers completely.
            // 没有把所有的buffers写完
            // 这里会向通道注册write事件
            incompleteWrite(setOpWrite);
            break;
        }
    }
}

javaChannel.write最终调用的是 n = IOUtil.write(fd, src, -1, nd);

static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
//如果是DirectBuffer,直接写,将堆外缓存中的数据拷贝到内核缓存中进行发送
if (var1 instanceof DirectBuffer) {
    return writeFromNativeBuffer(var0, var1, var2, var4);
} else {
    //非DirectBuffer
    //获取已经读取到的位置
    int var5 = var1.position();
    //获取可以读到的位置
    int var6 = var1.limit();

    assert var5 <= var6;
    //申请一个原buffer可读大小的DirectByteBuffer
    int var7 = var5 <= var6 ? var6 - var5 : 0;
    ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);
    int var10;
    try {
        var8.put(var1);
        var8.flip();
        var1.position(var5);
        //通过DirectBuffer写,将堆外缓存的数据拷贝到内核缓存中进行发送
        int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
        if (var9 > 0) {
            var1.position(var5 + var9);
        }
        var10 = var9;
    } finally {
        //回收分配的DirectByteBuffer
        Util.offerFirstTemporaryDirectBuffer(var8);
    }
    return var10;
}

}
// 参考 https://blog.csdn.net/lblblbl...

https://www.cnblogs.com/java-chen-hao/p/11477384.html
https://blog.csdn.net/TheLudlows/article/details/83997280


// 对象池 https://www.jianshu.com/p/854...
// 内存分配
// https://blog.csdn.net/pentium...
// https://www.cnblogs.com/ricki...
// FastThreadLocal https://www.jianshu.com/p/14f...

以上是关于netty代码解析的主要内容,如果未能解决你的问题,请参考以下文章

灰常详细的Netty实现聊天室的代码解析

Netty原理实践解析

Netty系列・高级篇Netty核心源码解析

片段(Java) | 机试题+算法思路+考点+代码解析 2023

netty源码解解析(4.0)-16 ChannelHandler概览

spring webflux(netty)处理程序无法解析包含大于 750 字节的 json 的 ServerRequest