高并发内存池

Posted 世_生

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发内存池相关的知识,希望对你有一定的参考价值。

从零实现一个高并发内存池

高并发内存池整体框架设计

现在很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身就已经很优秀了,那么我们的项目原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池要考虑几个方面。

  • 性能问题
  • 在多线程下,锁竞争问题
  • 内存碎片问题

concurrent memory pool主要由下面3个部分组成:

  1. Thread Cahce:每个线程都要自己独立的线程缓存,用于小于256KB的内存分配在这里申请内存不需要加锁
  2. Central Cache:程序中只要一个中心缓存,多个线程共享。thread cache是从这里按需获取对象。central cache具有回收机制,当一个线程中thread cache占用太多内存时,central cache会回收内存,避免其他线程对内存吃紧,达到了内存在分配多个线程时更均衡的按需调度的目的。central cache是多个线程共享的,所以需要锁竞争。但central cache是用的桶锁,所以竞争不是很激烈
  3. Paga Cache:存储的内存是以页为单位存储及分配给central cache,当central cache没有对象是,就从paga cache按需拿一定数量的paga,并切割成固定大小的小内存块给central cache。pagan cache也有回收机制,paga cache会回收已经满足条件的central cache中的span对象,并且合并相邻的页,合成一个大页,缓解了内存碎片的问题

申请内存部分

高并发内存池-Thread Cache

thread cache是哈希桶结构,每个桶的位置映射的是内存块对象的自由链表。每个线程都会有一个thread cache对象,在这个对象中获取和释放都是无锁的。


申请内存:

  1. 当内存申请size<=256KB,先获取到线程本地的thread cache对象,计算size映射的哈希桶自由链表的下标。
  2. 如果自由链表中有对象,则直接获取一个内存对象返回
  3. 如果自由链表在没有对象,则去central cache中获取一定数量的对象,插入到自由链表中

框架:

管理小对象的自由链表
static const NFREE_LIST=208;
static const MAX_SIZE=256*1024;
static const PAGE_SHIFT=13;
static const PAGA_LIST=129;

#ifdef _WIN64
typedef size_t PageID;
#elif _WIN32
typedef unsigned long long PageID;

#endif

inline static void* SystemAlloc(size_t kpage)

#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
	// linux下brk mmap等
#endif

	if (ptr == nullptr)
		throw std::bad_alloc();

	return ptr;



class FreeList

public:
	FreeList():_freelist(nullptr),_count(0),_size(1)
	
	~FreeList()
	//插入
	void Push(void* obj)
		assert(obj);
		(*(void**))obj=_freelist;
		_freelist=obj;
		//每次还回来一个要++
		_count++;
	
	//插入一段内存块
	void PushRange(void* start,void* end,size_t sum)
		assert(start&&end);
		*((void**)end)=_freelist;
		_freelist=start;
		_count+=sum;
	
	
	//删除一段内存块
	void PopRange(void*& start,void*& end,size_t sum)
	
		start=_freelist;
		end=start;
		for(size_t i=0;i<sum-1;i++)
			end=*(void**)end;
		
		_freelist=*(void**)end;
		*(void**)end=nullptr;
		_count-=sum;
	
	//删除
	void* Pop()
		assert(_freelist);
		void*obj=_freelist;
		_freelist=(*(void**))_freelist;
		(*(void**))obj=nullptr;
		//每次拿走都要--
		_count--;
		return obj;
	
	//判空
	bool Empty()
		return _freelist==nullptr;
	
	size_t Count()
		return _count;
	
	size_t& MaxSize()
		return _size;
	
private:
	void* _freelist;//指向自由链表的表头
	size_t _count;//该自由链表中节点的数量
	size_t _size;//用于慢增长(后面会讲)
;
对齐规则
我们最小都要用8字节对齐,因为我们要用链表把这些小内存块连接起来。当字节<=256KB时,就向thread cache申请。
如果我们都用8字节对齐,那么桶的个数就是32768个桶,太多了,没有必要。
所以、我们采用
整体控制在最多10%左右的内碎片浪费
[1,128]					8byte对齐	    freelist[0,16) 		16个桶
[128+1,1024]			16byte对齐	    freelist[16,72)		56个桶
[1024+1,8*1024]			128byte对齐	    freelist[72,128)	56个桶
[8*1024+1,64*1024]		1024byte对齐     freelist[128,184)	56个桶
[64*1024+1,256*1024]	8*1024byte对齐   freelist[184,208)	24个桶
总共 208个桶
例如:当我们需要9字节的内存时,thread cache给我们16字节
	 当我们需要129字节的内存时,threadcache给我们144字节
造成的内碎片控制在10%左右,且内碎片可以被下次申请时利用。
class SizeClass

public:
	普通人写法
	static inline size_t _RoundUp(size_t size,size_t alignNum)
		size_t align;
		if(size%alignNum!=0)
			align=(size/alignNum+1)*alignNum;
			return align;
		
		else
			align=size;
		
		return align;
	
	大佬写法
	static inline size_t _RoundUp(size_t size, size_t alignNum)
	
		return ((size+ alignNum - 1) & ~(alignNum - 1));
	
	多次调用的函数使用静态,该函数使用内联
	static inline size_t RoundUp(size_t size)
		if(size<=128)
			return _RoundUp(size,8);
		
		else if(size>128 && size<=1024)
			return _RoundUp(size,16);
		
		else if(size>1024 && size<=8*1024)
			return _RoundUp(size,128);
		
		else if(size>8*1024 && size<=64*1024)
			return _RoundUp(size,1024);
		
		else if(size>64*1024 && size<=256*1024)
			return _RoundUp(size,8*1024);
		
		//大于256KB,也按8*1024对齐
		else if(size>256*1024)
			return _RoundUp(size,8*1024);
		
		else
			assert(false);
			return -1;
		
	
	
	普通人写法
	static inline size_t _Index(size_t size,size_t alignNum)
		if(size % alignNum==0)
			return size/alignNum-1;
		
		else
			return size/alignNum;
		
	
	大佬写法
	static inline size_t _Index(size_t size, size_t alignNum)
	
		return ((size + (1 << alignNum) - 1) >> alignNum) - 1;
	
	映射的桶号
	static inline size_t Index(size_t size)
		assert(size<=MAX_SIZE);
		static int group_array[4]=16,56,56,56;
		if (size <= 128)
			return _Index(size, 3);
		
		else if (size <= 1024)
			return _Index(size - 128, 4) + group_array[0];
		
		else if (size <= 8 * 1024)
			return _Index(size - 1024, 7) + group_array[1] + group_array[0];
		
		else if (size <= 64 * 1024)
			return _Index(size - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
		
		else if (size <= 256 * 1024)
			return _Index(size - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
		
		else
			assert(false);
			return -1;
		
	
	
	static size_t NumMoveSize(size_t size)
	
		assert(size > 0);

		// [2, 512],一次批量移动多少个对象的(慢启动)上限值
		// 小对象一次批量上限高
		// 小对象一次批量上限低
		int num = MAX_BYTES / size;
		if (num < 2)
			num = 2;

		if (num > 512)
			num = 512;

		return num;
	
	//计算要申请的页的多少
	static size_t NumMovePaga(size_t size)
	
		size_t num = NumMoveSize(size);
		size_t npage = num*size;

		npage >>= PAGE_SHIFT;
		if (npage == 0)
			npage = 1;

		return npage;
		
;
class ThreadCache

public:
	//申请
	void* Allocate(size_t size);
	//thread cache中不够去central cache中去申请
	void* FetchFromCentralCache(size_t index, size_t alignsize);
	
private:
	FreeList freelist[NFREE_LIST];
;
每个线程获取自己独立的Thead Cache
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

Allocate:

void* Allocate(size_t size)

	assert(size<=MAX_SIZE);
	//计算对齐数,和桶号
	size_t alignsize=RoundUp(size);
	size_t index=Index(size);
	if(freelist[index].Empty())
		void* obj;
		obj=freelist[index].Pop();
		return obj;
	
	else
		return FetchFromCentralCache(index,alignsize);
	

FetchFromCentralCache:

在central cache中获取内存块
在这里我们使用慢增长的方式,每一次去central cache中获取内存时,size++。
好处:当我们大量使用这个桶中的内存时,我们不是每次都在central cache中获取1个,而是每次+1,这样就减少了去central cache中的次数,最大限度是512
FetchFromCentralCache(size_t index,size_t alignsize)

	assert(index<NFREE_LIST);
	size_t batchsum = min(freelist[index].MaxSize(),SizeClass::NumMoveSize(size));
	if(batchsum==freelist[index].MaxSize())
		freelist[index].MaxSize()+=1:
	
	//输出型参数
	void* start=nullptr;
	void* end=nullptr;
	//从central cache中获取
	size_t sum=GetNumCache(start,end,batchsum,alignsize);
	assert(sum>0);
	if(sum==1)
		assert(start==end);
		return start;
	
	else
		//拿到了sum块大小为alignsize的内存块,连接起来,我们只需要一块
		freelist[index].PushRange(*(void**)start, end,sum-1);
	
	return start;

高并发内存池-Central Cache

central cache也是哈希桶的结构,和thread cache的映射关系一样,都是208个桶。不同的是central cache的每个哈希桶的位置都是SpanList链表结构,每个映射桶下面的Span中的大块内存都被切分成了小块内存通过自由链表连接起来。

框架:

描述Span
class Span

	Span* next=nullptr;
	Span* prev=nullptr;
	//页号
	size_t spanId=0;
	//页的数量
	size_t pagaSum=0;
	void* _freelist=nullptr;
	//对象使用页块的个数
	size_t _usecount=0;
	//该Span的状态
	bool state=false;
	//对象的大小
	size_t objSize=0;
;

class SpanList

public:
	SpanList()
	
		_head=new Span();
		_head->next=_head;
		_head->prev=_head;
	
	Span* Begin()
	
		return _head->next;
	
	Span* End()
	
		return _head;
	
	bool Empty()
	
		return _head->nex==_heaed;
	
	void Insert(Span* pos,Span* obj)
	
		assert(obj&&pos);
		Span* cur = pos->prev;
		cur->next = obj;
		pos->prev = obj;
	
	void Etase(Span* obj)
	
		assert(obj);
		assert(obj != _head);
		Span* cur = obj->prev;
		Span* pos = obj->next;

		cur->next = pos;
		pos->prev = cur;
	
	void PushFront(Span* obj)
	
		Insert(Begin(),obj);
	
	void* PopFront()
	
		Span* front=_head->next;
		assert(front!=_head);
		Etase(front);
		return front;
	
private:
	Span* _head;
public:
	std::mutex mtx;//桶锁
;
多个线程共享,则只能有一个对象,采用单例模式-饿汉模式
class CentralCache

public:
	static CentralCache* GetObjCen()
	
		return &_Inst;
	
	//获取一个非空的Span
	Span* GetOneSpan(SpanList& list,size_t size);
	//从central中获取一段数量的内存
	size_t GetNumCache(void*&start,void*&end,size_t batchsum,size_t size);
	
private:
	CentralCache();
	CentralCache(const CentralCache& c)=delete;
public:
	SpanList _spanlist[NFREE_LIST];
private:
	static CentralCache _Inst;
;
初始化
CentralCache CentralCache::_inst;

GetNumCache:


size_t GetNumCache(void*以上是关于高并发内存池的主要内容,如果未能解决你的问题,请参考以下文章

高并发内存池

高并发内存池

高并发内存池

java高并发之线程池

Java高并发之线程池详解

Java高并发之线程池详解