高并发内存池——基于Google开源项目tcmalloc的简洁实现

Posted 赏一杯茶:

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发内存池——基于Google开源项目tcmalloc的简洁实现相关的知识,希望对你有一定的参考价值。

从零实现一个高并发内存池

1. 初识高并发内存池

1.1 项目介绍

当前项目是实现一个高并发的内存池,它的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存1管理,用于替代系统的内存分配相关的函数(malloc 、free)。

1.2 项目所需的知识

这个项目会用到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等方面的知识。

1.3 了解池化技术

所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。
在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。
内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。

1.4 内存池主要解决的问题

内存池主要解决的是效率的问题,其次如果作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。那什么是内存碎片呢?
这是我们申请空间的一般步骤:

当我们将指针a与c管理的空间释放后,堆区现在有2KB的空间,但是我们要申请2KB的空间却申请不出来,因为这两块空间碎片化,不连续了!

上面讲的是外碎片问题,指的是一些空闲的连续内存区域太小,且这些内存空间不连续,以至于合计的内存足够,但是不能满足一些内存分配的申请需求。内存碎片问题也包括内部碎片的问题,指的是由于一些对齐的要求,导致分配出去的空间中一些内存无法被利用(比如struct结构体中的内存对齐)。

2. 小试牛刀–设计一个定长内存池

我们先来设计一个定长内存池做个开胃菜,当然这个定长内存池在我们后面的高并发内存池中也是有价值的,所以设计它的目的有两个,先熟悉一下简单内存池是如何控制的,第二它会作为我们后面内存池的一个基础组件。
内存池框架代码如下:

template<class T>
class ObjectPool 
public:
    T* New() //从内存池申请一块内存

    void Delete(T* obj) //释放一块内存至内存池
private:
    char* _memory = nullptr; //管理向系统申请的大块内存
    void* _freeList = nullptr; //管理返回定长内存的头指针
    size_t _remainBytes = 0; //记录大块内存所剩字节数
;

2.1 将内存还给定长内存池

因为我们采用了池化技术,所以对于还回来的内存,我们需要将它们组织管理起来,那么如何管理呢,我们采用类似于单链表的方式,如下图:

每块内存的前4个字节(32位下)或8个字节(64位下)存放的是指向下一块空间的指针,增加头指针是为了方便对还回的内存进行插入,否则每次插入都选择尾插效率太低。
对应Delete的实现:

void Delete(T* obj) 
        //显示调用析构函数处理对象
        obj->~T();

        //将该内存头插进自由链表中
        (*(void**))obj = _freeList;
        _freeList = obj;
    

2.2 向定长内存池中申请内存

  1. 当对应的内存池中已经有被还回来的内存块,即自由链表不为空时

我们直接从自由链表中获取内存块,头删一块内存块,如下图示意:


该部分代码实现如下:

T* New() 
        T* obj;
        if (_freeList) 
            void* next = (*(void**))_freeList;
            obj = (T*)_freeList;
            _freeList = next;
        
        return obj;
    
  1. 当自由链表为空并且所剩内存字节数不满足我们要求时,就要向系统申请以页为单位的空间(这里一页为8KB)

申请出的空间逻辑示意图如下:

当自由链表为空时,我们就在该大内存上切下我们需要的空间,示意图如下:


切下我们所需要大小的内存块,剩下的内存继续由_memory管理。
在此需要注意内存对齐的问题,比如我们的T是char类型的,当我们返回这块空间时,这块空间实际上是1字节,但自由链表存储空间的最低要求是4字节或8字节(因为要存指针),因此切内存时要注意内存对齐。
整体代码如下:

    T* New() 
        T* obj;
        if (_freeList) 
            void* next = (*(void**))_freeList;
            obj = (T*)_freeList;
            _freeList = next;
        
        else 
            if (_remainBytes < sizeof(T))  //如果剩余内存不够了,向系统申请
                _remainBytes = 1024 * 128;
                _memory = (char*)SystemAlloc(_remainBytes >> 13);
                if (_memory == nullptr) 
                    throw std::bad_alloc();
                
            
            obj = (T*)_memory;

            size_t objSize = sizeof(T) > sizeof(void*) ? sizeof(T) : sizeof(void*); //内存对齐
            _memory += objSize;
            _remainBytes -= objSize;
        
        new(obj)T; //显示调用new初始化
        return obj;
        
    

3. 高并发内存池整体框架设计

现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题。
1. 性能问题。
2. 多线程环境下,锁竞争问题。
3. 内存碎片问题

concurrent memory pool主要由以下3个部分构成:

  1. thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的方。
  2. central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。
  3. page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。

4. 梳理向内存池中申请内存的步骤

4.1 哈希表与内存对齐

在使用ThreadCache和CentralCache时,都需要根据所需内存的字节数去对应的哈希桶中申请内存,若我们采用一对一的哈希映射关系,即以字节为单位的任意大小的内存均要对应上哈希表的下标,且因为管理内存采用的是单链表的方式(申请的内存大小至少为4或8字节),即哈希表的下标范围应为[8,256*1024],显然下标范围这么大的哈希表不切实际,且实用性低。因此要采用一定的内存对齐原则。我们采用如下原则进行内存对齐:

这样不仅缩小哈希表长度,大幅减少哈希表占用的空间,而且有效控制了内碎片的浪费问题。
这样我们可以根据所需字节数算出对齐后的字节数,进而根据对齐后的字节数算出哈希表对应的桶的位置。
代码实现如下:

static const size_t MAX_BYTES = 256 * 1024; //申请空间的最大字节数
static const size_t NFREELIST = 208; //哈希表的长度

class SizeClass 
public:
    static size_t _RoundUp(size_t size, size_t align) 
        if (size % align == 0) 
            return size;
        
        return (align - (size % align) + size);
    
    static size_t _Index(size_t alignSize, size_t align) 
        return alignSize / align - 1;
    

    static size_t RoundUp(size_t size)  //算内存大小的对齐数
         // 整体控制在最多10%左右的内碎片浪费
         // [1,128]               8byte对齐       freelist[0,16)
         // [128+1,1024]          16byte对齐      freelist[16,72)
         // [1024+1,8*1024]       128byte对齐     freelist[72,128)
         // [8*1024+1,64*1024]    1024byte对齐    freelist[128,184)
         // [64*1024+1,256*1024]  8*1024byte对齐  freelist[184,208)
        if (size <= 128) 
            return _RoundUp(size, 8);
        
        else if (size <= 1024) 
            return _RoundUp(size, 16);
        
        else if (size <= 8 * 1024) 
            return _RoundUp(size, 128);
        
        else if (size <= 64 * 1024) 
            return _RoundUp(size, 1024);
        
        else if (size <= 256 * 1024) 
            return _RoundUp(size, 8 * 1024);
        
        else 
            assert(size < 256 * 1024);
        
    
    static size_t Index(size_t size)  //计算哈希桶下标
        assert(size < 256 * 1024);
        size_t alignSize = RoundUp(size);

        static int group_array[4] =  16,56,56,56 ;//每个对齐区间有多长
        if (alignSize <= 128) 
            return _Index(alignSize,8);
        
        else if (alignSize <= 1024) 
            return _Index(alignSize - 128 , 16) + group_array[0];
        
        else if (alignSize <= 8 * 1024) 
            return _Index(alignSize - 1024 , 128) + group_array[0] + group_array[1];
        
        else if (alignSize <= 64 * 1024) 
            return _Index(alignSize - 8 * 1024, 1024) + group_array[0] + group_array[1] + group_array[2];
        
        else if (alignSize <= 256 * 1024) 
            return _Index(alignSize - 64 * 1024, 8 * 1024) + group_array[0] + group_array[1] + group_array[2] + group_array[3];
        
        return -1;
    
 ;

4.2 ThreadCache结构分析与申请内存流程

  1. 线程如何独享ThreadCache

每个线程去申请ThreadCache时是在堆上申请的,但一个进程中每个线程共享一份堆区,所以无法保证每个线程独享一份ThreadCache,只能通过加锁的方式来共用ThreadCache。那有办法支持线程无锁访问ThreadCache并且每个线程都独享一份ThreadCache吗?答案是有的。即线程局部存储:线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。
我们通过加关键字静态编译来实现该方法:

// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
  1. ThreadCache基本结构

ThreadCache中是哈希表的结构,哈希表中的哈希桶采用单链表的形式将内存块链接起来,与定长内存池链接内存块的方式一致,结构逻辑图如下:

下面是哈希桶中自由链表的实现代码:

static void*& NextObj(void* obj)

    return *(void**)obj;


class FreeList 
public:
    void Push(void* obj)  //插入一块内存
        assert(obj);

        NextObj(obj) = _freeList;
        _freeList = obj;
        
    
    void* Pop()  //弹出一块内存
        assert(_freeList);

        void* obj = nullptr;
        obj = _freeList;
        _freeList = NextObj(obj);
        return obj;
    
bool Empty()  //查看自由链表是否为空
        return _freeList == nullptr;
    
private:
    void* _freeList = nullptr;
;


当每个线程去自己独享的ThreadCache中申请内存时,ThreadCache会根据所需字节数先按照内存对齐的原则,再找到对应桶的下标,去取内存块给线程。当对应下标的桶中没有内存时,则该Threadcache应该向CentralCache申请内存。Threadcache结构代码如下:

class ThreadCache 
public:
    //申请和释放内存对象
    void* Allocate(size_t size);
    void Deallocate(void* ptr, size_t size);

    //当内存不够时,向CentralCache获取对象
    void* FetchFromCentralCache(size_t index, size_t size);


private:
    FreeList _freeLists[NFREELIST]; //内存对齐的哈希表
;
// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

Threadcache中Allocate函数实现较为简单,不再图解,实现代码如下:

void* ThreadCache::Allocate(size_t size) 
    assert(size <= MAX_BYTES);

    size_t alignSize = SizeClass::RoundUp(size);
    size_t index = SizeClass::Index(size);

    void* obj = nullptr;
    if (!_freeLists[index].Empty()) 
        //证明该桶中有内存块
        obj = _freeLists[index].Pop();
    
    else 
        //该桶为空,那么我们要向CentralCache申请内存
        obj = FetchFromCentralCache(index, alignSize);
    
    return obj;

4.3 CentralCache结构分析与申请内存流程

4.3.1 CentralCache结构分析(承上启下,非常关键)

CentralCache也是一个哈希桶结构,它的哈希桶的映射关系跟ThreadCache是一样的。不同的是他的每个哈希桶位置挂的是SpanList链表结构,不过每个映射桶下面的Span中的大内存块被按映射关系切成了所对应特定字节的一个个小内存块对象挂在Span的自由链表中。结构示意图如下:

因为已经知晓了STL中的List,且该SpanList与STL中的List结构相似,因此这里直接将SpanList结构的代码给出

class SpanList 
public:
    SpanList() 
        _head = new Span;
        _head->_next = _head;
        _head->_prev = _head;
    
    void Insert(Span* pos, Span* newSpan) 
        //在pos位之前插入span
        assert(pos);
        assert(newSpan);

        Span* prevSpan = pos->_prev;
        newSpan->_prev = prevSpan;
        pos->_prev = newSpan;

        prevSpan->_next = newSpan;
        newSpan->_next = pos;
    
    void Erase(Span* pos) 
        assert(pos);
        assert(pos != _head);

        Span* prevSpan = pos->_prev;
        Span* nextSpan = pos->_next;

        prevSpan->_next = nextSpan;
        nextSpan->_prev = prevSpan;
    
private:
    Span* _head;
public:
    std::mutex _mtx; // 桶锁
;

那么Span的结构是什么样呢,我们将Google中Span的核心框架拿出来,一个个给大家解释:

#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else
#endif

static const size_t PAGE_SHIFT = 13;//一页是8kb,也就是2^13字节

struct Span 
    //1.一页的内存我们假定为8KB
    //2.Span管理的是根据起始地址到末尾地址的大块内存
    //3.管理以页为单位的大块内存
    //4.管理多个连续页大块内存跨度结构

    PAGE_ID _pageId = 0;//span管理的大块内存起始页的页号
    size_t _n = 0; //span管理的有多少页

    Span* _next = nullptr; //双向链表的结构
    Span* _prev = nullptr;

    size_t _objectSize = 0; //切好的小内存对象的大小
    size_t _useCount = 0; //对被分配给threadcache的小块内存计数
    void* _freeList = nullptr; //切好的小块内存的自由链表

    bool _isUse = false; //该span是否在被使用
;
  1. 关于_pageId与_n

当我们走到最底层,即向系统申请内存空间时,系统返回给我们的指向这段空间起始地址的指针(32位系统下是4字节,64位系统下是八字节)。而_pageId就是与指针建立起映射关系!!假如现在我们申请的起始地址是(32位系统下)0xffff0000,那么它对应的起始页号_pageId就是524280因为我们一页的大小为8KB,所以让它除以8192(十进制)就可以得出起始页号,同理根据它的页数_n的数量,就可以知道Span管理的大块内存的末尾地址,假设_n=1,那么它的末尾地址就是0xffff2000。而为了在不同的系统下都能够保存下_pageId的值,所以采用条件编译,32位系统下采用size_t类型,64位系统下采用unisgned long long类型。

  1. 关于_objectSize与_useCount

因为CentralCache采用的哈希表映射规则与Threadcache一致,所以每个Span挂到相应位置的桶的时候,它所切成的小块内存的大小必须是符合ThreadCache哈希表映射关系的,所以_objectSize可以是8字节-1024*256字节。前文说过,CentralCache是起一个调度作用的中间媒介,当ThreadCache中内存过多时,要进行回收并整合,同理当Span过多时,也要返回给PageCache,而_useCount就是对分出去的小内存块进行计数,并在必要时回收
分析完CentralCache的哈希表结构后,再来思考一下如何将CentralCache只在全局中设计出一份,让每个线程都共享CentralCache。我们采用单例模式,即将CentralCache的构造和拷贝构造都加上delete,在全局只创建出唯一一个CentralCache。 代码如下:

class CentralCache 
public:
    CentralCache(const CentralCache& c) = delete;
    CentralCache& operator=(const CentralCache& c) = delete;

    static CentralCache* GetInstance()  //单例模式
        return &_sInst;
    
private:
    CentralCache()
    SpanList _spanLists[NFREELIST]; //与ThreadCache结构相似的哈希表

    static CentralCache _sInst; //单例模式
;

ThreadCache来CentralCache中申请内存时,我们先要获取一个非空的Span,再从这个Span中拿出一段内存给ThreadCache。基于此要增加两个成员函数。

class CentralCache 
public:
    CentralCache() = delete;
    CentralCache(const CentralCache& c) = delete;
    CentralCache& operator=(const CentralCache& c) = delete;

    高并发内存池——基于Google开源项目tcmalloc的简洁实现

高并发内存池的介绍

高并发内存池的介绍

高并发内存池的介绍

实战项目高并发内存池

项目设计高并发内存池