实现高并发内存池

Posted qnbk

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实现高并发内存池相关的知识,希望对你有一定的参考价值。

高并发内存池

什么是内存池

池化技术

所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好,这样使用时就会变得非常快捷,大大提高程序运行效率。在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。
以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。

内存池

内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。

内存池主要解决的问题

内存池主要解决的当然是效率的问题,其次,作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。

内存碎片


内存碎片分为外碎片和内碎片

  • 外部碎片是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求。
  • 内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。

malloc

C/C++中我们要动态申请内存都是通过malloc去申请内存,实际我们不是直接去堆获取内存的。
而malloc就是一个内存池。malloc() 相当于向操作系统申请了一块较大的内存空间。当内存用完或程序有大量的内存需求时,再根据实际需求向操作系统“申请。

定长内存池

申请内存使用的是malloc,什么场景下都可以用,但是意味着什么场景下都不会有很高的性能,下面我们就先来设计一个定长内存池

ObjectPool.h

#pragma once
#include <iostream>
#include <vector>
#include <time.h>

using std::cout;
using std::endl;
//定长内存池

//template <size_t N>
//class ObjectPool
//;
#ifdef _WIN32
	#include <windows.h>
#else
	
#endif

inline static void* SystemAlloc(size_t kpage)//直接去堆上按页申请内存

#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage<<13, MEM_COMMIT | MEM_RESERVE,
		PAGE_READWRITE);
#else
	// linux下brk mmap等
#endif
	if (ptr == nullptr)
		throw std::bad_alloc();
	return ptr;

template <class T>
class ObjectPool

public:
	T* New()
	
		T* obj = nullptr;
		if (_freeList)
		
			//优先把还回来的内存块再次重复利用
			void* next = (*(void**)_freeList);
			obj = (T*)_freeList;
			_freeList = next;
		
		else
		
			//剩余内存不够一个对象大小时,重新开大块空间
			if (remainBytes < sizeof(T))
			
				remainBytes = 128 * 1024 ;
				//_memory = (char*)malloc(remainBytes);
				_memory = (char*)SystemAlloc(remainBytes >> 13);
				if (_memory == nullptr)
				
					throw std::bad_alloc();
				
			
			obj = (T*)_memory;
			size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
			_memory += objSize;
			remainBytes -= objSize;			
		
		//定位new,显示调用T的构造函数初始化,对已有的空间初始化
		new(obj)T;

		return obj;

		
	
	void Delete(T* obj)
	
		
		//还回来
		
		//显示调用析构函数清理对象
		obj->~T();
		if (_freeList == nullptr)
		
			_freeList = obj;
			//*(int*)obj = nullptr;//前四个字节用来保存下一个内存的地址 把obj强转成int* 再解引用->int 获得此地址 64位下跑不了
			*(void**)obj = nullptr;//64位下解引用是void *,*(int**)也可以
		
		else
		
			//头插
			*(void**)obj = _freeList;
			_freeList = obj;
		
	
private:
	char* _memory = nullptr;//指向大块内存,char是一个字节,好切分内存
	size_t remainBytes = 0;//大块内存中剩余数
	void* _freeList = nullptr;//管理换回来的内存(链表)的头指针
	
;

高并发内存池整体框架

现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。
内存池需要考虑以下几方面的问题。

  1. 性能问题。
  2. 多线程环境下,锁竞争问题。
  3. 内存碎片问题。

concurrent memory pool:

  • thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
  • central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。
  • page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。

thread cache

thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。

自由链表的哈希桶跟对象大小的映射关系

class SizeClass//计算对象大小的对齐映射规则

public:
	// 整体控制在最多10%左右的内碎片浪费
	// [1,128] 8byte对齐       freelist[0,16)
	// [128+1,1024] 16byte对齐   freelist[16,72)
	// [1024+1,8*1024] 128byte对齐   freelist[72,128)
	// [8*1024+1,64*1024] 1024byte对齐     freelist[128,184)
	// [64*1024+1,256*1024] 8*1024byte对齐   freelist[184,208)
	static inline size_t _RoundUp(size_t bytes, size_t alignNum)//计算对齐数
	
		//size_t alignSize ;//对齐
		//if (size % 8 != 0)
		//
		//	alignSize = (size / alignNum + 1) * alignNum;
		//
		//else
		//
		//	alignSize = size;
		//
		//return alignSize;
		return ((bytes + alignNum - 1) & ~(alignNum - 1));
	
	static inline size_t RoundUp(size_t size)
	
		if (size <= 128)
		
			return _RoundUp(size, 8);
		
		else if (size <= 1024)
		
			return _RoundUp(size, 16);
		
		else if (size <= 8 * 1024)
		
			return _RoundUp(size, 128);
		
		else if (size <= 64 * 1024)
		
			return _RoundUp(size, 1024);
		
		else if (size <= 256 * 1024)
		
			return _RoundUp(size, 8 * 1024);
		
		else
		
			return _RoundUp(size, 1 << PAGE_SHIFT);
		
	
	

映射哪一个自由链表桶

static inline size_t _Index(size_t bytes, size_t alignNum)
	
		/*if (bytes % alignNum == 0)
		
		return bytes / alignNum - 1;
		
		else
		
		return bytes / alignNum;
		*/
		return ((bytes + (1 << alignNum) - 1) >> alignNum) - 1;
	
	// 计算映射的哪一个自由链表桶
	static inline size_t Index(size_t bytes)
	
		assert(bytes <= MAX_BYTES);
		// 每个区间有多少个链
		static int group_array[4] =  16, 56, 56, 56 ;
		if (bytes <= 128) 
			return _Index(bytes, 3);//8  2^3
		
		else if (bytes <= 1024) 
			return _Index(bytes - 128, 4) + group_array[0];//把前面128减掉,再加上前一个桶的数量
		
		else if (bytes <= 8 * 1024) 
			return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
		
		else if (bytes <= 64 * 1024) 
			return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
		
		else if (bytes <= 256 * 1024) 
			return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
		
		else 
			assert(false);
		
		return -1;
	

申请内存:

  • 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
  • 如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。
  • 如果_freeLists[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。

释放内存

  • .当释放内存小于256k时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。
  • 当链表的长度过长,则回收一部分内存对象到central cache。

thread cache 设计

#pragma once
#include "Common.h"

class ThreadCache

public:
	//申请和释放对象
	void* Allocate(size_t size);
	void Deallocate(void* ptr, size_t size);
	//从中心缓存获取对象
	void* FetchFromCentralCache(size_t index, size_t size);

	void ListTooLong(FreeList& list, size_t size);//释放对象时,链表过长  ,回收内存到centrral cache
private:
	FreeList _freeLists[NFREELISTS];//哈希表,每个位置挂的都是_freeList
;

// TLS:在线程内全局可访问,但不能被其他线程访问到->保持数据的独立性,不需要锁控制,减少成本
static _declspec(thread) ThreadCache * pTLSThreadCache = nullptr;

central cache

central cache也是一个哈希桶结构(t桶锁),他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。


申请内存:

  • 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对 象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,这里使用的是一个桶锁,尽可能提高效率。
  • central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请一个新的span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span中取对象给thread cache
  • central cache的中挂的span中use_count记录分配了多少个对象出去,分配一个对象给threadcache,就++use_count

释放内存

  • 当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时–use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。

以页为单位的大内存管理span的定义及spanlist定义

struct Span//管理多个连续大块内存跨度结构

	PAGE_ID _pageId = 0;//大块内存起始页号
	size_t n = 0;//页的数量
	Span* _next = nullptr;//双向链表
	Span* _prev = nullptr;//双向链表

	size_t _objSize = 0;//切好的小对象的大小
	size_t _useCount = 0;//切好的小块内存,被分给thread cache计数
	void* _freeList = nullptr;//切好的小块内存自由链表
	bool _isUse = false;//是否被使用

;
class SpanList//带头双向链表

public:
	SpanList()
	
		_head = new Span;
		_head->_next = _head;
		_head->_prev = _head;
	

	Span* Begin()
	
		return _head->_next;
	
	Span* End()
	
		return _head;
	
	bool Empty()
	
		//cout << "heool spanlist empty" << endl;
		return _head->_next == _head;
	
	void PushFront(Span* span)
	
		//cout << "hello common pushfront" << endl;
		Insert(Begin(), span);
	
	Span* PopFront()
	
		//cout << "hello commom popfront" << endl;
		Span* front = _head->_next;
		Erase(front);
		return front;
	
	void Insert(Span* pos, Span* newSpan)
	
		//cout << "hello commom insert" << endl;
		assert(pos);
		assert(newSpan);
		Span* prev = pos->_prev;
		prev->_next = newSpan;
		newSpan->_prev = prev;
		newSpan->_next = pos;
		pos->_prev = newSpan;
	
	void Erase(Span* pos)
	
		assert(pos);
		assert(pos != _head);

		Span* prev = pos->_prev;
		Span* next = pos->_next;
		prev->_next = next;
		next->_prev = prev;

	
private:
	Span* _head;
public:

	std::mutex _mtx;//桶锁

;

central cache整体设计

#pragma once
#include "Common.h"

//单例模式
class CentralCache

public:
	static CentralCache* GetInstance()
	
		return &_sInst;
	

	// 获取一个非空的span
	Span* GetOneSpan(SpanList& list, size_t byte_size);

	// 从中心缓存获取一定数量的对象给thread cache
	size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);

	// 将一定数量的对象释放到span跨度
	void ReleaseListToSpans(void* start, size_t byte_size);

private:
	SpanList _spanLists[NFREELISTS];//在ThreadCache是几号桶,在CentralCache就是几号桶
private:
	CentralCache() //把构造函数放在私有:别人不能创建对象
	
	CentralCache(const CentralCache&) = delete;
	static CentralCache _sInst;

;

page cache


申请内存:

  • 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页page span分裂为一个4页pagespan和一个6页page span
  • 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请128页page span挂在自由链表中,再重复1中的过程。
  • 需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。

释放内存:

  • 如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片
  • 如果Central Cache中的span usecount=0说明切分给 thread cache的小块内存都回来了则Central Cache 把这个span还给page cache,page cache通过页号查看前后相邻页是否空闲,是就合并出更大的页

整体设计

#pragma once
#include "Common.h"
#include "ObjectPool.h"

class PageCache

public:
	static PageCache* GetInstance()
	
		//cout << "hello Page cache getinstance" << endl;
		return &_sInst;
	
	Span* MapObjectToSpan(void* obj);//获取对象到span的映射

	Span* NewSpan(size_t k);//获取一个k页的span

	void ReleaseSpanToPageCache(Span* span);//释放空闲span,合并相邻的span
	std::mutex _pageMtx;
private:
	SpanList _spanList[NPAGES];
	ObjectPool<Span> spanPool;
	std::unordered_map<PAGE_ID,Span*> _idSpanMap;//页号跟span的映射
	PageCache() 
	PageCache(const PageCache&) = delete;
	static PageCache _sInst;
;

代码总体实现

ObjectPool.h

#pragma once
#pragma once
#include <iostream>
#include <vector>
#include <time.h>
#include "Common.h"
using std::cout;
using std::endl;
//定长内存池

//template <size_t N>
//class ObjectPool
//;

/*
#ifdef _WIN32
#include <windows.h>
#else

#endif
inline static void* SystemAlloc(size_t kpage)//直接去堆上按页申请内存

#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE,
		PAGE_READWRITE);
#else
	// linux下brk mmap等
#endif
	if (ptr == nullptr)
		throw std::bad_alloc();
	return ptr;

*/
template <class T>
class ObjectPool

public:
	T* New()
	
		T* obj = nullptr;
		if (_freeList)
		
			//优先把还回来的内存块再次重复利用
			void* next = (*(void**)_freeList);
			obj = (T*)_freeList;
			_freeList = next;
			
		
		else
		
			//剩余内存不够一个对象大小时,重新开大块空间
			if (remainBytes < sizeof(T))
			
				remainBytes = 128 * 1024;
				//_memory = (char*)malloc(remainBytes);
				_memory = (char*)SystemAlloc(remainBytes >> 13);
				if (_memory =

以上是关于实现高并发内存池的主要内容,如果未能解决你的问题,请参考以下文章

C++实现的高并发内存池

C++实现的高并发内存池

高并发内存池——基于Google开源项目tcmalloc的简洁实现

C++从零实现一个高并发内存池

高并发内存池

高并发内存池