高并发内存池项目(C++实战项目)
Posted 林慢慢脑瓜子嗡嗡的
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发内存池项目(C++实战项目)相关的知识,希望对你有一定的参考价值。
文章目录
- 🎄项目介绍
- 🎄设计思路
- 🎄细节与性能优化
- 🎄项目总结
🎄项目介绍
◎项目来源
本项目实现了一个高并发内存池,参考了Google的开源项目tcmalloc实现的简易版;其功能就是实现高效的多线程内存管理。由功能可知,高并发指的是高效的多线程,而内存池则是实现内存管理的。
tcmalloc源码
▶项目源码
◎内存池相关知识
1、池化技术
池化技术就是程序先向系统申请过量的资源,并将这些资源管理起来,避免频繁的申请和释放资源导致的开销。
内存池可以使用池化技术来维护可用内存块的链表。当程序需要分配内存时,内存池会从链表中分配一个可用的内存块。如果没有可用的内存块,内存池会从操作系统申请更多的内存,并将新分配的内存块添加到链表中。当程序释放内存时,内存池会将内存块添加回链表中,以便将来使用。
池化技术可以有效地减少内存碎片,因为它可以将多个小内存块组合成更大的内存块,这样就可以分配更大的连续内存空间,并减少碎片。此外,池化技术还可以提高内存使用效率,因为它可以快速分配和释放内存,而无需每次都调用操作系统的内存分配和释放函数。
2、内存池
内存池指的是程序预先向操作系统申请足够大的一块内存空间;此后,程序中需要申请内存时,不需要直接向操作系统申请,而是直接从内存池中获取;同理,程序释放内存时,也不是将内存直接还给操作系统,而是将内存归还给内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
3、内存池主要解决的问题
由上可知,内存池首要解决的是效率问题,其次从系统的内存分配器角度出发,还需要解决内存碎片的问题。那么什么是内存碎片问题呢?
内存碎片分为外碎片和内碎片。
- 外碎片由下图所示:对于程序员申请的内存,可能因为频繁的申请和释放内存导致内存空间不连续,那么就会出现明明由足够大的内存空间,但程序员却申请不出连续的空间出来,这便是外碎片问题了。
- 内碎片则是由于一些对齐的需求,导致分配出去的内存空间无法被利用,比如本项目中的
Round(Size)
对size进行的对齐。
4、malloc
C语言中动态申请内存是通过malloc函数去申请内存的,但是实际上malloc并不是直接向堆申请内存的,而malloc也可以使用内存池来管理内存分配,在某些情况下,操作系统或C语言标准库可能会使用内存池来管理堆内存,以提高内存分配效率。当程序将malloc管理的内存池中内存全部申请完时,malloc函数就会继续向操作系统申请空间。
🎄设计思路
◎第一阶段–设计一个定长的内存池
我们知道malloc函数申请内存空间是通用的,即任何场景下都可以使用,但是各方面都通用就意味着各方面都不顶尖,那么我们可以设计一个定长内存池来保证特定场景下的内存申请效率要高于malloc函数。
适应平台的指针方案
在这里,我们想取出一块对象内存中的前4个字节(32位系统)或者8个字节(64位系统)的内存来存储一个指针指向下一块释放回来的自由对象内存,那么在这里为了不作平台系统的判断,可以使用一个小技巧,即将对象内存强转成void**
的类型,那么再对这个二级指针类型解引用就可以取出当前对象的前4个字节(32位系统)或8个字节(64位系统)。
由于这个操作之后会频繁使用,因此定义为内敛函数放在common.h头文件中方便调用:
static inline void*& NextObj(void* obj)
return *(void**)obj;
由此,我们就可以设计出定长内存池的对象:
定长内存池池的基本思想是在程序运行时预先分配一大块内存,然后在需要使用某个对象时,从这块内存中分配给它。当对象不再使用时,将它归还给对象池,供其他对象使用。这样做的好处在于减少了内存分配和释放的次数,从而减少了内存碎片的产生,并降低了内存分配的开销。
在这段代码中,
ObjectPool
类的主要功能包括:
New()
函数:用于分配一个新的对象,如果有自由链表中有空闲的对象,则直接从自由链表中取出;否则,如果当前剩余内存块大小不够一个对象的大小,则重新申请一个内存块。申请到内存后,调用对象的构造函数来进行初始化。Delete()
函数:用于释放一个对象,调用对象的析构函数进行清理,然后将其加入自由链表中。在这段代码中,
ObjectPool
类的成员变量包括:
_memory
:指向当前申请的内存块的指针。
_remainBytes
:当前内存块剩余的字节数。
_freeList
:自由链表的头指针,用于保存当前有哪些对象可以被重复利用。在这段代码中,还有一个函数
SystemAlloc()
,这是为了避免使用malloc而使用的,它的作用是申请一个新的内存块。如果申请失败,则抛出std::bad_alloc
异常。总的来说,这段代码实现了一个简单的对象池,可以有效地管理类型为
T
的对象的内存分配和释放,从而减少了内存碎片的产生,并降低了内存分配的开销。
template<class T>
class ObjectPool
public:
T* New()
T* obj = nullptr;
// 如果自由链表非空,以“头删”的方式从自由链表取走内存块,重复利用
if (_freeList)
// 技巧:(void**)强转方便32位下获取前4字节,64位下获取前8字节
void* next = *((void**)_freeList);
obj = (T*)_freeList;
_freeList = next;
else
// 剩余内存_remainBytes不够一个对象大小时,重新开一块大空间
if (_remainBytes < sizeof(T))
_remainBytes = 128 * 1024;
// 分配了 _remainBytes 个字节的空间,即(128 *1024字节,128KB)
// memory = (char*)malloc(_remainBytes);
// >>13 其实就是一页8KB的大小,可以得到具体多少页
_memory = (char*)SystemAlloc(_remainBytes >> 13);
if (_memory == nullptr)
throw std::bad_alloc();
obj = (T*)_memory;
// 保证一次分配的空间够存放下当前平台的指针
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
// 大块内存块往后走,前面objSize大小的内存该分配出去了
_memory += objSize;
_remainBytes -= objSize;
// 定位new显式调用T类型构造函数:在内存地址obj处创建一个新的T类型的对象,并调用该对象的构造函数。
// 与普通的new运算符不同的是,它不会使用动态内存分配器来分配内存,而是使用指定的内存地址。
new(obj)T;
return obj;
//将obj这块内存链接到_freeList中
void Delete(T* obj)
//显式调用obj对象的析构函数,清理空间
obj->~T();
//将obj内存块头插
*(void**)obj = _freeList;
_freeList = obj;
private:
char* _memory = nullptr; // 指向大块内存的指针
size_t _remainBytes = 0; // 大块内存在切分过程中的剩余字节数
void* _freeList = nullptr; // 自由链表的头指针,用于保存当前有哪些对象可以被重复利用。
;
对于我们设计的定长内存池,可以通过下面的测试代码来比较一下malloc与定长内存池的效率:
struct TreeNode
int _val;
TreeNode* _left;
TreeNode* _right;
TreeNode():_val(0), _left(NULL),_right(NULL)
TreeNode(int x) : _val(x), _left(nullptr), _right(nullptr)
;
void TestObjectPool()
// 申请释放的轮次
const size_t Rounds = 5;
// 每轮申请释放多少次
const size_t N = 1000000;
size_t begin1 = clock();
std::vector<TreeNode*> v1;
v1.reserve(N);
for (size_t j = 0; j < Rounds; ++j)
for (int i = 0; i < N; ++i)
v1.push_back(new TreeNode);
for (int i = 0; i < N; ++i)
delete v1[i];
v1.clear();
size_t end1 = clock();
ObjectPool<TreeNode> TNPool;
size_t begin2 = clock();
std::vector<TreeNode*> v2;
v2.reserve(N);
for (size_t j = 0; j < Rounds; ++j)
for (int i = 0; i < N; ++i)
v2.push_back(TNPool.New());
for (int i = 0; i < 100000; ++i)
TNPool.Delete(v2[i]);
v2.clear();
size_t end2 = clock();
cout << "new cost time:" << end1 - begin1 << endl;
cout << "object pool cost time:" << end2 - begin2 << endl;
可以明显的看出,定长内存池的开销是要低于malloc的,由此可见,在特定场景下,定长内存池的效率高于malloc函数。
◎第二阶段–高并发内存池整体框架设计
现代开发环境大多都是多核多线程,那么在申请内存的场景下,必然存在激烈的锁竞争问题。其实,malloc本身就已经足够优秀了,但本项目的原型tcmalloc将在多线程高并发的场景下更胜一筹。
而本项目实现的内存池将考虑以下几方面的问题:
- 1.性能问题
- 2.多线程场景下的锁竞争问题
- 3.内存碎片问题
concurrent memory pool(并发内存池),主要有以下3个部分组成:
1.线程缓存(thread cache)
线程缓存是每个线程独有的,用于小于256kb内存的分配。那么对于每一个线程从thread cache申请资源,就无需考虑加锁问题,每个线程独享一个缓存(cache),这也是并发线程池高效的地方。
2.中心缓存(central cache)
中心缓存有所有线程所共享,thread cache 按需从central cache处获取对象,而central cache在合适的时机从thread cache处回收对象从而避免一个线程占用太多资源,导致其他线程资源吃紧,进而实现内存分配在多个线程更加均衡的按需调度。由于所有thread cache都从一个central cache中取内存对象,故central cache是存在竞争的,也就是说从central cache中取内存对象需要加锁,但我们在central cache这里用的是桶锁,且只有thread cache中没有对象后才会来central cache处取对象,因此锁的竞争不会很激烈。
3.页缓存(page cache)
页缓存是中心缓存上一级的缓存,存储并分配以页为单位的内存,central cache中没有内存对象时,会从page cache中分配出一定数量的page,并切割成定长大小的小块内存,给central cache。当page cache中一个span的几个跨度页都回收以后,page cache会回收central cache中满足条件的span对象,并且合并相邻的页,组成更大的页,从而缓解内存碎片(外碎片)的问题。
◎第三阶段–三级缓存的具体实现
1.Thread Cache框架构建及核心实现
thread cache是哈希桶结构,每个桶是一个根据桶位置映射的挂接内存块的自由链表,每个线程都会有一个thread cache对象,这样就可以保证线程在申请和释放对象时是无锁访问的。
🌕申请与释放内存的规则及无锁访问
- 申请内存
- 当内存申请大小size不超过256KB,则先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
- 如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。
- 如果_freeLists[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。
释放内存
1.当释放内存小于256kb时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。
2.当链表的长度过长,则回收一部分内存对象到central cache。
tls - thread local storage
线程局部存储(tls),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。
//TLS: thread local storage,实现线程的无锁访问
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
🌕管理内存对齐和映射等关系
▶计算对齐大小映射的规则
thread cache中的哈希桶映射比例比非均匀的,如果将内存大小均匀划分的话,则会划分出大量的哈希桶,比如256kb如果按照8byte划分,则会创建32768个哈希桶,这就有较大的内存开销;而如果按照更大的字节划分,那么内存开销虽然减少了,但照顾到的场景也少了,且会产生内碎片问题。
那么参考tcmalloc项目,为了保证内碎片的浪费整体控制在10%左右进行的区间映射,同时没有那么大的开销。使用RoundUp
函数的将输入的 size
对齐到一个固定的对齐值。对齐值是根据 size
的大小而定的,它分成了五个区间:
- 如果
size
位于[1,128]
之间,那么size
将被对齐到 8 字节。 - 如果
size
位于[128+1,1024]
之间,那么size
将被对齐到 16 字节。 - 如果
size
位于[1024+1,8*1024]
之间,那么size
将被对齐到 128 字节。 - 如果
size
位于[8*1024+1,64*1024]
之间,那么size
将被对齐到 1024 字节。 - 如果
size
位于[64*1024+1,256*1024]
之间,那么size
将被对齐到 8192 字节。
这个函数内部使用了另外一个静态函数
_RoundUp
来实际计算对齐后的值。
也就是说,对于1byte到128byte的内存对象,按照8byte对齐,划分为下标0-15号的哈希桶,而129byte到1kb的内存对象,按照16byte对齐,划分下标16-71号的哈希桶,以此类推,最终划分为0-207号总共208个哈希桶,这样就保证了内存较小的开销,同时各个对齐关系中内碎片浪费控制在10%左右,比如129byte到144byte区间,取144byte的内存对象,浪费率为(144 - 129) / 144 = 10.42%,当然对于最开始的1byte申请8byte内存对象,虽然浪费高达87.5%,但考虑到最终内碎片浪费了7byte,对比后续内碎片一次浪费7kb来说可以忽略不计了。
这便是申请的内存对象大小对齐的映射关系,这个关系在后续central cache及page cache中仍有应用,因此可以将其定义在头文件common.h中,以后内存大小对齐的管理。
▶计算相应内存映射在哪一个哈希桶中
这里使用Index
函数计算将输入的 size
映射到哪个自由链表桶(freelist)。和 RoundUp
函数一样,这个函数也根据 size
的大小将它分成了五个区间,但是它返回的是一个数组下标。数组的大小和每个区间内的自由链表桶数量是固定的。
这个函数内部使用了另一个静态函数
_Index
来计算桶的下标。在代码中,size
表示要被对齐的内存块的大小,alignNum
表示对齐的值,align_shift
表示对齐的值的二进制位数。
关于 _RoundUp
和 _Index
:
对于 _RoundUp
函数,它使用位运算将 size
对齐到最接近它的大于等于它的 alignNum
的倍数。这里有一个简单的例子:假设我们有一个值 size=11
,我们想将它对齐到 alignNum=8
的倍数。那么 _RoundUp
函数会返回 16,因为 16 是最接近 11 且大于等于 11 的 alignNum
的倍数。
对于 _Index
函数,它计算的是将 size
映射到桶链的下标。它的计算方法是将 size
向上对齐到最接近它的大于等于它的 2^align_shift
的倍数,然后再减去 1。这个函数的作用和 _RoundUp
函数类似,但是它返回的是下标而不是对齐后的值。
//计算对齐数
size_t _RoundUp(size_t size, size_t alignNum)
size_t alignSize;
if (size % alignNum != 0)
alignSize = (size / alignNum + 1) * alignNum;
else
alignSize = size;
return alignSize;
//计算对应链桶的下标
static inline size_t _Index(size_t bytes, size_t alignNum)
if (bytes % alignNum == 0)
return bytes / alignNum - 1;
else
return bytes / alignNum;
但是参考tcmalloc源码,考虑到位移运算更加接近底层,效率更高,而实际应用中映射对应关系的计算是相当频繁的,因此使用位运算来改进算法。
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
return ((bytes + alignNum - 1) & ~(alignNum - 1));
static inline size_t _Index(size_t bytes, size_t align_shift)
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
▶代码实现
// 计算对象大小的对齐映射规则
class SizeClass
public:
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐 freelist[0,16)
// [128+1,1024] 16byte对齐 freelist[16,72)
// [1024+1,8*1024] 128byte对齐 freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
// 使用位运算将 size 对齐到最接近它的大于等于它的 alignNum 的倍数
// 比如size = 11对齐到16
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
return ((bytes + alignNum - 1) & ~(alignNum - 1));
static inline size_t RoundUp(size_t size)
if (size <= 128)
return _RoundUp(size, 8);
else if (size <= 1024)
return _RoundUp(size, 16);
else if (size <= 8 * 1024)
return _RoundUp(size, 128);
else if (size <= 64 * 1024)
return _RoundUp(size, 1024);
else if (size <= 256 * 1024)
return _RoundUp(size, 8 * 1024);
else
assert(false);
return -1;
// 将 size 映射到桶链的下标:
// 这个函数的作用和 _RoundUp 函数类似,但是它返回的是下标而不是对齐后的值。
// 比如size = 11映射下标到(2 - 1 = 1)
static inline size_t _Index(size_t bytes, size_t align_shift)
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
// 计算映射的哪一个自由链表桶
static inline size_t Index(size_t bytes)
assert(bytes <= MAX_BYTES);
// 每个区间有多少个链
static int group_array[4] = 16, 56, 56, 56 ;// 打表
if (bytes <= 128)
return _Index(bytes, 3);
else if (bytes <= 1024)
return _Index(bytes - 128, 4) + group_array[0];
else if (bytes <= 8 * 1024)
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
else if (bytes <= 64 * 1024)
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
else if (bytes <= 256 * 1024)
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
else
assert(false);
return -1;
// 计算ThreadCache一次从中心缓存CentralCache获取多少个小对象,总的大小就是MAX_BYTES = 256KB
static size_t NumMoveSize(size_t size)
assert(size > 0);
// [2, 512],一次批量移动多少个对象的(慢启动)上限值
// 小对象一次批量上限高
// 小对象一次批量上限低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
// 计算中心缓存CentralCache一次向PageCache获取多少页
// 单个对象 8byte
// ...
// 单个对象 256KB
static size_t NumMovePage(size_t size)
// 计算一次从中心缓存获取的对象个数num
size_t num = NumMoveSize(size);
// 单个对象大小与对象个数相乘,获得一次需要向PageCache申请的内存大小
size_t npage = num * size;
npage >>= PAGE_SHIFT;
if (npage == 0)
npage = 1;
return npage;
;
NumMoveSize 函数的作用是计算一次从中心缓存获取多少个对象。它的计算方法是首先将单个对象大小除以总的缓存大小 MAX_BYTES,得到的结果即为一次从中心缓存获取的对象个数。为了避免数量太少或太多,可以设置一个范围,在 [2, 512] 之间。如果计算出的对象数量不在这个范围内,就取边界值。
NumMovePage函数的作用是计算中心缓存CentralCache一次向PageCache获取多少页。一页的大小是由PAGE_SHIFT指定的。本项目中一个页大小是8KB,即2的13次方,所以PAGE_SHIFT = 13
。NumMovePage函数先调用NumMoveSize函数计算出一次从CentralCache获取多少个对象,然后乘上对象大小,就获得需要向PageCache申请的内存大小,然后除以单个页的大小(左移PAGE_SHIFT)
以上是关于高并发内存池项目(C++实战项目)的主要内容,如果未能解决你的问题,请参考以下文章