高并发内存池

Posted 雨轩(爵丶迹)

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发内存池相关的知识,希望对你有一定的参考价值。

高并发内存池

项目介绍

当前项目是实现一个高并发的内存池,原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)。

什么是内存池

1.池化技术

所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率

2.内存池

内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。

3.内存池主要解决的问题

这一点是我们最关心的,也是为什么会有这个东西的原因。

内存池主要解决的是效率的问题。其次站在系统的内存分配器的角度,还需要解决一下内存碎
片的问题。那什么是内存碎片?

内存碎片分为外碎片和内碎片。上面是外碎片问题。外部碎片是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求。内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。内碎片问题,在后面项目中就会看到。

4.malloc

C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的,而malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间然后“零售”给程序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。

设计一个自己的定长内存池(自己开超市)

作为程序员(C/C++)我们知道申请内存使用的是malloc,malloc其实就是一个通用的大众货,什么场景下都可以用,但是什么场景下都可以用就意味着什么场景下都不会有很高的性能

先熟悉一下简单内存池是如何控制的,第二他会作为我们后面内存池的一个基础组件。


定长内存池说明:当需要一块内存空间的时候,我们先到定长内存池中取内存对象(开始freelist为空)
1、先向系统申请一块很大的内存空间,这里我们申请的是128k,
2、从定长内存池中取走一块T类型大小的内存对象
3、释放的时候,直接归还给自由链表,下一次取内存对象的话先从自由链表freelist中取,如果freelist为空,在去定长内存池中取。

需要注意的是,自由链表中指向下一个结点的地址,这里取类型的转换,32位下是没有问题的,能取到4个字节,但是64位下,就存在问题了。

所以我们这里这里可以将 int* 改为 void**,32位和64位都是void*(指针,改成int**一样的)大小,指针会变化,这样64位下也可以取到下一个结点地址。

具体代码如下:

template<class T>
class ObjectPool

public:
    //开辟一块内存池
    T* New()
    
        T* obj = nullptr;
        //如果自由链表上面有内存块,则先从自由链表上面取
        if (_freeList)
        
            obj = (T*)_freeList;
            _freeList = *((void**)_freeList);//指向下一个内存块
            //_freeList = *((int*)_freeList);
        
        else//从定长内存池中取一块内存块
        
            //如果定长内存池中剩余字节连一个类型都无法满足,此时需要重新向系统申请一块内存
            if (_leftBytes < sizeof(T))
            
                _leftBytes = 128 * 1024;
                //                _memory = (char*)malloc(_leftBytes);
                _memory = (char*)SystemAlloc(_leftBytes >> 13);
                //有可能开辟失败
                if (_memory == nullptr)
                
                    throw std::bad_alloc();
                
            

            //取一块内存
            obj = (T*)_memory;
            //_memory向后移动一个T类型字节,确保能取到下一个结点地址
            size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
            _memory += objSize;//向后移动一个内存块
            _leftBytes -= objSize;//内存池减少一个内存块
        

        new(obj) T;//使用定位new调用T的构造函数初始化
        return obj;
    
    //不需要的内存给自由链表
    void Delete(T* obj)
    
        obj->~T();//显示调用T类型的析构函数进行资源清理
        //obj指向第一个结点,头指针freelist指向obj
        *((void**)obj) = _freeList;
        //*((int*)obj) = _freeList;
        _freeList = obj;
    
private:
    char* _memory = nullptr;//指向内存块的指针
    int _leftBytes = 0;//内存块中剩余字节
    void* _freeList = nullptr;//自由链表
;

高并发内存池框架整体设计

现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题

  1. 性能问题。
  2. 多线程环境下,锁竞争问题。
  3. 内存碎片问题。

concurrent memory pool主要由以下3个部分构成:

  1. thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁每个线程独享一个cache,这也就是这个并发线程池高效的地方
  2. central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈
  3. page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题

高并发内存池–Thread Cache(第一层)

thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的

申请内存

  1. 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标index。
  2. 如果自由链表_freeLists[index]中有对象,则直接Pop一个内存对象返回。
  3. 如果_freeLists[index]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。

我们先处理自由链表,将映射关系建立好,自由链表的哈希桶跟对象大小的映射关系:

哈希桶

上面我们可以看到Thread Cache 从8bit、16bit…256KB增长的,这样的话我们就需要32768个桶,32768个桶是可以申请的,但是我们没有必要申请这么多,这样产生的自由链表太多,减少浪费。

我们来看看大佬的设计
内存对齐

 整体控制在最多10%左右的内碎片浪费    129 + 15   15 / 144 ~ 10%
 为什么不8个位去对齐?这样产生的自由链表太多,减少浪费
 [1,128] 8byte对齐 freelist[0,16)
 [128+1,1024] 16byte对齐 freelist[16,72)1024-128+1/16,增加56个链表,区间[16,72]
 [1024+1,8*1024] 128byte对齐 freelist[72,128)
 [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
 [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
static inline size_t _RoundUp(size_t bytes, size_t alignNum)

	第一种写法
	size_t alignSize = 0;
	if (bytes % alignNum != 0)
	
		alignSize = (bytes / alignNum + 1) / alignNum;//+1向上提升,不足8个字节,需要对齐
	
	else
	
		alignSize = bytes;
	
	return alignSize;
	
	//7 + 8 - 1
	//&
	//7
	//1110
	//1000
	//==8
	第二种写法
	return ((bytes + alignNum - 1) & ~(alignNum - 1));

//对齐数划分区间
static inline size_t RoundUp(size_t size)

	if (size <= 128)
	
		return _RoundUp(size, 8);
	
	else if (size <= 1024)
	
		return _RoundUp(size, 16);
	
	else if (size <= 8 * 1024)
	
		return _RoundUp(size, 128);
	
	else if (size <= 64 * 1024)
	
		return _RoundUp(size, 1024);
	//256kb到thread cache的头
	else if (size <= 256 * 1024)
	
		return _RoundUp(size, 8 * 1024);
	
	else
	
		return _RoundUp(size, 1 << PAGE_SHIFT);
		//return -1;//超过,返回-1
	

对象大小

哈希映射位置,计算映射的那个一个自由链表桶
	1 + 7 8
	2       9
	 ...
	8      15 
static inline size_t _Index(size_t bytes, size_t align_shift)

	第一种写法
	if (bytes % align_shift == 0)
	
		return bytes / align_shift - 1;//16/8 - 1 = 1
	
	else
	
		return bytes / align_shift;//15 / 8 = 1
	
	//16 + 1<<3 -1  = 23 >> 3 -1 = 2 - 1 = 1
	第二种写法
	return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;

static inline size_t Index(size_t bytes)

	assert(bytes <= MAX_BYTES);//不能超过桶大小

	static int group_array[4] =  16, 56, 56, 56 ;
	if (bytes <= 128)
	
		return _Index(bytes, 3);
	
	else if (bytes <= 1024)
	
		return _Index(bytes - 128, 4) + group_array[0];//按照与算,需要-128,在加上一个区间有多少个自由链表。
	
	else if (bytes <= 8 * 1024)
	
		return _Index(bytes - 1024, 7) + group_array[0] + group_array[1];
	
	else if (bytes <= 64 * 1024)
	
		return _Index(bytes - 8 * 1024, 10) + group_array[0] + group_array[1] + group_array[2];
	
	else if (bytes <= 256 * 1024)
	
		return _Index(bytes - 64 * 1024, 13) + group_array[0] + group_array[1] + group_array[2] + group_array[3];
	
	else
	
		assert(false);
		return -1;
	

ThreadCache中的自由链表(FreeList)

FreeList这里是一个单链表,维护着申请和释放内存对象的关系。

//管理每个小对象的自由链表
class FreeList

public:
	//头插
	void Push(void* obj)
	
		assert(obj);
		NextObj(obj) = _freelist;
		_freelist = obj;
		++_size;
	
	//插入一段内存
	void PushRange(void* start, void* end, size_t n)
	
		NextObj(end) = _freelist;//指向第一个结点
		_freelist = start;//指向新插入的内存块的第一个结点
		_size += n;//链表长度
	
	//释放
	void PopRange(void* start, void* end, size_t n)
	
		assert(n >= _size);
		start = _freelist;
		end = start;
		//释放n个内存块到Central Cache中
		for (size_t i = 0; i < n - 1; ++i)
		
			end = NextObj(end);
		
		_freelist = NextObj(end);
		NextObj(end) = nullptr;
		_size -= n;//链表长度-n
	
	//借走一块内存块
	void* Pop()
	
		assert(_freelist);//确保链表不为空
		void* obj = _freelist;
		_freelist = NextObj(obj);
		--_size;
		return obj;
	
	bool Empty()
	
		return _freelist == nullptr;
	
	//需要修改,引用
	size_t& MaxSize()
	
		return _maxsize;
	
	//链表长度
	size_t Size()
	
		return _size;
	
private:
	void* _freelist = nullptr;//头指针
	size_t _maxsize = 1;//慢增长的大小
	size_t _size = 0;//链表的长度
;

其中的nextObj函数是取的指针obj的四个字节,指向一个结点,我们这里以头插举例:

//下一个结点地址
static void*& NextObj(void* obj)

	return *((void**)obj);

ThreadCache框架

  1. thread cache本质是由一个哈希映射的对象自由链表构成。
//封装了一层
class ThreadCache

public:
	void* Allocate(size_t size);//申请对象
	void Deallocate(void* ptr, size_t size);//释放对象
	// 从中心缓存获取对象
	void* FetchFromCentralCache(size_t index, size_t size);
	// 释放对象时,链表过长时,回收内存回到中心缓存
	void ListTooLong(FreeList& list, size_t size);
private:
	//自由链表
	FreeList _freeList[FREELISTNUM];//开辟208个
;
// TLS thread local storage
线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。
而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

线程局部存储说明(TLS):

  1. 线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。所以threadcache中是不需要加锁的
  2. 而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。

threadcache中申请的是小于256kb的内存对象,且有208个桶,我们这里定义出来

static const size_t MAX_BYTES = 256 * 1024;//256kb  thread cache这一层,每个线程独享
static const size_t FREELISTNUM = 208;//freelist的表大小

申请内存空间

void* ThreadCache::Allocate(size_t size)

	assert(size <= MAX_BYTES);//小于最大位数
	size_t alignSize = SizeClass::RoundUp(size);//获取对齐数
	size_t index = SizeClass::Index(size);//获取映射位置
	//如果自由链表不为空,直接弹出一个对象
	if (!_freeList[index].Empty())
	
		return _freeList[index].Pop();
	
	else
	
		//没有对象,从central cache层获取一定数量对象,插入到自由链表中并返回一个对象。
		return FetchFromCentralCache(index, alignSize);//这里的size注意是对齐后的
	

从中心缓存获取内存对象

慢增长方法,同网络的拥塞控制,开始需要1个给1个,后面1个给2个,2个给3个,慢慢增长,避免浪费内存。

void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)

	//1、最开始不会向central catch要太多对象,要多了用不完,比如需要8个字节,结果给了500个对象,但实际只用了50个,就存在浪费的情况
	//2、如果不需要size这个大小的内存需求,batchNum就会一直增大
	//3、size越小,获取到的batchNum越大
	//4、size越大,获取到的batchNum越小
	size_t batchNum = min(_freeList[index].MaxSize(), SizeClass::NumMoveSize(size));//取最小值
	//保证最小取到一个对象    要1个,给2个,1个给3个,依次递增,同拥塞控制
	if (_freeList[index].MaxSize() == batchNum)
	
		_freeList[index].MaxSize() += 1;
	
	//去central catch中获取batchNum个对象,有多少取多少
	void* start = nullptr;
	void* end = nullptr;
	//获取对象              不要忘记这里使用的是单例模式,注意调用方法
	size_t actualNum = CentralCatch::GetInstance()->FetchRangeObj(start, end, batchNum, size);
	assert(actualNum > 0);//确保申请成功
	if (actualNum == 1)
	
		assert(start == end);//一个的时候这两个相等
		return start;
	
	else
	
		_freeList[index<

以上是关于高并发内存池的主要内容,如果未能解决你的问题,请参考以下文章

C++实现的高并发内存池

C++实现的高并发内存池

高并发内存池

高并发内存池

高并发内存池

高并发内存池