高性能Netty之内存池源码分析
Posted 搬运工来架构
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高性能Netty之内存池源码分析相关的知识,希望对你有一定的参考价值。
Netty内存池是将内存的分配管理起来减少内存碎片和避免内存浪费,Netty内存池参考了Slab分配和Buddy分配思想;Slab分配是将内存分割成大小不等的内存块,在用户线程请求时根据请求的内存大小分配最为贴近size的内存快,减少了内存碎片同时避免了内存浪费;Buddy分配是把一块内存块等量分割,回收时候进行合并,尽可能保证系统中有足够大的连续内存;
(0) 内存数据结构
内存分级从上到下主要分为:Arena,ChunkList,Chunk,Page,SubPage五级;
ooledArena是一块连续的内存块,为了优化并发性能在Netty内存池中存在一个由多个Arena组成的数组,在多个线程进行内存分配时会按照轮询策略选择一个Arena进行内存分配;
●一个PoolArena内存块是由两个SubPagePools(用来存储零碎内存)和多个ChunkList组成,两个SubpagePools数组分别为tinySubpagePools和smallSubpagePools。每个ChunkList里包含多个Chunk按照双向链表排列,每个Chunk里包含多个Page(默认2048个),每个Page(默认大小为8k字节)由多个Subpage组成。
●每个ChunkList里包含的Chunk数量会动态变化,比如当该chunk的内存利用率变化时会向其它ChunkList里移动。
1final PooledByteBufAllocator parent;
2
3private final int maxOrder;
4final int pageSize;
5final int pageShifts;
6final int chunkSize;
7final int subpageOverflowMask;
8final int numSmallSubpagePools;
9final int directMemoryCacheAlignment;
10final int directMemoryCacheAlignmentMask;
11
12private final PoolSubpage<T>[] tinySubpagePools;
13private final PoolSubpage<T>[] smallSubpagePools;
14
15private final PoolChunkList<T> q050;
16private final PoolChunkList<T> q025;
17private final PoolChunkList<T> q000;
18private final PoolChunkList<T> qInit;
19private final PoolChunkList<T> q075;
20private final PoolChunkList<T> q100;
内存池内存分配规则
●对于小于PageSize大小的内存分配,会在tinySubPagePools和smallSubPagePools中分配,tinySubPagePools用来分配小于512字节的内存,smallSubPagePools用来分配大于512字节小于PageSize的内存;
●对于大于PageSize小于ChunkSize的内存分配,会在PoolChunkList中的Chunk中分配
●对于大于ChunkSize的内存分配,会之间直接创建非池化的Chunk来分配,并且该Chunk不会放在内存池中重用。
(1) 内存池的入口PoolByteBufAllocator
内存池进行内存分配是通过PooledByteBufAllocator类的buffer()方法实现的
1public static void main(String[] args) {
2 ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(1024); //默认直接内存
3 buf.writeBytes("hello".getBytes());
4
5 PooledByteBufAllocator p = new PooledByteBufAllocator(false); //堆内存(false)或者直接内存
6 ByteBuf buf1 = p.buffer(1024);
7 buf1.writeBytes("world".getBytes());
8}
判断创建的缓冲区的类型,直接缓冲区或者堆缓冲区,如果在创建PooledByteBufAllocator实例时参数是false则为堆缓冲区
1public ByteBuf buffer(int initialCapacity) {
2 if (directByDefault) {
3 return directBuffer(initialCapacity);
4 }
5 return heapBuffer(initialCapacity);
6}
通过newHeapBuffer()方法创建堆缓冲区
1public ByteBuf heapBuffer(int initialCapacity) {
2 return heapBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
3}
4
5@Override
6public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
7 if (initialCapacity == 0 && maxCapacity == 0) {
8 return emptyBuf;
9 }
10 validate(initialCapacity, maxCapacity);
11 return newHeapBuffer(initialCapacity, maxCapacity);
12}
newHeapBuffer()方法首先从PoolThreadLocalCache中获取与线程绑定的缓存池PoolThreadCache,缓存池中保存着回收的内存;
PoolThreadLocalCache继承了FastThreadLocal保存线程与内存缓冲池(PoolThreadCache)的映射,在进行内存分配时先映射中取出缓存内存块Arena,再将内存分配委托给内存块Arena的allocate()方法;
1protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
2 PoolThreadCache cache = threadCache.get();
3 PoolArena<byte[]> heapArena = cache.heapArena;
4
5 final ByteBuf buf;
6 if (heapArena != null) {
7 buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
8 } else {
9 buf = PlatformDependent.hasUnsafe() ?
10 new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
11 new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
12 }
13
14 return toLeakAwareBuffer(buf);
15}
如果不存在与线程对应的缓存则轮询分配一个Arean数组中的Arena内存块创建一个新的PoolThreadCache作为内存缓存
1protected synchronized PoolThreadCache initialValue() {
2 final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
3 final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
4
5 if (useCacheForAllThreads || Thread.currentThread() instanceof FastThreadLocalThread) {
6 return new PoolThreadCache(
7 heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
8 DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
9 }
10 // No caching for non FastThreadLocalThreads.
11 return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
12}
(2) 内存块PoolArena
在应用层通过设置PooledByteBufAllocator来执行ByteBuf的分配,但是最终的内存分配工作被委托给PoolArena;由于Netty常用于高并发系统,所以各个线程进行内存分配时竞争不可避免,这可能会极大的影响内存分配的效率,为了缓解高并发时的线程竞争,Netty允许使用者创建多个分配器(Arena)来分离锁,提高内存分配效率,当然是以内存来作为代价的。
PoolByteBufAllocator将内存分配的任务委托给Arena进行,主要包括两步:一步是从Recycler对象池中获取复用的Buf对象,另外一步是为Buf对象分配内存;
1PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
2 PooledByteBuf<T> buf = newByteBuf(maxCapacity); //获取复用对象
3 allocate(cache, buf, reqCapacity); //分配内存
4 return buf;
5}
调用allocate()方法从Arena内存块中分配内存
●判断需要分配的内存大小是否大于PageSize,如果小于PageSize则分配tiny内存或者small内存
如果需要分配的内存小于PageSize,判断是否小于512,如果小于则调用allocateTiny()方法进行tiny内存分配,否则调用allocateSmall()方法进行small内存分配;
●如果需要分配的内存大于PageSize,再判断是否大于ChunkSize,如果小于ChunkSize则调用allocateNormal()方法进行normal内存分配;
●如果需要分配的内存大于ChunkSize,内存池无法分配需要JVM分配则调用allocateHuge()方法在池外进行分配;
1private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
2 final int normCapacity = normalizeCapacity(reqCapacity);
3 if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
4 int tableIdx;
5 PoolSubpage<T>[] table;
6 boolean tiny = isTiny(normCapacity);
7 if (tiny) { // < 512
8 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
9 // was able to allocate out of the cache so move on
10 return;
11 }
12 tableIdx = tinyIdx(normCapacity);
13 table = tinySubpagePools;
14 } else {
15 if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
16 // was able to allocate out of the cache so move on
17 return;
18 }
19 tableIdx = smallIdx(normCapacity);
20 table = smallSubpagePools;
21 }
22
23 final PoolSubpage<T> head = table[tableIdx];
24
25 /**
26 * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
27 * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
28 */
29 synchronized (head) {
30 final PoolSubpage<T> s = head.next;
31 if (s != head) {
32 assert s.doNotDestroy && s.elemSize == normCapacity;
33 long handle = s.allocate();
34 assert handle >= 0;
35 s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
36 incTinySmallAllocation(tiny);
37 return;
38 }
39 }
40 synchronized (this) {
41 allocateNormal(buf, reqCapacity, normCapacity);
42 }
43
44 incTinySmallAllocation(tiny);
45 return;
46 }
47 if (normCapacity <= chunkSize) {
48 if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
49 // was able to allocate out of the cache so move on
50 return;
51 }
52 synchronized (this) {
53 allocateNormal(buf, reqCapacity, normCapacity);
54 ++allocationsNormal;
55 }
56 } else {
57 // Huge allocations are never served via the cache so just call allocateHuge
58 allocateHuge(buf, reqCapacity);
59 }
60}
内存池的初始阶段,线程是没有内存缓存的,所以最开始的内存分配都需要在Chunk分配区进行分配;也就是说无论是tinySubpagePools还是smallSubpagePools成员,在内存池初始化时是不会预置内存的,所以最开始的内存分配都会进入PoolArena的allocateNormal方法:
调用allocateNormal()方法从Chunk级别上分配内存,从PoolChunkList中查找可用PoolChunk并进行内存分配,如果没有可用的PoolChunk则创建一个并加入到PoolChunkList中,完成此次内存分配
1private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
2 if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
3 q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
4 q075.allocate(buf, reqCapacity, normCapacity)) {
5 return;
6 }
7
8 // Add a new chunk.
9 PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
10 long handle = c.allocate(normCapacity);
11 assert handle > 0;
12 c.initBuf(buf, handle, reqCapacity);
13 qInit.add(c);
14}
从Arena中创建新的PoolChunk后根据其内存占用率放入相应的ChunkList中;
1void add(PoolChunk<T> chunk) {
2
3 if (chunk.usage() >= maxUsage) {
4
5 nextList.add(chunk);
6
7 return;
8
9 }
10
11 chunk.parent = this;
12
13 if (head == null) {
14
15 head = chunk;
16
17 chunk.prev = null;
18
19 chunk.next = null;
20
21 } else {
22
23 chunk.prev = null;
24
25 chunk.next = head;
26
27 head.prev = chunk;
28
29 head = chunk;
30
31 }
32
33}
(3) 内存块分配基本单元PoolChunk
PoolChunk的几个重要参数
●memory,物理内存,内存请求者千辛万苦拐弯抹角就是为了得到它,在HeapArena中它就是一个chunkSize大小的byte数组;默认PoolChunk是由11层二叉树构成,也就是大小为ChunkSize=2048*PageSize
●memoryMap数组,内存分配控制信息,数组元素是一个32位的整数
●subpages数组,页分配信息,数组元素的个数等于chunk中page的数量。
从Arena中创建PoolChunk后,通过调用PoolChunk.allocate()方法真正进行内存分配:
在Chunk中的内存分配是根据需要分配的内存大小将Page内存页划分为SunPage,并将多余的SubPage加入到SubPagePools缓存中,将被分配的Page和SubPage在控制数组中进行标记;
1private long allocateSubpage(int normCapacity) {
2 PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
3 synchronized (head) {
4 int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
5 int id = allocateNode(d);
6 if (id < 0) {
7 return id;
8 }
9
10 final PoolSubpage<T>[] subpages = this.subpages;
11 final int pageSize = this.pageSize;
12
13 freeBytes -= pageSize;
14
15 int subpageIdx = subpageIdx(id);
16 PoolSubpage<T> subpage = subpages[subpageIdx];
17 if (subpage == null) {
18 subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
19 subpages[subpageIdx] = subpage;
20 } else {
21 subpage.init(head, normCapacity);
22 }
23 return subpage.allocate();
24 }
25}
总结
◆内存池主要是将内存分配管理起来不经过JVM的内存分配,有效减小内存碎片避免内存浪费,同时也能减少频繁GC带来的性能影响;
◆内存池内存分配入口是PoolByteBufAllocator类,该类最终将内存分配委托给PoolArena进行;为了减少高并发下多线程内存分配碰撞带来的性能影响,PoolByteBufAllocator维护着一个PoolArena数组,线程通过轮询获取其中一个进行内存分配,进而实现锁分离;
◆内存分配的基本单元是PoolChunk,从PoolArena中分配获取一个PoolChunk,一个PoolChunk包含多个Page内存页,通过完全二叉树维护多个内存页用于内存分配。
推荐阅读
搬运工来架构
以上是关于高性能Netty之内存池源码分析的主要内容,如果未能解决你的问题,请参考以下文章