Netty源码分析-AdaptiveRecvByteBufAllocator
Posted 征服.刘华强
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty源码分析-AdaptiveRecvByteBufAllocator相关的知识,希望对你有一定的参考价值。
这个类的核心目的就是根据从底层socket读取的字节数量,动态调整分配空间,以及是否继续从socket读取字节流
@Override
public final void read()
final ChannelConfig config = config();
if (shouldBreakReadReady(config))
clearReadPending();
return;
//每个channel对应一个PPLine
final ChannelPipeline pipeline = pipeline();
//ByteBuf分配器
final ByteBufAllocator allocator = config.getAllocator();
//容量计算器
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
//重置,把之前计数的值全部清空
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try
do
//分配内存,关键在于计算分配内存的大小(小了不够,大了浪费)
byteBuf = allocHandle.allocate(allocator);
//doReadBytes,从socket读取字节到byteBuf,返回真实读取数量
//更新容量计算器
allocHandle.lastBytesRead(doReadBytes(byteBuf));
//如果小于0 则socket关闭,如果等于0则没读取到数据
if (allocHandle.lastBytesRead() <= 0)
// nothing was read. release the buffer.
//释放资源
byteBuf.release();
byteBuf = null;
//如果小于0则意味着socket关闭
close = allocHandle.lastBytesRead() < 0;
if (close)
// There is nothing left to read as we received an EOF.
readPending = false;
break;
//增加循环计数器
allocHandle.incMessagesRead(1);
readPending = false;
//把读取到的数据,交给管道去处理
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
//判断是否继续从socket读取数据
while (allocHandle.continueReading());
//读取完成后调用readComplete,重新估算内存分配容量
allocHandle.readComplete();
//事件激发
pipeline.fireChannelReadComplete();
//如果需要关闭,则处理关闭
if (close)
closeOnRead(pipeline);
catch (Throwable t)
handleReadException(pipeline, byteBuf, t, close, allocHandle);
finally
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead())
removeReadOp();
public abstract class MaxMessageHandle implements ExtendedHandle
private ChannelConfig config;
private int maxMessagePerRead;
private int totalMessages;
private int totalBytesRead;
private int attemptedBytesRead;
private int lastBytesRead;
private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier()
@Override
public boolean get()
return attemptedBytesRead == lastBytesRead;
;
/**
* Only @link ChannelConfig#getMaxMessagesPerRead() is used.
*/
@Override
public void reset(ChannelConfig config)
//每次底层socket开始读取数据时调用,清空状态。
this.config = config;
maxMessagePerRead = maxMessagesPerRead();
totalMessages = totalBytesRead = 0;
@Override
public ByteBuf allocate(ByteBufAllocator alloc)
//创建一个ByteBuf,重点在于guess方法
//最理想的状态是分配的容量刚好可以容纳本次socket的缓冲区字节
//分配空间过大浪费内存,过小需要循环读取多次
return alloc.ioBuffer(guess());
@Override
public final void incMessagesRead(int amt)
//每次通过socket读取消息时+1,记录从socket读取了几次数据
totalMessages += amt;
@Override
public void lastBytesRead(int bytes)
//每次通过socket读取字节后会记录字节数
lastBytesRead = bytes;
if (bytes > 0)
//累计总数
totalBytesRead += bytes;
@Override
public final int lastBytesRead()
//最后一次从socket读取的字节数
return lastBytesRead;
@Override
public boolean continueReading()
//计算是否需要继续从底层socket读取数据
return continueReading(defaultMaybeMoreSupplier);
//maybeMoreDataSupplier的实现是判断attemptedBytesRead == lastBytesRead;
//attemptedBytesRead是bytebuf的可写空间,也就是希望读取多少字节
//lastBytesRead是真实从socket读取到的字节数,如果一致说明bytebuf写满了,可能后续还有字节
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier)
//config.isAutoRead() 是否配置了自动读取
//totalMessages < maxMessagePerRead 总共读取的次数小于阈值
//totalBytesRead > 0 确实读取到了字节
return config.isAutoRead() &&
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
@Override
public void readComplete()
@Override
public int attemptedBytesRead()
return attemptedBytesRead;
@Override
public void attemptedBytesRead(int bytes)
attemptedBytesRead = bytes;
protected final int totalBytesRead()
return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.util.ArrayList;
import java.util.List;
import static io.netty.util.internal.ObjectUtil.checkPositive;
import static java.lang.Math.max;
import static java.lang.Math.min;
/**
* The @link RecvByteBufAllocator that automatically increases and
* decreases the predicted buffer size on feed back.
* <p>
* It gradually increases the expected number of readable bytes if the previous
* read fully filled the allocated buffer. It gradually decreases the expected
* number of readable bytes if the read operation was not able to fill a certain
* amount of the allocated buffer two times consecutively. Otherwise, it keeps
* returning the same prediction.
*/
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator
//最小字节大小
static final int DEFAULT_MINIMUM = 64;
//初始化字节大小
static final int DEFAULT_INITIAL = 1024;
//最大字节大小
static final int DEFAULT_MAXIMUM = 65536;
//递增,递减的索引大小
private static final int INDEX_INCREMENT = 4;
private static final int INDEX_DECREMENT = 1;
//容量数组16,32,48...512...1024...1073741824 成倍增长
private static final int[] SIZE_TABLE;
static
List<Integer> sizeTable = new ArrayList<Integer>();
//从16开始,每次增加16,一直到496
for (int i = 16; i < 512; i += 16)
sizeTable.add(i);
//从512开始,每次翻倍,一直到溢出为负数,最大值是1073741824
for (int i = 512; i > 0; i <<= 1)
sizeTable.add(i);
//把list当中的值放入数组中
SIZE_TABLE = new int[sizeTable.size()];
for (int i = 0; i < SIZE_TABLE.length; i ++)
SIZE_TABLE[i] = sizeTable.get(i);
/**
* @deprecated There is state for @link #maxMessagesPerRead() which is typically based upon channel type.
*/
@Deprecated
public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
//给定一个size,查找数组中相同容量值的下标位置,采用二分查找法
private static int getSizeTableIndex(final int size)
for (int low = 0, high = SIZE_TABLE.length - 1;;)
if (high < low)
return low;
if (high == low)
return high;
//这里+会优先计算,然后在右移(除2)
//mid取中间位置
int mid = low + high >>> 1;
int a = SIZE_TABLE[mid]; //中间位置值
int b = SIZE_TABLE[mid + 1];//中间位置+1的值
if (size > b)
//这种情况low需要变大
low = mid + 1;
else if (size < a)
//这种情况high需要变小
high = mid - 1;
else if (size == a)
//返回下标
return mid;
else
//返回下标
return mid + 1;
private final class HandleImpl extends MaxMessageHandle
private final int minIndex; //最小容量数组下标
private final int maxIndex;//最大容量数组下标
private int index;//初始化容量下标
private int nextReceiveBufferSize; //下次分配的内存大小,默认SIZE_TABLE[index]
private boolean decreaseNow;
HandleImpl(int minIndex, int maxIndex, int initial)
this.minIndex = minIndex;
this.maxIndex = maxIndex;
index = getSizeTableIndex(initial);
nextReceiveBufferSize = SIZE_TABLE[index];
@Override
public void lastBytesRead(int bytes)
// If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
// This helps adjust more quickly when large amounts of data is pending and can avoid going back to
// the selector to check for more data. Going back to the selector can add significant latency for large
// data transfers.
//真实读取的字节数与期望(bytebuf的容量)一样多
//说明一次读取已经写满bytebuf,则应该调整guess的大小
if (bytes == attemptedBytesRead())
record(bytes);
//父类记录最后一次读取字节数
super.lastBytesRead(bytes);
@Override
public int guess()
//分配内存空间bytebuf的大小
return nextReceiveBufferSize;
//actualReadBytes=从socket读取的字节数
private void record(int actualReadBytes)
//如果读取字节数 小于等于SIZE_TABLE[index-1]则有必要减少分配空间
if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)])
if (decreaseNow)
//把index减少后不能少于minIndex
index = max(index - INDEX_DECREMENT, minIndex);
//重新赋值,相当于减少了下次分配的空间大小
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
else
decreaseNow = true;
//如果读取字节数大于等于分配的空间,则说明有不要增大分配空间
else if (actualReadBytes >= nextReceiveBufferSize)
//把index增大,不超过maxIndex
index = min(index + INDEX_INCREMENT, maxIndex);
//重新赋值,相当于增大了下次分配的空间
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
@Override
public void readComplete()
//在读取完成后,传入所有读取的字节总数,进行容量调整
record(totalBytesRead());
private final int minIndex;
private final int maxIndex;
private final int initial;
/**
* Creates a new predictor with the default parameters. With the default
* parameters, the expected buffer size starts from @code 1024, does not
* go down below @code 64, and does not go up above @code 65536.
*/
public AdaptiveRecvByteBufAllocator()
this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
/**
* Creates a new predictor with the specified parameters.
*
* @param minimum the inclusive lower bound of the expected buffer size
* @param initial the initial buffer size when no feed back was received
* @param maximum the inclusive upper bound of the expected buffer size
*/
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum)
checkPositive(minimum, "minimum");
if (initial < minimum)
throw new IllegalArgumentException("initial: " + initial);
if (maximum < initial)
throw new IllegalArgumentException("maximum: " + maximum);
//根据minimum查找数组下标
int minIndex = getSizeTableIndex(minimum);
if (SIZE_TABLE[minIndex] < minimum)
this.minIndex = minIndex + 1;
else
this.minIndex = minIndex;
//根据maximum查找数组下标
int maxIndex = getSizeTableIndex(maximum);
if (SIZE_TABLE[maxIndex] > maximum)
this.maxIndex = maxIndex - 1;
else
this.maxIndex = maxIndex;
this.initial = initial;
@SuppressWarnings("deprecation")
@Override
public Handle newHandle()
return new HandleImpl(minIndex, maxIndex, initial);
@Override
public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData)
super.respectMaybeMoreData(respectMaybeMoreData);
return this;
以上是关于Netty源码分析-AdaptiveRecvByteBufAllocator的主要内容,如果未能解决你的问题,请参考以下文章