欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

C++高并發(fā)內(nèi)存池的實現(xiàn)

 更新時間:2022年07月18日 11:28:58   作者:2021dragon  
本文主要介紹了C++高并發(fā)內(nèi)存池的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧

項目介紹

本項目實現(xiàn)的是一個高并發(fā)的內(nèi)存池,它的原型是Google的一個開源項目tcmalloc,tcmalloc全稱Thread-Caching Malloc,即線程緩存的malloc,實現(xiàn)了高效的多線程內(nèi)存管理,用于替換系統(tǒng)的內(nèi)存分配相關(guān)函數(shù)malloc和free。

在這里插入圖片描述

tcmalloc的知名度也是非常高的,不少公司都在用它,比如Go語言就直接用它做了自己的內(nèi)存分配器。

該項目就是把tcmalloc中最核心的框架簡化后拿出來,模擬實現(xiàn)出一個mini版的高并發(fā)內(nèi)存池,目的就是學習tcmalloc的精華。

該項目主要涉及C/C++、數(shù)據(jù)結(jié)構(gòu)(鏈表、哈希桶)、操作系統(tǒng)內(nèi)存管理、單例模式、多線程、互斥鎖等方面的技術(shù)。

內(nèi)存池介紹

池化技術(shù)

在說內(nèi)存池之前,我們得先了解一下“池化技術(shù)”。所謂“池化技術(shù)”,就是程序先向系統(tǒng)申請過量的資源,然后自己進行管理,以備不時之需。

之所以要申請過量的資源,是因為申請和釋放資源都有較大的開銷,不如提前申請一些資源放入“池”中,當需要資源時直接從“池”中獲取,不需要時就將該資源重新放回“池”中即可。這樣使用時就會變得非常快捷,可以大大提高程序的運行效率。

在計算機中,有很多使用“池”這種技術(shù)的地方,除了內(nèi)存池之外,還有連接池、線程池、對象池等。以服務(wù)器上的線程池為例,它的主要思想就是:先啟動若干數(shù)量的線程,讓它們處于睡眠狀態(tài),當接收到客戶端的請求時,喚醒池中某個睡眠的線程,讓它來處理客戶端的請求,當處理完這個請求后,線程又進入睡眠狀態(tài)。

內(nèi)存池

內(nèi)存池是指程序預(yù)先向操作系統(tǒng)申請一塊足夠大的內(nèi)存,此后,當程序中需要申請內(nèi)存的時候,不是直接向操作系統(tǒng)申請,而是直接從內(nèi)存池中獲?。煌?,當釋放內(nèi)存的時候,并不是真正將內(nèi)存返回給操作系統(tǒng),而是將內(nèi)存返回給內(nèi)存池。當程序退出時(或某個特定時間),內(nèi)存池才將之前申請的內(nèi)存真正釋放。

內(nèi)存池主要解決的問題

內(nèi)存池主要解決的就是效率的問題,它能夠避免讓程序頻繁的向系統(tǒng)申請和釋放內(nèi)存。其次,內(nèi)存池作為系統(tǒng)的內(nèi)存分配器,還需要嘗試解決內(nèi)存碎片的問題。

內(nèi)存碎片分為內(nèi)部碎片和外部碎片:

  • 外部碎片是一些空閑的小塊內(nèi)存區(qū)域,由于這些內(nèi)存空間不連續(xù),以至于合計的內(nèi)存足夠,但是不能滿足一些內(nèi)存分配申請需求。
  • 內(nèi)部碎片是由于一些對齊的需求,導(dǎo)致分配出去的空間中一些內(nèi)存無法被利用。

注意: 內(nèi)存池嘗試解決的是外部碎片的問題,同時也盡可能的減少內(nèi)部碎片的產(chǎn)生。

malloc

C/C++中我們要動態(tài)申請內(nèi)存并不是直接去堆申請的,而是通過malloc函數(shù)去申請的,包括C++中的new實際上也是封裝了malloc函數(shù)的。

我們申請內(nèi)存塊時是先調(diào)用malloc,malloc再去向操作系統(tǒng)申請內(nèi)存。malloc實際就是一個內(nèi)存池,malloc相當于向操作系統(tǒng)“批發(fā)”了一塊較大的內(nèi)存空間,然后“零售”給程序用,當全部“售完”或程序有大量的內(nèi)存需求時,再根據(jù)實際需求向操作系統(tǒng)“進貨”。

在這里插入圖片描述

malloc的實現(xiàn)方式有很多種,一般不同編譯器平臺用的都是不同的。比如Windows的VS系列中的malloc就是微軟自行實現(xiàn)的,而Linux下的gcc用的是glibc中的ptmalloc。

定長內(nèi)存池的實現(xiàn)

malloc其實就是一個通用的內(nèi)存池,在什么場景下都可以使用,但這也意味著malloc在什么場景下都不會有很高的性能,因為malloc并不是針對某種場景專門設(shè)計的。

定長內(nèi)存池就是針對固定大小內(nèi)存塊的申請和釋放的內(nèi)存池,由于定長內(nèi)存池只需要支持固定大小內(nèi)存塊的申請和釋放,因此我們可以將其性能做到極致,并且在實現(xiàn)定長內(nèi)存池時不需要考慮內(nèi)存碎片等問題,因為我們申請/釋放的都是固定大小的內(nèi)存塊。

我們可以通過實現(xiàn)定長內(nèi)存池來熟悉一下對簡單內(nèi)存池的控制,其次,這個定長內(nèi)存池后面會作為高并發(fā)內(nèi)存池的一個基礎(chǔ)組件。

如何實現(xiàn)定長?

在實現(xiàn)定長內(nèi)存池時要做到“定長”有很多種方法,比如我們可以使用非類型模板參數(shù),使得在該內(nèi)存池中申請到的對象的大小都是N。

template<size_t N>
class ObjectPool
{};

此外,定長內(nèi)存池也叫做對象池,在創(chuàng)建對象池時,對象池可以根據(jù)傳入的對象類型的大小來實現(xiàn)“定長”,因此我們可以通過使用模板參數(shù)來實現(xiàn)“定長”,比如創(chuàng)建定長內(nèi)存池時傳入的對象類型是int,那么該內(nèi)存池就只支持4字節(jié)大小內(nèi)存的申請和釋放。

template<class T>
class ObjectPool
{};

如何直接向堆申請空間?

既然是內(nèi)存池,那么我們首先得向系統(tǒng)申請一塊內(nèi)存空間,然后對其進行管理。要想直接向堆申請內(nèi)存空間,在Windows下,可以調(diào)用VirtualAlloc函數(shù);在Linux下,可以調(diào)用brk或mmap函數(shù)。

#ifdef _WIN32
	#include <Windows.h>
#else
	//...
#endif

//直接去堆上申請按頁申請空間
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage<<13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
	// linux下brk mmap等
#endif
	if (ptr == nullptr)
		throw std::bad_alloc();
	return ptr;
}

這里我們可以通過條件編譯將對應(yīng)平臺下向堆申請內(nèi)存的函數(shù)進行封裝,此后我們就不必再關(guān)心當前所在平臺,當我們需要直接向堆申請內(nèi)存時直接調(diào)用我們封裝后的SystemAlloc函數(shù)即可。

定長內(nèi)存池中應(yīng)該包含哪些成員變量?

對于向堆申請到的大塊內(nèi)存,我們可以用一個指針來對其進行管理,但僅用一個指針肯定是不夠的,我們還需要用一個變量來記錄這塊內(nèi)存的長度。

由于此后我們需要將這塊內(nèi)存進行切分,為了方便切分操作,指向這塊內(nèi)存的指針最好是字符指針,因為指針的類型決定了指針向前或向后走一步有多大距離,對于字符指針來說,當我們需要向后移動n個字節(jié)時,直接對字符指針進行加n操作即可。

在這里插入圖片描述

其次,釋放回來的定長內(nèi)存塊也需要被管理,我們可以將這些釋放回來的定長內(nèi)存塊鏈接成一個鏈表,這里我們將管理釋放回來的內(nèi)存塊的鏈表叫做自由鏈表,為了能找到這個自由鏈表,我們還需要一個指向自由鏈表的指針。

在這里插入圖片描述

因此,定長內(nèi)存池當中包含三個成員變量:

  • _memory:指向大塊內(nèi)存的指針。
  • _remainBytes:大塊內(nèi)存切分過程中剩余字節(jié)數(shù)。
  • _freeList:還回來過程中鏈接的自由鏈表的頭指針。

內(nèi)存池如何管理釋放的對象?

對于還回來的定長內(nèi)存塊,我們可以用自由鏈表將其鏈接起來,但我們并不需要為其專門定義鏈式結(jié)構(gòu),我們可以讓內(nèi)存塊的前4個字節(jié)(32位平臺)或8個字節(jié)(64位平臺)作為指針,存儲后面內(nèi)存塊的起始地址即可。

因此在向自由鏈表插入被釋放的內(nèi)存塊時,先讓該內(nèi)存塊的前4個字節(jié)或8個字節(jié)存儲自由鏈表中第一個內(nèi)存塊的地址,然后再讓_freeList指向該內(nèi)存塊即可,也就是一個簡單的鏈表頭插操作。

在這里插入圖片描述

這里有一個有趣問題:如何讓一個指針在32位平臺下解引用后能向后訪問4個字節(jié),在64位平臺下解引用后能向后訪問8個字節(jié)?

首先我們得知道,32位平臺下指針的大小是4個字節(jié),64位平臺下指針的大小是8個字節(jié)。而指針指向數(shù)據(jù)的類型,決定了指針解引用后能向后訪問的空間大小,因此我們這里需要的是一個指向指針的指針,這里使用二級指針就行了。

當我們需要訪問一個內(nèi)存塊的前4/8個字節(jié)時,我們就可以先該內(nèi)存塊的地址先強轉(zhuǎn)為二級指針,由于二級指針存儲的是一級指針的地址,二級指針解引用能向后訪問一個指針的大小,因此在32位平臺下訪問的就是4個字節(jié),在64位平臺下訪問的就是8個字節(jié),此時我們訪問到了該內(nèi)存塊的前4/8個字節(jié)。

void*& NextObj(void* ptr)
{
	return (*(void**)ptr);
}

需要注意的是,在釋放對象時,我們應(yīng)該顯示調(diào)用該對象的析構(gòu)函數(shù)清理該對象,因為該對象可能還管理著其他某些資源,如果不對其進行清理那么這些資源將無法被釋放,就會導(dǎo)致內(nèi)存泄漏。

//釋放對象
void Delete(T* obj)
{
	//顯示調(diào)用T的析構(gòu)函數(shù)清理對象
	obj->~T();

	//將釋放的對象頭插到自由鏈表
	NextObj(obj) = _freeList;
	_freeList = obj;
}

內(nèi)存池如何為我們申請對象?

當我們申請對象時,內(nèi)存池應(yīng)該優(yōu)先把還回來的內(nèi)存塊對象再次重復(fù)利用,因此如果自由鏈表當中有內(nèi)存塊的話,就直接從自由鏈表頭刪一個內(nèi)存塊進行返回即可。

在這里插入圖片描述

如果自由鏈表當中沒有內(nèi)存塊,那么我們就在大塊內(nèi)存中切出定長的內(nèi)存塊進行返回,當內(nèi)存塊切出后及時更新_memory指針的指向,以及_remainBytes的值即可。

在這里插入圖片描述

需要特別注意的是,由于當內(nèi)存塊釋放時我們需要將內(nèi)存塊鏈接到自由鏈表當中,因此我們必須保證切出來的對象至少能夠存儲得下一個地址,所以當對象的大小小于當前所在平臺指針的大小時,需要按指針的大小進行內(nèi)存塊的切分。

此外,當大塊內(nèi)存已經(jīng)不足以切分出一個對象時,我們就應(yīng)該調(diào)用我們封裝的SystemAlloc函數(shù),再次向堆申請一塊內(nèi)存空間,此時也要注意及時更新_memory指針的指向,以及_remainBytes的值。

//申請對象
T* New()
{
	T* obj = nullptr;

	//優(yōu)先把還回來的內(nèi)存塊對象,再次重復(fù)利用
	if (_freeList != nullptr)
	{
		//從自由鏈表頭刪一個對象
		obj = (T*)_freeList;
		_freeList = NextObj(_freeList);
	}
	else
	{
		//保證對象能夠存儲得下地址
		size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
		//剩余內(nèi)存不夠一個對象大小時,則重新開大塊空間
		if (_remainBytes < objSize)
		{
			_remainBytes = 128 * 1024;
			_memory = (char*)SystemAlloc(_remainBytes >> 13);
			if (_memory == nullptr)
			{
				throw std::bad_alloc();
			}
		}
		//從大塊內(nèi)存中切出objSize字節(jié)的內(nèi)存
		obj = (T*)_memory;
		_memory += objSize;
		_remainBytes -= objSize;
	}
	//定位new,顯示調(diào)用T的構(gòu)造函數(shù)初始化
	new(obj)T;

	return obj;
}

需要注意的是,與釋放對象時需要顯示調(diào)用該對象的析構(gòu)函數(shù)一樣,當內(nèi)存塊切分出來后,我們也應(yīng)該使用定位new,顯示調(diào)用該對象的構(gòu)造函數(shù)對其進行初始化。

定長內(nèi)存池整體代碼如下:

//定長內(nèi)存池
template<class T>
class ObjectPool
{
public:
	//申請對象
	T* New()
	{
		T* obj = nullptr;

		//優(yōu)先把還回來的內(nèi)存塊對象,再次重復(fù)利用
		if (_freeList != nullptr)
		{
			//從自由鏈表頭刪一個對象
			obj = (T*)_freeList;
			_freeList = NextObj(_freeList);
		}
		else
		{
			//保證對象能夠存儲得下地址
			size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
			//剩余內(nèi)存不夠一個對象大小時,則重新開大塊空間
			if (_remainBytes < objSize)
			{
				_remainBytes = 128 * 1024;
				//_memory = (char*)malloc(_remainBytes);
				_memory = (char*)SystemAlloc(_remainBytes >> 13);
				if (_memory == nullptr)
				{
					throw std::bad_alloc();
				}
			}
			//從大塊內(nèi)存中切出objSize字節(jié)的內(nèi)存
			obj = (T*)_memory;
			_memory += objSize;
			_remainBytes -= objSize;
		}
		//定位new,顯示調(diào)用T的構(gòu)造函數(shù)初始化
		new(obj)T;

		return obj;
	}
	//釋放對象
	void Delete(T* obj)
	{
		//顯示調(diào)用T的析構(gòu)函數(shù)清理對象
		obj->~T();

		//將釋放的對象頭插到自由鏈表
		NextObj(obj) = _freeList;
		_freeList = obj;
	}
private:
	char* _memory = nullptr;     //指向大塊內(nèi)存的指針
	size_t _remainBytes = 0;     //大塊內(nèi)存在切分過程中剩余字節(jié)數(shù)

	void* _freeList = nullptr;   //還回來過程中鏈接的自由鏈表的頭指針
};

性能對比

下面我們將實現(xiàn)的定長內(nèi)存池和malloc/free進行性能對比,測試代碼如下:

struct TreeNode
{
	int _val;
	TreeNode* _left;
	TreeNode* _right;
	TreeNode()
		:_val(0)
		, _left(nullptr)
		, _right(nullptr)
	{}
};

void TestObjectPool()
{
	// 申請釋放的輪次
	const size_t Rounds = 3;
	// 每輪申請釋放多少次
	const size_t N = 1000000;
	std::vector<TreeNode*> v1;
	v1.reserve(N);

	//malloc和free
	size_t begin1 = clock();
	for (size_t j = 0; j < Rounds; ++j)
	{
		for (int i = 0; i < N; ++i)
		{
			v1.push_back(new TreeNode);
		}
		for (int i = 0; i < N; ++i)
		{
			delete v1[i];
		}
		v1.clear();
	}
	size_t end1 = clock();

	//定長內(nèi)存池
	ObjectPool<TreeNode> TNPool;
	std::vector<TreeNode*> v2;
	v2.reserve(N);
	size_t begin2 = clock();
	for (size_t j = 0; j < Rounds; ++j)
	{
		for (int i = 0; i < N; ++i)
		{
			v2.push_back(TNPool.New());
		}
		for (int i = 0; i < N; ++i)
		{
			TNPool.Delete(v2[i]);
		}
		v2.clear();
	}
	size_t end2 = clock();

	cout << "new cost time:" << end1 - begin1 << endl;
	cout << "object pool cost time:" << end2 - begin2 << endl;
}

在代碼中,我們先用new申請若干個TreeNode對象,然后再用delete將這些對象再釋放,通過clock函數(shù)得到整個過程消耗的時間。(new和delete底層就是封裝的malloc和free)

然后再重復(fù)該過程,只不過將其中的new和delete替換為定長內(nèi)存池當中的New和Delete,此時再通過clock函數(shù)得到該過程消耗的時間。

在這里插入圖片描述

可以看到在這個過程中,定長內(nèi)存池消耗的時間比malloc/free消耗的時間要短。這就是因為malloc是一個通用的內(nèi)存池,而定長內(nèi)存池是專門針對申請定長對象而設(shè)計的,因此在這種特殊場景下定長內(nèi)存池的效率更高,正所謂“尺有所短,寸有所長”。

高并發(fā)內(nèi)存池整體框架設(shè)計

該項目解決的是什么問題?

現(xiàn)代很多的開發(fā)環(huán)境都是多核多線程,因此在申請內(nèi)存的時,必然存在激烈的鎖競爭問題。malloc本身其實已經(jīng)很優(yōu)秀了,但是在并發(fā)場景下可能會因為頻繁的加鎖和解鎖導(dǎo)致效率有所降低,而該項目的原型tcmalloc實現(xiàn)的就是一種在多線程高并發(fā)場景下更勝一籌的內(nèi)存池。

在實現(xiàn)內(nèi)存池時我們一般需要考慮到效率問題和內(nèi)存碎片的問題,但對于高并發(fā)內(nèi)存池來說,我們還需要考慮在多線程環(huán)境下的鎖競爭問題。

高并發(fā)內(nèi)存池整體框架設(shè)計

在這里插入圖片描述

高并發(fā)內(nèi)存池主要由以下三個部分構(gòu)成:

  • thread cache: 線程緩存是每個線程獨有的,用于小于等于256KB的內(nèi)存分配,每個線程獨享一個thread cache。
  • central cache: 中心緩存是所有線程所共享的,當thread cache需要內(nèi)存時會按需從central cache中獲取內(nèi)存,而當thread cache中的內(nèi)存滿足一定條件時,central cache也會在合適的時機對其進行回收。
  • page cache: 頁緩存中存儲的內(nèi)存是以頁為單位進行存儲及分配的,當central cache需要內(nèi)存時,page cache會分配出一定數(shù)量的頁分配給central cache,而當central cache中的內(nèi)存滿足一定條件時,page cache也會在合適的時機對其進行回收,并將回收的內(nèi)存盡可能的進行合并,組成更大的連續(xù)內(nèi)存塊,緩解內(nèi)存碎片的問題。

進一步說明:

每個線程都有一個屬于自己的thread cache,也就意味著線程在thread cache申請內(nèi)存時是不需要加鎖的,而一次性申請大于256KB內(nèi)存的情況是很少的,因此大部分情況下申請內(nèi)存時都是無鎖的,這也就是這個高并發(fā)內(nèi)存池高效的地方。

每個線程的thread cache會根據(jù)自己的情況向central cache申請或歸還內(nèi)存,這就避免了出現(xiàn)單個線程的thread cache占用太多內(nèi)存,而其余thread cache出現(xiàn)內(nèi)存吃緊的問題。

多線程的thread cache可能會同時找central cache申請內(nèi)存,此時就會涉及線程安全的問題,因此在訪問central cache時是需要加鎖的,但central cache實際上是一個哈希桶的結(jié)構(gòu),只有當多個線程同時訪問同一個桶時才需要加鎖,所以這里的鎖競爭也不會很激烈。

各個部分的主要作用

thread cache主要解決鎖競爭的問題,每個線程獨享自己的thread cache,當自己的thread cache中有內(nèi)存時該線程不會去和其他線程進行競爭,每個線程只要在自己的thread cache申請內(nèi)存就行了。

central cache主要起到一個居中調(diào)度的作用,每個線程的thread cache需要內(nèi)存時從central cache獲取,而當thread cache的內(nèi)存多了就會將內(nèi)存還給central cache,其作用類似于一個中樞,因此取名為中心緩存。

page cache就負責提供以頁為單位的大塊內(nèi)存,當central cache需要內(nèi)存時就會去向page cache申請,而當page cache沒有內(nèi)存了就會直接去找系統(tǒng),也就是直接去堆上按頁申請內(nèi)存塊。

threadcache

threadcache整體設(shè)計

定長內(nèi)存池只支持固定大小內(nèi)存塊的申請釋放,因此定長內(nèi)存池中只需要一個自由鏈表管理釋放回來的內(nèi)存塊。現(xiàn)在我們要支持申請和釋放不同大小的內(nèi)存塊,那么我們就需要多個自由鏈表來管理釋放回來的內(nèi)存塊,因此thread cache實際上一個哈希桶結(jié)構(gòu),每個桶中存放的都是一個自由鏈表。

thread cache支持小于等于256KB內(nèi)存的申請,如果我們將每種字節(jié)數(shù)的內(nèi)存塊都用一個自由鏈表進行管理的話,那么此時我們就需要20多萬個自由鏈表,光是存儲這些自由鏈表的頭指針就需要消耗大量內(nèi)存,這顯然是得不償失的。

這時我們可以選擇做一些平衡的犧牲,讓這些字節(jié)數(shù)按照某種規(guī)則進行對齊,例如我們讓這些字節(jié)數(shù)都按照8字節(jié)進行向上對齊,那么thread cache的結(jié)構(gòu)就是下面這樣的,此時當線程申請1~8字節(jié)的內(nèi)存時會直接給出8字節(jié),而當線程申請9~16字節(jié)的內(nèi)存時會直接給出16字節(jié),以此類推。

在這里插入圖片描述

因此當線程要申請某一大小的內(nèi)存塊時,就需要經(jīng)過某種計算得到對齊后的字節(jié)數(shù),進而找到對應(yīng)的哈希桶,如果該哈希桶中的自由鏈表中有內(nèi)存塊,那就從自由鏈表中頭刪一個內(nèi)存塊進行返回;如果該自由鏈表已經(jīng)為空了,那么就需要向下一層的central cache進行獲取了。

但此時由于對齊的原因,就可能會產(chǎn)生一些碎片化的內(nèi)存無法被利用,比如線程只申請了6字節(jié)的內(nèi)存,而thread cache卻直接給了8字節(jié)的內(nèi)存,這多給出的2字節(jié)就無法被利用,導(dǎo)致了一定程度的空間浪費,這些因為某些對齊原因?qū)е聼o法被利用的內(nèi)存,就是內(nèi)存碎片中的內(nèi)部碎片。

鑒于當前項目比較復(fù)雜,我們最好對自由鏈表這個結(jié)構(gòu)進行封裝,目前我們就提供Push和Pop兩個成員函數(shù),對應(yīng)的操作分別是將對象插入到自由鏈表(頭插)和從自由鏈表獲取一個對象(頭刪),后面在需要時還會添加對應(yīng)的成員函數(shù)。

//管理切分好的小對象的自由鏈表
class FreeList
{
public:
	//將釋放的對象頭插到自由鏈表
	void Push(void* obj)
	{
		assert(obj);

		//頭插
		NextObj(obj) = _freeList;
		_freeList = obj;
	}

	//從自由鏈表頭部獲取一個對象
	void* Pop()
	{
		assert(_freeList);

		//頭刪
		void* obj = _freeList;
		_freeList = NextObj(_freeList);
		return obj;
	}

private:
	void* _freeList = nullptr; //自由鏈表
};

因此thread cache實際就是一個數(shù)組,數(shù)組中存儲的就是一個個的自由鏈表,至于這個數(shù)組中到底存儲了多少個自由鏈表,就需要看我們在進行字節(jié)數(shù)對齊時具體用的是什么映射對齊規(guī)則了。

threadcache哈希桶映射對齊規(guī)則

如何進行對齊?

上面已經(jīng)說了,不是每個字節(jié)數(shù)都對應(yīng)一個自由鏈表,這樣開銷太大了,因此我們需要制定一個合適的映射對齊規(guī)則。

首先,這些內(nèi)存塊是會被鏈接到自由鏈表上的,因此一開始肯定是按8字節(jié)進行對齊是最合適的,因為我們必須保證這些內(nèi)存塊,無論是在32位平臺下還是64位平臺下,都至少能夠存儲得下一個指針。

但如果所有的字節(jié)數(shù)都按照8字節(jié)進行對齊的話,那么我們就需要建立 256 × 1024 ÷ 8 = 32768 256\times1024\div8=32768 256×1024÷8=32768個桶,這個數(shù)量還是比較多的,實際上我們可以讓不同范圍的字節(jié)數(shù)按照不同的對齊數(shù)進行對齊,具體對齊方式如下:

字節(jié)數(shù)對齊數(shù)哈希桶下標
[ [ [ 1 , 128 1,128 1,128 ] ] ]8 8 8[ [ [ 0 , 16 ) 0,16) 0,16)
[ [ [ 128 + 1 , 1024 128+1,1024 128+1,1024 ] ] ]16 16 16[ [ [ 16 , 72 ) 16,72) 16,72)
[ [ [ 1024 + 1 , 8 × 1024 1024+1,8\times1024 1024+1,8×1024 ] ] ]128 128 128[ [ [ 72 , 128 ) 72,128) 72,128)
[ [ [ 8 × 1024 + 1 , 64 × 1024 8\times1024+1,64\times1024 8×1024+1,64×1024 ] ] ]1024 1024 1024[ [ [ 128 , 184 ) 128,184) 128,184)
[ [ [ 64 × 1024 + 1 , 256 × 1024 64\times1024+1,256\times1024 64×1024+1,256×1024 ] ] ]8 × 1024 8\times1024 8×1024[ [ [ 184 , 208 ) 184,208) 184,208)

空間浪費率

雖然對齊產(chǎn)生的內(nèi)碎片會引起一定程度的空間浪費,但按照上面的對齊規(guī)則,我們可以將浪費率控制到百分之十左右。需要說明的是,1~128這個區(qū)間我們不做討論,因為1字節(jié)就算是對齊到2字節(jié)也有百分之五十的浪費率,這里我們就從第二個區(qū)間開始進行計算。

根據(jù)上面的公式,我們要得到某個區(qū)間的最大浪費率,就應(yīng)該讓分子取到最大,讓分母取到最小。比如129~1024這個區(qū)間,該區(qū)域的對齊數(shù)是16,那么最大浪費的字節(jié)數(shù)就是15,而最小對齊后的字節(jié)數(shù)就是這個區(qū)間內(nèi)的前16個數(shù)所對齊到的字節(jié)數(shù),也就是144,那么該區(qū)間的最大浪費率也就是 15 ÷ 144 ≈ 10.42 % 15\div144\approx10.42\% 15÷144≈10.42%。同樣的道理,后面兩個區(qū)間的最大浪費率分別是 127 ÷ 1152 ≈ 11.02 % 127\div1152\approx11.02\% 127÷1152≈11.02%和 1023 ÷ 9216 ≈ 11.10 % 1023\div9216\approx11.10\% 1023÷9216≈11.10%。

對齊和映射相關(guān)函數(shù)的編寫

此時有了字節(jié)數(shù)的對齊規(guī)則后,我們就需要提供兩個對應(yīng)的函數(shù),分別用于獲取某一字節(jié)數(shù)對齊后的字節(jié)數(shù),以及該字節(jié)數(shù)對應(yīng)的哈希桶下標。關(guān)于處理對齊和映射的函數(shù),我們可以將其封裝到一個類當中。

//管理對齊和映射等關(guān)系
class SizeClass
{
public:
	//獲取向上對齊后的字節(jié)數(shù)
	static inline size_t RoundUp(size_t bytes);
	//獲取對應(yīng)哈希桶的下標
	static inline size_t Index(size_t bytes);
};

需要注意的是,SizeClass類當中的成員函數(shù)最好設(shè)置為靜態(tài)成員函數(shù),否則我們在調(diào)用這些函數(shù)時就需要通過對象去調(diào)用,并且對于這些可能會頻繁調(diào)用的函數(shù),可以考慮將其設(shè)置為內(nèi)聯(lián)函數(shù)。

在獲取某一字節(jié)數(shù)向上對齊后的字節(jié)數(shù)時,可以先判斷該字節(jié)數(shù)屬于哪一個區(qū)間,然后再通過調(diào)用一個子函數(shù)進行進一步處理。

//獲取向上對齊后的字節(jié)數(shù)
static inline size_t RoundUp(size_t bytes)
{
	if (bytes <= 128)
	{
		return _RoundUp(bytes, 8);
	}
	else if (bytes <= 1024)
	{
		return _RoundUp(bytes, 16);
	}
	else if (bytes <= 8 * 1024)
	{
		return _RoundUp(bytes, 128);
	}
	else if (bytes <= 64 * 1024)
	{
		return _RoundUp(bytes, 1024);
	}
	else if (bytes <= 256 * 1024)
	{
		return _RoundUp(bytes, 8 * 1024);
	}
	else
	{
		assert(false);
		return -1;
	}
}

此時我們就需要編寫一個子函數(shù),該子函數(shù)需要通過對齊數(shù)計算出某一字節(jié)數(shù)對齊后的字節(jié)數(shù),最容易想到的就是下面這種寫法。

//一般寫法
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
	size_t alignSize = 0;
	if (bytes%alignNum != 0)
	{
		alignSize = (bytes / alignNum + 1)*alignNum;
	}
	else
	{
		alignSize = bytes;
	}
	return alignSize;
}

除了上述寫法,我們還可以通過位運算的方式來進行計算,雖然位運算可能并沒有上面的寫法容易理解,但計算機執(zhí)行位運算的速度是比執(zhí)行乘法和除法更快的。

//位運算寫法
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
	return ((bytes + alignNum - 1)&~(alignNum - 1));
}

對于上述位運算,我們以10字節(jié)按8字節(jié)對齊為例進行分析。 8 − 1 = 7 8-1=7 8−1=7,7就是一個低三位為1其余位為0的二進制序列,我們將10與7相加,相當于將10字節(jié)當中不夠8字節(jié)的剩余字節(jié)數(shù)補上了。

在這里插入圖片描述

然后我們再將該值與7按位取反后的值進行與運算,而7按位取反后是一個低三位為0其余位為1的二進制序列,該操作進行后相當于屏蔽了該值的低三位而該值的其余位保持不變,此時得到的值就是10字節(jié)按8字節(jié)對齊后的值,即16字節(jié)。

在這里插入圖片描述

在獲取某一字節(jié)數(shù)對應(yīng)的哈希桶下標時,也是先判斷該字節(jié)數(shù)屬于哪一個區(qū)間,然后再通過調(diào)用一個子函數(shù)進行進一步處理。

//獲取對應(yīng)哈希桶的下標
static inline size_t Index(size_t bytes)
{
	//每個區(qū)間有多少個自由鏈表
	static size_t groupArray[4] = { 16, 56, 56, 56 };
	if (bytes <= 128)
	{
		return _Index(bytes, 3);
	}
	else if (bytes <= 1024)
	{
		return _Index(bytes - 128, 4) + groupArray[0];
	}
	else if (bytes <= 8 * 1024)
	{
		return _Index(bytes - 1024, 7) + groupArray[0] + groupArray[1];
	}
	else if (bytes <= 64 * 1024)
	{
		return _Index(bytes - 8 * 1024, 10) + groupArray[0] + groupArray[1] + groupArray[2];
	}
	else if (bytes <= 256 * 1024)
	{
		return _Index(bytes - 64 * 1024, 13) + groupArray[0] + groupArray[1] + groupArray[2] + groupArray[3];
	}
	else
	{
		assert(false);
		return -1;
	}
}

此時我們需要編寫一個子函數(shù)來繼續(xù)進行處理,容易想到的就是根據(jù)對齊數(shù)來計算某一字節(jié)數(shù)對應(yīng)的下標。

//一般寫法
static inline size_t _Index(size_t bytes, size_t alignNum)
{
	size_t index = 0;
	if (bytes%alignNum != 0)
	{
		index = bytes / alignNum;
	}
	else
	{
		index = bytes / alignNum - 1;
	}
	return index;
}

當然,為了提高效率下面也提供了一個用位運算來解決的方法,需要注意的是,此時我們并不是傳入該字節(jié)數(shù)的對齊數(shù),而是將對齊數(shù)寫成2的n次方的形式后,將這個n值進行傳入。比如對齊數(shù)是8,傳入的就是3。

//位運算寫法
static inline size_t _Index(size_t bytes, size_t alignShift)
{
	return ((bytes + (1 << alignShift) - 1) >> alignShift) - 1;
}

這里我們還是以10字節(jié)按8字節(jié)對齊為例進行分析。此時傳入的alignShift就是3,將1左移3位后得到的實際上就是對齊數(shù)8, 8 − 1 = 7 8-1=7 8−1=7,相當于我們還是讓10與7相加。

在這里插入圖片描述

之后我們再將該值向右移3位,實際上就是讓這個值除以8,此時我們也是相當于屏蔽了該值二進制的低三位,因為除以8得到的值與其二進制的低三位無關(guān),所以我們可以說是將10對齊后的字節(jié)數(shù)除以了8,此時得到了2,而最后還需要減一是因為數(shù)組的下標是從0開始的。

ThreadCache類

按照上述的對齊規(guī)則,thread cache中桶的個數(shù),也就是自由鏈表的個數(shù)是208,以及thread cache允許申請的最大內(nèi)存大小256KB,我們可以將這些數(shù)據(jù)按照如下方式進行定義。

//小于等于MAX_BYTES,就找thread cache申請
//大于MAX_BYTES,就直接找page cache或者系統(tǒng)堆申請
static const size_t MAX_BYTES = 256 * 1024;
//thread cache和central cache自由鏈表哈希桶的表大小
static const size_t NFREELISTS = 208;

現(xiàn)在就可以對ThreadCache類進行定義了,thread cache就是一個存儲208個自由鏈表的數(shù)組,目前thread cache就先提供一個Allocate函數(shù)用于申請對象就行了,后面需要時再進行增加。

class ThreadCache
{
public:
	//申請內(nèi)存對象
	void* Allocate(size_t size);

private:
	FreeList _freeLists[NFREELISTS]; //哈希桶
};

在thread cache申請對象時,通過所給字節(jié)數(shù)計算出對應(yīng)的哈希桶下標,如果桶中自由鏈表不為空,則從該自由鏈表中取出一個對象進行返回即可;但如果此時自由鏈表為空,那么我們就需要從central cache進行獲取了,這里的FetchFromCentralCache函數(shù)也是thread cache類中的一個成員函數(shù),在后面再進行具體實現(xiàn)。

//申請內(nèi)存對象
void* ThreadCache::Allocate(size_t size)
{
	assert(size <= MAX_BYTES);
	size_t alignSize = SizeClass::RoundUp(size);
	size_t index = SizeClass::Index(size);
	if (!_freeLists[index].Empty())
	{
		return _freeLists[index].Pop();
	}
	else
	{
		return FetchFromCentralCache(index, alignSize);
	}
}

threadcacheTLS無鎖訪問

每個線程都有一個自己獨享的thread cache,那應(yīng)該如何創(chuàng)建這個thread cache呢?我們不能將這個thread cache創(chuàng)建為全局的,因為全局變量是所有線程共享的,這樣就不可避免的需要鎖來控制,增加了控制成本和代碼復(fù)雜度。

要實現(xiàn)每個線程無鎖的訪問屬于自己的thread cache,我們需要用到線程局部存儲TLS(Thread Local Storage),這是一種變量的存儲方法,使用該存儲方法的變量在它所在的線程是全局可訪問的,但是不能被其他線程訪問到,這樣就保持了數(shù)據(jù)的線程獨立性。

//TLS - Thread Local Storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

但不是每個線程被創(chuàng)建時就立馬有了屬于自己的thread cache,而是當該線程調(diào)用相關(guān)申請內(nèi)存的接口時才會創(chuàng)建自己的thread cache,因此在申請內(nèi)存的函數(shù)中會包含以下邏輯。

//通過TLS,每個線程無鎖的獲取自己專屬的ThreadCache對象
if (pTLSThreadCache == nullptr)
{
	pTLSThreadCache = new ThreadCache;
}

centralcache

centralcache整體設(shè)計

當線程申請某一大小的內(nèi)存時,如果thread cache中對應(yīng)的自由鏈表不為空,那么直接取出一個內(nèi)存塊進行返回即可,但如果此時該自由鏈表為空,那么這時thread cache就需要向central cache申請內(nèi)存了。

central cache的結(jié)構(gòu)與thread cache是一樣的,它們都是哈希桶的結(jié)構(gòu),并且它們遵循的對齊映射規(guī)則都是一樣的。這樣做的好處就是,當thread cache的某個桶中沒有內(nèi)存了,就可以直接到central cache中對應(yīng)的哈希桶里去取內(nèi)存就行了。

central cache與thread cache的不同之處

central cache與thread cache有兩個明顯不同的地方,首先,thread cache是每個線程獨享的,而central cache是所有線程共享的,因為每個線程的thread cache沒有內(nèi)存了都會去找central cache,因此在訪問central cache時是需要加鎖的。

但central cache在加鎖時并不是將整個central cache全部鎖上了,central cache在加鎖時用的是桶鎖,也就是說每個桶都有一個鎖。此時只有當多個線程同時訪問central cache的同一個桶時才會存在鎖競爭,如果是多個線程同時訪問central cache的不同桶就不會存在鎖競爭。

central cache與thread cache的第二個不同之處就是,thread cache的每個桶中掛的是一個個切好的內(nèi)存塊,而central cache的每個桶中掛的是一個個的span。

在這里插入圖片描述

每個span管理的都是一個以頁為單位的大塊內(nèi)存,每個桶里面的若干span是按照雙鏈表的形式鏈接起來的,并且每個span里面還有一個自由鏈表,這個自由鏈表里面掛的就是一個個切好了的內(nèi)存塊,根據(jù)其所在的哈希桶這些內(nèi)存塊被切成了對應(yīng)的大小。

centralcache結(jié)構(gòu)設(shè)計

頁號的類型?

每個程序運行起來后都有自己的進程地址空間,在32位平臺下,進程地址空間的大小是232;而在64位平臺下,進程地址空間的大小就是264。

頁的大小一般是4K或者8K,我們以8K為例。在32位平臺下,進程地址空間就可以被分成 2 32 ÷ 2 13 = 2 19 2^{32}\div2^{13}=2^{19} 232÷213=219個頁;在64位平臺下,進程地址空間就可以被分成 2 64 ÷ 2 13 = 2 51 2^{64}\div2^{13}=2^{51} 264÷213=251個頁。頁號本質(zhì)與地址是一樣的,它們都是一個編號,只不過地址是以一個字節(jié)為一個單位,而頁是以多個字節(jié)為一個單位。

由于頁號在64位平臺下的取值范圍是 [ [ [ 0 , 2 51 ) 0,2^{51}) 0,251),因此我們不能簡單的用一個無符號整型來存儲頁號,這時我們需要借助條件編譯來解決這個問題。

#ifdef _WIN64
	typedef unsigned long long PAGE_ID;
#elif _WIN32
	typedef size_t PAGE_ID;
#else
	//linux
#endif

需要注意的是,在32位下,_WIN32有定義,_WIN64沒有定義;而在64位下,_WIN32和_WIN64都有定義。因此在條件編譯時,我們應(yīng)該先判斷_WIN64是否有定義,再判斷_WIN32是否有定義。

span的結(jié)構(gòu)

central cache的每個桶里掛的是一個個的span,span是一個管理以頁為單位的大塊內(nèi)存,span的結(jié)構(gòu)如下:

//管理以頁為單位的大塊內(nèi)存
struct Span
{
	PAGE_ID _pageId = 0;        //大塊內(nèi)存起始頁的頁號
	size_t _n = 0;              //頁的數(shù)量

	Span* _next = nullptr;      //雙鏈表結(jié)構(gòu)
	Span* _prev = nullptr;

	size_t _useCount = 0;       //切好的小塊內(nèi)存,被分配給thread cache的計數(shù)
	void* _freeList = nullptr;  //切好的小塊內(nèi)存的自由鏈表
};

對于span管理的以頁為單位的大塊內(nèi)存,我們需要知道這塊內(nèi)存具體在哪一個位置,便于之后page cache進行前后頁的合并,因此span結(jié)構(gòu)當中會記錄所管理大塊內(nèi)存起始頁的頁號。

至于每一個span管理的到底是多少個頁,這并不是固定的,需要根據(jù)多方面的因素來控制,因此span結(jié)構(gòu)當中有一個_n成員,該成員就代表著該span管理的頁的數(shù)量。

此外,每個span管理的大塊內(nèi)存,都會被切成相應(yīng)大小的內(nèi)存塊掛到當前span的自由鏈表中,比如8Byte哈希桶中的span,會被切成一個個8Byte大小的內(nèi)存塊掛到當前span的自由鏈表中,因此span結(jié)構(gòu)中需要存儲切好的小塊內(nèi)存的自由鏈表。

span結(jié)構(gòu)當中的_useCount成員記錄的就是,當前span中切好的小塊內(nèi)存,被分配給thread cache的計數(shù),當某個span的_useCount計數(shù)變?yōu)?時,代表當前span切出去的內(nèi)存塊對象全部還回來了,此時central cache就可以將這個span再還給page cache。

每個桶當中的span是以雙鏈表的形式組織起來的,當我們需要將某個span歸還給page cache時,就可以很方便的將該span從雙鏈表結(jié)構(gòu)中移出。如果用單鏈表結(jié)構(gòu)的話就比較麻煩了,因為單鏈表在刪除時,需要知道當前結(jié)點的前一個結(jié)點。

雙鏈表結(jié)構(gòu)

根據(jù)上面的描述,central cache的每個哈希桶里面存儲的都是一個雙鏈表結(jié)構(gòu),對于該雙鏈表結(jié)構(gòu)我們可以對其進行封裝。

//帶頭雙向循環(huán)鏈表
class SpanList
{
public:
	SpanList()
	{
		_head = new Span;
		_head->_next = _head;
		_head->_prev = _head;
	}
	void Insert(Span* pos, Span* newSpan)
	{
		assert(pos);
		assert(newSpan);

		Span* prev = pos->_prev;

		prev->_next = newSpan;
		newSpan->_prev = prev;

		newSpan->_next = pos;
		pos->_prev = newSpan;
	}
	void Erase(Span* pos)
	{
		assert(pos);
		assert(pos != _head); //不能刪除哨兵位的頭結(jié)點

		Span* prev = pos->_prev;
		Span* next = pos->_next;

		prev->_next = next;
		next->_prev = prev;
	}
private:
	Span* _head;
public:
	std::mutex _mtx; //桶鎖
};

需要注意的是,從雙鏈表刪除的span會還給下一層的page cache,相當于只是把這個span從雙鏈表中移除,因此不需要對刪除的span進行delete操作。

central cache的結(jié)構(gòu)

central cache的映射規(guī)則和thread cache是一樣的,因此central cache里面哈希桶的個數(shù)也是208,但central cache每個哈希桶中存儲就是我們上面定義的雙鏈表結(jié)構(gòu)。

class CentralCache
{
public:
	//...
private:
	SpanList _spanLists[NFREELISTS];
};

central cache和thread cache的映射規(guī)則一樣,有一個好處就是,當thread cache的某個桶沒有內(nèi)存了,就可以直接去central cache對應(yīng)的哈希桶進行申請就行了。

centralcache核心實現(xiàn)

central cache的實現(xiàn)方式

每個線程都有一個屬于自己的thread cache,我們是用TLS來實現(xiàn)每個線程無鎖的訪問屬于自己的thread cache的。而central cache和page cache在整個進程中只有一個,對于這種只能創(chuàng)建一個對象的類,我們可以將其設(shè)置為單例模式。

單例模式可以保證系統(tǒng)中該類只有一個實例,并提供一個訪問它的全局訪問點,該實例被所有程序模塊共享。單例模式又分為餓漢模式和懶漢模式,懶漢模式相對較復(fù)雜,我們這里使用餓漢模式就足夠了。

//單例模式
class CentralCache
{
public:
	//提供一個全局訪問點
	static CentralCache* GetInstance()
	{
		return &_sInst;
	}
private:
	SpanList _spanLists[NFREELISTS];
private:
	CentralCache() //構(gòu)造函數(shù)私有
	{}
	CentralCache(const CentralCache&) = delete; //防拷貝

	static CentralCache _sInst;
};

為了保證CentralCache類只能創(chuàng)建一個對象,我們需要將central cache的構(gòu)造函數(shù)和拷貝構(gòu)造函數(shù)設(shè)置為私有,或者在C++11中也可以在函數(shù)聲明的后面加上=delete進行修飾。

CentralCache類當中還需要有一個CentralCache類型的靜態(tài)的成員變量,當程序運行起來后我們就立馬創(chuàng)建該對象,在此后的程序中就只有這一個單例了。

CentralCache CentralCache::_sInst;

最后central cache還需要提供一個公有的成員函數(shù),用于獲取該對象,此時在整個進程中就只會有一個central cache對象了。

慢開始反饋調(diào)節(jié)算法

當thread cache向central cache申請內(nèi)存時,central cache應(yīng)該給出多少個對象呢?這是一個值得思考的問題,如果central cache給的太少,那么thread cache在短時間內(nèi)用完了又會來申請;但如果一次性給的太多了,可能thread cache用不完也就浪費了。

鑒于此,我們這里采用了一個慢開始反饋調(diào)節(jié)算法。當thread cache向central cache申請內(nèi)存時,如果申請的是較小的對象,那么可以多給一點,但如果申請的是較大的對象,就可以少給一點。

通過下面這個函數(shù),我們就可以根據(jù)所需申請的對象的大小計算出具體給出的對象個數(shù),并且可以將給出的對象個數(shù)控制到2~512個之間。也就是說,就算thread cache要申請的對象再小,我最多一次性給出512個對象;就算thread cache要申請的對象再大,我至少一次性給出2個對象。

//管理對齊和映射等關(guān)系
class SizeClass
{
public:
	//thread cache一次從central cache獲取對象的上限
	static size_t NumMoveSize(size_t size)
	{
		assert(size > 0);
	
		//對象越小,計算出的上限越高
		//對象越大,計算出的上限越低
		int num = MAX_BYTES / size;
		if (num < 2)
			num = 2;
		if (num > 512)
			num = 512;
	
		return num;
	}
};

但就算申請的是小對象,一次性給出512個也是比較多的,基于這個原因,我們可以在FreeList結(jié)構(gòu)中增加一個叫做_maxSize的成員變量,該變量的初始值設(shè)置為1,并且提供一個公有成員函數(shù)用于獲取這個變量。也就是說,現(xiàn)在thread cache中的每個自由鏈表都會有一個自己的_maxSize。

//管理切分好的小對象的自由鏈表
class FreeList
{
public:
	size_t& MaxSize()
	{
		return _maxSize;
	}

private:
	void* _freeList = nullptr; //自由鏈表
	size_t _maxSize = 1;
};

此時當thread cache申請對象時,我們會比較_maxSize和計算得出的值,取出其中的較小值作為本次申請對象的個數(shù)。此外,如果本次采用的是_maxSize的值,那么還會將thread cache中該自由鏈表的_maxSize的值進行加一。

因此,thread cache第一次向central cache申請某大小的對象時,申請到的都是一個,但下一次thread cache再向central cache申請同樣大小的對象時,因為該自由鏈表中的_maxSize增加了,最終就會申請到兩個。直到該自由鏈表中_maxSize的值,增長到超過計算出的值后就不會繼續(xù)增長了,此后申請到的對象個數(shù)就是計算出的個數(shù)。(這有點像網(wǎng)絡(luò)中擁塞控制的機制)

從中心緩存獲取對象

每次thread cache向central cache申請對象時,我們先通過慢開始反饋調(diào)節(jié)算法計算出本次應(yīng)該申請的對象的個數(shù),然后再向central cache進行申請。

如果thread cache最終申請到對象的個數(shù)就是一個,那么直接將該對象返回即可。為什么需要返回一個申請到的對象呢?因為thread cache要向central cache申請對象,其實由于某個線程向thread cache申請對象但thread cache當中沒有,這才導(dǎo)致thread cache要向central cache申請對象。因此central cache將對象返回給thread cache后,thread cache會再將該對象返回給申請對象的線程。

但如果thread cache最終申請到的是多個對象,那么除了將第一個對象返回之外,還需要將剩下的對象掛到thread cache對應(yīng)的哈希桶當中。

//從中心緩存獲取對象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
	//慢開始反饋調(diào)節(jié)算法
	//1、最開始不會一次向central cache一次批量要太多,因為要太多了可能用不完
	//2、如果你不斷有size大小的內(nèi)存需求,那么batchNum就會不斷增長,直到上限
	size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
	if (batchNum == _freeLists[index].MaxSize())
	{
		_freeLists[index].MaxSize() += 1;
	}
	void* start = nullptr;
	void* end = nullptr;
	size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
	assert(actualNum >= 1); //至少有一個

	if (actualNum == 1) //申請到對象的個數(shù)是一個,則直接將這一個對象返回即可
	{
		assert(start == end);
		return start;
	}
	else //申請到對象的個數(shù)是多個,還需要將剩下的對象掛到thread cache中對應(yīng)的哈希桶中
	{
		_freeLists[index].PushRange(NextObj(start), end);
		return start;
	}
}

從中心緩存獲取一定數(shù)量的對象

這里我們要從central cache獲取n個指定大小的對象,這些對象肯定都是從central cache對應(yīng)哈希桶的某個span中取出來的,因此取出來的這n個對象是鏈接在一起的,我們只需要得到這段鏈表的頭和尾即可,這里可以采用輸出型參數(shù)進行獲取。

//從central cache獲取一定數(shù)量的對象給thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t n, size_t size)
{
	size_t index = SizeClass::Index(size);
	_spanLists[index]._mtx.lock(); //加鎖
	
	//在對應(yīng)哈希桶中獲取一個非空的span
	Span* span = GetOneSpan(_spanLists[index], size);
	assert(span); //span不為空
	assert(span->_freeList); //span當中的自由鏈表也不為空

	//從span中獲取n個對象
	//如果不夠n個,有多少拿多少
	start = span->_freeList;
	end = span->_freeList;
	size_t actualNum = 1;
	while (NextObj(end)&&n - 1)
	{
		end = NextObj(end);
		actualNum++;
		n--;
	}
	span->_freeList = NextObj(end); //取完后剩下的對象繼續(xù)放到自由鏈表
	NextObj(end) = nullptr; //取出的一段鏈表的表尾置空
	span->_useCount += actualNum; //更新被分配給thread cache的計數(shù)

	_spanLists[index]._mtx.unlock(); //解鎖
	return actualNum;
}

由于central cache是所有線程共享的,所以我們在訪問central cache中的哈希桶時,需要先給對應(yīng)的哈希桶加上桶鎖,在獲取到對象后再將桶鎖解掉。

在向central cache獲取對象時,先是在central cache對應(yīng)的哈希桶中獲取到一個非空的span,然后從這個span的自由鏈表中取出n個對象即可,但可能這個非空的span的自由鏈表當中對象的個數(shù)不足n個,這時該自由鏈表當中有多少個對象就給多少就行了。

也就是說,thread cache實際從central cache獲得的對象的個數(shù)可能與我們傳入的n值是不一樣的,因此我們需要統(tǒng)計本次申請過程中,實際thread cache獲取到的對象個數(shù),然后根據(jù)該值及時更新這個span中的小對象被分配給thread cache的計數(shù)。

需要注意的是,雖然我們實際申請到對象的個數(shù)可能比n要小,但這并不會產(chǎn)生任何影響。因為thread cache的本意就是向central cache申請一個對象,我們之所以要一次多申請一些對象,是因為這樣一來下次線程再申請相同大小的對象時就可以直接在thread cache里面獲取了,而不用再向central cache申請對象。

插入一段范圍的對象到自由鏈表

此外,如果thread cache最終從central cache獲取到的對象個數(shù)是大于一的,那么我們還需要將剩下的對象插入到thread cache中對應(yīng)的哈希桶中,為了能讓自由鏈表支持插入一段范圍的對象,我們還需要在FreeList類中增加一個對應(yīng)的成員函數(shù)。

//管理切分好的小對象的自由鏈表
class FreeList
{
public:
	//插入一段范圍的對象到自由鏈表
	void PushRange(void* start, void* end)
	{
		assert(start);
		assert(end);

		//頭插
		NextObj(end) = _freeList;
		_freeList = start;
	}
private:
	void* _freeList = nullptr; //自由鏈表
	size_t _maxSize = 1;
};

pagecache

pagecache整體設(shè)計

page cache與central cache結(jié)構(gòu)的相同之處

page cache與central cache一樣,它們都是哈希桶的結(jié)構(gòu),并且page cache的每個哈希桶中里掛的也是一個個的span,這些span也是按照雙鏈表的結(jié)構(gòu)鏈接起來的。

page cache與central cache結(jié)構(gòu)的不同之處

首先,central cache的映射規(guī)則與thread cache保持一致,而page cache的映射規(guī)則與它們都不相同。page cache的哈希桶映射規(guī)則采用的是直接定址法,比如1號桶掛的都是1頁的span,2號桶掛的都是2頁的span,以此類推。

其次,central cache每個桶中的span被切成了一個個對應(yīng)大小的對象,以供thread cache申請。而page cache當中的span是沒有被進一步切小的,因為page cache服務(wù)的是central cache,當central cache沒有span時,向page cache申請的是某一固定頁數(shù)的span,而如何切分申請到的這個span就應(yīng)該由central cache自己來決定。

在這里插入圖片描述

至于page cache當中究竟有多少個桶,這就要看你最大想掛幾頁的span了,這里我們就最大掛128頁的span,為了讓桶號與頁號對應(yīng)起來,我們可以將第0號桶空出來不用,因此我們需要將哈希桶的個數(shù)設(shè)置為129。

//page cache中哈希桶的個數(shù)
static const size_t NPAGES = 129;

為什么這里最大掛128頁的span呢?因為線程申請單個對象最大是256KB,而128頁可以被切成4個256KB的對象,因此是足夠的。當然,如果你想在page cache中掛更大的span也是可以的,根據(jù)具體的需求進行設(shè)置就行了。

在page cache獲取一個n頁的span的過程

如果central cache要獲取一個n頁的span,那我們就可以在page cache的第n號桶中取出一個span返回給central cache即可,但如果第n號桶中沒有span了,這時我們并不是直接轉(zhuǎn)而向堆申請一個n頁的span,而是要繼續(xù)在后面的桶當中尋找span。

直接向堆申請以頁為單位的內(nèi)存時,我們應(yīng)該盡量申請大塊一點的內(nèi)存塊,因為此時申請到的內(nèi)存是連續(xù)的,當線程需要內(nèi)存時我們可以將其切小后分配給線程,而當線程將內(nèi)存釋放后我們又可以將其合并成大塊的連續(xù)內(nèi)存。如果我們向堆申請內(nèi)存時是小塊小塊的申請的,那么我們申請到的內(nèi)存就不一定是連續(xù)的了。

因此,當?shù)趎號桶中沒有span時,我們可以繼續(xù)找第n+1號桶,因為我們可以將n+1頁的span切分成一個n頁的span和一個1頁的span,這時我們就可以將n頁的span返回,而將切分后1頁的span掛到1號桶中。但如果后面的桶當中都沒有span,這時我們就只能向堆申請一個128頁的內(nèi)存塊,并將其用一個span結(jié)構(gòu)管理起來,然后將128頁的span切分成n頁的span和128-n頁的span,其中n頁的span返回給central cache,而128-n頁的span就掛到第128-n號桶中。

也就是說,我們每次向堆申請的都是128頁大小的內(nèi)存塊,central cache要的這些span實際都是由128頁的span切分出來的。

page cache的實現(xiàn)方式

當每個線程的thread cache沒有內(nèi)存時都會向central cache申請,此時多個線程的thread cache如果訪問的不是central cache的同一個桶,那么這些線程是可以同時進行訪問的。這時central cache的多個桶就可能同時向page cache申請內(nèi)存的,所以page cache也是存在線程安全問題的,因此在訪問page cache時也必須要加鎖。

但是在page cache這里我們不能使用桶鎖,因為當central cache向page cache申請內(nèi)存時,page cache可能會將其他桶當中大頁的span切小后再給central cache。此外,當central cache將某個span歸還給page cache時,page cache也會嘗試將該span與其他桶當中的span進行合并。

也就是說,在訪問page cache時,我們可能需要訪問page cache中的多個桶,如果page cache用桶鎖就會出現(xiàn)大量頻繁的加鎖和解鎖,導(dǎo)致程序的效率低下。因此我們在訪問page cache時使用沒有使用桶鎖,而是用一個大鎖將整個page cache給鎖住。

而thread cache在訪問central cache時,只需要訪問central cache中對應(yīng)的哈希桶就行了,因為central cache的每個哈希桶中的span都被切分成了對應(yīng)大小,thread cache只需要根據(jù)自己所需對象的大小訪問central cache中對應(yīng)的哈希桶即可,不會訪問其他哈希桶,因此central cache可以用桶鎖。

此外,page cache在整個進程中也是只能存在一個的,因此我們也需要將其設(shè)置為單例模式。

//單例模式
class PageCache
{
public:
	//提供一個全局訪問點
	static PageCache* GetInstance()
	{
		return &_sInst;
	}
private:
	SpanList _spanLists[NPAGES];
	std::mutex _pageMtx; //大鎖
private:
	PageCache() //構(gòu)造函數(shù)私有
	{}
	PageCache(const PageCache&) = delete; //防拷貝

	static PageCache _sInst;
};

當程序運行起來后我們就立馬創(chuàng)建該對象即可。

PageCache PageCache::_sInst;

pagecache中獲取Span

獲取一個非空的span

thread cache向central cache申請對象時,central cache需要先從對應(yīng)的哈希桶中獲取到一個非空的span,然后從這個非空的span中取出若干對象返回給thread cache。那central cache到底是如何從對應(yīng)的哈希桶中,獲取到一個非空的span的呢?

首先當然是先遍歷central cache對應(yīng)哈希桶當中的雙鏈表,如果該雙鏈表中有非空的span,那么直接將該span進行返回即可。為了方便遍歷這個雙鏈表,我們可以模擬迭代器的方式,給SpanList類提供Begin和End成員函數(shù),分別用于獲取雙鏈表中的第一個span和最后一個span的下一個位置,也就是頭結(jié)點。

//帶頭雙向循環(huán)鏈表
class SpanList
{
public:
	Span* Begin()
	{
		return _head->_next;
	}
	Span* End()
	{
		return _head;
	}
private:
	Span* _head;
public:
	std::mutex _mtx; //桶鎖
};

但如果遍歷雙鏈表后發(fā)現(xiàn)雙鏈表中沒有span,或該雙鏈表中的span都為空,那么此時central cache就需要向page cache申請內(nèi)存塊了。

那具體是向page cache申請多大的內(nèi)存塊呢?我們可以根據(jù)具體所需對象的大小來決定,就像之前我們根據(jù)對象的大小計算出,thread cache一次向central cache申請對象的個數(shù)上限,現(xiàn)在我們是根據(jù)對象的大小計算出,central cache一次應(yīng)該向page cache申請幾頁的內(nèi)存塊。

我們可以先根據(jù)對象的大小計算出,thread cache一次向central cache申請對象的個數(shù)上限,然后將這個上限值乘以單個對象的大小,就算出了具體需要多少字節(jié),最后再將這個算出來的字節(jié)數(shù)轉(zhuǎn)換為頁數(shù),如果轉(zhuǎn)換后不夠一頁,那么我們就申請一頁,否則轉(zhuǎn)換出來是幾頁就申請幾頁。也就是說,central cache向page cache申請內(nèi)存時,要求申請到的內(nèi)存盡量能夠滿足thread cache向central cache申請時的上限。

//管理對齊和映射等關(guān)系
class SizeClass
{
public:
	//central cache一次向page cache獲取多少頁
	static size_t NumMovePage(size_t size)
	{
		size_t num = NumMoveSize(size); //計算出thread cache一次向central cache申請對象的個數(shù)上限
		size_t nPage = num*size; //num個size大小的對象所需的字節(jié)數(shù)

		nPage >>= PAGE_SHIFT; //將字節(jié)數(shù)轉(zhuǎn)換為頁數(shù)
		if (nPage == 0) //至少給一頁
			nPage = 1;

		return nPage;
	}
};

代碼中的PAGE_SHIFT代表頁大小轉(zhuǎn)換偏移,我們這里以頁的大小為8K為例,PAGE_SHIFT的值就是13。

//頁大小轉(zhuǎn)換偏移,即一頁定義為2^13,也就是8KB
static const size_t PAGE_SHIFT = 13;

需要注意的是,當central cache申請到若干頁的span后,還需要將這個span切成一個個對應(yīng)大小的對象掛到該span的自由鏈表當中。

如何找到一個span所管理的內(nèi)存塊呢?首先需要計算出該span的起始地址,我們可以用這個span的起始頁號乘以一頁的大小即可得到這個span的起始地址,然后用這個span的頁數(shù)乘以一頁的大小就可以得到這個span所管理的內(nèi)存塊的大小,用起始地址加上內(nèi)存塊的大小即可得到這塊內(nèi)存塊的結(jié)束位置。

明確了這塊內(nèi)存的起始和結(jié)束位置后,我們就可以進行切分了。根據(jù)所需對象的大小,每次從大塊內(nèi)存切出一塊固定大小的內(nèi)存塊尾插到span的自由鏈表中即可。

為什么是尾插呢?因為我們?nèi)绻菍⑶泻玫膶ο笪膊宓阶杂涉湵恚@些對象看起來是按照鏈式結(jié)構(gòu)鏈接起來的,而實際它們在物理上是連續(xù)的,這時當我們把這些連續(xù)內(nèi)存分配給某個線程使用時,可以提高該線程的CPU緩存利用率。

//獲取一個非空的span
Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size)
{
	//1、先在spanList中尋找非空的span
	Span* it = spanList.Begin();
	while (it != spanList.End())
	{
		if (it->_freeList != nullptr)
		{
			return it;
		}
		else
		{
			it = it->_next;
		}
	}

	//2、spanList中沒有非空的span,只能向page cache申請
	Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
	//計算span的大塊內(nèi)存的起始地址和大塊內(nèi)存的大?。ㄗ止?jié)數(shù))
	char* start = (char*)(span->_pageId << PAGE_SHIFT);
	size_t bytes = span->_n << PAGE_SHIFT;

	//把大塊內(nèi)存切成size大小的對象鏈接起來
	char* end = start + bytes;
	//先切一塊下來去做尾,方便尾插
	span->_freeList = start;
	start += size;
	void* tail = span->_freeList;
	//尾插
	while (start < end)
	{
		NextObj(tail) = start;
		tail = NextObj(tail);
		start += size;
	}
	NextObj(tail) = nullptr; //尾的指向置空
	
	//將切好的span頭插到spanList
	spanList.PushFront(span);

	return span;
}

需要注意的是,當我們把span切好后,需要將這個切好的span掛到central cache的對應(yīng)哈希桶中。因此SpanList類還需要提供一個接口,用于將一個span插入到該雙鏈表中。這里我們選擇的是頭插,這樣當central cache下一次從該雙鏈表中獲取非空span時,一來就能找到。

由于SpanList類之前實現(xiàn)了Insert和Begin函數(shù),這里實現(xiàn)雙鏈表頭插就非常簡單,直接在雙鏈表的Begin位置進行Insert即可。

//帶頭雙向循環(huán)鏈表
class SpanList
{
public:
	void PushFront(Span* span)
	{
		Insert(Begin(), span);
	}
private:
	Span* _head;
public:
	std::mutex _mtx; //桶鎖
};

獲取一個k頁的span

當我們調(diào)用上述的GetOneSpan從central cache的某個哈希桶獲取一個非空的span時,如果遍歷哈希桶中的雙鏈表后發(fā)現(xiàn)雙鏈表中沒有span,或該雙鏈表中的span都為空,那么此時central cache就需要向page cache申請若干頁的span了,下面我們就來說說如何從page cache獲取一個k頁的span。

因為page cache是直接按照頁數(shù)進行映射的,因此我們要從page cache獲取一個k頁的span,就應(yīng)該直接先去找page cache的第k號桶,如果第k號桶中有span,那我們直接頭刪一個span返回給central cache就行了。所以我們這里需要再給SpanList類添加對應(yīng)的Empty和PopFront函數(shù)。

//帶頭雙向循環(huán)鏈表
class SpanList
{
public:
	bool Empty()
	{
		return _head == _head->_next;
	}
	Span* PopFront()
	{
		Span* front = _head->_next;
		Erase(front);
		return front;
	}
private:
	Span* _head;
public:
	std::mutex _mtx; //桶鎖
};

如果page cache的第k號桶中沒有span,我們就應(yīng)該繼續(xù)找后面的桶,只要后面任意一個桶中有一個n頁span,我們就可以將其切分成一個k頁的span和一個n-k頁的span,然后將切出來k頁的span返回給central cache,再將n-k頁的span掛到page cache的第n-k號桶即可。

但如果后面的桶中也都沒有span,此時我們就需要向堆申請一個128頁的span了,在向堆申請內(nèi)存時,直接調(diào)用我們封裝的SystemAlloc函數(shù)即可。

需要注意的是,向堆申請內(nèi)存后得到的是這塊內(nèi)存的起始地址,此時我們需要將該地址轉(zhuǎn)換為頁號。由于我們向堆申請內(nèi)存時都是按頁進行申請的,因此我們直接將該地址除以一頁的大小即可得到對應(yīng)的頁號。

//獲取一個k頁的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0 && k < NPAGES);
	//先檢查第k個桶里面有沒有span
	if (!_spanLists[k].Empty())
	{
		return _spanLists[k].PopFront();
	}
	//檢查一下后面的桶里面有沒有span,如果有可以將其進行切分
	for (size_t i = k + 1; i < NPAGES; i++)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;
			//在nSpan的頭部切k頁下來
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;
			//將剩下的掛到對應(yīng)映射的位置
			_spanLists[nSpan->_n].PushFront(nSpan);
			return kSpan;
		}
	}
	//走到這里說明后面沒有大頁的span了,這時就向堆申請一個128頁的span
	Span* bigSpan = new Span;
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	//盡量避免代碼重復(fù),遞歸調(diào)用自己
	return NewSpan(k);
}

這里說明一下,當我們向堆申請到128頁的span后,需要將其切分成k頁的span和128-k頁的span,但是為了盡量避免出現(xiàn)重復(fù)的代碼,我們最好不要再編寫對應(yīng)的切分代碼。我們可以先將申請到的128頁的span掛到page cache對應(yīng)的哈希桶中,然后再遞歸調(diào)用該函數(shù)就行了,此時在往后找span時就一定會在第128號桶中找到該span,然后進行切分。

這里其實有一個問題:當central cache向page cache申請內(nèi)存時,central cache對應(yīng)的哈希桶是處于加鎖的狀態(tài)的,那在訪問page cache之前我們應(yīng)不應(yīng)該把central cache對應(yīng)的桶鎖解掉呢?

這里建議在訪問page cache前,先把central cache對應(yīng)的桶鎖解掉。雖然此時central cache的這個桶當中是沒有內(nèi)存供其他thread cache申請的,但thread cache除了申請內(nèi)存還會釋放內(nèi)存,如果在訪問page cache前將central cache對應(yīng)的桶鎖解掉,那么此時當其他thread cache想要歸還內(nèi)存到central cache的這個桶時就不會被阻塞。

因此在調(diào)用NewSpan函數(shù)之前,我們需要先將central cache對應(yīng)的桶鎖解掉,然后再將page cache的大鎖加上,當申請到k頁的span后,我們需要將page cache的大鎖解掉,但此時我們不需要立刻獲取到central cache中對應(yīng)的桶鎖。因為central cache拿到k頁的span后還會對其進行切分操作,因此我們可以在span切好后需要將其掛到central cache對應(yīng)的桶上時,再獲取對應(yīng)的桶鎖。

這里為了讓代碼清晰一點,只寫出了加鎖和解鎖的邏輯,我們只需要將這些邏輯添加到之前實現(xiàn)的GetOneSpan函數(shù)的對應(yīng)位置即可。

spanList._mtx.unlock(); //解桶鎖
PageCache::GetInstance()->_pageMtx.lock(); //加大鎖

//從page cache申請k頁的span

PageCache::GetInstance()->_pageMtx.unlock(); //解大鎖

//進行span的切分...

spanList._mtx.lock(); //加桶鎖

//將span掛到central cache對應(yīng)的哈希桶

申請內(nèi)存過程聯(lián)調(diào)

ConcurrentAlloc函數(shù)

在將thread cache、central cache以及page cache的申請流程寫通了之后,我們就可以向外提供一個ConcurrentAlloc函數(shù),用于申請內(nèi)存塊。每個線程第一次調(diào)用該函數(shù)時會通過TLS獲取到自己專屬的thread cache對象,然后每個線程就可以通過自己對應(yīng)的thread cache申請對象了。

static void* ConcurrentAlloc(size_t size)
{
	//通過TLS,每個線程無鎖的獲取自己專屬的ThreadCache對象
	if (pTLSThreadCache == nullptr)
	{
		pTLSThreadCache = new ThreadCache;
	}
	return pTLSThreadCache->Allocate(size);
}

這里說一下編譯時會出現(xiàn)的問題,在C++的algorithm頭文件中有一個min函數(shù),這是一個函數(shù)模板,而在Windows.h頭文件中也有一個min,這是一個宏。由于調(diào)用函數(shù)模板時需要進行參數(shù)類型的推演,因此當我們調(diào)用min函數(shù)時,編譯器會優(yōu)先匹配Windows.h當中以宏的形式實現(xiàn)的min,此時當我們以std::min的形式調(diào)用min函數(shù)時就會產(chǎn)生報錯,這就是沒有用命名空間進行封裝的壞處,這時我們只能選擇將std::去掉,讓編譯器調(diào)用Windows.h當中的min。

申請內(nèi)存過程聯(lián)調(diào)測試一

由于在多線程場景下調(diào)試觀察起來非常麻煩,這里就先不考慮多線程場景,看看在單線程場景下代碼的執(zhí)行邏輯是否符合我們的預(yù)期,其次,我們這里就只簡單觀察在一個桶當中的內(nèi)存申請就行了。

下面該線程進行了三次內(nèi)存申請,這三次內(nèi)存申請的字節(jié)數(shù)最終都對齊到了8,此時當線程申請內(nèi)存時就只會訪問到thread cache的第0號桶。

void* p1 = ConcurrentAlloc(6);
void* p2 = ConcurrentAlloc(8);
void* p3 = ConcurrentAlloc(1);

當線程第一次申請內(nèi)存時,該線程需要通過TLS獲取到自己專屬的thread cache對象,然后通過這個thread cache對象進行內(nèi)存申請。

在這里插入圖片描述

在申請內(nèi)存時通過計算索引到了thread cache的第0號桶,但此時thread cache的第0號桶中是沒有對象的,因此thread cache需要向central cache申請內(nèi)存塊。

在這里插入圖片描述

在向central cache申請內(nèi)存塊前,首先通過NumMoveSize函數(shù)計算得出,thread cache一次最多可向central cache申請8字節(jié)大小對象的個數(shù)是512,但由于我們采用的是慢開始算法,因此還需要將上限值與對應(yīng)自由鏈表的_maxSize的值進行比較,而此時對應(yīng)自由鏈表_maxSize的值是1,所以最終得出本次thread cache向central cache申請8字節(jié)對象的個數(shù)是1個。

并且在此之后會將該自由鏈表中_maxSize的值進行自增,下一次thread cache再向central cache申請8字節(jié)對象時最終申請對象的個數(shù)就會是2個了。

在這里插入圖片描述

在thread cache向central cache申請對象之前,需要先將central cache的0號桶的鎖加上,然后再從該桶獲取一個非空的span。

在這里插入圖片描述

在central cache的第0號桶獲取非空span時,先遍歷對應(yīng)的span雙鏈表,看看有沒有非空的span,但此時肯定是沒有的,因此在這個過程中我們無法找到一個非空的span。

在這里插入圖片描述

那么此時central cache就需要向page cache申請內(nèi)存了,但在此之前需要先把central cache第0號桶的鎖解掉,然后再將page cache的大鎖給加上,之后才能向page cache申請內(nèi)存。

在這里插入圖片描述

在向page cache申請內(nèi)存時,由于central cache一次給thread cache8字節(jié)對象的上限是512,對應(yīng)就需要4096字節(jié),所需字節(jié)數(shù)不足一頁就按一頁算,所以這里central cache就需要向page cache申請一頁的內(nèi)存塊。

在這里插入圖片描述

但此時page cache的第1個桶以及之后的桶當中都是沒有span的,因此page cache需要直接向堆申請一個128頁的span。

在這里插入圖片描述

這里通過監(jiān)視窗口可以看到,用于管理申請到的128頁內(nèi)存的span信息。

在這里插入圖片描述

我們可以順便驗證一下,按頁向堆申請的內(nèi)存塊的起始地址和頁號之間是可以相互轉(zhuǎn)換的。

在這里插入圖片描述

現(xiàn)在將申請到的128頁的span插入到page cache的第128號桶當中,然后再調(diào)用一次NewSpan,在這次調(diào)用的時候,雖然在1號桶當中沒有span,但是在往后找的過程中就一定會在第128號桶找到一個span。

在這里插入圖片描述

此時我們就可以把這個128頁的span拿出來,切分成1頁的span和127頁的span,將1頁的span返回給central cache,而把127頁的span掛到page cache的第127號桶即可。

在這里插入圖片描述

從page cache返回后,就可以把page cache的大鎖解掉了,但緊接著還要將獲取到的1頁的span進行切分,因此這里沒有立刻重新加上central cache對應(yīng)的桶鎖。

在這里插入圖片描述

在進行切分的時候,先通過該span的起始頁號得到該span的起始地址,然后通過該span的頁數(shù)得到該span所管理內(nèi)存塊的總的字節(jié)數(shù)。

在這里插入圖片描述

在確定內(nèi)存塊的開始和結(jié)束后,就可以將其切分成一個個8字節(jié)大小的對象掛到該span的自由鏈表中了。在調(diào)試過程中通過內(nèi)存監(jiān)視窗口可以看到,切分出來的每個8字節(jié)大小的對象的前四個字節(jié)存儲的都是下一個8字節(jié)對象的起始地址。

在這里插入圖片描述

當切分結(jié)束后再獲取central cache第0號桶的桶鎖,然后將這個切好的span插入到central cache的第0號桶中,最后再將這個非空的span返回,此時就獲取到了一個非空的span。

在這里插入圖片描述

由于thread cache只向central cache申請了一個對象,因此拿到這個非空的span后,直接從這個span里面取出一個對象即可,此時該span的_useCount也由0變成了1。

在這里插入圖片描述

由于此時thread cache實際只向central cache申請到了一個對象,因此直接將這個對象返回給線程即可。

在這里插入圖片描述

當線程第二次申請內(nèi)存塊時就不會再創(chuàng)建thread cache了,因為第一次申請時就已經(jīng)創(chuàng)建好了,此時該線程直接獲取到對應(yīng)的thread cache進行內(nèi)存塊申請即可。

在這里插入圖片描述

當該線程第二次申請8字節(jié)大小的對象時,此時thread cache的0號桶中還是沒有對象的,因為第一次thread cache只向central cache申請了一個8字節(jié)對象,因此這次申請時還需要再向central cache申請對象。

在這里插入圖片描述

這時thread cache向central cache申請對象時,thread cache第0號桶中自由鏈表的_maxSize已經(jīng)慢增長到2了,所以這次在向central cache申請對象時就會申請2個。如果下一次thread cache再向central cache申請8字節(jié)大小的對象,那么central cache會一次性給thread cache3個,這就是所謂的慢增長。

在這里插入圖片描述

但由于第一次central cache向page cache申請了一頁的內(nèi)存塊,并將其切成了1024個8字節(jié)大小的對象,因此這次thread cache向central cache申請2兩個8字節(jié)的對象時,central cache的第0號桶當中是有對象的,直接返回兩個給thread cache即可,而不用再向page cache申請內(nèi)存了。

但線程實際申請的只是一個8字節(jié)對象,因此thread cache除了將一個對象返回之外,還需要將剩下的一個對象掛到thread cache的第0號桶當中。

在這里插入圖片描述

這樣一來,當線程第三次申請1字節(jié)的內(nèi)存時,由于1字節(jié)對齊后也是8字節(jié),此時thread cache也就不需要再向central cache申請內(nèi)存塊了,直接將第0號桶當中之前剩下的一個8字節(jié)對象返回即可。

在這里插入圖片描述

申請內(nèi)存過程聯(lián)調(diào)測試二

為了進一步測試代碼的正確性,我們可以做這樣一個測試:讓線程申請1024次8字節(jié)的對象,然后通過調(diào)試觀察在第1025次申請時,central cache是否會再向page cache申請內(nèi)存塊。

for (size_t i = 0; i < 1024; i++)
{
	void* p1 = ConcurrentAlloc(6);
}
void* p2 = ConcurrentAlloc(6);

因為central cache第一次就是向page cache申請的一頁內(nèi)存,這一頁內(nèi)存被切成了1024個8字節(jié)大小的對象,當這1024個對象全部被申請之后,再申請8字節(jié)大小的對象時central cache當中就沒有對象了,此時就應(yīng)該向page cache申請內(nèi)存塊。

通過調(diào)試我們可以看到,第1025次申請8字節(jié)大小的對象時,central cache第0號桶中的這個span的_useCount已經(jīng)增加到了1024,也就是說這1024個對象都已經(jīng)被線程申請了,此時central cache就需要再向page cache申請一頁的span來進行切分了。

在這里插入圖片描述

而這次central cache在向page cache申請一頁的內(nèi)存時,page cache就是將127頁span切分成了1頁的span和126頁的span了,然后central cache拿到這1頁的span后,又會將其切分成1024塊8字節(jié)大小的內(nèi)存塊以供thread cache申請。

在這里插入圖片描述

threadcache回收內(nèi)存

當某個線程申請的對象不用了,可以將其釋放給thread cache,然后thread cache將該對象插入到對應(yīng)哈希桶的自由鏈表當中即可。

但是隨著線程不斷的釋放,對應(yīng)自由鏈表的長度也會越來越長,這些內(nèi)存堆積在一個thread cache中就是一種浪費,我們應(yīng)該將這些內(nèi)存還給central cache,這樣一來,這些內(nèi)存對其他線程來說也是可申請的,因此當thread cache某個桶當中的自由鏈表太長時我們可以進行一些處理。

如果thread cache某個桶當中自由鏈表的長度超過它一次批量向central cache申請的對象個數(shù),那么此時我們就要把該自由鏈表當中的這些對象還給central cache。

//釋放內(nèi)存對象
void ThreadCache::Deallocate(void* ptr, size_t size)
{
	assert(ptr);
	assert(size <= MAX_BYTES);

	//找出對應(yīng)的自由鏈表桶將對象插入
	size_t index = SizeClass::Index(size);
	_freeLists[index].Push(ptr);

	//當自由鏈表長度大于一次批量申請的對象個數(shù)時就開始還一段list給central cache
	if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
	{
		ListTooLong(_freeLists[index], size);
	}
}

當自由鏈表的長度大于一次批量申請的對象時,我們具體的做法就是,從該自由鏈表中取出一次批量個數(shù)的對象,然后將取出的這些對象還給central cache中對應(yīng)的span即可。

//釋放對象導(dǎo)致鏈表過長,回收內(nèi)存到中心緩存
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
	void* start = nullptr;
	void* end = nullptr;
	//從list中取出一次批量個數(shù)的對象
	list.PopRange(start, end, list.MaxSize());
	
	//將取出的對象還給central cache中對應(yīng)的span
	CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}

從上述代碼可以看出,F(xiàn)reeList類需要支持用Size函數(shù)獲取自由鏈表中對象的個數(shù),還需要支持用PopRange函數(shù)從自由鏈表中取出指定個數(shù)的對象。因此我們需要給FreeList類增加一個對應(yīng)的PopRange函數(shù),然后再增加一個_size成員變量,該成員變量用于記錄當前自由鏈表中對象的個數(shù),當我們向自由鏈表插入或刪除對象時,都應(yīng)該更新_size的值。

//管理切分好的小對象的自由鏈表
class FreeList
{
public:
	//將釋放的對象頭插到自由鏈表
	void Push(void* obj)
	{
		assert(obj);

		//頭插
		NextObj(obj) = _freeList;
		_freeList = obj;
		_size++;
	}
	//從自由鏈表頭部獲取一個對象
	void* Pop()
	{
		assert(_freeList);

		//頭刪
		void* obj = _freeList;
		_freeList = NextObj(_freeList);
		_size--;

		return obj;
	}
	//插入一段范圍的對象到自由鏈表
	void PushRange(void* start, void* end, size_t n)
	{
		assert(start);
		assert(end);

		//頭插
		NextObj(end) = _freeList;
		_freeList = start;
		_size += n;
	}
	//從自由鏈表獲取一段范圍的對象
	void PopRange(void*& start, void*& end, size_t n)
	{
		assert(n <= _size);

		//頭刪
		start = _freeList;
		end = start;
		for (size_t i = 0; i < n - 1;i++)
		{
			end = NextObj(end);
		}
		_freeList = NextObj(end); //自由鏈表指向end的下一個對象
		NextObj(end) = nullptr; //取出的一段鏈表的表尾置空
		_size -= n;
	}
	bool Empty()
	{
		return _freeList == nullptr;
	}
	size_t& MaxSize()
	{
		return _maxSize;
	}
	size_t Size()
	{
		return _size;
	}
private:
	void* _freeList = nullptr; //自由鏈表
	size_t _maxSize = 1;
	size_t _size = 0;
};

而對于FreeList類當中的PushRange成員函數(shù),我們最好也像PopRange一樣給它增加一個參數(shù),表示插入對象的個數(shù),不然我們這時還需要通過遍歷統(tǒng)計插入對象的個數(shù)。

因此之前在調(diào)用PushRange的地方就需要修改一下,而我們實際就在一個地方調(diào)用過PushRange函數(shù),并且此時插入對象的個數(shù)也是很容易知道的。當時thread cache從central cache獲取了actualNum個對象,將其中的一個返回給了申請對象的線程,剩下的actualNum-1個掛到了thread cache對應(yīng)的桶當中,所以這里插入對象的個數(shù)就是actualNum-1。

_freeLists[index].PushRange(NextObj(start), end, actualNum - 1);

說明一下:
當thread cache的某個自由鏈表過長時,我們實際就是把這個自由鏈表當中全部的對象都還給central cache了,但這里在設(shè)計PopRange接口時還是設(shè)計的是取出指定個數(shù)的對象,因為在某些情況下當自由鏈表過長時,我們可能并不一定想把鏈表中全部的對象都取出來還給central cache,這樣設(shè)計就是為了增加代碼的可修改性。

其次,當我們判斷thread cache是否應(yīng)該還對象給central cache時,還可以綜合考慮每個thread cache整體的大小。比如當某個thread cache的總占用大小超過一定閾值時,我們就將該thread cache當中的對象還一些給central cache,這樣就盡量避免了某個線程的thread cache占用太多的內(nèi)存。對于這一點,在tcmalloc當中就是考慮到了的。

centralcache回收內(nèi)存

當thread cache中某個自由鏈表太長時,會將自由鏈表當中的這些對象還給central cache中的span。

但是需要注意的是,還給central cache的這些對象不一定都是屬于同一個span的。central cache中的每個哈希桶當中可能都不止一個span,因此當我們計算出還回來的對象應(yīng)該還給central cache的哪一個桶后,還需要知道這些對象到底應(yīng)該還給這個桶當中的哪一個span。

如何根據(jù)對象的地址得到對象所在的頁號?

首先我們必須理解的是,某個頁當中的所有地址除以頁的大小都等該頁的頁號。比如我們這里假設(shè)一頁的大小是100,那么地址0~99都屬于第0頁,它們除以100都等于0,而地址100~199都屬于第1頁,它們除以100都等于1。

如何找到一個對象對應(yīng)的span?

雖然我們現(xiàn)在可以通過對象的地址得到其所在的頁號,但是我們還是不能知道這個對象到底屬于哪一個span。因為一個span管理的可能是多個頁。

為了解決這個問題,我們可以建立頁號和span之間的映射。由于這個映射關(guān)系在page cache進行span的合并時也需要用到,因此我們直接將其存放到page cache里面。這時我們就需要在PageCache類當中添加一個映射關(guān)系了,這里可以用C++當中的unordered_map進行實現(xiàn),并且添加一個函數(shù)接口,用于讓central cache獲取這里的映射關(guān)系。(下面代碼中只展示了PageCache類當中新增的成員)

//單例模式class PageCache{<!--{C}%3C!%2D%2D%20%2D%2D%3E-->public://獲取從對象到span的映射Span* MapObjectToSpan(void* obj);private:std::unordered_map<PAGE_ID, Span*> _idSpanMap;};

每當page cache分配span給central cache時,都需要記錄一下頁號和span之間的映射關(guān)系。此后當thread cache還對象給central cache時,才知道應(yīng)該具體還給哪一個span。

因此當central cache在調(diào)用NewSpan接口向page cache申請k頁的span時,page cache在返回這個k頁的span給central cache之前,應(yīng)該建立這k個頁號與該span之間的映射關(guān)系。

//獲取一個k頁的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0 && k < NPAGES);
	//先檢查第k個桶里面有沒有span
	if (!_spanLists[k].Empty())
	{
		Span* kSpan = _spanLists[k].PopFront();

		//建立頁號與span的映射,方便central cache回收小塊內(nèi)存時查找對應(yīng)的span
		for (PAGE_ID i = 0; i < kSpan->_n; i++)
		{
			_idSpanMap[kSpan->_pageId + i] = kSpan;
		}

		return kSpan;
	}
	//檢查一下后面的桶里面有沒有span,如果有可以將其進行切分
	for (size_t i = k + 1; i < NPAGES; i++)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;
			//在nSpan的頭部切k頁下來
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;
			//將剩下的掛到對應(yīng)映射的位置
			_spanLists[nSpan->_n].PushFront(nSpan);

			//建立頁號與span的映射,方便central cache回收小塊內(nèi)存時查找對應(yīng)的span
			for (PAGE_ID i = 0; i < kSpan->_n; i++)
			{
				_idSpanMap[kSpan->_pageId + i] = kSpan;
			}

			return kSpan;
		}
	}
	//走到這里說明后面沒有大頁的span了,這時就向堆申請一個128頁的span
	Span* bigSpan = new Span;
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	//盡量避免代碼重復(fù),遞歸調(diào)用自己
	return NewSpan(k);
}

此時我們就可以通過對象的地址找到該對象對應(yīng)的span了,直接將該對象的地址除以頁的大小得到頁號,然后在unordered_map當中找到其對應(yīng)的span即可。

//獲取從對象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //頁號
	auto ret = _idSpanMap.find(id);
	if (ret != _idSpanMap.end())
	{
		return ret->second;
	}
	else
	{
		assert(false);
		return nullptr;
	}
}

注意一下,當我們要通過某個頁號查找其對應(yīng)的span時,該頁號與其span之間的映射一定是建立過的,如果此時我們沒有在unordered_map當中找到,則說明我們之前的代碼邏輯有問題,因此當沒有找到對應(yīng)的span時可以直接用斷言結(jié)束程序,以表明程序邏輯出錯。

central cache回收內(nèi)存

這時當thread cache還對象給central cache時,就可以依次遍歷這些對象,將這些對象插入到其對應(yīng)span的自由鏈表當中,并且及時更新該span的_usseCount計數(shù)即可。

在thread cache還對象給central cache的過程中,如果central cache中某個span的_useCount減到0時,說明這個span分配出去的對象全部都還回來了,那么此時就可以將這個span再進一步還給page cache。

//將一定數(shù)量的對象還給對應(yīng)的span
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
	size_t index = SizeClass::Index(size);
	_spanLists[index]._mtx.lock(); //加鎖
	while (start)
	{
		void* next = NextObj(start); //記錄下一個
		Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
		//將對象頭插到span的自由鏈表
		NextObj(start) = span->_freeList;
		span->_freeList = start;

		span->_useCount--; //更新被分配給thread cache的計數(shù)
		if (span->_useCount == 0) //說明這個span分配出去的對象全部都回來了
		{
			//此時這個span就可以再回收給page cache,page cache可以再嘗試去做前后頁的合并
			_spanLists[index].Erase(span);
			span->_freeList = nullptr; //自由鏈表置空
			span->_next = nullptr;
			span->_prev = nullptr;

			//釋放span給page cache時,使用page cache的鎖就可以了,這時把桶鎖解掉
			_spanLists[index]._mtx.unlock(); //解桶鎖
			PageCache::GetInstance()->_pageMtx.lock(); //加大鎖
			PageCache::GetInstance()->ReleaseSpanToPageCache(span);
			PageCache::GetInstance()->_pageMtx.unlock(); //解大鎖
			_spanLists[index]._mtx.lock(); //加桶鎖
		}

		start = next;
	}

	_spanLists[index]._mtx.unlock(); //解鎖
}

需要注意,如果要把某個span還給page cache,我們需要先將這個span從central cache對應(yīng)的雙鏈表中移除,然后再將該span的自由鏈表置空,因為page cache中的span是不需要切分成一個個的小對象的,以及該span的前后指針也都應(yīng)該置空,因為之后要將其插入到page cache對應(yīng)的雙鏈表中。但span當中記錄的起始頁號以及它管理的頁數(shù)是不能清除的,否則對應(yīng)內(nèi)存塊就找不到了。

并且在central cache還span給page cache時也存在鎖的問題,此時需要先將central cache中對應(yīng)的桶鎖解掉,然后再加上page cache的大鎖之后才能進入page cache進行相關(guān)操作,當處理完畢回到central cache時,除了將page cache的大鎖解掉,還需要立刻獲得central cache對應(yīng)的桶鎖,然后將還未還完對象繼續(xù)還給central cache中對應(yīng)的span。

pagecache回收內(nèi)存

如果central cache中有某個span的_useCount減到0了,那么central cache就需要將這個span還給page cache了。

這個過程看似是非常簡單的,page cache只需將還回來的span掛到對應(yīng)的哈希桶上就行了。但實際為了緩解內(nèi)存碎片的問題,page cache還需要嘗試將還回來的span與其他空閑的span進行合并。

page cache進行前后頁的合并

合并的過程可以分為向前合并和向后合并。如果還回來的span的起始頁號是num,該span所管理的頁數(shù)是n。那么在向前合并時,就需要判斷第num-1頁對應(yīng)span是否空閑,如果空閑則可以將其進行合并,并且合并后還需要繼續(xù)向前嘗試進行合并,直到不能進行合并為止。而在向后合并時,就需要判斷第num+n頁對應(yīng)的span是否空閑,如果空閑則可以將其進行合并,并且合并后還需要繼續(xù)向后嘗試進行合并,直到不能進行合并為止。

因此page cache在合并span時,是需要通過頁號獲取到對應(yīng)的span的,這就是我們要把頁號與span之間的映射關(guān)系存儲到page cache的原因。

但需要注意的是,當我們通過頁號找到其對應(yīng)的span時,這個span此時可能掛在page cache,也可能掛在central cache。而在合并時我們只能合并掛在page cache的span,因為掛在central cache的span當中的對象正在被其他線程使用。

可是我們不能通過span結(jié)構(gòu)當中的_useCount成員,來判斷某個span到底是在central cache還是在page cache。因為當central cache剛向page cache申請到一個span時,這個span的_useCount就是等于0的,這時可能當我們正在對該span進行切分的時候,page cache就把這個span拿去進行合并了,這顯然是不合理的。

鑒于此,我們可以在span結(jié)構(gòu)中再增加一個_isUse成員,用于標記這個span是否正在被使用,而當一個span結(jié)構(gòu)被創(chuàng)建時我們默認該span是沒有被使用的。

//管理以頁為單位的大塊內(nèi)存
struct Span
{
	PAGE_ID _pageId = 0;        //大塊內(nèi)存起始頁的頁號
	size_t _n = 0;              //頁的數(shù)量

	Span* _next = nullptr;      //雙鏈表結(jié)構(gòu)
	Span* _prev = nullptr;

	size_t _useCount = 0;       //切好的小塊內(nèi)存,被分配給thread cache的計數(shù)
	void* _freeList = nullptr;  //切好的小塊內(nèi)存的自由鏈表

	bool _isUse = false;        //是否在被使用
};

因此當central cache向page cache申請到一個span時,需要立即將該span的_isUse改為true。

span->_isUse = true;

而當central cache將某個span還給page cache時,也就需要將該span的_isUse改成false。

span->_isUse = false;

由于在合并page cache當中的span時,需要通過頁號找到其對應(yīng)的span,而一個span是在被分配給central cache時,才建立的各個頁號與span之間的映射關(guān)系,因此page cache當中的span也需要建立頁號與span之間的映射關(guān)系。

與central cache中的span不同的是,在page cache中,只需建立一個span的首尾頁號與該span之間的映射關(guān)系。因為當一個span在嘗試進行合并時,如果是往前合并,那么只需要通過一個span的尾頁找到這個span,如果是向后合并,那么只需要通過一個span的首頁找到這個span。也就是說,在進行合并時我們只需要用到span與其首尾頁之間的映射關(guān)系就夠了。

因此當我們申請k頁的span時,如果是將n頁的span切成了一個k頁的span和一個n-k頁的span,我們除了需要建立k頁span中每個頁與該span之間的映射關(guān)系之外,還需要建立剩下的n-k頁的span與其首尾頁之間的映射關(guān)系。

//獲取一個k頁的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0 && k < NPAGES);
	//先檢查第k個桶里面有沒有span
	if (!_spanLists[k].Empty())
	{
		Span* kSpan = _spanLists[k].PopFront();

		//建立頁號與span的映射,方便central cache回收小塊內(nèi)存時查找對應(yīng)的span
		for (PAGE_ID i = 0; i < kSpan->_n; i++)
		{
			_idSpanMap[kSpan->_pageId + i] = kSpan;
		}

		return kSpan;
	}
	//檢查一下后面的桶里面有沒有span,如果有可以將其進行切分
	for (size_t i = k + 1; i < NPAGES; i++)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;
			//在nSpan的頭部切k頁下來
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;
			//將剩下的掛到對應(yīng)映射的位置
			_spanLists[nSpan->_n].PushFront(nSpan);
			//存儲nSpan的首尾頁號與nSpan之間的映射,方便page cache合并span時進行前后頁的查找
			_idSpanMap[nSpan->_pageId] = nSpan;
			_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;

			//建立頁號與span的映射,方便central cache回收小塊內(nèi)存時查找對應(yīng)的span
			for (PAGE_ID i = 0; i < kSpan->_n; i++)
			{
				_idSpanMap[kSpan->_pageId + i] = kSpan;
			}

			return kSpan;
		}
	}
	//走到這里說明后面沒有大頁的span了,這時就向堆申請一個128頁的span
	Span* bigSpan = new Span;
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	//盡量避免代碼重復(fù),遞歸調(diào)用自己
	return NewSpan(k);
}

此時page cache當中的span就都與其首尾頁之間建立了映射關(guān)系,現(xiàn)在我們就可以進行span的合并了,其合并邏輯如下:

//釋放空閑的span回到PageCache,并合并相鄰的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
	//對span的前后頁,嘗試進行合并,緩解內(nèi)存碎片問題
	//1、向前合并
	while (1)
	{
		PAGE_ID prevId = span->_pageId - 1;
		auto ret = _idSpanMap.find(prevId);
		//前面的頁號沒有(還未向系統(tǒng)申請),停止向前合并
		if (ret == _idSpanMap.end())
		{
			break;
		}
		//前面的頁號對應(yīng)的span正在被使用,停止向前合并
		Span* prevSpan = ret->second;
		if (prevSpan->_isUse == true)
		{
			break;
		}
		//合并出超過128頁的span無法進行管理,停止向前合并
		if (prevSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		//進行向前合并
		span->_pageId = prevSpan->_pageId;
		span->_n += prevSpan->_n;

		//將prevSpan從對應(yīng)的雙鏈表中移除
		_spanLists[prevSpan->_n].Erase(prevSpan);

		delete prevSpan;
	}
	//2、向后合并
	while (1)
	{
		PAGE_ID nextId = span->_pageId + span->_n;
		auto ret = _idSpanMap.find(nextId);
		//后面的頁號沒有(還未向系統(tǒng)申請),停止向后合并
		if (ret == _idSpanMap.end())
		{
			break;
		}
		//后面的頁號對應(yīng)的span正在被使用,停止向后合并
		Span* nextSpan = ret->second;
		if (nextSpan->_isUse == true)
		{
			break;
		}
		//合并出超過128頁的span無法進行管理,停止向后合并
		if (nextSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		//進行向后合并
		span->_n += nextSpan->_n;

		//將nextSpan從對應(yīng)的雙鏈表中移除
		_spanLists[nextSpan->_n].Erase(nextSpan);

		delete nextSpan;
	}
	//將合并后的span掛到對應(yīng)的雙鏈表當中
	_spanLists[span->_n].PushFront(span);
	//建立該span與其首尾頁的映射
	_idSpanMap[span->_pageId] = span;
	_idSpanMap[span->_pageId + span->_n - 1] = span;
	//將該span設(shè)置為未被使用的狀態(tài)
	span->_isUse = false;
}

需要注意的是,在向前或向后進行合并的過程中:

  • 如果沒有通過頁號獲取到其對應(yīng)的span,說明對應(yīng)到該頁的內(nèi)存塊還未申請,此時需要停止合并。
  • 如果通過頁號獲取到了其對應(yīng)的span,但該span處于被使用的狀態(tài),那我們也必須停止合并。
  • 如果合并后大于128頁則不能進行本次合并,因為page cache無法對大于128頁的span進行管理。

在合并span時,由于這個span是在page cache的某個哈希桶的雙鏈表當中的,因此在合并后需要將其從對應(yīng)的雙鏈表中移除,然后再將這個被合并了的span結(jié)構(gòu)進行delete。

除此之外,在合并結(jié)束后,除了將合并后的span掛到page cache對應(yīng)哈希桶的雙鏈表當中,還需要建立該span與其首位頁之間的映射關(guān)系,便于此后合并出更大的span。

釋放內(nèi)存過程聯(lián)調(diào)

ConcurrentFree函數(shù)

至此我們將thread cache、central cache以及page cache的釋放流程也都寫完了,此時我們就可以向外提供一個ConcurrentFree函數(shù),用于釋放內(nèi)存塊,釋放內(nèi)存塊時每個線程通過自己的thread cache對象,調(diào)用thread cache中釋放內(nèi)存對象的接口即可。

static void ConcurrentFree(void* ptr, size_t size/*暫時*/)
{
	assert(pTLSThreadCache);

	pTLSThreadCache->Deallocate(ptr, size);
}

釋放內(nèi)存過程聯(lián)調(diào)測試

之前我們在測試申請流程時,讓單個線程進行了三次內(nèi)存申請,現(xiàn)在我們再將這三個對象再進行釋放,看看這其中的釋放流程是如何進行的。

void* p1 = ConcurrentAlloc(6);
void* p2 = ConcurrentAlloc(8);
void* p3 = ConcurrentAlloc(1);

ConcurrentFree(p1, 6);
ConcurrentFree(p2, 8);
ConcurrentFree(p3, 1);

首先,這三次申請和釋放的對象大小進行對齊后都是8字節(jié),因此對應(yīng)操作的就是thread cache和central cache的第0號桶,以及page cache的第1號桶。

由于第三次對象申請時,剛好將thread cache第0號桶當中僅剩的一個對象拿走了,因此在三次對象申請后thread cache的第0號桶當中是沒有對象的。

通過監(jiān)視窗口可以看到,此時thread cache第0號桶中自由鏈表的_maxSize已經(jīng)慢增長到了3,而當我們釋放完第一個對象后,該自由鏈表當中對象的個數(shù)只有一個,因此不會將該自由鏈表當中的對象進一步還給central cache。

在這里插入圖片描述

當?shù)诙€對象釋放給thread cache的第0號桶后,該桶對應(yīng)自由鏈表當中對象的個數(shù)變成了2,也是不會進行ListTooLong操作的。

在這里插入圖片描述

直到第三個對象釋放給thread cache的第0號桶時,此時該自由鏈表的_size的值變?yōu)?,與_maxSize的值相等,現(xiàn)在thread cache就需要將對象給central cache了。

在這里插入圖片描述

thread cache先是將第0號桶當中的對象彈出MaxSize個,在這里實際上就是全部彈出,此時該自由鏈表_size的值變?yōu)?,然后繼續(xù)調(diào)用central cache當中的ReleaseListToSpans函數(shù),將這三個對象還給central cache當中對應(yīng)的span。

在這里插入圖片描述

在進入central cache的第0號桶還對象之前,先把第0號桶對應(yīng)的桶鎖加上,然后通過查page cache中的映射表找到其對應(yīng)的span,最后將這個對象頭插到該span的自由鏈表中,并將該span的_useCount進行--。當?shù)谝粋€對象還給其對應(yīng)的span時,可以看到該span的_useCount減到了2。

在這里插入圖片描述

而由于我們只進行了三次對象申請,并且這些對象大小對齊后大小都是8字節(jié),因此我們申請的這三個對象實際都是同一個span切分出來的。當我們將這三個對象都還給這個span時,該span的_useCount就減為了0。

在這里插入圖片描述

現(xiàn)在central cache就需要將這個span進一步還給page cache,而在將該span交給page cache之前,會將該span的自由鏈表以及前后指針都置空。并且在進入page cache之前會先將central cache第0號桶的桶鎖解掉,然后再加上page cache的大鎖,之后才能進入page cache進行相關(guān)操作。

在這里插入圖片描述

由于這個一頁的span是從128頁的span的頭部切下來的,在向前合并時由于前面的頁還未向系統(tǒng)申請,因此在查映射關(guān)系時是無法找到的,此時直接停止了向前合并。

(說明一下:由于下面是重新另外進行的一次調(diào)試,因此監(jiān)視窗口顯示的span的起始頁號與之前的不同,實際應(yīng)該與上面一致)

在這里插入圖片描述

而在向后合并時,由于page cache沒有將該頁后面的頁分配給central cache,因此在向后合并時肯定能夠找到一個127頁的span進行合并。合并后就變成了一個128頁的span,這時我們將原來127頁的span從第127號桶刪除,然后還需要將該127頁的span結(jié)構(gòu)進行delete,因為它管理的127頁已經(jīng)與1頁的span進行合并了,不再需要它來管理了。

在這里插入圖片描述

緊接著將這個128頁的span插入到第128號桶,然后建立該span與其首尾頁的映射,便于下次被用于合并,最后再將該span的狀態(tài)設(shè)置為未被使用的狀態(tài)即可。

在這里插入圖片描述

當從page cache回來后,除了將page cache的大鎖解掉,還需要立刻加上central cache中對應(yīng)的桶鎖,然后繼續(xù)將對象還給central cache中的span,但此時實際上是還完了,因此再將central cache的桶鎖解掉就行了。

在這里插入圖片描述

至此我們便完成了這三個對象的申請和釋放流程。

大于256KB的大塊內(nèi)存申請問題

申請過程

之前說到,每個線程的thread cache是用于申請小于等于256KB的內(nèi)存的,而對于大于256KB的內(nèi)存,我們可以考慮直接向page cache申請,但page cache中最大的頁也就只有128頁,因此如果是大于128頁的內(nèi)存申請,就只能直接向堆申請了。

申請內(nèi)存的大小申請方式
x ≤ 256 K B ( 32 頁 ) x \leq 256KB(32頁) x≤256KB(32頁)向thread cache申請
32 頁 < x ≤ 128 頁 32 頁< x \leq 128頁 32頁<x≤128頁向page cache申請
x ≥ 128 頁 x \geq 128頁 x≥128頁向堆申請
  當申請的內(nèi)存大于256KB時,雖然不是從thread cache進行獲取,但在分配內(nèi)存時也是需要進行向上對齊的,對于大于256KB的內(nèi)存我們可以直接按頁進行對齊。 

而我們之前實現(xiàn)RoundUp函數(shù)時,對傳入字節(jié)數(shù)大于256KB的情況直接做了斷言處理,因此這里需要對RoundUp函數(shù)稍作修改。

//獲取向上對齊后的字節(jié)數(shù)
static inline size_t RoundUp(size_t bytes)
{
	if (bytes <= 128)
	{
		return _RoundUp(bytes, 8);
	}
	else if (bytes <= 1024)
	{
		return _RoundUp(bytes, 16);
	}
	else if (bytes <= 8 * 1024)
	{
		return _RoundUp(bytes, 128);
	}
	else if (bytes <= 64 * 1024)
	{
		return _RoundUp(bytes, 1024);
	}
	else if (bytes <= 256 * 1024)
	{
		return _RoundUp(bytes, 8 * 1024);
	}
	else
	{
		//大于256KB的按頁對齊
		return _RoundUp(bytes, 1 << PAGE_SHIFT);
	}
}

現(xiàn)在對于之前的申請邏輯就需要進行修改了,當申請對象的大小大于256KB時,就不用向thread cache申請了,這時先計算出按頁對齊后實際需要申請的頁數(shù),然后通過調(diào)用NewSpan申請指定頁數(shù)的span即可。

static void* ConcurrentAlloc(size_t size)
{
	if (size > MAX_BYTES) //大于256KB的內(nèi)存申請
	{
		//計算出對齊后需要申請的頁數(shù)
		size_t alignSize = SizeClass::RoundUp(size);
		size_t kPage = alignSize >> PAGE_SHIFT;

		//向page cache申請kPage頁的span
		PageCache::GetInstance()->_pageMtx.lock();
		Span* span = PageCache::GetInstance()->NewSpan(kPage);
		PageCache::GetInstance()->_pageMtx.unlock();

		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		return ptr;
	}
	else
	{
		//通過TLS,每個線程無鎖的獲取自己專屬的ThreadCache對象
		if (pTLSThreadCache == nullptr)
		{
			pTLSThreadCache = new ThreadCache;
		}
		cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;

		return pTLSThreadCache->Allocate(size);
	}
}

也就是說,申請大于256KB的內(nèi)存時,會直接調(diào)用page cache當中的NewSpan函數(shù)進行申請,因此這里我們需要再對NewSpan函數(shù)進行改造,當需要申請的內(nèi)存頁數(shù)大于128頁時,就直接向堆申請對應(yīng)頁數(shù)的內(nèi)存塊。而如果申請的內(nèi)存頁數(shù)是小于128頁的,那就在page cache中進行申請,因此當申請大于256KB的內(nèi)存調(diào)用NewSpan函數(shù)時也是需要加鎖的,因為我們可能是在page cache中進行申請的。

//獲取一個k頁的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0);
	if (k > NPAGES - 1) //大于128頁直接找堆申請
	{
		void* ptr = SystemAlloc(k);
		Span* span = new Span;
		span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
		span->_n = k;
		//建立頁號與span之間的映射
		_idSpanMap[span->_pageId] = span;
		return span;
	}
	//先檢查第k個桶里面有沒有span
	if (!_spanLists[k].Empty())
	{
		Span* kSpan = _spanLists[k].PopFront();

		//建立頁號與span的映射,方便central cache回收小塊內(nèi)存時查找對應(yīng)的span
		for (PAGE_ID i = 0; i < kSpan->_n; i++)
		{
			_idSpanMap[kSpan->_pageId + i] = kSpan;
		}

		return kSpan;
	}
	//檢查一下后面的桶里面有沒有span,如果有可以將其進行切分
	for (size_t i = k + 1; i < NPAGES; i++)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;
			//在nSpan的頭部切k頁下來
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;
			//將剩下的掛到對應(yīng)映射的位置
			_spanLists[nSpan->_n].PushFront(nSpan);
			//存儲nSpan的首尾頁號與nSpan之間的映射,方便page cache合并span時進行前后頁的查找
			_idSpanMap[nSpan->_pageId] = nSpan;
			_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;

			//建立頁號與span的映射,方便central cache回收小塊內(nèi)存時查找對應(yīng)的span
			for (PAGE_ID i = 0; i < kSpan->_n; i++)
			{
				_idSpanMap[kSpan->_pageId + i] = kSpan;
			}

			return kSpan;
		}
	}
	//走到這里說明后面沒有大頁的span了,這時就向堆申請一個128頁的span
	Span* bigSpan = new Span;
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	//盡量避免代碼重復(fù),遞歸調(diào)用自己
	return NewSpan(k);
}

釋放過程

當釋放對象時,我們需要判斷釋放對象的大小:

釋放內(nèi)存的大小釋放方式
x ≤ 256 K B ( 32 頁 ) x \leq 256KB(32頁) x≤256KB(32頁)釋放給thread cache
32 頁 < x ≤ 128 頁 32 頁< x \leq 128頁 32頁<x≤128頁釋放給page cache
x ≥ 128 頁 x \geq 128頁 x≥128頁釋放給堆

因此當釋放對象時,我們需要先找到該對象對應(yīng)的span,但是在釋放對象時我們只知道該對象的起始地址。這也就是我們在申請大于256KB的內(nèi)存時,也要給申請到的內(nèi)存建立span結(jié)構(gòu),并建立起始頁號與該span之間的映射關(guān)系的原因。此時我們就可以通過釋放對象的起始地址計算出起始頁號,進而通過頁號找到該對象對應(yīng)的span。

static void ConcurrentFree(void* ptr, size_t size)
{
	if (size > MAX_BYTES) //大于256KB的內(nèi)存釋放
	{
		Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);

		PageCache::GetInstance()->_pageMtx.lock();
		PageCache::GetInstance()->ReleaseSpanToPageCache(span);
		PageCache::GetInstance()->_pageMtx.unlock();
	}
	else
	{
		assert(pTLSThreadCache);
		pTLSThreadCache->Deallocate(ptr, size);
	}
}

因此page cache在回收span時也需要進行判斷,如果該span的大小是小于等于128頁的,那么直接還給page cache進行了,page cache會嘗試對其進行合并。而如果該span的大小是大于128頁的,那么說明該span是直接向堆申請的,我們直接將這塊內(nèi)存釋放給堆,然后將這個span結(jié)構(gòu)進行delete就行了。

//釋放空閑的span回到PageCache,并合并相鄰的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
	if (span->_n > NPAGES - 1) //大于128頁直接釋放給堆
	{
		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		SystemFree(ptr);
		delete span;
		return;
	}
	//對span的前后頁,嘗試進行合并,緩解內(nèi)存碎片問題
	//1、向前合并
	while (1)
	{
		PAGE_ID prevId = span->_pageId - 1;
		auto ret = _idSpanMap.find(prevId);
		//前面的頁號沒有(還未向系統(tǒng)申請),停止向前合并
		if (ret == _idSpanMap.end())
		{
			break;
		}
		//前面的頁號對應(yīng)的span正在被使用,停止向前合并
		Span* prevSpan = ret->second;
		if (prevSpan->_isUse == true)
		{
			break;
		}
		//合并出超過128頁的span無法進行管理,停止向前合并
		if (prevSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		//進行向前合并
		span->_pageId = prevSpan->_pageId;
		span->_n += prevSpan->_n;

		//將prevSpan從對應(yīng)的雙鏈表中移除
		_spanLists[prevSpan->_n].Erase(prevSpan);

		delete prevSpan;
	}
	//2、向后合并
	while (1)
	{
		PAGE_ID nextId = span->_pageId + span->_n;
		auto ret = _idSpanMap.find(nextId);
		//后面的頁號沒有(還未向系統(tǒng)申請),停止向后合并
		if (ret == _idSpanMap.end())
		{
			break;
		}
		//后面的頁號對應(yīng)的span正在被使用,停止向后合并
		Span* nextSpan = ret->second;
		if (nextSpan->_isUse == true)
		{
			break;
		}
		//合并出超過128頁的span無法進行管理,停止向后合并
		if (nextSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		//進行向后合并
		span->_n += nextSpan->_n;

		//將nextSpan從對應(yīng)的雙鏈表中移除
		_spanLists[nextSpan->_n].Erase(nextSpan);

		delete nextSpan;
	}
	//將合并后的span掛到對應(yīng)的雙鏈表當中
	_spanLists[span->_n].PushFront(span);
	//建立該span與其首尾頁的映射
	_idSpanMap[span->_pageId] = span;
	_idSpanMap[span->_pageId + span->_n - 1] = span;
	//將該span設(shè)置為未被使用的狀態(tài)
	span->_isUse = false;
}

說明一下,直接向堆申請內(nèi)存時我們調(diào)用的接口是VirtualAlloc,與之對應(yīng)的將內(nèi)存釋放給堆的接口叫做VirtualFree,而Linux下的brk和mmap對應(yīng)的釋放接口叫做sbrk和unmmap。此時我們也可以將這些釋放接口封裝成一個叫做SystemFree的接口,當我們需要將內(nèi)存釋放給堆時直接調(diào)用SystemFree即可。

//直接將內(nèi)存還給堆
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
	VirtualFree(ptr, 0, MEM_RELEASE);
#else
	//linux下sbrk unmmap等
#endif
}

簡單測試

下面我們對大于256KB的申請釋放流程進行簡單的測試:

//找page cache申請
void* p1 = ConcurrentAlloc(257 * 1024); //257KB
ConcurrentFree(p1, 257 * 1024);

//找堆申請
void* p2 = ConcurrentAlloc(129 * 8 * 1024); //129頁
ConcurrentFree(p2, 129 * 8 * 1024);

當申請257KB的內(nèi)存時,由于257KB的內(nèi)存按頁向上對齊后是33頁,并沒有大于128頁,因此不會直接向堆進行申請,會向page cache申請內(nèi)存,但此時page cache當中實際是沒有內(nèi)存的,最終page cache就會向堆申請一個128頁的span,將其切分成33頁的span和95頁的span,并將33頁的span進行返回。

在這里插入圖片描述

而在釋放內(nèi)存時,由于該對象的大小大于了256KB,因此不會將其還給thread cache,而是直接調(diào)用的page cache當中的釋放接口。

在這里插入圖片描述

由于該對象的大小是33頁,不大于128頁,因此page cache也不會直接將該對象還給堆,而是嘗試對其進行合并,最終就會把這個33頁的span和之前剩下的95頁的span進行合并,最終將合并后的128頁的span掛到第128號桶中。

在這里插入圖片描述

當申請129頁的內(nèi)存時,由于是大于256KB的,于是還是調(diào)用的page cache對應(yīng)的申請接口,但此時申請的內(nèi)存同時也大于128頁,因此會直接向堆申請。在申請后還會建立該span與其起始頁號之間的映射,便于釋放時可以通過頁號找到該span。

在這里插入圖片描述

在釋放內(nèi)存時,通過對象的地址找到其對應(yīng)的span,從span結(jié)構(gòu)中得知釋放內(nèi)存的大小大于128頁,于是會將該內(nèi)存直接還給堆。

在這里插入圖片描述

使用定長內(nèi)存池配合脫離使用new

tcmalloc是要在高并發(fā)場景下替代malloc進行內(nèi)存申請的,因此tcmalloc在實現(xiàn)的時,其內(nèi)部是不能調(diào)用malloc函數(shù)的,我們當前的代碼中存在通過new獲取到的內(nèi)存,而new在底層實際上就是封裝了malloc。

為了完全脫離掉malloc函數(shù),此時我們之前實現(xiàn)的定長內(nèi)存池就起作用了,代碼中使用new時基本都是為Span結(jié)構(gòu)的對象申請空間,而span對象基本都是在page cache層創(chuàng)建的,因此我們可以在PageCache類當中定義一個_spanPool,用于span對象的申請和釋放。

//單例模式
class PageCache
{
public:
	//...
private:
	ObjectPool<Span> _spanPool;
};

然后將代碼中使用new的地方替換為調(diào)用定長內(nèi)存池當中的New函數(shù),將代碼中使用delete的地方替換為調(diào)用定長內(nèi)存池當中的Delete函數(shù)。

//申請span對象
Span* span = _spanPool.New();
//釋放span對象
_spanPool.Delete(span);

注意,當使用定長內(nèi)存池當中的New函數(shù)申請Span對象時,New函數(shù)通過定位new也是對Span對象進行了初始化的。

此外,每個線程第一次申請內(nèi)存時都會創(chuàng)建其專屬的thread cache,而這個thread cache目前也是new出來的,我們也需要對其進行替換。

//通過TLS,每個線程無鎖的獲取自己專屬的ThreadCache對象
if (pTLSThreadCache == nullptr)
{
	static std::mutex tcMtx;
	static ObjectPool<ThreadCache> tcPool;
	tcMtx.lock();
	pTLSThreadCache = tcPool.New();
	tcMtx.unlock();
}

這里我們將用于申請ThreadCache類對象的定長內(nèi)存池定義為靜態(tài)的,保持全局只有一個,讓所有線程創(chuàng)建自己的thread cache時,都在個定長內(nèi)存池中申請內(nèi)存就行了。

但注意在從該定長內(nèi)存池中申請內(nèi)存時需要加鎖,防止多個線程同時申請自己的ThreadCache對象而導(dǎo)致線程安全問題。

最后在SpanList的構(gòu)造函數(shù)中也用到了new,因為SpanList是帶頭循環(huán)雙向鏈表,所以在構(gòu)造期間我們需要申請一個span對象作為雙鏈表的頭結(jié)點。

//帶頭雙向循環(huán)鏈表
class SpanList
{
public:
	SpanList()
	{
		_head = _spanPool.New();
		_head->_next = _head;
		_head->_prev = _head;
	}
private:
	Span* _head;
	static ObjectPool<Span> _spanPool;
};

由于每個span雙鏈表只需要一個頭結(jié)點,因此將這個定長內(nèi)存池定義為靜態(tài)的,保持全局只有一個,讓所有span雙鏈表在申請頭結(jié)點時,都在一個定長內(nèi)存池中申請內(nèi)存就行了。

釋放對象時優(yōu)化為不傳對象大小

當我們使用malloc函數(shù)申請內(nèi)存時,需要指明申請內(nèi)存的大??;而當我們使用free函數(shù)釋放內(nèi)存時,只需要傳入指向這塊內(nèi)存的指針即可。

而我們目前實現(xiàn)的內(nèi)存池,在釋放對象時除了需要傳入指向該對象的指針,還需要傳入該對象的大小。

原因如下:

  • 如果釋放的是大于256KB的對象,需要根據(jù)對象的大小來判斷這塊內(nèi)存到底應(yīng)該還給page cache,還是應(yīng)該直接還給堆。
  • 如果釋放的是小于等于256KB的對象,需要根據(jù)對象的大小計算出應(yīng)該還給thread cache的哪一個哈希桶。

如果我們也想做到,在釋放對象時不用傳入對象的大小,那么我們就需要建立對象地址與對象大小之間的映射。由于現(xiàn)在可以通過對象的地址找到其對應(yīng)的span,而span的自由鏈表中掛的都是相同大小的對象。

因此我們可以在Span結(jié)構(gòu)中再增加一個_objSize成員,該成員代表著這個span管理的內(nèi)存塊被切成的一個個對象的大小。

//管理以頁為單位的大塊內(nèi)存
struct Span
{
	PAGE_ID _pageId = 0;        //大塊內(nèi)存起始頁的頁號
	size_t _n = 0;              //頁的數(shù)量

	Span* _next = nullptr;      //雙鏈表結(jié)構(gòu)
	Span* _prev = nullptr;

	size_t _objSize = 0;        //切好的小對象的大小
	size_t _useCount = 0;       //切好的小塊內(nèi)存,被分配給thread cache的計數(shù)
	void* _freeList = nullptr;  //切好的小塊內(nèi)存的自由鏈表

	bool _isUse = false;        //是否在被使用
};

而所有的span都是從page cache中拿出來的,因此每當我們調(diào)用NewSpan獲取到一個k頁的span時,就應(yīng)該將這個span的_objSize保存下來。

Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_objSize = size;

代碼中有兩處,一處是在central cache中獲取非空span時,如果central cache對應(yīng)的桶中沒有非空的span,此時會調(diào)用NewSpan獲取一個k頁的span;另一處是當申請大于256KB內(nèi)存時,會直接調(diào)用NewSpan獲取一個k頁的span。

此時當我們釋放對象時,就可以直接從對象的span中獲取到該對象的大小,準確來說獲取到的是對齊以后的大小。

static void ConcurrentFree(void* ptr)
{
	Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
	size_t size = span->_objSize;
	if (size > MAX_BYTES) //大于256KB的內(nèi)存釋放
	{
		PageCache::GetInstance()->_pageMtx.lock();
		PageCache::GetInstance()->ReleaseSpanToPageCache(span);
		PageCache::GetInstance()->_pageMtx.unlock();
	}
	else
	{
		assert(pTLSThreadCache);
		pTLSThreadCache->Deallocate(ptr, size);
	}
}

讀取映射關(guān)系時的加鎖問題

我們將頁號與span之間的映射關(guān)系是存儲在PageCache類當中的,當我們訪問這個映射關(guān)系時是需要加鎖的,因為STL容器是不保證線程安全的。

對于當前代碼來說,如果我們此時正在page cache進行相關(guān)操作,那么訪問這個映射關(guān)系是安全的,因為當進入page cache之前是需要加鎖的,因此可以保證此時只有一個線程在進行訪問。

但如果我們是在central cache訪問這個映射關(guān)系,或是在調(diào)用ConcurrentFree函數(shù)釋放內(nèi)存時訪問這個映射關(guān)系,那么就存在線程安全的問題。因為此時可能其他線程正在page cache當中進行某些操作,并且該線程此時可能也在訪問這個映射關(guān)系,因此當我們在page cache外部訪問這個映射關(guān)系時是需要加鎖的。

實際就是在調(diào)用page cache對外提供訪問映射關(guān)系的函數(shù)時需要加鎖,這里我們可以考慮使用C++當中的unique_lock,當然你也可以用普通的鎖。

//獲取從對象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //頁號

	std::unique_lock<std::mutex> lock(_pageMtx); //構(gòu)造時加鎖,析構(gòu)時自動解鎖
	auto ret = _idSpanMap.find(id);
	if (ret != _idSpanMap.end())
	{
		return ret->second;
	}
	else
	{
		assert(false);
		return nullptr;
	}
}

多線程環(huán)境下對比malloc測試

之前我們只是對代碼進行了一些基礎(chǔ)的單元測試,下面我們在多線程場景下對比malloc進行測試。

void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;
	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&, k]() {
			std::vector<void*> v;
			v.reserve(ntimes);
			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					v.push_back(malloc(16));
					//v.push_back(malloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();
				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					free(v[i]);
				}
				size_t end2 = clock();
				v.clear();
				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
		});
	}
	for (auto& t : vthread)
	{
		t.join();
	}
	printf("%u個線程并發(fā)執(zhí)行%u輪次,每輪次malloc %u次: 花費:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime);
	printf("%u個線程并發(fā)執(zhí)行%u輪次,每輪次free %u次: 花費:%u ms\n",
		nworks, rounds, ntimes, free_costtime);
	printf("%u個線程并發(fā)malloc&free %u次,總計花費:%u ms\n",
		nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}

void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;
	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&]() {
			std::vector<void*> v;
			v.reserve(ntimes);
			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					v.push_back(ConcurrentAlloc(16));
					//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();
				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					ConcurrentFree(v[i]);
				}
				size_t end2 = clock();
				v.clear();
				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
		});
	}
	for (auto& t : vthread)
	{
		t.join();
	}
	printf("%u個線程并發(fā)執(zhí)行%u輪次,每輪次concurrent alloc %u次: 花費:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime);
	printf("%u個線程并發(fā)執(zhí)行%u輪次,每輪次concurrent dealloc %u次: 花費:%u ms\n",
		nworks, rounds, ntimes, free_costtime);
	printf("%u個線程并發(fā)concurrent alloc&dealloc %u次,總計花費:%u ms\n",
		nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}

int main()
{
	size_t n = 10000;
	cout << "==========================================================" <<
		endl;
	BenchmarkConcurrentMalloc(n, 4, 10);
	cout << endl << endl;
	BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" <<
		endl;
	return 0;
}

其中測試函數(shù)各個參數(shù)的含義如下:

  • ntimes:單輪次申請和釋放內(nèi)存的次數(shù)。
  • nworks:線程數(shù)。
  • rounds:輪次。

在測試函數(shù)中,我們通過clock函數(shù)分別獲取到每輪次申請和釋放所花費的時間,然后將其對應(yīng)累加到malloc_costtime和free_costtime上。最后我們就得到了,nworks個線程跑rounds輪,每輪申請和釋放ntimes次,這個過程申請所消耗的時間、釋放所消耗的時間、申請和釋放總共消耗的時間。

注意,我們創(chuàng)建線程時讓線程執(zhí)行的是lambda表達式,而我們這里在使用lambda表達式時,以值傳遞的方式捕捉了變量k,以引用傳遞的方式捕捉了其他父作用域中的變量,因此我們可以將各個線程消耗的時間累加到一起。

我們將所有線程申請內(nèi)存消耗的時間都累加到malloc_costtime上, 將釋放內(nèi)存消耗的時間都累加到free_costtime上,此時malloc_costtime和free_costtime可能被多個線程同時進行累加操作的,所以存在線程安全的問題。鑒于此,我們在定義這兩個變量時使用了atomic類模板,這時對它們的操作就是原子操作了。

固定大小內(nèi)存的申請和釋放

我們先來測試一下固定大小內(nèi)存的申請和釋放:

v.push_back(malloc(16));
v.push_back(ConcurrentAlloc(16));

此時4個線程執(zhí)行10輪操作,每輪申請釋放10000次,總共申請釋放了40萬次,運行后可以看到,malloc的效率還是更高的。

在這里插入圖片描述

由于此時我們申請釋放的都是固定大小的對象,每個線程申請釋放時訪問的都是各自thread cache的同一個桶,當thread cache的這個桶中沒有對象或?qū)ο筇嘁獨w還時,也都會訪問central cache的同一個桶。此時central cache中的桶鎖就不起作用了,因為我們讓central cache使用桶鎖的目的就是為了,讓多個thread cache可以同時訪問central cache的不同桶,而此時每個thread cache訪問的卻都是central cache中的同一個桶。

不同大小內(nèi)存的申請和釋放

下面我們再來測試一下不同大小內(nèi)存的申請和釋放:

v.push_back(malloc((16 + i) % 8192 + 1));
v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));

運行后可以看到,由于申請和釋放內(nèi)存的大小是不同的,此時central cache當中的桶鎖就起作用了,ConcurrentAlloc的效率也有了較大增長,但相比malloc來說還是差一點點。

在這里插入圖片描述

復(fù)雜問題的調(diào)試技巧

多線程調(diào)試比單線程調(diào)試要復(fù)雜得多,調(diào)試時各個線程之間會相互切換,并且每次調(diào)試切換的時機也是不固定的,這就使得調(diào)試過程變得非常難以控制。

下面給出三個調(diào)試時的小技巧:

1、條件斷點

一般情況下我們可以直接運行程序,通過報錯來查找問題。如果此時報的是斷言錯誤,那么我們可以直接定位到報錯的位置,然后將此處的斷言改為與斷言條件相反的if判斷,在if語句里面打上一個斷點,但注意空語句是無法打斷點的,這時我們隨便在if里面加上一句代碼就可以打斷點了。

在這里插入圖片描述

此外,條件斷點也可以通過右擊普通斷點來進行設(shè)置。

在這里插入圖片描述

右擊后即可設(shè)置相應(yīng)的條件,程序運行到此處時如果滿足該條件則會停下來。

在這里插入圖片描述

運行到條件斷點處后,我們就可以對當前程序進行進一步分析,找出斷言錯誤的被觸發(fā)原因。

2、查看函數(shù)棧幀

當程序運行到斷點處時,我們需要對當前位置進行分析,如果檢查后發(fā)現(xiàn)當前函數(shù)是沒有問題的,這時可能需要回到調(diào)用該函數(shù)的地方進行進一步分析,此時我們可以依次點擊“調(diào)試→窗口→調(diào)用堆棧”。

在這里插入圖片描述

此時我們就可以看到當前函數(shù)棧幀的調(diào)用情況,其中黃色箭頭指向的是當前所在的函數(shù)棧幀。

在這里插入圖片描述

雙擊函數(shù)棧幀中的其他函數(shù),就可以跳轉(zhuǎn)到該函數(shù)對應(yīng)的棧幀,此時淺灰色箭頭指向的就是當前跳轉(zhuǎn)到的函數(shù)棧幀。

在這里插入圖片描述

需要注意的是,監(jiān)視窗口只能查看當前棧幀中的變量。如果要查看此時其他函數(shù)棧幀中變量的情況,就可以通過函數(shù)棧幀跳轉(zhuǎn)來查看。

3、疑似死循環(huán)時中斷程序

當你在某個地方設(shè)置斷點后,如果遲遲沒有運行到斷點處,而程序也沒有崩潰,這時有可能是程序進入到某個死循環(huán)了。

在這里插入圖片描述

這時我們可以依次點擊“調(diào)試→全部中斷”。

在這里插入圖片描述

這時程序就會在當前運行的地方停下來。

在這里插入圖片描述

性能瓶頸分析

經(jīng)過前面的測試可以看到,我們的代碼此時與malloc之間還是有差距的,此時我們就應(yīng)該分析分析我們當前項目的瓶頸在哪里,但這不能簡單的憑感覺,我們應(yīng)該用性能分析的工具來進行分析。

VS編譯器下性能分析的操作步驟

VS編譯器中就帶有性能分析的工具的,我們可以依次點擊“調(diào)試→性能和診斷”進行性能分析,注意該操作要在Debug模式下進行。

在這里插入圖片描述

同時我們將代碼中n的值由10000調(diào)成了1000,否則該分析過程可能會花費較多時間,并且將malloc的測試代碼進行了屏蔽,因為我們要分析的是我們實現(xiàn)的高并發(fā)內(nèi)存池。

int main()
{
	size_t n = 1000;
	cout << "==========================================================" <<
		endl;
	BenchmarkConcurrentMalloc(n, 4, 10);
	cout << endl << endl;
	//BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" <<
		endl;
	return 0;
}

在點擊了“調(diào)試→性能和診斷”后會彈出一個提示框,我們直接點擊“開始”進行了。

在這里插入圖片描述

然后會彈出一個選項框,這里我們選擇的是第二個,因為我們要分析的是各個函數(shù)的用時時間,然后點擊下一步。

在這里插入圖片描述

出現(xiàn)以下選項框繼續(xù)點擊下一步。

在這里插入圖片描述

最后點擊完成,就可以等待分析結(jié)果了。

在這里插入圖片描述

分析性能瓶頸

通過分析結(jié)果可以看到,光是Deallocate和MapObjectToSpan這兩個函數(shù)就占用了一半多的時間。

在這里插入圖片描述

而在Deallocate函數(shù)中,調(diào)用ListTooLong函數(shù)時消耗的時間是最多的。

在這里插入圖片描述

繼續(xù)往下看,在ListTooLong函數(shù)中,調(diào)用ReleaseListToSpans函數(shù)時消耗的時間是最多的。

在這里插入圖片描述

再進一步看,在ReleaseListToSpans函數(shù)中,調(diào)用MapObjectToSpan函數(shù)時消耗的時間是最多的。

在這里插入圖片描述

也就是說,最終消耗時間最多的實際就是MapObjectToSpan函數(shù),我們這時再來看看為什么調(diào)用MapObjectToSpan函數(shù)會消耗這么多時間。通過觀察我們最終發(fā)現(xiàn),調(diào)用該函數(shù)時會消耗這么多時間就是因為鎖的原因。

在這里插入圖片描述

因此當前項目的瓶頸點就在鎖競爭上面,需要解決調(diào)用MapObjectToSpan函數(shù)訪問映射關(guān)系時的加鎖問題。tcmalloc當中針對這一點使用了基數(shù)樹進行優(yōu)化,使得在讀取這個映射關(guān)系時可以做到不加鎖。

針對性能瓶頸使用基數(shù)樹進行優(yōu)化

基數(shù)樹實際上就是一個分層的哈希表,根據(jù)所分層數(shù)不同可分為單層基數(shù)樹、二層基數(shù)樹、三層基數(shù)樹等。

單層基數(shù)樹

單層基數(shù)樹實際采用的就是直接定址法,每一個頁號對應(yīng)span的地址就存儲數(shù)組中在以該頁號為下標的位置。

在這里插入圖片描述

最壞的情況下我們需要建立所有頁號與其span之間的映射關(guān)系,因此這個數(shù)組中元素個數(shù)應(yīng)該與頁號的數(shù)目相同,數(shù)組中每個位置存儲的就是對應(yīng)span的指針。

//單層基數(shù)樹
template <int BITS>
class TCMalloc_PageMap1
{
public:
	typedef uintptr_t Number;
	explicit TCMalloc_PageMap1()
	{
		size_t size = sizeof(void*) << BITS; //需要開辟數(shù)組的大小
		size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT); //按頁對齊后的大小
		array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); //向堆申請空間
		memset(array_, 0, size); //對申請到的內(nèi)存進行清理
	}
	void* get(Number k) const
	{
		if ((k >> BITS) > 0) //k的范圍不在[0, 2^BITS-1]
		{
			return NULL;
		}
		return array_[k]; //返回該頁號對應(yīng)的span
	}
	void set(Number k, void* v)
	{
		assert((k >> BITS) == 0); //k的范圍必須在[0, 2^BITS-1]
		array_[k] = v; //建立映射
	}
private:
	void** array_; //存儲映射關(guān)系的數(shù)組
	static const int LENGTH = 1 << BITS; //頁的數(shù)目
};

此時當我們需要建立映射時就調(diào)用set函數(shù),需要讀取映射關(guān)系時,就調(diào)用get函數(shù)就行了。

代碼中的非類型模板參數(shù)BITS表示存儲頁號最多需要比特位的個數(shù)。在32位下我們傳入的是32-PAGE_SHIFT,在64位下傳入的是64-PAGE_SHIFT。而其中的LENGTH成員代表的就是頁號的數(shù)目,即 2 B I T S 2^{BITS} 2BITS。

比如32位平臺下,以一頁大小為8K為例,此時頁的數(shù)目就是 2 32 ÷ 2 13 = 2 19 2^{32}\div2^{13}=2^{19} 232÷213=219,因此存儲頁號最多需要19個比特位,此時傳入非類型模板參數(shù)的值就是 32 − 13 = 19 32-13=19 32−13=19。由于32位平臺下指針的大小是4字節(jié),因此該數(shù)組的大小就是 2 19 × 4 = 2 21 = 2 M 2^{19}\times4=2^{21}=2M 219×4=221=2M,內(nèi)存消耗不大,是可行的。但如果是在64位平臺下,此時該數(shù)組的大小是 2 51 × 8 = 2 54 = 2 24 G 2^{51}\times8=2^{54}=2^{24}G 251×8=254=224G,這顯然是不可行的,實際上對于64位的平臺,我們需要使用三層基數(shù)樹。

二層基數(shù)樹

這里還是以32位平臺下,一頁的大小為8K為例來說明,此時存儲頁號最多需要19個比特位。而二層基數(shù)樹實際上就是把這19個比特位分為兩次進行映射。

比如用前5個比特位在基數(shù)樹的第一層進行映射,映射后得到對應(yīng)的第二層,然后用剩下的比特位在基數(shù)樹的第二層進行映射,映射后最終得到該頁號對應(yīng)的span指針。

在這里插入圖片描述

在二層基數(shù)樹中,第一層的數(shù)組占用 2 5 × 4 = 2 7 B y t e 2^{5}\times4=2^{7}Byte 25×4=27Byte空間,第二層的數(shù)組最多占用 2 5 × 2 14 × 4 = 2 21 = 2 M 2^{5}\times2^{14}\times4=2^{21}=2M 25×214×4=221=2M。二層基數(shù)樹相比一層基數(shù)樹的好處就是,一層基數(shù)樹必須一開始就把 2 M 2M 2M的數(shù)組開辟出來,而二層基數(shù)樹一開始時只需將第一層的數(shù)組開辟出來,當需要進行某一頁號映射時再開辟對應(yīng)的第二層的數(shù)組就行了。

//二層基數(shù)樹
template <int BITS>
class TCMalloc_PageMap2
{
private:
	static const int ROOT_BITS = 5;                //第一層對應(yīng)頁號的前5個比特位
	static const int ROOT_LENGTH = 1 << ROOT_BITS; //第一層存儲元素的個數(shù)
	static const int LEAF_BITS = BITS - ROOT_BITS; //第二層對應(yīng)頁號的其余比特位
	static const int LEAF_LENGTH = 1 << LEAF_BITS; //第二層存儲元素的個數(shù)
	//第一層數(shù)組中存儲的元素類型
	struct Leaf
	{
		void* values[LEAF_LENGTH];
	};
	Leaf* root_[ROOT_LENGTH]; //第一層數(shù)組
public:
	typedef uintptr_t Number;
	explicit TCMalloc_PageMap2()
	{
		memset(root_, 0, sizeof(root_)); //將第一層的空間進行清理
		PreallocateMoreMemory(); //直接將第二層全部開辟
	}
	void* get(Number k) const
	{
		const Number i1 = k >> LEAF_BITS;        //第一層對應(yīng)的下標
		const Number i2 = k & (LEAF_LENGTH - 1); //第二層對應(yīng)的下標
		if ((k >> BITS) > 0 || root_[i1] == NULL) //頁號值不在范圍或沒有建立過映射
		{
			return NULL;
		}
		return root_[i1]->values[i2]; //返回該頁號對應(yīng)span的指針
	}
	void set(Number k, void* v)
	{
		const Number i1 = k >> LEAF_BITS;        //第一層對應(yīng)的下標
		const Number i2 = k & (LEAF_LENGTH - 1); //第二層對應(yīng)的下標
		assert(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v; //建立該頁號與對應(yīng)span的映射
	}
	//確保映射[start,start_n-1]頁號的空間是開辟好了的
	bool Ensure(Number start, size_t n)
	{
		for (Number key = start; key <= start + n - 1;)
		{
			const Number i1 = key >> LEAF_BITS;
			if (i1 >= ROOT_LENGTH) //頁號超出范圍
				return false;
			if (root_[i1] == NULL) //第一層i1下標指向的空間未開辟
			{
				//開辟對應(yīng)空間
				static ObjectPool<Leaf> leafPool;
				Leaf* leaf = (Leaf*)leafPool.New();
				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //繼續(xù)后續(xù)檢查
		}
		return true;
	}
	void PreallocateMoreMemory()
	{
		Ensure(0, 1 << BITS); //將第二層的空間全部開辟好
	}
};

因此在二層基數(shù)樹中有一個Ensure函數(shù),當需要建立某一頁號與其span之間的映射關(guān)系時,需要先調(diào)用該Ensure函數(shù)確保用于映射該頁號的空間是開辟了的,如果沒有開辟則會立即開辟。

而在32位平臺下,就算將二層基數(shù)樹第二層的數(shù)組全部開辟出來也就消耗了 2 M 2M 2M的空間,內(nèi)存消耗也不算太多,因此我們可以在構(gòu)造二層基數(shù)樹時就把第二層的數(shù)組全部開辟出來。

三層基數(shù)樹

上面一層基數(shù)樹和二層基數(shù)樹都適用于32位平臺,而對于64位的平臺就需要用三層基數(shù)樹了。三層基數(shù)樹與二層基數(shù)樹類似,三層基數(shù)樹實際上就是把存儲頁號的若干比特位分為三次進行映射。

在這里插入圖片描述

此時只有當要建立某一頁號的映射關(guān)系時,再開辟對應(yīng)的數(shù)組空間,而沒有建立映射的頁號就可以不用開辟其對應(yīng)的數(shù)組空間,此時就能在一定程度上節(jié)省內(nèi)存空間。

//三層基數(shù)樹
template <int BITS>
class TCMalloc_PageMap3
{
private:
	static const int INTERIOR_BITS = (BITS + 2) / 3;       //第一、二層對應(yīng)頁號的比特位個數(shù)
	static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS; //第一、二層存儲元素的個數(shù)
	static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS; //第三層對應(yīng)頁號的比特位個數(shù)
	static const int LEAF_LENGTH = 1 << LEAF_BITS;         //第三層存儲元素的個數(shù)
	struct Node
	{
		Node* ptrs[INTERIOR_LENGTH];
	};
	struct Leaf
	{
		void* values[LEAF_LENGTH];
	};
	Node* NewNode()
	{
		static ObjectPool<Node> nodePool;
		Node* result = nodePool.New();
		if (result != NULL)
		{
			memset(result, 0, sizeof(*result));
		}
		return result;
	}
	Node* root_;
public:
	typedef uintptr_t Number;
	explicit TCMalloc_PageMap3()
	{
		root_ = NewNode();
	}
	void* get(Number k) const
	{
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);         //第一層對應(yīng)的下標
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二層對應(yīng)的下標
		const Number i3 = k & (LEAF_LENGTH - 1);                    //第三層對應(yīng)的下標
		//頁號超出范圍,或映射該頁號的空間未開辟
		if ((k >> BITS) > 0 || root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL)
		{
			return NULL;
		}
		return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3]; //返回該頁號對應(yīng)span的指針
	}
	void set(Number k, void* v)
	{
		assert(k >> BITS == 0);
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);         //第一層對應(yīng)的下標
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二層對應(yīng)的下標
		const Number i3 = k & (LEAF_LENGTH - 1);                    //第三層對應(yīng)的下標
		Ensure(k, 1); //確保映射第k頁頁號的空間是開辟好了的
		reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v; //建立該頁號與對應(yīng)span的映射
	}
	//確保映射[start,start+n-1]頁號的空間是開辟好了的
	bool Ensure(Number start, size_t n)
	{
		for (Number key = start; key <= start + n - 1;)
		{
			const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);         //第一層對應(yīng)的下標
			const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二層對應(yīng)的下標
			if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH) //下標值超出范圍
				return false;
			if (root_->ptrs[i1] == NULL) //第一層i1下標指向的空間未開辟
			{
				//開辟對應(yīng)空間
				Node* n = NewNode();
				if (n == NULL) return false;
				root_->ptrs[i1] = n;
			}
			if (root_->ptrs[i1]->ptrs[i2] == NULL) //第二層i2下標指向的空間未開辟
			{
				//開辟對應(yīng)空間
				static ObjectPool<Leaf> leafPool;
				Leaf* leaf = leafPool.New();
				if (leaf == NULL) return false;
				memset(leaf, 0, sizeof(*leaf));
				root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
			}
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //繼續(xù)后續(xù)檢查
		}
		return true;
	}
	void PreallocateMoreMemory()
	{}
};

因此當我們要建立某一頁號的映射關(guān)系時,需要先確保存儲該頁映射的數(shù)組空間是開辟好了的,也就是調(diào)用代碼中的Ensure函數(shù),如果對應(yīng)數(shù)組空間未開辟則會立馬開辟對應(yīng)的空間。

使用基數(shù)樹進行優(yōu)化代碼實現(xiàn)

代碼更改

現(xiàn)在我們用基數(shù)樹對代碼進行優(yōu)化,此時將PageCache類當中的unorder_map用基數(shù)樹進行替換即可,由于當前是32位平臺,因此這里隨便用幾層基數(shù)樹都可以。

//單例模式
class PageCache
{
public:
	//...
private:
	//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
	TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
};

此時當我們需要建立頁號與span的映射時,就調(diào)用基數(shù)樹當中的set函數(shù)。

_idSpanMap.set(span->_pageId, span);

而當我們需要讀取某一頁號對應(yīng)的span時,就調(diào)用基數(shù)樹當中的get函數(shù)。

Span* ret = (Span*)_idSpanMap.get(id);

并且現(xiàn)在PageCache類向外提供的,用于讀取映射關(guān)系的MapObjectToSpan函數(shù)內(nèi)部就不需要加鎖了。

//獲取從對象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //頁號
	Span* ret = (Span*)_idSpanMap.get(id);
	assert(ret != nullptr);
	return ret;
}

為什么讀取基數(shù)樹映射關(guān)系時不需要加鎖?

當某個線程在讀取映射關(guān)系時,可能另外一個線程正在建立其他頁號的映射關(guān)系,而此時無論我們用的是C++當中的map還是unordered_map,在讀取映射關(guān)系時都是需要加鎖的。

因為C++中map的底層數(shù)據(jù)結(jié)構(gòu)是紅黑樹,unordered_map的底層數(shù)據(jù)結(jié)構(gòu)是哈希表,而無論是紅黑樹還是哈希表,當我們在插入數(shù)據(jù)時其底層的結(jié)構(gòu)都有可能會發(fā)生變化。比如紅黑樹在插入數(shù)據(jù)時可能會引起樹的旋轉(zhuǎn),而哈希表在插入數(shù)據(jù)時可能會引起哈希表擴容。此時要避免出現(xiàn)數(shù)據(jù)不一致的問題,就不能讓插入操作和讀取操作同時進行,因此我們在讀取映射關(guān)系的時候是需要加鎖的。

而對于基數(shù)樹來說就不一樣了,基數(shù)樹的空間一旦開辟好了就不會發(fā)生變化,因此無論什么時候去讀取某個頁的映射,都是對應(yīng)在一個固定的位置進行讀取的。并且我們不會同時對同一個頁進行讀取映射和建立映射的操作,因為我們只有在釋放對象時才需要讀取映射,而建立映射的操作都是在page cache進行的。也就是說,讀取映射時讀取的都是對應(yīng)span的_useCount不等于0的頁,而建立映射時建立的都是對應(yīng)span的_useCount等于0的頁,所以說我們不會同時對同一個頁進行讀取映射和建立映射的操作。

再次對比malloc進行測試

還是同樣的代碼,只不過我們用基數(shù)樹對代碼進行了優(yōu)化,這時測試固定大小內(nèi)存的申請和釋放的結(jié)果如下:

在這里插入圖片描述

可以看到,這時就算申請釋放的是固定大小的對象,其效率都是malloc的兩倍。下面在申請釋放不同大小的對象時,由于central cache的桶鎖起作用了,其效率更是變成了malloc的好幾倍。

在這里插入圖片描述

打包成動靜態(tài)庫

實際Google開源的tcmalloc是會直接用于替換malloc的,不同平臺替換的方式不同。比如基于Unix的系統(tǒng)上的glibc,使用了weak alias的方式替換;而對于某些其他平臺,需要使用hook的鉤子技術(shù)來做。

對于我們當前實現(xiàn)的項目,可以考慮將其打包成靜態(tài)庫或動態(tài)庫。我們先右擊解決方案資源管理器當中的項目名稱,然后選擇屬性。

在這里插入圖片描述

此時會彈出該選項卡,按照以下圖示就可以選擇將其打包成靜態(tài)庫或動態(tài)庫了。

在這里插入圖片描述

項目源碼

Github:https://github.com/chenlong-xcy/standard-project/tree/main/ConcurrentMemoryPool

到此這篇關(guān)于C++高并發(fā)內(nèi)存池的實現(xiàn)的文章就介紹到這了,更多相關(guān)C++高并發(fā)內(nèi)存池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • C++運算符重載實例代碼詳解(調(diào)試環(huán)境 Visual Studio 2019)

    C++運算符重載實例代碼詳解(調(diào)試環(huán)境 Visual Studio 2019)

    這篇文章主要介紹了C++運算符重載實例(調(diào)試環(huán)境 Visual Studio 2019),本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-03-03
  • Linux搭建C++開發(fā)調(diào)試環(huán)境的方法步驟

    Linux搭建C++開發(fā)調(diào)試環(huán)境的方法步驟

    這篇文章主要介紹了Linux搭建C++開發(fā)調(diào)試環(huán)境的方法步驟,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-10-10
  • C語言中的盜賊(小偷)問題詳解

    C語言中的盜賊(小偷)問題詳解

    大家好,本篇文章主要講的是C語言中的盜賊(小偷)問題詳解,感興趣的同學趕快來看一看吧,對你有幫助的話記得收藏一下
    2022-01-01
  • C++ STL list 遍歷刪除出錯解決方案

    C++ STL list 遍歷刪除出錯解決方案

    這篇文章主要介紹了C++ STL list 遍歷刪除出錯解決方案的相關(guān)資料,這里對出錯進行分析,并給出正確的解決方法,需要的朋友可以參考下
    2016-12-12
  • C語言詳解用char實現(xiàn)大小寫字母的轉(zhuǎn)換

    C語言詳解用char實現(xiàn)大小寫字母的轉(zhuǎn)換

    這篇文章主要給大家介紹了關(guān)于C語言實現(xiàn)大小寫字母轉(zhuǎn)換的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-05-05
  • opencv平均背景法詳解

    opencv平均背景法詳解

    這篇文章主要為大家詳細介紹了opencv平均背景法,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2020-03-03
  • 詳解C++編程中數(shù)組的基本用法

    詳解C++編程中數(shù)組的基本用法

    這篇文章主要介紹了C++編程中數(shù)組的基本用法,包括數(shù)組的初始化等基本知識,需要的朋友可以參考下
    2016-01-01
  • 深入解析C語言中typedef的四個用途

    深入解析C語言中typedef的四個用途

    以下是對C語言中typedef的四個用途進行了詳細的分析介紹,需要的朋友可以過來參考下
    2013-08-08
  • C++實現(xiàn)屏幕截圖(全屏截圖)

    C++實現(xiàn)屏幕截圖(全屏截圖)

    屏幕截圖已經(jīng)成為了所有IM即時通訊軟件的必備模塊,也是日常辦公中使用最頻繁的功能之一。今天我們從C++開發(fā)的角度,來看看屏幕截圖的主要功能點是如何實現(xiàn)的,感興趣的可以了解一下
    2021-11-11
  • OpenCV霍夫變換(Hough Transform)直線檢測詳解

    OpenCV霍夫變換(Hough Transform)直線檢測詳解

    這篇文章主要為大家詳細介紹了OpenCV霍夫變換直線檢測的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-12-12

最新評論