项目设计高并发内存池—tcmalloc核心框架学习
Posted GG_Bond19
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了项目设计高并发内存池—tcmalloc核心框架学习相关的知识,希望对你有一定的参考价值。
目录
一、项目介绍
本项目实现的是一个高并发的内存池,其原型是Google的开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存malloc,实现了高效的多线程内存管理,用于替换系统的内存分配相关函数malloc和free
tcmalloc的知名度也是非常高的,不少公司都在用它,比如Go语言就直接用它做了内存分配器
本项目是将tcmalloc最核心的框架简化后拿出来,模拟实现出一个高并发内存池,目的是为了学习tcamlloc的精华
该项目主要涉及C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等方面的技术
二、内存池的初步认识
2.1 池化技术
池化技术,就是程序先向系统申请过量的资源,然后自行进行管理,以备不时之需
之所以要申请过量的资源,是因为申请和释放资源都有较大的开销,不如提前申请一些资源放入"池"中,当需要资源时则可以直接从"池"中获取,不需要时就将该资源重新放回"池"中即可。这样使用时就会变得较为快捷,可以达到提高程序的运行效率的目的
在计算机中,有很多地方都使用了"池"这种技术,如连接池、线程池、对象池等。以服务器上的线程池为例,其主要思想就是:先启动若干数量的线程,让它们处于睡眠状态。当接收到客户端的请求时,再唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求后,线程又进入睡眠状态
2.2 内存池
内存池是指程序预先向操作系统申请一块足够大的内存.此后,当程序中需要申请内存的时候,不需直接向操作系统申请,而是直接从内存池中获取;同理,当释放内存的时候,并不是真正将内存返回给操作系统,而是将内存返回给内存池。当程序退出时(或某个特定时间),内存池才将之前申请的内存真正释放
内存池所解决问题
内存池主要解决的就是效率的问题,其能够避免程序频繁的向系统申请和释放内存。其次,内存池作为系统的内存分配器,还需要尝试解决内存碎片的问题
内存碎片分为内部碎片和外部碎片:
外部碎片是一些空闲的小块内存区域,由于这些内存空间不连续,以至于合计的内存足够,但是不能满足一些内存分配申请需求
内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用
注意: 内存池尝试解决的是外部碎片的问题,同时也尽可能的减少内部碎片的产生
2.3 malloc
C语言中动态申请内存并不是直接向堆申请的,而是通过malloc函数去申请的;C++中的new实际上也是封装了malloc函数
申请内存块时是先调用malloc,malloc再去向操作系统申请内存。malloc实际就是一个内存池,malloc相当于向操作系统"批发"了一块较大的内存空间,然后"零售"给程序用,当全部"售完"或程序有大量的内存需求时,再根据实际需求向操作系统"进货"
malloc的实现方式有很多种,一般情况下不同编译器平台用的是不同的。比如Windows的VS系列中的malloc就是微软自行实现的,而Linux下的gcc用的是glibc中的ptmalloc
三、定长内存池
malloc其实是一个通用的内存池,在什么场景下都适用,但也意味着malloc在什么场景下都不会具有很高的性能,因为malloc并不是针对某种场景专门设计的
定长内存池则是针对固定大小内存块的申请和释放的内存池,由于定长内存池只需要支持固定大小内存块的申请和释放,因此性能可以达到极致,并且在实现定长内存池时不需要考虑内存碎片等问题,因为申请/释放的都是固定大小的内存块
通过实现定长内存池可以熟悉对简单内存池的控制,其次,这个定长内存池也会在后面会作为高并发内存池的一个基础组件(代替new操作符)
实现定长
在实现定长内存池时要做到"定长"有许多方式,比如可以使用非类型模板参数,使得在该内存池中申请到的对象的大小都是N
template<size_t N>
class ObjectPool
;
定长内存池也可以被称为"对象池"。在创建对象池时,对象池可以根据传入的对象类型的大小来实现"定长",比如创建定长内存池时传入对象类型int,那么该内存池就只支持 sizeof(int) 字节大小内存的申请和释放
template<class T>
class ObjectPool
;
向堆区申请内存
既然是内存池,那么首先得向系统申请一块内存空间,然后对其进行管理。要想直接向堆申请内存空间,在Windows下可以调用VirtualAlloc()系统接口;在Linux下可以调用brk()或mmap()系统接口
#ifdef WIN32
#include <windows.h>
#else
#include <sys/mman.h>
#include <unistd.h>
#endif
inline static void* SystemAlloc(size_t kpage)//按页申请内存
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
//Linux下brk mmap等
#endif
if (ptr == nullptr) throw std::bad_alloc();
return ptr;
通过条件编译将对应平台下向堆区申请内存的函数进行封装,此后就不必再关心当前所在平台,当需要直接向堆区申请内存时直接调用封装后的SystemAlloc()函数即可
定长内存池中的成员变量
对于向堆区申请到的大块内存,可以用一个指针来对其进行管理,但仅用一个指针肯定不够,还需要一个变量来记录这块内存的长度
由于此后需要将这块内存进行切分,为了方便切分操作,指向这块内存的指针最好是字符指针,因为指针的类型决定了指针的步长以及查看数据的大小。对于字符指针而言,当需要向后移动n个字节时,直接对字符指针进行加n操作即可
使用完后释放回来的定长内存块也需被管理,可以将这些释放回来的定长内存块链接成一个链表。管理释放回来的内存块的链表被称为自由链表,为了能找到这个自由链表,因此还需要一个指向自由链表的指针
char* _memory = nullptr;//char*便于切割分配内存 指向大块内存
void* _freeList = nullptr;//自由链表 管理归还的内存块
size_t _remainBytes = 0;//记录剩余字节数
管理被释放内存块的具体方案
对于回收的定长内存块,可以使用自由链表将其链接起来,但并不需要为其专门定义链式结构,可以让内存块的前4个字节(32位平台)或8个字节(64位平台)存储后面内存块的起始地址
指针在32位平台上占用4个字节,在64位平台上占用8个字节,那么如何写出既适应32位平台也适应64位平台的代码呢?
当需要访问一个内存块的前4/8个字节时,可以先该内存块的首地址强转为二级指针,由于二级指针存储的是一级指针的地址,二级指针解引用能向后访问一个指针的大小(在32位下为4个字节、64位平台为8个字节,自动适应了环境),此时就访问到了该内存块的前4/8个字节,即下一个内存块的首地址
void*& NextObj(void* ptr) return *(void**)ptr;
申请对象
申请对象时,内存池应该优先把还回来的内存块对象再次重复利用,因此若自由链表中有内存块的话,就直接从自由链表中头删一个内存块直接返回即可
若自由链表中没有空闲内存块,那么就在大块内存中切出定长的内存块进行返回。当内存块切出后及时更行 _memory 指针的指向,以及 _remainBytes 的值即可
若大块内存已经不足以切分出一个对象时,就应该调用封装的SystemAlloc()函数,再次向堆申请一块内存空间,此时也要及时更新_memory指针的指向,以及_remainBytes的值(可能存在浪费内存,即所剩内存不足以切出一个对象但_memory却有了新的指向)
由于当内存块释放时需要将内存块链接到自由链表当中,因此必须保证切出来的对象至少能够存储得下一个地址,所以当对象的大小小于当前所在平台指针的大小时,需要按指针的大小进行内存块的切分,即需向上对齐
T* New()
T * obj = nullptr;
if (_freeList != nullptr)//优先使用分配过的内存
obj = (T*)_freeList;
_freeList = *(void**)_freeList;//强转为void**后解引用为void*,即在32位系统下可以看到4个字节,64位系统下可以看到8个字节
else
size_t objSize = sizeof(T) > sizeof(void*) ? sizeof(T) : sizeof(void*);//确保至少能存储一个指针大小
if (_remainBytes < objSize)//大块内存空间不足
_remainBytes = 128 * 1024;
_memory = (char*)SystemAlloc(_remainBytes >> 13);
obj = (T*)_memory;
_memory += objSize;
_remainBytes -= objSize;
new(obj)T;//定位new 调用对象构造函数初始化
return obj;
注意:这是一个定长对象内存池,当内存块切分出来后,应使用定位new,显示调用该对象的构造函数对其进行初始化
释放对象
注意:在释放对象时,应该显示调用该对象的析构函数清理该对象,因为该对象可能还管理着其他某些资源,若不对其进行清理那么这些资源将无法被释放,就会导致内存泄漏
析构后将该内存块头插入_freeList中即可
完整代码
#pragma once
#include <iostream>
using std::cout;
using std::endl;
#ifdef WIN32
#include <windows.h>
#else
#include <sys/mman.h>
#include <unistd.h>
#endif
inline static void* SystemAlloc(size_t kpage)
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr) throw std::bad_alloc();
return ptr;
template <class T>
class ObjectPool
public:
T* New()
T * obj = nullptr;
if (_freeList != nullptr)//优先使用分配过的内存
obj = (T*)_freeList;
_freeList = *(void**)_freeList;//强转为void**后解引用为void*,即在32位系统下可以看到4个字节,64位系统下可以看到8个字节
else
size_t objSize = sizeof(T) > sizeof(void*) ? sizeof(T) : sizeof(void*);//确保至少能存储一个指针大小
if (_remainBytes < objSize)//大块内存空间不足
_remainBytes = 128 * 1024;
_memory = (char*)SystemAlloc(_remainBytes >> 13);
obj = (T*)_memory;
_memory += objSize;
_remainBytes -= objSize;
new(obj)T;//定位new 调用对象构造函数初始化
return obj;
void Delete(T* obj)
obj->~T();//调用对象析构函数
*(void**)obj = _freeList;//头插
_freeList = obj;
private:
char* _memory = nullptr;//char*便于切割分配内存 指向大块内存
void* _freeList = nullptr;//自由链表 管理归还的内存块
size_t _remainBytes = 0;//记录剩余字节数
;
性能对比
在只创建定长对象的情况下,使用下面的代码对new/delete和定长内存池进行性能对比
#include "Object_pool.h"
#include <vector>
#include <ctime>
using std::vector;
struct TreeNode
int _val;
TreeNode* _left;
TreeNode* _right;
TreeNode() :_val(0), _left(nullptr), _right(nullptr)
;
void TestObjectPool()
const size_t Rounds = 3;// 申请释放的轮次
const size_t N = 1000000;// 每轮申请释放多少次
std::vector<TreeNode*> v1;
v1.reserve(N);
size_t begin1 = clock();
for (size_t j = 0; j < Rounds; ++j)
for (int i = 0; i < N; ++i) v1.push_back(new TreeNode);
for (int i = 0; i < N; ++i) delete v1[i];
v1.clear();
size_t end1 = clock();
std::vector<TreeNode*> v2;
v2.reserve(N);
ObjectPool<TreeNode> TNPool;
size_t begin2 = clock();
for (size_t j = 0; j < Rounds; ++j)
for (int i = 0; i < N; ++i) v2.push_back(TNPool.New());
for (int i = 0; i < N; ++i) TNPool.Delete(v2[i]);
v2.clear();
size_t end2 = clock();
cout << "new cost time:" << end1 - begin1 << endl;
cout << "object pool cost time:" << end2 - begin2 << endl;
int main()
TestObjectPool();
return 0;
不难发现,定长内存池消耗的时间比malloc/free消耗的时间要短。这是因为malloc是一个通用的内存池,而定长内存池是专门针对申请定长对象而设计的,因此在这种特殊场景下定长内存池的效率更高
四、整体框架设计介绍
如今很多的开发环境都是多核多线程,因此在申请内存的时,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀了,但是在并发场景下可能会因为频繁的加锁和解锁导致效率有所降低,而该项目的原型tcmalloc实现的就是一种在多线程高并发场景下更胜一筹的内存池
在实现内存池时一般需要考虑到效率问题和内存碎片的问题,但对于高并发内存池来说,还需要考虑在多线程环境下的锁竞争问题
- thread_cache: 线程缓存是每个线程独有的,用于小于等于256KB的内存分配,每个线程独享一个thread_cache
- central_cache: 中心缓存是所有线程所共享的,当thread_cache需要内存时会按需从central _cache中获取内存,而当thread_cache中的内存满足一定条件时,central_cache也会在合适的时机对其进行回收
- page_cache: 页缓存中存储的内存是以页为单位进行存储及分配的,当central_cache需要内存时,page_cache会分配出一定数量的页分配给central_cache,而当central_cache中的内存满足一定条件时,page_cache也会在合适的时机对其进行回收,并将回收的内存尽可能的进行合并,组成更大的连续内存块,缓解内存碎片的问题
解释:
- 每个线程都有一个属于自己的thread cache,也就意味着线程在thread_cache申请内存时是不需加锁的,而一次性申请大于256KB内存的情况是很少的,因此大部分情况下申请内存时都是无锁的,这也是高并发内存池高效的地方
- 每个线程的thread cache会根据自己的情况向central cache申请或归还内存,这避免了出现单个线程的thread cache占用太多内存,而其余thread cache出现内存吃紧的问题
- 多线程的thread cache可能会同时找central cache申请内存,此时就会涉及线程安全的问题,因此在访问central cache时是需要加锁的,但central cache实际上是一个哈希桶的结构,只有当多个线程同时访问同一个桶时才需要加锁,所以这里的锁竞争也不会很激烈
各个模块的作用
thread_cache主要解决锁竞争的问题,每个线程独享thread_cache,当自己的thread_cache中有内存时该线程不会去和其他线程进行竞争,每个线程只要在各自的thread_cache申请内存就行了
central_cache主要起到一个居中调度的作用,每个线程的thread_cache需要内存时从central _cache获取,而当thread_cache的内存多了就会将内存还给central_cache,其作用类似于一个中枢,因此取名为中心缓存
page_cache就负责提供以页为单位的大块内存,当central_cache需要内存时就会去向page_cache申请,而当page_cache没有内存了就会直接去找系统,也就是直接去堆上按页申请内存块
五、申请内存
5.1 ThreadCache
5.1.1 ThreadCache整体设计
定长内存池只需支持固定大小内存块的申请释放,因此定长内存池中只需一个自由链表管理释放回来的内存块。现在要支持申请和释放不同大小的内存块,那么就需要多个自由链表来管理释放回来的内存块。ThreadCache实际上是一个哈希桶结构,每个桶中存放的都是一个自由链表
ThreadCache支持小于等于256KB内存的申请,若将每种字节数的内存块都用一个自由链表进行管理的话,那么就需要20多万个(256*1024)自由链表,光是存储这些自由链表的头指针就需要消耗大量内存,这显然是得不偿失的
此时可以选择做一些平衡的牺牲,让这些字节数按照某种规则进行对齐,例如让这些字节数都按照8字节进行向上对齐。譬如当线程申请1~8字节的内存时会直接给出8字节,而当线程申请9~16字节的内存时会直接给出16字节,以此类推
因此当线程要申请某一大小的内存块时,就需要经过对齐规则计算得到对齐后的字节数,进而找到对应的哈希桶,若该哈希桶中的自由链表中有内存块,那就从自由链表中头删一个内存块进行返回;若该自由链表已经为空了,那么就需要向下一层的CentralCache进行获取
但由于对齐的原因,就会产生一些碎片化的内存无法被利用,比如线程只申请了6Byte的内存,而ThreadCache却直接给了8Byte的内存,多给出的2Byte就无法被利用,导致了一定程度的空间浪费,这些因为某些对齐原因导致无法被利用的内存,就是内存碎片中的内碎片问题
5.1.2 ThreadCache哈希桶映射与对齐规则
内存块是会被链接到自由链表上的,因此一开始肯定是按8字节进行对齐是最合适的,因为必须保证这些内存块,无论是在32位平台下还是64位平台下,都至少能够存储得下一个指针
但若所有的字节数都按照8字节进行对齐的话,那么就需要建立256 * 1024 ÷ 8 = 32768 个桶,这个数量还是比较多的,实际上可以让不同范围的字节数按照不同的对齐数进行对齐
虽然对齐产生的内碎片会引起一定程度上的空间浪费,但按照上面的对齐规则,可以将浪费率控制到百分之十左右。
需要说明的是,1~128这个区间不做讨论,因为1字节就算是对齐到2字节也有百分之五十的浪费率,并且小区间就算浪费率较高也并不会产生太大的浪费,这里从第二个区间开始进行计算
根据上面的公式,要得到某个区间的最大浪费率,就应该让分子取到最大,让分母取到最小。
比如 129~1024 这个区间的对齐数是16,那么最大浪费的字节数就是15,而最小对齐后的字节数就是这个区间内的前16个数所对齐到的字节数,即144。那么该区间的最大浪费率也就是15 ÷ 144 ≈ 10.42%。同样的道理,后面两个区间的最大浪费率分别是127 ÷ 1152 ≈ 11.02% 和1023 ÷ 9216 ≈ 11.10%。
对齐函数的编写
关于这个函数可以封装到一个DataHandleRules类中,但当中的成员函数最好设置为静态成员函数,否则在调用这些函数时就需要通过对象去调用。并且对于这些可能会频繁调用的函数,可以考虑将其设置为内联函数
在获取某一字节数向上对齐后的字节数时,可以先判断该字节数属于哪一个区间,然后再通过调用一个子函数进行进一步处理
static inline size_t AlignUp(size_t size)
if (size < 128) return _AlignUp(size, 8);
else if (size < 1024) return _AlignUp(size, 16);
else if (size < 8 * 1024) return _AlignUp(size, 128);
else if (size < 64 * 1024) return _AlignUp(size, 1024);
else if (size < 256 * 1024) return _AlignUp(size, 8 * 1024);
else
assert(false);
return -1;
此时就需要编写子函数,该子函数需要通过对齐数计算出某一字节数对齐后的字节数
//一般写法
static inline size_t _AlignUp(size_t bytes, size_t alignNum)
size_t alignSize = 0;
if (bytes%alignNum != 0)
alignSize = (bytes / alignNum + 1)*alignNum;
else
alignSize = bytes;
return alignSize;
除了上述写法还可以通过位运算的方式来进行计算,虽然位运算并不容易理解,但计算机执行位运算的速度是比执行乘法和除法更快的
static inline size_t _AlignUp(size_t bytes, size_t alignNum)
return ((bytes + alignNum - 1) & ~(alignNum - 1));
对于上述位运算,以10字节按8字节对齐为例进行分析。8 − 1 = 7,7就是一个低三位为1其余位为0的二进制序列,将10与7相加,相当于将10字节当中不够8字节的剩余字节数补上了
然后再将该值与7按位取反后的值(11000)进行与运算,而7按位取反后是一个低三位为0其余位为1的二进制序列,该操作进行后相当于屏蔽了该值的低三位而该值的其余位保持不变,此时得到的值就是10字节按8字节对齐后的值,即16字节
映射函数的编写
在获取某一字节数对应的哈希桶下标时,也是先判断该字节数属于哪一个区间,然后再通过调用子函数进一步处理
static inline size_t Index(size_t bytes)
assert(bytes <= MAX_BYTES);
static int group_array[4] = 16, 56, 56, 56 ;
if (bytes <= 128)
return _Index(bytes, 3);//8 等于 2的3次方
else if (bytes <= 1024)
return _Index(bytes - 128, 4) + group_array[0];
else if (bytes <= 8 * 1024)
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
else if (bytes <= 64 * 1024)
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
else if (bytes <= 256 * 1024)
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
else
assert(false);
return -1;
为了提高效率同样使用位运算来解决,但是此时传入的并不是该字节数的对齐数,而是将对齐数写成2的n次方的形式后,将这个n值进行传入。比如对齐数是8,传入的就是3
static inline size_t _Index(size_t bytes, size_t align_shift)
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
以10字节按8字节对齐为例进行分析。此时传入的alignShift就是3,将1左移3位后得到的实际上就是对齐数8,8 − 1 = 7 ,即还是让10与7相加。
之后再将该值向右移3位,实际上就是让17除以8,此时相当于屏蔽了该值二进制的低三位,因为除以8得到的值与其二进制的低三位无关,所以我们可以说是将10对齐后的字节数除以了8,此时得到了2,而最后还需要减一是因为数组的下标是从0开始的
5.1.3 TSL无锁访问
每个线程都有一个各自独享的ThreadCache,那应该如何创建这个ThreadCache?显然不能将这个ThreadCache创建为全局属性,因为全局变量是所有线程共享的,这样就不可避免的需要使用锁来进行控制,会增加了控制成本和代码复杂度,并且效率也会有所降低
要实现每个线程无锁的访问属于独自的ThreadCache,可以使用线程局部存储TLS(Thread Local Storage)。这是一种变量的存储方法,使用该存储方法的变量在它所在的线程是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性
// TLS Thread Local Storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
通过TLS,每个线程可以无锁的获取各自专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
pTLSThreadCache = new ThreadCache;
5.1.4 ThreadCache核心设计
按照上述的映射对齐规则,ThreadCache中桶的个数即自由链表的个数是208,以及ThreadCache允许申请的最大内存大小256KB,可以将这些数据按照如下方式进行定义:
static const size_t MAX_BYTES = 256 * 1024;//能在threadcache申请的最大字节数
static const size_t NFREELIST = 208;//thread_cache && central_cache 桶数
ThreadCache本质是一个存储208个自由链表的数组,目前ThreadCache就先提供一个Allocate()函数用于申请对即可,后面随着不断编写增加即可
class ThreadCache
public:
void* Allocate(size_t size);
private:
FreeList _freeLists[NFREELIST];
;
在ThreadCache申请对象时,通过所给字节数计算出对应的哈希桶下标。若桶中自由链表不为空,则从该自由链表中取出一个对象进行返回;但若此时自由链表为空,那么就需从CentralCache获取,FetchFromCentralCache()函数就是ThreadCache类中的一个成员函数
void* ThreadCache::Allocate(size_t size)
assert(size <= MAX_BYTES);
size_t alignSize = DataHandleRules::AlignUp(size);
size_t bucketIndex = DataHandleRules::Index(size);
if (!_freeLists[bucketIndex].IsEmpty())
return _freeLists[bucketIndex].Pop();
else
return FetchFromCentralCache(bucketIndex, alignSize);
慢开始反馈调节算法
当ThreadCache向CentralCache申请内存时,应该向CentralCach申请多少个小内存块呢?若申请的太少,那么ThreadCache在短时间内用完了又需要申请;但若一次性申请的太多,可能用不完就浪费了
鉴于此,这里采用慢开始反馈调节算法。当ThreadCache向CentralCache申请内存时,若申请的是较小的对象,那么可以多给一点,但若申请的是较大的对象,就可以少给一点
通过下面这个函数,就可以根据所需申请的内存块的大小计算出具体给出的内存块个数的上限值,并且将该上限值控制到2~512个之间。就算ThreadCache要申请的对象再小,最多CentralCache一次性给出512个内存块
static size_t MoveSize(size_t size)
assert(size > 0);
// [2, 512] 一次批量移动多少个对象的(慢启动)上限值
int num = MAX_BYTES / size;
if (num < 2) num = 2; //大对象一次批量上限低
if (num > 512) num = 512; //小对象一次批量上限高
return num;
既然计算的是上限值,那么具体该给出多少呢?
在FreeList结构中增加一个叫做_maxSize的成员变量,该变量的初始值设置为1,并且提供一个公有成员函数MaxSize()用于获取这个变量。即现在ThreadCache中的每个自由链表都会有一个各自的_maxSize
class FreeList//自由链表:用于管理切分过的小块内存
public:
void Push(void* obj)
assert(obj != nullptr);
NextObj(obj) = _freeList;
_freeList = obj;
++_size;
void* Pop()
assert(_freeList != nullptr);
void* obj = _freeList;
_freeList = NextObj(obj);
--_size;
return obj;
void PushRange(void* start, void* end, size_t n)//头插一段内存块
NextObj(end) = _freeList;
_freeList = start;
_size += n;
void PopRange(void*& start, void*& end, size_t n)
assert(n <= _size);
start = _freeList;
end = start;
for (size_t i = 0; i < n - 1; ++i)
end = NextObj(end);
_freeList = NextObj(end);
NextObj(end) = nullptr;
_size -= n;
bool IsEmpty() return _freeList == nullptr;
size_t& MaxSize() return _maxSize;
size_t Size() return _size;
private:
void* _freeList = nullptr;
size_t _maxSize = 1;
size_t _size = 0;
;
此时当ThreadCache申请对象时,会比较_maxSize和计算得出的值,取出其中的较小值作为本次申请对象的个数。若本次采用的是_maxSize的值,那么会将ThreadCache中该自由链表的_maxSize的值增加
ThreadCache第一次向CentralCache申请某大小的内存块时,申请到的都是一个,但下一次申请同样大小的对象时,因为该自由链表中的_maxSize增加了,就会申请到三个。直到该自由链表中_maxSize的值,超过上限值后就不会继续增长了,此后申请到的内存块数都是计算出的上限值
//慢开始反馈调节算法
//并不会一开始一批量向central_cache索要太多,可能使用不完
size_t batchNum = min(_freeLists[index].MaxSize(), DataHandleRules::MoveSize(size));
if (batchNum == _freeLists[index].MaxSize())
_freeLists[index].MaxSize() += 2;//若不断需要size大小的内存,那么batchNum就会不断增长直至上限
ThreadCache向CentralCache申请内存块
每次ThreadCache向CentralCache申请对象时,先通过慢开始反馈调节算法计算出本次应申请的小内存块的个数,然后再向CentralCache进行申请
若ThreadCache最终申请到小内存块的个数为1,那么直接将该内存块返回即可。为什么需要返回一个申请到的内存呢?因为ThreadCache要向CentralCache申请内存块,其实是由于某个线程向ThreadCache申请但ThreadCache当中没有,才导致ThreadCache向CentralCache申请内存块。因此CentralCache将内存块返回给ThreadCache后,ThreadCache会将该内存块返回给申请对象的线程
但若ThreadCache最终申请到多个内存块,那么除了将第一个内存块返回之外,还需要将剩下的内存块挂入ThreadCache对应的哈希桶中
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
//慢开始反馈调节算法
//并不会一开始一批量向central_cache索要太多,可能使用不完
size_t batchNum = min(_freeLists[index].MaxSize(), DataHandleRules::MoveSize(size));
if (batchNum == _freeLists[index].MaxSize())
_freeLists[index].MaxSize() += 2;//若不断需要size大小的内存,那么batchNum就会不断增长直至上限
void* start = nullptr, * end = nullptr;
size_t actualNum = CentralCache::GetInstance()->FetchMemoryBlock(start, end, batchNum, size);
assert(actualNum > 0);//至少分配一个
if (actualNum == 1)
assert(start == end);
return start;
else
_freeLists[index].PushRange(NextObj(start), end, actualNum - 1);//将后面的内存头插thread_cache自由链表中
return start;//将第一个内存块返回给外面使用
5.2 CentralCache
5.2.1 CentralCache整体设计
CentralCache与ThreadCache的相同之处
CentralCache结构与ThreadCache是基本类似的,都为哈希桶结构,并且遵循的对齐映射规则相同。这样的好处就是:当ThreadCache的某个桶中没有内存了,就可以直接到CentralCache中相同位置的哈希桶里去索取内存
CentralCache与ThreadCache的不同之处
CentralCache与ThreadCache有两个明显不同的地方:
- ThreadCache是每个线程独享的,但是CentralCache是所有线程共享的。每个线程的Thread Cache没有内存了都会去找CentralCache,因此在访问CentralCache时是需要加锁的。但在加锁时并不是将整个CentralCache全部锁上,而是使用桶锁,即每个桶都有一个锁。只有当多个线程同时访问CentralCache的同一个桶时才会存在锁竞争,若是多个线程同时访问CentralCache的不同桶就不会存在锁竞争,这也使得锁竞争并不是十分激烈
- ThreadCache的每个桶中挂的是一个个切好的内存块,而CentralCache的每个桶中挂的是一个个的Span(跨度)
每个span管理的都是一个以页为单位的大块内存,每个桶里面的若干span是按照双链表的形式链接起来的,并且每个span里面还有一个自由链表,这个自由链表里面挂的就是一个个切好了的内存块,根据其所在的哈希桶被切成了对应的大小
5.2.2 CentralCache结构设计
页号的类型
每个程序运行起来后都有自己的进程地址空间,在32位平台下,进程地址空间的大小是字节;而在64位平台下,进程地址空间的大小就是字节
页的大小一般是4K或者8K,以8K为例。在32位平台下,进程地址空间就可以被分成 ÷= 个页;在64位平台下,进程地址空间就可以被分成÷=个页。页号本质与地址一样,都是一个编号,只不过地址是以一个字节为一个单位,而页是以多个字节为一个单位
由于页号在64位平台下的取值范围是[0,) ,因此不能简单的用一个无符号整型来存储页号(只使用32位环境下),这时需要借助条件编译来解决该问题
//Win64环境下_WIN64和_WIN32都存在,Win32环境下只存在_WIN32
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else//Linux
//...
#endif
Span的结构
CentralCache的每个桶里挂的是一个个的Span,Span是管理以页为单位的大块内存的,其结构如下:
//管理多个页的跨度结构
struct Span
Span* _prev = nullptr;//双向链表中的结构
Span* _next = nullptr;
PAGE_ID _pageId = 0;//页号
size_t _num = 0;//页的数量
void* _freeList = nullptr;//自由链表
size_t _use_count = 0;//记录已分配给ThreadCache的小块内存的数量
;
对于Span管理的以页为单位的大块内存,需要知道这块内存具体在哪一个位置,以便于之后PageCache进行前后页的合并缓解内存碎片问题,因此Span结构当中会记录所管理大块内存起始页的页号 (具体如何合并在后面讲解)
每一个Span管理多少页并不是固定的,由后面的算法来控制,因此span结构中有一个_num成员来代表着该Span管理的页的数量
每个Span管理的大块内存,都会被切成小内存块挂到当前Span的自由链表中,比如8Byte哈希桶中的Span,会被切成一个个8Byte大小的内存块挂到当前Span的自由链表中,因此Span结构中需要自由链表_freeList来存储小块内存块
Span结构中的_use_count成员记录的是,当前Span中已经分配给TreadCache的小块内存块,当某个Span的_use_count计数变为0时,代表当前Span分配出去的小内存块已经全部还回来了,此CentralCache就可以将这个Span再还给PageCache
每个桶当中的Span是以双链表的形式组织起来的,当需要将某个Span归还给PageCache时,就可以很方便的将该Span从双链表结构中移出。若用单链表结构的话则较为麻烦,因为单链表在删除时需要知道当前结点的前一个结点
双链表结构
CentralCache的每个哈希桶中存储的都是一个双链表结构,对于该双链表结构可以进行封装:
class SpanList
public:
SpanList()
_head = new Span;
assert(_head != nullptr);
_head->_next = _head;
_head->_prev = _head;
Span* Begin() return _head->_next;
Span* End() return _head;
bool IsEmpty() return _head == _head->_next;
void PushFront(Span* span) Insert(Begin(), span);
Span* PopFront()
Span* front = _head->_next;
Erase(front);
return front;
void Insert(Span* pos, Span* newSpan)
assert(pos);
assert(newSpan);
Span* prev = pos->_prev;
newSpan->_next = pos;
prev->_next = newSpan;
newSpan->_prev = prev;
pos->_prev = newSpan;
void Erase(Span* pos)
assert(pos != nullptr);
assert(pos != _head);
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
private:
Span* _head = nullptr;
public:
std::mutex _mtx;//桶锁
;
注意:从双链表删除的Span会还给PageCache,相当于只是把这个Span从双链表中移除,不需要对删除的Span进行delete操作
5.2.3 CentralCaChe核心设计
所有线程使用的都是同一个CentralCache,即在整个项目中只需要有一个CentralCache对象即可,那么可以使用单例模式进行CentralCache的编写
单例模式可以保证项目中该类只有一个实例,并提供一个访问它的全局访问点,该实例被所有程序模块共享。单例模式又分为饿汉模式和懒汉模式,懒汉模式较为复杂,这里使用饿汉模式即可
//饿汉单例模式
class CentralCache
public:
static CentralCache* GetInstance();
private:
SpanList _spanLists[NFREELIST];
private:
CentralCache()
CentralCache(const CentralCache&) = delete;
static CentralCache _sInst;//声明
;
CentralCache CentralCache::_sInst;
CentralCache* CentralCache::GetInstance() return &_sInst;
CentralCache向ThreadCache提供内存块
要从CentralCache获取batchNum个指定大小的内存块,这些内存块肯定都是从CentralCache对应哈希桶中的某个Span中取出来的,因此取出来的这batchNum个内存块是链接在一起的,只需要得到这段链表的头和尾即可,可以采用输出型参数进行获取
size_t CentralCache::FetchMemoryBlock(void*& start, void*& end, size_t batchNum, size_t size)
size_t index = DataHandleRules::Index(size);
_spanLists[index]._mtx.lock();
Span* span = GetOneSpan(_spanLists[index], size);
assert(span);
assert(span->_freeList);
//从span中获取batchNum块内存块,若不够则有多少获取多少
start = span->_freeList;
end = span->_freeList;
size_t count = 0, actualNum = 1;
while (NextObj(end) != nullptr && count < batchNum - 1)
end = NextObj(end);
++count;
++actualNum;
span->_freeList = NextObj(end);
NextObj(end) = nullptr;
span->_use_count += actualNum;
_spanLists[index]._mtx.unlock();
return actualNum;
由于CentralCache是所有线程共享的,所以在访问CentralCache中的哈希桶时,需要先给对应的哈希桶加上桶锁,在获取到对象后再将桶锁解掉
在向CentralCache获取内存块时,先是在CentralCache对应的哈希桶中获取到一个非空的Span,然后从这个Span的自由链表中取出batchNum个对象即可,但可能这个非空的span的自由链表当中对象的个数不足batchNum个,这时该自由链表当中有多少个对象就给多少即可
ThreadCache实际从CentralCache获得的对象的个数可能与传入的batchNum值是不一样的,因此需要统计本次过程中实际ThreadCache获取到的内存块个数,并根据该值及时更新_use_count
虽然实际申请到对象的个数可能比batchNum要小,但这并不会产生任何影响。因为ThreadCache的本意就是向CentralCache申请一个内存块,之所以一次多申请一些内存块,是因为制定了这样的策略来提高效率,使得下次线程再申请相同大小的对象时就可以直接在ThreadCache中获取,而不用再向CentralCache申请对象
CentralCache从PageCache中获取一个非空的Span
ThreadCache向CentralCache申请内存块时,CentralCache需要先从对应的哈希桶中获取到一个非空的Span,然后从这个非空的Span中取出若干内存块给ThreadCache
首先遍历CentralCache对应哈希桶中的双链表,若该双链表中有非空的Span,那么直接将该Span进行返回即可。但若遍历双链表后发现双链表中没有空闲的Span,那么此时CentralCache就需要向PageCache申请内存块
但是,该向PageCache申请多大的内存块呢?可以根据具体所需内存块的大小来决定,之前就根据所需内存块的大小计算出ThreadCache一次向CentralCache申请内存块的个数上限,现在则是根据所需内存块的大小计算出CentralCache一次应该向PageCache申请几页的Span
先根据所需内存块的大小计算出ThreadCache一次向CentralCache申请内存块的个数上限,然后将这个上限值乘以单个内存块的大小,就算出了具体需要多少字节,最后再将这个算出来的字节数转换为页数。若转换后不够一页,那么就申请一页,否则转换出来是几页就申请几页。即Central Cache向PageCache申请内存时,要求申请到的内存尽量能够满足ThreadCache向CentralCache申请时的上限
//一次central_cache向page_cache获取多少页的Span
static size_t NumMovePage(size_t size)
size_t num = MoveSize(size);
size_t npage = num * size;
npage >>= PAGE_SHIFT;
if (npage == 0) npage = 1;
return npage;
代码中的PAGE_SHIFT代表页大小转换偏移。以页的大小为8K为例,PAGE_SHIFT的值为13
//页大小转换偏移 一页为2^13,即8KB
static const size_t PAGE_SHIFT = 13;
当CentralCache申请到若干页的Span后,还需要将这个Span切成一个个对应大小的小内存块挂到该Span的自由链表中
找到一个Span所管理的大块内存块呢?首先需要计算出该Span的起始地址,即用这个Span的起始页号乘以一页的大小即可得到这个Span的起始地址,然后用这个Span的页数乘以一页的大小就可以得到这个Span所管理的内存块的大小,用起始地址加上内存块的大小即可得到这块内存块的结束位置
明确了这块内存的起始和结束位置后,就可以进行切分了。根据所需内存块的大小,每次从大块内存切出一块固定大小的内存块尾插到Span的自由链表中即可
为什么是尾插呢?因为若是将切好的内存块尾插到自由链表,这些内存块看起来是按照链式结构链接起来的,而实际其在物理空间上是连续的,这时当把这些连续内存分配给某个线程使用时,可以提高该线程的CPU缓存命中率
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
//查看当前CentralCache中的spanlist中是否有还有尚未分配的span
Span* it = list.Begin();
while (it != list.End())
if (it->_freeList != nullptr)
return it;
else
it = it->_next;
list._mtx.unlock();//将central_cache桶锁释放,此时若其他线程释放内存回来并不会导致阻塞
//运行到此处时即没有空闲span,只能向Page_cache索取
PageCache::GetInstance()->_pageMutex.lock();
Span* span = PageCache::GetInstance()->NewSpan(DataHandleRules::NumMovePage(size));
PageCache::GetInstance()->_pageMutex.unlock();
//计算span的大块内存的起始地址和大块内存的大小(字节数)
char* start = (char*)(span->_pageId << PAGE_SHIFT);
size_t bytes = span->_num << PAGE_SHIFT;
char* end = start + bytes;
//将大块内存切成自由链表并链接起来
span->_freeList = start;
start += size;
void* tail = span->_freeList;
while (start < end)
NextObj(tail) = start;
tail = NextObj(tail);
start += size;
NextObj(tail) = nullptr;
list._mtx.lock();
list.PushFront(span);//将span挂入对应的桶中
return span;
在访问PageCache前,应先把CentralCache对应的桶锁解开。虽然此时CentralCache的这个桶当中是没有内存供其他ThreadCache申请的,但ThreadCache除了申请内存还会归还内存,若在访问PageCache前将CentralCache对应的桶锁解开,那么此时其他ThreadCache想要归还内存到CentralCache的这个桶时就不会发生阻塞
因此在调用NewSpan()函数之前,应先将CentralCache对应的桶锁解掉,然后再将PageCache的大锁加上,当申请到k页的Span后将PageCache的大锁解开,但此时不需要立刻加上桶锁。因为CentralCache拿到k页的Span后还需进行切分操作,此时别的线程访问不到该Span,可以在Span切好后需要将其挂入Central Cache对应的桶上时,再加上对应的桶锁
5.3 PageCache
5.3.1 PageCache整体设计
PageCache与CentralCache结构的相同之处
PageCache与CentralCache一样,都是哈希桶结构,并且PageCache的每个哈希桶中挂的也是一个个的Span,并且也是按照双链表的结构链接起来的
PageCache与CentralCache结构的不同之处
CentralCache的映射规则与ThreadCache保持一致,而PageCache的映射规则与它们都不相同PageCache的映射规则采用的是直接定址法,如1号桶挂的都是1页的Span,2号桶挂的都是2页的span,以此类推
CentralCache每个桶中的Span被切成了一个个对应大小的内存块,以供ThreadCache申请。而PageCache当中的Span是没有被进一步切小的,因为PageCache服务的是CentralCache,当CentralCache没有Span时,向PageCache申请某一固定页数的Span,而切分申请到的这个Span由CentralCache完成
PageCache中有多少个桶由编写自行决定,本博客中采用的就是最大128页的方案。为了让桶号与页号对应,将第0号桶空出,因此需要将哈希桶的个数设置为129
//page_cache的桶数+1 || page_cache的最大页数+1 (下标为0位置空出)
static const size_t NPAGES = 129;
本博客为什么采用最大128页的方案呢?因为线程申请单个内存块最大是256KB,而128页可以正好被切成4个256KB的内存块,因此是足够的。若是在采用更大页的方案也是可以的,根据具体的需求进行设置即可
PageCache类设计
当每个线程的ThreadCache没有内存时都会向CentralCache申请,此时多个线程的ThreadCache若访问的不是同一个桶,那么这些线程是可以同时进行访问的。这时CentralCache的多个桶就可能同时向PageCache申请内存的,所以PageCache是存在线程安全问题的,因此在访问PageCache时是必须要加锁的
但是在PageCache中不能使用桶锁,因为当CentralCache向PageCache申请内存时,PageCache可能会将其他桶当中大页的Span切小后再给CentralCache。此外,当CentralCache将某个Span归还给PageCache时,PageCache也会尝试将该Span与其他桶当中的Span进行合并
即在访问PageCache时,可能需要访问PageCache中的多个桶,若PageCache用桶锁就会出现大量频繁的加锁和解锁,导致程序的效率低下。因此在访问PageCache时使用一个大锁将整个Page Cache锁住
PageCache对象在整个进程中也是只能存在一个的,因此需要将其设计为单例模式
//饿汉单例模式
class PageCache
public:
static PageCache* GetInstance();//提供一个全局访问接口
private:
PageCache()
PageCache(const PageCache&) = delete;
static PageCache _sInst;//声明
SpanList _spanLists[NPAGES];
public:
std::mutex _pageMutex;//整个page_cache的锁
;
PageCache PageCache::_sInst;
PageCache* PageCache::GetInstance() return &_sInst;
5.3.2 PageCache获取Span
当调用上述的GetOneSpan()尝试从CentralCache的某个哈希桶获取一个非空的Span时,若遍历哈希桶中的双链表后发现双链表中没有Span,或该双链表中的Span都为空,那么CentralCache就需向PageCache申请若干页的Span了,PageCache获取一个k页的Span并提供给CentralCache呢?
PageCache是直接按照页数进行映射的,若CentralCache要获取一个k页的Span,在PageCache的第k号桶中取出一个Span返回给CentralCache即可。但若第k号桶中没有Span了,这时并不是直接转而向堆区申请一个k页的Span,而是要继续在后面更大的桶中寻找Span
直接向堆申请以页为单位的内存时,应尽量申请大块一点的内存块,因为此时申请到的内存是连续的,当线程需要内存时可以将其切小后分配给线程,而当线程将内存释放后又可以将其合并成大块的连续内存。若向堆申请内存时申请的是小块内存,而需申请多次,那么申请到的内存就不一定是连续的了
当第k号桶中没有空闲的Span时,可以继续找第k+1号
高并发内存池——基于Google开源项目tcmalloc的简洁实现
从零实现一个高并发内存池
- 1. 初识高并发内存池
- 2. 小试牛刀--设计一个定长内存池
- 3. 高并发内存池整体框架设计
- 4. 梳理向内存池中申请内存的步骤
- 5. 梳理将内存还给内存池的步骤
- 6. 超过128页内存的申请与性能测试
- 7.性能瓶颈优化
1. 初识高并发内存池
1.1 项目介绍
当前项目是实现一个高并发的内存池,它的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存1管理,用于替代系统的内存分配相关的函数(malloc 、free)。
1.2 项目所需的知识
这个项目会用到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等方面的知识。
1.3 了解池化技术
所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。
在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。
内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
1.4 内存池主要解决的问题
内存池主要解决的是效率的问题,其次如果作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。那什么是内存碎片呢?
这是我们申请空间的一般步骤:
当我们将指针a与c管理的空间释放后,堆区现在有2KB的空间,但是我们要申请2KB的空间却申请不出来,因为这两块空间碎片化,不连续了!
上面讲的是外碎片问题,指的是一些空闲的连续内存区域太小,且这些内存空间不连续,以至于合计的内存足够,但是不能满足一些内存分配的申请需求。内存碎片问题也包括内部碎片的问题,指的是由于一些对齐的要求,导致分配出去的空间中一些内存无法被利用(比如struct结构体中的内存对齐)。
2. 小试牛刀–设计一个定长内存池
我们先来设计一个定长内存池做个开胃菜,当然这个定长内存池在我们后面的高并发内存池中也是有价值的,所以设计它的目的有两个,先熟悉一下简单内存池是如何控制的,第二它会作为我们后面内存池的一个基础组件。
内存池框架代码如下:
template<class T>
class ObjectPool
public:
T* New() //从内存池申请一块内存
void Delete(T* obj) //释放一块内存至内存池
private:
char* _memory = nullptr; //管理向系统申请的大块内存
void* _freeList = nullptr; //管理返回定长内存的头指针
size_t _remainBytes = 0; //记录大块内存所剩字节数
;
2.1 将内存还给定长内存池
因为我们采用了池化技术,所以对于还回来的内存,我们需要将它们组织管理起来,那么如何管理呢,我们采用类似于单链表的方式,如下图:
每块内存的前4个字节(32位下)或8个字节(64位下)存放的是指向下一块空间的指针,增加头指针是为了方便对还回的内存进行插入,否则每次插入都选择尾插效率太低。
对应Delete的实现:
void Delete(T* obj)
//显示调用析构函数处理对象
obj->~T();
//将该内存头插进自由链表中
(*(void**))obj = _freeList;
_freeList = obj;
2.2 向定长内存池中申请内存
- 当对应的内存池中已经有被还回来的内存块,即自由链表不为空时
我们直接从自由链表中获取内存块,头删一块内存块,如下图示意:
该部分代码实现如下:
T* New()
T* obj;
if (_freeList)
void* next = (*(void**))_freeList;
obj = (T*)_freeList;
_freeList = next;
return obj;
- 当自由链表为空并且所剩内存字节数不满足我们要求时,就要向系统申请以页为单位的空间(这里一页为8KB)
申请出的空间逻辑示意图如下:
当自由链表为空时,我们就在该大内存上切下我们需要的空间,示意图如下:
切下我们所需要大小的内存块,剩下的内存继续由_memory管理。
在此需要注意内存对齐的问题,比如我们的T是char类型的,当我们返回这块空间时,这块空间实际上是1字节,但自由链表存储空间的最低要求是4字节或8字节(因为要存指针),因此切内存时要注意内存对齐。
整体代码如下:
T* New()
T* obj;
if (_freeList)
void* next = (*(void**))_freeList;
obj = (T*)_freeList;
_freeList = next;
else
if (_remainBytes < sizeof(T)) //如果剩余内存不够了,向系统申请
_remainBytes = 1024 * 128;
_memory = (char*)SystemAlloc(_remainBytes >> 13);
if (_memory == nullptr)
throw std::bad_alloc();
obj = (T*)_memory;
size_t objSize = sizeof(T) > sizeof(void*) ? sizeof(T) : sizeof(void*); //内存对齐
_memory += objSize;
_remainBytes -= objSize;
new(obj)T; //显示调用new初始化
return obj;
3. 高并发内存池整体框架设计
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题。
1. 性能问题。
2. 多线程环境下,锁竞争问题。
3. 内存碎片问题。
concurrent memory pool主要由以下3个部分构成:
- thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的方。
- central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。
- page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
4. 梳理向内存池中申请内存的步骤
4.1 哈希表与内存对齐
在使用ThreadCache和CentralCache时,都需要根据所需内存的字节数去对应的哈希桶中申请内存,若我们采用一对一的哈希映射关系,即以字节为单位的任意大小的内存均要对应上哈希表的下标,且因为管理内存采用的是单链表的方式(申请的内存大小至少为4或8字节),即哈希表的下标范围应为[8,256*1024],显然下标范围这么大的哈希表不切实际,且实用性低。因此要采用一定的内存对齐原则。我们采用如下原则进行内存对齐:
这样不仅缩小哈希表长度,大幅减少哈希表占用的空间,而且有效控制了内碎片的浪费问题。
这样我们可以根据所需字节数算出对齐后的字节数,进而根据对齐后的字节数算出哈希表对应的桶的位置。
代码实现如下:
static const size_t MAX_BYTES = 256 * 1024; //申请空间的最大字节数
static const size_t NFREELIST = 208; //哈希表的长度
class SizeClass
public:
static size_t _RoundUp(size_t size, size_t align)
if (size % align == 0)
return size;
return (align - (size % align) + size);
static size_t _Index(size_t alignSize, size_t align)
return alignSize / align - 1;
static size_t RoundUp(size_t size) //算内存大小的对齐数
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐 freelist[0,16)
// [128+1,1024] 16byte对齐 freelist[16,72)
// [1024+1,8*1024] 128byte对齐 freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
if (size <= 128)
return _RoundUp(size, 8);
else if (size <= 1024)
return _RoundUp(size, 16);
else if (size <= 8 * 1024)
return _RoundUp(size, 128);
else if (size <= 64 * 1024)
return _RoundUp(size, 1024);
else if (size <= 256 * 1024)
return _RoundUp(size, 8 * 1024);
else
assert(size < 256 * 1024);
static size_t Index(size_t size) //计算哈希桶下标
assert(size < 256 * 1024);
size_t alignSize = RoundUp(size);
static int group_array[4] = 16,56,56,56 ;//每个对齐区间有多长
if (alignSize <= 128)
return _Index(alignSize,8);
else if (alignSize <= 1024)
return _Index(alignSize - 128 , 16) + group_array[0];
else if (alignSize <= 8 * 1024)
return _Index(alignSize - 1024 , 128) + group_array[0] + group_array[1];
else if (alignSize <= 64 * 1024)
return _Index(alignSize - 8 * 1024, 1024) + group_array[0] + group_array[1] + group_array[2];
else if (alignSize <= 256 * 1024)
return _Index(alignSize - 64 * 1024, 8 * 1024) + group_array[0] + group_array[1] + group_array[2] + group_array[3];
return -1;
;
4.2 ThreadCache结构分析与申请内存流程
- 线程如何独享ThreadCache
每个线程去申请ThreadCache时是在堆上申请的,但一个进程中每个线程共享一份堆区,所以无法保证每个线程独享一份ThreadCache,只能通过加锁的方式来共用ThreadCache。那有办法支持线程无锁访问ThreadCache并且每个线程都独享一份ThreadCache吗?答案是有的。即线程局部存储:线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。
我们通过加关键字静态编译来实现该方法:
// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
- ThreadCache基本结构
ThreadCache中是哈希表的结构,哈希表中的哈希桶采用单链表的形式将内存块链接起来,与定长内存池链接内存块的方式一致,结构逻辑图如下:
下面是哈希桶中自由链表的实现代码:
static void*& NextObj(void* obj)
return *(void**)obj;
class FreeList
public:
void Push(void* obj) //插入一块内存
assert(obj);
NextObj(obj) = _freeList;
_freeList = obj;
void* Pop() //弹出一块内存
assert(_freeList);
void* obj = nullptr;
obj = _freeList;
_freeList = NextObj(obj);
return obj;
bool Empty() //查看自由链表是否为空
return _freeList == nullptr;
private:
void* _freeList = nullptr;
;
当每个线程去自己独享的ThreadCache中申请内存时,ThreadCache会根据所需字节数先按照内存对齐的原则,再找到对应桶的下标,去取内存块给线程。当对应下标的桶中没有内存时,则该Threadcache应该向CentralCache申请内存。Threadcache结构代码如下:
class ThreadCache
public:
//申请和释放内存对象
void* Allocate(size_t size);
void Deallocate(void* ptr, size_t size);
//当内存不够时,向CentralCache获取对象
void* FetchFromCentralCache(size_t index, size_t size);
private:
FreeList _freeLists[NFREELIST]; //内存对齐的哈希表
;
// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
Threadcache中Allocate函数实现较为简单,不再图解,实现代码如下:
void* ThreadCache::Allocate(size_t size)
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);
size_t index = SizeClass::Index(size);
void* obj = nullptr;
if (!_freeLists[index].Empty())
//证明该桶中有内存块
obj = _freeLists[index].Pop();
else
//该桶为空,那么我们要向CentralCache申请内存
obj = FetchFromCentralCache(index, alignSize);
return obj;
4.3 CentralCache结构分析与申请内存流程
4.3.1 CentralCache结构分析(承上启下,非常关键)
CentralCache也是一个哈希桶结构,它的哈希桶的映射关系跟ThreadCache是一样的。不同的是他的每个哈希桶位置挂的是SpanList链表结构,不过每个映射桶下面的Span中的大内存块被按映射关系切成了所对应特定字节的一个个小内存块对象挂在Span的自由链表中。结构示意图如下:
因为已经知晓了STL中的List,且该SpanList与STL中的List结构相似,因此这里直接将SpanList结构的代码给出
class SpanList
public:
SpanList()
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
void Insert(Span* pos, Span* newSpan)
//在pos位之前插入span
assert(pos);
assert(newSpan);
Span* prevSpan = pos->_prev;
newSpan->_prev = prevSpan;
pos->_prev = newSpan;
prevSpan->_next = newSpan;
newSpan->_next = pos;
void Erase(Span* pos)
assert(pos);
assert(pos != _head);
Span* prevSpan = pos->_prev;
Span* nextSpan = pos->_next;
prevSpan->_next = nextSpan;
nextSpan->_prev = prevSpan;
private:
Span* _head;
public:
std::mutex _mtx; // 桶锁
;
那么Span的结构是什么样呢,我们将Google中Span的核心框架拿出来,一个个给大家解释:
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else
#endif
static const size_t PAGE_SHIFT = 13;//一页是8kb,也就是2^13字节
struct Span
//1.一页的内存我们假定为8KB
//2.Span管理的是根据起始地址到末尾地址的大块内存
//3.管理以页为单位的大块内存
//4.管理多个连续页大块内存跨度结构
PAGE_ID _pageId = 0;//span管理的大块内存起始页的页号
size_t _n = 0; //span管理的有多少页
Span* _next = nullptr; //双向链表的结构
Span* _prev = nullptr;
size_t _objectSize = 0; //切好的小内存对象的大小
size_t _useCount = 0; //对被分配给threadcache的小块内存计数
void* _freeList = nullptr; //切好的小块内存的自由链表
bool _isUse = false; //该span是否在被使用
;
- 关于_pageId与_n
当我们走到最底层,即向系统申请内存空间时,系统返回给我们的指向这段空间起始地址的指针(32位系统下是4字节,64位系统下是八字节)。而_pageId就是与指针建立起映射关系!!假如现在我们申请的起始地址是(32位系统下)0xffff0000,那么它对应的起始页号_pageId就是524280。因为我们一页的大小为8KB,所以让它除以8192(十进制)就可以得出起始页号,同理根据它的页数_n的数量,就可以知道Span管理的大块内存的末尾地址,假设_n=1,那么它的末尾地址就是0xffff2000。而为了在不同的系统下都能够保存下_pageId的值,所以采用条件编译,32位系统下采用size_t类型,64位系统下采用unisgned long long类型。
- 关于_objectSize与_useCount
因为CentralCache采用的哈希表映射规则与Threadcache一致,所以每个Span挂到相应位置的桶的时候,它所切成的小块内存的大小必须是符合ThreadCache哈希表映射关系的,所以_objectSize可以是8字节-1024*256字节。前文说过,CentralCache是起一个调度作用的中间媒介,当ThreadCache中内存过多时,要进行回收并整合,同理当Span过多时,也要返回给PageCache,而_useCount就是对分出去的小内存块进行计数,并在必要时回收。
分析完CentralCache的哈希表结构后,再来思考一下如何将CentralCache只在全局中设计出一份,让每个线程都共享CentralCache。我们采用单例模式,即将CentralCache的构造和拷贝构造都加上delete,在全局只创建出唯一一个CentralCache。 代码如下:
class CentralCache
public:
CentralCache(const CentralCache& c) = delete;
CentralCache& operator=(const CentralCache& c) = delete;
static CentralCache* GetInstance() //单例模式
return &_sInst;
private:
CentralCache()
SpanList _spanLists[NFREELIST]; //与ThreadCache结构相似的哈希表
static CentralCache _sInst; //单例模式
;
ThreadCache来CentralCache中申请内存时,我们先要获取一个非空的Span,再从这个Span中拿出一段内存给ThreadCache。基于此要增加两个成员函数。
class CentralCache
public:
CentralCache() = delete;
CentralCache(const CentralCache& c) = delete;
CentralCache& operator=(const CentralCache& c) = delete;
高并发内存池——基于Google开源项目tcmalloc的简洁实现