高并发内存池
Posted 雨轩(爵丶迹)
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发内存池相关的知识,希望对你有一定的参考价值。
高并发内存池
- 项目介绍
- 什么是内存池
- 设计一个自己的定长内存池(自己开超市)
- 高并发内存池框架整体设计
- 高并发内存池--Thread Cache(第一层)
- 高并发内存池--Central Cache(第二层)
- 高并发内存池--Page Cache(第三层)
- windows和Linux下如何直接向堆申请和释放页为单位的大块内存
- 释放内存整体框架
- 大于256KB的大块内存申请问题
- 释放对象时优化为不传对象大小
- 申请内存和释放内存的总函数接口
- 性能瓶颈分析
- 使用tcmalloc源码中实现基数树进行优化
- 优化代码实现
- 扩展学习及当前项目实现的不足
项目介绍
当前项目是实现一个高并发的内存池,原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)。
什么是内存池
1.池化技术
所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。
2.内存池
内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
3.内存池主要解决的问题
这一点是我们最关心的,也是为什么会有这个东西的原因。
内存池主要解决的是效率的问题。其次站在系统的内存分配器的角度,还需要解决一下内存碎
片的问题。那什么是内存碎片?
内存碎片分为外碎片和内碎片。上面是外碎片问题。外部碎片是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求。内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。内碎片问题,在后面项目中就会看到。
4.malloc
C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的,而malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。
设计一个自己的定长内存池(自己开超市)
作为程序员(C/C++)我们知道申请内存使用的是malloc,malloc其实就是一个通用的大众货,什么场景下都可以用,但是什么场景下都可以用就意味着什么场景下都不会有很高的性能。
先熟悉一下简单内存池是如何控制的,第二他会作为我们后面内存池的一个基础组件。
定长内存池说明:当需要一块内存空间的时候,我们先到定长内存池中取内存对象(开始freelist为空)
1、先向系统申请一块很大的内存空间,这里我们申请的是128k,
2、从定长内存池中取走一块T类型大小的内存对象
3、释放的时候,直接归还给自由链表,下一次取内存对象的话先从自由链表freelist中取,如果freelist为空,在去定长内存池中取。
需要注意的是,自由链表中指向下一个结点的地址,这里取类型的转换,32位下是没有问题的,能取到4个字节,但是64位下,就存在问题了。
所以我们这里这里可以将 int* 改为 void**,32位和64位都是void*(指针,改成int**一样的)大小,指针会变化,这样64位下也可以取到下一个结点地址。
具体代码如下:
template<class T>
class ObjectPool
public:
//开辟一块内存池
T* New()
T* obj = nullptr;
//如果自由链表上面有内存块,则先从自由链表上面取
if (_freeList)
obj = (T*)_freeList;
_freeList = *((void**)_freeList);//指向下一个内存块
//_freeList = *((int*)_freeList);
else//从定长内存池中取一块内存块
//如果定长内存池中剩余字节连一个类型都无法满足,此时需要重新向系统申请一块内存
if (_leftBytes < sizeof(T))
_leftBytes = 128 * 1024;
// _memory = (char*)malloc(_leftBytes);
_memory = (char*)SystemAlloc(_leftBytes >> 13);
//有可能开辟失败
if (_memory == nullptr)
throw std::bad_alloc();
//取一块内存
obj = (T*)_memory;
//_memory向后移动一个T类型字节,确保能取到下一个结点地址
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
_memory += objSize;//向后移动一个内存块
_leftBytes -= objSize;//内存池减少一个内存块
new(obj) T;//使用定位new调用T的构造函数初始化
return obj;
//不需要的内存给自由链表
void Delete(T* obj)
obj->~T();//显示调用T类型的析构函数进行资源清理
//obj指向第一个结点,头指针freelist指向obj
*((void**)obj) = _freeList;
//*((int*)obj) = _freeList;
_freeList = obj;
private:
char* _memory = nullptr;//指向内存块的指针
int _leftBytes = 0;//内存块中剩余字节
void* _freeList = nullptr;//自由链表
;
高并发内存池框架整体设计
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题。
- 性能问题。
- 多线程环境下,锁竞争问题。
- 内存碎片问题。
concurrent memory pool主要由以下3个部分构成:
- thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
- central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。
- page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
高并发内存池–Thread Cache(第一层)
thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。
申请内存:
- 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标index。
- 如果自由链表_freeLists[index]中有对象,则直接Pop一个内存对象返回。
- 如果_freeLists[index]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。
我们先处理自由链表,将映射关系建立好,自由链表的哈希桶跟对象大小的映射关系:
哈希桶
上面我们可以看到Thread Cache 从8bit、16bit…256KB增长的,这样的话我们就需要32768个桶,32768个桶是可以申请的,但是我们没有必要申请这么多,这样产生的自由链表太多,减少浪费。
我们来看看大佬的设计
内存对齐:
整体控制在最多10%左右的内碎片浪费 129 + 15 15 / 144 ~ 10%
为什么不8个位去对齐?这样产生的自由链表太多,减少浪费
[1,128] 8byte对齐 freelist[0,16)
[128+1,1024] 16byte对齐 freelist[16,72) (1024-128+1)/16,增加56个链表,区间[16,72]
[1024+1,8*1024] 128byte对齐 freelist[72,128)
[8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
[64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
第一种写法
size_t alignSize = 0;
if (bytes % alignNum != 0)
alignSize = (bytes / alignNum + 1) / alignNum;//+1向上提升,不足8个字节,需要对齐
else
alignSize = bytes;
return alignSize;
//7 + 8 - 1
//&
//7
//1110
//1000
//==8
第二种写法
return ((bytes + alignNum - 1) & ~(alignNum - 1));
//对齐数划分区间
static inline size_t RoundUp(size_t size)
if (size <= 128)
return _RoundUp(size, 8);
else if (size <= 1024)
return _RoundUp(size, 16);
else if (size <= 8 * 1024)
return _RoundUp(size, 128);
else if (size <= 64 * 1024)
return _RoundUp(size, 1024);
//256kb到thread cache的头
else if (size <= 256 * 1024)
return _RoundUp(size, 8 * 1024);
else
return _RoundUp(size, 1 << PAGE_SHIFT);
//return -1;//超过,返回-1
对象大小:
哈希映射位置,计算映射的那个一个自由链表桶
1 + 7 8
2 9
...
8 15
static inline size_t _Index(size_t bytes, size_t align_shift)
第一种写法
if (bytes % align_shift == 0)
return bytes / align_shift - 1;//16/8 - 1 = 1
else
return bytes / align_shift;//15 / 8 = 1
//16 + 1<<3 -1 = 23 >> 3 -1 = 2 - 1 = 1
第二种写法
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
static inline size_t Index(size_t bytes)
assert(bytes <= MAX_BYTES);//不能超过桶大小
static int group_array[4] = 16, 56, 56, 56 ;
if (bytes <= 128)
return _Index(bytes, 3);
else if (bytes <= 1024)
return _Index(bytes - 128, 4) + group_array[0];//按照与算,需要-128,在加上一个区间有多少个自由链表。
else if (bytes <= 8 * 1024)
return _Index(bytes - 1024, 7) + group_array[0] + group_array[1];
else if (bytes <= 64 * 1024)
return _Index(bytes - 8 * 1024, 10) + group_array[0] + group_array[1] + group_array[2];
else if (bytes <= 256 * 1024)
return _Index(bytes - 64 * 1024, 13) + group_array[0] + group_array[1] + group_array[2] + group_array[3];
else
assert(false);
return -1;
ThreadCache中的自由链表(FreeList)
FreeList这里是一个单链表,维护着申请和释放内存对象的关系。
//管理每个小对象的自由链表
class FreeList
public:
//头插
void Push(void* obj)
assert(obj);
NextObj(obj) = _freelist;
_freelist = obj;
++_size;
//插入一段内存
void PushRange(void* start, void* end, size_t n)
NextObj(end) = _freelist;//指向第一个结点
_freelist = start;//指向新插入的内存块的第一个结点
_size += n;//链表长度
//释放
void PopRange(void* start, void* end, size_t n)
assert(n >= _size);
start = _freelist;
end = start;
//释放n个内存块到Central Cache中
for (size_t i = 0; i < n - 1; ++i)
end = NextObj(end);
_freelist = NextObj(end);
NextObj(end) = nullptr;
_size -= n;//链表长度-n
//借走一块内存块
void* Pop()
assert(_freelist);//确保链表不为空
void* obj = _freelist;
_freelist = NextObj(obj);
--_size;
return obj;
bool Empty()
return _freelist == nullptr;
//需要修改,引用
size_t& MaxSize()
return _maxsize;
//链表长度
size_t Size()
return _size;
private:
void* _freelist = nullptr;//头指针
size_t _maxsize = 1;//慢增长的大小
size_t _size = 0;//链表的长度
;
其中的nextObj函数是取的指针obj的四个字节,指向一个结点,我们这里以头插举例:
//下一个结点地址
static void*& NextObj(void* obj)
return *((void**)obj);
ThreadCache框架
- thread cache本质是由一个哈希映射的对象自由链表构成。
//封装了一层
class ThreadCache
public:
void* Allocate(size_t size);//申请对象
void Deallocate(void* ptr, size_t size);//释放对象
// 从中心缓存获取对象
void* FetchFromCentralCache(size_t index, size_t size);
// 释放对象时,链表过长时,回收内存回到中心缓存
void ListTooLong(FreeList& list, size_t size);
private:
//自由链表
FreeList _freeList[FREELISTNUM];//开辟208个
;
// TLS thread local storage
线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。
而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
线程局部存储说明(TLS):
- 线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。所以threadcache中是不需要加锁的。
- 而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。
threadcache中申请的是小于256kb的内存对象,且有208个桶,我们这里定义出来
static const size_t MAX_BYTES = 256 * 1024;//256kb thread cache这一层,每个线程独享
static const size_t FREELISTNUM = 208;//freelist的表大小
申请内存空间
void* ThreadCache::Allocate(size_t size)
assert(size <= MAX_BYTES);//小于最大位数
size_t alignSize = SizeClass::RoundUp(size);//获取对齐数
size_t index = SizeClass::Index(size);//获取映射位置
//如果自由链表不为空,直接弹出一个对象
if (!_freeList[index].Empty())
return _freeList[index].Pop();
else
//没有对象,从central cache层获取一定数量对象,插入到自由链表中并返回一个对象。
return FetchFromCentralCache(index, alignSize);//这里的size注意是对齐后的
从中心缓存获取内存对象
慢增长方法,同网络的拥塞控制,开始需要1个给1个,后面1个给2个,2个给3个,慢慢增长,避免浪费内存。
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
//1、最开始不会向central catch要太多对象,要多了用不完,比如需要8个字节,结果给了500个对象,但实际只用了50个,就存在浪费的情况
//2、如果不需要size这个大小的内存需求,batchNum就会一直增大
//3、size越小,获取到的batchNum越大
//4、size越大,获取到的batchNum越小
size_t batchNum = min(_freeList[index].MaxSize(), SizeClass::NumMoveSize(size));//取最小值
//保证最小取到一个对象 要1个,给2个,1个给3个,依次递增,同拥塞控制
if (_freeList[index].MaxSize() == batchNum)
_freeList[index].MaxSize() += 1;
//去central catch中获取batchNum个对象,有多少取多少
void* start = nullptr;
void* end = nullptr;
//获取对象 不要忘记这里使用的是单例模式,注意调用方法
size_t actualNum = CentralCatch::GetInstance()->FetchRangeObj(start, end, batchNum, size);
assert(actualNum > 0);//确保申请成功
if (actualNum == 1)
assert(start == end);//一个的时候这两个相等
return start;
else
_freeList[index<以上是关于高并发内存池的主要内容,如果未能解决你的问题,请参考以下文章