Netty源码分析:PooledByteBufAllocator
Posted HelloWorld_EE
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty源码分析:PooledByteBufAllocator相关的知识,希望对你有一定的参考价值。
Netty源码分析:PooledByteBufAllocator
无论是我们使用语句ByteBuf byteBuf = Unpooled.buffer(256);
来分配buf,还是使用如下的语句来分配Buf:
PooledByteBufAllocator allocator = new PooledByteBufAllocator(false);
ByteBuf byteBuf = allocator.heapBuffer(15);
都是使用了 PooledByteBufAllocator 这个类类分配Buf。因此就来分析下这个类。
1、常量的说明
public class PooledByteBufAllocator extends AbstractByteBufAllocator
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledByteBufAllocator.class);
//默认的PoolArena个数,堆内存类型
private static final int DEFAULT_NUM_HEAP_ARENA;
//默认的PoolArena个数,直接内存类型
private static final int DEFAULT_NUM_DIRECT_ARENA;
//默认的Page的个数,最小为4K,默认为8K
private static final int DEFAULT_PAGE_SIZE;
/*
由于每个chunk中的page是用平衡二叉树映射管理每个PoolSubpage是否被分配,
maxOrder为树的深度,深度为maxOrder层的节点数量为 1 << maxOrder。
*/
private static final int DEFAULT_MAX_ORDER; //默认为 11 //默认的tiny cache 的大小
private static final int DEFAULT_TINY_CACHE_SIZE; //512
//默认的small cache的大小
private static final int DEFAULT_SMALL_CACHE_SIZE;// 256
//默认的normal cache的大小
private static final int DEFAULT_NORMAL_CACHE_SIZE;//64
private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
//page容量的最小值,为4K。
private static final int MIN_PAGE_SIZE = 4096;
//最大chunk的大小,等于2的30次方,即1G。
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
以上这些常量除了最后两个都是在如下的static块中进行初始化。
static
int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
Throwable pageSizeFallbackCause = null;
try
validateAndCalculatePageShifts(defaultPageSize);
catch (Throwable t)
pageSizeFallbackCause = t;
defaultPageSize = 8192;
DEFAULT_PAGE_SIZE = defaultPageSize;
int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
Throwable maxOrderFallbackCause = null;
try
validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE, defaultMaxOrder);
catch (Throwable t)
maxOrderFallbackCause = t;
defaultMaxOrder = 11;
DEFAULT_MAX_ORDER = defaultMaxOrder;
// Determine reasonable default for nHeapArena and nDirectArena.
// Assuming each arena has 3 chunks, the pool should not consume more than 50% of max memory.
final Runtime runtime = Runtime.getRuntime();
final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
DEFAULT_NUM_HEAP_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numHeapArenas",
(int) Math.min(
runtime.availableProcessors(),
Runtime.getRuntime().maxMemory() / defaultChunkSize / 2 / 3)));
DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numDirectArenas",
(int) Math.min(
runtime.availableProcessors(),
PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
// cache sizes
DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
// 32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in
// 'Scalable memory allocation using jemalloc'
DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt(
"io.netty.allocator.maxCachedBufferCapacity", 32 * 1024);
// the number of threshold of allocations when cached entries will be freed up if not frequently used
DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
"io.netty.allocator.cacheTrimInterval", 8192);
//省略了部分日志输出代码
从中可以得到如下的信息
1、首先是对DEFAULT_PAGE_SIZE进行初始化,默认是8K,用户可以通过设置io.netty.allocator.pageSize来设置。
2、validateAndCalculatePageShifts函数用来检查pageSize是否大于MIN_PAGE_SIZE
(4K)且是2的幂次方。
3、对树的深度DEFAULT_MAX_ORDER进行初始化,默认是11,用户可以通过io.netty.allocator.maxOrder来进行设置。
4、初始化默认chunk的大小,为PageSize * (2 的 maxOrder幂)。
defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER
5、 计算PoolAreana的个数,PoolArena默认为: cpu核心线程数 与 最大堆内存/2/(3*chunkSize) 这两个数中的较小者。这里的除以2是为了确保系统分配的所有PoolArena占用的内存不超过系统可用内存的一半,这里的除以3是为了保证每个PoolArena至少可以由3个PoolChunk组成。
用户如果想修改,则通过设置io.netty.allocator.numHeapArenas/numDirectArenas来进行修改。
6、对cache sizes进行了设置,如下:
DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
7、 还对其他常量也进行了设置。
2、构造函数
下面来看下PooledByteBufAllocator的构造函数。
public PooledByteBufAllocator(boolean preferDirect)
this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder)
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE);
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize)
super(preferDirect);
threadCache = new PoolThreadLocalCache();
this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
//得到chunkSize,其值为:pageSize*2^maxOrder
final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
if (nHeapArena < 0)
throw new IllegalArgumentException("nHeapArena: " + nHeapArena + " (expected: >= 0)");
if (nDirectArena < 0)
throw new IllegalArgumentException("nDirectArea: " + nDirectArena + " (expected: >= 0)");
int pageShifts = validateAndCalculatePageShifts(pageSize);
if (nHeapArena > 0)
heapArenas = newArenaArray(nHeapArena);
for (int i = 0; i < heapArenas.length; i ++)
heapArenas[i] = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
else
heapArenas = null;
if (nDirectArena > 0)
directArenas = newArenaArray(nDirectArena);
for (int i = 0; i < directArenas.length; i ++)
directArenas[i] = new PoolArena.DirectArena(this, pageSize, maxOrder, pageShifts, chunkSize);
else
directArenas = null;
通过如上的构造函数可以看到,干了如下几件事:
1)使用了默认的值初始化了如下的字段:
private final int tinyCacheSize;
private final int smallCacheSize;
private final int normalCacheSize;
2)使用new PoolThreadLocalCache()实例化了threadCache 字段。
final PoolThreadLocalCache threadCache;
3)重点:实例化了如下两个数组。
private final PoolArena<byte[]>[] heapArenas;
private final PoolArena<ByteBuffer>[] directArenas;
在实例化上面两个PoolArena时,用到了如下的两个参数
3.1)chunkSize
调用validateAndCalculateChunkSize函数求得,其值为:pageSize*2^maxOrder
3.2)pageShifts
调用如下的validateAndCalculatePageShifts求得,
private static int validateAndCalculatePageShifts(int pageSize)
if (pageSize < MIN_PAGE_SIZE)
throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: " + MIN_PAGE_SIZE + "+)");
if ((pageSize & pageSize - 1) != 0)
throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: power of 2)");
// Logarithm base 2. At this point we know that pageSize is a power of two.
return Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize);
该函数首先检查了pageSize是否大于4K且为2的幂次方,如果不是则抛异常。
如果是,则返回Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize)的结果,结果是什么呢?
假设pageSize为8192, 2的13次方的二进制码为:(0000 0000 0000 0000 0010 0000 0000 0000),其补码与原码一样,而Integer.numberOfLeadingZeros返回pageSize的补码最高位的1的左边连续零的个数。而8192的二进制的高位有18个0,因此pageShifts为13。简单来说pageShifts=log(pageSize)。
既然看到这里,PoolArena.HeapArena和PoolArena.DirectArena中如下的构造函数.
heapArenas[i] = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
directArenas[i] = new PoolArena.DirectArena(this, pageSize, maxOrder, pageShifts, chunkSize);
PoolArena.HeapArena构造函数如下;
HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize)
super(parent, pageSize, maxOrder, pageShifts, chunkSize);
调用了父类PooledArena如下的构造函数
protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize)
//从PooledByteBufAllocator中传送过来的相关字段值。
this.parent = parent;
this.pageSize = pageSize;
this.maxOrder = maxOrder;
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
subpageOverflowMask = ~(pageSize - 1);//该变量用于判断申请的内存大小与page之间的关系,是大于,还是小于
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
for (int i = 0; i < tinySubpagePools.length; i ++)
tinySubpagePools[i] = newSubpagePoolHead(pageSize);
numSmallSubpagePools = pageShifts - 9;
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++)
smallSubpagePools[i] = newSubpagePoolHead(pageSize);
q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE);
q075 = new PoolChunkList<T>(this, q100, 75, 100);
q050 = new PoolChunkList<T>(this, q075, 50, 100);
q025 = new PoolChunkList<T>(this, q050, 25, 75);
q000 = new PoolChunkList<T>(this, q025, 1, 50);
qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25);
q100.prevList = q075;
q075.prevList = q050;
q050.prevList = q025;
q025.prevList = q000;
q000.prevList = null;
qInit.prevList = qInit;
该构造函数主要干了如下几件事
1)初始化parent、pageSize、maxOrder、pageShifts等字段
2)实例化了如下两个数组,这两个数组相当重要,在博文Netty源码分析:PoolArena中有详细的介绍,这里不再介绍。
private final PoolSubpage<T>[] tinySubpagePools;
private final PoolSubpage<T>[] smallSubpagePools;
3)创建了6个Chunk列表(PoolChunkList)来缓存用来分配给Normal(超过一页)大小内存的PoolChunk,每个PoolChunkList中用head字段维护一个PoolChunk链表的头部,每个PoolChunk中有prev,next字段。而PoolChunkList内部维护者一个PoolChunk链表头部。
这6个PoolChunkList解释如下:
qInit:存储剩余内存0-25%的chunk
q000:存储剩余内存1-50%的chunk
q025:存储剩余内存25-75%的chunk
q050:存储剩余内存50-100%个chunk
q075:存储剩余内存75-100%个chunk
q100:存储剩余内存100%chunk
这六个PoolChunkList也通过链表串联,串联关系是:qInit->q000->q025->q050->q075->q100.
3、ByteBuf byteBuf = allocator.heapBuffer(256)
接下来看下PooledByteBufAllocator类中heapBuffer方法(实际上是在其父类AbstractByteBufAllocator中定义的),代码如下:
@Override
public ByteBuf heapBuffer(int initialCapacity)
return heapBuffer(initialCapacity, Integer.MAX_VALUE);
@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity)
if (initialCapacity == 0 && maxCapacity == 0)
return emptyBuf;
validate(initialCapacity, maxCapacity);//检查参数是否正确
return newHeapBuffer(initialCapacity, maxCapacity);
继续看newHeapBuffer方法
PooledByteBufAllocator
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity)
PoolThreadCache cache = threadCache.get();
PoolArena<byte[]> heapArena = cache.heapArena;
ByteBuf buf;
if (heapArena != null)
buf = heapArena.allocate(cache, initialCapacity, maxCapacity);//分析
else
buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
return toLeakAwareBuffer(buf);
从上面的方法可知,接着就时调用了PoolArena的子类的allocate来分配内存,这个方法在博文Netty源码分析:PoolArena有详细的介绍,这里不再介绍。
小结
基于前面分析的PoolArena、PoolChunk、PoolSubage这个类之后,他们的关系总结如下:
图中想表达以下几点:
1、PooledByteBufAllocator包括2个数组:HeapArena数组和DirectArena数组。当利用PooledByteBufAllocator分配内存时,是利用Arena数组中的元素来完成。
2、HeapArena包括:6个PoolChunkList链表(链表中的元素为PoolChunk),和两个数组:tinySubpagePools和smallSubpagePools。当利用Arena来进行分配内存时,根据申请内存的大小有不同的策略,例如:如果申请内存的大小小于512时,则首先在cache尝试分配,如果分配不成功则会在tinySubpagePools尝试分配,如果分配不成功,则会在PoolChunk重新找一个PoolSubpage来进行内存分配,分配之后将此PoolSubpage保存到tinySubpagePools中。
3、PoolChunk中包括一大块内存T memory,将其分成N份,每一份就是一个PoolSubpage。
4、PoolSubpage由M个“块”构成,块的大小由第一次申请内存大小决定。当分配一次内存之后此page会被加入到PoolArena的tinySubpagePools或smallSubpagePools中,下次分配时就如果“块”大小相同,则尤其直接分配。
以上是关于Netty源码分析:PooledByteBufAllocator的主要内容,如果未能解决你的问题,请参考以下文章