Netty源码分析-NioByteUnsafe(read读取流程)

Posted 征服.刘华强

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty源码分析-NioByteUnsafe(read读取流程)相关的知识,希望对你有一定的参考价值。

NioByteUnsafe封装了NiosocketChannel读取底层数据的流程。

NioEventLoop负责监听Selector上所有的事件,当发生事件时根据事件类型调用Channel的UnSafe中的方法去处理。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) 
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) 
            final EventLoop eventLoop;
            try 
                eventLoop = ch.eventLoop();
             catch (Throwable ignored) 
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop == this) 
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
            
            return;
        

        try 
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) 
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) 
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) 
                unsafe.read();
            
         catch (CancelledKeyException ignored) 
            unsafe.close(unsafe.voidPromise());
        
    

 

NioByteUnsafe当中的read方法

1、分配ByteBuf。

2、从底层SocketChannel中读取字节,封装到ByteBuf中。

3、调用Channel的PPLine交给管道中的编解码器去处理。

4、调用各种事件。

 @Override
        public final void read() 
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) 
                clearReadPending();
                return;
            
            //每个channel对应一个PPLine
            final ChannelPipeline pipeline = pipeline();
            //ByteBuf分配器
            final ByteBufAllocator allocator = config.getAllocator();
            //容量计算器
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            //重置,把之前计数的值全部清空
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try 
                do 
                    //分配内存,关键在于计算分配内存的大小(小了不够,大了浪费)
                    byteBuf = allocHandle.allocate(allocator);
                    //doReadBytes,从socket读取字节到byteBuf,返回真实读取数量
                    //更新容量计算器
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    //如果小于0 则socket关闭,如果等于0则没读取到数据
                    if (allocHandle.lastBytesRead() <= 0) 
                        // nothing was read. release the buffer.
                        //释放资源
                        byteBuf.release();
                        byteBuf = null;
                        //如果小于0则意味着socket关闭
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) 
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        
                        break;
                    

                    //增加循环计数器
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    //把读取到的数据,交给管道去处理
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                    //判断是否继续从socket读取数据
                 while (allocHandle.continueReading());

                //读取完成后调用readComplete,重新估算内存分配容量
                allocHandle.readComplete();
                //事件激发
                pipeline.fireChannelReadComplete();

                //如果需要关闭,则处理关闭
                if (close) 
                    closeOnRead(pipeline);
                
             catch (Throwable t) 
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
             finally 
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                //
                //根据情况移除OP_READ事件
                if (!readPending && !config.isAutoRead()) 
                    removeReadOp();
                
            
        

在读取中遇到异常,或者返回EOF(-1),说明底层通道关闭,需要处理关闭逻辑。

 private void closeOnRead(ChannelPipeline pipeline) 
            //底层输入流未关闭
            if (!isInputShutdown0()) 
                //是否允许办关闭
                if (isAllowHalfClosure(config())) 
                    //关闭输入流
                    shutdownInput();
                    pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
                 else 
                    //关闭socket
                    close(voidPromise());
                
             else 
                //激发事件
                inputClosedSeenErrorOnRead = true;
                pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
            
        

异常处理的逻辑

        //异常处理
        private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
                RecvByteBufAllocator.Handle allocHandle) 
            if (byteBuf != null) 
                //如果byteBuf有数据,则交给管道处理
                if (byteBuf.isReadable()) 
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                 else 
                    //释放资源
                    byteBuf.release();
                
            
            //容量计算机更新guess大小
            allocHandle.readComplete();
            //激发读完成事件和异常事件
            pipeline.fireChannelReadComplete();
            pipeline.fireExceptionCaught(cause);

            //关闭socket
            if (close || cause instanceof IOException) 
                closeOnRead(pipeline);
            
        

 

关闭socket的逻辑,需要释放输出队列缓存,触发关闭事件,关闭底层socket等。

 private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify) 
            //设置不能取消
            if (!promise.setUncancellable()) 
                return;
            

            //防止二次关闭,如果已经关闭
            if (closeInitiated) 
                if (closeFuture.isDone()) 
                    // Closed already.
                    safeSetSuccess(promise);
                 else if (!(promise instanceof VoidChannelPromise))  // Only needed if no VoidChannelPromise.
                    // This means close() was called before so we just register a listener and return
                    closeFuture.addListener(new ChannelFutureListener() 
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception 
                            promise.setSuccess();
                        
                    );
                
                return;
            

            closeInitiated = true;
            //判断当前连接是否还连着
            final boolean wasActive = isActive();
            //输出缓存队列
            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
            Executor closeExecutor = prepareToClose();
            if (closeExecutor != null) 
                closeExecutor.execute(new Runnable() 
                    @Override
                    public void run() 
                        try 
                            // Execute the close.
                            //调用底层关闭socket
                            doClose0(promise);
                         finally 
                            // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                            invokeLater(new Runnable() 
                                @Override
                                public void run() 
                                    if (outboundBuffer != null) 
                                        // Fail all the queued messages
                                        //删除队列缓存中还没发出去的消息,释放资源
                                        outboundBuffer.failFlushed(cause, notify);
                                        outboundBuffer.close(closeCause);
                                    
                                    //激发fireChannelInactive事件
                                    //激发fireChannelUnregistered事件
                                    fireChannelInactiveAndDeregister(wasActive);
                                
                            );
                        
                    
                );
             else 
                try 
                    // Close the channel and fail the queued messages in all cases.
                    doClose0(promise);
                 finally 
                    if (outboundBuffer != null) 
                        // Fail all the queued messages.
                        outboundBuffer.failFlushed(cause, notify);
                        outboundBuffer.close(closeCause);
                    
                
                if (inFlush0) 
                    invokeLater(new Runnable() 
                        @Override
                        public void run() 
                            fireChannelInactiveAndDeregister(wasActive);
                        
                    );
                 else 
                    fireChannelInactiveAndDeregister(wasActive);
                
            
        

 

 

技术交流QQ:212320390

关注公众号

以上是关于Netty源码分析-NioByteUnsafe(read读取流程)的主要内容,如果未能解决你的问题,请参考以下文章

如何编译 netty 源码并导入android studio

netty里的ByteBuf扩容源码分析

Netty源码分析(七) PoolChunk

源码分析Netty4专栏

源码分析Netty4专栏

Netty源码分析:read