Netty源码分析-AdaptiveRecvByteBufAllocator

Posted 征服.刘华强

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty源码分析-AdaptiveRecvByteBufAllocator相关的知识,希望对你有一定的参考价值。

这个类的核心目的就是根据从底层socket读取的字节数量,动态调整分配空间,以及是否继续从socket读取字节流

 

        @Override
        public final void read() 
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) 
                clearReadPending();
                return;
            
            //每个channel对应一个PPLine
            final ChannelPipeline pipeline = pipeline();
            //ByteBuf分配器
            final ByteBufAllocator allocator = config.getAllocator();
            //容量计算器
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            //重置,把之前计数的值全部清空
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try 
                do 
                    //分配内存,关键在于计算分配内存的大小(小了不够,大了浪费)
                    byteBuf = allocHandle.allocate(allocator);
                    //doReadBytes,从socket读取字节到byteBuf,返回真实读取数量
                    //更新容量计算器
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    //如果小于0 则socket关闭,如果等于0则没读取到数据
                    if (allocHandle.lastBytesRead() <= 0) 
                        // nothing was read. release the buffer.
                        //释放资源
                        byteBuf.release();
                        byteBuf = null;
                        //如果小于0则意味着socket关闭
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) 
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        
                        break;
                    

                    //增加循环计数器
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    //把读取到的数据,交给管道去处理
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                    //判断是否继续从socket读取数据
                 while (allocHandle.continueReading());

                //读取完成后调用readComplete,重新估算内存分配容量
                allocHandle.readComplete();
                //事件激发
                pipeline.fireChannelReadComplete();

                //如果需要关闭,则处理关闭
                if (close) 
                    closeOnRead(pipeline);
                
             catch (Throwable t) 
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
             finally 
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) 
                    removeReadOp();
                
            
        
    

 

 public abstract class MaxMessageHandle implements ExtendedHandle 
        private ChannelConfig config;
        private int maxMessagePerRead;
        private int totalMessages;
        private int totalBytesRead;
        private int attemptedBytesRead;
        private int lastBytesRead;
        private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
        private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() 
            @Override
            public boolean get() 
                return attemptedBytesRead == lastBytesRead;
            
        ;

        /**
         * Only @link ChannelConfig#getMaxMessagesPerRead() is used.
         */
        @Override
        public void reset(ChannelConfig config) 
            //每次底层socket开始读取数据时调用,清空状态。
            this.config = config;
            maxMessagePerRead = maxMessagesPerRead();
            totalMessages = totalBytesRead = 0;
        

        @Override
        public ByteBuf allocate(ByteBufAllocator alloc) 
            //创建一个ByteBuf,重点在于guess方法
            //最理想的状态是分配的容量刚好可以容纳本次socket的缓冲区字节
            //分配空间过大浪费内存,过小需要循环读取多次
            return alloc.ioBuffer(guess());
        

        @Override
        public final void incMessagesRead(int amt) 
            //每次通过socket读取消息时+1,记录从socket读取了几次数据
            totalMessages += amt;
        

        @Override
        public void lastBytesRead(int bytes) 
            //每次通过socket读取字节后会记录字节数
            lastBytesRead = bytes;
            if (bytes > 0) 
                //累计总数
                totalBytesRead += bytes;
            
        

        @Override
        public final int lastBytesRead() 
            //最后一次从socket读取的字节数
            return lastBytesRead;
        

        @Override
        public boolean continueReading() 
            //计算是否需要继续从底层socket读取数据
            return continueReading(defaultMaybeMoreSupplier);
        

        //maybeMoreDataSupplier的实现是判断attemptedBytesRead == lastBytesRead;
        //attemptedBytesRead是bytebuf的可写空间,也就是希望读取多少字节
        //lastBytesRead是真实从socket读取到的字节数,如果一致说明bytebuf写满了,可能后续还有字节
        @Override
        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) 
            //config.isAutoRead() 是否配置了自动读取
            //totalMessages < maxMessagePerRead 总共读取的次数小于阈值
            //totalBytesRead > 0 确实读取到了字节
            return config.isAutoRead() &&
                   (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                   totalMessages < maxMessagePerRead &&
                   totalBytesRead > 0;
        

        @Override
        public void readComplete() 
        

        @Override
        public int attemptedBytesRead() 
            return attemptedBytesRead;
        

        @Override
        public void attemptedBytesRead(int bytes) 
            attemptedBytesRead = bytes;
        

        protected final int totalBytesRead() 
            return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
        
    

 

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.channel;

import java.util.ArrayList;
import java.util.List;

import static io.netty.util.internal.ObjectUtil.checkPositive;
import static java.lang.Math.max;
import static java.lang.Math.min;

/**
 * The @link RecvByteBufAllocator that automatically increases and
 * decreases the predicted buffer size on feed back.
 * <p>
 * It gradually increases the expected number of readable bytes if the previous
 * read fully filled the allocated buffer.  It gradually decreases the expected
 * number of readable bytes if the read operation was not able to fill a certain
 * amount of the allocated buffer two times consecutively.  Otherwise, it keeps
 * returning the same prediction.
 */
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator 

    //最小字节大小
    static final int DEFAULT_MINIMUM = 64;
    //初始化字节大小
    static final int DEFAULT_INITIAL = 1024;
    //最大字节大小
    static final int DEFAULT_MAXIMUM = 65536;

    //递增,递减的索引大小
    private static final int INDEX_INCREMENT = 4;
    private static final int INDEX_DECREMENT = 1;

    //容量数组16,32,48...512...1024...1073741824 成倍增长
    private static final int[] SIZE_TABLE;

    static 
        List<Integer> sizeTable = new ArrayList<Integer>();
        //从16开始,每次增加16,一直到496
        for (int i = 16; i < 512; i += 16) 
            sizeTable.add(i);
        

        //从512开始,每次翻倍,一直到溢出为负数,最大值是1073741824
        for (int i = 512; i > 0; i <<= 1) 
            sizeTable.add(i);
        

        //把list当中的值放入数组中
        SIZE_TABLE = new int[sizeTable.size()];
        for (int i = 0; i < SIZE_TABLE.length; i ++) 
            SIZE_TABLE[i] = sizeTable.get(i);
        
    

    /**
     * @deprecated There is state for @link #maxMessagesPerRead() which is typically based upon channel type.
     */
    @Deprecated
    public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();

    //给定一个size,查找数组中相同容量值的下标位置,采用二分查找法
    private static int getSizeTableIndex(final int size) 
        for (int low = 0, high = SIZE_TABLE.length - 1;;) 
            if (high < low) 
                return low;
            
            if (high == low) 
                return high;
            
            //这里+会优先计算,然后在右移(除2)
            //mid取中间位置
            int mid = low + high >>> 1;
            int a = SIZE_TABLE[mid]; //中间位置值
            int b = SIZE_TABLE[mid + 1];//中间位置+1的值
            if (size > b) 
                //这种情况low需要变大
                low = mid + 1;
             else if (size < a) 
                //这种情况high需要变小
                high = mid - 1;
             else if (size == a) 
                //返回下标
                return mid;
             else 
                //返回下标
                return mid + 1;
            
        
    

    private final class HandleImpl extends MaxMessageHandle 
        private final int minIndex; //最小容量数组下标
        private final int maxIndex;//最大容量数组下标
        private int index;//初始化容量下标
        private int nextReceiveBufferSize; //下次分配的内存大小,默认SIZE_TABLE[index]
        private boolean decreaseNow;

        HandleImpl(int minIndex, int maxIndex, int initial) 
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;

            index = getSizeTableIndex(initial);
            nextReceiveBufferSize = SIZE_TABLE[index];
        

        @Override
        public void lastBytesRead(int bytes) 
            // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
            // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
            // the selector to check for more data. Going back to the selector can add significant latency for large
            // data transfers.
            //真实读取的字节数与期望(bytebuf的容量)一样多
            //说明一次读取已经写满bytebuf,则应该调整guess的大小
            if (bytes == attemptedBytesRead()) 
                record(bytes);
            
            //父类记录最后一次读取字节数
            super.lastBytesRead(bytes);
        

        @Override
        public int guess() 
            //分配内存空间bytebuf的大小
            return nextReceiveBufferSize;
        

        //actualReadBytes=从socket读取的字节数
        private void record(int actualReadBytes) 
            //如果读取字节数 小于等于SIZE_TABLE[index-1]则有必要减少分配空间
            if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) 
                if (decreaseNow) 
                    //把index减少后不能少于minIndex
                    index = max(index - INDEX_DECREMENT, minIndex);
                    //重新赋值,相当于减少了下次分配的空间大小
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                 else 
                    decreaseNow = true;
                
                //如果读取字节数大于等于分配的空间,则说明有不要增大分配空间
             else if (actualReadBytes >= nextReceiveBufferSize) 
                //把index增大,不超过maxIndex
                index = min(index + INDEX_INCREMENT, maxIndex);
                //重新赋值,相当于增大了下次分配的空间
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            
        

        @Override
        public void readComplete() 
            //在读取完成后,传入所有读取的字节总数,进行容量调整
            record(totalBytesRead());
        
    

    private final int minIndex;
    private final int maxIndex;
    private final int initial;

    /**
     * Creates a new predictor with the default parameters.  With the default
     * parameters, the expected buffer size starts from @code 1024, does not
     * go down below @code 64, and does not go up above @code 65536.
     */
    public AdaptiveRecvByteBufAllocator() 
        this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
    

    /**
     * Creates a new predictor with the specified parameters.
     *
     * @param minimum  the inclusive lower bound of the expected buffer size
     * @param initial  the initial buffer size when no feed back was received
     * @param maximum  the inclusive upper bound of the expected buffer size
     */
    public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) 
        checkPositive(minimum, "minimum");
        if (initial < minimum) 
            throw new IllegalArgumentException("initial: " + initial);
        
        if (maximum < initial) 
            throw new IllegalArgumentException("maximum: " + maximum);
        

        //根据minimum查找数组下标
        int minIndex = getSizeTableIndex(minimum);
        if (SIZE_TABLE[minIndex] < minimum) 
            this.minIndex = minIndex + 1;
         else 
            this.minIndex = minIndex;
        

        //根据maximum查找数组下标
        int maxIndex = getSizeTableIndex(maximum);
        if (SIZE_TABLE[maxIndex] > maximum) 
            this.maxIndex = maxIndex - 1;
         else 
            this.maxIndex = maxIndex;
        

        this.initial = initial;
    

    @SuppressWarnings("deprecation")
    @Override
    public Handle newHandle() 
        return new HandleImpl(minIndex, maxIndex, initial);
    

    @Override
    public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) 
        super.respectMaybeMoreData(respectMaybeMoreData);
        return this;
    

 

以上是关于Netty源码分析-AdaptiveRecvByteBufAllocator的主要内容,如果未能解决你的问题,请参考以下文章

源码分析Netty4专栏

源码分析Netty4专栏

Netty-源码分析LineBasedFrameDecoder

Netty源码分析:read

Netty源码分析:read

[Netty源码分析]ByteBuf(一)