实战项目高并发内存池
Posted 小倪同学 -_-
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实战项目高并发内存池相关的知识,希望对你有一定的参考价值。
文章目录
- 项目介绍
- 内存池技术
- 设计一个定长的内存池
- 高并发内存池整体框架设计
- thread cache
- central cache
- page cache
- 申请内存流程
- 内存释放
- 内存释放流程
- 使用定长内存池配合脱离使用new
- 释放对象时优化为不传对象大小
- 多线程环境下对比malloc测试
- 性能瓶颈分析
- 使用基数树进行优化
- 项目源码
项目介绍
当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)。
这个项目是把tcmalloc最核心的框架简化后拿出来,模拟实现出一个自己的高并发内存池,目的就是学习tcamlloc的精华。
这个项目会用到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等方面的知识。
内存池技术
池化技术
所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。
在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。
内存池
内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
内存池主要解决的问题
内存池主要解决的还是效率的问题,其次如果从系统的内存分配器的角度来看,还需要解决一下内存碎片的问题。那么什么是内存碎片呢?
采用分区式存储管理的系统,在储存分配过程中产生的、不能供用户作业使用的主存里的小分区称成内存碎片,如下图所示
内存碎片又分为外碎片和内碎片,上面演示的是外碎片。外部碎片是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求。内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。
malloc
C/C++中动态申请内存并不是直接去堆上申请的,而是通过malloc函数去申请的,C++中的new本质上也是封装了malloc函数。
malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。
malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs系列用的微软自己实现的一套,linux下的gcc用的glibc中的ptmalloc。
设计一个定长的内存池
我们知道申请内存使用的是malloc,它在任何场景下都可以用,这就意味着什么场景下它都不会有很高的性能。
定长内存池是针对固定大小内存块的申请和释放的问题,因为它申请和释放的内存块大小是固定的,所以不需要考虑内存碎片化的问题。
通过定长内存池,我们先熟悉一下简单内存池是如何控制的,其次,它也是后面高并发内存池的一个基础组件。
如何实现定长
我们可以利用非类型模板参数来控制向该内存池申请的内存大小,如下面代码,可以控制每次向内存池申请的内存大小为N
template<size_t N>
class ObjectPool
;
此外,定长内存池也叫做对象池,在创建对象池时,对象池可以根据传入的对象类型的大小来实现“定长”,我们可以通过模板参数来实现定长,例如创建定长内存池时传入的对象类型是int,那么该内存池就只支持4字节大小内存的申请和释放。
template<class T>
class ObjectPool
;
定长内存池向堆申请空间
这里申请空间不用malloc,而是用malloc的底层,直接向系统要内存,在Windows下,可以调用VirtualAlloc函数,在Linux下,可以调用brk或mmap函数。这里以Windows为主。
#ifdef _WIN32
#include<windows.h>
#else
//
#endif
// 该函数短小,可设置成内联函数提高效率
inline static void* SystemAlloc(size_t kpage)
#ifdef _WIN32
// 向堆上申请kpage块8192字节空间
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
定长内存池中的成员变量
对于申请的大块内存,我们可以利用指针进行管理,再用一个变量来记录申请的内存中剩余的内存大小。指针最好为字符指针,因为字符指针一次可以走任意的字节,很灵活。
对于释放回来的内存,我们可以利用链表来管理,这就需要一个指向链表的指针。
所以定长内存池中设计了三个变量
- 指向大块内存的指针
- 记录大块内存在切分过程中剩余字节数的变量
- 记录回收内存自由链表的头指针
定长内存池为用户申请空间
当我们为用户申请空间时,优先使用释放回来的内存,即自由链表。将自由链表头删一块内存返回。
如果自由链表当中没有内存块,那么我们就在大块内存中切出定长的内存块进行返回。内存块切出后,及时更新_memory指针的指向,以及_remainBytes的值。
当大块内存不够切分出一个对象时,调用封装的SystemAlloc函数向系统申请一大块内存,再进行切分。
注意:为了让释放的内存能够并入自由链表中,我们必须保证切分出来的对象能够存下一个地址,即申请的内存块至少为4字节(32位)或8字节(64位)。
template<class T>
class ObjectPool
public:
T* New()
T* obj = nullptr;
// 优先把还回来内存块对象,再次重复利用
if (_freeList)
// 从自由链表头删一个对象
void* next = *((void**)_freeList);
obj = (T*)_freeList;
_freeList = next;
else
// 剩余内存不够一个对象大小时,则重新开大块空间
if (_remainBytes < sizeof(T))
_remainBytes = 128 * 1024;
_memory = (char*)SystemAlloc(_remainBytes >> 13);
// 申请内存失败抛异常
if (_memory == nullptr)
throw std::bad_alloc();
//从大块内存中切出objSize字节的内存
obj = (T*)_memory;
//保证对象能够存下一个地址
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
// 调整成员变量
_memory += objSize;
_remainBytes -= objSize;
// 定位new,显示调用T的构造函数初始化
new(obj)T;
return obj;
private:
char* _memory = nullptr;// 指向大块内存的指针
size_t _remainBytes = 0;// 大块内存在切分过程中剩余字节数
void* _freeList = nullptr;// 还回来过程中链接的自由链表的头指针
;
定长内存池管理回收的内存
我们用链表管理回收的内存,为了方便使用和节省空间,我们用内存块的前4个字节(32位平台)或8个字节(64位平台)记录下一个内存块的起始地址,如下图所示。
当回收内存块时,将内存块头插入自由链表即可。
代码实现起来也很简单,就是链表的头插。
void Delete(T* obj)
// 显示调用析构函数清理对象
obj->~T();
// 头插
*(void**)obj = _freeList;
_freeList = obj;
这里还存在一个问题:如何让一个指针在32位平台下解引用后能向后访问4个字节,在64位平台下解引用后能向后访问8个字节呢?
这里我们利用二级指针,因为二级指针存储的是一级指针的地址,而一级指针会在不同的平台下呈现出不同的大小(32位平台大小为4字节,64位平台大小为8字节),二级指针解引用会向后访问一级指针的大小。这个操作在下面项目中会经常使用,建议写成函数。
static void*& NextObj(void* obj)
return *(void**)obj;
定长内存池总体代码
template<class T>
class ObjectPool
public:
T* New()
T* obj = nullptr;
// 优先使用还回来内存块对象,再次重复利用
if (_freeList)
// 从自由链表头删一个对象返回
void* next = *((void**)_freeList);
obj = (T*)_freeList;
_freeList = next;
else
// 剩余内存不够一个对象大小时,则重新开大块空间
if (_remainBytes < sizeof(T))
_remainBytes = 128 * 1024;
_memory = (char*)SystemAlloc(_remainBytes >> 13);
// 申请内存失败抛异常
if (_memory == nullptr)
throw std::bad_alloc();
//从大块内存中切出objSize字节的内存
obj = (T*)_memory;
//保证对象能够存下一个地址
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
// 调整成员变量
_memory += objSize;
_remainBytes -= objSize;
// 定位new,显示调用T的构造函数初始化
new(obj)T;
return obj;
void Delete(T* obj)
// 显示调用析构函数清理对象
obj->~T();
// 头插
*(void**)obj = _freeList;
_freeList = obj;
private:
char* _memory = nullptr;// 指向大块内存的指针
size_t _remainBytes = 0;// 大块内存在切分过程中剩余字节数
void* _freeList = nullptr;// 还回来过程中链接的自由链表的头指针
;
性能检测
下面将定长内存池和malloc/free进行性能对比
先用new和delete多次申请和释放TreeNode结点,利用clock函数记录整个过程消耗的时间。再用我们自己设计的定长内存池的New和Delete多次申请和释放TreeNode结点,记录整个过程消耗的时间。对两次使用的时间进行比较。
测试代码如下
void TestObjectPool()
// 申请释放的轮次
const size_t Rounds = 5;
// 每轮申请释放多少次
const size_t N = 100000;
std::vector<TreeNode*> v1;
v1.reserve(N);
size_t begin1 = clock();
for (size_t j = 0; j < Rounds; ++j)
for (int i = 0; i < N; ++i)
v1.push_back(new TreeNode);
for (int i = 0; i < N; ++i)
delete v1[i];
v1.clear();
size_t end1 = clock();
std::vector<TreeNode*> v2;
v2.reserve(N);
ObjectPool<TreeNode> TNPool;
size_t begin2 = clock();
for (size_t j = 0; j < Rounds; ++j)
for (int i = 0; i < N; ++i)
v2.push_back(TNPool.New());
for (int i = 0; i < N; ++i)
TNPool.Delete(v2[i]);
v2.clear();
size_t end2 = clock();
cout << "new cost time:" << end1 - begin1 << endl;
cout << "object pool cost time:" << end2 - begin2 << endl;
这里我们调成Release版进行测试
从结果中我们可以看出,设计的定长内存池要比malloc和free快一些。但是定长内存池只适用于申请和释放固定大小的内存,而malloc和free可以申请和释放任意大小的内存。为了解决定长内存池的局限性,谷歌大佬设计了tcmalloc,下面模拟实现tcmalloc简易版本。
高并发内存池整体框架设计
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题。
- 性能问题。
- 多线程环境下,锁竞争问题。
- 内存碎片问题。
concurrent memory pool主要由以下3个部分构成:
- thread cache: 线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
- central cache: 中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。
- page cache: 页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
thread cache
thread cache整体框架
thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。
当线程要申请内存时,通过计算得到对齐后的字节数,从而找到对应的哈希桶,如果哈希桶中的自由链表不为空,就从自由链表中头删一块内存返回。如果哈希桶中的自由链表为空,就需要向下一层的central cache申请内存。
thread cache 代码框架如下
class ThreadCache
public:
// 申请和释放内存对象
void* Allocate(size_t size);
void Deallocate(void* ptr, size_t size);
// 从中心缓存获取对象
void* FetchFromCentralCache(size_t index, size_t size);
// 释放对象时,链表过长时,回收内存到中心缓存
void ListTooLong(FreeList& list, size_t size);
private:
// 哈希桶
FreeList _freeLists[NFREELIST];
;
// TLS thread local storage(TLS线程本地存储)
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
哈希桶中的自由链表是单链表结构,和上文实现的定长内存池一样,通过内存块的前4位或8位地址连接下一内存块。
代码如下
static void*& NextObj(void* obj)
return *(void**)obj;
class FreeList
public:
// 将释放的对象头插到自由链表
void Push(void* obj)
assert(obj);
//头插
NextObj(obj) = _freeList;
_freeList = obj;
++_size;
// 从自由链表头部获取一个对象
void* Pop()
assert(_freeList);
// 头删
void* obj = _freeList;
_freeList = NextObj(obj);
--_size;
return obj;
// 将释放的n个内存块头插入自由链表
void PushRange(void* start, void* end,size_t n)
NextObj(end) = _freeList;
_freeList = start;
_size += n;
// 从自由链表头部获取n个内存块
void PopRange(void*& start, void*& end, size_t n)
assert(n >= _size);
start = _freeList;
end = start;
// 确定获取内存块链表结尾
for (size_t i = 0; i < n - 1; i++)
end = NextObj(end);
_freeList = NextObj(end);
NextObj(end) = nullptr;
// 更新内存块剩余数量
_size -= n;
// 判断自由链表是否为空
bool Empty()
return _freeList == nullptr;
// 记录当前一次申请内存块的数量
size_t& MaxSize()
return _maxSize;
// 自由链表中内存块的数量
size_t Size()
return _size;
private:
void* _freeList=nullptr;// 指向自由链表的指针
size_t _maxSize = 1;// 一次申请内存块的数量
size_t _size = 0;// 记录自由链表中内存块数量
;
threadcache哈希桶映射对齐规则
对象大小的对齐映射规则
对象大小的对齐映射并不是均匀的,而是成倍增长的。对象大小的对齐映射固定不变的话,如果映射值较小,就会创建大量的哈希桶,例如256kb如果按照8byte划分,则会创建32768个哈希桶。如果映射值较大,又会造成大量的空间浪费,产生内碎片问题。
为了减少空间浪费率和创建哈希桶的内存开销,我们设计了如下映射关系
空间浪费率
空间浪费率为浪费的字节数除以对齐后的字节数,以129~1024这个区间为例,该区域的对齐数是16,那么最大浪费的字节数就是15,而最小对齐后的字节数就是这个区间内的前16个数所对齐到的字节数,也就是144,那么该区间的最大浪费率就是 15 ÷ 144 ≈ 10.42%
计算对象大小的对齐映射数
计算对象大小的对齐映射数时,我们可以先判断该字节属于哪个区间,再调用子函数完成映射
static size_t _RoundUp(size_t size, size_t alignNum)
size_t alignSize=0;
if (size%alignNum != 0)
alignSize = (size / alignNum + 1)*alignNum;
else
alignSize = size;
return alignSize;
//获取向上对齐后的字节数
static inline size_t RoundUp(size_t size)
if (size <= 128)
return _RoundUp(size, 8);
else if (size <= 1024)
return _RoundUp(size, 16);
else if (size <= 8 * 1024)
return _RoundUp(size, 128);
else if (size <= 64 * 1024)
return _RoundUp(size, 1024);
else if (size <= 256 * 1024)
return _RoundUp(size, 8 * 1024);
else
assert(false);
return -1;
子函数也可以利用位运算,位运算的速度是比乘法和除法更快的,但是这种方法不易想到
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
return ((bytes + alignNum - 1)&~(alignNum - 1));
计算内存映射的哈希桶
获取字节对应的哈希桶下标时,也是先判断它在哪个区间,再调用子函数去找。
size_t _Index(size_t bytes, size_t alignNum)
if (bytes%alignNum == 0)
return bytes / alignNum - 1;
else
return bytes / alignNum;
// 计算映射的哪一个自由链表桶
static inline size_t Index(size_t bytes)
assert实现高并发内存池