C++高并發(fā)內(nèi)存池的實(shí)現(xiàn)
項(xiàng)目介紹
本項(xiàng)目實(shí)現(xiàn)的是一個(gè)高并發(fā)的內(nèi)存池,它的原型是Google的一個(gè)開(kāi)源項(xiàng)目tcmalloc,tcmalloc全稱Thread-Caching Malloc,即線程緩存的malloc,實(shí)現(xiàn)了高效的多線程內(nèi)存管理,用于替換系統(tǒng)的內(nèi)存分配相關(guān)函數(shù)malloc和free。
tcmalloc的知名度也是非常高的,不少公司都在用它,比如Go語(yǔ)言就直接用它做了自己的內(nèi)存分配器。
該項(xiàng)目就是把tcmalloc中最核心的框架簡(jiǎn)化后拿出來(lái),模擬實(shí)現(xiàn)出一個(gè)mini版的高并發(fā)內(nèi)存池,目的就是學(xué)習(xí)tcmalloc的精華。
該項(xiàng)目主要涉及C/C++、數(shù)據(jù)結(jié)構(gòu)(鏈表、哈希桶)、操作系統(tǒng)內(nèi)存管理、單例模式、多線程、互斥鎖等方面的技術(shù)。
內(nèi)存池介紹
池化技術(shù)
在說(shuō)內(nèi)存池之前,我們得先了解一下“池化技術(shù)”。所謂“池化技術(shù)”,就是程序先向系統(tǒng)申請(qǐng)過(guò)量的資源,然后自己進(jìn)行管理,以備不時(shí)之需。
之所以要申請(qǐng)過(guò)量的資源,是因?yàn)樯暾?qǐng)和釋放資源都有較大的開(kāi)銷,不如提前申請(qǐng)一些資源放入“池”中,當(dāng)需要資源時(shí)直接從“池”中獲取,不需要時(shí)就將該資源重新放回“池”中即可。這樣使用時(shí)就會(huì)變得非??旖荩梢源蟠筇岣叱绦虻倪\(yùn)行效率。
在計(jì)算機(jī)中,有很多使用“池”這種技術(shù)的地方,除了內(nèi)存池之外,還有連接池、線程池、對(duì)象池等。以服務(wù)器上的線程池為例,它的主要思想就是:先啟動(dòng)若干數(shù)量的線程,讓它們處于睡眠狀態(tài),當(dāng)接收到客戶端的請(qǐng)求時(shí),喚醒池中某個(gè)睡眠的線程,讓它來(lái)處理客戶端的請(qǐng)求,當(dāng)處理完這個(gè)請(qǐng)求后,線程又進(jìn)入睡眠狀態(tài)。
內(nèi)存池
內(nèi)存池是指程序預(yù)先向操作系統(tǒng)申請(qǐng)一塊足夠大的內(nèi)存,此后,當(dāng)程序中需要申請(qǐng)內(nèi)存的時(shí)候,不是直接向操作系統(tǒng)申請(qǐng),而是直接從內(nèi)存池中獲?。煌?,當(dāng)釋放內(nèi)存的時(shí)候,并不是真正將內(nèi)存返回給操作系統(tǒng),而是將內(nèi)存返回給內(nèi)存池。當(dāng)程序退出時(shí)(或某個(gè)特定時(shí)間),內(nèi)存池才將之前申請(qǐng)的內(nèi)存真正釋放。
內(nèi)存池主要解決的問(wèn)題
內(nèi)存池主要解決的就是效率的問(wèn)題,它能夠避免讓程序頻繁的向系統(tǒng)申請(qǐng)和釋放內(nèi)存。其次,內(nèi)存池作為系統(tǒng)的內(nèi)存分配器,還需要嘗試解決內(nèi)存碎片的問(wèn)題。
內(nèi)存碎片分為內(nèi)部碎片和外部碎片:
- 外部碎片是一些空閑的小塊內(nèi)存區(qū)域,由于這些內(nèi)存空間不連續(xù),以至于合計(jì)的內(nèi)存足夠,但是不能滿足一些內(nèi)存分配申請(qǐng)需求。
- 內(nèi)部碎片是由于一些對(duì)齊的需求,導(dǎo)致分配出去的空間中一些內(nèi)存無(wú)法被利用。
注意: 內(nèi)存池嘗試解決的是外部碎片的問(wèn)題,同時(shí)也盡可能的減少內(nèi)部碎片的產(chǎn)生。
malloc
C/C++中我們要?jiǎng)討B(tài)申請(qǐng)內(nèi)存并不是直接去堆申請(qǐng)的,而是通過(guò)malloc函數(shù)去申請(qǐng)的,包括C++中的new實(shí)際上也是封裝了malloc函數(shù)的。
我們申請(qǐng)內(nèi)存塊時(shí)是先調(diào)用malloc,malloc再去向操作系統(tǒng)申請(qǐng)內(nèi)存。malloc實(shí)際就是一個(gè)內(nèi)存池,malloc相當(dāng)于向操作系統(tǒng)“批發(fā)”了一塊較大的內(nèi)存空間,然后“零售”給程序用,當(dāng)全部“售完”或程序有大量的內(nèi)存需求時(shí),再根據(jù)實(shí)際需求向操作系統(tǒng)“進(jìn)貨”。
malloc的實(shí)現(xiàn)方式有很多種,一般不同編譯器平臺(tái)用的都是不同的。比如Windows的VS系列中的malloc就是微軟自行實(shí)現(xiàn)的,而Linux下的gcc用的是glibc中的ptmalloc。
定長(zhǎng)內(nèi)存池的實(shí)現(xiàn)
malloc其實(shí)就是一個(gè)通用的內(nèi)存池,在什么場(chǎng)景下都可以使用,但這也意味著malloc在什么場(chǎng)景下都不會(huì)有很高的性能,因?yàn)閙alloc并不是針對(duì)某種場(chǎng)景專門設(shè)計(jì)的。
定長(zhǎng)內(nèi)存池就是針對(duì)固定大小內(nèi)存塊的申請(qǐng)和釋放的內(nèi)存池,由于定長(zhǎng)內(nèi)存池只需要支持固定大小內(nèi)存塊的申請(qǐng)和釋放,因此我們可以將其性能做到極致,并且在實(shí)現(xiàn)定長(zhǎng)內(nèi)存池時(shí)不需要考慮內(nèi)存碎片等問(wèn)題,因?yàn)槲覀兩暾?qǐng)/釋放的都是固定大小的內(nèi)存塊。
我們可以通過(guò)實(shí)現(xiàn)定長(zhǎng)內(nèi)存池來(lái)熟悉一下對(duì)簡(jiǎn)單內(nèi)存池的控制,其次,這個(gè)定長(zhǎng)內(nèi)存池后面會(huì)作為高并發(fā)內(nèi)存池的一個(gè)基礎(chǔ)組件。
如何實(shí)現(xiàn)定長(zhǎng)?
在實(shí)現(xiàn)定長(zhǎng)內(nèi)存池時(shí)要做到“定長(zhǎng)”有很多種方法,比如我們可以使用非類型模板參數(shù),使得在該內(nèi)存池中申請(qǐng)到的對(duì)象的大小都是N。
template<size_t N> class ObjectPool {};
此外,定長(zhǎng)內(nèi)存池也叫做對(duì)象池,在創(chuàng)建對(duì)象池時(shí),對(duì)象池可以根據(jù)傳入的對(duì)象類型的大小來(lái)實(shí)現(xiàn)“定長(zhǎng)”,因此我們可以通過(guò)使用模板參數(shù)來(lái)實(shí)現(xiàn)“定長(zhǎng)”,比如創(chuàng)建定長(zhǎng)內(nèi)存池時(shí)傳入的對(duì)象類型是int,那么該內(nèi)存池就只支持4字節(jié)大小內(nèi)存的申請(qǐng)和釋放。
template<class T> class ObjectPool {};
如何直接向堆申請(qǐng)空間?
既然是內(nèi)存池,那么我們首先得向系統(tǒng)申請(qǐng)一塊內(nèi)存空間,然后對(duì)其進(jìn)行管理。要想直接向堆申請(qǐng)內(nèi)存空間,在Windows下,可以調(diào)用VirtualAlloc函數(shù);在Linux下,可以調(diào)用brk或mmap函數(shù)。
#ifdef _WIN32 #include <Windows.h> #else //... #endif //直接去堆上申請(qǐng)按頁(yè)申請(qǐng)空間 inline static void* SystemAlloc(size_t kpage) { #ifdef _WIN32 void* ptr = VirtualAlloc(0, kpage<<13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); #else // linux下brk mmap等 #endif if (ptr == nullptr) throw std::bad_alloc(); return ptr; }
這里我們可以通過(guò)條件編譯將對(duì)應(yīng)平臺(tái)下向堆申請(qǐng)內(nèi)存的函數(shù)進(jìn)行封裝,此后我們就不必再關(guān)心當(dāng)前所在平臺(tái),當(dāng)我們需要直接向堆申請(qǐng)內(nèi)存時(shí)直接調(diào)用我們封裝后的SystemAlloc函數(shù)即可。
定長(zhǎng)內(nèi)存池中應(yīng)該包含哪些成員變量?
對(duì)于向堆申請(qǐng)到的大塊內(nèi)存,我們可以用一個(gè)指針來(lái)對(duì)其進(jìn)行管理,但僅用一個(gè)指針肯定是不夠的,我們還需要用一個(gè)變量來(lái)記錄這塊內(nèi)存的長(zhǎng)度。
由于此后我們需要將這塊內(nèi)存進(jìn)行切分,為了方便切分操作,指向這塊內(nèi)存的指針最好是字符指針,因?yàn)橹羔樀念愋蜎Q定了指針向前或向后走一步有多大距離,對(duì)于字符指針來(lái)說(shuō),當(dāng)我們需要向后移動(dòng)n個(gè)字節(jié)時(shí),直接對(duì)字符指針進(jìn)行加n操作即可。
其次,釋放回來(lái)的定長(zhǎng)內(nèi)存塊也需要被管理,我們可以將這些釋放回來(lái)的定長(zhǎng)內(nèi)存塊鏈接成一個(gè)鏈表,這里我們將管理釋放回來(lái)的內(nèi)存塊的鏈表叫做自由鏈表,為了能找到這個(gè)自由鏈表,我們還需要一個(gè)指向自由鏈表的指針。
因此,定長(zhǎng)內(nèi)存池當(dāng)中包含三個(gè)成員變量:
- _memory:指向大塊內(nèi)存的指針。
- _remainBytes:大塊內(nèi)存切分過(guò)程中剩余字節(jié)數(shù)。
- _freeList:還回來(lái)過(guò)程中鏈接的自由鏈表的頭指針。
內(nèi)存池如何管理釋放的對(duì)象?
對(duì)于還回來(lái)的定長(zhǎng)內(nèi)存塊,我們可以用自由鏈表將其鏈接起來(lái),但我們并不需要為其專門定義鏈?zhǔn)浇Y(jié)構(gòu),我們可以讓內(nèi)存塊的前4個(gè)字節(jié)(32位平臺(tái))或8個(gè)字節(jié)(64位平臺(tái))作為指針,存儲(chǔ)后面內(nèi)存塊的起始地址即可。
因此在向自由鏈表插入被釋放的內(nèi)存塊時(shí),先讓該內(nèi)存塊的前4個(gè)字節(jié)或8個(gè)字節(jié)存儲(chǔ)自由鏈表中第一個(gè)內(nèi)存塊的地址,然后再讓_freeList
指向該內(nèi)存塊即可,也就是一個(gè)簡(jiǎn)單的鏈表頭插操作。
這里有一個(gè)有趣問(wèn)題:如何讓一個(gè)指針在32位平臺(tái)下解引用后能向后訪問(wèn)4個(gè)字節(jié),在64位平臺(tái)下解引用后能向后訪問(wèn)8個(gè)字節(jié)?
首先我們得知道,32位平臺(tái)下指針的大小是4個(gè)字節(jié),64位平臺(tái)下指針的大小是8個(gè)字節(jié)。而指針指向數(shù)據(jù)的類型,決定了指針解引用后能向后訪問(wèn)的空間大小,因此我們這里需要的是一個(gè)指向指針的指針,這里使用二級(jí)指針就行了。
當(dāng)我們需要訪問(wèn)一個(gè)內(nèi)存塊的前4/8個(gè)字節(jié)時(shí),我們就可以先該內(nèi)存塊的地址先強(qiáng)轉(zhuǎn)為二級(jí)指針,由于二級(jí)指針存儲(chǔ)的是一級(jí)指針的地址,二級(jí)指針解引用能向后訪問(wèn)一個(gè)指針的大小,因此在32位平臺(tái)下訪問(wèn)的就是4個(gè)字節(jié),在64位平臺(tái)下訪問(wèn)的就是8個(gè)字節(jié),此時(shí)我們?cè)L問(wèn)到了該內(nèi)存塊的前4/8個(gè)字節(jié)。
void*& NextObj(void* ptr) { return (*(void**)ptr); }
需要注意的是,在釋放對(duì)象時(shí),我們應(yīng)該顯示調(diào)用該對(duì)象的析構(gòu)函數(shù)清理該對(duì)象,因?yàn)樵搶?duì)象可能還管理著其他某些資源,如果不對(duì)其進(jìn)行清理那么這些資源將無(wú)法被釋放,就會(huì)導(dǎo)致內(nèi)存泄漏。
//釋放對(duì)象 void Delete(T* obj) { //顯示調(diào)用T的析構(gòu)函數(shù)清理對(duì)象 obj->~T(); //將釋放的對(duì)象頭插到自由鏈表 NextObj(obj) = _freeList; _freeList = obj; }
內(nèi)存池如何為我們申請(qǐng)對(duì)象?
當(dāng)我們申請(qǐng)對(duì)象時(shí),內(nèi)存池應(yīng)該優(yōu)先把還回來(lái)的內(nèi)存塊對(duì)象再次重復(fù)利用,因此如果自由鏈表當(dāng)中有內(nèi)存塊的話,就直接從自由鏈表頭刪一個(gè)內(nèi)存塊進(jìn)行返回即可。
如果自由鏈表當(dāng)中沒(méi)有內(nèi)存塊,那么我們就在大塊內(nèi)存中切出定長(zhǎng)的內(nèi)存塊進(jìn)行返回,當(dāng)內(nèi)存塊切出后及時(shí)更新_memory
指針的指向,以及_remainBytes
的值即可。
需要特別注意的是,由于當(dāng)內(nèi)存塊釋放時(shí)我們需要將內(nèi)存塊鏈接到自由鏈表當(dāng)中,因此我們必須保證切出來(lái)的對(duì)象至少能夠存儲(chǔ)得下一個(gè)地址,所以當(dāng)對(duì)象的大小小于當(dāng)前所在平臺(tái)指針的大小時(shí),需要按指針的大小進(jìn)行內(nèi)存塊的切分。
此外,當(dāng)大塊內(nèi)存已經(jīng)不足以切分出一個(gè)對(duì)象時(shí),我們就應(yīng)該調(diào)用我們封裝的SystemAlloc函數(shù),再次向堆申請(qǐng)一塊內(nèi)存空間,此時(shí)也要注意及時(shí)更新_memory
指針的指向,以及_remainBytes
的值。
//申請(qǐng)對(duì)象 T* New() { T* obj = nullptr; //優(yōu)先把還回來(lái)的內(nèi)存塊對(duì)象,再次重復(fù)利用 if (_freeList != nullptr) { //從自由鏈表頭刪一個(gè)對(duì)象 obj = (T*)_freeList; _freeList = NextObj(_freeList); } else { //保證對(duì)象能夠存儲(chǔ)得下地址 size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); //剩余內(nèi)存不夠一個(gè)對(duì)象大小時(shí),則重新開(kāi)大塊空間 if (_remainBytes < objSize) { _remainBytes = 128 * 1024; _memory = (char*)SystemAlloc(_remainBytes >> 13); if (_memory == nullptr) { throw std::bad_alloc(); } } //從大塊內(nèi)存中切出objSize字節(jié)的內(nèi)存 obj = (T*)_memory; _memory += objSize; _remainBytes -= objSize; } //定位new,顯示調(diào)用T的構(gòu)造函數(shù)初始化 new(obj)T; return obj; }
需要注意的是,與釋放對(duì)象時(shí)需要顯示調(diào)用該對(duì)象的析構(gòu)函數(shù)一樣,當(dāng)內(nèi)存塊切分出來(lái)后,我們也應(yīng)該使用定位new,顯示調(diào)用該對(duì)象的構(gòu)造函數(shù)對(duì)其進(jìn)行初始化。
定長(zhǎng)內(nèi)存池整體代碼如下:
//定長(zhǎng)內(nèi)存池 template<class T> class ObjectPool { public: //申請(qǐng)對(duì)象 T* New() { T* obj = nullptr; //優(yōu)先把還回來(lái)的內(nèi)存塊對(duì)象,再次重復(fù)利用 if (_freeList != nullptr) { //從自由鏈表頭刪一個(gè)對(duì)象 obj = (T*)_freeList; _freeList = NextObj(_freeList); } else { //保證對(duì)象能夠存儲(chǔ)得下地址 size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); //剩余內(nèi)存不夠一個(gè)對(duì)象大小時(shí),則重新開(kāi)大塊空間 if (_remainBytes < objSize) { _remainBytes = 128 * 1024; //_memory = (char*)malloc(_remainBytes); _memory = (char*)SystemAlloc(_remainBytes >> 13); if (_memory == nullptr) { throw std::bad_alloc(); } } //從大塊內(nèi)存中切出objSize字節(jié)的內(nèi)存 obj = (T*)_memory; _memory += objSize; _remainBytes -= objSize; } //定位new,顯示調(diào)用T的構(gòu)造函數(shù)初始化 new(obj)T; return obj; } //釋放對(duì)象 void Delete(T* obj) { //顯示調(diào)用T的析構(gòu)函數(shù)清理對(duì)象 obj->~T(); //將釋放的對(duì)象頭插到自由鏈表 NextObj(obj) = _freeList; _freeList = obj; } private: char* _memory = nullptr; //指向大塊內(nèi)存的指針 size_t _remainBytes = 0; //大塊內(nèi)存在切分過(guò)程中剩余字節(jié)數(shù) void* _freeList = nullptr; //還回來(lái)過(guò)程中鏈接的自由鏈表的頭指針 };
性能對(duì)比
下面我們將實(shí)現(xiàn)的定長(zhǎng)內(nèi)存池和malloc/free進(jìn)行性能對(duì)比,測(cè)試代碼如下:
struct TreeNode { int _val; TreeNode* _left; TreeNode* _right; TreeNode() :_val(0) , _left(nullptr) , _right(nullptr) {} }; void TestObjectPool() { // 申請(qǐng)釋放的輪次 const size_t Rounds = 3; // 每輪申請(qǐng)釋放多少次 const size_t N = 1000000; std::vector<TreeNode*> v1; v1.reserve(N); //malloc和free size_t begin1 = clock(); for (size_t j = 0; j < Rounds; ++j) { for (int i = 0; i < N; ++i) { v1.push_back(new TreeNode); } for (int i = 0; i < N; ++i) { delete v1[i]; } v1.clear(); } size_t end1 = clock(); //定長(zhǎng)內(nèi)存池 ObjectPool<TreeNode> TNPool; std::vector<TreeNode*> v2; v2.reserve(N); size_t begin2 = clock(); for (size_t j = 0; j < Rounds; ++j) { for (int i = 0; i < N; ++i) { v2.push_back(TNPool.New()); } for (int i = 0; i < N; ++i) { TNPool.Delete(v2[i]); } v2.clear(); } size_t end2 = clock(); cout << "new cost time:" << end1 - begin1 << endl; cout << "object pool cost time:" << end2 - begin2 << endl; }
在代碼中,我們先用new申請(qǐng)若干個(gè)TreeNode對(duì)象,然后再用delete將這些對(duì)象再釋放,通過(guò)clock函數(shù)得到整個(gè)過(guò)程消耗的時(shí)間。(new和delete底層就是封裝的malloc和free)
然后再重復(fù)該過(guò)程,只不過(guò)將其中的new和delete替換為定長(zhǎng)內(nèi)存池當(dāng)中的New和Delete,此時(shí)再通過(guò)clock函數(shù)得到該過(guò)程消耗的時(shí)間。
可以看到在這個(gè)過(guò)程中,定長(zhǎng)內(nèi)存池消耗的時(shí)間比malloc/free消耗的時(shí)間要短。這就是因?yàn)閙alloc是一個(gè)通用的內(nèi)存池,而定長(zhǎng)內(nèi)存池是專門針對(duì)申請(qǐng)定長(zhǎng)對(duì)象而設(shè)計(jì)的,因此在這種特殊場(chǎng)景下定長(zhǎng)內(nèi)存池的效率更高,正所謂“尺有所短,寸有所長(zhǎng)”。
高并發(fā)內(nèi)存池整體框架設(shè)計(jì)
該項(xiàng)目解決的是什么問(wèn)題?
現(xiàn)代很多的開(kāi)發(fā)環(huán)境都是多核多線程,因此在申請(qǐng)內(nèi)存的時(shí),必然存在激烈的鎖競(jìng)爭(zhēng)問(wèn)題。malloc本身其實(shí)已經(jīng)很優(yōu)秀了,但是在并發(fā)場(chǎng)景下可能會(huì)因?yàn)轭l繁的加鎖和解鎖導(dǎo)致效率有所降低,而該項(xiàng)目的原型tcmalloc實(shí)現(xiàn)的就是一種在多線程高并發(fā)場(chǎng)景下更勝一籌的內(nèi)存池。
在實(shí)現(xiàn)內(nèi)存池時(shí)我們一般需要考慮到效率問(wèn)題和內(nèi)存碎片的問(wèn)題,但對(duì)于高并發(fā)內(nèi)存池來(lái)說(shuō),我們還需要考慮在多線程環(huán)境下的鎖競(jìng)爭(zhēng)問(wèn)題。
高并發(fā)內(nèi)存池整體框架設(shè)計(jì)
高并發(fā)內(nèi)存池主要由以下三個(gè)部分構(gòu)成:
- thread cache: 線程緩存是每個(gè)線程獨(dú)有的,用于小于等于256KB的內(nèi)存分配,每個(gè)線程獨(dú)享一個(gè)thread cache。
- central cache: 中心緩存是所有線程所共享的,當(dāng)thread cache需要內(nèi)存時(shí)會(huì)按需從central cache中獲取內(nèi)存,而當(dāng)thread cache中的內(nèi)存滿足一定條件時(shí),central cache也會(huì)在合適的時(shí)機(jī)對(duì)其進(jìn)行回收。
- page cache: 頁(yè)緩存中存儲(chǔ)的內(nèi)存是以頁(yè)為單位進(jìn)行存儲(chǔ)及分配的,當(dāng)central cache需要內(nèi)存時(shí),page cache會(huì)分配出一定數(shù)量的頁(yè)分配給central cache,而當(dāng)central cache中的內(nèi)存滿足一定條件時(shí),page cache也會(huì)在合適的時(shí)機(jī)對(duì)其進(jìn)行回收,并將回收的內(nèi)存盡可能的進(jìn)行合并,組成更大的連續(xù)內(nèi)存塊,緩解內(nèi)存碎片的問(wèn)題。
進(jìn)一步說(shuō)明:
每個(gè)線程都有一個(gè)屬于自己的thread cache,也就意味著線程在thread cache申請(qǐng)內(nèi)存時(shí)是不需要加鎖的,而一次性申請(qǐng)大于256KB內(nèi)存的情況是很少的,因此大部分情況下申請(qǐng)內(nèi)存時(shí)都是無(wú)鎖的,這也就是這個(gè)高并發(fā)內(nèi)存池高效的地方。
每個(gè)線程的thread cache會(huì)根據(jù)自己的情況向central cache申請(qǐng)或歸還內(nèi)存,這就避免了出現(xiàn)單個(gè)線程的thread cache占用太多內(nèi)存,而其余thread cache出現(xiàn)內(nèi)存吃緊的問(wèn)題。
多線程的thread cache可能會(huì)同時(shí)找central cache申請(qǐng)內(nèi)存,此時(shí)就會(huì)涉及線程安全的問(wèn)題,因此在訪問(wèn)central cache時(shí)是需要加鎖的,但central cache實(shí)際上是一個(gè)哈希桶的結(jié)構(gòu),只有當(dāng)多個(gè)線程同時(shí)訪問(wèn)同一個(gè)桶時(shí)才需要加鎖,所以這里的鎖競(jìng)爭(zhēng)也不會(huì)很激烈。
各個(gè)部分的主要作用
thread cache主要解決鎖競(jìng)爭(zhēng)的問(wèn)題,每個(gè)線程獨(dú)享自己的thread cache,當(dāng)自己的thread cache中有內(nèi)存時(shí)該線程不會(huì)去和其他線程進(jìn)行競(jìng)爭(zhēng),每個(gè)線程只要在自己的thread cache申請(qǐng)內(nèi)存就行了。
central cache主要起到一個(gè)居中調(diào)度的作用,每個(gè)線程的thread cache需要內(nèi)存時(shí)從central cache獲取,而當(dāng)thread cache的內(nèi)存多了就會(huì)將內(nèi)存還給central cache,其作用類似于一個(gè)中樞,因此取名為中心緩存。
page cache就負(fù)責(zé)提供以頁(yè)為單位的大塊內(nèi)存,當(dāng)central cache需要內(nèi)存時(shí)就會(huì)去向page cache申請(qǐng),而當(dāng)page cache沒(méi)有內(nèi)存了就會(huì)直接去找系統(tǒng),也就是直接去堆上按頁(yè)申請(qǐng)內(nèi)存塊。
threadcache
threadcache整體設(shè)計(jì)
定長(zhǎng)內(nèi)存池只支持固定大小內(nèi)存塊的申請(qǐng)釋放,因此定長(zhǎng)內(nèi)存池中只需要一個(gè)自由鏈表管理釋放回來(lái)的內(nèi)存塊?,F(xiàn)在我們要支持申請(qǐng)和釋放不同大小的內(nèi)存塊,那么我們就需要多個(gè)自由鏈表來(lái)管理釋放回來(lái)的內(nèi)存塊,因此thread cache實(shí)際上一個(gè)哈希桶結(jié)構(gòu),每個(gè)桶中存放的都是一個(gè)自由鏈表。
thread cache支持小于等于256KB內(nèi)存的申請(qǐng),如果我們將每種字節(jié)數(shù)的內(nèi)存塊都用一個(gè)自由鏈表進(jìn)行管理的話,那么此時(shí)我們就需要20多萬(wàn)個(gè)自由鏈表,光是存儲(chǔ)這些自由鏈表的頭指針就需要消耗大量?jī)?nèi)存,這顯然是得不償失的。
這時(shí)我們可以選擇做一些平衡的犧牲,讓這些字節(jié)數(shù)按照某種規(guī)則進(jìn)行對(duì)齊,例如我們讓這些字節(jié)數(shù)都按照8字節(jié)進(jìn)行向上對(duì)齊,那么thread cache的結(jié)構(gòu)就是下面這樣的,此時(shí)當(dāng)線程申請(qǐng)1~8字節(jié)的內(nèi)存時(shí)會(huì)直接給出8字節(jié),而當(dāng)線程申請(qǐng)9~16字節(jié)的內(nèi)存時(shí)會(huì)直接給出16字節(jié),以此類推。
因此當(dāng)線程要申請(qǐng)某一大小的內(nèi)存塊時(shí),就需要經(jīng)過(guò)某種計(jì)算得到對(duì)齊后的字節(jié)數(shù),進(jìn)而找到對(duì)應(yīng)的哈希桶,如果該哈希桶中的自由鏈表中有內(nèi)存塊,那就從自由鏈表中頭刪一個(gè)內(nèi)存塊進(jìn)行返回;如果該自由鏈表已經(jīng)為空了,那么就需要向下一層的central cache進(jìn)行獲取了。
但此時(shí)由于對(duì)齊的原因,就可能會(huì)產(chǎn)生一些碎片化的內(nèi)存無(wú)法被利用,比如線程只申請(qǐng)了6字節(jié)的內(nèi)存,而thread cache卻直接給了8字節(jié)的內(nèi)存,這多給出的2字節(jié)就無(wú)法被利用,導(dǎo)致了一定程度的空間浪費(fèi),這些因?yàn)槟承?duì)齊原因?qū)е聼o(wú)法被利用的內(nèi)存,就是內(nèi)存碎片中的內(nèi)部碎片。
鑒于當(dāng)前項(xiàng)目比較復(fù)雜,我們最好對(duì)自由鏈表這個(gè)結(jié)構(gòu)進(jìn)行封裝,目前我們就提供Push和Pop兩個(gè)成員函數(shù),對(duì)應(yīng)的操作分別是將對(duì)象插入到自由鏈表(頭插)和從自由鏈表獲取一個(gè)對(duì)象(頭刪),后面在需要時(shí)還會(huì)添加對(duì)應(yīng)的成員函數(shù)。
//管理切分好的小對(duì)象的自由鏈表 class FreeList { public: //將釋放的對(duì)象頭插到自由鏈表 void Push(void* obj) { assert(obj); //頭插 NextObj(obj) = _freeList; _freeList = obj; } //從自由鏈表頭部獲取一個(gè)對(duì)象 void* Pop() { assert(_freeList); //頭刪 void* obj = _freeList; _freeList = NextObj(_freeList); return obj; } private: void* _freeList = nullptr; //自由鏈表 };
因此thread cache實(shí)際就是一個(gè)數(shù)組,數(shù)組中存儲(chǔ)的就是一個(gè)個(gè)的自由鏈表,至于這個(gè)數(shù)組中到底存儲(chǔ)了多少個(gè)自由鏈表,就需要看我們?cè)谶M(jìn)行字節(jié)數(shù)對(duì)齊時(shí)具體用的是什么映射對(duì)齊規(guī)則了。
threadcache哈希桶映射對(duì)齊規(guī)則
如何進(jìn)行對(duì)齊?
上面已經(jīng)說(shuō)了,不是每個(gè)字節(jié)數(shù)都對(duì)應(yīng)一個(gè)自由鏈表,這樣開(kāi)銷太大了,因此我們需要制定一個(gè)合適的映射對(duì)齊規(guī)則。
首先,這些內(nèi)存塊是會(huì)被鏈接到自由鏈表上的,因此一開(kāi)始肯定是按8字節(jié)進(jìn)行對(duì)齊是最合適的,因?yàn)槲覀儽仨毐WC這些內(nèi)存塊,無(wú)論是在32位平臺(tái)下還是64位平臺(tái)下,都至少能夠存儲(chǔ)得下一個(gè)指針。
但如果所有的字節(jié)數(shù)都按照8字節(jié)進(jìn)行對(duì)齊的話,那么我們就需要建立 256 × 1024 ÷ 8 = 32768 256\times1024\div8=32768 256×1024÷8=32768個(gè)桶,這個(gè)數(shù)量還是比較多的,實(shí)際上我們可以讓不同范圍的字節(jié)數(shù)按照不同的對(duì)齊數(shù)進(jìn)行對(duì)齊,具體對(duì)齊方式如下:
字節(jié)數(shù) | 對(duì)齊數(shù) | 哈希桶下標(biāo) |
---|---|---|
[ [ [ 1 , 128 1,128 1,128 ] ] ] | 8 8 8 | [ [ [ 0 , 16 ) 0,16) 0,16) |
[ [ [ 128 + 1 , 1024 128+1,1024 128+1,1024 ] ] ] | 16 16 16 | [ [ [ 16 , 72 ) 16,72) 16,72) |
[ [ [ 1024 + 1 , 8 × 1024 1024+1,8\times1024 1024+1,8×1024 ] ] ] | 128 128 128 | [ [ [ 72 , 128 ) 72,128) 72,128) |
[ [ [ 8 × 1024 + 1 , 64 × 1024 8\times1024+1,64\times1024 8×1024+1,64×1024 ] ] ] | 1024 1024 1024 | [ [ [ 128 , 184 ) 128,184) 128,184) |
[ [ [ 64 × 1024 + 1 , 256 × 1024 64\times1024+1,256\times1024 64×1024+1,256×1024 ] ] ] | 8 × 1024 8\times1024 8×1024 | [ [ [ 184 , 208 ) 184,208) 184,208) |
空間浪費(fèi)率
雖然對(duì)齊產(chǎn)生的內(nèi)碎片會(huì)引起一定程度的空間浪費(fèi),但按照上面的對(duì)齊規(guī)則,我們可以將浪費(fèi)率控制到百分之十左右。需要說(shuō)明的是,1~128這個(gè)區(qū)間我們不做討論,因?yàn)?字節(jié)就算是對(duì)齊到2字節(jié)也有百分之五十的浪費(fèi)率,這里我們就從第二個(gè)區(qū)間開(kāi)始進(jìn)行計(jì)算。
根據(jù)上面的公式,我們要得到某個(gè)區(qū)間的最大浪費(fèi)率,就應(yīng)該讓分子取到最大,讓分母取到最小。比如129~1024這個(gè)區(qū)間,該區(qū)域的對(duì)齊數(shù)是16,那么最大浪費(fèi)的字節(jié)數(shù)就是15,而最小對(duì)齊后的字節(jié)數(shù)就是這個(gè)區(qū)間內(nèi)的前16個(gè)數(shù)所對(duì)齊到的字節(jié)數(shù),也就是144,那么該區(qū)間的最大浪費(fèi)率也就是 15 ÷ 144 ≈ 10.42 % 15\div144\approx10.42\% 15÷144≈10.42%。同樣的道理,后面兩個(gè)區(qū)間的最大浪費(fèi)率分別是 127 ÷ 1152 ≈ 11.02 % 127\div1152\approx11.02\% 127÷1152≈11.02%和 1023 ÷ 9216 ≈ 11.10 % 1023\div9216\approx11.10\% 1023÷9216≈11.10%。
對(duì)齊和映射相關(guān)函數(shù)的編寫(xiě)
此時(shí)有了字節(jié)數(shù)的對(duì)齊規(guī)則后,我們就需要提供兩個(gè)對(duì)應(yīng)的函數(shù),分別用于獲取某一字節(jié)數(shù)對(duì)齊后的字節(jié)數(shù),以及該字節(jié)數(shù)對(duì)應(yīng)的哈希桶下標(biāo)。關(guān)于處理對(duì)齊和映射的函數(shù),我們可以將其封裝到一個(gè)類當(dāng)中。
//管理對(duì)齊和映射等關(guān)系 class SizeClass { public: //獲取向上對(duì)齊后的字節(jié)數(shù) static inline size_t RoundUp(size_t bytes); //獲取對(duì)應(yīng)哈希桶的下標(biāo) static inline size_t Index(size_t bytes); };
需要注意的是,SizeClass類當(dāng)中的成員函數(shù)最好設(shè)置為靜態(tài)成員函數(shù),否則我們?cè)谡{(diào)用這些函數(shù)時(shí)就需要通過(guò)對(duì)象去調(diào)用,并且對(duì)于這些可能會(huì)頻繁調(diào)用的函數(shù),可以考慮將其設(shè)置為內(nèi)聯(lián)函數(shù)。
在獲取某一字節(jié)數(shù)向上對(duì)齊后的字節(jié)數(shù)時(shí),可以先判斷該字節(jié)數(shù)屬于哪一個(gè)區(qū)間,然后再通過(guò)調(diào)用一個(gè)子函數(shù)進(jìn)行進(jìn)一步處理。
//獲取向上對(duì)齊后的字節(jié)數(shù) static inline size_t RoundUp(size_t bytes) { if (bytes <= 128) { return _RoundUp(bytes, 8); } else if (bytes <= 1024) { return _RoundUp(bytes, 16); } else if (bytes <= 8 * 1024) { return _RoundUp(bytes, 128); } else if (bytes <= 64 * 1024) { return _RoundUp(bytes, 1024); } else if (bytes <= 256 * 1024) { return _RoundUp(bytes, 8 * 1024); } else { assert(false); return -1; } }
此時(shí)我們就需要編寫(xiě)一個(gè)子函數(shù),該子函數(shù)需要通過(guò)對(duì)齊數(shù)計(jì)算出某一字節(jié)數(shù)對(duì)齊后的字節(jié)數(shù),最容易想到的就是下面這種寫(xiě)法。
//一般寫(xiě)法 static inline size_t _RoundUp(size_t bytes, size_t alignNum) { size_t alignSize = 0; if (bytes%alignNum != 0) { alignSize = (bytes / alignNum + 1)*alignNum; } else { alignSize = bytes; } return alignSize; }
除了上述寫(xiě)法,我們還可以通過(guò)位運(yùn)算的方式來(lái)進(jìn)行計(jì)算,雖然位運(yùn)算可能并沒(méi)有上面的寫(xiě)法容易理解,但計(jì)算機(jī)執(zhí)行位運(yùn)算的速度是比執(zhí)行乘法和除法更快的。
//位運(yùn)算寫(xiě)法 static inline size_t _RoundUp(size_t bytes, size_t alignNum) { return ((bytes + alignNum - 1)&~(alignNum - 1)); }
對(duì)于上述位運(yùn)算,我們以10字節(jié)按8字節(jié)對(duì)齊為例進(jìn)行分析。 8 − 1 = 7 8-1=7 8−1=7,7就是一個(gè)低三位為1其余位為0的二進(jìn)制序列,我們將10與7相加,相當(dāng)于將10字節(jié)當(dāng)中不夠8字節(jié)的剩余字節(jié)數(shù)補(bǔ)上了。
然后我們?cè)賹⒃撝蹬c7按位取反后的值進(jìn)行與運(yùn)算,而7按位取反后是一個(gè)低三位為0其余位為1的二進(jìn)制序列,該操作進(jìn)行后相當(dāng)于屏蔽了該值的低三位而該值的其余位保持不變,此時(shí)得到的值就是10字節(jié)按8字節(jié)對(duì)齊后的值,即16字節(jié)。
在獲取某一字節(jié)數(shù)對(duì)應(yīng)的哈希桶下標(biāo)時(shí),也是先判斷該字節(jié)數(shù)屬于哪一個(gè)區(qū)間,然后再通過(guò)調(diào)用一個(gè)子函數(shù)進(jìn)行進(jìn)一步處理。
//獲取對(duì)應(yīng)哈希桶的下標(biāo) static inline size_t Index(size_t bytes) { //每個(gè)區(qū)間有多少個(gè)自由鏈表 static size_t groupArray[4] = { 16, 56, 56, 56 }; if (bytes <= 128) { return _Index(bytes, 3); } else if (bytes <= 1024) { return _Index(bytes - 128, 4) + groupArray[0]; } else if (bytes <= 8 * 1024) { return _Index(bytes - 1024, 7) + groupArray[0] + groupArray[1]; } else if (bytes <= 64 * 1024) { return _Index(bytes - 8 * 1024, 10) + groupArray[0] + groupArray[1] + groupArray[2]; } else if (bytes <= 256 * 1024) { return _Index(bytes - 64 * 1024, 13) + groupArray[0] + groupArray[1] + groupArray[2] + groupArray[3]; } else { assert(false); return -1; } }
此時(shí)我們需要編寫(xiě)一個(gè)子函數(shù)來(lái)繼續(xù)進(jìn)行處理,容易想到的就是根據(jù)對(duì)齊數(shù)來(lái)計(jì)算某一字節(jié)數(shù)對(duì)應(yīng)的下標(biāo)。
//一般寫(xiě)法 static inline size_t _Index(size_t bytes, size_t alignNum) { size_t index = 0; if (bytes%alignNum != 0) { index = bytes / alignNum; } else { index = bytes / alignNum - 1; } return index; }
當(dāng)然,為了提高效率下面也提供了一個(gè)用位運(yùn)算來(lái)解決的方法,需要注意的是,此時(shí)我們并不是傳入該字節(jié)數(shù)的對(duì)齊數(shù),而是將對(duì)齊數(shù)寫(xiě)成2的n次方的形式后,將這個(gè)n值進(jìn)行傳入。比如對(duì)齊數(shù)是8,傳入的就是3。
//位運(yùn)算寫(xiě)法 static inline size_t _Index(size_t bytes, size_t alignShift) { return ((bytes + (1 << alignShift) - 1) >> alignShift) - 1; }
這里我們還是以10字節(jié)按8字節(jié)對(duì)齊為例進(jìn)行分析。此時(shí)傳入的alignShift就是3,將1左移3位后得到的實(shí)際上就是對(duì)齊數(shù)8, 8 − 1 = 7 8-1=7 8−1=7,相當(dāng)于我們還是讓10與7相加。
之后我們?cè)賹⒃撝迪蛴乙?位,實(shí)際上就是讓這個(gè)值除以8,此時(shí)我們也是相當(dāng)于屏蔽了該值二進(jìn)制的低三位,因?yàn)槌?得到的值與其二進(jìn)制的低三位無(wú)關(guān),所以我們可以說(shuō)是將10對(duì)齊后的字節(jié)數(shù)除以了8,此時(shí)得到了2,而最后還需要減一是因?yàn)閿?shù)組的下標(biāo)是從0開(kāi)始的。
ThreadCache類
按照上述的對(duì)齊規(guī)則,thread cache中桶的個(gè)數(shù),也就是自由鏈表的個(gè)數(shù)是208,以及thread cache允許申請(qǐng)的最大內(nèi)存大小256KB,我們可以將這些數(shù)據(jù)按照如下方式進(jìn)行定義。
//小于等于MAX_BYTES,就找thread cache申請(qǐng) //大于MAX_BYTES,就直接找page cache或者系統(tǒng)堆申請(qǐng) static const size_t MAX_BYTES = 256 * 1024; //thread cache和central cache自由鏈表哈希桶的表大小 static const size_t NFREELISTS = 208;
現(xiàn)在就可以對(duì)ThreadCache類進(jìn)行定義了,thread cache就是一個(gè)存儲(chǔ)208個(gè)自由鏈表的數(shù)組,目前thread cache就先提供一個(gè)Allocate函數(shù)用于申請(qǐng)對(duì)象就行了,后面需要時(shí)再進(jìn)行增加。
class ThreadCache { public: //申請(qǐng)內(nèi)存對(duì)象 void* Allocate(size_t size); private: FreeList _freeLists[NFREELISTS]; //哈希桶 };
在thread cache申請(qǐng)對(duì)象時(shí),通過(guò)所給字節(jié)數(shù)計(jì)算出對(duì)應(yīng)的哈希桶下標(biāo),如果桶中自由鏈表不為空,則從該自由鏈表中取出一個(gè)對(duì)象進(jìn)行返回即可;但如果此時(shí)自由鏈表為空,那么我們就需要從central cache進(jìn)行獲取了,這里的FetchFromCentralCache函數(shù)也是thread cache類中的一個(gè)成員函數(shù),在后面再進(jìn)行具體實(shí)現(xiàn)。
//申請(qǐng)內(nèi)存對(duì)象 void* ThreadCache::Allocate(size_t size) { assert(size <= MAX_BYTES); size_t alignSize = SizeClass::RoundUp(size); size_t index = SizeClass::Index(size); if (!_freeLists[index].Empty()) { return _freeLists[index].Pop(); } else { return FetchFromCentralCache(index, alignSize); } }
threadcacheTLS無(wú)鎖訪問(wèn)
每個(gè)線程都有一個(gè)自己獨(dú)享的thread cache,那應(yīng)該如何創(chuàng)建這個(gè)thread cache呢?我們不能將這個(gè)thread cache創(chuàng)建為全局的,因?yàn)槿肿兞渴撬芯€程共享的,這樣就不可避免的需要鎖來(lái)控制,增加了控制成本和代碼復(fù)雜度。
要實(shí)現(xiàn)每個(gè)線程無(wú)鎖的訪問(wèn)屬于自己的thread cache,我們需要用到線程局部存儲(chǔ)TLS(Thread Local Storage),這是一種變量的存儲(chǔ)方法,使用該存儲(chǔ)方法的變量在它所在的線程是全局可訪問(wèn)的,但是不能被其他線程訪問(wèn)到,這樣就保持了數(shù)據(jù)的線程獨(dú)立性。
//TLS - Thread Local Storage static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
但不是每個(gè)線程被創(chuàng)建時(shí)就立馬有了屬于自己的thread cache,而是當(dāng)該線程調(diào)用相關(guān)申請(qǐng)內(nèi)存的接口時(shí)才會(huì)創(chuàng)建自己的thread cache,因此在申請(qǐng)內(nèi)存的函數(shù)中會(huì)包含以下邏輯。
//通過(guò)TLS,每個(gè)線程無(wú)鎖的獲取自己專屬的ThreadCache對(duì)象 if (pTLSThreadCache == nullptr) { pTLSThreadCache = new ThreadCache; }
centralcache
centralcache整體設(shè)計(jì)
當(dāng)線程申請(qǐng)某一大小的內(nèi)存時(shí),如果thread cache中對(duì)應(yīng)的自由鏈表不為空,那么直接取出一個(gè)內(nèi)存塊進(jìn)行返回即可,但如果此時(shí)該自由鏈表為空,那么這時(shí)thread cache就需要向central cache申請(qǐng)內(nèi)存了。
central cache的結(jié)構(gòu)與thread cache是一樣的,它們都是哈希桶的結(jié)構(gòu),并且它們遵循的對(duì)齊映射規(guī)則都是一樣的。這樣做的好處就是,當(dāng)thread cache的某個(gè)桶中沒(méi)有內(nèi)存了,就可以直接到central cache中對(duì)應(yīng)的哈希桶里去取內(nèi)存就行了。
central cache與thread cache的不同之處
central cache與thread cache有兩個(gè)明顯不同的地方,首先,thread cache是每個(gè)線程獨(dú)享的,而central cache是所有線程共享的,因?yàn)槊總€(gè)線程的thread cache沒(méi)有內(nèi)存了都會(huì)去找central cache,因此在訪問(wèn)central cache時(shí)是需要加鎖的。
但central cache在加鎖時(shí)并不是將整個(gè)central cache全部鎖上了,central cache在加鎖時(shí)用的是桶鎖,也就是說(shuō)每個(gè)桶都有一個(gè)鎖。此時(shí)只有當(dāng)多個(gè)線程同時(shí)訪問(wèn)central cache的同一個(gè)桶時(shí)才會(huì)存在鎖競(jìng)爭(zhēng),如果是多個(gè)線程同時(shí)訪問(wèn)central cache的不同桶就不會(huì)存在鎖競(jìng)爭(zhēng)。
central cache與thread cache的第二個(gè)不同之處就是,thread cache的每個(gè)桶中掛的是一個(gè)個(gè)切好的內(nèi)存塊,而central cache的每個(gè)桶中掛的是一個(gè)個(gè)的span。
每個(gè)span管理的都是一個(gè)以頁(yè)為單位的大塊內(nèi)存,每個(gè)桶里面的若干span是按照雙鏈表的形式鏈接起來(lái)的,并且每個(gè)span里面還有一個(gè)自由鏈表,這個(gè)自由鏈表里面掛的就是一個(gè)個(gè)切好了的內(nèi)存塊,根據(jù)其所在的哈希桶這些內(nèi)存塊被切成了對(duì)應(yīng)的大小。
centralcache結(jié)構(gòu)設(shè)計(jì)
頁(yè)號(hào)的類型?
每個(gè)程序運(yùn)行起來(lái)后都有自己的進(jìn)程地址空間,在32位平臺(tái)下,進(jìn)程地址空間的大小是232;而在64位平臺(tái)下,進(jìn)程地址空間的大小就是264。
頁(yè)的大小一般是4K或者8K,我們以8K為例。在32位平臺(tái)下,進(jìn)程地址空間就可以被分成 2 32 ÷ 2 13 = 2 19 2^{32}\div2^{13}=2^{19} 232÷213=219個(gè)頁(yè);在64位平臺(tái)下,進(jìn)程地址空間就可以被分成 2 64 ÷ 2 13 = 2 51 2^{64}\div2^{13}=2^{51} 264÷213=251個(gè)頁(yè)。頁(yè)號(hào)本質(zhì)與地址是一樣的,它們都是一個(gè)編號(hào),只不過(guò)地址是以一個(gè)字節(jié)為一個(gè)單位,而頁(yè)是以多個(gè)字節(jié)為一個(gè)單位。
由于頁(yè)號(hào)在64位平臺(tái)下的取值范圍是 [ [ [ 0 , 2 51 ) 0,2^{51}) 0,251),因此我們不能簡(jiǎn)單的用一個(gè)無(wú)符號(hào)整型來(lái)存儲(chǔ)頁(yè)號(hào),這時(shí)我們需要借助條件編譯來(lái)解決這個(gè)問(wèn)題。
#ifdef _WIN64 typedef unsigned long long PAGE_ID; #elif _WIN32 typedef size_t PAGE_ID; #else //linux #endif
需要注意的是,在32位下,_WIN32有定義,_WIN64沒(méi)有定義;而在64位下,_WIN32和_WIN64都有定義。因此在條件編譯時(shí),我們應(yīng)該先判斷_WIN64是否有定義,再判斷_WIN32是否有定義。
span的結(jié)構(gòu)
central cache的每個(gè)桶里掛的是一個(gè)個(gè)的span,span是一個(gè)管理以頁(yè)為單位的大塊內(nèi)存,span的結(jié)構(gòu)如下:
//管理以頁(yè)為單位的大塊內(nèi)存 struct Span { PAGE_ID _pageId = 0; //大塊內(nèi)存起始頁(yè)的頁(yè)號(hào) size_t _n = 0; //頁(yè)的數(shù)量 Span* _next = nullptr; //雙鏈表結(jié)構(gòu) Span* _prev = nullptr; size_t _useCount = 0; //切好的小塊內(nèi)存,被分配給thread cache的計(jì)數(shù) void* _freeList = nullptr; //切好的小塊內(nèi)存的自由鏈表 };
對(duì)于span管理的以頁(yè)為單位的大塊內(nèi)存,我們需要知道這塊內(nèi)存具體在哪一個(gè)位置,便于之后page cache進(jìn)行前后頁(yè)的合并,因此span結(jié)構(gòu)當(dāng)中會(huì)記錄所管理大塊內(nèi)存起始頁(yè)的頁(yè)號(hào)。
至于每一個(gè)span管理的到底是多少個(gè)頁(yè),這并不是固定的,需要根據(jù)多方面的因素來(lái)控制,因此span結(jié)構(gòu)當(dāng)中有一個(gè)_n成員,該成員就代表著該span管理的頁(yè)的數(shù)量。
此外,每個(gè)span管理的大塊內(nèi)存,都會(huì)被切成相應(yīng)大小的內(nèi)存塊掛到當(dāng)前span的自由鏈表中,比如8Byte哈希桶中的span,會(huì)被切成一個(gè)個(gè)8Byte大小的內(nèi)存塊掛到當(dāng)前span的自由鏈表中,因此span結(jié)構(gòu)中需要存儲(chǔ)切好的小塊內(nèi)存的自由鏈表。
span結(jié)構(gòu)當(dāng)中的_useCount成員記錄的就是,當(dāng)前span中切好的小塊內(nèi)存,被分配給thread cache的計(jì)數(shù),當(dāng)某個(gè)span的_useCount計(jì)數(shù)變?yōu)?時(shí),代表當(dāng)前span切出去的內(nèi)存塊對(duì)象全部還回來(lái)了,此時(shí)central cache就可以將這個(gè)span再還給page cache。
每個(gè)桶當(dāng)中的span是以雙鏈表的形式組織起來(lái)的,當(dāng)我們需要將某個(gè)span歸還給page cache時(shí),就可以很方便的將該span從雙鏈表結(jié)構(gòu)中移出。如果用單鏈表結(jié)構(gòu)的話就比較麻煩了,因?yàn)閱捂湵碓趧h除時(shí),需要知道當(dāng)前結(jié)點(diǎn)的前一個(gè)結(jié)點(diǎn)。
雙鏈表結(jié)構(gòu)
根據(jù)上面的描述,central cache的每個(gè)哈希桶里面存儲(chǔ)的都是一個(gè)雙鏈表結(jié)構(gòu),對(duì)于該雙鏈表結(jié)構(gòu)我們可以對(duì)其進(jìn)行封裝。
//帶頭雙向循環(huán)鏈表 class SpanList { public: SpanList() { _head = new Span; _head->_next = _head; _head->_prev = _head; } void Insert(Span* pos, Span* newSpan) { assert(pos); assert(newSpan); Span* prev = pos->_prev; prev->_next = newSpan; newSpan->_prev = prev; newSpan->_next = pos; pos->_prev = newSpan; } void Erase(Span* pos) { assert(pos); assert(pos != _head); //不能刪除哨兵位的頭結(jié)點(diǎn) Span* prev = pos->_prev; Span* next = pos->_next; prev->_next = next; next->_prev = prev; } private: Span* _head; public: std::mutex _mtx; //桶鎖 };
需要注意的是,從雙鏈表刪除的span會(huì)還給下一層的page cache,相當(dāng)于只是把這個(gè)span從雙鏈表中移除,因此不需要對(duì)刪除的span進(jìn)行delete操作。
central cache的結(jié)構(gòu)
central cache的映射規(guī)則和thread cache是一樣的,因此central cache里面哈希桶的個(gè)數(shù)也是208,但central cache每個(gè)哈希桶中存儲(chǔ)就是我們上面定義的雙鏈表結(jié)構(gòu)。
class CentralCache { public: //... private: SpanList _spanLists[NFREELISTS]; };
central cache和thread cache的映射規(guī)則一樣,有一個(gè)好處就是,當(dāng)thread cache的某個(gè)桶沒(méi)有內(nèi)存了,就可以直接去central cache對(duì)應(yīng)的哈希桶進(jìn)行申請(qǐng)就行了。
centralcache核心實(shí)現(xiàn)
central cache的實(shí)現(xiàn)方式
每個(gè)線程都有一個(gè)屬于自己的thread cache,我們是用TLS來(lái)實(shí)現(xiàn)每個(gè)線程無(wú)鎖的訪問(wèn)屬于自己的thread cache的。而central cache和page cache在整個(gè)進(jìn)程中只有一個(gè),對(duì)于這種只能創(chuàng)建一個(gè)對(duì)象的類,我們可以將其設(shè)置為單例模式。
單例模式可以保證系統(tǒng)中該類只有一個(gè)實(shí)例,并提供一個(gè)訪問(wèn)它的全局訪問(wèn)點(diǎn),該實(shí)例被所有程序模塊共享。單例模式又分為餓漢模式和懶漢模式,懶漢模式相對(duì)較復(fù)雜,我們這里使用餓漢模式就足夠了。
//單例模式 class CentralCache { public: //提供一個(gè)全局訪問(wèn)點(diǎn) static CentralCache* GetInstance() { return &_sInst; } private: SpanList _spanLists[NFREELISTS]; private: CentralCache() //構(gòu)造函數(shù)私有 {} CentralCache(const CentralCache&) = delete; //防拷貝 static CentralCache _sInst; };
為了保證CentralCache類只能創(chuàng)建一個(gè)對(duì)象,我們需要將central cache的構(gòu)造函數(shù)和拷貝構(gòu)造函數(shù)設(shè)置為私有,或者在C++11中也可以在函數(shù)聲明的后面加上=delete進(jìn)行修飾。
CentralCache類當(dāng)中還需要有一個(gè)CentralCache類型的靜態(tài)的成員變量,當(dāng)程序運(yùn)行起來(lái)后我們就立馬創(chuàng)建該對(duì)象,在此后的程序中就只有這一個(gè)單例了。
CentralCache CentralCache::_sInst;
最后central cache還需要提供一個(gè)公有的成員函數(shù),用于獲取該對(duì)象,此時(shí)在整個(gè)進(jìn)程中就只會(huì)有一個(gè)central cache對(duì)象了。
慢開(kāi)始反饋調(diào)節(jié)算法
當(dāng)thread cache向central cache申請(qǐng)內(nèi)存時(shí),central cache應(yīng)該給出多少個(gè)對(duì)象呢?這是一個(gè)值得思考的問(wèn)題,如果central cache給的太少,那么thread cache在短時(shí)間內(nèi)用完了又會(huì)來(lái)申請(qǐng);但如果一次性給的太多了,可能thread cache用不完也就浪費(fèi)了。
鑒于此,我們這里采用了一個(gè)慢開(kāi)始反饋調(diào)節(jié)算法。當(dāng)thread cache向central cache申請(qǐng)內(nèi)存時(shí),如果申請(qǐng)的是較小的對(duì)象,那么可以多給一點(diǎn),但如果申請(qǐng)的是較大的對(duì)象,就可以少給一點(diǎn)。
通過(guò)下面這個(gè)函數(shù),我們就可以根據(jù)所需申請(qǐng)的對(duì)象的大小計(jì)算出具體給出的對(duì)象個(gè)數(shù),并且可以將給出的對(duì)象個(gè)數(shù)控制到2~512個(gè)之間。也就是說(shuō),就算thread cache要申請(qǐng)的對(duì)象再小,我最多一次性給出512個(gè)對(duì)象;就算thread cache要申請(qǐng)的對(duì)象再大,我至少一次性給出2個(gè)對(duì)象。
//管理對(duì)齊和映射等關(guān)系 class SizeClass { public: //thread cache一次從central cache獲取對(duì)象的上限 static size_t NumMoveSize(size_t size) { assert(size > 0); //對(duì)象越小,計(jì)算出的上限越高 //對(duì)象越大,計(jì)算出的上限越低 int num = MAX_BYTES / size; if (num < 2) num = 2; if (num > 512) num = 512; return num; } };
但就算申請(qǐng)的是小對(duì)象,一次性給出512個(gè)也是比較多的,基于這個(gè)原因,我們可以在FreeList結(jié)構(gòu)中增加一個(gè)叫做_maxSize的成員變量,該變量的初始值設(shè)置為1,并且提供一個(gè)公有成員函數(shù)用于獲取這個(gè)變量。也就是說(shuō),現(xiàn)在thread cache中的每個(gè)自由鏈表都會(huì)有一個(gè)自己的_maxSize。
//管理切分好的小對(duì)象的自由鏈表 class FreeList { public: size_t& MaxSize() { return _maxSize; } private: void* _freeList = nullptr; //自由鏈表 size_t _maxSize = 1; };
此時(shí)當(dāng)thread cache申請(qǐng)對(duì)象時(shí),我們會(huì)比較_maxSize和計(jì)算得出的值,取出其中的較小值作為本次申請(qǐng)對(duì)象的個(gè)數(shù)。此外,如果本次采用的是_maxSize的值,那么還會(huì)將thread cache中該自由鏈表的_maxSize的值進(jìn)行加一。
因此,thread cache第一次向central cache申請(qǐng)某大小的對(duì)象時(shí),申請(qǐng)到的都是一個(gè),但下一次thread cache再向central cache申請(qǐng)同樣大小的對(duì)象時(shí),因?yàn)樵撟杂涉湵碇械腳maxSize增加了,最終就會(huì)申請(qǐng)到兩個(gè)。直到該自由鏈表中_maxSize的值,增長(zhǎng)到超過(guò)計(jì)算出的值后就不會(huì)繼續(xù)增長(zhǎng)了,此后申請(qǐng)到的對(duì)象個(gè)數(shù)就是計(jì)算出的個(gè)數(shù)。(這有點(diǎn)像網(wǎng)絡(luò)中擁塞控制的機(jī)制)
從中心緩存獲取對(duì)象
每次thread cache向central cache申請(qǐng)對(duì)象時(shí),我們先通過(guò)慢開(kāi)始反饋調(diào)節(jié)算法計(jì)算出本次應(yīng)該申請(qǐng)的對(duì)象的個(gè)數(shù),然后再向central cache進(jìn)行申請(qǐng)。
如果thread cache最終申請(qǐng)到對(duì)象的個(gè)數(shù)就是一個(gè),那么直接將該對(duì)象返回即可。為什么需要返回一個(gè)申請(qǐng)到的對(duì)象呢?因?yàn)閠hread cache要向central cache申請(qǐng)對(duì)象,其實(shí)由于某個(gè)線程向thread cache申請(qǐng)對(duì)象但thread cache當(dāng)中沒(méi)有,這才導(dǎo)致thread cache要向central cache申請(qǐng)對(duì)象。因此central cache將對(duì)象返回給thread cache后,thread cache會(huì)再將該對(duì)象返回給申請(qǐng)對(duì)象的線程。
但如果thread cache最終申請(qǐng)到的是多個(gè)對(duì)象,那么除了將第一個(gè)對(duì)象返回之外,還需要將剩下的對(duì)象掛到thread cache對(duì)應(yīng)的哈希桶當(dāng)中。
//從中心緩存獲取對(duì)象 void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) { //慢開(kāi)始反饋調(diào)節(jié)算法 //1、最開(kāi)始不會(huì)一次向central cache一次批量要太多,因?yàn)橐嗔丝赡苡貌煌? //2、如果你不斷有size大小的內(nèi)存需求,那么batchNum就會(huì)不斷增長(zhǎng),直到上限 size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size)); if (batchNum == _freeLists[index].MaxSize()) { _freeLists[index].MaxSize() += 1; } void* start = nullptr; void* end = nullptr; size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size); assert(actualNum >= 1); //至少有一個(gè) if (actualNum == 1) //申請(qǐng)到對(duì)象的個(gè)數(shù)是一個(gè),則直接將這一個(gè)對(duì)象返回即可 { assert(start == end); return start; } else //申請(qǐng)到對(duì)象的個(gè)數(shù)是多個(gè),還需要將剩下的對(duì)象掛到thread cache中對(duì)應(yīng)的哈希桶中 { _freeLists[index].PushRange(NextObj(start), end); return start; } }
從中心緩存獲取一定數(shù)量的對(duì)象
這里我們要從central cache獲取n個(gè)指定大小的對(duì)象,這些對(duì)象肯定都是從central cache對(duì)應(yīng)哈希桶的某個(gè)span中取出來(lái)的,因此取出來(lái)的這n個(gè)對(duì)象是鏈接在一起的,我們只需要得到這段鏈表的頭和尾即可,這里可以采用輸出型參數(shù)進(jìn)行獲取。
//從central cache獲取一定數(shù)量的對(duì)象給thread cache size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t n, size_t size) { size_t index = SizeClass::Index(size); _spanLists[index]._mtx.lock(); //加鎖 //在對(duì)應(yīng)哈希桶中獲取一個(gè)非空的span Span* span = GetOneSpan(_spanLists[index], size); assert(span); //span不為空 assert(span->_freeList); //span當(dāng)中的自由鏈表也不為空 //從span中獲取n個(gè)對(duì)象 //如果不夠n個(gè),有多少拿多少 start = span->_freeList; end = span->_freeList; size_t actualNum = 1; while (NextObj(end)&&n - 1) { end = NextObj(end); actualNum++; n--; } span->_freeList = NextObj(end); //取完后剩下的對(duì)象繼續(xù)放到自由鏈表 NextObj(end) = nullptr; //取出的一段鏈表的表尾置空 span->_useCount += actualNum; //更新被分配給thread cache的計(jì)數(shù) _spanLists[index]._mtx.unlock(); //解鎖 return actualNum; }
由于central cache是所有線程共享的,所以我們?cè)谠L問(wèn)central cache中的哈希桶時(shí),需要先給對(duì)應(yīng)的哈希桶加上桶鎖,在獲取到對(duì)象后再將桶鎖解掉。
在向central cache獲取對(duì)象時(shí),先是在central cache對(duì)應(yīng)的哈希桶中獲取到一個(gè)非空的span,然后從這個(gè)span的自由鏈表中取出n個(gè)對(duì)象即可,但可能這個(gè)非空的span的自由鏈表當(dāng)中對(duì)象的個(gè)數(shù)不足n個(gè),這時(shí)該自由鏈表當(dāng)中有多少個(gè)對(duì)象就給多少就行了。
也就是說(shuō),thread cache實(shí)際從central cache獲得的對(duì)象的個(gè)數(shù)可能與我們傳入的n值是不一樣的,因此我們需要統(tǒng)計(jì)本次申請(qǐng)過(guò)程中,實(shí)際thread cache獲取到的對(duì)象個(gè)數(shù),然后根據(jù)該值及時(shí)更新這個(gè)span中的小對(duì)象被分配給thread cache的計(jì)數(shù)。
需要注意的是,雖然我們實(shí)際申請(qǐng)到對(duì)象的個(gè)數(shù)可能比n要小,但這并不會(huì)產(chǎn)生任何影響。因?yàn)閠hread cache的本意就是向central cache申請(qǐng)一個(gè)對(duì)象,我們之所以要一次多申請(qǐng)一些對(duì)象,是因?yàn)檫@樣一來(lái)下次線程再申請(qǐng)相同大小的對(duì)象時(shí)就可以直接在thread cache里面獲取了,而不用再向central cache申請(qǐng)對(duì)象。
插入一段范圍的對(duì)象到自由鏈表
此外,如果thread cache最終從central cache獲取到的對(duì)象個(gè)數(shù)是大于一的,那么我們還需要將剩下的對(duì)象插入到thread cache中對(duì)應(yīng)的哈希桶中,為了能讓自由鏈表支持插入一段范圍的對(duì)象,我們還需要在FreeList類中增加一個(gè)對(duì)應(yīng)的成員函數(shù)。
//管理切分好的小對(duì)象的自由鏈表 class FreeList { public: //插入一段范圍的對(duì)象到自由鏈表 void PushRange(void* start, void* end) { assert(start); assert(end); //頭插 NextObj(end) = _freeList; _freeList = start; } private: void* _freeList = nullptr; //自由鏈表 size_t _maxSize = 1; };
pagecache
pagecache整體設(shè)計(jì)
page cache與central cache結(jié)構(gòu)的相同之處
page cache與central cache一樣,它們都是哈希桶的結(jié)構(gòu),并且page cache的每個(gè)哈希桶中里掛的也是一個(gè)個(gè)的span,這些span也是按照雙鏈表的結(jié)構(gòu)鏈接起來(lái)的。
page cache與central cache結(jié)構(gòu)的不同之處
首先,central cache的映射規(guī)則與thread cache保持一致,而page cache的映射規(guī)則與它們都不相同。page cache的哈希桶映射規(guī)則采用的是直接定址法,比如1號(hào)桶掛的都是1頁(yè)的span,2號(hào)桶掛的都是2頁(yè)的span,以此類推。
其次,central cache每個(gè)桶中的span被切成了一個(gè)個(gè)對(duì)應(yīng)大小的對(duì)象,以供thread cache申請(qǐng)。而page cache當(dāng)中的span是沒(méi)有被進(jìn)一步切小的,因?yàn)閜age cache服務(wù)的是central cache,當(dāng)central cache沒(méi)有span時(shí),向page cache申請(qǐng)的是某一固定頁(yè)數(shù)的span,而如何切分申請(qǐng)到的這個(gè)span就應(yīng)該由central cache自己來(lái)決定。
至于page cache當(dāng)中究竟有多少個(gè)桶,這就要看你最大想掛幾頁(yè)的span了,這里我們就最大掛128頁(yè)的span,為了讓桶號(hào)與頁(yè)號(hào)對(duì)應(yīng)起來(lái),我們可以將第0號(hào)桶空出來(lái)不用,因此我們需要將哈希桶的個(gè)數(shù)設(shè)置為129。
//page cache中哈希桶的個(gè)數(shù) static const size_t NPAGES = 129;
為什么這里最大掛128頁(yè)的span呢?因?yàn)榫€程申請(qǐng)單個(gè)對(duì)象最大是256KB,而128頁(yè)可以被切成4個(gè)256KB的對(duì)象,因此是足夠的。當(dāng)然,如果你想在page cache中掛更大的span也是可以的,根據(jù)具體的需求進(jìn)行設(shè)置就行了。
在page cache獲取一個(gè)n頁(yè)的span的過(guò)程
如果central cache要獲取一個(gè)n頁(yè)的span,那我們就可以在page cache的第n號(hào)桶中取出一個(gè)span返回給central cache即可,但如果第n號(hào)桶中沒(méi)有span了,這時(shí)我們并不是直接轉(zhuǎn)而向堆申請(qǐng)一個(gè)n頁(yè)的span,而是要繼續(xù)在后面的桶當(dāng)中尋找span。
直接向堆申請(qǐng)以頁(yè)為單位的內(nèi)存時(shí),我們應(yīng)該盡量申請(qǐng)大塊一點(diǎn)的內(nèi)存塊,因?yàn)榇藭r(shí)申請(qǐng)到的內(nèi)存是連續(xù)的,當(dāng)線程需要內(nèi)存時(shí)我們可以將其切小后分配給線程,而當(dāng)線程將內(nèi)存釋放后我們又可以將其合并成大塊的連續(xù)內(nèi)存。如果我們向堆申請(qǐng)內(nèi)存時(shí)是小塊小塊的申請(qǐng)的,那么我們申請(qǐng)到的內(nèi)存就不一定是連續(xù)的了。
因此,當(dāng)?shù)趎號(hào)桶中沒(méi)有span時(shí),我們可以繼續(xù)找第n+1號(hào)桶,因?yàn)槲覀兛梢詫+1頁(yè)的span切分成一個(gè)n頁(yè)的span和一個(gè)1頁(yè)的span,這時(shí)我們就可以將n頁(yè)的span返回,而將切分后1頁(yè)的span掛到1號(hào)桶中。但如果后面的桶當(dāng)中都沒(méi)有span,這時(shí)我們就只能向堆申請(qǐng)一個(gè)128頁(yè)的內(nèi)存塊,并將其用一個(gè)span結(jié)構(gòu)管理起來(lái),然后將128頁(yè)的span切分成n頁(yè)的span和128-n頁(yè)的span,其中n頁(yè)的span返回給central cache,而128-n頁(yè)的span就掛到第128-n號(hào)桶中。
也就是說(shuō),我們每次向堆申請(qǐng)的都是128頁(yè)大小的內(nèi)存塊,central cache要的這些span實(shí)際都是由128頁(yè)的span切分出來(lái)的。
page cache的實(shí)現(xiàn)方式
當(dāng)每個(gè)線程的thread cache沒(méi)有內(nèi)存時(shí)都會(huì)向central cache申請(qǐng),此時(shí)多個(gè)線程的thread cache如果訪問(wèn)的不是central cache的同一個(gè)桶,那么這些線程是可以同時(shí)進(jìn)行訪問(wèn)的。這時(shí)central cache的多個(gè)桶就可能同時(shí)向page cache申請(qǐng)內(nèi)存的,所以page cache也是存在線程安全問(wèn)題的,因此在訪問(wèn)page cache時(shí)也必須要加鎖。
但是在page cache這里我們不能使用桶鎖,因?yàn)楫?dāng)central cache向page cache申請(qǐng)內(nèi)存時(shí),page cache可能會(huì)將其他桶當(dāng)中大頁(yè)的span切小后再給central cache。此外,當(dāng)central cache將某個(gè)span歸還給page cache時(shí),page cache也會(huì)嘗試將該span與其他桶當(dāng)中的span進(jìn)行合并。
也就是說(shuō),在訪問(wèn)page cache時(shí),我們可能需要訪問(wèn)page cache中的多個(gè)桶,如果page cache用桶鎖就會(huì)出現(xiàn)大量頻繁的加鎖和解鎖,導(dǎo)致程序的效率低下。因此我們?cè)谠L問(wèn)page cache時(shí)使用沒(méi)有使用桶鎖,而是用一個(gè)大鎖將整個(gè)page cache給鎖住。
而thread cache在訪問(wèn)central cache時(shí),只需要訪問(wèn)central cache中對(duì)應(yīng)的哈希桶就行了,因?yàn)閏entral cache的每個(gè)哈希桶中的span都被切分成了對(duì)應(yīng)大小,thread cache只需要根據(jù)自己所需對(duì)象的大小訪問(wèn)central cache中對(duì)應(yīng)的哈希桶即可,不會(huì)訪問(wèn)其他哈希桶,因此central cache可以用桶鎖。
此外,page cache在整個(gè)進(jìn)程中也是只能存在一個(gè)的,因此我們也需要將其設(shè)置為單例模式。
//單例模式 class PageCache { public: //提供一個(gè)全局訪問(wèn)點(diǎn) static PageCache* GetInstance() { return &_sInst; } private: SpanList _spanLists[NPAGES]; std::mutex _pageMtx; //大鎖 private: PageCache() //構(gòu)造函數(shù)私有 {} PageCache(const PageCache&) = delete; //防拷貝 static PageCache _sInst; };
當(dāng)程序運(yùn)行起來(lái)后我們就立馬創(chuàng)建該對(duì)象即可。
PageCache PageCache::_sInst;
pagecache中獲取Span
獲取一個(gè)非空的span
thread cache向central cache申請(qǐng)對(duì)象時(shí),central cache需要先從對(duì)應(yīng)的哈希桶中獲取到一個(gè)非空的span,然后從這個(gè)非空的span中取出若干對(duì)象返回給thread cache。那central cache到底是如何從對(duì)應(yīng)的哈希桶中,獲取到一個(gè)非空的span的呢?
首先當(dāng)然是先遍歷central cache對(duì)應(yīng)哈希桶當(dāng)中的雙鏈表,如果該雙鏈表中有非空的span,那么直接將該span進(jìn)行返回即可。為了方便遍歷這個(gè)雙鏈表,我們可以模擬迭代器的方式,給SpanList類提供Begin和End成員函數(shù),分別用于獲取雙鏈表中的第一個(gè)span和最后一個(gè)span的下一個(gè)位置,也就是頭結(jié)點(diǎn)。
//帶頭雙向循環(huán)鏈表 class SpanList { public: Span* Begin() { return _head->_next; } Span* End() { return _head; } private: Span* _head; public: std::mutex _mtx; //桶鎖 };
但如果遍歷雙鏈表后發(fā)現(xiàn)雙鏈表中沒(méi)有span,或該雙鏈表中的span都為空,那么此時(shí)central cache就需要向page cache申請(qǐng)內(nèi)存塊了。
那具體是向page cache申請(qǐng)多大的內(nèi)存塊呢?我們可以根據(jù)具體所需對(duì)象的大小來(lái)決定,就像之前我們根據(jù)對(duì)象的大小計(jì)算出,thread cache一次向central cache申請(qǐng)對(duì)象的個(gè)數(shù)上限,現(xiàn)在我們是根據(jù)對(duì)象的大小計(jì)算出,central cache一次應(yīng)該向page cache申請(qǐng)幾頁(yè)的內(nèi)存塊。
我們可以先根據(jù)對(duì)象的大小計(jì)算出,thread cache一次向central cache申請(qǐng)對(duì)象的個(gè)數(shù)上限,然后將這個(gè)上限值乘以單個(gè)對(duì)象的大小,就算出了具體需要多少字節(jié),最后再將這個(gè)算出來(lái)的字節(jié)數(shù)轉(zhuǎn)換為頁(yè)數(shù),如果轉(zhuǎn)換后不夠一頁(yè),那么我們就申請(qǐng)一頁(yè),否則轉(zhuǎn)換出來(lái)是幾頁(yè)就申請(qǐng)幾頁(yè)。也就是說(shuō),central cache向page cache申請(qǐng)內(nèi)存時(shí),要求申請(qǐng)到的內(nèi)存盡量能夠滿足thread cache向central cache申請(qǐng)時(shí)的上限。
//管理對(duì)齊和映射等關(guān)系 class SizeClass { public: //central cache一次向page cache獲取多少頁(yè) static size_t NumMovePage(size_t size) { size_t num = NumMoveSize(size); //計(jì)算出thread cache一次向central cache申請(qǐng)對(duì)象的個(gè)數(shù)上限 size_t nPage = num*size; //num個(gè)size大小的對(duì)象所需的字節(jié)數(shù) nPage >>= PAGE_SHIFT; //將字節(jié)數(shù)轉(zhuǎn)換為頁(yè)數(shù) if (nPage == 0) //至少給一頁(yè) nPage = 1; return nPage; } };
代碼中的PAGE_SHIFT
代表頁(yè)大小轉(zhuǎn)換偏移,我們這里以頁(yè)的大小為8K為例,PAGE_SHIFT
的值就是13。
//頁(yè)大小轉(zhuǎn)換偏移,即一頁(yè)定義為2^13,也就是8KB static const size_t PAGE_SHIFT = 13;
需要注意的是,當(dāng)central cache申請(qǐng)到若干頁(yè)的span后,還需要將這個(gè)span切成一個(gè)個(gè)對(duì)應(yīng)大小的對(duì)象掛到該span的自由鏈表當(dāng)中。
如何找到一個(gè)span所管理的內(nèi)存塊呢?首先需要計(jì)算出該span的起始地址,我們可以用這個(gè)span的起始頁(yè)號(hào)乘以一頁(yè)的大小即可得到這個(gè)span的起始地址,然后用這個(gè)span的頁(yè)數(shù)乘以一頁(yè)的大小就可以得到這個(gè)span所管理的內(nèi)存塊的大小,用起始地址加上內(nèi)存塊的大小即可得到這塊內(nèi)存塊的結(jié)束位置。
明確了這塊內(nèi)存的起始和結(jié)束位置后,我們就可以進(jìn)行切分了。根據(jù)所需對(duì)象的大小,每次從大塊內(nèi)存切出一塊固定大小的內(nèi)存塊尾插到span的自由鏈表中即可。
為什么是尾插呢?因?yàn)槲覀內(nèi)绻菍⑶泻玫膶?duì)象尾插到自由鏈表,這些對(duì)象看起來(lái)是按照鏈?zhǔn)浇Y(jié)構(gòu)鏈接起來(lái)的,而實(shí)際它們?cè)谖锢砩鲜沁B續(xù)的,這時(shí)當(dāng)我們把這些連續(xù)內(nèi)存分配給某個(gè)線程使用時(shí),可以提高該線程的CPU緩存利用率。
//獲取一個(gè)非空的span Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size) { //1、先在spanList中尋找非空的span Span* it = spanList.Begin(); while (it != spanList.End()) { if (it->_freeList != nullptr) { return it; } else { it = it->_next; } } //2、spanList中沒(méi)有非空的span,只能向page cache申請(qǐng) Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size)); //計(jì)算span的大塊內(nèi)存的起始地址和大塊內(nèi)存的大?。ㄗ止?jié)數(shù)) char* start = (char*)(span->_pageId << PAGE_SHIFT); size_t bytes = span->_n << PAGE_SHIFT; //把大塊內(nèi)存切成size大小的對(duì)象鏈接起來(lái) char* end = start + bytes; //先切一塊下來(lái)去做尾,方便尾插 span->_freeList = start; start += size; void* tail = span->_freeList; //尾插 while (start < end) { NextObj(tail) = start; tail = NextObj(tail); start += size; } NextObj(tail) = nullptr; //尾的指向置空 //將切好的span頭插到spanList spanList.PushFront(span); return span; }
需要注意的是,當(dāng)我們把span切好后,需要將這個(gè)切好的span掛到central cache的對(duì)應(yīng)哈希桶中。因此SpanList類還需要提供一個(gè)接口,用于將一個(gè)span插入到該雙鏈表中。這里我們選擇的是頭插,這樣當(dāng)central cache下一次從該雙鏈表中獲取非空span時(shí),一來(lái)就能找到。
由于SpanList類之前實(shí)現(xiàn)了Insert和Begin函數(shù),這里實(shí)現(xiàn)雙鏈表頭插就非常簡(jiǎn)單,直接在雙鏈表的Begin位置進(jìn)行Insert即可。
//帶頭雙向循環(huán)鏈表 class SpanList { public: void PushFront(Span* span) { Insert(Begin(), span); } private: Span* _head; public: std::mutex _mtx; //桶鎖 };
獲取一個(gè)k頁(yè)的span
當(dāng)我們調(diào)用上述的GetOneSpan從central cache的某個(gè)哈希桶獲取一個(gè)非空的span時(shí),如果遍歷哈希桶中的雙鏈表后發(fā)現(xiàn)雙鏈表中沒(méi)有span,或該雙鏈表中的span都為空,那么此時(shí)central cache就需要向page cache申請(qǐng)若干頁(yè)的span了,下面我們就來(lái)說(shuō)說(shuō)如何從page cache獲取一個(gè)k頁(yè)的span。
因?yàn)閜age cache是直接按照頁(yè)數(shù)進(jìn)行映射的,因此我們要從page cache獲取一個(gè)k頁(yè)的span,就應(yīng)該直接先去找page cache的第k號(hào)桶,如果第k號(hào)桶中有span,那我們直接頭刪一個(gè)span返回給central cache就行了。所以我們這里需要再給SpanList類添加對(duì)應(yīng)的Empty和PopFront函數(shù)。
//帶頭雙向循環(huán)鏈表 class SpanList { public: bool Empty() { return _head == _head->_next; } Span* PopFront() { Span* front = _head->_next; Erase(front); return front; } private: Span* _head; public: std::mutex _mtx; //桶鎖 };
如果page cache的第k號(hào)桶中沒(méi)有span,我們就應(yīng)該繼續(xù)找后面的桶,只要后面任意一個(gè)桶中有一個(gè)n頁(yè)span,我們就可以將其切分成一個(gè)k頁(yè)的span和一個(gè)n-k頁(yè)的span,然后將切出來(lái)k頁(yè)的span返回給central cache,再將n-k頁(yè)的span掛到page cache的第n-k號(hào)桶即可。
但如果后面的桶中也都沒(méi)有span,此時(shí)我們就需要向堆申請(qǐng)一個(gè)128頁(yè)的span了,在向堆申請(qǐng)內(nèi)存時(shí),直接調(diào)用我們封裝的SystemAlloc函數(shù)即可。
需要注意的是,向堆申請(qǐng)內(nèi)存后得到的是這塊內(nèi)存的起始地址,此時(shí)我們需要將該地址轉(zhuǎn)換為頁(yè)號(hào)。由于我們向堆申請(qǐng)內(nèi)存時(shí)都是按頁(yè)進(jìn)行申請(qǐng)的,因此我們直接將該地址除以一頁(yè)的大小即可得到對(duì)應(yīng)的頁(yè)號(hào)。
//獲取一個(gè)k頁(yè)的span Span* PageCache::NewSpan(size_t k) { assert(k > 0 && k < NPAGES); //先檢查第k個(gè)桶里面有沒(méi)有span if (!_spanLists[k].Empty()) { return _spanLists[k].PopFront(); } //檢查一下后面的桶里面有沒(méi)有span,如果有可以將其進(jìn)行切分 for (size_t i = k + 1; i < NPAGES; i++) { if (!_spanLists[i].Empty()) { Span* nSpan = _spanLists[i].PopFront(); Span* kSpan = new Span; //在nSpan的頭部切k頁(yè)下來(lái) kSpan->_pageId = nSpan->_pageId; kSpan->_n = k; nSpan->_pageId += k; nSpan->_n -= k; //將剩下的掛到對(duì)應(yīng)映射的位置 _spanLists[nSpan->_n].PushFront(nSpan); return kSpan; } } //走到這里說(shuō)明后面沒(méi)有大頁(yè)的span了,這時(shí)就向堆申請(qǐng)一個(gè)128頁(yè)的span Span* bigSpan = new Span; void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; bigSpan->_n = NPAGES - 1; _spanLists[bigSpan->_n].PushFront(bigSpan); //盡量避免代碼重復(fù),遞歸調(diào)用自己 return NewSpan(k); }
這里說(shuō)明一下,當(dāng)我們向堆申請(qǐng)到128頁(yè)的span后,需要將其切分成k頁(yè)的span和128-k頁(yè)的span,但是為了盡量避免出現(xiàn)重復(fù)的代碼,我們最好不要再編寫(xiě)對(duì)應(yīng)的切分代碼。我們可以先將申請(qǐng)到的128頁(yè)的span掛到page cache對(duì)應(yīng)的哈希桶中,然后再遞歸調(diào)用該函數(shù)就行了,此時(shí)在往后找span時(shí)就一定會(huì)在第128號(hào)桶中找到該span,然后進(jìn)行切分。
這里其實(shí)有一個(gè)問(wèn)題:當(dāng)central cache向page cache申請(qǐng)內(nèi)存時(shí),central cache對(duì)應(yīng)的哈希桶是處于加鎖的狀態(tài)的,那在訪問(wèn)page cache之前我們應(yīng)不應(yīng)該把central cache對(duì)應(yīng)的桶鎖解掉呢?
這里建議在訪問(wèn)page cache前,先把central cache對(duì)應(yīng)的桶鎖解掉。雖然此時(shí)central cache的這個(gè)桶當(dāng)中是沒(méi)有內(nèi)存供其他thread cache申請(qǐng)的,但thread cache除了申請(qǐng)內(nèi)存還會(huì)釋放內(nèi)存,如果在訪問(wèn)page cache前將central cache對(duì)應(yīng)的桶鎖解掉,那么此時(shí)當(dāng)其他thread cache想要?dú)w還內(nèi)存到central cache的這個(gè)桶時(shí)就不會(huì)被阻塞。
因此在調(diào)用NewSpan函數(shù)之前,我們需要先將central cache對(duì)應(yīng)的桶鎖解掉,然后再將page cache的大鎖加上,當(dāng)申請(qǐng)到k頁(yè)的span后,我們需要將page cache的大鎖解掉,但此時(shí)我們不需要立刻獲取到central cache中對(duì)應(yīng)的桶鎖。因?yàn)閏entral cache拿到k頁(yè)的span后還會(huì)對(duì)其進(jìn)行切分操作,因此我們可以在span切好后需要將其掛到central cache對(duì)應(yīng)的桶上時(shí),再獲取對(duì)應(yīng)的桶鎖。
這里為了讓代碼清晰一點(diǎn),只寫(xiě)出了加鎖和解鎖的邏輯,我們只需要將這些邏輯添加到之前實(shí)現(xiàn)的GetOneSpan函數(shù)的對(duì)應(yīng)位置即可。
spanList._mtx.unlock(); //解桶鎖 PageCache::GetInstance()->_pageMtx.lock(); //加大鎖 //從page cache申請(qǐng)k頁(yè)的span PageCache::GetInstance()->_pageMtx.unlock(); //解大鎖 //進(jìn)行span的切分... spanList._mtx.lock(); //加桶鎖 //將span掛到central cache對(duì)應(yīng)的哈希桶
申請(qǐng)內(nèi)存過(guò)程聯(lián)調(diào)
ConcurrentAlloc函數(shù)
在將thread cache、central cache以及page cache的申請(qǐng)流程寫(xiě)通了之后,我們就可以向外提供一個(gè)ConcurrentAlloc函數(shù),用于申請(qǐng)內(nèi)存塊。每個(gè)線程第一次調(diào)用該函數(shù)時(shí)會(huì)通過(guò)TLS獲取到自己專屬的thread cache對(duì)象,然后每個(gè)線程就可以通過(guò)自己對(duì)應(yīng)的thread cache申請(qǐng)對(duì)象了。
static void* ConcurrentAlloc(size_t size) { //通過(guò)TLS,每個(gè)線程無(wú)鎖的獲取自己專屬的ThreadCache對(duì)象 if (pTLSThreadCache == nullptr) { pTLSThreadCache = new ThreadCache; } return pTLSThreadCache->Allocate(size); }
這里說(shuō)一下編譯時(shí)會(huì)出現(xiàn)的問(wèn)題,在C++的algorithm頭文件中有一個(gè)min函數(shù),這是一個(gè)函數(shù)模板,而在Windows.h頭文件中也有一個(gè)min,這是一個(gè)宏。由于調(diào)用函數(shù)模板時(shí)需要進(jìn)行參數(shù)類型的推演,因此當(dāng)我們調(diào)用min函數(shù)時(shí),編譯器會(huì)優(yōu)先匹配Windows.h當(dāng)中以宏的形式實(shí)現(xiàn)的min,此時(shí)當(dāng)我們以std::min的形式調(diào)用min函數(shù)時(shí)就會(huì)產(chǎn)生報(bào)錯(cuò),這就是沒(méi)有用命名空間進(jìn)行封裝的壞處,這時(shí)我們只能選擇將std::去掉,讓編譯器調(diào)用Windows.h當(dāng)中的min。
申請(qǐng)內(nèi)存過(guò)程聯(lián)調(diào)測(cè)試一
由于在多線程場(chǎng)景下調(diào)試觀察起來(lái)非常麻煩,這里就先不考慮多線程場(chǎng)景,看看在單線程場(chǎng)景下代碼的執(zhí)行邏輯是否符合我們的預(yù)期,其次,我們這里就只簡(jiǎn)單觀察在一個(gè)桶當(dāng)中的內(nèi)存申請(qǐng)就行了。
下面該線程進(jìn)行了三次內(nèi)存申請(qǐng),這三次內(nèi)存申請(qǐng)的字節(jié)數(shù)最終都對(duì)齊到了8,此時(shí)當(dāng)線程申請(qǐng)內(nèi)存時(shí)就只會(huì)訪問(wèn)到thread cache的第0號(hào)桶。
void* p1 = ConcurrentAlloc(6); void* p2 = ConcurrentAlloc(8); void* p3 = ConcurrentAlloc(1);
當(dāng)線程第一次申請(qǐng)內(nèi)存時(shí),該線程需要通過(guò)TLS獲取到自己專屬的thread cache對(duì)象,然后通過(guò)這個(gè)thread cache對(duì)象進(jìn)行內(nèi)存申請(qǐng)。
在申請(qǐng)內(nèi)存時(shí)通過(guò)計(jì)算索引到了thread cache的第0號(hào)桶,但此時(shí)thread cache的第0號(hào)桶中是沒(méi)有對(duì)象的,因此thread cache需要向central cache申請(qǐng)內(nèi)存塊。
在向central cache申請(qǐng)內(nèi)存塊前,首先通過(guò)NumMoveSize函數(shù)計(jì)算得出,thread cache一次最多可向central cache申請(qǐng)8字節(jié)大小對(duì)象的個(gè)數(shù)是512,但由于我們采用的是慢開(kāi)始算法,因此還需要將上限值與對(duì)應(yīng)自由鏈表的_maxSize的值進(jìn)行比較,而此時(shí)對(duì)應(yīng)自由鏈表_maxSize的值是1,所以最終得出本次thread cache向central cache申請(qǐng)8字節(jié)對(duì)象的個(gè)數(shù)是1個(gè)。
并且在此之后會(huì)將該自由鏈表中_maxSize的值進(jìn)行自增,下一次thread cache再向central cache申請(qǐng)8字節(jié)對(duì)象時(shí)最終申請(qǐng)對(duì)象的個(gè)數(shù)就會(huì)是2個(gè)了。
在thread cache向central cache申請(qǐng)對(duì)象之前,需要先將central cache的0號(hào)桶的鎖加上,然后再?gòu)脑撏矮@取一個(gè)非空的span。
在central cache的第0號(hào)桶獲取非空span時(shí),先遍歷對(duì)應(yīng)的span雙鏈表,看看有沒(méi)有非空的span,但此時(shí)肯定是沒(méi)有的,因此在這個(gè)過(guò)程中我們無(wú)法找到一個(gè)非空的span。
那么此時(shí)central cache就需要向page cache申請(qǐng)內(nèi)存了,但在此之前需要先把central cache第0號(hào)桶的鎖解掉,然后再將page cache的大鎖給加上,之后才能向page cache申請(qǐng)內(nèi)存。
在向page cache申請(qǐng)內(nèi)存時(shí),由于central cache一次給thread cache8字節(jié)對(duì)象的上限是512,對(duì)應(yīng)就需要4096字節(jié),所需字節(jié)數(shù)不足一頁(yè)就按一頁(yè)算,所以這里central cache就需要向page cache申請(qǐng)一頁(yè)的內(nèi)存塊。
但此時(shí)page cache的第1個(gè)桶以及之后的桶當(dāng)中都是沒(méi)有span的,因此page cache需要直接向堆申請(qǐng)一個(gè)128頁(yè)的span。
這里通過(guò)監(jiān)視窗口可以看到,用于管理申請(qǐng)到的128頁(yè)內(nèi)存的span信息。
我們可以順便驗(yàn)證一下,按頁(yè)向堆申請(qǐng)的內(nèi)存塊的起始地址和頁(yè)號(hào)之間是可以相互轉(zhuǎn)換的。
現(xiàn)在將申請(qǐng)到的128頁(yè)的span插入到page cache的第128號(hào)桶當(dāng)中,然后再調(diào)用一次NewSpan,在這次調(diào)用的時(shí)候,雖然在1號(hào)桶當(dāng)中沒(méi)有span,但是在往后找的過(guò)程中就一定會(huì)在第128號(hào)桶找到一個(gè)span。
此時(shí)我們就可以把這個(gè)128頁(yè)的span拿出來(lái),切分成1頁(yè)的span和127頁(yè)的span,將1頁(yè)的span返回給central cache,而把127頁(yè)的span掛到page cache的第127號(hào)桶即可。
從page cache返回后,就可以把page cache的大鎖解掉了,但緊接著還要將獲取到的1頁(yè)的span進(jìn)行切分,因此這里沒(méi)有立刻重新加上central cache對(duì)應(yīng)的桶鎖。
在進(jìn)行切分的時(shí)候,先通過(guò)該span的起始頁(yè)號(hào)得到該span的起始地址,然后通過(guò)該span的頁(yè)數(shù)得到該span所管理內(nèi)存塊的總的字節(jié)數(shù)。
在確定內(nèi)存塊的開(kāi)始和結(jié)束后,就可以將其切分成一個(gè)個(gè)8字節(jié)大小的對(duì)象掛到該span的自由鏈表中了。在調(diào)試過(guò)程中通過(guò)內(nèi)存監(jiān)視窗口可以看到,切分出來(lái)的每個(gè)8字節(jié)大小的對(duì)象的前四個(gè)字節(jié)存儲(chǔ)的都是下一個(gè)8字節(jié)對(duì)象的起始地址。
當(dāng)切分結(jié)束后再獲取central cache第0號(hào)桶的桶鎖,然后將這個(gè)切好的span插入到central cache的第0號(hào)桶中,最后再將這個(gè)非空的span返回,此時(shí)就獲取到了一個(gè)非空的span。
由于thread cache只向central cache申請(qǐng)了一個(gè)對(duì)象,因此拿到這個(gè)非空的span后,直接從這個(gè)span里面取出一個(gè)對(duì)象即可,此時(shí)該span的_useCount也由0變成了1。
由于此時(shí)thread cache實(shí)際只向central cache申請(qǐng)到了一個(gè)對(duì)象,因此直接將這個(gè)對(duì)象返回給線程即可。
當(dāng)線程第二次申請(qǐng)內(nèi)存塊時(shí)就不會(huì)再創(chuàng)建thread cache了,因?yàn)榈谝淮紊暾?qǐng)時(shí)就已經(jīng)創(chuàng)建好了,此時(shí)該線程直接獲取到對(duì)應(yīng)的thread cache進(jìn)行內(nèi)存塊申請(qǐng)即可。
當(dāng)該線程第二次申請(qǐng)8字節(jié)大小的對(duì)象時(shí),此時(shí)thread cache的0號(hào)桶中還是沒(méi)有對(duì)象的,因?yàn)榈谝淮蝨hread cache只向central cache申請(qǐng)了一個(gè)8字節(jié)對(duì)象,因此這次申請(qǐng)時(shí)還需要再向central cache申請(qǐng)對(duì)象。
這時(shí)thread cache向central cache申請(qǐng)對(duì)象時(shí),thread cache第0號(hào)桶中自由鏈表的_maxSize已經(jīng)慢增長(zhǎng)到2了,所以這次在向central cache申請(qǐng)對(duì)象時(shí)就會(huì)申請(qǐng)2個(gè)。如果下一次thread cache再向central cache申請(qǐng)8字節(jié)大小的對(duì)象,那么central cache會(huì)一次性給thread cache3個(gè),這就是所謂的慢增長(zhǎng)。
但由于第一次central cache向page cache申請(qǐng)了一頁(yè)的內(nèi)存塊,并將其切成了1024個(gè)8字節(jié)大小的對(duì)象,因此這次thread cache向central cache申請(qǐng)2兩個(gè)8字節(jié)的對(duì)象時(shí),central cache的第0號(hào)桶當(dāng)中是有對(duì)象的,直接返回兩個(gè)給thread cache即可,而不用再向page cache申請(qǐng)內(nèi)存了。
但線程實(shí)際申請(qǐng)的只是一個(gè)8字節(jié)對(duì)象,因此thread cache除了將一個(gè)對(duì)象返回之外,還需要將剩下的一個(gè)對(duì)象掛到thread cache的第0號(hào)桶當(dāng)中。
這樣一來(lái),當(dāng)線程第三次申請(qǐng)1字節(jié)的內(nèi)存時(shí),由于1字節(jié)對(duì)齊后也是8字節(jié),此時(shí)thread cache也就不需要再向central cache申請(qǐng)內(nèi)存塊了,直接將第0號(hào)桶當(dāng)中之前剩下的一個(gè)8字節(jié)對(duì)象返回即可。
申請(qǐng)內(nèi)存過(guò)程聯(lián)調(diào)測(cè)試二
為了進(jìn)一步測(cè)試代碼的正確性,我們可以做這樣一個(gè)測(cè)試:讓線程申請(qǐng)1024次8字節(jié)的對(duì)象,然后通過(guò)調(diào)試觀察在第1025次申請(qǐng)時(shí),central cache是否會(huì)再向page cache申請(qǐng)內(nèi)存塊。
for (size_t i = 0; i < 1024; i++) { void* p1 = ConcurrentAlloc(6); } void* p2 = ConcurrentAlloc(6);
因?yàn)閏entral cache第一次就是向page cache申請(qǐng)的一頁(yè)內(nèi)存,這一頁(yè)內(nèi)存被切成了1024個(gè)8字節(jié)大小的對(duì)象,當(dāng)這1024個(gè)對(duì)象全部被申請(qǐng)之后,再申請(qǐng)8字節(jié)大小的對(duì)象時(shí)central cache當(dāng)中就沒(méi)有對(duì)象了,此時(shí)就應(yīng)該向page cache申請(qǐng)內(nèi)存塊。
通過(guò)調(diào)試我們可以看到,第1025次申請(qǐng)8字節(jié)大小的對(duì)象時(shí),central cache第0號(hào)桶中的這個(gè)span的_useCount已經(jīng)增加到了1024,也就是說(shuō)這1024個(gè)對(duì)象都已經(jīng)被線程申請(qǐng)了,此時(shí)central cache就需要再向page cache申請(qǐng)一頁(yè)的span來(lái)進(jìn)行切分了。
而這次central cache在向page cache申請(qǐng)一頁(yè)的內(nèi)存時(shí),page cache就是將127頁(yè)span切分成了1頁(yè)的span和126頁(yè)的span了,然后central cache拿到這1頁(yè)的span后,又會(huì)將其切分成1024塊8字節(jié)大小的內(nèi)存塊以供thread cache申請(qǐng)。
threadcache回收內(nèi)存
當(dāng)某個(gè)線程申請(qǐng)的對(duì)象不用了,可以將其釋放給thread cache,然后thread cache將該對(duì)象插入到對(duì)應(yīng)哈希桶的自由鏈表當(dāng)中即可。
但是隨著線程不斷的釋放,對(duì)應(yīng)自由鏈表的長(zhǎng)度也會(huì)越來(lái)越長(zhǎng),這些內(nèi)存堆積在一個(gè)thread cache中就是一種浪費(fèi),我們應(yīng)該將這些內(nèi)存還給central cache,這樣一來(lái),這些內(nèi)存對(duì)其他線程來(lái)說(shuō)也是可申請(qǐng)的,因此當(dāng)thread cache某個(gè)桶當(dāng)中的自由鏈表太長(zhǎng)時(shí)我們可以進(jìn)行一些處理。
如果thread cache某個(gè)桶當(dāng)中自由鏈表的長(zhǎng)度超過(guò)它一次批量向central cache申請(qǐng)的對(duì)象個(gè)數(shù),那么此時(shí)我們就要把該自由鏈表當(dāng)中的這些對(duì)象還給central cache。
//釋放內(nèi)存對(duì)象 void ThreadCache::Deallocate(void* ptr, size_t size) { assert(ptr); assert(size <= MAX_BYTES); //找出對(duì)應(yīng)的自由鏈表桶將對(duì)象插入 size_t index = SizeClass::Index(size); _freeLists[index].Push(ptr); //當(dāng)自由鏈表長(zhǎng)度大于一次批量申請(qǐng)的對(duì)象個(gè)數(shù)時(shí)就開(kāi)始還一段list給central cache if (_freeLists[index].Size() >= _freeLists[index].MaxSize()) { ListTooLong(_freeLists[index], size); } }
當(dāng)自由鏈表的長(zhǎng)度大于一次批量申請(qǐng)的對(duì)象時(shí),我們具體的做法就是,從該自由鏈表中取出一次批量個(gè)數(shù)的對(duì)象,然后將取出的這些對(duì)象還給central cache中對(duì)應(yīng)的span即可。
//釋放對(duì)象導(dǎo)致鏈表過(guò)長(zhǎng),回收內(nèi)存到中心緩存 void ThreadCache::ListTooLong(FreeList& list, size_t size) { void* start = nullptr; void* end = nullptr; //從list中取出一次批量個(gè)數(shù)的對(duì)象 list.PopRange(start, end, list.MaxSize()); //將取出的對(duì)象還給central cache中對(duì)應(yīng)的span CentralCache::GetInstance()->ReleaseListToSpans(start, size); }
從上述代碼可以看出,F(xiàn)reeList類需要支持用Size函數(shù)獲取自由鏈表中對(duì)象的個(gè)數(shù),還需要支持用PopRange函數(shù)從自由鏈表中取出指定個(gè)數(shù)的對(duì)象。因此我們需要給FreeList類增加一個(gè)對(duì)應(yīng)的PopRange函數(shù),然后再增加一個(gè)_size成員變量,該成員變量用于記錄當(dāng)前自由鏈表中對(duì)象的個(gè)數(shù),當(dāng)我們向自由鏈表插入或刪除對(duì)象時(shí),都應(yīng)該更新_size的值。
//管理切分好的小對(duì)象的自由鏈表 class FreeList { public: //將釋放的對(duì)象頭插到自由鏈表 void Push(void* obj) { assert(obj); //頭插 NextObj(obj) = _freeList; _freeList = obj; _size++; } //從自由鏈表頭部獲取一個(gè)對(duì)象 void* Pop() { assert(_freeList); //頭刪 void* obj = _freeList; _freeList = NextObj(_freeList); _size--; return obj; } //插入一段范圍的對(duì)象到自由鏈表 void PushRange(void* start, void* end, size_t n) { assert(start); assert(end); //頭插 NextObj(end) = _freeList; _freeList = start; _size += n; } //從自由鏈表獲取一段范圍的對(duì)象 void PopRange(void*& start, void*& end, size_t n) { assert(n <= _size); //頭刪 start = _freeList; end = start; for (size_t i = 0; i < n - 1;i++) { end = NextObj(end); } _freeList = NextObj(end); //自由鏈表指向end的下一個(gè)對(duì)象 NextObj(end) = nullptr; //取出的一段鏈表的表尾置空 _size -= n; } bool Empty() { return _freeList == nullptr; } size_t& MaxSize() { return _maxSize; } size_t Size() { return _size; } private: void* _freeList = nullptr; //自由鏈表 size_t _maxSize = 1; size_t _size = 0; };
而對(duì)于FreeList類當(dāng)中的PushRange成員函數(shù),我們最好也像PopRange一樣給它增加一個(gè)參數(shù),表示插入對(duì)象的個(gè)數(shù),不然我們這時(shí)還需要通過(guò)遍歷統(tǒng)計(jì)插入對(duì)象的個(gè)數(shù)。
因此之前在調(diào)用PushRange的地方就需要修改一下,而我們實(shí)際就在一個(gè)地方調(diào)用過(guò)PushRange函數(shù),并且此時(shí)插入對(duì)象的個(gè)數(shù)也是很容易知道的。當(dāng)時(shí)thread cache從central cache獲取了actualNum個(gè)對(duì)象,將其中的一個(gè)返回給了申請(qǐng)對(duì)象的線程,剩下的actualNum-1個(gè)掛到了thread cache對(duì)應(yīng)的桶當(dāng)中,所以這里插入對(duì)象的個(gè)數(shù)就是actualNum-1。
_freeLists[index].PushRange(NextObj(start), end, actualNum - 1);
說(shuō)明一下:
當(dāng)thread cache的某個(gè)自由鏈表過(guò)長(zhǎng)時(shí),我們實(shí)際就是把這個(gè)自由鏈表當(dāng)中全部的對(duì)象都還給central cache了,但這里在設(shè)計(jì)PopRange接口時(shí)還是設(shè)計(jì)的是取出指定個(gè)數(shù)的對(duì)象,因?yàn)樵谀承┣闆r下當(dāng)自由鏈表過(guò)長(zhǎng)時(shí),我們可能并不一定想把鏈表中全部的對(duì)象都取出來(lái)還給central cache,這樣設(shè)計(jì)就是為了增加代碼的可修改性。
其次,當(dāng)我們判斷thread cache是否應(yīng)該還對(duì)象給central cache時(shí),還可以綜合考慮每個(gè)thread cache整體的大小。比如當(dāng)某個(gè)thread cache的總占用大小超過(guò)一定閾值時(shí),我們就將該thread cache當(dāng)中的對(duì)象還一些給central cache,這樣就盡量避免了某個(gè)線程的thread cache占用太多的內(nèi)存。對(duì)于這一點(diǎn),在tcmalloc當(dāng)中就是考慮到了的。
centralcache回收內(nèi)存
當(dāng)thread cache中某個(gè)自由鏈表太長(zhǎng)時(shí),會(huì)將自由鏈表當(dāng)中的這些對(duì)象還給central cache中的span。
但是需要注意的是,還給central cache的這些對(duì)象不一定都是屬于同一個(gè)span的。central cache中的每個(gè)哈希桶當(dāng)中可能都不止一個(gè)span,因此當(dāng)我們計(jì)算出還回來(lái)的對(duì)象應(yīng)該還給central cache的哪一個(gè)桶后,還需要知道這些對(duì)象到底應(yīng)該還給這個(gè)桶當(dāng)中的哪一個(gè)span。
如何根據(jù)對(duì)象的地址得到對(duì)象所在的頁(yè)號(hào)?
首先我們必須理解的是,某個(gè)頁(yè)當(dāng)中的所有地址除以頁(yè)的大小都等該頁(yè)的頁(yè)號(hào)。比如我們這里假設(shè)一頁(yè)的大小是100,那么地址0~99都屬于第0頁(yè),它們除以100都等于0,而地址100~199都屬于第1頁(yè),它們除以100都等于1。
如何找到一個(gè)對(duì)象對(duì)應(yīng)的span?
雖然我們現(xiàn)在可以通過(guò)對(duì)象的地址得到其所在的頁(yè)號(hào),但是我們還是不能知道這個(gè)對(duì)象到底屬于哪一個(gè)span。因?yàn)橐粋€(gè)span管理的可能是多個(gè)頁(yè)。
為了解決這個(gè)問(wèn)題,我們可以建立頁(yè)號(hào)和span之間的映射。由于這個(gè)映射關(guān)系在page cache進(jìn)行span的合并時(shí)也需要用到,因此我們直接將其存放到page cache里面。這時(shí)我們就需要在PageCache類當(dāng)中添加一個(gè)映射關(guān)系了,這里可以用C++當(dāng)中的unordered_map進(jìn)行實(shí)現(xiàn),并且添加一個(gè)函數(shù)接口,用于讓central cache獲取這里的映射關(guān)系。(下面代碼中只展示了PageCache類當(dāng)中新增的成員)
//單例模式class PageCache{<!--{C}%3C!%2D%2D%20%2D%2D%3E-->public://獲取從對(duì)象到span的映射Span* MapObjectToSpan(void* obj);private:std::unordered_map<PAGE_ID, Span*> _idSpanMap;};
每當(dāng)page cache分配span給central cache時(shí),都需要記錄一下頁(yè)號(hào)和span之間的映射關(guān)系。此后當(dāng)thread cache還對(duì)象給central cache時(shí),才知道應(yīng)該具體還給哪一個(gè)span。
因此當(dāng)central cache在調(diào)用NewSpan接口向page cache申請(qǐng)k頁(yè)的span時(shí),page cache在返回這個(gè)k頁(yè)的span給central cache之前,應(yīng)該建立這k個(gè)頁(yè)號(hào)與該span之間的映射關(guān)系。
//獲取一個(gè)k頁(yè)的span Span* PageCache::NewSpan(size_t k) { assert(k > 0 && k < NPAGES); //先檢查第k個(gè)桶里面有沒(méi)有span if (!_spanLists[k].Empty()) { Span* kSpan = _spanLists[k].PopFront(); //建立頁(yè)號(hào)與span的映射,方便central cache回收小塊內(nèi)存時(shí)查找對(duì)應(yīng)的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } //檢查一下后面的桶里面有沒(méi)有span,如果有可以將其進(jìn)行切分 for (size_t i = k + 1; i < NPAGES; i++) { if (!_spanLists[i].Empty()) { Span* nSpan = _spanLists[i].PopFront(); Span* kSpan = new Span; //在nSpan的頭部切k頁(yè)下來(lái) kSpan->_pageId = nSpan->_pageId; kSpan->_n = k; nSpan->_pageId += k; nSpan->_n -= k; //將剩下的掛到對(duì)應(yīng)映射的位置 _spanLists[nSpan->_n].PushFront(nSpan); //建立頁(yè)號(hào)與span的映射,方便central cache回收小塊內(nèi)存時(shí)查找對(duì)應(yīng)的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } } //走到這里說(shuō)明后面沒(méi)有大頁(yè)的span了,這時(shí)就向堆申請(qǐng)一個(gè)128頁(yè)的span Span* bigSpan = new Span; void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; bigSpan->_n = NPAGES - 1; _spanLists[bigSpan->_n].PushFront(bigSpan); //盡量避免代碼重復(fù),遞歸調(diào)用自己 return NewSpan(k); }
此時(shí)我們就可以通過(guò)對(duì)象的地址找到該對(duì)象對(duì)應(yīng)的span了,直接將該對(duì)象的地址除以頁(yè)的大小得到頁(yè)號(hào),然后在unordered_map當(dāng)中找到其對(duì)應(yīng)的span即可。
//獲取從對(duì)象到span的映射 Span* PageCache::MapObjectToSpan(void* obj) { PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //頁(yè)號(hào) auto ret = _idSpanMap.find(id); if (ret != _idSpanMap.end()) { return ret->second; } else { assert(false); return nullptr; } }
注意一下,當(dāng)我們要通過(guò)某個(gè)頁(yè)號(hào)查找其對(duì)應(yīng)的span時(shí),該頁(yè)號(hào)與其span之間的映射一定是建立過(guò)的,如果此時(shí)我們沒(méi)有在unordered_map當(dāng)中找到,則說(shuō)明我們之前的代碼邏輯有問(wèn)題,因此當(dāng)沒(méi)有找到對(duì)應(yīng)的span時(shí)可以直接用斷言結(jié)束程序,以表明程序邏輯出錯(cuò)。
central cache回收內(nèi)存
這時(shí)當(dāng)thread cache還對(duì)象給central cache時(shí),就可以依次遍歷這些對(duì)象,將這些對(duì)象插入到其對(duì)應(yīng)span的自由鏈表當(dāng)中,并且及時(shí)更新該span的_usseCount計(jì)數(shù)即可。
在thread cache還對(duì)象給central cache的過(guò)程中,如果central cache中某個(gè)span的_useCount減到0時(shí),說(shuō)明這個(gè)span分配出去的對(duì)象全部都還回來(lái)了,那么此時(shí)就可以將這個(gè)span再進(jìn)一步還給page cache。
//將一定數(shù)量的對(duì)象還給對(duì)應(yīng)的span void CentralCache::ReleaseListToSpans(void* start, size_t size) { size_t index = SizeClass::Index(size); _spanLists[index]._mtx.lock(); //加鎖 while (start) { void* next = NextObj(start); //記錄下一個(gè) Span* span = PageCache::GetInstance()->MapObjectToSpan(start); //將對(duì)象頭插到span的自由鏈表 NextObj(start) = span->_freeList; span->_freeList = start; span->_useCount--; //更新被分配給thread cache的計(jì)數(shù) if (span->_useCount == 0) //說(shuō)明這個(gè)span分配出去的對(duì)象全部都回來(lái)了 { //此時(shí)這個(gè)span就可以再回收給page cache,page cache可以再嘗試去做前后頁(yè)的合并 _spanLists[index].Erase(span); span->_freeList = nullptr; //自由鏈表置空 span->_next = nullptr; span->_prev = nullptr; //釋放span給page cache時(shí),使用page cache的鎖就可以了,這時(shí)把桶鎖解掉 _spanLists[index]._mtx.unlock(); //解桶鎖 PageCache::GetInstance()->_pageMtx.lock(); //加大鎖 PageCache::GetInstance()->ReleaseSpanToPageCache(span); PageCache::GetInstance()->_pageMtx.unlock(); //解大鎖 _spanLists[index]._mtx.lock(); //加桶鎖 } start = next; } _spanLists[index]._mtx.unlock(); //解鎖 }
需要注意,如果要把某個(gè)span還給page cache,我們需要先將這個(gè)span從central cache對(duì)應(yīng)的雙鏈表中移除,然后再將該span的自由鏈表置空,因?yàn)閜age cache中的span是不需要切分成一個(gè)個(gè)的小對(duì)象的,以及該span的前后指針也都應(yīng)該置空,因?yàn)橹笠獙⑵洳迦氲絧age cache對(duì)應(yīng)的雙鏈表中。但span當(dāng)中記錄的起始頁(yè)號(hào)以及它管理的頁(yè)數(shù)是不能清除的,否則對(duì)應(yīng)內(nèi)存塊就找不到了。
并且在central cache還span給page cache時(shí)也存在鎖的問(wèn)題,此時(shí)需要先將central cache中對(duì)應(yīng)的桶鎖解掉,然后再加上page cache的大鎖之后才能進(jìn)入page cache進(jìn)行相關(guān)操作,當(dāng)處理完畢回到central cache時(shí),除了將page cache的大鎖解掉,還需要立刻獲得central cache對(duì)應(yīng)的桶鎖,然后將還未還完對(duì)象繼續(xù)還給central cache中對(duì)應(yīng)的span。
pagecache回收內(nèi)存
如果central cache中有某個(gè)span的_useCount減到0了,那么central cache就需要將這個(gè)span還給page cache了。
這個(gè)過(guò)程看似是非常簡(jiǎn)單的,page cache只需將還回來(lái)的span掛到對(duì)應(yīng)的哈希桶上就行了。但實(shí)際為了緩解內(nèi)存碎片的問(wèn)題,page cache還需要嘗試將還回來(lái)的span與其他空閑的span進(jìn)行合并。
page cache進(jìn)行前后頁(yè)的合并
合并的過(guò)程可以分為向前合并和向后合并。如果還回來(lái)的span的起始頁(yè)號(hào)是num,該span所管理的頁(yè)數(shù)是n。那么在向前合并時(shí),就需要判斷第num-1頁(yè)對(duì)應(yīng)span是否空閑,如果空閑則可以將其進(jìn)行合并,并且合并后還需要繼續(xù)向前嘗試進(jìn)行合并,直到不能進(jìn)行合并為止。而在向后合并時(shí),就需要判斷第num+n頁(yè)對(duì)應(yīng)的span是否空閑,如果空閑則可以將其進(jìn)行合并,并且合并后還需要繼續(xù)向后嘗試進(jìn)行合并,直到不能進(jìn)行合并為止。
因此page cache在合并span時(shí),是需要通過(guò)頁(yè)號(hào)獲取到對(duì)應(yīng)的span的,這就是我們要把頁(yè)號(hào)與span之間的映射關(guān)系存儲(chǔ)到page cache的原因。
但需要注意的是,當(dāng)我們通過(guò)頁(yè)號(hào)找到其對(duì)應(yīng)的span時(shí),這個(gè)span此時(shí)可能掛在page cache,也可能掛在central cache。而在合并時(shí)我們只能合并掛在page cache的span,因?yàn)閽煸赾entral cache的span當(dāng)中的對(duì)象正在被其他線程使用。
可是我們不能通過(guò)span結(jié)構(gòu)當(dāng)中的_useCount成員,來(lái)判斷某個(gè)span到底是在central cache還是在page cache。因?yàn)楫?dāng)central cache剛向page cache申請(qǐng)到一個(gè)span時(shí),這個(gè)span的_useCount就是等于0的,這時(shí)可能當(dāng)我們正在對(duì)該span進(jìn)行切分的時(shí)候,page cache就把這個(gè)span拿去進(jìn)行合并了,這顯然是不合理的。
鑒于此,我們可以在span結(jié)構(gòu)中再增加一個(gè)_isUse成員,用于標(biāo)記這個(gè)span是否正在被使用,而當(dāng)一個(gè)span結(jié)構(gòu)被創(chuàng)建時(shí)我們默認(rèn)該span是沒(méi)有被使用的。
//管理以頁(yè)為單位的大塊內(nèi)存 struct Span { PAGE_ID _pageId = 0; //大塊內(nèi)存起始頁(yè)的頁(yè)號(hào) size_t _n = 0; //頁(yè)的數(shù)量 Span* _next = nullptr; //雙鏈表結(jié)構(gòu) Span* _prev = nullptr; size_t _useCount = 0; //切好的小塊內(nèi)存,被分配給thread cache的計(jì)數(shù) void* _freeList = nullptr; //切好的小塊內(nèi)存的自由鏈表 bool _isUse = false; //是否在被使用 };
因此當(dāng)central cache向page cache申請(qǐng)到一個(gè)span時(shí),需要立即將該span的_isUse改為true。
span->_isUse = true;
而當(dāng)central cache將某個(gè)span還給page cache時(shí),也就需要將該span的_isUse改成false。
span->_isUse = false;
由于在合并page cache當(dāng)中的span時(shí),需要通過(guò)頁(yè)號(hào)找到其對(duì)應(yīng)的span,而一個(gè)span是在被分配給central cache時(shí),才建立的各個(gè)頁(yè)號(hào)與span之間的映射關(guān)系,因此page cache當(dāng)中的span也需要建立頁(yè)號(hào)與span之間的映射關(guān)系。
與central cache中的span不同的是,在page cache中,只需建立一個(gè)span的首尾頁(yè)號(hào)與該span之間的映射關(guān)系。因?yàn)楫?dāng)一個(gè)span在嘗試進(jìn)行合并時(shí),如果是往前合并,那么只需要通過(guò)一個(gè)span的尾頁(yè)找到這個(gè)span,如果是向后合并,那么只需要通過(guò)一個(gè)span的首頁(yè)找到這個(gè)span。也就是說(shuō),在進(jìn)行合并時(shí)我們只需要用到span與其首尾頁(yè)之間的映射關(guān)系就夠了。
因此當(dāng)我們申請(qǐng)k頁(yè)的span時(shí),如果是將n頁(yè)的span切成了一個(gè)k頁(yè)的span和一個(gè)n-k頁(yè)的span,我們除了需要建立k頁(yè)span中每個(gè)頁(yè)與該span之間的映射關(guān)系之外,還需要建立剩下的n-k頁(yè)的span與其首尾頁(yè)之間的映射關(guān)系。
//獲取一個(gè)k頁(yè)的span Span* PageCache::NewSpan(size_t k) { assert(k > 0 && k < NPAGES); //先檢查第k個(gè)桶里面有沒(méi)有span if (!_spanLists[k].Empty()) { Span* kSpan = _spanLists[k].PopFront(); //建立頁(yè)號(hào)與span的映射,方便central cache回收小塊內(nèi)存時(shí)查找對(duì)應(yīng)的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } //檢查一下后面的桶里面有沒(méi)有span,如果有可以將其進(jìn)行切分 for (size_t i = k + 1; i < NPAGES; i++) { if (!_spanLists[i].Empty()) { Span* nSpan = _spanLists[i].PopFront(); Span* kSpan = new Span; //在nSpan的頭部切k頁(yè)下來(lái) kSpan->_pageId = nSpan->_pageId; kSpan->_n = k; nSpan->_pageId += k; nSpan->_n -= k; //將剩下的掛到對(duì)應(yīng)映射的位置 _spanLists[nSpan->_n].PushFront(nSpan); //存儲(chǔ)nSpan的首尾頁(yè)號(hào)與nSpan之間的映射,方便page cache合并span時(shí)進(jìn)行前后頁(yè)的查找 _idSpanMap[nSpan->_pageId] = nSpan; _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan; //建立頁(yè)號(hào)與span的映射,方便central cache回收小塊內(nèi)存時(shí)查找對(duì)應(yīng)的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } } //走到這里說(shuō)明后面沒(méi)有大頁(yè)的span了,這時(shí)就向堆申請(qǐng)一個(gè)128頁(yè)的span Span* bigSpan = new Span; void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; bigSpan->_n = NPAGES - 1; _spanLists[bigSpan->_n].PushFront(bigSpan); //盡量避免代碼重復(fù),遞歸調(diào)用自己 return NewSpan(k); }
此時(shí)page cache當(dāng)中的span就都與其首尾頁(yè)之間建立了映射關(guān)系,現(xiàn)在我們就可以進(jìn)行span的合并了,其合并邏輯如下:
//釋放空閑的span回到PageCache,并合并相鄰的span void PageCache::ReleaseSpanToPageCache(Span* span) { //對(duì)span的前后頁(yè),嘗試進(jìn)行合并,緩解內(nèi)存碎片問(wèn)題 //1、向前合并 while (1) { PAGE_ID prevId = span->_pageId - 1; auto ret = _idSpanMap.find(prevId); //前面的頁(yè)號(hào)沒(méi)有(還未向系統(tǒng)申請(qǐng)),停止向前合并 if (ret == _idSpanMap.end()) { break; } //前面的頁(yè)號(hào)對(duì)應(yīng)的span正在被使用,停止向前合并 Span* prevSpan = ret->second; if (prevSpan->_isUse == true) { break; } //合并出超過(guò)128頁(yè)的span無(wú)法進(jìn)行管理,停止向前合并 if (prevSpan->_n + span->_n > NPAGES - 1) { break; } //進(jìn)行向前合并 span->_pageId = prevSpan->_pageId; span->_n += prevSpan->_n; //將prevSpan從對(duì)應(yīng)的雙鏈表中移除 _spanLists[prevSpan->_n].Erase(prevSpan); delete prevSpan; } //2、向后合并 while (1) { PAGE_ID nextId = span->_pageId + span->_n; auto ret = _idSpanMap.find(nextId); //后面的頁(yè)號(hào)沒(méi)有(還未向系統(tǒng)申請(qǐng)),停止向后合并 if (ret == _idSpanMap.end()) { break; } //后面的頁(yè)號(hào)對(duì)應(yīng)的span正在被使用,停止向后合并 Span* nextSpan = ret->second; if (nextSpan->_isUse == true) { break; } //合并出超過(guò)128頁(yè)的span無(wú)法進(jìn)行管理,停止向后合并 if (nextSpan->_n + span->_n > NPAGES - 1) { break; } //進(jìn)行向后合并 span->_n += nextSpan->_n; //將nextSpan從對(duì)應(yīng)的雙鏈表中移除 _spanLists[nextSpan->_n].Erase(nextSpan); delete nextSpan; } //將合并后的span掛到對(duì)應(yīng)的雙鏈表當(dāng)中 _spanLists[span->_n].PushFront(span); //建立該span與其首尾頁(yè)的映射 _idSpanMap[span->_pageId] = span; _idSpanMap[span->_pageId + span->_n - 1] = span; //將該span設(shè)置為未被使用的狀態(tài) span->_isUse = false; }
需要注意的是,在向前或向后進(jìn)行合并的過(guò)程中:
- 如果沒(méi)有通過(guò)頁(yè)號(hào)獲取到其對(duì)應(yīng)的span,說(shuō)明對(duì)應(yīng)到該頁(yè)的內(nèi)存塊還未申請(qǐng),此時(shí)需要停止合并。
- 如果通過(guò)頁(yè)號(hào)獲取到了其對(duì)應(yīng)的span,但該span處于被使用的狀態(tài),那我們也必須停止合并。
- 如果合并后大于128頁(yè)則不能進(jìn)行本次合并,因?yàn)閜age cache無(wú)法對(duì)大于128頁(yè)的span進(jìn)行管理。
在合并span時(shí),由于這個(gè)span是在page cache的某個(gè)哈希桶的雙鏈表當(dāng)中的,因此在合并后需要將其從對(duì)應(yīng)的雙鏈表中移除,然后再將這個(gè)被合并了的span結(jié)構(gòu)進(jìn)行delete。
除此之外,在合并結(jié)束后,除了將合并后的span掛到page cache對(duì)應(yīng)哈希桶的雙鏈表當(dāng)中,還需要建立該span與其首位頁(yè)之間的映射關(guān)系,便于此后合并出更大的span。
釋放內(nèi)存過(guò)程聯(lián)調(diào)
ConcurrentFree函數(shù)
至此我們將thread cache、central cache以及page cache的釋放流程也都寫(xiě)完了,此時(shí)我們就可以向外提供一個(gè)ConcurrentFree函數(shù),用于釋放內(nèi)存塊,釋放內(nèi)存塊時(shí)每個(gè)線程通過(guò)自己的thread cache對(duì)象,調(diào)用thread cache中釋放內(nèi)存對(duì)象的接口即可。
static void ConcurrentFree(void* ptr, size_t size/*暫時(shí)*/) { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); }
釋放內(nèi)存過(guò)程聯(lián)調(diào)測(cè)試
之前我們?cè)跍y(cè)試申請(qǐng)流程時(shí),讓單個(gè)線程進(jìn)行了三次內(nèi)存申請(qǐng),現(xiàn)在我們?cè)賹⑦@三個(gè)對(duì)象再進(jìn)行釋放,看看這其中的釋放流程是如何進(jìn)行的。
void* p1 = ConcurrentAlloc(6); void* p2 = ConcurrentAlloc(8); void* p3 = ConcurrentAlloc(1); ConcurrentFree(p1, 6); ConcurrentFree(p2, 8); ConcurrentFree(p3, 1);
首先,這三次申請(qǐng)和釋放的對(duì)象大小進(jìn)行對(duì)齊后都是8字節(jié),因此對(duì)應(yīng)操作的就是thread cache和central cache的第0號(hào)桶,以及page cache的第1號(hào)桶。
由于第三次對(duì)象申請(qǐng)時(shí),剛好將thread cache第0號(hào)桶當(dāng)中僅剩的一個(gè)對(duì)象拿走了,因此在三次對(duì)象申請(qǐng)后thread cache的第0號(hào)桶當(dāng)中是沒(méi)有對(duì)象的。
通過(guò)監(jiān)視窗口可以看到,此時(shí)thread cache第0號(hào)桶中自由鏈表的_maxSize已經(jīng)慢增長(zhǎng)到了3,而當(dāng)我們釋放完第一個(gè)對(duì)象后,該自由鏈表當(dāng)中對(duì)象的個(gè)數(shù)只有一個(gè),因此不會(huì)將該自由鏈表當(dāng)中的對(duì)象進(jìn)一步還給central cache。
當(dāng)?shù)诙€(gè)對(duì)象釋放給thread cache的第0號(hào)桶后,該桶對(duì)應(yīng)自由鏈表當(dāng)中對(duì)象的個(gè)數(shù)變成了2,也是不會(huì)進(jìn)行ListTooLong操作的。
直到第三個(gè)對(duì)象釋放給thread cache的第0號(hào)桶時(shí),此時(shí)該自由鏈表的_size的值變?yōu)?,與_maxSize的值相等,現(xiàn)在thread cache就需要將對(duì)象給central cache了。
thread cache先是將第0號(hào)桶當(dāng)中的對(duì)象彈出MaxSize個(gè),在這里實(shí)際上就是全部彈出,此時(shí)該自由鏈表_size的值變?yōu)?,然后繼續(xù)調(diào)用central cache當(dāng)中的ReleaseListToSpans函數(shù),將這三個(gè)對(duì)象還給central cache當(dāng)中對(duì)應(yīng)的span。
在進(jìn)入central cache的第0號(hào)桶還對(duì)象之前,先把第0號(hào)桶對(duì)應(yīng)的桶鎖加上,然后通過(guò)查page cache中的映射表找到其對(duì)應(yīng)的span,最后將這個(gè)對(duì)象頭插到該span的自由鏈表中,并將該span的_useCount進(jìn)行--
。當(dāng)?shù)谝粋€(gè)對(duì)象還給其對(duì)應(yīng)的span時(shí),可以看到該span的_useCount減到了2。
而由于我們只進(jìn)行了三次對(duì)象申請(qǐng),并且這些對(duì)象大小對(duì)齊后大小都是8字節(jié),因此我們申請(qǐng)的這三個(gè)對(duì)象實(shí)際都是同一個(gè)span切分出來(lái)的。當(dāng)我們將這三個(gè)對(duì)象都還給這個(gè)span時(shí),該span的_useCount就減為了0。
現(xiàn)在central cache就需要將這個(gè)span進(jìn)一步還給page cache,而在將該span交給page cache之前,會(huì)將該span的自由鏈表以及前后指針都置空。并且在進(jìn)入page cache之前會(huì)先將central cache第0號(hào)桶的桶鎖解掉,然后再加上page cache的大鎖,之后才能進(jìn)入page cache進(jìn)行相關(guān)操作。
由于這個(gè)一頁(yè)的span是從128頁(yè)的span的頭部切下來(lái)的,在向前合并時(shí)由于前面的頁(yè)還未向系統(tǒng)申請(qǐng),因此在查映射關(guān)系時(shí)是無(wú)法找到的,此時(shí)直接停止了向前合并。
(說(shuō)明一下:由于下面是重新另外進(jìn)行的一次調(diào)試,因此監(jiān)視窗口顯示的span的起始頁(yè)號(hào)與之前的不同,實(shí)際應(yīng)該與上面一致)
而在向后合并時(shí),由于page cache沒(méi)有將該頁(yè)后面的頁(yè)分配給central cache,因此在向后合并時(shí)肯定能夠找到一個(gè)127頁(yè)的span進(jìn)行合并。合并后就變成了一個(gè)128頁(yè)的span,這時(shí)我們將原來(lái)127頁(yè)的span從第127號(hào)桶刪除,然后還需要將該127頁(yè)的span結(jié)構(gòu)進(jìn)行delete,因?yàn)樗芾淼?27頁(yè)已經(jīng)與1頁(yè)的span進(jìn)行合并了,不再需要它來(lái)管理了。
緊接著將這個(gè)128頁(yè)的span插入到第128號(hào)桶,然后建立該span與其首尾頁(yè)的映射,便于下次被用于合并,最后再將該span的狀態(tài)設(shè)置為未被使用的狀態(tài)即可。
當(dāng)從page cache回來(lái)后,除了將page cache的大鎖解掉,還需要立刻加上central cache中對(duì)應(yīng)的桶鎖,然后繼續(xù)將對(duì)象還給central cache中的span,但此時(shí)實(shí)際上是還完了,因此再將central cache的桶鎖解掉就行了。
至此我們便完成了這三個(gè)對(duì)象的申請(qǐng)和釋放流程。
大于256KB的大塊內(nèi)存申請(qǐng)問(wèn)題
申請(qǐng)過(guò)程
之前說(shuō)到,每個(gè)線程的thread cache是用于申請(qǐng)小于等于256KB的內(nèi)存的,而對(duì)于大于256KB的內(nèi)存,我們可以考慮直接向page cache申請(qǐng),但page cache中最大的頁(yè)也就只有128頁(yè),因此如果是大于128頁(yè)的內(nèi)存申請(qǐng),就只能直接向堆申請(qǐng)了。
申請(qǐng)內(nèi)存的大小 | 申請(qǐng)方式 |
---|---|
x ≤ 256 K B ( 32 頁(yè) ) x \leq 256KB(32頁(yè)) x≤256KB(32頁(yè)) | 向thread cache申請(qǐng) |
32 頁(yè) < x ≤ 128 頁(yè) 32 頁(yè)< x \leq 128頁(yè) 32頁(yè)<x≤128頁(yè) | 向page cache申請(qǐng) |
x ≥ 128 頁(yè) x \geq 128頁(yè) x≥128頁(yè) | 向堆申請(qǐng) |
當(dāng)申請(qǐng)的內(nèi)存大于256KB時(shí),雖然不是從thread cache進(jìn)行獲取,但在分配內(nèi)存時(shí)也是需要進(jìn)行向上對(duì)齊的,對(duì)于大于256KB的內(nèi)存我們可以直接按頁(yè)進(jìn)行對(duì)齊。 |
而我們之前實(shí)現(xiàn)RoundUp函數(shù)時(shí),對(duì)傳入字節(jié)數(shù)大于256KB的情況直接做了斷言處理,因此這里需要對(duì)RoundUp函數(shù)稍作修改。
//獲取向上對(duì)齊后的字節(jié)數(shù) static inline size_t RoundUp(size_t bytes) { if (bytes <= 128) { return _RoundUp(bytes, 8); } else if (bytes <= 1024) { return _RoundUp(bytes, 16); } else if (bytes <= 8 * 1024) { return _RoundUp(bytes, 128); } else if (bytes <= 64 * 1024) { return _RoundUp(bytes, 1024); } else if (bytes <= 256 * 1024) { return _RoundUp(bytes, 8 * 1024); } else { //大于256KB的按頁(yè)對(duì)齊 return _RoundUp(bytes, 1 << PAGE_SHIFT); } }
現(xiàn)在對(duì)于之前的申請(qǐng)邏輯就需要進(jìn)行修改了,當(dāng)申請(qǐng)對(duì)象的大小大于256KB時(shí),就不用向thread cache申請(qǐng)了,這時(shí)先計(jì)算出按頁(yè)對(duì)齊后實(shí)際需要申請(qǐng)的頁(yè)數(shù),然后通過(guò)調(diào)用NewSpan申請(qǐng)指定頁(yè)數(shù)的span即可。
static void* ConcurrentAlloc(size_t size) { if (size > MAX_BYTES) //大于256KB的內(nèi)存申請(qǐng) { //計(jì)算出對(duì)齊后需要申請(qǐng)的頁(yè)數(shù) size_t alignSize = SizeClass::RoundUp(size); size_t kPage = alignSize >> PAGE_SHIFT; //向page cache申請(qǐng)kPage頁(yè)的span PageCache::GetInstance()->_pageMtx.lock(); Span* span = PageCache::GetInstance()->NewSpan(kPage); PageCache::GetInstance()->_pageMtx.unlock(); void* ptr = (void*)(span->_pageId << PAGE_SHIFT); return ptr; } else { //通過(guò)TLS,每個(gè)線程無(wú)鎖的獲取自己專屬的ThreadCache對(duì)象 if (pTLSThreadCache == nullptr) { pTLSThreadCache = new ThreadCache; } cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl; return pTLSThreadCache->Allocate(size); } }
也就是說(shuō),申請(qǐng)大于256KB的內(nèi)存時(shí),會(huì)直接調(diào)用page cache當(dāng)中的NewSpan函數(shù)進(jìn)行申請(qǐng),因此這里我們需要再對(duì)NewSpan函數(shù)進(jìn)行改造,當(dāng)需要申請(qǐng)的內(nèi)存頁(yè)數(shù)大于128頁(yè)時(shí),就直接向堆申請(qǐng)對(duì)應(yīng)頁(yè)數(shù)的內(nèi)存塊。而如果申請(qǐng)的內(nèi)存頁(yè)數(shù)是小于128頁(yè)的,那就在page cache中進(jìn)行申請(qǐng),因此當(dāng)申請(qǐng)大于256KB的內(nèi)存調(diào)用NewSpan函數(shù)時(shí)也是需要加鎖的,因?yàn)槲覀兛赡苁窃趐age cache中進(jìn)行申請(qǐng)的。
//獲取一個(gè)k頁(yè)的span Span* PageCache::NewSpan(size_t k) { assert(k > 0); if (k > NPAGES - 1) //大于128頁(yè)直接找堆申請(qǐng) { void* ptr = SystemAlloc(k); Span* span = new Span; span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; span->_n = k; //建立頁(yè)號(hào)與span之間的映射 _idSpanMap[span->_pageId] = span; return span; } //先檢查第k個(gè)桶里面有沒(méi)有span if (!_spanLists[k].Empty()) { Span* kSpan = _spanLists[k].PopFront(); //建立頁(yè)號(hào)與span的映射,方便central cache回收小塊內(nèi)存時(shí)查找對(duì)應(yīng)的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } //檢查一下后面的桶里面有沒(méi)有span,如果有可以將其進(jìn)行切分 for (size_t i = k + 1; i < NPAGES; i++) { if (!_spanLists[i].Empty()) { Span* nSpan = _spanLists[i].PopFront(); Span* kSpan = new Span; //在nSpan的頭部切k頁(yè)下來(lái) kSpan->_pageId = nSpan->_pageId; kSpan->_n = k; nSpan->_pageId += k; nSpan->_n -= k; //將剩下的掛到對(duì)應(yīng)映射的位置 _spanLists[nSpan->_n].PushFront(nSpan); //存儲(chǔ)nSpan的首尾頁(yè)號(hào)與nSpan之間的映射,方便page cache合并span時(shí)進(jìn)行前后頁(yè)的查找 _idSpanMap[nSpan->_pageId] = nSpan; _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan; //建立頁(yè)號(hào)與span的映射,方便central cache回收小塊內(nèi)存時(shí)查找對(duì)應(yīng)的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } } //走到這里說(shuō)明后面沒(méi)有大頁(yè)的span了,這時(shí)就向堆申請(qǐng)一個(gè)128頁(yè)的span Span* bigSpan = new Span; void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; bigSpan->_n = NPAGES - 1; _spanLists[bigSpan->_n].PushFront(bigSpan); //盡量避免代碼重復(fù),遞歸調(diào)用自己 return NewSpan(k); }
釋放過(guò)程
當(dāng)釋放對(duì)象時(shí),我們需要判斷釋放對(duì)象的大小:
釋放內(nèi)存的大小 | 釋放方式 |
---|---|
x ≤ 256 K B ( 32 頁(yè) ) x \leq 256KB(32頁(yè)) x≤256KB(32頁(yè)) | 釋放給thread cache |
32 頁(yè) < x ≤ 128 頁(yè) 32 頁(yè)< x \leq 128頁(yè) 32頁(yè)<x≤128頁(yè) | 釋放給page cache |
x ≥ 128 頁(yè) x \geq 128頁(yè) x≥128頁(yè) | 釋放給堆 |
因此當(dāng)釋放對(duì)象時(shí),我們需要先找到該對(duì)象對(duì)應(yīng)的span,但是在釋放對(duì)象時(shí)我們只知道該對(duì)象的起始地址。這也就是我們?cè)谏暾?qǐng)大于256KB的內(nèi)存時(shí),也要給申請(qǐng)到的內(nèi)存建立span結(jié)構(gòu),并建立起始頁(yè)號(hào)與該span之間的映射關(guān)系的原因。此時(shí)我們就可以通過(guò)釋放對(duì)象的起始地址計(jì)算出起始頁(yè)號(hào),進(jìn)而通過(guò)頁(yè)號(hào)找到該對(duì)象對(duì)應(yīng)的span。
static void ConcurrentFree(void* ptr, size_t size) { if (size > MAX_BYTES) //大于256KB的內(nèi)存釋放 { Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr); PageCache::GetInstance()->_pageMtx.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); PageCache::GetInstance()->_pageMtx.unlock(); } else { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); } }
因此page cache在回收span時(shí)也需要進(jìn)行判斷,如果該span的大小是小于等于128頁(yè)的,那么直接還給page cache進(jìn)行了,page cache會(huì)嘗試對(duì)其進(jìn)行合并。而如果該span的大小是大于128頁(yè)的,那么說(shuō)明該span是直接向堆申請(qǐng)的,我們直接將這塊內(nèi)存釋放給堆,然后將這個(gè)span結(jié)構(gòu)進(jìn)行delete就行了。
//釋放空閑的span回到PageCache,并合并相鄰的span void PageCache::ReleaseSpanToPageCache(Span* span) { if (span->_n > NPAGES - 1) //大于128頁(yè)直接釋放給堆 { void* ptr = (void*)(span->_pageId << PAGE_SHIFT); SystemFree(ptr); delete span; return; } //對(duì)span的前后頁(yè),嘗試進(jìn)行合并,緩解內(nèi)存碎片問(wèn)題 //1、向前合并 while (1) { PAGE_ID prevId = span->_pageId - 1; auto ret = _idSpanMap.find(prevId); //前面的頁(yè)號(hào)沒(méi)有(還未向系統(tǒng)申請(qǐng)),停止向前合并 if (ret == _idSpanMap.end()) { break; } //前面的頁(yè)號(hào)對(duì)應(yīng)的span正在被使用,停止向前合并 Span* prevSpan = ret->second; if (prevSpan->_isUse == true) { break; } //合并出超過(guò)128頁(yè)的span無(wú)法進(jìn)行管理,停止向前合并 if (prevSpan->_n + span->_n > NPAGES - 1) { break; } //進(jìn)行向前合并 span->_pageId = prevSpan->_pageId; span->_n += prevSpan->_n; //將prevSpan從對(duì)應(yīng)的雙鏈表中移除 _spanLists[prevSpan->_n].Erase(prevSpan); delete prevSpan; } //2、向后合并 while (1) { PAGE_ID nextId = span->_pageId + span->_n; auto ret = _idSpanMap.find(nextId); //后面的頁(yè)號(hào)沒(méi)有(還未向系統(tǒng)申請(qǐng)),停止向后合并 if (ret == _idSpanMap.end()) { break; } //后面的頁(yè)號(hào)對(duì)應(yīng)的span正在被使用,停止向后合并 Span* nextSpan = ret->second; if (nextSpan->_isUse == true) { break; } //合并出超過(guò)128頁(yè)的span無(wú)法進(jìn)行管理,停止向后合并 if (nextSpan->_n + span->_n > NPAGES - 1) { break; } //進(jìn)行向后合并 span->_n += nextSpan->_n; //將nextSpan從對(duì)應(yīng)的雙鏈表中移除 _spanLists[nextSpan->_n].Erase(nextSpan); delete nextSpan; } //將合并后的span掛到對(duì)應(yīng)的雙鏈表當(dāng)中 _spanLists[span->_n].PushFront(span); //建立該span與其首尾頁(yè)的映射 _idSpanMap[span->_pageId] = span; _idSpanMap[span->_pageId + span->_n - 1] = span; //將該span設(shè)置為未被使用的狀態(tài) span->_isUse = false; }
說(shuō)明一下,直接向堆申請(qǐng)內(nèi)存時(shí)我們調(diào)用的接口是VirtualAlloc,與之對(duì)應(yīng)的將內(nèi)存釋放給堆的接口叫做VirtualFree,而Linux下的brk和mmap對(duì)應(yīng)的釋放接口叫做sbrk和unmmap。此時(shí)我們也可以將這些釋放接口封裝成一個(gè)叫做SystemFree的接口,當(dāng)我們需要將內(nèi)存釋放給堆時(shí)直接調(diào)用SystemFree即可。
//直接將內(nèi)存還給堆 inline static void SystemFree(void* ptr) { #ifdef _WIN32 VirtualFree(ptr, 0, MEM_RELEASE); #else //linux下sbrk unmmap等 #endif }
簡(jiǎn)單測(cè)試
下面我們對(duì)大于256KB的申請(qǐng)釋放流程進(jìn)行簡(jiǎn)單的測(cè)試:
//找page cache申請(qǐng) void* p1 = ConcurrentAlloc(257 * 1024); //257KB ConcurrentFree(p1, 257 * 1024); //找堆申請(qǐng) void* p2 = ConcurrentAlloc(129 * 8 * 1024); //129頁(yè) ConcurrentFree(p2, 129 * 8 * 1024);
當(dāng)申請(qǐng)257KB的內(nèi)存時(shí),由于257KB的內(nèi)存按頁(yè)向上對(duì)齊后是33頁(yè),并沒(méi)有大于128頁(yè),因此不會(huì)直接向堆進(jìn)行申請(qǐng),會(huì)向page cache申請(qǐng)內(nèi)存,但此時(shí)page cache當(dāng)中實(shí)際是沒(méi)有內(nèi)存的,最終page cache就會(huì)向堆申請(qǐng)一個(gè)128頁(yè)的span,將其切分成33頁(yè)的span和95頁(yè)的span,并將33頁(yè)的span進(jìn)行返回。
而在釋放內(nèi)存時(shí),由于該對(duì)象的大小大于了256KB,因此不會(huì)將其還給thread cache,而是直接調(diào)用的page cache當(dāng)中的釋放接口。
由于該對(duì)象的大小是33頁(yè),不大于128頁(yè),因此page cache也不會(huì)直接將該對(duì)象還給堆,而是嘗試對(duì)其進(jìn)行合并,最終就會(huì)把這個(gè)33頁(yè)的span和之前剩下的95頁(yè)的span進(jìn)行合并,最終將合并后的128頁(yè)的span掛到第128號(hào)桶中。
當(dāng)申請(qǐng)129頁(yè)的內(nèi)存時(shí),由于是大于256KB的,于是還是調(diào)用的page cache對(duì)應(yīng)的申請(qǐng)接口,但此時(shí)申請(qǐng)的內(nèi)存同時(shí)也大于128頁(yè),因此會(huì)直接向堆申請(qǐng)。在申請(qǐng)后還會(huì)建立該span與其起始頁(yè)號(hào)之間的映射,便于釋放時(shí)可以通過(guò)頁(yè)號(hào)找到該span。
在釋放內(nèi)存時(shí),通過(guò)對(duì)象的地址找到其對(duì)應(yīng)的span,從span結(jié)構(gòu)中得知釋放內(nèi)存的大小大于128頁(yè),于是會(huì)將該內(nèi)存直接還給堆。
使用定長(zhǎng)內(nèi)存池配合脫離使用new
tcmalloc是要在高并發(fā)場(chǎng)景下替代malloc進(jìn)行內(nèi)存申請(qǐng)的,因此tcmalloc在實(shí)現(xiàn)的時(shí),其內(nèi)部是不能調(diào)用malloc函數(shù)的,我們當(dāng)前的代碼中存在通過(guò)new獲取到的內(nèi)存,而new在底層實(shí)際上就是封裝了malloc。
為了完全脫離掉malloc函數(shù),此時(shí)我們之前實(shí)現(xiàn)的定長(zhǎng)內(nèi)存池就起作用了,代碼中使用new時(shí)基本都是為Span結(jié)構(gòu)的對(duì)象申請(qǐng)空間,而span對(duì)象基本都是在page cache層創(chuàng)建的,因此我們可以在PageCache類當(dāng)中定義一個(gè)_spanPool,用于span對(duì)象的申請(qǐng)和釋放。
//單例模式 class PageCache { public: //... private: ObjectPool<Span> _spanPool; };
然后將代碼中使用new的地方替換為調(diào)用定長(zhǎng)內(nèi)存池當(dāng)中的New函數(shù),將代碼中使用delete的地方替換為調(diào)用定長(zhǎng)內(nèi)存池當(dāng)中的Delete函數(shù)。
//申請(qǐng)span對(duì)象 Span* span = _spanPool.New(); //釋放span對(duì)象 _spanPool.Delete(span);
注意,當(dāng)使用定長(zhǎng)內(nèi)存池當(dāng)中的New函數(shù)申請(qǐng)Span對(duì)象時(shí),New函數(shù)通過(guò)定位new也是對(duì)Span對(duì)象進(jìn)行了初始化的。
此外,每個(gè)線程第一次申請(qǐng)內(nèi)存時(shí)都會(huì)創(chuàng)建其專屬的thread cache,而這個(gè)thread cache目前也是new出來(lái)的,我們也需要對(duì)其進(jìn)行替換。
//通過(guò)TLS,每個(gè)線程無(wú)鎖的獲取自己專屬的ThreadCache對(duì)象 if (pTLSThreadCache == nullptr) { static std::mutex tcMtx; static ObjectPool<ThreadCache> tcPool; tcMtx.lock(); pTLSThreadCache = tcPool.New(); tcMtx.unlock(); }
這里我們將用于申請(qǐng)ThreadCache類對(duì)象的定長(zhǎng)內(nèi)存池定義為靜態(tài)的,保持全局只有一個(gè),讓所有線程創(chuàng)建自己的thread cache時(shí),都在個(gè)定長(zhǎng)內(nèi)存池中申請(qǐng)內(nèi)存就行了。
但注意在從該定長(zhǎng)內(nèi)存池中申請(qǐng)內(nèi)存時(shí)需要加鎖,防止多個(gè)線程同時(shí)申請(qǐng)自己的ThreadCache對(duì)象而導(dǎo)致線程安全問(wèn)題。
最后在SpanList的構(gòu)造函數(shù)中也用到了new,因?yàn)镾panList是帶頭循環(huán)雙向鏈表,所以在構(gòu)造期間我們需要申請(qǐng)一個(gè)span對(duì)象作為雙鏈表的頭結(jié)點(diǎn)。
//帶頭雙向循環(huán)鏈表 class SpanList { public: SpanList() { _head = _spanPool.New(); _head->_next = _head; _head->_prev = _head; } private: Span* _head; static ObjectPool<Span> _spanPool; };
由于每個(gè)span雙鏈表只需要一個(gè)頭結(jié)點(diǎn),因此將這個(gè)定長(zhǎng)內(nèi)存池定義為靜態(tài)的,保持全局只有一個(gè),讓所有span雙鏈表在申請(qǐng)頭結(jié)點(diǎn)時(shí),都在一個(gè)定長(zhǎng)內(nèi)存池中申請(qǐng)內(nèi)存就行了。
釋放對(duì)象時(shí)優(yōu)化為不傳對(duì)象大小
當(dāng)我們使用malloc函數(shù)申請(qǐng)內(nèi)存時(shí),需要指明申請(qǐng)內(nèi)存的大??;而當(dāng)我們使用free函數(shù)釋放內(nèi)存時(shí),只需要傳入指向這塊內(nèi)存的指針即可。
而我們目前實(shí)現(xiàn)的內(nèi)存池,在釋放對(duì)象時(shí)除了需要傳入指向該對(duì)象的指針,還需要傳入該對(duì)象的大小。
原因如下:
- 如果釋放的是大于256KB的對(duì)象,需要根據(jù)對(duì)象的大小來(lái)判斷這塊內(nèi)存到底應(yīng)該還給page cache,還是應(yīng)該直接還給堆。
- 如果釋放的是小于等于256KB的對(duì)象,需要根據(jù)對(duì)象的大小計(jì)算出應(yīng)該還給thread cache的哪一個(gè)哈希桶。
如果我們也想做到,在釋放對(duì)象時(shí)不用傳入對(duì)象的大小,那么我們就需要建立對(duì)象地址與對(duì)象大小之間的映射。由于現(xiàn)在可以通過(guò)對(duì)象的地址找到其對(duì)應(yīng)的span,而span的自由鏈表中掛的都是相同大小的對(duì)象。
因此我們可以在Span結(jié)構(gòu)中再增加一個(gè)_objSize成員,該成員代表著這個(gè)span管理的內(nèi)存塊被切成的一個(gè)個(gè)對(duì)象的大小。
//管理以頁(yè)為單位的大塊內(nèi)存 struct Span { PAGE_ID _pageId = 0; //大塊內(nèi)存起始頁(yè)的頁(yè)號(hào) size_t _n = 0; //頁(yè)的數(shù)量 Span* _next = nullptr; //雙鏈表結(jié)構(gòu) Span* _prev = nullptr; size_t _objSize = 0; //切好的小對(duì)象的大小 size_t _useCount = 0; //切好的小塊內(nèi)存,被分配給thread cache的計(jì)數(shù) void* _freeList = nullptr; //切好的小塊內(nèi)存的自由鏈表 bool _isUse = false; //是否在被使用 };
而所有的span都是從page cache中拿出來(lái)的,因此每當(dāng)我們調(diào)用NewSpan獲取到一個(gè)k頁(yè)的span時(shí),就應(yīng)該將這個(gè)span的_objSize保存下來(lái)。
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size)); span->_objSize = size;
代碼中有兩處,一處是在central cache中獲取非空span時(shí),如果central cache對(duì)應(yīng)的桶中沒(méi)有非空的span,此時(shí)會(huì)調(diào)用NewSpan獲取一個(gè)k頁(yè)的span;另一處是當(dāng)申請(qǐng)大于256KB內(nèi)存時(shí),會(huì)直接調(diào)用NewSpan獲取一個(gè)k頁(yè)的span。
此時(shí)當(dāng)我們釋放對(duì)象時(shí),就可以直接從對(duì)象的span中獲取到該對(duì)象的大小,準(zhǔn)確來(lái)說(shuō)獲取到的是對(duì)齊以后的大小。
static void ConcurrentFree(void* ptr) { Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr); size_t size = span->_objSize; if (size > MAX_BYTES) //大于256KB的內(nèi)存釋放 { PageCache::GetInstance()->_pageMtx.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); PageCache::GetInstance()->_pageMtx.unlock(); } else { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); } }
讀取映射關(guān)系時(shí)的加鎖問(wèn)題
我們將頁(yè)號(hào)與span之間的映射關(guān)系是存儲(chǔ)在PageCache類當(dāng)中的,當(dāng)我們?cè)L問(wèn)這個(gè)映射關(guān)系時(shí)是需要加鎖的,因?yàn)镾TL容器是不保證線程安全的。
對(duì)于當(dāng)前代碼來(lái)說(shuō),如果我們此時(shí)正在page cache進(jìn)行相關(guān)操作,那么訪問(wèn)這個(gè)映射關(guān)系是安全的,因?yàn)楫?dāng)進(jìn)入page cache之前是需要加鎖的,因此可以保證此時(shí)只有一個(gè)線程在進(jìn)行訪問(wèn)。
但如果我們是在central cache訪問(wèn)這個(gè)映射關(guān)系,或是在調(diào)用ConcurrentFree函數(shù)釋放內(nèi)存時(shí)訪問(wèn)這個(gè)映射關(guān)系,那么就存在線程安全的問(wèn)題。因?yàn)榇藭r(shí)可能其他線程正在page cache當(dāng)中進(jìn)行某些操作,并且該線程此時(shí)可能也在訪問(wèn)這個(gè)映射關(guān)系,因此當(dāng)我們?cè)趐age cache外部訪問(wèn)這個(gè)映射關(guān)系時(shí)是需要加鎖的。
實(shí)際就是在調(diào)用page cache對(duì)外提供訪問(wèn)映射關(guān)系的函數(shù)時(shí)需要加鎖,這里我們可以考慮使用C++當(dāng)中的unique_lock,當(dāng)然你也可以用普通的鎖。
//獲取從對(duì)象到span的映射 Span* PageCache::MapObjectToSpan(void* obj) { PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //頁(yè)號(hào) std::unique_lock<std::mutex> lock(_pageMtx); //構(gòu)造時(shí)加鎖,析構(gòu)時(shí)自動(dòng)解鎖 auto ret = _idSpanMap.find(id); if (ret != _idSpanMap.end()) { return ret->second; } else { assert(false); return nullptr; } }
多線程環(huán)境下對(duì)比malloc測(cè)試
之前我們只是對(duì)代碼進(jìn)行了一些基礎(chǔ)的單元測(cè)試,下面我們?cè)诙嗑€程場(chǎng)景下對(duì)比malloc進(jìn)行測(cè)試。
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds) { std::vector<std::thread> vthread(nworks); std::atomic<size_t> malloc_costtime = 0; std::atomic<size_t> free_costtime = 0; for (size_t k = 0; k < nworks; ++k) { vthread[k] = std::thread([&, k]() { std::vector<void*> v; v.reserve(ntimes); for (size_t j = 0; j < rounds; ++j) { size_t begin1 = clock(); for (size_t i = 0; i < ntimes; i++) { v.push_back(malloc(16)); //v.push_back(malloc((16 + i) % 8192 + 1)); } size_t end1 = clock(); size_t begin2 = clock(); for (size_t i = 0; i < ntimes; i++) { free(v[i]); } size_t end2 = clock(); v.clear(); malloc_costtime += (end1 - begin1); free_costtime += (end2 - begin2); } }); } for (auto& t : vthread) { t.join(); } printf("%u個(gè)線程并發(fā)執(zhí)行%u輪次,每輪次malloc %u次: 花費(fèi):%u ms\n", nworks, rounds, ntimes, malloc_costtime); printf("%u個(gè)線程并發(fā)執(zhí)行%u輪次,每輪次free %u次: 花費(fèi):%u ms\n", nworks, rounds, ntimes, free_costtime); printf("%u個(gè)線程并發(fā)malloc&free %u次,總計(jì)花費(fèi):%u ms\n", nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime); } void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds) { std::vector<std::thread> vthread(nworks); std::atomic<size_t> malloc_costtime = 0; std::atomic<size_t> free_costtime = 0; for (size_t k = 0; k < nworks; ++k) { vthread[k] = std::thread([&]() { std::vector<void*> v; v.reserve(ntimes); for (size_t j = 0; j < rounds; ++j) { size_t begin1 = clock(); for (size_t i = 0; i < ntimes; i++) { v.push_back(ConcurrentAlloc(16)); //v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1)); } size_t end1 = clock(); size_t begin2 = clock(); for (size_t i = 0; i < ntimes; i++) { ConcurrentFree(v[i]); } size_t end2 = clock(); v.clear(); malloc_costtime += (end1 - begin1); free_costtime += (end2 - begin2); } }); } for (auto& t : vthread) { t.join(); } printf("%u個(gè)線程并發(fā)執(zhí)行%u輪次,每輪次concurrent alloc %u次: 花費(fèi):%u ms\n", nworks, rounds, ntimes, malloc_costtime); printf("%u個(gè)線程并發(fā)執(zhí)行%u輪次,每輪次concurrent dealloc %u次: 花費(fèi):%u ms\n", nworks, rounds, ntimes, free_costtime); printf("%u個(gè)線程并發(fā)concurrent alloc&dealloc %u次,總計(jì)花費(fèi):%u ms\n", nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime); } int main() { size_t n = 10000; cout << "==========================================================" << endl; BenchmarkConcurrentMalloc(n, 4, 10); cout << endl << endl; BenchmarkMalloc(n, 4, 10); cout << "==========================================================" << endl; return 0; }
其中測(cè)試函數(shù)各個(gè)參數(shù)的含義如下:
- ntimes:?jiǎn)屋喆紊暾?qǐng)和釋放內(nèi)存的次數(shù)。
- nworks:線程數(shù)。
- rounds:輪次。
在測(cè)試函數(shù)中,我們通過(guò)clock函數(shù)分別獲取到每輪次申請(qǐng)和釋放所花費(fèi)的時(shí)間,然后將其對(duì)應(yīng)累加到malloc_costtime和free_costtime上。最后我們就得到了,nworks個(gè)線程跑rounds輪,每輪申請(qǐng)和釋放ntimes次,這個(gè)過(guò)程申請(qǐng)所消耗的時(shí)間、釋放所消耗的時(shí)間、申請(qǐng)和釋放總共消耗的時(shí)間。
注意,我們創(chuàng)建線程時(shí)讓線程執(zhí)行的是lambda表達(dá)式,而我們這里在使用lambda表達(dá)式時(shí),以值傳遞的方式捕捉了變量k,以引用傳遞的方式捕捉了其他父作用域中的變量,因此我們可以將各個(gè)線程消耗的時(shí)間累加到一起。
我們將所有線程申請(qǐng)內(nèi)存消耗的時(shí)間都累加到malloc_costtime上, 將釋放內(nèi)存消耗的時(shí)間都累加到free_costtime上,此時(shí)malloc_costtime和free_costtime可能被多個(gè)線程同時(shí)進(jìn)行累加操作的,所以存在線程安全的問(wèn)題。鑒于此,我們?cè)诙x這兩個(gè)變量時(shí)使用了atomic類模板,這時(shí)對(duì)它們的操作就是原子操作了。
固定大小內(nèi)存的申請(qǐng)和釋放
我們先來(lái)測(cè)試一下固定大小內(nèi)存的申請(qǐng)和釋放:
v.push_back(malloc(16)); v.push_back(ConcurrentAlloc(16));
此時(shí)4個(gè)線程執(zhí)行10輪操作,每輪申請(qǐng)釋放10000次,總共申請(qǐng)釋放了40萬(wàn)次,運(yùn)行后可以看到,malloc的效率還是更高的。
由于此時(shí)我們申請(qǐng)釋放的都是固定大小的對(duì)象,每個(gè)線程申請(qǐng)釋放時(shí)訪問(wèn)的都是各自thread cache的同一個(gè)桶,當(dāng)thread cache的這個(gè)桶中沒(méi)有對(duì)象或?qū)ο筇嘁獨(dú)w還時(shí),也都會(huì)訪問(wèn)central cache的同一個(gè)桶。此時(shí)central cache中的桶鎖就不起作用了,因?yàn)槲覀冏宑entral cache使用桶鎖的目的就是為了,讓多個(gè)thread cache可以同時(shí)訪問(wèn)central cache的不同桶,而此時(shí)每個(gè)thread cache訪問(wèn)的卻都是central cache中的同一個(gè)桶。
不同大小內(nèi)存的申請(qǐng)和釋放
下面我們?cè)賮?lái)測(cè)試一下不同大小內(nèi)存的申請(qǐng)和釋放:
v.push_back(malloc((16 + i) % 8192 + 1)); v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
運(yùn)行后可以看到,由于申請(qǐng)和釋放內(nèi)存的大小是不同的,此時(shí)central cache當(dāng)中的桶鎖就起作用了,ConcurrentAlloc的效率也有了較大增長(zhǎng),但相比malloc來(lái)說(shuō)還是差一點(diǎn)點(diǎn)。
復(fù)雜問(wèn)題的調(diào)試技巧
多線程調(diào)試比單線程調(diào)試要復(fù)雜得多,調(diào)試時(shí)各個(gè)線程之間會(huì)相互切換,并且每次調(diào)試切換的時(shí)機(jī)也是不固定的,這就使得調(diào)試過(guò)程變得非常難以控制。
下面給出三個(gè)調(diào)試時(shí)的小技巧:
1、條件斷點(diǎn)
一般情況下我們可以直接運(yùn)行程序,通過(guò)報(bào)錯(cuò)來(lái)查找問(wèn)題。如果此時(shí)報(bào)的是斷言錯(cuò)誤,那么我們可以直接定位到報(bào)錯(cuò)的位置,然后將此處的斷言改為與斷言條件相反的if判斷,在if語(yǔ)句里面打上一個(gè)斷點(diǎn),但注意空語(yǔ)句是無(wú)法打斷點(diǎn)的,這時(shí)我們隨便在if里面加上一句代碼就可以打斷點(diǎn)了。
此外,條件斷點(diǎn)也可以通過(guò)右擊普通斷點(diǎn)來(lái)進(jìn)行設(shè)置。
右擊后即可設(shè)置相應(yīng)的條件,程序運(yùn)行到此處時(shí)如果滿足該條件則會(huì)停下來(lái)。
運(yùn)行到條件斷點(diǎn)處后,我們就可以對(duì)當(dāng)前程序進(jìn)行進(jìn)一步分析,找出斷言錯(cuò)誤的被觸發(fā)原因。
2、查看函數(shù)棧幀
當(dāng)程序運(yùn)行到斷點(diǎn)處時(shí),我們需要對(duì)當(dāng)前位置進(jìn)行分析,如果檢查后發(fā)現(xiàn)當(dāng)前函數(shù)是沒(méi)有問(wèn)題的,這時(shí)可能需要回到調(diào)用該函數(shù)的地方進(jìn)行進(jìn)一步分析,此時(shí)我們可以依次點(diǎn)擊“調(diào)試→窗口→調(diào)用堆棧”。
此時(shí)我們就可以看到當(dāng)前函數(shù)棧幀的調(diào)用情況,其中黃色箭頭指向的是當(dāng)前所在的函數(shù)棧幀。
雙擊函數(shù)棧幀中的其他函數(shù),就可以跳轉(zhuǎn)到該函數(shù)對(duì)應(yīng)的棧幀,此時(shí)淺灰色箭頭指向的就是當(dāng)前跳轉(zhuǎn)到的函數(shù)棧幀。
需要注意的是,監(jiān)視窗口只能查看當(dāng)前棧幀中的變量。如果要查看此時(shí)其他函數(shù)棧幀中變量的情況,就可以通過(guò)函數(shù)棧幀跳轉(zhuǎn)來(lái)查看。
3、疑似死循環(huán)時(shí)中斷程序
當(dāng)你在某個(gè)地方設(shè)置斷點(diǎn)后,如果遲遲沒(méi)有運(yùn)行到斷點(diǎn)處,而程序也沒(méi)有崩潰,這時(shí)有可能是程序進(jìn)入到某個(gè)死循環(huán)了。
這時(shí)我們可以依次點(diǎn)擊“調(diào)試→全部中斷”。
這時(shí)程序就會(huì)在當(dāng)前運(yùn)行的地方停下來(lái)。
性能瓶頸分析
經(jīng)過(guò)前面的測(cè)試可以看到,我們的代碼此時(shí)與malloc之間還是有差距的,此時(shí)我們就應(yīng)該分析分析我們當(dāng)前項(xiàng)目的瓶頸在哪里,但這不能簡(jiǎn)單的憑感覺(jué),我們應(yīng)該用性能分析的工具來(lái)進(jìn)行分析。
VS編譯器下性能分析的操作步驟
VS編譯器中就帶有性能分析的工具的,我們可以依次點(diǎn)擊“調(diào)試→性能和診斷”進(jìn)行性能分析,注意該操作要在Debug模式下進(jìn)行。
同時(shí)我們將代碼中n的值由10000調(diào)成了1000,否則該分析過(guò)程可能會(huì)花費(fèi)較多時(shí)間,并且將malloc的測(cè)試代碼進(jìn)行了屏蔽,因?yàn)槲覀円治龅氖俏覀儗?shí)現(xiàn)的高并發(fā)內(nèi)存池。
int main() { size_t n = 1000; cout << "==========================================================" << endl; BenchmarkConcurrentMalloc(n, 4, 10); cout << endl << endl; //BenchmarkMalloc(n, 4, 10); cout << "==========================================================" << endl; return 0; }
在點(diǎn)擊了“調(diào)試→性能和診斷”后會(huì)彈出一個(gè)提示框,我們直接點(diǎn)擊“開(kāi)始”進(jìn)行了。
然后會(huì)彈出一個(gè)選項(xiàng)框,這里我們選擇的是第二個(gè),因?yàn)槲覀円治龅氖歉鱾€(gè)函數(shù)的用時(shí)時(shí)間,然后點(diǎn)擊下一步。
出現(xiàn)以下選項(xiàng)框繼續(xù)點(diǎn)擊下一步。
最后點(diǎn)擊完成,就可以等待分析結(jié)果了。
分析性能瓶頸
通過(guò)分析結(jié)果可以看到,光是Deallocate和MapObjectToSpan這兩個(gè)函數(shù)就占用了一半多的時(shí)間。
而在Deallocate函數(shù)中,調(diào)用ListTooLong函數(shù)時(shí)消耗的時(shí)間是最多的。
繼續(xù)往下看,在ListTooLong函數(shù)中,調(diào)用ReleaseListToSpans函數(shù)時(shí)消耗的時(shí)間是最多的。
再進(jìn)一步看,在ReleaseListToSpans函數(shù)中,調(diào)用MapObjectToSpan函數(shù)時(shí)消耗的時(shí)間是最多的。
也就是說(shuō),最終消耗時(shí)間最多的實(shí)際就是MapObjectToSpan函數(shù),我們這時(shí)再來(lái)看看為什么調(diào)用MapObjectToSpan函數(shù)會(huì)消耗這么多時(shí)間。通過(guò)觀察我們最終發(fā)現(xiàn),調(diào)用該函數(shù)時(shí)會(huì)消耗這么多時(shí)間就是因?yàn)殒i的原因。
因此當(dāng)前項(xiàng)目的瓶頸點(diǎn)就在鎖競(jìng)爭(zhēng)上面,需要解決調(diào)用MapObjectToSpan函數(shù)訪問(wèn)映射關(guān)系時(shí)的加鎖問(wèn)題。tcmalloc當(dāng)中針對(duì)這一點(diǎn)使用了基數(shù)樹(shù)進(jìn)行優(yōu)化,使得在讀取這個(gè)映射關(guān)系時(shí)可以做到不加鎖。
針對(duì)性能瓶頸使用基數(shù)樹(shù)進(jìn)行優(yōu)化
基數(shù)樹(shù)實(shí)際上就是一個(gè)分層的哈希表,根據(jù)所分層數(shù)不同可分為單層基數(shù)樹(shù)、二層基數(shù)樹(shù)、三層基數(shù)樹(shù)等。
單層基數(shù)樹(shù)
單層基數(shù)樹(shù)實(shí)際采用的就是直接定址法,每一個(gè)頁(yè)號(hào)對(duì)應(yīng)span的地址就存儲(chǔ)數(shù)組中在以該頁(yè)號(hào)為下標(biāo)的位置。
最壞的情況下我們需要建立所有頁(yè)號(hào)與其span之間的映射關(guān)系,因此這個(gè)數(shù)組中元素個(gè)數(shù)應(yīng)該與頁(yè)號(hào)的數(shù)目相同,數(shù)組中每個(gè)位置存儲(chǔ)的就是對(duì)應(yīng)span的指針。
//單層基數(shù)樹(shù) template <int BITS> class TCMalloc_PageMap1 { public: typedef uintptr_t Number; explicit TCMalloc_PageMap1() { size_t size = sizeof(void*) << BITS; //需要開(kāi)辟數(shù)組的大小 size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT); //按頁(yè)對(duì)齊后的大小 array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); //向堆申請(qǐng)空間 memset(array_, 0, size); //對(duì)申請(qǐng)到的內(nèi)存進(jìn)行清理 } void* get(Number k) const { if ((k >> BITS) > 0) //k的范圍不在[0, 2^BITS-1] { return NULL; } return array_[k]; //返回該頁(yè)號(hào)對(duì)應(yīng)的span } void set(Number k, void* v) { assert((k >> BITS) == 0); //k的范圍必須在[0, 2^BITS-1] array_[k] = v; //建立映射 } private: void** array_; //存儲(chǔ)映射關(guān)系的數(shù)組 static const int LENGTH = 1 << BITS; //頁(yè)的數(shù)目 };
此時(shí)當(dāng)我們需要建立映射時(shí)就調(diào)用set函數(shù),需要讀取映射關(guān)系時(shí),就調(diào)用get函數(shù)就行了。
代碼中的非類型模板參數(shù)BITS表示存儲(chǔ)頁(yè)號(hào)最多需要比特位的個(gè)數(shù)。在32位下我們傳入的是32-PAGE_SHIFT,在64位下傳入的是64-PAGE_SHIFT。而其中的LENGTH成員代表的就是頁(yè)號(hào)的數(shù)目,即 2 B I T S 2^{BITS} 2BITS。
比如32位平臺(tái)下,以一頁(yè)大小為8K為例,此時(shí)頁(yè)的數(shù)目就是 2 32 ÷ 2 13 = 2 19 2^{32}\div2^{13}=2^{19} 232÷213=219,因此存儲(chǔ)頁(yè)號(hào)最多需要19個(gè)比特位,此時(shí)傳入非類型模板參數(shù)的值就是 32 − 13 = 19 32-13=19 32−13=19。由于32位平臺(tái)下指針的大小是4字節(jié),因此該數(shù)組的大小就是 2 19 × 4 = 2 21 = 2 M 2^{19}\times4=2^{21}=2M 219×4=221=2M,內(nèi)存消耗不大,是可行的。但如果是在64位平臺(tái)下,此時(shí)該數(shù)組的大小是 2 51 × 8 = 2 54 = 2 24 G 2^{51}\times8=2^{54}=2^{24}G 251×8=254=224G,這顯然是不可行的,實(shí)際上對(duì)于64位的平臺(tái),我們需要使用三層基數(shù)樹(shù)。
二層基數(shù)樹(shù)
這里還是以32位平臺(tái)下,一頁(yè)的大小為8K為例來(lái)說(shuō)明,此時(shí)存儲(chǔ)頁(yè)號(hào)最多需要19個(gè)比特位。而二層基數(shù)樹(shù)實(shí)際上就是把這19個(gè)比特位分為兩次進(jìn)行映射。
比如用前5個(gè)比特位在基數(shù)樹(shù)的第一層進(jìn)行映射,映射后得到對(duì)應(yīng)的第二層,然后用剩下的比特位在基數(shù)樹(shù)的第二層進(jìn)行映射,映射后最終得到該頁(yè)號(hào)對(duì)應(yīng)的span指針。
在二層基數(shù)樹(shù)中,第一層的數(shù)組占用 2 5 × 4 = 2 7 B y t e 2^{5}\times4=2^{7}Byte 25×4=27Byte空間,第二層的數(shù)組最多占用 2 5 × 2 14 × 4 = 2 21 = 2 M 2^{5}\times2^{14}\times4=2^{21}=2M 25×214×4=221=2M。二層基數(shù)樹(shù)相比一層基數(shù)樹(shù)的好處就是,一層基數(shù)樹(shù)必須一開(kāi)始就把 2 M 2M 2M的數(shù)組開(kāi)辟出來(lái),而二層基數(shù)樹(shù)一開(kāi)始時(shí)只需將第一層的數(shù)組開(kāi)辟出來(lái),當(dāng)需要進(jìn)行某一頁(yè)號(hào)映射時(shí)再開(kāi)辟對(duì)應(yīng)的第二層的數(shù)組就行了。
//二層基數(shù)樹(shù) template <int BITS> class TCMalloc_PageMap2 { private: static const int ROOT_BITS = 5; //第一層對(duì)應(yīng)頁(yè)號(hào)的前5個(gè)比特位 static const int ROOT_LENGTH = 1 << ROOT_BITS; //第一層存儲(chǔ)元素的個(gè)數(shù) static const int LEAF_BITS = BITS - ROOT_BITS; //第二層對(duì)應(yīng)頁(yè)號(hào)的其余比特位 static const int LEAF_LENGTH = 1 << LEAF_BITS; //第二層存儲(chǔ)元素的個(gè)數(shù) //第一層數(shù)組中存儲(chǔ)的元素類型 struct Leaf { void* values[LEAF_LENGTH]; }; Leaf* root_[ROOT_LENGTH]; //第一層數(shù)組 public: typedef uintptr_t Number; explicit TCMalloc_PageMap2() { memset(root_, 0, sizeof(root_)); //將第一層的空間進(jìn)行清理 PreallocateMoreMemory(); //直接將第二層全部開(kāi)辟 } void* get(Number k) const { const Number i1 = k >> LEAF_BITS; //第一層對(duì)應(yīng)的下標(biāo) const Number i2 = k & (LEAF_LENGTH - 1); //第二層對(duì)應(yīng)的下標(biāo) if ((k >> BITS) > 0 || root_[i1] == NULL) //頁(yè)號(hào)值不在范圍或沒(méi)有建立過(guò)映射 { return NULL; } return root_[i1]->values[i2]; //返回該頁(yè)號(hào)對(duì)應(yīng)span的指針 } void set(Number k, void* v) { const Number i1 = k >> LEAF_BITS; //第一層對(duì)應(yīng)的下標(biāo) const Number i2 = k & (LEAF_LENGTH - 1); //第二層對(duì)應(yīng)的下標(biāo) assert(i1 < ROOT_LENGTH); root_[i1]->values[i2] = v; //建立該頁(yè)號(hào)與對(duì)應(yīng)span的映射 } //確保映射[start,start_n-1]頁(yè)號(hào)的空間是開(kāi)辟好了的 bool Ensure(Number start, size_t n) { for (Number key = start; key <= start + n - 1;) { const Number i1 = key >> LEAF_BITS; if (i1 >= ROOT_LENGTH) //頁(yè)號(hào)超出范圍 return false; if (root_[i1] == NULL) //第一層i1下標(biāo)指向的空間未開(kāi)辟 { //開(kāi)辟對(duì)應(yīng)空間 static ObjectPool<Leaf> leafPool; Leaf* leaf = (Leaf*)leafPool.New(); memset(leaf, 0, sizeof(*leaf)); root_[i1] = leaf; } key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //繼續(xù)后續(xù)檢查 } return true; } void PreallocateMoreMemory() { Ensure(0, 1 << BITS); //將第二層的空間全部開(kāi)辟好 } };
因此在二層基數(shù)樹(shù)中有一個(gè)Ensure函數(shù),當(dāng)需要建立某一頁(yè)號(hào)與其span之間的映射關(guān)系時(shí),需要先調(diào)用該Ensure函數(shù)確保用于映射該頁(yè)號(hào)的空間是開(kāi)辟了的,如果沒(méi)有開(kāi)辟則會(huì)立即開(kāi)辟。
而在32位平臺(tái)下,就算將二層基數(shù)樹(shù)第二層的數(shù)組全部開(kāi)辟出來(lái)也就消耗了 2 M 2M 2M的空間,內(nèi)存消耗也不算太多,因此我們可以在構(gòu)造二層基數(shù)樹(shù)時(shí)就把第二層的數(shù)組全部開(kāi)辟出來(lái)。
三層基數(shù)樹(shù)
上面一層基數(shù)樹(shù)和二層基數(shù)樹(shù)都適用于32位平臺(tái),而對(duì)于64位的平臺(tái)就需要用三層基數(shù)樹(shù)了。三層基數(shù)樹(shù)與二層基數(shù)樹(shù)類似,三層基數(shù)樹(shù)實(shí)際上就是把存儲(chǔ)頁(yè)號(hào)的若干比特位分為三次進(jìn)行映射。
此時(shí)只有當(dāng)要建立某一頁(yè)號(hào)的映射關(guān)系時(shí),再開(kāi)辟對(duì)應(yīng)的數(shù)組空間,而沒(méi)有建立映射的頁(yè)號(hào)就可以不用開(kāi)辟其對(duì)應(yīng)的數(shù)組空間,此時(shí)就能在一定程度上節(jié)省內(nèi)存空間。
//三層基數(shù)樹(shù) template <int BITS> class TCMalloc_PageMap3 { private: static const int INTERIOR_BITS = (BITS + 2) / 3; //第一、二層對(duì)應(yīng)頁(yè)號(hào)的比特位個(gè)數(shù) static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS; //第一、二層存儲(chǔ)元素的個(gè)數(shù) static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS; //第三層對(duì)應(yīng)頁(yè)號(hào)的比特位個(gè)數(shù) static const int LEAF_LENGTH = 1 << LEAF_BITS; //第三層存儲(chǔ)元素的個(gè)數(shù) struct Node { Node* ptrs[INTERIOR_LENGTH]; }; struct Leaf { void* values[LEAF_LENGTH]; }; Node* NewNode() { static ObjectPool<Node> nodePool; Node* result = nodePool.New(); if (result != NULL) { memset(result, 0, sizeof(*result)); } return result; } Node* root_; public: typedef uintptr_t Number; explicit TCMalloc_PageMap3() { root_ = NewNode(); } void* get(Number k) const { const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS); //第一層對(duì)應(yīng)的下標(biāo) const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二層對(duì)應(yīng)的下標(biāo) const Number i3 = k & (LEAF_LENGTH - 1); //第三層對(duì)應(yīng)的下標(biāo) //頁(yè)號(hào)超出范圍,或映射該頁(yè)號(hào)的空間未開(kāi)辟 if ((k >> BITS) > 0 || root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) { return NULL; } return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3]; //返回該頁(yè)號(hào)對(duì)應(yīng)span的指針 } void set(Number k, void* v) { assert(k >> BITS == 0); const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS); //第一層對(duì)應(yīng)的下標(biāo) const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二層對(duì)應(yīng)的下標(biāo) const Number i3 = k & (LEAF_LENGTH - 1); //第三層對(duì)應(yīng)的下標(biāo) Ensure(k, 1); //確保映射第k頁(yè)頁(yè)號(hào)的空間是開(kāi)辟好了的 reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v; //建立該頁(yè)號(hào)與對(duì)應(yīng)span的映射 } //確保映射[start,start+n-1]頁(yè)號(hào)的空間是開(kāi)辟好了的 bool Ensure(Number start, size_t n) { for (Number key = start; key <= start + n - 1;) { const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS); //第一層對(duì)應(yīng)的下標(biāo) const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二層對(duì)應(yīng)的下標(biāo) if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH) //下標(biāo)值超出范圍 return false; if (root_->ptrs[i1] == NULL) //第一層i1下標(biāo)指向的空間未開(kāi)辟 { //開(kāi)辟對(duì)應(yīng)空間 Node* n = NewNode(); if (n == NULL) return false; root_->ptrs[i1] = n; } if (root_->ptrs[i1]->ptrs[i2] == NULL) //第二層i2下標(biāo)指向的空間未開(kāi)辟 { //開(kāi)辟對(duì)應(yīng)空間 static ObjectPool<Leaf> leafPool; Leaf* leaf = leafPool.New(); if (leaf == NULL) return false; memset(leaf, 0, sizeof(*leaf)); root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf); } key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //繼續(xù)后續(xù)檢查 } return true; } void PreallocateMoreMemory() {} };
因此當(dāng)我們要建立某一頁(yè)號(hào)的映射關(guān)系時(shí),需要先確保存儲(chǔ)該頁(yè)映射的數(shù)組空間是開(kāi)辟好了的,也就是調(diào)用代碼中的Ensure函數(shù),如果對(duì)應(yīng)數(shù)組空間未開(kāi)辟則會(huì)立馬開(kāi)辟對(duì)應(yīng)的空間。
使用基數(shù)樹(shù)進(jìn)行優(yōu)化代碼實(shí)現(xiàn)
代碼更改
現(xiàn)在我們用基數(shù)樹(shù)對(duì)代碼進(jìn)行優(yōu)化,此時(shí)將PageCache類當(dāng)中的unorder_map用基數(shù)樹(shù)進(jìn)行替換即可,由于當(dāng)前是32位平臺(tái),因此這里隨便用幾層基數(shù)樹(shù)都可以。
//單例模式 class PageCache { public: //... private: //std::unordered_map<PAGE_ID, Span*> _idSpanMap; TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap; };
此時(shí)當(dāng)我們需要建立頁(yè)號(hào)與span的映射時(shí),就調(diào)用基數(shù)樹(shù)當(dāng)中的set函數(shù)。
_idSpanMap.set(span->_pageId, span);
而當(dāng)我們需要讀取某一頁(yè)號(hào)對(duì)應(yīng)的span時(shí),就調(diào)用基數(shù)樹(shù)當(dāng)中的get函數(shù)。
Span* ret = (Span*)_idSpanMap.get(id);
并且現(xiàn)在PageCache類向外提供的,用于讀取映射關(guān)系的MapObjectToSpan函數(shù)內(nèi)部就不需要加鎖了。
//獲取從對(duì)象到span的映射 Span* PageCache::MapObjectToSpan(void* obj) { PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //頁(yè)號(hào) Span* ret = (Span*)_idSpanMap.get(id); assert(ret != nullptr); return ret; }
為什么讀取基數(shù)樹(shù)映射關(guān)系時(shí)不需要加鎖?
當(dāng)某個(gè)線程在讀取映射關(guān)系時(shí),可能另外一個(gè)線程正在建立其他頁(yè)號(hào)的映射關(guān)系,而此時(shí)無(wú)論我們用的是C++當(dāng)中的map還是unordered_map,在讀取映射關(guān)系時(shí)都是需要加鎖的。
因?yàn)镃++中map的底層數(shù)據(jù)結(jié)構(gòu)是紅黑樹(shù),unordered_map的底層數(shù)據(jù)結(jié)構(gòu)是哈希表,而無(wú)論是紅黑樹(shù)還是哈希表,當(dāng)我們?cè)诓迦霐?shù)據(jù)時(shí)其底層的結(jié)構(gòu)都有可能會(huì)發(fā)生變化。比如紅黑樹(shù)在插入數(shù)據(jù)時(shí)可能會(huì)引起樹(shù)的旋轉(zhuǎn),而哈希表在插入數(shù)據(jù)時(shí)可能會(huì)引起哈希表擴(kuò)容。此時(shí)要避免出現(xiàn)數(shù)據(jù)不一致的問(wèn)題,就不能讓插入操作和讀取操作同時(shí)進(jìn)行,因此我們?cè)谧x取映射關(guān)系的時(shí)候是需要加鎖的。
而對(duì)于基數(shù)樹(shù)來(lái)說(shuō)就不一樣了,基數(shù)樹(shù)的空間一旦開(kāi)辟好了就不會(huì)發(fā)生變化,因此無(wú)論什么時(shí)候去讀取某個(gè)頁(yè)的映射,都是對(duì)應(yīng)在一個(gè)固定的位置進(jìn)行讀取的。并且我們不會(huì)同時(shí)對(duì)同一個(gè)頁(yè)進(jìn)行讀取映射和建立映射的操作,因?yàn)槲覀冎挥性卺尫艑?duì)象時(shí)才需要讀取映射,而建立映射的操作都是在page cache進(jìn)行的。也就是說(shuō),讀取映射時(shí)讀取的都是對(duì)應(yīng)span的_useCount不等于0的頁(yè),而建立映射時(shí)建立的都是對(duì)應(yīng)span的_useCount等于0的頁(yè),所以說(shuō)我們不會(huì)同時(shí)對(duì)同一個(gè)頁(yè)進(jìn)行讀取映射和建立映射的操作。
再次對(duì)比malloc進(jìn)行測(cè)試
還是同樣的代碼,只不過(guò)我們用基數(shù)樹(shù)對(duì)代碼進(jìn)行了優(yōu)化,這時(shí)測(cè)試固定大小內(nèi)存的申請(qǐng)和釋放的結(jié)果如下:
可以看到,這時(shí)就算申請(qǐng)釋放的是固定大小的對(duì)象,其效率都是malloc的兩倍。下面在申請(qǐng)釋放不同大小的對(duì)象時(shí),由于central cache的桶鎖起作用了,其效率更是變成了malloc的好幾倍。
打包成動(dòng)靜態(tài)庫(kù)
實(shí)際Google開(kāi)源的tcmalloc是會(huì)直接用于替換malloc的,不同平臺(tái)替換的方式不同。比如基于Unix的系統(tǒng)上的glibc,使用了weak alias的方式替換;而對(duì)于某些其他平臺(tái),需要使用hook的鉤子技術(shù)來(lái)做。
對(duì)于我們當(dāng)前實(shí)現(xiàn)的項(xiàng)目,可以考慮將其打包成靜態(tài)庫(kù)或動(dòng)態(tài)庫(kù)。我們先右擊解決方案資源管理器當(dāng)中的項(xiàng)目名稱,然后選擇屬性。
此時(shí)會(huì)彈出該選項(xiàng)卡,按照以下圖示就可以選擇將其打包成靜態(tài)庫(kù)或動(dòng)態(tài)庫(kù)了。
項(xiàng)目源碼
Github:https://github.com/chenlong-xcy/standard-project/tree/main/ConcurrentMemoryPool
到此這篇關(guān)于C++高并發(fā)內(nèi)存池的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)C++高并發(fā)內(nèi)存池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- C++基本組件之內(nèi)存池詳解
- C++中內(nèi)存池的簡(jiǎn)單原理及實(shí)現(xiàn)詳解
- C++中高性能內(nèi)存池的實(shí)現(xiàn)詳解
- C++設(shè)計(jì)一個(gè)簡(jiǎn)單內(nèi)存池的全過(guò)程
- C++如何實(shí)現(xiàn)定長(zhǎng)內(nèi)存池詳解
- C++內(nèi)存池兩種方案解析
- C++手寫(xiě)內(nèi)存池的案例詳解
- C++內(nèi)存池的簡(jiǎn)單實(shí)現(xiàn)
- C++高并發(fā)內(nèi)存池的整體設(shè)計(jì)和實(shí)現(xiàn)思路
- C++高效內(nèi)存池實(shí)現(xiàn)減少動(dòng)態(tài)分配開(kāi)銷的解決方案
相關(guān)文章
詳解如何將Spire.Doc for C++集成到C++程序中
Spire.Doc for C++是一個(gè)專業(yè)的Word庫(kù),供開(kāi)發(fā)人員在任何類型的C++應(yīng)用程序中閱讀、創(chuàng)建、編輯、比較和轉(zhuǎn)換 Word 文檔,本文演示了如何以兩種不同的方式將 Spire.Doc for C++ 集成到您的 C++ 應(yīng)用程序中,希望對(duì)大家有所幫助2023-05-05C++中產(chǎn)生臨時(shí)對(duì)象的情況及其解決方案
這篇文章主要介紹了C++中產(chǎn)生臨時(shí)對(duì)象的情況及其解決方案,以值傳遞的方式給函數(shù)傳參,類型轉(zhuǎn)換以及函數(shù)需要返回對(duì)象時(shí),并給對(duì)應(yīng)給出了詳細(xì)的解決方案,通過(guò)圖文結(jié)合的方式講解的非常詳細(xì),需要的朋友可以參考下2024-05-05VC基于ADO技術(shù)訪問(wèn)數(shù)據(jù)庫(kù)的方法
這篇文章主要介紹了VC基于ADO技術(shù)訪問(wèn)數(shù)據(jù)庫(kù)的方法,較為詳細(xì)的分析了VC使用ADO操作數(shù)據(jù)庫(kù)的相關(guān)實(shí)現(xiàn)技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-10-10淺談stringstream 的.str()正確用法和清空操作
下面小編就為大家?guī)?lái)一篇淺談stringstream 的.str()正確用法和清空操作。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-12-12C++實(shí)現(xiàn)寵物商店信息管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了C++實(shí)現(xiàn)寵物商店信息管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03