C++实现的高并发内存池

Posted 任我驰骋.

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C++实现的高并发内存池相关的知识,希望对你有一定的参考价值。

高并发内存池

一、为什么设计高并发内存池

C/C++下内存管理是件很头疼的事,分配足够的内存、追踪内存的分配、在不需要的时候释放内存—这个任务很复杂。如果直接使用系统调用malloc/free、new/delete进行内存分配和释放,则会有以下弊端:

  1. 调用malloc/new,系统需要根据“最先匹配”、“最优匹配”或其他算法在内存空闲块表中查找一块空闲内存,调用free/delete,系统可能需要合并空闲内存块,这些会产生额外开销;
  2. 频繁使用时会产生大量内存碎片,从而降低程序运行效率;
  3. 容易造成内存泄漏;

内存池(memory pool)是代替直接调用malloc/free、new/delete进行内存管理的常用方法,当我们申请内存空间时,首先到我们的内存池中查找合适的内存块,而不是直接向操作系统申请,优势在于:

  1. 比malloc/free进行内存申请/释放的方式快;
  2. 不会产生或很少产生堆碎片;
  3. 可避免内存泄漏;

二、高并发内存池整体框架设计

现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀了,但是我们的项目原型tcmalloc在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池要考虑如下几个方面。

1.性能问题
2.多线程环境下,内存申请和内存释放所引起的锁竞争问题
3.内存碎片问题

Concurrent Memory pool主要由以下3个部分构成:

  1. Thread Cache:线程缓存每个线程独有,可以用来分配小于256KB的内存,每个线程独享一个cache,这也是这个线程池高效的地方。thread cache中是一个哈希桶挂自由链表的结构,当我们线程需要内存时,我们就从按大小分配好的自由链表中截取内存块给到线程,当自由链表中内存块不够时,我们就会向下一层获取。如果我们一次获取内存块多于我们一个链表限定的最大个数,我们就会将多出来的部分返还给central cache。
  2. central cache:中心缓存所有线程共享,当我们的thread cache没有内存块时会向central cache申请,central cahce也是一个哈希桶的结构,不过它挂载的是Span List链表结构,Span List中则有一个个相同大小的内存块按照哈希桶的映射后,通过双向链表的形式挂在相应的Span上,Span是一个以页为单位的大块内存对象,每一页都固定好相同大小的内存块随时准备分配给thread cache,因为我们所有的thread cache都会访问一个central cache,所以需要加锁,thread cache是按需从central cache获取对象,先去非空闲的span上查看,有足够的内存则直接从span上获取,当对应的span list中所有span都没有内存以后则会向下一层page cache申请内存。
  3. page cache:存储的内存以page为单位存储以及分配的,当central cache没有内存对象时,我们从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache,当一个span的几个跨度页的对象都收回以后,page cache会回收central cache满足条件的span对象,合并相邻的页,组成更大的页,这样缓解内存碎片问题,而当我们page cache中也没有内存时,则会去调用系统调用,给最大的一个页补充内存256KB,随后分割给需要的central cache,剩下的挂在相应页数的哈希桶上。

三、内部细节构成介绍

1.Thread Cache

#pragma once

#include "Common.h"

class ThreadCache

public:
	// 申请和释放内存对象
	void* Allocate(size_t size);
	void Deallocate(void* ptr, size_t size);

	// 从中心缓存获取对象
	void* FetchFromCentralCache(size_t index, size_t size);

	// 释放对象时,链表过长时,回收内存回到中心缓存
	void ListTooLong(FreeList& list, size_t size);
private:
	FreeList _freeLists[NFREELIST];
;

// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

Thread Cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个Thread Cache对象,这样每个线程在这里获取对象和释放对象时是无锁的,大大提高了内存池的效率。

不分段的话若全是按8字节分,则会需要32768个桶,但但是每个桶都是16Byte则会差生较多的内存碎片。因此这里采用了不同段的内存使用不同的内存对齐规则,既控制了桶的数量不会太多,又整体将内存碎片浪费控制在10%左右

对齐和映射相关函数的编写:
此时有了字节数的对齐规则后,我们就需要提供两个对应的函数,分别用于获取某一字节数对齐后的字节数,以及该字节数对应的哈希桶下标。关于处理对齐和映射的函数,我们可以将其封装到一个类当中。

//管理对齐和映射等关系
class SizeClass

public:
	//获取向上对齐后的字节数
	static inline size_t RoundUp(size_t bytes);
	//获取对应哈希桶的下标
	static inline size_t Index(size_t bytes);
;

需要注意的是,SizeClass类当中的成员函数最好设置为静态成员函数,否则我们在调用这些函数时就需要通过对象去调用,并且对于这些可能会频繁调用的函数,可以考虑将其设置为内联函数。

对齐映射规则:

static inline size_t _RoundUp(size_t bytes, size_t alignNum)
	
		return ((bytes + alignNum - 1) & ~(alignNum - 1));
	
	//内联函数:调用频繁,因此写成内联函数
	//向上对齐
	static inline size_t RoundUp(size_t size)
	
		if (size <= 128)
		
			return _RoundUp(size, 8);
		
		else if (size <= 1024)
		
			return _RoundUp(size, 16);
		
		else if (size <= 8*1024)
		
			return _RoundUp(size, 128);
		
		else if (size <= 64*1024)
		
			return _RoundUp(size, 1024);
		
		else if (size <= 256 * 1024)
		
			return _RoundUp(size, 8*1024);
		
		else//size > 256 * 1024 byte 
		
			return _RoundUp(size, 1<<PAGE_SHIFT);
		
	

	static inline size_t _Index(size_t bytes, size_t align_shift)
	
		return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
	
	//计算映射的哪一个自由链表桶中
	static inline size_t Index(size_t bytes)
	
		assert(bytes <= MAX_BYTES);
		static int group_array[4] =  16, 56, 56, 56 ;
		if (bytes <= 128)
		
			return _Index(bytes, 3);
		
		else if (bytes <= 1024)
		
			return _Index(bytes - 128, 4) + group_array[0];
		
		else if (bytes <= 8 * 1024)
		
			return _Index(bytes - 1024, 7) + group_array[0] + group_array[1];
		
		else if (bytes <= 64 * 1024)
		
			return _Index(bytes - 8 * 1024, 10) + group_array[0] + group_array[1] + group_array[2];
		
		else if (bytes <= 256 * 1024)
		
			return _Index(bytes - 64 * 1024, 13) + group_array[0] + group_array[1] + group_array[2] + group_array[3];
		
		else
		
			assert(false);
			return -1;
		
	

申请内存:

  1. 当申请内存size <=256k时,在Thread Cache中申请内存,通过size计算出自由链表中的桶的位置,如果自由链表对应的桶中有内存,则直接从Free List[i]的首部取出一块内存,因为这里使用的是哈希映射,并且没有锁竞争,因此时间复杂度位O(1)。
  2. 当Free List中没有对象时,则会批量的从Central Cache中获取一定数量的内存,返回一个内存并将之前批量申请的剩余的内存挂到对应free list桶中。

释放内存:
当FreeList[i]中没有对象时,则批量从Central cache中获取一定数量的对象,剩余的n-1个对象插入到自由链表并返回一 个对象。

2.Central Cache

  1. Central Cache本质是由一个哈希映射的span对象自由双向链表构成
  2. 为了保证全局只有唯一的Central Cache,这个类因此可以被设计称单例模式(这里使用的是饿汉模式)
    饿汉模式:构造函数私有,对象设为静态私有。拷贝构造和赋值重载设为delete(防拷贝)

Central Cache.h

#pragma once

#include "Common.h"

// 单例模式
class CentralCache

public:
	static CentralCache* GetInstance()
	
		return &_sInst;
	

	// 获取一个非空的span
	Span* GetOneSpan(SpanList& list, size_t byte_size);

	// 从中心缓存获取一定数量的对象给thread cache
	size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);

	// 将一定数量的对象释放到span跨度
	void ReleaseListToSpans(void* start, size_t byte_size);
	
private:
	SpanList _spanLists[NFREELIST];

private:
	CentralCache()
	

	CentralCache(const CentralCache&) = delete;

	static CentralCache _sInst;
;

span对象:一个由多个页组成的内存块,每页大小是8K。

//管理多个连续页的大块内存跨度结构
struct Span

	PAGE_ID _pageId = 0;//大块内存起始页的页号
	size_t _n = 0;//页的数量

	Span* _next = nullptr;//双向链表的结构
	Span* _prev = nullptr;

	size_t _objSize = 0;//切好的小对象的大小
	size_t _usecount = 0;//切好的小块内存,被分配给thread cache的计数
	void* _freeList = nullptr;//切好的小块内存的自由链表

	bool _isUse = false;//是否在被使用
;

SpanList:一个双向链表,插入删除效率较高

//带头双向循环链表
class SpanList

public:
	SpanList()
	
		_head = new Span;
		_head->_next = _head;
		_head->_prev = _head;
	

	void Insert(Span* pos, Span* newSpan)
	
		assert(pos);
		assert(newSpan);
		Span* prev = pos->_prev;
		prev->_next = newSpan;
		newSpan->_prev = prev;
		newSpan->_next = pos;
		pos->_prev = newSpan;
	

	Span* Begin()
	
		return _head->_next;
	

	Span* End()
	
		return _head;
	

	bool Empty()
	
		return _head->_next == _head;
	
	void PushFront(Span* span)
	
		Insert(Begin(), span);
	

	Span* PopFront()
	
		Span* front = _head->_next;
		Erase(front);
		return front;
	
	void Erase(Span* pos)
	
		assert(pos);
		assert(pos != _head);

		Span* prev = pos->_prev;
		Span* next = pos->_next;
		prev->_next = next;
		next->_prev = prev;
	
public:
	std::mutex _mtx;//桶锁
private:
	Span* _head;
;


Central Cache申请内存:

  1. 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射的spanlist,spanlist中挂着span对象,从span中取出对象给ThreadCache,这个过程是需要加锁的,这里使用的是一个桶锁,尽可能提高效率(桶锁限制的只是当两个线程取相同的size的内存块才会发生线程锁竞争)。
  2. Central Cache映射的spanlist中所有span对象都没有内存以后,则需要向PageCache申请一个新的span对象,拿到span对象之后,将span管理的内存块按大小切好作为自由链表连接到一起,然后从span中取对象返回给ThreadCache。
  3. Central Cache的中挂的span中use_count记录了该span借给ThreadCache多少个对象出去,每借出一个use_count++。当这个span的使用计数为0,说明这个span所有的内存对象都是空闲的,然后将它交给Page Cache合并成更大的页,减少内存碎片(外碎片)。起到承上启下的作用。

Central Cache释放内存:
当thread cache过长或者线程销毁,则会将内存释放回Central cache中,每释放一个内存对象,检查该内存所在的span使用计数是否为空,释放回来一个时–use_count。

当use_count减到0时则表示所有对象都回到了span,则将span释放回Page cache,在Page cache中会对前后相邻的空闲页进行合并。

如何将Thread Cache中的内存对象回收到Central Cache中呢?
实际上每当Page Cache借出一定数量页的span时,就会对该span中的页号和该span一 一建立映射,并且将剩余的span对象重新挂到其对应的的桶上,并将起始页和尾页与剩余的span建立映射,以确保往后回收的时候,可以进行前后页合并。达到了内存分配在多个线程中更均衡的按需调度的目的。

3.Page Cache

申请内存:

  1. 当central cache向page cache申请内存时,page cache先检查对应位置没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页pagespan分裂为一个4页page span和一个6页page span。
  2. 如果找到_spanlist[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请128页page span挂在自由链表中,再重复1中的过程。
  3. 需要注意的是central cache和page cache的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,它的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表

释放内存:

  1. 如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。

    PageCache.h
#pragma once

#include "Common.h"
#include "ObjectPool.h"
#include "PageMap.h"

class PageCache

public:
	static PageCache* GetInstance()
	
		return &_sInst;
	

	// 获取从对象到span的映射
	Span* MapObjectToSpan(void* obj);

	// 释放空闲span回到Pagecache,并合并相邻的span
	void ReleaseSpanToPageCache(Span* span);

	// 获取一个K页的span
	Span* NewSpan(size_t k);

	std::mutex _pageMtx;
private:
	SpanList _spanLists[NPAGES];
	ObjectPool<Span> _spanPool;

	//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
	//std::map<PAGE_ID, Span*> _idSpanMap;
	TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;

	PageCache()
	
	PageCache(const PageCache&) = delete;


	static PageCache _sInst;
;

windows和Linux下如何直接向堆申请页为单位的大块内存:
VirtualAlloc
brk和mmap

inline static void* SystemAlloc(size_t kpage) 

#ifdef _WIN32
	 void* ptr = VirtualAlloc(0, kpage*(1 << PAGE_SHIFT),
	 	MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
	 // brk mmap等
#endif

	 if (ptr == nullptr)
	 	throw std::bad_alloc();
	 	
	 return ptr; 


inline static void SystemFree(void* ptr) 

#ifdef _WIN32
	 VirtualFree(ptr, 0, MEM_RELEASE);
#else
	 // sbrk unmmap等
 #endif

4.申请内存流程

Linux平台:
使用brk或mmap向系统直接申请堆内存

申请小于128k的内存,使用brk分配内存,将_edata往高地址推(只分配虚拟空间,不对应物理内存(因此没有初始化),第一次读/写数据时,引起内核缺页中断,内核才分配对应的物理内存,然后虚拟地址空间建立映射关系)。

申请大于128k的内存,使用mmap分配内存,在堆和栈之间找一块空闲内存分配(对应独立内存,而且初始化为0),Windows平台下使用VirtualAlloc向系统申请和释放堆内存

//直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)

#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage * (1 << 13), MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);//kpage是字节大小
#else
	// linux下brk mmap等
#endif

	if (ptr == nullptr)
		throw std::bad_alloc();

	return ptr;


inline static void SystemFree(void* ptr)

#ifdef _WIN32
	VirtualFree(ptr, 0, MEM_RELEASE);
#else
	//sbrk  unmmap等
#endif // _WIN32


我们的高并发内存池项目对外仅提供两个接口,对于申请内存的接口是ConcurrentAlloc()

  1. 如果申请的内存超过256 K(32页),则直接会越过Thread Cache去调用Page Cache的NewSpan()函数,NewSpan的功能是获取一个K页的span对象,这里有两种情况,第一种情况是申请的页数介于32页和128页之间,若PageCache里面有,则向PageCache索要,若索要的页数所在的spanList链表为空则向后寻找更大的页,若一直没有相匹配的页,则最终会向堆申请128页,然后进行切分,返回你申请的那一部分内存。第二种情况是申请的页数大于128页,这种情况会直接调用VirtualAlloc()向系统申请内存。
  2. 如果申请的内存小于256 K,则会先在线程独享的ThreadCache里所提供的Allocate()接口,计算出要找哪一个索引下标index的freelist,如果有则直接返回。没有就会调FetchFromCentralCache接口,通过你要的内存size计算出需要返回的批量个数(慢启动方式),并且调用CentralCache的FetchRangeObj()计算出Central Cache中心缓存实际真正能给你返回的数量,这时就得调用Central Cache 中的GetOneSpan()接口在获取一个非空的SpanCentral Cache ,如果Central Cache中对应的SpanList[i]中没有空闲的Span或者内存被用完了,这时就找Page Cache要,通过调用Page Cache中的New Span接口来获取一个K页的span对象,计算索引看Page Cache中是否有合适的页,如果没有则需要往后找更大的页,如果一直没找到,就只能向系统申请一个128页的内存了,然后进行切分,由于Central Cache中的span都是已经切好的,因此在返回这K页的内存时,需要提前按照之前计算字节向上对齐的大小切好,然后再返回给Central Cache,然后计算出Central Cache中心缓存实际真正能给你返回的数量。然后返回的内存块挂接到Thread Cache的freeList[i]上,取出一个内存块返回给用户。

5.释放内存流程

threadcache回收内存:

当某个线程申请的对象不用了,可以将其释放给thread cache,然后thread cache将该对象插入到对应哈希桶的自由链表当中即可。

但是随着线程不断的释放,对应自由链表的长度也会越来越长,这些内存堆积在一个thread cache中就是一种浪费,我们应该将这些内存还给central cache,这样一来,这些内存对其他线程来说也是可申请的,因此当thread cache某个桶当中的自由链表太长时我们可以进行一些处理。

如果thread cache某个桶当中自由链表的长度超过它一次批量向central cache申请的对象个数,那么此时我们就要把该自由链表当中的这些对象还给central cache。
  
centralcache回收内存:

当thread cache中某个自由链表太长时,会将自由链表当中的这些对象还给central cache中的span。

但是需要注意的是,还给central cache的这些对象不一定都是属于同一个span的。我们需要通过C++当中的unordered_map通过映射关系找到span的页号然后进行内存归还。

在thread cache还对象给central cache的过程中,如果central cache中某个span的_useCount减到0时,说明这个span分配出去的对象全部都还回来了,那么此时就可以将这个span再进一步还给page cache。

并且在central cache还span给page cache时也存在锁的问题,此时需要先将central cache中对应的桶锁解掉,然后再加上page cache的大锁之后才能进入page cache进行相关操作,当处理完毕回到central cache时,除了将page cache的大锁解掉,还需要立刻获得central cache对应的桶锁,然后将还未还完对象继续还给central cache中对应的span。

pagecache回收内存:

如果central cache中有某个span的_useCount减到0了,那么central cache就需要将这个span还给page cache了。

这个过程看似是非常简单的,page cache只需将还回来的span挂到对应的哈希桶上就行了。但实际为了缓解内存碎片的问题,page cache还需要尝试将还回来的span与其他空闲的span进行合并

需要注意的是,在向前或向后进行合并的过程中:

如果没有通过页号获取到其对应的span,说明对应到该页的内存块还未申请,此时需要停止合并。
如果通过页号获取到了其对应的span,但该span处于被使用的状态,那我们也必须停止合并。
如果合并后大于128页则不能进行本次合并,因为page cache无法对大于128页的span进行管理。

四、多线程环境下对比malloc测试

之前我们只是对代码进行了一些基础的单元测试,下面我们在多线程场景下对比malloc进行测试。

void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)

	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;
	for (size_t k = 0; k < nworks; ++k)
	
		vthread[k] = std::thread([&, k]() 
			std::vector<void*> v;
			v.reserve(ntimes);
			for (size_t j = 0; j < rounds; ++j)
			
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				
					v.push_back(malloc(16));
					//v.push_back(malloc((16 + i) % 8192 + 1));
				
				size_t end1 = clock();
				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				
					实现高并发内存池

实现高并发内存池

C++从零实现一个高并发内存池

C++从零实现一个高并发内存池

C++从零实现一个高并发内存池

C++高并发内存池的设计和实现