netty源码之断开连接

Posted better_hui

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty源码之断开连接相关的知识,希望对你有一定的参考价值。

目录

一、前言

二、正常关闭

1、关闭前的处理工作

2、doClose0调用jdk的close 关闭channel

3、fireChannelInactiveAndDeregister 传播关闭、解绑定事件

4、doDeregister取消selector上的selectorKey

三、异常关闭

四、总结


一、前言

关闭的逻辑是在read方法里 , 当接受的数据<0时,说明没有可接受的数据了,可以关闭了。

二、正常关闭

1.allocHandle.lastBytesRead() <= 0这一行会发现数据小于零,释放byteBuf 2.进入到closeOnRead(pipeline);完成关闭事件

    @Override
    public final void read() {
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        // 内存分配器
        final ByteBufAllocator allocator = config.getAllocator();
        // 接收数据测handler
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);
​
        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                // 分配内存是自适应的
                byteBuf = allocHandle.allocate(allocator);
                // 开始读取数据
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {// 读取为-1,表示断开连接
                    // nothing was read. release the buffer.
                    byteBuf.release();//释放byteBuf的空间
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }
​
                allocHandle.incMessagesRead(1);//读取一次数据
                readPending = false;
                pipeline.fireChannelRead(byteBuf);// 将读到的数据传递出去
                byteBuf = null;
            } while (allocHandle.continueReading());//继续读取
​
            // 通过当前
            allocHandle.readComplete();//计算下一次的需要分配的空间
            pipeline.fireChannelReadComplete();// 将完成读取的事件传递出去
​
            if (close) {
                closeOnRead(pipeline);//关闭
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

继续跟进核心的方法在AbstractChannel.close方法里

private void close(final ChannelPromise promise, final Throwable cause, final ClosedChannelException closeCause, final boolean notify) { if (!promise.setUncancellable()) { return; }

        if (closeInitiated) {
            if (closeFuture.isDone()) {
                // Closed already.
                safeSetSuccess(promise);
            } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
                // This means close() was called before so we just register a listener and return
                closeFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        promise.setSuccess();
                    }
                });
            }
            return;
        }
​
        closeInitiated = true;
​
        final boolean wasActive = isActive();
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        // outboundBuffer置空
        this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
        Executor closeExecutor = prepareToClose();// 关闭之前将数据发送完成之前的一些操作
        if (closeExecutor != null) {
            closeExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Execute the close.
                        doClose0(promise);
                    } finally {
                        // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                        invokeLater(new Runnable() {
                            @Override
                            public void run() {
                                if (outboundBuffer != null) {
                                    // Fail all the queued messages
                                    outboundBuffer.failFlushed(cause, notify);
                                    outboundBuffer.close(closeCause);
                                }
                                fireChannelInactiveAndDeregister(wasActive);
                            }
                        });
                    }
                }
            });
        } else {
            try {
                // Close the channel and fail the queued messages in all cases.
                doClose0(promise);// 执行close操作
            } finally {
                // 强行关闭outboundBuffer
                if (outboundBuffer != null) {
                    // Fail all the queued messages.
                    outboundBuffer.failFlushed(cause, notify);
                    outboundBuffer.close(closeCause);
                }
            }
            if (inFlush0) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        fireChannelInactiveAndDeregister(wasActive);
                    }
                });
            } else {
                // 将channel不活跃的事件传播出去
                fireChannelInactiveAndDeregister(wasActive);
            }
        }
    }

1、关闭前的处理工作

1.首先判断是否需要阻塞,先将数据发送完成 2.doDeregister()使得所有的selectorKey失效,防止关闭的时候注册事件到selector上 3.返回一个GlobalEventExecutor.INSTANCE线程池执行任务

2、doClose0调用jdk的close 关闭channel

3、fireChannelInactiveAndDeregister 传播关闭、解绑定事件

1.cancel所有的selectKey 2.pipeline.fireChannelInactive()将close事件传播出去 3.pipeline也取消注册

    private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
        if (!promise.setUncancellable()) {
            return;
        }
​
        if (!registered) {
            safeSetSuccess(promise);
            return;
        }
​
        // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
        // we need to ensure we do the actual deregister operation later. This is needed as for example,
        // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
        // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
        // the deregister operation this could lead to have a handler invoked by different EventLoop and so
        // threads.
        //
        // See:
        // https://github.com/netty/netty/issues/4435
        invokeLater(new Runnable() {
            @Override
            public void run() {
                try {
                    doDeregister();// cancel所有的selectKey
                } catch (Throwable t) {
                    logger.warn("Unexpected exception occurred while deregistering a channel.", t);
                } finally {
                    if (fireChannelInactive) {
                        // 最后将这个关闭事件传播出去
                        pipeline.fireChannelInactive();
                    }
                    // Some transports like local and AIO does not allow the deregistration of
                    // an open channel.  Their doDeregister() calls close(). Consequently,
                    // close() calls deregister() again - no need to fire channelUnregistered, so check
                    // if it was registered.
                    if (registered) {
                        registered = false;
                        // pipeline也取消注册
                        pipeline.fireChannelUnregistered();
                    }
                    safeSetSuccess(promise);
                }
            }
        });
    }

4、doDeregister取消selector上的selectorKey

它本质上就是取消了selector上的所有的selectorKey,但是做出了一些优化

三、异常关闭

进入AbstractNioByteChannel,其中的read方法为例,假设正在读取数据的时候突然抛出异常,会被handleReadException(pipeline, byteBuf, t, close, allocHandle);处理,并关闭channel。

    @Override
    public final void read() {
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        // 内存分配器
        final ByteBufAllocator allocator = config.getAllocator();
        // 接收数据测handler
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);
​
        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                // 分配内存是自适应的
                byteBuf = allocHandle.allocate(allocator);
                // 开始读取数据
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {// 读取为-1,表示断开连接
                    // nothing was read. release the buffer.
                    byteBuf.release();//释放byteBuf的空间
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }
​
                allocHandle.incMessagesRead(1);//读取一次数据
                readPending = false;
                pipeline.fireChannelRead(byteBuf);// 将读到的数据传递出去
                byteBuf = null;
            } while (allocHandle.continueReading());//继续读取
​
            // 通过当前
            allocHandle.readComplete();//计算下一次的需要分配的空间
            pipeline.fireChannelReadComplete();// 将完成读取的事件传递出去
​
            if (close) {
                closeOnRead(pipeline);//关闭
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }

四、总结

以上是关于netty源码之断开连接的主要内容,如果未能解决你的问题,请参考以下文章

netty源码之接收连接

netty源码之接收连接

netty源码之接收连接

Netty源码分析之处理新连接

Netty源码,详解Http协议的数据包解码过程

netty 可以可靠地检测通道关闭/断开连接吗?