netty源码之断开连接

Posted better_hui

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty源码之断开连接相关的知识,希望对你有一定的参考价值。

目录

一、前言

二、正常关闭

1、关闭前的处理工作

2、doClose0调用jdk的close 关闭channel

3、fireChannelInactiveAndDeregister 传播关闭、解绑定事件

4、doDeregister取消selector上的selectorKey

三、异常关闭

四、总结


一、前言

关闭的逻辑是在read方法里 , 当接受的数据<0时,说明没有可接受的数据了,可以关闭了。

二、正常关闭

1.allocHandle.lastBytesRead() <= 0这一行会发现数据小于零,释放byteBuf 2.进入到closeOnRead(pipeline);完成关闭事件

    @Override
    public final void read() 
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) 
            clearReadPending();
            return;
        
        final ChannelPipeline pipeline = pipeline();
        // 内存分配器
        final ByteBufAllocator allocator = config.getAllocator();
        // 接收数据测handler
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);
​
        ByteBuf byteBuf = null;
        boolean close = false;
        try 
            do 
                // 分配内存是自适应的
                byteBuf = allocHandle.allocate(allocator);
                // 开始读取数据
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) // 读取为-1,表示断开连接
                    // nothing was read. release the buffer.
                    byteBuf.release();//释放byteBuf的空间
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) 
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    
                    break;
                
​
                allocHandle.incMessagesRead(1);//读取一次数据
                readPending = false;
                pipeline.fireChannelRead(byteBuf);// 将读到的数据传递出去
                byteBuf = null;
             while (allocHandle.continueReading());//继续读取
​
            // 通过当前
            allocHandle.readComplete();//计算下一次的需要分配的空间
            pipeline.fireChannelReadComplete();// 将完成读取的事件传递出去
​
            if (close) 
                closeOnRead(pipeline);//关闭
            
         catch (Throwable t) 
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
         finally 
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) 
                removeReadOp();
            
        
    

继续跟进核心的方法在AbstractChannel.close方法里

private void close(final ChannelPromise promise, final Throwable cause, final ClosedChannelException closeCause, final boolean notify) if (!promise.setUncancellable()) return;

        if (closeInitiated) 
            if (closeFuture.isDone()) 
                // Closed already.
                safeSetSuccess(promise);
             else if (!(promise instanceof VoidChannelPromise))  // Only needed if no VoidChannelPromise.
                // This means close() was called before so we just register a listener and return
                closeFuture.addListener(new ChannelFutureListener() 
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception 
                        promise.setSuccess();
                    
                );
            
            return;
        
​
        closeInitiated = true;
​
        final boolean wasActive = isActive();
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        // outboundBuffer置空
        this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
        Executor closeExecutor = prepareToClose();// 关闭之前将数据发送完成之前的一些操作
        if (closeExecutor != null) 
            closeExecutor.execute(new Runnable() 
                @Override
                public void run() 
                    try 
                        // Execute the close.
                        doClose0(promise);
                     finally 
                        // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                        invokeLater(new Runnable() 
                            @Override
                            public void run() 
                                if (outboundBuffer != null) 
                                    // Fail all the queued messages
                                    outboundBuffer.failFlushed(cause, notify);
                                    outboundBuffer.close(closeCause);
                                
                                fireChannelInactiveAndDeregister(wasActive);
                            
                        );
                    
                
            );
         else 
            try 
                // Close the channel and fail the queued messages in all cases.
                doClose0(promise);// 执行close操作
             finally 
                // 强行关闭outboundBuffer
                if (outboundBuffer != null) 
                    // Fail all the queued messages.
                    outboundBuffer.failFlushed(cause, notify);
                    outboundBuffer.close(closeCause);
                
            
            if (inFlush0) 
                invokeLater(new Runnable() 
                    @Override
                    public void run() 
                        fireChannelInactiveAndDeregister(wasActive);
                    
                );
             else 
                // 将channel不活跃的事件传播出去
                fireChannelInactiveAndDeregister(wasActive);
            
        
    

1、关闭前的处理工作

1.首先判断是否需要阻塞,先将数据发送完成 2.doDeregister()使得所有的selectorKey失效,防止关闭的时候注册事件到selector上 3.返回一个GlobalEventExecutor.INSTANCE线程池执行任务

2、doClose0调用jdk的close 关闭channel

3、fireChannelInactiveAndDeregister 传播关闭、解绑定事件

1.cancel所有的selectKey 2.pipeline.fireChannelInactive()将close事件传播出去 3.pipeline也取消注册

    private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) 
        if (!promise.setUncancellable()) 
            return;
        
​
        if (!registered) 
            safeSetSuccess(promise);
            return;
        
​
        // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
        // we need to ensure we do the actual deregister operation later. This is needed as for example,
        // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
        // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
        // the deregister operation this could lead to have a handler invoked by different EventLoop and so
        // threads.
        //
        // See:
        // https://github.com/netty/netty/issues/4435
        invokeLater(new Runnable() 
            @Override
            public void run() 
                try 
                    doDeregister();// cancel所有的selectKey
                 catch (Throwable t) 
                    logger.warn("Unexpected exception occurred while deregistering a channel.", t);
                 finally 
                    if (fireChannelInactive) 
                        // 最后将这个关闭事件传播出去
                        pipeline.fireChannelInactive();
                    
                    // Some transports like local and AIO does not allow the deregistration of
                    // an open channel.  Their doDeregister() calls close(). Consequently,
                    // close() calls deregister() again - no need to fire channelUnregistered, so check
                    // if it was registered.
                    if (registered) 
                        registered = false;
                        // pipeline也取消注册
                        pipeline.fireChannelUnregistered();
                    
                    safeSetSuccess(promise);
                
            
        );
    

4、doDeregister取消selector上的selectorKey

它本质上就是取消了selector上的所有的selectorKey,但是做出了一些优化

三、异常关闭

进入AbstractNioByteChannel,其中的read方法为例,假设正在读取数据的时候突然抛出异常,会被handleReadException(pipeline, byteBuf, t, close, allocHandle);处理,并关闭channel。

    @Override
    public final void read() 
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) 
            clearReadPending();
            return;
        
        final ChannelPipeline pipeline = pipeline();
        // 内存分配器
        final ByteBufAllocator allocator = config.getAllocator();
        // 接收数据测handler
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);
​
        ByteBuf byteBuf = null;
        boolean close = false;
        try 
            do 
                // 分配内存是自适应的
                byteBuf = allocHandle.allocate(allocator);
                // 开始读取数据
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) // 读取为-1,表示断开连接
                    // nothing was read. release the buffer.
                    byteBuf.release();//释放byteBuf的空间
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) 
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    
                    break;
                
​
                allocHandle.incMessagesRead(1);//读取一次数据
                readPending = false;
                pipeline.fireChannelRead(byteBuf);// 将读到的数据传递出去
                byteBuf = null;
             while (allocHandle.continueReading());//继续读取
​
            // 通过当前
            allocHandle.readComplete();//计算下一次的需要分配的空间
            pipeline.fireChannelReadComplete();// 将完成读取的事件传递出去
​
            if (close) 
                closeOnRead(pipeline);//关闭
            
         catch (Throwable t) 
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
         finally 
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) 
                removeReadOp();
            
        
    

四、总结

以上是关于netty源码之断开连接的主要内容,如果未能解决你的问题,请参考以下文章

netty源码之接收连接

netty源码之接收连接

netty源码之接收连接

Netty源码分析之处理新连接

Netty源码,详解Http协议的数据包解码过程

netty 可以可靠地检测通道关闭/断开连接吗?