高性能Netty之内存池源码分析

Posted 搬运工来架构

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高性能Netty之内存池源码分析相关的知识,希望对你有一定的参考价值。



Netty内存池是将内存的分配管理起来减少内存碎片和避免内存浪费,Netty内存池参考了Slab分配和Buddy分配思想;Slab分配是将内存分割成大小不等的内存块,在用户线程请求时根据请求的内存大小分配最为贴近size的内存快,减少了内存碎片同时避免了内存浪费;Buddy分配是把一块内存块等量分割,回收时候进行合并,尽可能保证系统中有足够大的连续内存;


(0) 内存数据结构

内存分级从上到下主要分为:Arena,ChunkList,Chunk,Page,SubPage五级

ooledArena是一块连续的内存块,为了优化并发性能在Netty内存池中存在一个由多个Arena组成的数组,在多个线程进行内存分配时会按照轮询策略选择一个Arena进行内存分配

一个PoolArena内存块是由两个SubPagePools(用来存储零碎内存)和多个ChunkList组成,两个SubpagePools数组分别为tinySubpagePools和smallSubpagePools。每个ChunkList里包含多个Chunk按照双向链表排列,每个Chunk里包含多个Page(默认2048个),每个Page(默认大小为8k字节)由多个Subpage组成。

每个ChunkList里包含的Chunk数量会动态变化,比如当该chunk的内存利用率变化时会向其它ChunkList里移动。

 1final PooledByteBufAllocator parent;
2
3private final int maxOrder;
4final int pageSize;
5final int pageShifts;
6final int chunkSize;
7final int subpageOverflowMask;
8final int numSmallSubpagePools;
9final int directMemoryCacheAlignment;
10final int directMemoryCacheAlignmentMask;
11
12private final PoolSubpage<T>[] tinySubpagePools;
13private final PoolSubpage<T>[] smallSubpagePools;
14
15private final PoolChunkList<T> q050;
16private final PoolChunkList<T> q025;
17private final PoolChunkList<T> q000;
18private final PoolChunkList<T> qInit;
19private final PoolChunkList<T> q075;
20private final PoolChunkList<T> q100;

内存池内存分配规则

对于小于PageSize大小的内存分配,会在tinySubPagePools和smallSubPagePools中分配,tinySubPagePools用来分配小于512字节的内存,smallSubPagePools用来分配大于512字节小于PageSize的内存;

对于大于PageSize小于ChunkSize的内存分配,会在PoolChunkList中的Chunk中分配

对于大于ChunkSize的内存分配,会之间直接创建非池化的Chunk来分配,并且该Chunk不会放在内存池中重用。

(1) 内存池的入口PoolByteBufAllocator

内存池进行内存分配是通过PooledByteBufAllocator类的buffer()方法实现的

1public static void main(String[] args) {
2    ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(1024);    //默认直接内存
3    buf.writeBytes("hello".getBytes());
4
5    PooledByteBufAllocator p = new PooledByteBufAllocator(false);    //堆内存(false)或者直接内存
6    ByteBuf buf1 = p.buffer(1024);
7    buf1.writeBytes("world".getBytes());
8}

判断创建的缓冲区的类型,直接缓冲区或者堆缓冲区,如果在创建PooledByteBufAllocator实例时参数是false则为堆缓冲区

1public ByteBuf buffer(int initialCapacity) {
2    if (directByDefault) {
3        return directBuffer(initialCapacity);
4    }
5    return heapBuffer(initialCapacity);
6}

通过newHeapBuffer()方法创建堆缓冲区

 1public ByteBuf heapBuffer(int initialCapacity) {
2    return heapBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
3}
4
5@Override
6public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
7    if (initialCapacity == 0 && maxCapacity == 0) {
8        return emptyBuf;
9    }
10    validate(initialCapacity, maxCapacity);
11    return newHeapBuffer(initialCapacity, maxCapacity);
12}

newHeapBuffer()方法首先从PoolThreadLocalCache中获取与线程绑定的缓存池PoolThreadCache,缓存池中保存着回收的内存;

PoolThreadLocalCache继承了FastThreadLocal保存线程与内存缓冲池(PoolThreadCache)的映射,在进行内存分配时先映射中取出缓存内存块Arena,再将内存分配委托给内存块Arena的allocate()方法;

 1protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity{
2    PoolThreadCache cache = threadCache.get();
3    PoolArena<byte[]> heapArena = cache.heapArena;
4
5    final ByteBuf buf;
6    if (heapArena != null) {
7        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
8    } else {
9        buf = PlatformDependent.hasUnsafe() ?
10                new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
11                new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
12    }
13
14    return toLeakAwareBuffer(buf);
15}

如果不存在与线程对应的缓存则轮询分配一个Arean数组中的Arena内存块创建一个新的PoolThreadCache作为内存缓存

 1protected synchronized PoolThreadCache initialValue() {
2    final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
3    final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
4
5    if (useCacheForAllThreads || Thread.currentThread() instanceof FastThreadLocalThread) {
6        return new PoolThreadCache(
7                heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
8                DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
9    }
10    // No caching for non FastThreadLocalThreads.
11    return new PoolThreadCache(heapArena, directArena, 00000);
12}


(2) 内存块PoolArena

在应用层通过设置PooledByteBufAllocator来执行ByteBuf的分配,但是最终的内存分配工作被委托给PoolArena;由于Netty常用于高并发系统,所以各个线程进行内存分配时竞争不可避免,这可能会极大的影响内存分配的效率,为了缓解高并发时的线程竞争,Netty允许使用者创建多个分配器(Arena)来分离锁,提高内存分配效率,当然是以内存来作为代价的。

PoolByteBufAllocator将内存分配的任务委托给Arena进行,主要包括两步:一步是从Recycler对象池中获取复用的Buf对象,另外一步是为Buf对象分配内存;

1PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity{
2    PooledByteBuf<T> buf = newByteBuf(maxCapacity);    //获取复用对象
3    allocate(cache, buf, reqCapacity);    //分配内存
4    return buf;
5}

调用allocate()方法从Arena内存块中分配内存

判断需要分配的内存大小是否大于PageSize,如果小于PageSize则分配tiny内存或者small内存

  • 如果需要分配的内存小于PageSize,判断是否小于512,如果小于则调用allocateTiny()方法进行tiny内存分配,否则调用allocateSmall()方法进行small内存分配;

如果需要分配的内存大于PageSize,再判断是否大于ChunkSize,如果小于ChunkSize则调用allocateNormal()方法进行normal内存分配

如果需要分配的内存大于ChunkSize,内存池无法分配需要JVM分配则调用allocateHuge()方法在池外进行分配

 1private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
2    final int normCapacity = normalizeCapacity(reqCapacity);
3    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
4        int tableIdx;
5        PoolSubpage<T>[] table;
6        boolean tiny = isTiny(normCapacity);
7        if (tiny) { // < 512
8            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
9                // was able to allocate out of the cache so move on
10                return;
11            }
12            tableIdx = tinyIdx(normCapacity);
13            table = tinySubpagePools;
14        } else {
15            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
16                // was able to allocate out of the cache so move on
17                return;
18            }
19            tableIdx = smallIdx(normCapacity);
20            table = smallSubpagePools;
21        }
22
23        final PoolSubpage<T> head = table[tableIdx];
24
25        /**
26         * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
27         * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
28         */

29        synchronized (head) {
30            final PoolSubpage<T> s = head.next;
31            if (s != head) {
32                assert s.doNotDestroy && s.elemSize == normCapacity;
33                long handle = s.allocate();
34                assert handle >= 0;
35                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
36                incTinySmallAllocation(tiny);
37                return;
38            }
39        }
40        synchronized (this) {
41            allocateNormal(buf, reqCapacity, normCapacity);
42        }
43
44        incTinySmallAllocation(tiny);
45        return;
46    }
47    if (normCapacity <= chunkSize) {
48        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
49            // was able to allocate out of the cache so move on
50            return;
51        }
52        synchronized (this) {
53            allocateNormal(buf, reqCapacity, normCapacity);
54            ++allocationsNormal;
55        }
56    } else {
57        // Huge allocations are never served via the cache so just call allocateHuge
58        allocateHuge(buf, reqCapacity);
59    }
60}

内存池的初始阶段,线程是没有内存缓存的,所以最开始的内存分配都需要在Chunk分配区进行分配;也就是说无论是tinySubpagePools还是smallSubpagePools成员,在内存池初始化时是不会预置内存的,所以最开始的内存分配都会进入PoolArena的allocateNormal方法:

调用allocateNormal()方法从Chunk级别上分配内存,从PoolChunkList中查找可用PoolChunk并进行内存分配,如果没有可用的PoolChunk则创建一个并加入到PoolChunkList中,完成此次内存分配

 1private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity{
2    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
3        q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
4        q075.allocate(buf, reqCapacity, normCapacity)) {
5        return;
6    }
7
8    // Add a new chunk.
9    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
10    long handle = c.allocate(normCapacity);
11    assert handle > 0;
12    c.initBuf(buf, handle, reqCapacity);
13    qInit.add(c);
14}

从Arena中创建新的PoolChunk后根据其内存占用率放入相应的ChunkList中;

 1void add(PoolChunk<T> chunk{
2
3    if (chunk.usage() >= maxUsage) {
4
5        nextList.add(chunk);
6
7        return;
8
9    }
10
11    chunk.parent = this;
12
13    if (head == null) {
14
15        head = chunk;
16
17        chunk.prev = null;
18
19        chunk.next = null;
20
21    } else {
22
23        chunk.prev = null;
24
25        chunk.next = head;
26
27        head.prev = chunk;
28
29        head = chunk;
30
31    }
32
33}

(3) 内存块分配基本单元PoolChunk

PoolChunk的几个重要参数

memory,物理内存,内存请求者千辛万苦拐弯抹角就是为了得到它,在HeapArena中它就是一个chunkSize大小的byte数组;默认PoolChunk是由11层二叉树构成,也就是大小为ChunkSize=2048*PageSize

memoryMap数组,内存分配控制信息,数组元素是一个32位的整数

subpages数组,页分配信息,数组元素的个数等于chunk中page的数量。

从Arena中创建PoolChunk后,通过调用PoolChunk.allocate()方法真正进行内存分配:

在Chunk中的内存分配是根据需要分配的内存大小将Page内存页划分为SunPage,并将多余的SubPage加入到SubPagePools缓存中,将被分配的Page和SubPage在控制数组中进行标记;

 1private long allocateSubpage(int normCapacity) {
2    PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
3    synchronized (head) {
4        int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
5        int id = allocateNode(d);
6        if (id < 0) {
7            return id;
8        }
9
10        final PoolSubpage<T>[] subpages = this.subpages;
11        final int pageSize = this.pageSize;
12
13        freeBytes -= pageSize;
14
15        int subpageIdx = subpageIdx(id);
16        PoolSubpage<T> subpage = subpages[subpageIdx];
17        if (subpage == null) {
18            subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
19            subpages[subpageIdx] = subpage;
20        } else {
21            subpage.init(head, normCapacity);
22        }
23        return subpage.allocate();
24    }
25}

总结

内存池主要是将内存分配管理起来不经过JVM的内存分配,有效减小内存碎片避免内存浪费,同时也能减少频繁GC带来的性能影响;

内存池内存分配入口是PoolByteBufAllocator类,该类最终将内存分配委托给PoolArena进行;为了减少高并发下多线程内存分配碰撞带来的性能影响,PoolByteBufAllocator维护着一个PoolArena数组,线程通过轮询获取其中一个进行内存分配,进而实现锁分离;

内存分配的基本单元是PoolChunk,从PoolArena中分配获取一个PoolChunk,一个PoolChunk包含多个Page内存页,通过完全二叉树维护多个内存页用于内存分配。




推荐阅读






搬运工来架构

以上是关于高性能Netty之内存池源码分析的主要内容,如果未能解决你的问题,请参考以下文章

netty源码之内存池

netty源码之内存池

netty源码之内存池

netty源码之内存池

Netty对象重用:Recycler源码分析

Netty内存池之PoolChunk