C++高并發(fā)內(nèi)存池的實現(xiàn)
項目介紹
本項目實現(xiàn)的是一個高并發(fā)的內(nèi)存池,它的原型是Google的一個開源項目tcmalloc,tcmalloc全稱Thread-Caching Malloc,即線程緩存的malloc,實現(xiàn)了高效的多線程內(nèi)存管理,用于替換系統(tǒng)的內(nèi)存分配相關(guān)函數(shù)malloc和free。
tcmalloc的知名度也是非常高的,不少公司都在用它,比如Go語言就直接用它做了自己的內(nèi)存分配器。
該項目就是把tcmalloc中最核心的框架簡化后拿出來,模擬實現(xiàn)出一個mini版的高并發(fā)內(nèi)存池,目的就是學習tcmalloc的精華。
該項目主要涉及C/C++、數(shù)據(jù)結(jié)構(gòu)(鏈表、哈希桶)、操作系統(tǒng)內(nèi)存管理、單例模式、多線程、互斥鎖等方面的技術(shù)。
內(nèi)存池介紹
池化技術(shù)
在說內(nèi)存池之前,我們得先了解一下“池化技術(shù)”。所謂“池化技術(shù)”,就是程序先向系統(tǒng)申請過量的資源,然后自己進行管理,以備不時之需。
之所以要申請過量的資源,是因為申請和釋放資源都有較大的開銷,不如提前申請一些資源放入“池”中,當需要資源時直接從“池”中獲取,不需要時就將該資源重新放回“池”中即可。這樣使用時就會變得非常快捷,可以大大提高程序的運行效率。
在計算機中,有很多使用“池”這種技術(shù)的地方,除了內(nèi)存池之外,還有連接池、線程池、對象池等。以服務(wù)器上的線程池為例,它的主要思想就是:先啟動若干數(shù)量的線程,讓它們處于睡眠狀態(tài),當接收到客戶端的請求時,喚醒池中某個睡眠的線程,讓它來處理客戶端的請求,當處理完這個請求后,線程又進入睡眠狀態(tài)。
內(nèi)存池
內(nèi)存池是指程序預(yù)先向操作系統(tǒng)申請一塊足夠大的內(nèi)存,此后,當程序中需要申請內(nèi)存的時候,不是直接向操作系統(tǒng)申請,而是直接從內(nèi)存池中獲?。煌?,當釋放內(nèi)存的時候,并不是真正將內(nèi)存返回給操作系統(tǒng),而是將內(nèi)存返回給內(nèi)存池。當程序退出時(或某個特定時間),內(nèi)存池才將之前申請的內(nèi)存真正釋放。
內(nèi)存池主要解決的問題
內(nèi)存池主要解決的就是效率的問題,它能夠避免讓程序頻繁的向系統(tǒng)申請和釋放內(nèi)存。其次,內(nèi)存池作為系統(tǒng)的內(nèi)存分配器,還需要嘗試解決內(nèi)存碎片的問題。
內(nèi)存碎片分為內(nèi)部碎片和外部碎片:
- 外部碎片是一些空閑的小塊內(nèi)存區(qū)域,由于這些內(nèi)存空間不連續(xù),以至于合計的內(nèi)存足夠,但是不能滿足一些內(nèi)存分配申請需求。
- 內(nèi)部碎片是由于一些對齊的需求,導(dǎo)致分配出去的空間中一些內(nèi)存無法被利用。
注意: 內(nèi)存池嘗試解決的是外部碎片的問題,同時也盡可能的減少內(nèi)部碎片的產(chǎn)生。
malloc
C/C++中我們要動態(tài)申請內(nèi)存并不是直接去堆申請的,而是通過malloc函數(shù)去申請的,包括C++中的new實際上也是封裝了malloc函數(shù)的。
我們申請內(nèi)存塊時是先調(diào)用malloc,malloc再去向操作系統(tǒng)申請內(nèi)存。malloc實際就是一個內(nèi)存池,malloc相當于向操作系統(tǒng)“批發(fā)”了一塊較大的內(nèi)存空間,然后“零售”給程序用,當全部“售完”或程序有大量的內(nèi)存需求時,再根據(jù)實際需求向操作系統(tǒng)“進貨”。
malloc的實現(xiàn)方式有很多種,一般不同編譯器平臺用的都是不同的。比如Windows的VS系列中的malloc就是微軟自行實現(xiàn)的,而Linux下的gcc用的是glibc中的ptmalloc。
定長內(nèi)存池的實現(xiàn)
malloc其實就是一個通用的內(nèi)存池,在什么場景下都可以使用,但這也意味著malloc在什么場景下都不會有很高的性能,因為malloc并不是針對某種場景專門設(shè)計的。
定長內(nèi)存池就是針對固定大小內(nèi)存塊的申請和釋放的內(nèi)存池,由于定長內(nèi)存池只需要支持固定大小內(nèi)存塊的申請和釋放,因此我們可以將其性能做到極致,并且在實現(xiàn)定長內(nèi)存池時不需要考慮內(nèi)存碎片等問題,因為我們申請/釋放的都是固定大小的內(nèi)存塊。
我們可以通過實現(xiàn)定長內(nèi)存池來熟悉一下對簡單內(nèi)存池的控制,其次,這個定長內(nèi)存池后面會作為高并發(fā)內(nèi)存池的一個基礎(chǔ)組件。
如何實現(xiàn)定長?
在實現(xiàn)定長內(nèi)存池時要做到“定長”有很多種方法,比如我們可以使用非類型模板參數(shù),使得在該內(nèi)存池中申請到的對象的大小都是N。
template<size_t N> class ObjectPool {};
此外,定長內(nèi)存池也叫做對象池,在創(chuàng)建對象池時,對象池可以根據(jù)傳入的對象類型的大小來實現(xiàn)“定長”,因此我們可以通過使用模板參數(shù)來實現(xiàn)“定長”,比如創(chuàng)建定長內(nèi)存池時傳入的對象類型是int,那么該內(nèi)存池就只支持4字節(jié)大小內(nèi)存的申請和釋放。
template<class T> class ObjectPool {};
如何直接向堆申請空間?
既然是內(nèi)存池,那么我們首先得向系統(tǒng)申請一塊內(nèi)存空間,然后對其進行管理。要想直接向堆申請內(nèi)存空間,在Windows下,可以調(diào)用VirtualAlloc函數(shù);在Linux下,可以調(diào)用brk或mmap函數(shù)。
#ifdef _WIN32 #include <Windows.h> #else //... #endif //直接去堆上申請按頁申請空間 inline static void* SystemAlloc(size_t kpage) { #ifdef _WIN32 void* ptr = VirtualAlloc(0, kpage<<13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); #else // linux下brk mmap等 #endif if (ptr == nullptr) throw std::bad_alloc(); return ptr; }
這里我們可以通過條件編譯將對應(yīng)平臺下向堆申請內(nèi)存的函數(shù)進行封裝,此后我們就不必再關(guān)心當前所在平臺,當我們需要直接向堆申請內(nèi)存時直接調(diào)用我們封裝后的SystemAlloc函數(shù)即可。
定長內(nèi)存池中應(yīng)該包含哪些成員變量?
對于向堆申請到的大塊內(nèi)存,我們可以用一個指針來對其進行管理,但僅用一個指針肯定是不夠的,我們還需要用一個變量來記錄這塊內(nèi)存的長度。
由于此后我們需要將這塊內(nèi)存進行切分,為了方便切分操作,指向這塊內(nèi)存的指針最好是字符指針,因為指針的類型決定了指針向前或向后走一步有多大距離,對于字符指針來說,當我們需要向后移動n個字節(jié)時,直接對字符指針進行加n操作即可。
其次,釋放回來的定長內(nèi)存塊也需要被管理,我們可以將這些釋放回來的定長內(nèi)存塊鏈接成一個鏈表,這里我們將管理釋放回來的內(nèi)存塊的鏈表叫做自由鏈表,為了能找到這個自由鏈表,我們還需要一個指向自由鏈表的指針。
因此,定長內(nèi)存池當中包含三個成員變量:
- _memory:指向大塊內(nèi)存的指針。
- _remainBytes:大塊內(nèi)存切分過程中剩余字節(jié)數(shù)。
- _freeList:還回來過程中鏈接的自由鏈表的頭指針。
內(nèi)存池如何管理釋放的對象?
對于還回來的定長內(nèi)存塊,我們可以用自由鏈表將其鏈接起來,但我們并不需要為其專門定義鏈式結(jié)構(gòu),我們可以讓內(nèi)存塊的前4個字節(jié)(32位平臺)或8個字節(jié)(64位平臺)作為指針,存儲后面內(nèi)存塊的起始地址即可。
因此在向自由鏈表插入被釋放的內(nèi)存塊時,先讓該內(nèi)存塊的前4個字節(jié)或8個字節(jié)存儲自由鏈表中第一個內(nèi)存塊的地址,然后再讓_freeList
指向該內(nèi)存塊即可,也就是一個簡單的鏈表頭插操作。
這里有一個有趣問題:如何讓一個指針在32位平臺下解引用后能向后訪問4個字節(jié),在64位平臺下解引用后能向后訪問8個字節(jié)?
首先我們得知道,32位平臺下指針的大小是4個字節(jié),64位平臺下指針的大小是8個字節(jié)。而指針指向數(shù)據(jù)的類型,決定了指針解引用后能向后訪問的空間大小,因此我們這里需要的是一個指向指針的指針,這里使用二級指針就行了。
當我們需要訪問一個內(nèi)存塊的前4/8個字節(jié)時,我們就可以先該內(nèi)存塊的地址先強轉(zhuǎn)為二級指針,由于二級指針存儲的是一級指針的地址,二級指針解引用能向后訪問一個指針的大小,因此在32位平臺下訪問的就是4個字節(jié),在64位平臺下訪問的就是8個字節(jié),此時我們訪問到了該內(nèi)存塊的前4/8個字節(jié)。
void*& NextObj(void* ptr) { return (*(void**)ptr); }
需要注意的是,在釋放對象時,我們應(yīng)該顯示調(diào)用該對象的析構(gòu)函數(shù)清理該對象,因為該對象可能還管理著其他某些資源,如果不對其進行清理那么這些資源將無法被釋放,就會導(dǎo)致內(nèi)存泄漏。
//釋放對象 void Delete(T* obj) { //顯示調(diào)用T的析構(gòu)函數(shù)清理對象 obj->~T(); //將釋放的對象頭插到自由鏈表 NextObj(obj) = _freeList; _freeList = obj; }
內(nèi)存池如何為我們申請對象?
當我們申請對象時,內(nèi)存池應(yīng)該優(yōu)先把還回來的內(nèi)存塊對象再次重復(fù)利用,因此如果自由鏈表當中有內(nèi)存塊的話,就直接從自由鏈表頭刪一個內(nèi)存塊進行返回即可。
如果自由鏈表當中沒有內(nèi)存塊,那么我們就在大塊內(nèi)存中切出定長的內(nèi)存塊進行返回,當內(nèi)存塊切出后及時更新_memory
指針的指向,以及_remainBytes
的值即可。
需要特別注意的是,由于當內(nèi)存塊釋放時我們需要將內(nèi)存塊鏈接到自由鏈表當中,因此我們必須保證切出來的對象至少能夠存儲得下一個地址,所以當對象的大小小于當前所在平臺指針的大小時,需要按指針的大小進行內(nèi)存塊的切分。
此外,當大塊內(nèi)存已經(jīng)不足以切分出一個對象時,我們就應(yīng)該調(diào)用我們封裝的SystemAlloc函數(shù),再次向堆申請一塊內(nèi)存空間,此時也要注意及時更新_memory
指針的指向,以及_remainBytes
的值。
//申請對象 T* New() { T* obj = nullptr; //優(yōu)先把還回來的內(nèi)存塊對象,再次重復(fù)利用 if (_freeList != nullptr) { //從自由鏈表頭刪一個對象 obj = (T*)_freeList; _freeList = NextObj(_freeList); } else { //保證對象能夠存儲得下地址 size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); //剩余內(nèi)存不夠一個對象大小時,則重新開大塊空間 if (_remainBytes < objSize) { _remainBytes = 128 * 1024; _memory = (char*)SystemAlloc(_remainBytes >> 13); if (_memory == nullptr) { throw std::bad_alloc(); } } //從大塊內(nèi)存中切出objSize字節(jié)的內(nèi)存 obj = (T*)_memory; _memory += objSize; _remainBytes -= objSize; } //定位new,顯示調(diào)用T的構(gòu)造函數(shù)初始化 new(obj)T; return obj; }
需要注意的是,與釋放對象時需要顯示調(diào)用該對象的析構(gòu)函數(shù)一樣,當內(nèi)存塊切分出來后,我們也應(yīng)該使用定位new,顯示調(diào)用該對象的構(gòu)造函數(shù)對其進行初始化。
定長內(nèi)存池整體代碼如下:
//定長內(nèi)存池 template<class T> class ObjectPool { public: //申請對象 T* New() { T* obj = nullptr; //優(yōu)先把還回來的內(nèi)存塊對象,再次重復(fù)利用 if (_freeList != nullptr) { //從自由鏈表頭刪一個對象 obj = (T*)_freeList; _freeList = NextObj(_freeList); } else { //保證對象能夠存儲得下地址 size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); //剩余內(nèi)存不夠一個對象大小時,則重新開大塊空間 if (_remainBytes < objSize) { _remainBytes = 128 * 1024; //_memory = (char*)malloc(_remainBytes); _memory = (char*)SystemAlloc(_remainBytes >> 13); if (_memory == nullptr) { throw std::bad_alloc(); } } //從大塊內(nèi)存中切出objSize字節(jié)的內(nèi)存 obj = (T*)_memory; _memory += objSize; _remainBytes -= objSize; } //定位new,顯示調(diào)用T的構(gòu)造函數(shù)初始化 new(obj)T; return obj; } //釋放對象 void Delete(T* obj) { //顯示調(diào)用T的析構(gòu)函數(shù)清理對象 obj->~T(); //將釋放的對象頭插到自由鏈表 NextObj(obj) = _freeList; _freeList = obj; } private: char* _memory = nullptr; //指向大塊內(nèi)存的指針 size_t _remainBytes = 0; //大塊內(nèi)存在切分過程中剩余字節(jié)數(shù) void* _freeList = nullptr; //還回來過程中鏈接的自由鏈表的頭指針 };
性能對比
下面我們將實現(xiàn)的定長內(nèi)存池和malloc/free進行性能對比,測試代碼如下:
struct TreeNode { int _val; TreeNode* _left; TreeNode* _right; TreeNode() :_val(0) , _left(nullptr) , _right(nullptr) {} }; void TestObjectPool() { // 申請釋放的輪次 const size_t Rounds = 3; // 每輪申請釋放多少次 const size_t N = 1000000; std::vector<TreeNode*> v1; v1.reserve(N); //malloc和free size_t begin1 = clock(); for (size_t j = 0; j < Rounds; ++j) { for (int i = 0; i < N; ++i) { v1.push_back(new TreeNode); } for (int i = 0; i < N; ++i) { delete v1[i]; } v1.clear(); } size_t end1 = clock(); //定長內(nèi)存池 ObjectPool<TreeNode> TNPool; std::vector<TreeNode*> v2; v2.reserve(N); size_t begin2 = clock(); for (size_t j = 0; j < Rounds; ++j) { for (int i = 0; i < N; ++i) { v2.push_back(TNPool.New()); } for (int i = 0; i < N; ++i) { TNPool.Delete(v2[i]); } v2.clear(); } size_t end2 = clock(); cout << "new cost time:" << end1 - begin1 << endl; cout << "object pool cost time:" << end2 - begin2 << endl; }
在代碼中,我們先用new申請若干個TreeNode對象,然后再用delete將這些對象再釋放,通過clock函數(shù)得到整個過程消耗的時間。(new和delete底層就是封裝的malloc和free)
然后再重復(fù)該過程,只不過將其中的new和delete替換為定長內(nèi)存池當中的New和Delete,此時再通過clock函數(shù)得到該過程消耗的時間。
可以看到在這個過程中,定長內(nèi)存池消耗的時間比malloc/free消耗的時間要短。這就是因為malloc是一個通用的內(nèi)存池,而定長內(nèi)存池是專門針對申請定長對象而設(shè)計的,因此在這種特殊場景下定長內(nèi)存池的效率更高,正所謂“尺有所短,寸有所長”。
高并發(fā)內(nèi)存池整體框架設(shè)計
該項目解決的是什么問題?
現(xiàn)代很多的開發(fā)環(huán)境都是多核多線程,因此在申請內(nèi)存的時,必然存在激烈的鎖競爭問題。malloc本身其實已經(jīng)很優(yōu)秀了,但是在并發(fā)場景下可能會因為頻繁的加鎖和解鎖導(dǎo)致效率有所降低,而該項目的原型tcmalloc實現(xiàn)的就是一種在多線程高并發(fā)場景下更勝一籌的內(nèi)存池。
在實現(xiàn)內(nèi)存池時我們一般需要考慮到效率問題和內(nèi)存碎片的問題,但對于高并發(fā)內(nèi)存池來說,我們還需要考慮在多線程環(huán)境下的鎖競爭問題。
高并發(fā)內(nèi)存池整體框架設(shè)計
高并發(fā)內(nèi)存池主要由以下三個部分構(gòu)成:
- thread cache: 線程緩存是每個線程獨有的,用于小于等于256KB的內(nèi)存分配,每個線程獨享一個thread cache。
- central cache: 中心緩存是所有線程所共享的,當thread cache需要內(nèi)存時會按需從central cache中獲取內(nèi)存,而當thread cache中的內(nèi)存滿足一定條件時,central cache也會在合適的時機對其進行回收。
- page cache: 頁緩存中存儲的內(nèi)存是以頁為單位進行存儲及分配的,當central cache需要內(nèi)存時,page cache會分配出一定數(shù)量的頁分配給central cache,而當central cache中的內(nèi)存滿足一定條件時,page cache也會在合適的時機對其進行回收,并將回收的內(nèi)存盡可能的進行合并,組成更大的連續(xù)內(nèi)存塊,緩解內(nèi)存碎片的問題。
進一步說明:
每個線程都有一個屬于自己的thread cache,也就意味著線程在thread cache申請內(nèi)存時是不需要加鎖的,而一次性申請大于256KB內(nèi)存的情況是很少的,因此大部分情況下申請內(nèi)存時都是無鎖的,這也就是這個高并發(fā)內(nèi)存池高效的地方。
每個線程的thread cache會根據(jù)自己的情況向central cache申請或歸還內(nèi)存,這就避免了出現(xiàn)單個線程的thread cache占用太多內(nèi)存,而其余thread cache出現(xiàn)內(nèi)存吃緊的問題。
多線程的thread cache可能會同時找central cache申請內(nèi)存,此時就會涉及線程安全的問題,因此在訪問central cache時是需要加鎖的,但central cache實際上是一個哈希桶的結(jié)構(gòu),只有當多個線程同時訪問同一個桶時才需要加鎖,所以這里的鎖競爭也不會很激烈。
各個部分的主要作用
thread cache主要解決鎖競爭的問題,每個線程獨享自己的thread cache,當自己的thread cache中有內(nèi)存時該線程不會去和其他線程進行競爭,每個線程只要在自己的thread cache申請內(nèi)存就行了。
central cache主要起到一個居中調(diào)度的作用,每個線程的thread cache需要內(nèi)存時從central cache獲取,而當thread cache的內(nèi)存多了就會將內(nèi)存還給central cache,其作用類似于一個中樞,因此取名為中心緩存。
page cache就負責提供以頁為單位的大塊內(nèi)存,當central cache需要內(nèi)存時就會去向page cache申請,而當page cache沒有內(nèi)存了就會直接去找系統(tǒng),也就是直接去堆上按頁申請內(nèi)存塊。
threadcache
threadcache整體設(shè)計
定長內(nèi)存池只支持固定大小內(nèi)存塊的申請釋放,因此定長內(nèi)存池中只需要一個自由鏈表管理釋放回來的內(nèi)存塊。現(xiàn)在我們要支持申請和釋放不同大小的內(nèi)存塊,那么我們就需要多個自由鏈表來管理釋放回來的內(nèi)存塊,因此thread cache實際上一個哈希桶結(jié)構(gòu),每個桶中存放的都是一個自由鏈表。
thread cache支持小于等于256KB內(nèi)存的申請,如果我們將每種字節(jié)數(shù)的內(nèi)存塊都用一個自由鏈表進行管理的話,那么此時我們就需要20多萬個自由鏈表,光是存儲這些自由鏈表的頭指針就需要消耗大量內(nèi)存,這顯然是得不償失的。
這時我們可以選擇做一些平衡的犧牲,讓這些字節(jié)數(shù)按照某種規(guī)則進行對齊,例如我們讓這些字節(jié)數(shù)都按照8字節(jié)進行向上對齊,那么thread cache的結(jié)構(gòu)就是下面這樣的,此時當線程申請1~8字節(jié)的內(nèi)存時會直接給出8字節(jié),而當線程申請9~16字節(jié)的內(nèi)存時會直接給出16字節(jié),以此類推。
因此當線程要申請某一大小的內(nèi)存塊時,就需要經(jīng)過某種計算得到對齊后的字節(jié)數(shù),進而找到對應(yīng)的哈希桶,如果該哈希桶中的自由鏈表中有內(nèi)存塊,那就從自由鏈表中頭刪一個內(nèi)存塊進行返回;如果該自由鏈表已經(jīng)為空了,那么就需要向下一層的central cache進行獲取了。
但此時由于對齊的原因,就可能會產(chǎn)生一些碎片化的內(nèi)存無法被利用,比如線程只申請了6字節(jié)的內(nèi)存,而thread cache卻直接給了8字節(jié)的內(nèi)存,這多給出的2字節(jié)就無法被利用,導(dǎo)致了一定程度的空間浪費,這些因為某些對齊原因?qū)е聼o法被利用的內(nèi)存,就是內(nèi)存碎片中的內(nèi)部碎片。
鑒于當前項目比較復(fù)雜,我們最好對自由鏈表這個結(jié)構(gòu)進行封裝,目前我們就提供Push和Pop兩個成員函數(shù),對應(yīng)的操作分別是將對象插入到自由鏈表(頭插)和從自由鏈表獲取一個對象(頭刪),后面在需要時還會添加對應(yīng)的成員函數(shù)。
//管理切分好的小對象的自由鏈表 class FreeList { public: //將釋放的對象頭插到自由鏈表 void Push(void* obj) { assert(obj); //頭插 NextObj(obj) = _freeList; _freeList = obj; } //從自由鏈表頭部獲取一個對象 void* Pop() { assert(_freeList); //頭刪 void* obj = _freeList; _freeList = NextObj(_freeList); return obj; } private: void* _freeList = nullptr; //自由鏈表 };
因此thread cache實際就是一個數(shù)組,數(shù)組中存儲的就是一個個的自由鏈表,至于這個數(shù)組中到底存儲了多少個自由鏈表,就需要看我們在進行字節(jié)數(shù)對齊時具體用的是什么映射對齊規(guī)則了。
threadcache哈希桶映射對齊規(guī)則
如何進行對齊?
上面已經(jīng)說了,不是每個字節(jié)數(shù)都對應(yīng)一個自由鏈表,這樣開銷太大了,因此我們需要制定一個合適的映射對齊規(guī)則。
首先,這些內(nèi)存塊是會被鏈接到自由鏈表上的,因此一開始肯定是按8字節(jié)進行對齊是最合適的,因為我們必須保證這些內(nèi)存塊,無論是在32位平臺下還是64位平臺下,都至少能夠存儲得下一個指針。
但如果所有的字節(jié)數(shù)都按照8字節(jié)進行對齊的話,那么我們就需要建立 256 × 1024 ÷ 8 = 32768 256\times1024\div8=32768 256×1024÷8=32768個桶,這個數(shù)量還是比較多的,實際上我們可以讓不同范圍的字節(jié)數(shù)按照不同的對齊數(shù)進行對齊,具體對齊方式如下:
字節(jié)數(shù) | 對齊數(shù) | 哈希桶下標 |
---|---|---|
[ [ [ 1 , 128 1,128 1,128 ] ] ] | 8 8 8 | [ [ [ 0 , 16 ) 0,16) 0,16) |
[ [ [ 128 + 1 , 1024 128+1,1024 128+1,1024 ] ] ] | 16 16 16 | [ [ [ 16 , 72 ) 16,72) 16,72) |
[ [ [ 1024 + 1 , 8 × 1024 1024+1,8\times1024 1024+1,8×1024 ] ] ] | 128 128 128 | [ [ [ 72 , 128 ) 72,128) 72,128) |
[ [ [ 8 × 1024 + 1 , 64 × 1024 8\times1024+1,64\times1024 8×1024+1,64×1024 ] ] ] | 1024 1024 1024 | [ [ [ 128 , 184 ) 128,184) 128,184) |
[ [ [ 64 × 1024 + 1 , 256 × 1024 64\times1024+1,256\times1024 64×1024+1,256×1024 ] ] ] | 8 × 1024 8\times1024 8×1024 | [ [ [ 184 , 208 ) 184,208) 184,208) |
空間浪費率
雖然對齊產(chǎn)生的內(nèi)碎片會引起一定程度的空間浪費,但按照上面的對齊規(guī)則,我們可以將浪費率控制到百分之十左右。需要說明的是,1~128這個區(qū)間我們不做討論,因為1字節(jié)就算是對齊到2字節(jié)也有百分之五十的浪費率,這里我們就從第二個區(qū)間開始進行計算。
根據(jù)上面的公式,我們要得到某個區(qū)間的最大浪費率,就應(yīng)該讓分子取到最大,讓分母取到最小。比如129~1024這個區(qū)間,該區(qū)域的對齊數(shù)是16,那么最大浪費的字節(jié)數(shù)就是15,而最小對齊后的字節(jié)數(shù)就是這個區(qū)間內(nèi)的前16個數(shù)所對齊到的字節(jié)數(shù),也就是144,那么該區(qū)間的最大浪費率也就是 15 ÷ 144 ≈ 10.42 % 15\div144\approx10.42\% 15÷144≈10.42%。同樣的道理,后面兩個區(qū)間的最大浪費率分別是 127 ÷ 1152 ≈ 11.02 % 127\div1152\approx11.02\% 127÷1152≈11.02%和 1023 ÷ 9216 ≈ 11.10 % 1023\div9216\approx11.10\% 1023÷9216≈11.10%。
對齊和映射相關(guān)函數(shù)的編寫
此時有了字節(jié)數(shù)的對齊規(guī)則后,我們就需要提供兩個對應(yīng)的函數(shù),分別用于獲取某一字節(jié)數(shù)對齊后的字節(jié)數(shù),以及該字節(jié)數(shù)對應(yīng)的哈希桶下標。關(guān)于處理對齊和映射的函數(shù),我們可以將其封裝到一個類當中。
//管理對齊和映射等關(guān)系 class SizeClass { public: //獲取向上對齊后的字節(jié)數(shù) static inline size_t RoundUp(size_t bytes); //獲取對應(yīng)哈希桶的下標 static inline size_t Index(size_t bytes); };
需要注意的是,SizeClass類當中的成員函數(shù)最好設(shè)置為靜態(tài)成員函數(shù),否則我們在調(diào)用這些函數(shù)時就需要通過對象去調(diào)用,并且對于這些可能會頻繁調(diào)用的函數(shù),可以考慮將其設(shè)置為內(nèi)聯(lián)函數(shù)。
在獲取某一字節(jié)數(shù)向上對齊后的字節(jié)數(shù)時,可以先判斷該字節(jié)數(shù)屬于哪一個區(qū)間,然后再通過調(diào)用一個子函數(shù)進行進一步處理。
//獲取向上對齊后的字節(jié)數(shù) static inline size_t RoundUp(size_t bytes) { if (bytes <= 128) { return _RoundUp(bytes, 8); } else if (bytes <= 1024) { return _RoundUp(bytes, 16); } else if (bytes <= 8 * 1024) { return _RoundUp(bytes, 128); } else if (bytes <= 64 * 1024) { return _RoundUp(bytes, 1024); } else if (bytes <= 256 * 1024) { return _RoundUp(bytes, 8 * 1024); } else { assert(false); return -1; } }
此時我們就需要編寫一個子函數(shù),該子函數(shù)需要通過對齊數(shù)計算出某一字節(jié)數(shù)對齊后的字節(jié)數(shù),最容易想到的就是下面這種寫法。
//一般寫法 static inline size_t _RoundUp(size_t bytes, size_t alignNum) { size_t alignSize = 0; if (bytes%alignNum != 0) { alignSize = (bytes / alignNum + 1)*alignNum; } else { alignSize = bytes; } return alignSize; }
除了上述寫法,我們還可以通過位運算的方式來進行計算,雖然位運算可能并沒有上面的寫法容易理解,但計算機執(zhí)行位運算的速度是比執(zhí)行乘法和除法更快的。
//位運算寫法 static inline size_t _RoundUp(size_t bytes, size_t alignNum) { return ((bytes + alignNum - 1)&~(alignNum - 1)); }
對于上述位運算,我們以10字節(jié)按8字節(jié)對齊為例進行分析。 8 − 1 = 7 8-1=7 8−1=7,7就是一個低三位為1其余位為0的二進制序列,我們將10與7相加,相當于將10字節(jié)當中不夠8字節(jié)的剩余字節(jié)數(shù)補上了。
然后我們再將該值與7按位取反后的值進行與運算,而7按位取反后是一個低三位為0其余位為1的二進制序列,該操作進行后相當于屏蔽了該值的低三位而該值的其余位保持不變,此時得到的值就是10字節(jié)按8字節(jié)對齊后的值,即16字節(jié)。
在獲取某一字節(jié)數(shù)對應(yīng)的哈希桶下標時,也是先判斷該字節(jié)數(shù)屬于哪一個區(qū)間,然后再通過調(diào)用一個子函數(shù)進行進一步處理。
//獲取對應(yīng)哈希桶的下標 static inline size_t Index(size_t bytes) { //每個區(qū)間有多少個自由鏈表 static size_t groupArray[4] = { 16, 56, 56, 56 }; if (bytes <= 128) { return _Index(bytes, 3); } else if (bytes <= 1024) { return _Index(bytes - 128, 4) + groupArray[0]; } else if (bytes <= 8 * 1024) { return _Index(bytes - 1024, 7) + groupArray[0] + groupArray[1]; } else if (bytes <= 64 * 1024) { return _Index(bytes - 8 * 1024, 10) + groupArray[0] + groupArray[1] + groupArray[2]; } else if (bytes <= 256 * 1024) { return _Index(bytes - 64 * 1024, 13) + groupArray[0] + groupArray[1] + groupArray[2] + groupArray[3]; } else { assert(false); return -1; } }
此時我們需要編寫一個子函數(shù)來繼續(xù)進行處理,容易想到的就是根據(jù)對齊數(shù)來計算某一字節(jié)數(shù)對應(yīng)的下標。
//一般寫法 static inline size_t _Index(size_t bytes, size_t alignNum) { size_t index = 0; if (bytes%alignNum != 0) { index = bytes / alignNum; } else { index = bytes / alignNum - 1; } return index; }
當然,為了提高效率下面也提供了一個用位運算來解決的方法,需要注意的是,此時我們并不是傳入該字節(jié)數(shù)的對齊數(shù),而是將對齊數(shù)寫成2的n次方的形式后,將這個n值進行傳入。比如對齊數(shù)是8,傳入的就是3。
//位運算寫法 static inline size_t _Index(size_t bytes, size_t alignShift) { return ((bytes + (1 << alignShift) - 1) >> alignShift) - 1; }
這里我們還是以10字節(jié)按8字節(jié)對齊為例進行分析。此時傳入的alignShift就是3,將1左移3位后得到的實際上就是對齊數(shù)8, 8 − 1 = 7 8-1=7 8−1=7,相當于我們還是讓10與7相加。
之后我們再將該值向右移3位,實際上就是讓這個值除以8,此時我們也是相當于屏蔽了該值二進制的低三位,因為除以8得到的值與其二進制的低三位無關(guān),所以我們可以說是將10對齊后的字節(jié)數(shù)除以了8,此時得到了2,而最后還需要減一是因為數(shù)組的下標是從0開始的。
ThreadCache類
按照上述的對齊規(guī)則,thread cache中桶的個數(shù),也就是自由鏈表的個數(shù)是208,以及thread cache允許申請的最大內(nèi)存大小256KB,我們可以將這些數(shù)據(jù)按照如下方式進行定義。
//小于等于MAX_BYTES,就找thread cache申請 //大于MAX_BYTES,就直接找page cache或者系統(tǒng)堆申請 static const size_t MAX_BYTES = 256 * 1024; //thread cache和central cache自由鏈表哈希桶的表大小 static const size_t NFREELISTS = 208;
現(xiàn)在就可以對ThreadCache類進行定義了,thread cache就是一個存儲208個自由鏈表的數(shù)組,目前thread cache就先提供一個Allocate函數(shù)用于申請對象就行了,后面需要時再進行增加。
class ThreadCache { public: //申請內(nèi)存對象 void* Allocate(size_t size); private: FreeList _freeLists[NFREELISTS]; //哈希桶 };
在thread cache申請對象時,通過所給字節(jié)數(shù)計算出對應(yīng)的哈希桶下標,如果桶中自由鏈表不為空,則從該自由鏈表中取出一個對象進行返回即可;但如果此時自由鏈表為空,那么我們就需要從central cache進行獲取了,這里的FetchFromCentralCache函數(shù)也是thread cache類中的一個成員函數(shù),在后面再進行具體實現(xiàn)。
//申請內(nèi)存對象 void* ThreadCache::Allocate(size_t size) { assert(size <= MAX_BYTES); size_t alignSize = SizeClass::RoundUp(size); size_t index = SizeClass::Index(size); if (!_freeLists[index].Empty()) { return _freeLists[index].Pop(); } else { return FetchFromCentralCache(index, alignSize); } }
threadcacheTLS無鎖訪問
每個線程都有一個自己獨享的thread cache,那應(yīng)該如何創(chuàng)建這個thread cache呢?我們不能將這個thread cache創(chuàng)建為全局的,因為全局變量是所有線程共享的,這樣就不可避免的需要鎖來控制,增加了控制成本和代碼復(fù)雜度。
要實現(xiàn)每個線程無鎖的訪問屬于自己的thread cache,我們需要用到線程局部存儲TLS(Thread Local Storage),這是一種變量的存儲方法,使用該存儲方法的變量在它所在的線程是全局可訪問的,但是不能被其他線程訪問到,這樣就保持了數(shù)據(jù)的線程獨立性。
//TLS - Thread Local Storage static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
但不是每個線程被創(chuàng)建時就立馬有了屬于自己的thread cache,而是當該線程調(diào)用相關(guān)申請內(nèi)存的接口時才會創(chuàng)建自己的thread cache,因此在申請內(nèi)存的函數(shù)中會包含以下邏輯。
//通過TLS,每個線程無鎖的獲取自己專屬的ThreadCache對象 if (pTLSThreadCache == nullptr) { pTLSThreadCache = new ThreadCache; }
centralcache
centralcache整體設(shè)計
當線程申請某一大小的內(nèi)存時,如果thread cache中對應(yīng)的自由鏈表不為空,那么直接取出一個內(nèi)存塊進行返回即可,但如果此時該自由鏈表為空,那么這時thread cache就需要向central cache申請內(nèi)存了。
central cache的結(jié)構(gòu)與thread cache是一樣的,它們都是哈希桶的結(jié)構(gòu),并且它們遵循的對齊映射規(guī)則都是一樣的。這樣做的好處就是,當thread cache的某個桶中沒有內(nèi)存了,就可以直接到central cache中對應(yīng)的哈希桶里去取內(nèi)存就行了。
central cache與thread cache的不同之處
central cache與thread cache有兩個明顯不同的地方,首先,thread cache是每個線程獨享的,而central cache是所有線程共享的,因為每個線程的thread cache沒有內(nèi)存了都會去找central cache,因此在訪問central cache時是需要加鎖的。
但central cache在加鎖時并不是將整個central cache全部鎖上了,central cache在加鎖時用的是桶鎖,也就是說每個桶都有一個鎖。此時只有當多個線程同時訪問central cache的同一個桶時才會存在鎖競爭,如果是多個線程同時訪問central cache的不同桶就不會存在鎖競爭。
central cache與thread cache的第二個不同之處就是,thread cache的每個桶中掛的是一個個切好的內(nèi)存塊,而central cache的每個桶中掛的是一個個的span。
每個span管理的都是一個以頁為單位的大塊內(nèi)存,每個桶里面的若干span是按照雙鏈表的形式鏈接起來的,并且每個span里面還有一個自由鏈表,這個自由鏈表里面掛的就是一個個切好了的內(nèi)存塊,根據(jù)其所在的哈希桶這些內(nèi)存塊被切成了對應(yīng)的大小。
centralcache結(jié)構(gòu)設(shè)計
頁號的類型?
每個程序運行起來后都有自己的進程地址空間,在32位平臺下,進程地址空間的大小是232;而在64位平臺下,進程地址空間的大小就是264。
頁的大小一般是4K或者8K,我們以8K為例。在32位平臺下,進程地址空間就可以被分成 2 32 ÷ 2 13 = 2 19 2^{32}\div2^{13}=2^{19} 232÷213=219個頁;在64位平臺下,進程地址空間就可以被分成 2 64 ÷ 2 13 = 2 51 2^{64}\div2^{13}=2^{51} 264÷213=251個頁。頁號本質(zhì)與地址是一樣的,它們都是一個編號,只不過地址是以一個字節(jié)為一個單位,而頁是以多個字節(jié)為一個單位。
由于頁號在64位平臺下的取值范圍是 [ [ [ 0 , 2 51 ) 0,2^{51}) 0,251),因此我們不能簡單的用一個無符號整型來存儲頁號,這時我們需要借助條件編譯來解決這個問題。
#ifdef _WIN64 typedef unsigned long long PAGE_ID; #elif _WIN32 typedef size_t PAGE_ID; #else //linux #endif
需要注意的是,在32位下,_WIN32有定義,_WIN64沒有定義;而在64位下,_WIN32和_WIN64都有定義。因此在條件編譯時,我們應(yīng)該先判斷_WIN64是否有定義,再判斷_WIN32是否有定義。
span的結(jié)構(gòu)
central cache的每個桶里掛的是一個個的span,span是一個管理以頁為單位的大塊內(nèi)存,span的結(jié)構(gòu)如下:
//管理以頁為單位的大塊內(nèi)存 struct Span { PAGE_ID _pageId = 0; //大塊內(nèi)存起始頁的頁號 size_t _n = 0; //頁的數(shù)量 Span* _next = nullptr; //雙鏈表結(jié)構(gòu) Span* _prev = nullptr; size_t _useCount = 0; //切好的小塊內(nèi)存,被分配給thread cache的計數(shù) void* _freeList = nullptr; //切好的小塊內(nèi)存的自由鏈表 };
對于span管理的以頁為單位的大塊內(nèi)存,我們需要知道這塊內(nèi)存具體在哪一個位置,便于之后page cache進行前后頁的合并,因此span結(jié)構(gòu)當中會記錄所管理大塊內(nèi)存起始頁的頁號。
至于每一個span管理的到底是多少個頁,這并不是固定的,需要根據(jù)多方面的因素來控制,因此span結(jié)構(gòu)當中有一個_n成員,該成員就代表著該span管理的頁的數(shù)量。
此外,每個span管理的大塊內(nèi)存,都會被切成相應(yīng)大小的內(nèi)存塊掛到當前span的自由鏈表中,比如8Byte哈希桶中的span,會被切成一個個8Byte大小的內(nèi)存塊掛到當前span的自由鏈表中,因此span結(jié)構(gòu)中需要存儲切好的小塊內(nèi)存的自由鏈表。
span結(jié)構(gòu)當中的_useCount成員記錄的就是,當前span中切好的小塊內(nèi)存,被分配給thread cache的計數(shù),當某個span的_useCount計數(shù)變?yōu)?時,代表當前span切出去的內(nèi)存塊對象全部還回來了,此時central cache就可以將這個span再還給page cache。
每個桶當中的span是以雙鏈表的形式組織起來的,當我們需要將某個span歸還給page cache時,就可以很方便的將該span從雙鏈表結(jié)構(gòu)中移出。如果用單鏈表結(jié)構(gòu)的話就比較麻煩了,因為單鏈表在刪除時,需要知道當前結(jié)點的前一個結(jié)點。
雙鏈表結(jié)構(gòu)
根據(jù)上面的描述,central cache的每個哈希桶里面存儲的都是一個雙鏈表結(jié)構(gòu),對于該雙鏈表結(jié)構(gòu)我們可以對其進行封裝。
//帶頭雙向循環(huán)鏈表 class SpanList { public: SpanList() { _head = new Span; _head->_next = _head; _head->_prev = _head; } void Insert(Span* pos, Span* newSpan) { assert(pos); assert(newSpan); Span* prev = pos->_prev; prev->_next = newSpan; newSpan->_prev = prev; newSpan->_next = pos; pos->_prev = newSpan; } void Erase(Span* pos) { assert(pos); assert(pos != _head); //不能刪除哨兵位的頭結(jié)點 Span* prev = pos->_prev; Span* next = pos->_next; prev->_next = next; next->_prev = prev; } private: Span* _head; public: std::mutex _mtx; //桶鎖 };
需要注意的是,從雙鏈表刪除的span會還給下一層的page cache,相當于只是把這個span從雙鏈表中移除,因此不需要對刪除的span進行delete操作。
central cache的結(jié)構(gòu)
central cache的映射規(guī)則和thread cache是一樣的,因此central cache里面哈希桶的個數(shù)也是208,但central cache每個哈希桶中存儲就是我們上面定義的雙鏈表結(jié)構(gòu)。
class CentralCache { public: //... private: SpanList _spanLists[NFREELISTS]; };
central cache和thread cache的映射規(guī)則一樣,有一個好處就是,當thread cache的某個桶沒有內(nèi)存了,就可以直接去central cache對應(yīng)的哈希桶進行申請就行了。
centralcache核心實現(xiàn)
central cache的實現(xiàn)方式
每個線程都有一個屬于自己的thread cache,我們是用TLS來實現(xiàn)每個線程無鎖的訪問屬于自己的thread cache的。而central cache和page cache在整個進程中只有一個,對于這種只能創(chuàng)建一個對象的類,我們可以將其設(shè)置為單例模式。
單例模式可以保證系統(tǒng)中該類只有一個實例,并提供一個訪問它的全局訪問點,該實例被所有程序模塊共享。單例模式又分為餓漢模式和懶漢模式,懶漢模式相對較復(fù)雜,我們這里使用餓漢模式就足夠了。
//單例模式 class CentralCache { public: //提供一個全局訪問點 static CentralCache* GetInstance() { return &_sInst; } private: SpanList _spanLists[NFREELISTS]; private: CentralCache() //構(gòu)造函數(shù)私有 {} CentralCache(const CentralCache&) = delete; //防拷貝 static CentralCache _sInst; };
為了保證CentralCache類只能創(chuàng)建一個對象,我們需要將central cache的構(gòu)造函數(shù)和拷貝構(gòu)造函數(shù)設(shè)置為私有,或者在C++11中也可以在函數(shù)聲明的后面加上=delete進行修飾。
CentralCache類當中還需要有一個CentralCache類型的靜態(tài)的成員變量,當程序運行起來后我們就立馬創(chuàng)建該對象,在此后的程序中就只有這一個單例了。
CentralCache CentralCache::_sInst;
最后central cache還需要提供一個公有的成員函數(shù),用于獲取該對象,此時在整個進程中就只會有一個central cache對象了。
慢開始反饋調(diào)節(jié)算法
當thread cache向central cache申請內(nèi)存時,central cache應(yīng)該給出多少個對象呢?這是一個值得思考的問題,如果central cache給的太少,那么thread cache在短時間內(nèi)用完了又會來申請;但如果一次性給的太多了,可能thread cache用不完也就浪費了。
鑒于此,我們這里采用了一個慢開始反饋調(diào)節(jié)算法。當thread cache向central cache申請內(nèi)存時,如果申請的是較小的對象,那么可以多給一點,但如果申請的是較大的對象,就可以少給一點。
通過下面這個函數(shù),我們就可以根據(jù)所需申請的對象的大小計算出具體給出的對象個數(shù),并且可以將給出的對象個數(shù)控制到2~512個之間。也就是說,就算thread cache要申請的對象再小,我最多一次性給出512個對象;就算thread cache要申請的對象再大,我至少一次性給出2個對象。
//管理對齊和映射等關(guān)系 class SizeClass { public: //thread cache一次從central cache獲取對象的上限 static size_t NumMoveSize(size_t size) { assert(size > 0); //對象越小,計算出的上限越高 //對象越大,計算出的上限越低 int num = MAX_BYTES / size; if (num < 2) num = 2; if (num > 512) num = 512; return num; } };
但就算申請的是小對象,一次性給出512個也是比較多的,基于這個原因,我們可以在FreeList結(jié)構(gòu)中增加一個叫做_maxSize的成員變量,該變量的初始值設(shè)置為1,并且提供一個公有成員函數(shù)用于獲取這個變量。也就是說,現(xiàn)在thread cache中的每個自由鏈表都會有一個自己的_maxSize。
//管理切分好的小對象的自由鏈表 class FreeList { public: size_t& MaxSize() { return _maxSize; } private: void* _freeList = nullptr; //自由鏈表 size_t _maxSize = 1; };
此時當thread cache申請對象時,我們會比較_maxSize和計算得出的值,取出其中的較小值作為本次申請對象的個數(shù)。此外,如果本次采用的是_maxSize的值,那么還會將thread cache中該自由鏈表的_maxSize的值進行加一。
因此,thread cache第一次向central cache申請某大小的對象時,申請到的都是一個,但下一次thread cache再向central cache申請同樣大小的對象時,因為該自由鏈表中的_maxSize增加了,最終就會申請到兩個。直到該自由鏈表中_maxSize的值,增長到超過計算出的值后就不會繼續(xù)增長了,此后申請到的對象個數(shù)就是計算出的個數(shù)。(這有點像網(wǎng)絡(luò)中擁塞控制的機制)
從中心緩存獲取對象
每次thread cache向central cache申請對象時,我們先通過慢開始反饋調(diào)節(jié)算法計算出本次應(yīng)該申請的對象的個數(shù),然后再向central cache進行申請。
如果thread cache最終申請到對象的個數(shù)就是一個,那么直接將該對象返回即可。為什么需要返回一個申請到的對象呢?因為thread cache要向central cache申請對象,其實由于某個線程向thread cache申請對象但thread cache當中沒有,這才導(dǎo)致thread cache要向central cache申請對象。因此central cache將對象返回給thread cache后,thread cache會再將該對象返回給申請對象的線程。
但如果thread cache最終申請到的是多個對象,那么除了將第一個對象返回之外,還需要將剩下的對象掛到thread cache對應(yīng)的哈希桶當中。
//從中心緩存獲取對象 void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) { //慢開始反饋調(diào)節(jié)算法 //1、最開始不會一次向central cache一次批量要太多,因為要太多了可能用不完 //2、如果你不斷有size大小的內(nèi)存需求,那么batchNum就會不斷增長,直到上限 size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size)); if (batchNum == _freeLists[index].MaxSize()) { _freeLists[index].MaxSize() += 1; } void* start = nullptr; void* end = nullptr; size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size); assert(actualNum >= 1); //至少有一個 if (actualNum == 1) //申請到對象的個數(shù)是一個,則直接將這一個對象返回即可 { assert(start == end); return start; } else //申請到對象的個數(shù)是多個,還需要將剩下的對象掛到thread cache中對應(yīng)的哈希桶中 { _freeLists[index].PushRange(NextObj(start), end); return start; } }
從中心緩存獲取一定數(shù)量的對象
這里我們要從central cache獲取n個指定大小的對象,這些對象肯定都是從central cache對應(yīng)哈希桶的某個span中取出來的,因此取出來的這n個對象是鏈接在一起的,我們只需要得到這段鏈表的頭和尾即可,這里可以采用輸出型參數(shù)進行獲取。
//從central cache獲取一定數(shù)量的對象給thread cache size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t n, size_t size) { size_t index = SizeClass::Index(size); _spanLists[index]._mtx.lock(); //加鎖 //在對應(yīng)哈希桶中獲取一個非空的span Span* span = GetOneSpan(_spanLists[index], size); assert(span); //span不為空 assert(span->_freeList); //span當中的自由鏈表也不為空 //從span中獲取n個對象 //如果不夠n個,有多少拿多少 start = span->_freeList; end = span->_freeList; size_t actualNum = 1; while (NextObj(end)&&n - 1) { end = NextObj(end); actualNum++; n--; } span->_freeList = NextObj(end); //取完后剩下的對象繼續(xù)放到自由鏈表 NextObj(end) = nullptr; //取出的一段鏈表的表尾置空 span->_useCount += actualNum; //更新被分配給thread cache的計數(shù) _spanLists[index]._mtx.unlock(); //解鎖 return actualNum; }
由于central cache是所有線程共享的,所以我們在訪問central cache中的哈希桶時,需要先給對應(yīng)的哈希桶加上桶鎖,在獲取到對象后再將桶鎖解掉。
在向central cache獲取對象時,先是在central cache對應(yīng)的哈希桶中獲取到一個非空的span,然后從這個span的自由鏈表中取出n個對象即可,但可能這個非空的span的自由鏈表當中對象的個數(shù)不足n個,這時該自由鏈表當中有多少個對象就給多少就行了。
也就是說,thread cache實際從central cache獲得的對象的個數(shù)可能與我們傳入的n值是不一樣的,因此我們需要統(tǒng)計本次申請過程中,實際thread cache獲取到的對象個數(shù),然后根據(jù)該值及時更新這個span中的小對象被分配給thread cache的計數(shù)。
需要注意的是,雖然我們實際申請到對象的個數(shù)可能比n要小,但這并不會產(chǎn)生任何影響。因為thread cache的本意就是向central cache申請一個對象,我們之所以要一次多申請一些對象,是因為這樣一來下次線程再申請相同大小的對象時就可以直接在thread cache里面獲取了,而不用再向central cache申請對象。
插入一段范圍的對象到自由鏈表
此外,如果thread cache最終從central cache獲取到的對象個數(shù)是大于一的,那么我們還需要將剩下的對象插入到thread cache中對應(yīng)的哈希桶中,為了能讓自由鏈表支持插入一段范圍的對象,我們還需要在FreeList類中增加一個對應(yīng)的成員函數(shù)。
//管理切分好的小對象的自由鏈表 class FreeList { public: //插入一段范圍的對象到自由鏈表 void PushRange(void* start, void* end) { assert(start); assert(end); //頭插 NextObj(end) = _freeList; _freeList = start; } private: void* _freeList = nullptr; //自由鏈表 size_t _maxSize = 1; };
pagecache
pagecache整體設(shè)計
page cache與central cache結(jié)構(gòu)的相同之處
page cache與central cache一樣,它們都是哈希桶的結(jié)構(gòu),并且page cache的每個哈希桶中里掛的也是一個個的span,這些span也是按照雙鏈表的結(jié)構(gòu)鏈接起來的。
page cache與central cache結(jié)構(gòu)的不同之處
首先,central cache的映射規(guī)則與thread cache保持一致,而page cache的映射規(guī)則與它們都不相同。page cache的哈希桶映射規(guī)則采用的是直接定址法,比如1號桶掛的都是1頁的span,2號桶掛的都是2頁的span,以此類推。
其次,central cache每個桶中的span被切成了一個個對應(yīng)大小的對象,以供thread cache申請。而page cache當中的span是沒有被進一步切小的,因為page cache服務(wù)的是central cache,當central cache沒有span時,向page cache申請的是某一固定頁數(shù)的span,而如何切分申請到的這個span就應(yīng)該由central cache自己來決定。
至于page cache當中究竟有多少個桶,這就要看你最大想掛幾頁的span了,這里我們就最大掛128頁的span,為了讓桶號與頁號對應(yīng)起來,我們可以將第0號桶空出來不用,因此我們需要將哈希桶的個數(shù)設(shè)置為129。
//page cache中哈希桶的個數(shù) static const size_t NPAGES = 129;
為什么這里最大掛128頁的span呢?因為線程申請單個對象最大是256KB,而128頁可以被切成4個256KB的對象,因此是足夠的。當然,如果你想在page cache中掛更大的span也是可以的,根據(jù)具體的需求進行設(shè)置就行了。
在page cache獲取一個n頁的span的過程
如果central cache要獲取一個n頁的span,那我們就可以在page cache的第n號桶中取出一個span返回給central cache即可,但如果第n號桶中沒有span了,這時我們并不是直接轉(zhuǎn)而向堆申請一個n頁的span,而是要繼續(xù)在后面的桶當中尋找span。
直接向堆申請以頁為單位的內(nèi)存時,我們應(yīng)該盡量申請大塊一點的內(nèi)存塊,因為此時申請到的內(nèi)存是連續(xù)的,當線程需要內(nèi)存時我們可以將其切小后分配給線程,而當線程將內(nèi)存釋放后我們又可以將其合并成大塊的連續(xù)內(nèi)存。如果我們向堆申請內(nèi)存時是小塊小塊的申請的,那么我們申請到的內(nèi)存就不一定是連續(xù)的了。
因此,當?shù)趎號桶中沒有span時,我們可以繼續(xù)找第n+1號桶,因為我們可以將n+1頁的span切分成一個n頁的span和一個1頁的span,這時我們就可以將n頁的span返回,而將切分后1頁的span掛到1號桶中。但如果后面的桶當中都沒有span,這時我們就只能向堆申請一個128頁的內(nèi)存塊,并將其用一個span結(jié)構(gòu)管理起來,然后將128頁的span切分成n頁的span和128-n頁的span,其中n頁的span返回給central cache,而128-n頁的span就掛到第128-n號桶中。
也就是說,我們每次向堆申請的都是128頁大小的內(nèi)存塊,central cache要的這些span實際都是由128頁的span切分出來的。
page cache的實現(xiàn)方式
當每個線程的thread cache沒有內(nèi)存時都會向central cache申請,此時多個線程的thread cache如果訪問的不是central cache的同一個桶,那么這些線程是可以同時進行訪問的。這時central cache的多個桶就可能同時向page cache申請內(nèi)存的,所以page cache也是存在線程安全問題的,因此在訪問page cache時也必須要加鎖。
但是在page cache這里我們不能使用桶鎖,因為當central cache向page cache申請內(nèi)存時,page cache可能會將其他桶當中大頁的span切小后再給central cache。此外,當central cache將某個span歸還給page cache時,page cache也會嘗試將該span與其他桶當中的span進行合并。
也就是說,在訪問page cache時,我們可能需要訪問page cache中的多個桶,如果page cache用桶鎖就會出現(xiàn)大量頻繁的加鎖和解鎖,導(dǎo)致程序的效率低下。因此我們在訪問page cache時使用沒有使用桶鎖,而是用一個大鎖將整個page cache給鎖住。
而thread cache在訪問central cache時,只需要訪問central cache中對應(yīng)的哈希桶就行了,因為central cache的每個哈希桶中的span都被切分成了對應(yīng)大小,thread cache只需要根據(jù)自己所需對象的大小訪問central cache中對應(yīng)的哈希桶即可,不會訪問其他哈希桶,因此central cache可以用桶鎖。
此外,page cache在整個進程中也是只能存在一個的,因此我們也需要將其設(shè)置為單例模式。
//單例模式 class PageCache { public: //提供一個全局訪問點 static PageCache* GetInstance() { return &_sInst; } private: SpanList _spanLists[NPAGES]; std::mutex _pageMtx; //大鎖 private: PageCache() //構(gòu)造函數(shù)私有 {} PageCache(const PageCache&) = delete; //防拷貝 static PageCache _sInst; };
當程序運行起來后我們就立馬創(chuàng)建該對象即可。
PageCache PageCache::_sInst;
pagecache中獲取Span
獲取一個非空的span
thread cache向central cache申請對象時,central cache需要先從對應(yīng)的哈希桶中獲取到一個非空的span,然后從這個非空的span中取出若干對象返回給thread cache。那central cache到底是如何從對應(yīng)的哈希桶中,獲取到一個非空的span的呢?
首先當然是先遍歷central cache對應(yīng)哈希桶當中的雙鏈表,如果該雙鏈表中有非空的span,那么直接將該span進行返回即可。為了方便遍歷這個雙鏈表,我們可以模擬迭代器的方式,給SpanList類提供Begin和End成員函數(shù),分別用于獲取雙鏈表中的第一個span和最后一個span的下一個位置,也就是頭結(jié)點。
//帶頭雙向循環(huán)鏈表 class SpanList { public: Span* Begin() { return _head->_next; } Span* End() { return _head; } private: Span* _head; public: std::mutex _mtx; //桶鎖 };
但如果遍歷雙鏈表后發(fā)現(xiàn)雙鏈表中沒有span,或該雙鏈表中的span都為空,那么此時central cache就需要向page cache申請內(nèi)存塊了。
那具體是向page cache申請多大的內(nèi)存塊呢?我們可以根據(jù)具體所需對象的大小來決定,就像之前我們根據(jù)對象的大小計算出,thread cache一次向central cache申請對象的個數(shù)上限,現(xiàn)在我們是根據(jù)對象的大小計算出,central cache一次應(yīng)該向page cache申請幾頁的內(nèi)存塊。
我們可以先根據(jù)對象的大小計算出,thread cache一次向central cache申請對象的個數(shù)上限,然后將這個上限值乘以單個對象的大小,就算出了具體需要多少字節(jié),最后再將這個算出來的字節(jié)數(shù)轉(zhuǎn)換為頁數(shù),如果轉(zhuǎn)換后不夠一頁,那么我們就申請一頁,否則轉(zhuǎn)換出來是幾頁就申請幾頁。也就是說,central cache向page cache申請內(nèi)存時,要求申請到的內(nèi)存盡量能夠滿足thread cache向central cache申請時的上限。
//管理對齊和映射等關(guān)系 class SizeClass { public: //central cache一次向page cache獲取多少頁 static size_t NumMovePage(size_t size) { size_t num = NumMoveSize(size); //計算出thread cache一次向central cache申請對象的個數(shù)上限 size_t nPage = num*size; //num個size大小的對象所需的字節(jié)數(shù) nPage >>= PAGE_SHIFT; //將字節(jié)數(shù)轉(zhuǎn)換為頁數(shù) if (nPage == 0) //至少給一頁 nPage = 1; return nPage; } };
代碼中的PAGE_SHIFT
代表頁大小轉(zhuǎn)換偏移,我們這里以頁的大小為8K為例,PAGE_SHIFT
的值就是13。
//頁大小轉(zhuǎn)換偏移,即一頁定義為2^13,也就是8KB static const size_t PAGE_SHIFT = 13;
需要注意的是,當central cache申請到若干頁的span后,還需要將這個span切成一個個對應(yīng)大小的對象掛到該span的自由鏈表當中。
如何找到一個span所管理的內(nèi)存塊呢?首先需要計算出該span的起始地址,我們可以用這個span的起始頁號乘以一頁的大小即可得到這個span的起始地址,然后用這個span的頁數(shù)乘以一頁的大小就可以得到這個span所管理的內(nèi)存塊的大小,用起始地址加上內(nèi)存塊的大小即可得到這塊內(nèi)存塊的結(jié)束位置。
明確了這塊內(nèi)存的起始和結(jié)束位置后,我們就可以進行切分了。根據(jù)所需對象的大小,每次從大塊內(nèi)存切出一塊固定大小的內(nèi)存塊尾插到span的自由鏈表中即可。
為什么是尾插呢?因為我們?nèi)绻菍⑶泻玫膶ο笪膊宓阶杂涉湵恚@些對象看起來是按照鏈式結(jié)構(gòu)鏈接起來的,而實際它們在物理上是連續(xù)的,這時當我們把這些連續(xù)內(nèi)存分配給某個線程使用時,可以提高該線程的CPU緩存利用率。
//獲取一個非空的span Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size) { //1、先在spanList中尋找非空的span Span* it = spanList.Begin(); while (it != spanList.End()) { if (it->_freeList != nullptr) { return it; } else { it = it->_next; } } //2、spanList中沒有非空的span,只能向page cache申請 Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size)); //計算span的大塊內(nèi)存的起始地址和大塊內(nèi)存的大?。ㄗ止?jié)數(shù)) char* start = (char*)(span->_pageId << PAGE_SHIFT); size_t bytes = span->_n << PAGE_SHIFT; //把大塊內(nèi)存切成size大小的對象鏈接起來 char* end = start + bytes; //先切一塊下來去做尾,方便尾插 span->_freeList = start; start += size; void* tail = span->_freeList; //尾插 while (start < end) { NextObj(tail) = start; tail = NextObj(tail); start += size; } NextObj(tail) = nullptr; //尾的指向置空 //將切好的span頭插到spanList spanList.PushFront(span); return span; }
需要注意的是,當我們把span切好后,需要將這個切好的span掛到central cache的對應(yīng)哈希桶中。因此SpanList類還需要提供一個接口,用于將一個span插入到該雙鏈表中。這里我們選擇的是頭插,這樣當central cache下一次從該雙鏈表中獲取非空span時,一來就能找到。
由于SpanList類之前實現(xiàn)了Insert和Begin函數(shù),這里實現(xiàn)雙鏈表頭插就非常簡單,直接在雙鏈表的Begin位置進行Insert即可。
//帶頭雙向循環(huán)鏈表 class SpanList { public: void PushFront(Span* span) { Insert(Begin(), span); } private: Span* _head; public: std::mutex _mtx; //桶鎖 };
獲取一個k頁的span
當我們調(diào)用上述的GetOneSpan從central cache的某個哈希桶獲取一個非空的span時,如果遍歷哈希桶中的雙鏈表后發(fā)現(xiàn)雙鏈表中沒有span,或該雙鏈表中的span都為空,那么此時central cache就需要向page cache申請若干頁的span了,下面我們就來說說如何從page cache獲取一個k頁的span。
因為page cache是直接按照頁數(shù)進行映射的,因此我們要從page cache獲取一個k頁的span,就應(yīng)該直接先去找page cache的第k號桶,如果第k號桶中有span,那我們直接頭刪一個span返回給central cache就行了。所以我們這里需要再給SpanList類添加對應(yīng)的Empty和PopFront函數(shù)。
//帶頭雙向循環(huán)鏈表 class SpanList { public: bool Empty() { return _head == _head->_next; } Span* PopFront() { Span* front = _head->_next; Erase(front); return front; } private: Span* _head; public: std::mutex _mtx; //桶鎖 };
如果page cache的第k號桶中沒有span,我們就應(yīng)該繼續(xù)找后面的桶,只要后面任意一個桶中有一個n頁span,我們就可以將其切分成一個k頁的span和一個n-k頁的span,然后將切出來k頁的span返回給central cache,再將n-k頁的span掛到page cache的第n-k號桶即可。
但如果后面的桶中也都沒有span,此時我們就需要向堆申請一個128頁的span了,在向堆申請內(nèi)存時,直接調(diào)用我們封裝的SystemAlloc函數(shù)即可。
需要注意的是,向堆申請內(nèi)存后得到的是這塊內(nèi)存的起始地址,此時我們需要將該地址轉(zhuǎn)換為頁號。由于我們向堆申請內(nèi)存時都是按頁進行申請的,因此我們直接將該地址除以一頁的大小即可得到對應(yīng)的頁號。
//獲取一個k頁的span Span* PageCache::NewSpan(size_t k) { assert(k > 0 && k < NPAGES); //先檢查第k個桶里面有沒有span if (!_spanLists[k].Empty()) { return _spanLists[k].PopFront(); } //檢查一下后面的桶里面有沒有span,如果有可以將其進行切分 for (size_t i = k + 1; i < NPAGES; i++) { if (!_spanLists[i].Empty()) { Span* nSpan = _spanLists[i].PopFront(); Span* kSpan = new Span; //在nSpan的頭部切k頁下來 kSpan->_pageId = nSpan->_pageId; kSpan->_n = k; nSpan->_pageId += k; nSpan->_n -= k; //將剩下的掛到對應(yīng)映射的位置 _spanLists[nSpan->_n].PushFront(nSpan); return kSpan; } } //走到這里說明后面沒有大頁的span了,這時就向堆申請一個128頁的span Span* bigSpan = new Span; void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; bigSpan->_n = NPAGES - 1; _spanLists[bigSpan->_n].PushFront(bigSpan); //盡量避免代碼重復(fù),遞歸調(diào)用自己 return NewSpan(k); }
這里說明一下,當我們向堆申請到128頁的span后,需要將其切分成k頁的span和128-k頁的span,但是為了盡量避免出現(xiàn)重復(fù)的代碼,我們最好不要再編寫對應(yīng)的切分代碼。我們可以先將申請到的128頁的span掛到page cache對應(yīng)的哈希桶中,然后再遞歸調(diào)用該函數(shù)就行了,此時在往后找span時就一定會在第128號桶中找到該span,然后進行切分。
這里其實有一個問題:當central cache向page cache申請內(nèi)存時,central cache對應(yīng)的哈希桶是處于加鎖的狀態(tài)的,那在訪問page cache之前我們應(yīng)不應(yīng)該把central cache對應(yīng)的桶鎖解掉呢?
這里建議在訪問page cache前,先把central cache對應(yīng)的桶鎖解掉。雖然此時central cache的這個桶當中是沒有內(nèi)存供其他thread cache申請的,但thread cache除了申請內(nèi)存還會釋放內(nèi)存,如果在訪問page cache前將central cache對應(yīng)的桶鎖解掉,那么此時當其他thread cache想要歸還內(nèi)存到central cache的這個桶時就不會被阻塞。
因此在調(diào)用NewSpan函數(shù)之前,我們需要先將central cache對應(yīng)的桶鎖解掉,然后再將page cache的大鎖加上,當申請到k頁的span后,我們需要將page cache的大鎖解掉,但此時我們不需要立刻獲取到central cache中對應(yīng)的桶鎖。因為central cache拿到k頁的span后還會對其進行切分操作,因此我們可以在span切好后需要將其掛到central cache對應(yīng)的桶上時,再獲取對應(yīng)的桶鎖。
這里為了讓代碼清晰一點,只寫出了加鎖和解鎖的邏輯,我們只需要將這些邏輯添加到之前實現(xiàn)的GetOneSpan函數(shù)的對應(yīng)位置即可。
spanList._mtx.unlock(); //解桶鎖 PageCache::GetInstance()->_pageMtx.lock(); //加大鎖 //從page cache申請k頁的span PageCache::GetInstance()->_pageMtx.unlock(); //解大鎖 //進行span的切分... spanList._mtx.lock(); //加桶鎖 //將span掛到central cache對應(yīng)的哈希桶
申請內(nèi)存過程聯(lián)調(diào)
ConcurrentAlloc函數(shù)
在將thread cache、central cache以及page cache的申請流程寫通了之后,我們就可以向外提供一個ConcurrentAlloc函數(shù),用于申請內(nèi)存塊。每個線程第一次調(diào)用該函數(shù)時會通過TLS獲取到自己專屬的thread cache對象,然后每個線程就可以通過自己對應(yīng)的thread cache申請對象了。
static void* ConcurrentAlloc(size_t size) { //通過TLS,每個線程無鎖的獲取自己專屬的ThreadCache對象 if (pTLSThreadCache == nullptr) { pTLSThreadCache = new ThreadCache; } return pTLSThreadCache->Allocate(size); }
這里說一下編譯時會出現(xiàn)的問題,在C++的algorithm頭文件中有一個min函數(shù),這是一個函數(shù)模板,而在Windows.h頭文件中也有一個min,這是一個宏。由于調(diào)用函數(shù)模板時需要進行參數(shù)類型的推演,因此當我們調(diào)用min函數(shù)時,編譯器會優(yōu)先匹配Windows.h當中以宏的形式實現(xiàn)的min,此時當我們以std::min的形式調(diào)用min函數(shù)時就會產(chǎn)生報錯,這就是沒有用命名空間進行封裝的壞處,這時我們只能選擇將std::去掉,讓編譯器調(diào)用Windows.h當中的min。
申請內(nèi)存過程聯(lián)調(diào)測試一
由于在多線程場景下調(diào)試觀察起來非常麻煩,這里就先不考慮多線程場景,看看在單線程場景下代碼的執(zhí)行邏輯是否符合我們的預(yù)期,其次,我們這里就只簡單觀察在一個桶當中的內(nèi)存申請就行了。
下面該線程進行了三次內(nèi)存申請,這三次內(nèi)存申請的字節(jié)數(shù)最終都對齊到了8,此時當線程申請內(nèi)存時就只會訪問到thread cache的第0號桶。
void* p1 = ConcurrentAlloc(6); void* p2 = ConcurrentAlloc(8); void* p3 = ConcurrentAlloc(1);
當線程第一次申請內(nèi)存時,該線程需要通過TLS獲取到自己專屬的thread cache對象,然后通過這個thread cache對象進行內(nèi)存申請。
在申請內(nèi)存時通過計算索引到了thread cache的第0號桶,但此時thread cache的第0號桶中是沒有對象的,因此thread cache需要向central cache申請內(nèi)存塊。
在向central cache申請內(nèi)存塊前,首先通過NumMoveSize函數(shù)計算得出,thread cache一次最多可向central cache申請8字節(jié)大小對象的個數(shù)是512,但由于我們采用的是慢開始算法,因此還需要將上限值與對應(yīng)自由鏈表的_maxSize的值進行比較,而此時對應(yīng)自由鏈表_maxSize的值是1,所以最終得出本次thread cache向central cache申請8字節(jié)對象的個數(shù)是1個。
并且在此之后會將該自由鏈表中_maxSize的值進行自增,下一次thread cache再向central cache申請8字節(jié)對象時最終申請對象的個數(shù)就會是2個了。
在thread cache向central cache申請對象之前,需要先將central cache的0號桶的鎖加上,然后再從該桶獲取一個非空的span。
在central cache的第0號桶獲取非空span時,先遍歷對應(yīng)的span雙鏈表,看看有沒有非空的span,但此時肯定是沒有的,因此在這個過程中我們無法找到一個非空的span。
那么此時central cache就需要向page cache申請內(nèi)存了,但在此之前需要先把central cache第0號桶的鎖解掉,然后再將page cache的大鎖給加上,之后才能向page cache申請內(nèi)存。
在向page cache申請內(nèi)存時,由于central cache一次給thread cache8字節(jié)對象的上限是512,對應(yīng)就需要4096字節(jié),所需字節(jié)數(shù)不足一頁就按一頁算,所以這里central cache就需要向page cache申請一頁的內(nèi)存塊。
但此時page cache的第1個桶以及之后的桶當中都是沒有span的,因此page cache需要直接向堆申請一個128頁的span。
這里通過監(jiān)視窗口可以看到,用于管理申請到的128頁內(nèi)存的span信息。
我們可以順便驗證一下,按頁向堆申請的內(nèi)存塊的起始地址和頁號之間是可以相互轉(zhuǎn)換的。
現(xiàn)在將申請到的128頁的span插入到page cache的第128號桶當中,然后再調(diào)用一次NewSpan,在這次調(diào)用的時候,雖然在1號桶當中沒有span,但是在往后找的過程中就一定會在第128號桶找到一個span。
此時我們就可以把這個128頁的span拿出來,切分成1頁的span和127頁的span,將1頁的span返回給central cache,而把127頁的span掛到page cache的第127號桶即可。
從page cache返回后,就可以把page cache的大鎖解掉了,但緊接著還要將獲取到的1頁的span進行切分,因此這里沒有立刻重新加上central cache對應(yīng)的桶鎖。
在進行切分的時候,先通過該span的起始頁號得到該span的起始地址,然后通過該span的頁數(shù)得到該span所管理內(nèi)存塊的總的字節(jié)數(shù)。
在確定內(nèi)存塊的開始和結(jié)束后,就可以將其切分成一個個8字節(jié)大小的對象掛到該span的自由鏈表中了。在調(diào)試過程中通過內(nèi)存監(jiān)視窗口可以看到,切分出來的每個8字節(jié)大小的對象的前四個字節(jié)存儲的都是下一個8字節(jié)對象的起始地址。
當切分結(jié)束后再獲取central cache第0號桶的桶鎖,然后將這個切好的span插入到central cache的第0號桶中,最后再將這個非空的span返回,此時就獲取到了一個非空的span。
由于thread cache只向central cache申請了一個對象,因此拿到這個非空的span后,直接從這個span里面取出一個對象即可,此時該span的_useCount也由0變成了1。
由于此時thread cache實際只向central cache申請到了一個對象,因此直接將這個對象返回給線程即可。
當線程第二次申請內(nèi)存塊時就不會再創(chuàng)建thread cache了,因為第一次申請時就已經(jīng)創(chuàng)建好了,此時該線程直接獲取到對應(yīng)的thread cache進行內(nèi)存塊申請即可。
當該線程第二次申請8字節(jié)大小的對象時,此時thread cache的0號桶中還是沒有對象的,因為第一次thread cache只向central cache申請了一個8字節(jié)對象,因此這次申請時還需要再向central cache申請對象。
這時thread cache向central cache申請對象時,thread cache第0號桶中自由鏈表的_maxSize已經(jīng)慢增長到2了,所以這次在向central cache申請對象時就會申請2個。如果下一次thread cache再向central cache申請8字節(jié)大小的對象,那么central cache會一次性給thread cache3個,這就是所謂的慢增長。
但由于第一次central cache向page cache申請了一頁的內(nèi)存塊,并將其切成了1024個8字節(jié)大小的對象,因此這次thread cache向central cache申請2兩個8字節(jié)的對象時,central cache的第0號桶當中是有對象的,直接返回兩個給thread cache即可,而不用再向page cache申請內(nèi)存了。
但線程實際申請的只是一個8字節(jié)對象,因此thread cache除了將一個對象返回之外,還需要將剩下的一個對象掛到thread cache的第0號桶當中。
這樣一來,當線程第三次申請1字節(jié)的內(nèi)存時,由于1字節(jié)對齊后也是8字節(jié),此時thread cache也就不需要再向central cache申請內(nèi)存塊了,直接將第0號桶當中之前剩下的一個8字節(jié)對象返回即可。
申請內(nèi)存過程聯(lián)調(diào)測試二
為了進一步測試代碼的正確性,我們可以做這樣一個測試:讓線程申請1024次8字節(jié)的對象,然后通過調(diào)試觀察在第1025次申請時,central cache是否會再向page cache申請內(nèi)存塊。
for (size_t i = 0; i < 1024; i++) { void* p1 = ConcurrentAlloc(6); } void* p2 = ConcurrentAlloc(6);
因為central cache第一次就是向page cache申請的一頁內(nèi)存,這一頁內(nèi)存被切成了1024個8字節(jié)大小的對象,當這1024個對象全部被申請之后,再申請8字節(jié)大小的對象時central cache當中就沒有對象了,此時就應(yīng)該向page cache申請內(nèi)存塊。
通過調(diào)試我們可以看到,第1025次申請8字節(jié)大小的對象時,central cache第0號桶中的這個span的_useCount已經(jīng)增加到了1024,也就是說這1024個對象都已經(jīng)被線程申請了,此時central cache就需要再向page cache申請一頁的span來進行切分了。
而這次central cache在向page cache申請一頁的內(nèi)存時,page cache就是將127頁span切分成了1頁的span和126頁的span了,然后central cache拿到這1頁的span后,又會將其切分成1024塊8字節(jié)大小的內(nèi)存塊以供thread cache申請。
threadcache回收內(nèi)存
當某個線程申請的對象不用了,可以將其釋放給thread cache,然后thread cache將該對象插入到對應(yīng)哈希桶的自由鏈表當中即可。
但是隨著線程不斷的釋放,對應(yīng)自由鏈表的長度也會越來越長,這些內(nèi)存堆積在一個thread cache中就是一種浪費,我們應(yīng)該將這些內(nèi)存還給central cache,這樣一來,這些內(nèi)存對其他線程來說也是可申請的,因此當thread cache某個桶當中的自由鏈表太長時我們可以進行一些處理。
如果thread cache某個桶當中自由鏈表的長度超過它一次批量向central cache申請的對象個數(shù),那么此時我們就要把該自由鏈表當中的這些對象還給central cache。
//釋放內(nèi)存對象 void ThreadCache::Deallocate(void* ptr, size_t size) { assert(ptr); assert(size <= MAX_BYTES); //找出對應(yīng)的自由鏈表桶將對象插入 size_t index = SizeClass::Index(size); _freeLists[index].Push(ptr); //當自由鏈表長度大于一次批量申請的對象個數(shù)時就開始還一段list給central cache if (_freeLists[index].Size() >= _freeLists[index].MaxSize()) { ListTooLong(_freeLists[index], size); } }
當自由鏈表的長度大于一次批量申請的對象時,我們具體的做法就是,從該自由鏈表中取出一次批量個數(shù)的對象,然后將取出的這些對象還給central cache中對應(yīng)的span即可。
//釋放對象導(dǎo)致鏈表過長,回收內(nèi)存到中心緩存 void ThreadCache::ListTooLong(FreeList& list, size_t size) { void* start = nullptr; void* end = nullptr; //從list中取出一次批量個數(shù)的對象 list.PopRange(start, end, list.MaxSize()); //將取出的對象還給central cache中對應(yīng)的span CentralCache::GetInstance()->ReleaseListToSpans(start, size); }
從上述代碼可以看出,F(xiàn)reeList類需要支持用Size函數(shù)獲取自由鏈表中對象的個數(shù),還需要支持用PopRange函數(shù)從自由鏈表中取出指定個數(shù)的對象。因此我們需要給FreeList類增加一個對應(yīng)的PopRange函數(shù),然后再增加一個_size成員變量,該成員變量用于記錄當前自由鏈表中對象的個數(shù),當我們向自由鏈表插入或刪除對象時,都應(yīng)該更新_size的值。
//管理切分好的小對象的自由鏈表 class FreeList { public: //將釋放的對象頭插到自由鏈表 void Push(void* obj) { assert(obj); //頭插 NextObj(obj) = _freeList; _freeList = obj; _size++; } //從自由鏈表頭部獲取一個對象 void* Pop() { assert(_freeList); //頭刪 void* obj = _freeList; _freeList = NextObj(_freeList); _size--; return obj; } //插入一段范圍的對象到自由鏈表 void PushRange(void* start, void* end, size_t n) { assert(start); assert(end); //頭插 NextObj(end) = _freeList; _freeList = start; _size += n; } //從自由鏈表獲取一段范圍的對象 void PopRange(void*& start, void*& end, size_t n) { assert(n <= _size); //頭刪 start = _freeList; end = start; for (size_t i = 0; i < n - 1;i++) { end = NextObj(end); } _freeList = NextObj(end); //自由鏈表指向end的下一個對象 NextObj(end) = nullptr; //取出的一段鏈表的表尾置空 _size -= n; } bool Empty() { return _freeList == nullptr; } size_t& MaxSize() { return _maxSize; } size_t Size() { return _size; } private: void* _freeList = nullptr; //自由鏈表 size_t _maxSize = 1; size_t _size = 0; };
而對于FreeList類當中的PushRange成員函數(shù),我們最好也像PopRange一樣給它增加一個參數(shù),表示插入對象的個數(shù),不然我們這時還需要通過遍歷統(tǒng)計插入對象的個數(shù)。
因此之前在調(diào)用PushRange的地方就需要修改一下,而我們實際就在一個地方調(diào)用過PushRange函數(shù),并且此時插入對象的個數(shù)也是很容易知道的。當時thread cache從central cache獲取了actualNum個對象,將其中的一個返回給了申請對象的線程,剩下的actualNum-1個掛到了thread cache對應(yīng)的桶當中,所以這里插入對象的個數(shù)就是actualNum-1。
_freeLists[index].PushRange(NextObj(start), end, actualNum - 1);
說明一下:
當thread cache的某個自由鏈表過長時,我們實際就是把這個自由鏈表當中全部的對象都還給central cache了,但這里在設(shè)計PopRange接口時還是設(shè)計的是取出指定個數(shù)的對象,因為在某些情況下當自由鏈表過長時,我們可能并不一定想把鏈表中全部的對象都取出來還給central cache,這樣設(shè)計就是為了增加代碼的可修改性。
其次,當我們判斷thread cache是否應(yīng)該還對象給central cache時,還可以綜合考慮每個thread cache整體的大小。比如當某個thread cache的總占用大小超過一定閾值時,我們就將該thread cache當中的對象還一些給central cache,這樣就盡量避免了某個線程的thread cache占用太多的內(nèi)存。對于這一點,在tcmalloc當中就是考慮到了的。
centralcache回收內(nèi)存
當thread cache中某個自由鏈表太長時,會將自由鏈表當中的這些對象還給central cache中的span。
但是需要注意的是,還給central cache的這些對象不一定都是屬于同一個span的。central cache中的每個哈希桶當中可能都不止一個span,因此當我們計算出還回來的對象應(yīng)該還給central cache的哪一個桶后,還需要知道這些對象到底應(yīng)該還給這個桶當中的哪一個span。
如何根據(jù)對象的地址得到對象所在的頁號?
首先我們必須理解的是,某個頁當中的所有地址除以頁的大小都等該頁的頁號。比如我們這里假設(shè)一頁的大小是100,那么地址0~99都屬于第0頁,它們除以100都等于0,而地址100~199都屬于第1頁,它們除以100都等于1。
如何找到一個對象對應(yīng)的span?
雖然我們現(xiàn)在可以通過對象的地址得到其所在的頁號,但是我們還是不能知道這個對象到底屬于哪一個span。因為一個span管理的可能是多個頁。
為了解決這個問題,我們可以建立頁號和span之間的映射。由于這個映射關(guān)系在page cache進行span的合并時也需要用到,因此我們直接將其存放到page cache里面。這時我們就需要在PageCache類當中添加一個映射關(guān)系了,這里可以用C++當中的unordered_map進行實現(xiàn),并且添加一個函數(shù)接口,用于讓central cache獲取這里的映射關(guān)系。(下面代碼中只展示了PageCache類當中新增的成員)
//單例模式class PageCache{<!--{C}%3C!%2D%2D%20%2D%2D%3E-->public://獲取從對象到span的映射Span* MapObjectToSpan(void* obj);private:std::unordered_map<PAGE_ID, Span*> _idSpanMap;};
每當page cache分配span給central cache時,都需要記錄一下頁號和span之間的映射關(guān)系。此后當thread cache還對象給central cache時,才知道應(yīng)該具體還給哪一個span。
因此當central cache在調(diào)用NewSpan接口向page cache申請k頁的span時,page cache在返回這個k頁的span給central cache之前,應(yīng)該建立這k個頁號與該span之間的映射關(guān)系。
//獲取一個k頁的span Span* PageCache::NewSpan(size_t k) { assert(k > 0 && k < NPAGES); //先檢查第k個桶里面有沒有span if (!_spanLists[k].Empty()) { Span* kSpan = _spanLists[k].PopFront(); //建立頁號與span的映射,方便central cache回收小塊內(nèi)存時查找對應(yīng)的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } //檢查一下后面的桶里面有沒有span,如果有可以將其進行切分 for (size_t i = k + 1; i < NPAGES; i++) { if (!_spanLists[i].Empty()) { Span* nSpan = _spanLists[i].PopFront(); Span* kSpan = new Span; //在nSpan的頭部切k頁下來 kSpan->_pageId = nSpan->_pageId; kSpan->_n = k; nSpan->_pageId += k; nSpan->_n -= k; //將剩下的掛到對應(yīng)映射的位置 _spanLists[nSpan->_n].PushFront(nSpan); //建立頁號與span的映射,方便central cache回收小塊內(nèi)存時查找對應(yīng)的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } } //走到這里說明后面沒有大頁的span了,這時就向堆申請一個128頁的span Span* bigSpan = new Span; void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; bigSpan->_n = NPAGES - 1; _spanLists[bigSpan->_n].PushFront(bigSpan); //盡量避免代碼重復(fù),遞歸調(diào)用自己 return NewSpan(k); }
此時我們就可以通過對象的地址找到該對象對應(yīng)的span了,直接將該對象的地址除以頁的大小得到頁號,然后在unordered_map當中找到其對應(yīng)的span即可。
//獲取從對象到span的映射 Span* PageCache::MapObjectToSpan(void* obj) { PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //頁號 auto ret = _idSpanMap.find(id); if (ret != _idSpanMap.end()) { return ret->second; } else { assert(false); return nullptr; } }
注意一下,當我們要通過某個頁號查找其對應(yīng)的span時,該頁號與其span之間的映射一定是建立過的,如果此時我們沒有在unordered_map當中找到,則說明我們之前的代碼邏輯有問題,因此當沒有找到對應(yīng)的span時可以直接用斷言結(jié)束程序,以表明程序邏輯出錯。
central cache回收內(nèi)存
這時當thread cache還對象給central cache時,就可以依次遍歷這些對象,將這些對象插入到其對應(yīng)span的自由鏈表當中,并且及時更新該span的_usseCount計數(shù)即可。
在thread cache還對象給central cache的過程中,如果central cache中某個span的_useCount減到0時,說明這個span分配出去的對象全部都還回來了,那么此時就可以將這個span再進一步還給page cache。
//將一定數(shù)量的對象還給對應(yīng)的span void CentralCache::ReleaseListToSpans(void* start, size_t size) { size_t index = SizeClass::Index(size); _spanLists[index]._mtx.lock(); //加鎖 while (start) { void* next = NextObj(start); //記錄下一個 Span* span = PageCache::GetInstance()->MapObjectToSpan(start); //將對象頭插到span的自由鏈表 NextObj(start) = span->_freeList; span->_freeList = start; span->_useCount--; //更新被分配給thread cache的計數(shù) if (span->_useCount == 0) //說明這個span分配出去的對象全部都回來了 { //此時這個span就可以再回收給page cache,page cache可以再嘗試去做前后頁的合并 _spanLists[index].Erase(span); span->_freeList = nullptr; //自由鏈表置空 span->_next = nullptr; span->_prev = nullptr; //釋放span給page cache時,使用page cache的鎖就可以了,這時把桶鎖解掉 _spanLists[index]._mtx.unlock(); //解桶鎖 PageCache::GetInstance()->_pageMtx.lock(); //加大鎖 PageCache::GetInstance()->ReleaseSpanToPageCache(span); PageCache::GetInstance()->_pageMtx.unlock(); //解大鎖 _spanLists[index]._mtx.lock(); //加桶鎖 } start = next; } _spanLists[index]._mtx.unlock(); //解鎖 }
需要注意,如果要把某個span還給page cache,我們需要先將這個span從central cache對應(yīng)的雙鏈表中移除,然后再將該span的自由鏈表置空,因為page cache中的span是不需要切分成一個個的小對象的,以及該span的前后指針也都應(yīng)該置空,因為之后要將其插入到page cache對應(yīng)的雙鏈表中。但span當中記錄的起始頁號以及它管理的頁數(shù)是不能清除的,否則對應(yīng)內(nèi)存塊就找不到了。
并且在central cache還span給page cache時也存在鎖的問題,此時需要先將central cache中對應(yīng)的桶鎖解掉,然后再加上page cache的大鎖之后才能進入page cache進行相關(guān)操作,當處理完畢回到central cache時,除了將page cache的大鎖解掉,還需要立刻獲得central cache對應(yīng)的桶鎖,然后將還未還完對象繼續(xù)還給central cache中對應(yīng)的span。
pagecache回收內(nèi)存
如果central cache中有某個span的_useCount減到0了,那么central cache就需要將這個span還給page cache了。
這個過程看似是非常簡單的,page cache只需將還回來的span掛到對應(yīng)的哈希桶上就行了。但實際為了緩解內(nèi)存碎片的問題,page cache還需要嘗試將還回來的span與其他空閑的span進行合并。
page cache進行前后頁的合并
合并的過程可以分為向前合并和向后合并。如果還回來的span的起始頁號是num,該span所管理的頁數(shù)是n。那么在向前合并時,就需要判斷第num-1頁對應(yīng)span是否空閑,如果空閑則可以將其進行合并,并且合并后還需要繼續(xù)向前嘗試進行合并,直到不能進行合并為止。而在向后合并時,就需要判斷第num+n頁對應(yīng)的span是否空閑,如果空閑則可以將其進行合并,并且合并后還需要繼續(xù)向后嘗試進行合并,直到不能進行合并為止。
因此page cache在合并span時,是需要通過頁號獲取到對應(yīng)的span的,這就是我們要把頁號與span之間的映射關(guān)系存儲到page cache的原因。
但需要注意的是,當我們通過頁號找到其對應(yīng)的span時,這個span此時可能掛在page cache,也可能掛在central cache。而在合并時我們只能合并掛在page cache的span,因為掛在central cache的span當中的對象正在被其他線程使用。
可是我們不能通過span結(jié)構(gòu)當中的_useCount成員,來判斷某個span到底是在central cache還是在page cache。因為當central cache剛向page cache申請到一個span時,這個span的_useCount就是等于0的,這時可能當我們正在對該span進行切分的時候,page cache就把這個span拿去進行合并了,這顯然是不合理的。
鑒于此,我們可以在span結(jié)構(gòu)中再增加一個_isUse成員,用于標記這個span是否正在被使用,而當一個span結(jié)構(gòu)被創(chuàng)建時我們默認該span是沒有被使用的。
//管理以頁為單位的大塊內(nèi)存 struct Span { PAGE_ID _pageId = 0; //大塊內(nèi)存起始頁的頁號 size_t _n = 0; //頁的數(shù)量 Span* _next = nullptr; //雙鏈表結(jié)構(gòu) Span* _prev = nullptr; size_t _useCount = 0; //切好的小塊內(nèi)存,被分配給thread cache的計數(shù) void* _freeList = nullptr; //切好的小塊內(nèi)存的自由鏈表 bool _isUse = false; //是否在被使用 };
因此當central cache向page cache申請到一個span時,需要立即將該span的_isUse改為true。
span->_isUse = true;
而當central cache將某個span還給page cache時,也就需要將該span的_isUse改成false。
span->_isUse = false;
由于在合并page cache當中的span時,需要通過頁號找到其對應(yīng)的span,而一個span是在被分配給central cache時,才建立的各個頁號與span之間的映射關(guān)系,因此page cache當中的span也需要建立頁號與span之間的映射關(guān)系。
與central cache中的span不同的是,在page cache中,只需建立一個span的首尾頁號與該span之間的映射關(guān)系。因為當一個span在嘗試進行合并時,如果是往前合并,那么只需要通過一個span的尾頁找到這個span,如果是向后合并,那么只需要通過一個span的首頁找到這個span。也就是說,在進行合并時我們只需要用到span與其首尾頁之間的映射關(guān)系就夠了。
因此當我們申請k頁的span時,如果是將n頁的span切成了一個k頁的span和一個n-k頁的span,我們除了需要建立k頁span中每個頁與該span之間的映射關(guān)系之外,還需要建立剩下的n-k頁的span與其首尾頁之間的映射關(guān)系。
//獲取一個k頁的span Span* PageCache::NewSpan(size_t k) { assert(k > 0 && k < NPAGES); //先檢查第k個桶里面有沒有span if (!_spanLists[k].Empty()) { Span* kSpan = _spanLists[k].PopFront(); //建立頁號與span的映射,方便central cache回收小塊內(nèi)存時查找對應(yīng)的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } //檢查一下后面的桶里面有沒有span,如果有可以將其進行切分 for (size_t i = k + 1; i < NPAGES; i++) { if (!_spanLists[i].Empty()) { Span* nSpan = _spanLists[i].PopFront(); Span* kSpan = new Span; //在nSpan的頭部切k頁下來 kSpan->_pageId = nSpan->_pageId; kSpan->_n = k; nSpan->_pageId += k; nSpan->_n -= k; //將剩下的掛到對應(yīng)映射的位置 _spanLists[nSpan->_n].PushFront(nSpan); //存儲nSpan的首尾頁號與nSpan之間的映射,方便page cache合并span時進行前后頁的查找 _idSpanMap[nSpan->_pageId] = nSpan; _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan; //建立頁號與span的映射,方便central cache回收小塊內(nèi)存時查找對應(yīng)的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } } //走到這里說明后面沒有大頁的span了,這時就向堆申請一個128頁的span Span* bigSpan = new Span; void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; bigSpan->_n = NPAGES - 1; _spanLists[bigSpan->_n].PushFront(bigSpan); //盡量避免代碼重復(fù),遞歸調(diào)用自己 return NewSpan(k); }
此時page cache當中的span就都與其首尾頁之間建立了映射關(guān)系,現(xiàn)在我們就可以進行span的合并了,其合并邏輯如下:
//釋放空閑的span回到PageCache,并合并相鄰的span void PageCache::ReleaseSpanToPageCache(Span* span) { //對span的前后頁,嘗試進行合并,緩解內(nèi)存碎片問題 //1、向前合并 while (1) { PAGE_ID prevId = span->_pageId - 1; auto ret = _idSpanMap.find(prevId); //前面的頁號沒有(還未向系統(tǒng)申請),停止向前合并 if (ret == _idSpanMap.end()) { break; } //前面的頁號對應(yīng)的span正在被使用,停止向前合并 Span* prevSpan = ret->second; if (prevSpan->_isUse == true) { break; } //合并出超過128頁的span無法進行管理,停止向前合并 if (prevSpan->_n + span->_n > NPAGES - 1) { break; } //進行向前合并 span->_pageId = prevSpan->_pageId; span->_n += prevSpan->_n; //將prevSpan從對應(yīng)的雙鏈表中移除 _spanLists[prevSpan->_n].Erase(prevSpan); delete prevSpan; } //2、向后合并 while (1) { PAGE_ID nextId = span->_pageId + span->_n; auto ret = _idSpanMap.find(nextId); //后面的頁號沒有(還未向系統(tǒng)申請),停止向后合并 if (ret == _idSpanMap.end()) { break; } //后面的頁號對應(yīng)的span正在被使用,停止向后合并 Span* nextSpan = ret->second; if (nextSpan->_isUse == true) { break; } //合并出超過128頁的span無法進行管理,停止向后合并 if (nextSpan->_n + span->_n > NPAGES - 1) { break; } //進行向后合并 span->_n += nextSpan->_n; //將nextSpan從對應(yīng)的雙鏈表中移除 _spanLists[nextSpan->_n].Erase(nextSpan); delete nextSpan; } //將合并后的span掛到對應(yīng)的雙鏈表當中 _spanLists[span->_n].PushFront(span); //建立該span與其首尾頁的映射 _idSpanMap[span->_pageId] = span; _idSpanMap[span->_pageId + span->_n - 1] = span; //將該span設(shè)置為未被使用的狀態(tài) span->_isUse = false; }
需要注意的是,在向前或向后進行合并的過程中:
- 如果沒有通過頁號獲取到其對應(yīng)的span,說明對應(yīng)到該頁的內(nèi)存塊還未申請,此時需要停止合并。
- 如果通過頁號獲取到了其對應(yīng)的span,但該span處于被使用的狀態(tài),那我們也必須停止合并。
- 如果合并后大于128頁則不能進行本次合并,因為page cache無法對大于128頁的span進行管理。
在合并span時,由于這個span是在page cache的某個哈希桶的雙鏈表當中的,因此在合并后需要將其從對應(yīng)的雙鏈表中移除,然后再將這個被合并了的span結(jié)構(gòu)進行delete。
除此之外,在合并結(jié)束后,除了將合并后的span掛到page cache對應(yīng)哈希桶的雙鏈表當中,還需要建立該span與其首位頁之間的映射關(guān)系,便于此后合并出更大的span。
釋放內(nèi)存過程聯(lián)調(diào)
ConcurrentFree函數(shù)
至此我們將thread cache、central cache以及page cache的釋放流程也都寫完了,此時我們就可以向外提供一個ConcurrentFree函數(shù),用于釋放內(nèi)存塊,釋放內(nèi)存塊時每個線程通過自己的thread cache對象,調(diào)用thread cache中釋放內(nèi)存對象的接口即可。
static void ConcurrentFree(void* ptr, size_t size/*暫時*/) { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); }
釋放內(nèi)存過程聯(lián)調(diào)測試
之前我們在測試申請流程時,讓單個線程進行了三次內(nèi)存申請,現(xiàn)在我們再將這三個對象再進行釋放,看看這其中的釋放流程是如何進行的。
void* p1 = ConcurrentAlloc(6); void* p2 = ConcurrentAlloc(8); void* p3 = ConcurrentAlloc(1); ConcurrentFree(p1, 6); ConcurrentFree(p2, 8); ConcurrentFree(p3, 1);
首先,這三次申請和釋放的對象大小進行對齊后都是8字節(jié),因此對應(yīng)操作的就是thread cache和central cache的第0號桶,以及page cache的第1號桶。
由于第三次對象申請時,剛好將thread cache第0號桶當中僅剩的一個對象拿走了,因此在三次對象申請后thread cache的第0號桶當中是沒有對象的。
通過監(jiān)視窗口可以看到,此時thread cache第0號桶中自由鏈表的_maxSize已經(jīng)慢增長到了3,而當我們釋放完第一個對象后,該自由鏈表當中對象的個數(shù)只有一個,因此不會將該自由鏈表當中的對象進一步還給central cache。
當?shù)诙€對象釋放給thread cache的第0號桶后,該桶對應(yīng)自由鏈表當中對象的個數(shù)變成了2,也是不會進行ListTooLong操作的。
直到第三個對象釋放給thread cache的第0號桶時,此時該自由鏈表的_size的值變?yōu)?,與_maxSize的值相等,現(xiàn)在thread cache就需要將對象給central cache了。
thread cache先是將第0號桶當中的對象彈出MaxSize個,在這里實際上就是全部彈出,此時該自由鏈表_size的值變?yōu)?,然后繼續(xù)調(diào)用central cache當中的ReleaseListToSpans函數(shù),將這三個對象還給central cache當中對應(yīng)的span。
在進入central cache的第0號桶還對象之前,先把第0號桶對應(yīng)的桶鎖加上,然后通過查page cache中的映射表找到其對應(yīng)的span,最后將這個對象頭插到該span的自由鏈表中,并將該span的_useCount進行--
。當?shù)谝粋€對象還給其對應(yīng)的span時,可以看到該span的_useCount減到了2。
而由于我們只進行了三次對象申請,并且這些對象大小對齊后大小都是8字節(jié),因此我們申請的這三個對象實際都是同一個span切分出來的。當我們將這三個對象都還給這個span時,該span的_useCount就減為了0。
現(xiàn)在central cache就需要將這個span進一步還給page cache,而在將該span交給page cache之前,會將該span的自由鏈表以及前后指針都置空。并且在進入page cache之前會先將central cache第0號桶的桶鎖解掉,然后再加上page cache的大鎖,之后才能進入page cache進行相關(guān)操作。
由于這個一頁的span是從128頁的span的頭部切下來的,在向前合并時由于前面的頁還未向系統(tǒng)申請,因此在查映射關(guān)系時是無法找到的,此時直接停止了向前合并。
(說明一下:由于下面是重新另外進行的一次調(diào)試,因此監(jiān)視窗口顯示的span的起始頁號與之前的不同,實際應(yīng)該與上面一致)
而在向后合并時,由于page cache沒有將該頁后面的頁分配給central cache,因此在向后合并時肯定能夠找到一個127頁的span進行合并。合并后就變成了一個128頁的span,這時我們將原來127頁的span從第127號桶刪除,然后還需要將該127頁的span結(jié)構(gòu)進行delete,因為它管理的127頁已經(jīng)與1頁的span進行合并了,不再需要它來管理了。
緊接著將這個128頁的span插入到第128號桶,然后建立該span與其首尾頁的映射,便于下次被用于合并,最后再將該span的狀態(tài)設(shè)置為未被使用的狀態(tài)即可。
當從page cache回來后,除了將page cache的大鎖解掉,還需要立刻加上central cache中對應(yīng)的桶鎖,然后繼續(xù)將對象還給central cache中的span,但此時實際上是還完了,因此再將central cache的桶鎖解掉就行了。
至此我們便完成了這三個對象的申請和釋放流程。
大于256KB的大塊內(nèi)存申請問題
申請過程
之前說到,每個線程的thread cache是用于申請小于等于256KB的內(nèi)存的,而對于大于256KB的內(nèi)存,我們可以考慮直接向page cache申請,但page cache中最大的頁也就只有128頁,因此如果是大于128頁的內(nèi)存申請,就只能直接向堆申請了。
申請內(nèi)存的大小 | 申請方式 |
---|---|
x ≤ 256 K B ( 32 頁 ) x \leq 256KB(32頁) x≤256KB(32頁) | 向thread cache申請 |
32 頁 < x ≤ 128 頁 32 頁< x \leq 128頁 32頁<x≤128頁 | 向page cache申請 |
x ≥ 128 頁 x \geq 128頁 x≥128頁 | 向堆申請 |
當申請的內(nèi)存大于256KB時,雖然不是從thread cache進行獲取,但在分配內(nèi)存時也是需要進行向上對齊的,對于大于256KB的內(nèi)存我們可以直接按頁進行對齊。 |
而我們之前實現(xiàn)RoundUp函數(shù)時,對傳入字節(jié)數(shù)大于256KB的情況直接做了斷言處理,因此這里需要對RoundUp函數(shù)稍作修改。
//獲取向上對齊后的字節(jié)數(shù) static inline size_t RoundUp(size_t bytes) { if (bytes <= 128) { return _RoundUp(bytes, 8); } else if (bytes <= 1024) { return _RoundUp(bytes, 16); } else if (bytes <= 8 * 1024) { return _RoundUp(bytes, 128); } else if (bytes <= 64 * 1024) { return _RoundUp(bytes, 1024); } else if (bytes <= 256 * 1024) { return _RoundUp(bytes, 8 * 1024); } else { //大于256KB的按頁對齊 return _RoundUp(bytes, 1 << PAGE_SHIFT); } }
現(xiàn)在對于之前的申請邏輯就需要進行修改了,當申請對象的大小大于256KB時,就不用向thread cache申請了,這時先計算出按頁對齊后實際需要申請的頁數(shù),然后通過調(diào)用NewSpan申請指定頁數(shù)的span即可。
static void* ConcurrentAlloc(size_t size) { if (size > MAX_BYTES) //大于256KB的內(nèi)存申請 { //計算出對齊后需要申請的頁數(shù) size_t alignSize = SizeClass::RoundUp(size); size_t kPage = alignSize >> PAGE_SHIFT; //向page cache申請kPage頁的span PageCache::GetInstance()->_pageMtx.lock(); Span* span = PageCache::GetInstance()->NewSpan(kPage); PageCache::GetInstance()->_pageMtx.unlock(); void* ptr = (void*)(span->_pageId << PAGE_SHIFT); return ptr; } else { //通過TLS,每個線程無鎖的獲取自己專屬的ThreadCache對象 if (pTLSThreadCache == nullptr) { pTLSThreadCache = new ThreadCache; } cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl; return pTLSThreadCache->Allocate(size); } }
也就是說,申請大于256KB的內(nèi)存時,會直接調(diào)用page cache當中的NewSpan函數(shù)進行申請,因此這里我們需要再對NewSpan函數(shù)進行改造,當需要申請的內(nèi)存頁數(shù)大于128頁時,就直接向堆申請對應(yīng)頁數(shù)的內(nèi)存塊。而如果申請的內(nèi)存頁數(shù)是小于128頁的,那就在page cache中進行申請,因此當申請大于256KB的內(nèi)存調(diào)用NewSpan函數(shù)時也是需要加鎖的,因為我們可能是在page cache中進行申請的。
//獲取一個k頁的span Span* PageCache::NewSpan(size_t k) { assert(k > 0); if (k > NPAGES - 1) //大于128頁直接找堆申請 { void* ptr = SystemAlloc(k); Span* span = new Span; span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; span->_n = k; //建立頁號與span之間的映射 _idSpanMap[span->_pageId] = span; return span; } //先檢查第k個桶里面有沒有span if (!_spanLists[k].Empty()) { Span* kSpan = _spanLists[k].PopFront(); //建立頁號與span的映射,方便central cache回收小塊內(nèi)存時查找對應(yīng)的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } //檢查一下后面的桶里面有沒有span,如果有可以將其進行切分 for (size_t i = k + 1; i < NPAGES; i++) { if (!_spanLists[i].Empty()) { Span* nSpan = _spanLists[i].PopFront(); Span* kSpan = new Span; //在nSpan的頭部切k頁下來 kSpan->_pageId = nSpan->_pageId; kSpan->_n = k; nSpan->_pageId += k; nSpan->_n -= k; //將剩下的掛到對應(yīng)映射的位置 _spanLists[nSpan->_n].PushFront(nSpan); //存儲nSpan的首尾頁號與nSpan之間的映射,方便page cache合并span時進行前后頁的查找 _idSpanMap[nSpan->_pageId] = nSpan; _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan; //建立頁號與span的映射,方便central cache回收小塊內(nèi)存時查找對應(yīng)的span for (PAGE_ID i = 0; i < kSpan->_n; i++) { _idSpanMap[kSpan->_pageId + i] = kSpan; } return kSpan; } } //走到這里說明后面沒有大頁的span了,這時就向堆申請一個128頁的span Span* bigSpan = new Span; void* ptr = SystemAlloc(NPAGES - 1); bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; bigSpan->_n = NPAGES - 1; _spanLists[bigSpan->_n].PushFront(bigSpan); //盡量避免代碼重復(fù),遞歸調(diào)用自己 return NewSpan(k); }
釋放過程
當釋放對象時,我們需要判斷釋放對象的大小:
釋放內(nèi)存的大小 | 釋放方式 |
---|---|
x ≤ 256 K B ( 32 頁 ) x \leq 256KB(32頁) x≤256KB(32頁) | 釋放給thread cache |
32 頁 < x ≤ 128 頁 32 頁< x \leq 128頁 32頁<x≤128頁 | 釋放給page cache |
x ≥ 128 頁 x \geq 128頁 x≥128頁 | 釋放給堆 |
因此當釋放對象時,我們需要先找到該對象對應(yīng)的span,但是在釋放對象時我們只知道該對象的起始地址。這也就是我們在申請大于256KB的內(nèi)存時,也要給申請到的內(nèi)存建立span結(jié)構(gòu),并建立起始頁號與該span之間的映射關(guān)系的原因。此時我們就可以通過釋放對象的起始地址計算出起始頁號,進而通過頁號找到該對象對應(yīng)的span。
static void ConcurrentFree(void* ptr, size_t size) { if (size > MAX_BYTES) //大于256KB的內(nèi)存釋放 { Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr); PageCache::GetInstance()->_pageMtx.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); PageCache::GetInstance()->_pageMtx.unlock(); } else { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); } }
因此page cache在回收span時也需要進行判斷,如果該span的大小是小于等于128頁的,那么直接還給page cache進行了,page cache會嘗試對其進行合并。而如果該span的大小是大于128頁的,那么說明該span是直接向堆申請的,我們直接將這塊內(nèi)存釋放給堆,然后將這個span結(jié)構(gòu)進行delete就行了。
//釋放空閑的span回到PageCache,并合并相鄰的span void PageCache::ReleaseSpanToPageCache(Span* span) { if (span->_n > NPAGES - 1) //大于128頁直接釋放給堆 { void* ptr = (void*)(span->_pageId << PAGE_SHIFT); SystemFree(ptr); delete span; return; } //對span的前后頁,嘗試進行合并,緩解內(nèi)存碎片問題 //1、向前合并 while (1) { PAGE_ID prevId = span->_pageId - 1; auto ret = _idSpanMap.find(prevId); //前面的頁號沒有(還未向系統(tǒng)申請),停止向前合并 if (ret == _idSpanMap.end()) { break; } //前面的頁號對應(yīng)的span正在被使用,停止向前合并 Span* prevSpan = ret->second; if (prevSpan->_isUse == true) { break; } //合并出超過128頁的span無法進行管理,停止向前合并 if (prevSpan->_n + span->_n > NPAGES - 1) { break; } //進行向前合并 span->_pageId = prevSpan->_pageId; span->_n += prevSpan->_n; //將prevSpan從對應(yīng)的雙鏈表中移除 _spanLists[prevSpan->_n].Erase(prevSpan); delete prevSpan; } //2、向后合并 while (1) { PAGE_ID nextId = span->_pageId + span->_n; auto ret = _idSpanMap.find(nextId); //后面的頁號沒有(還未向系統(tǒng)申請),停止向后合并 if (ret == _idSpanMap.end()) { break; } //后面的頁號對應(yīng)的span正在被使用,停止向后合并 Span* nextSpan = ret->second; if (nextSpan->_isUse == true) { break; } //合并出超過128頁的span無法進行管理,停止向后合并 if (nextSpan->_n + span->_n > NPAGES - 1) { break; } //進行向后合并 span->_n += nextSpan->_n; //將nextSpan從對應(yīng)的雙鏈表中移除 _spanLists[nextSpan->_n].Erase(nextSpan); delete nextSpan; } //將合并后的span掛到對應(yīng)的雙鏈表當中 _spanLists[span->_n].PushFront(span); //建立該span與其首尾頁的映射 _idSpanMap[span->_pageId] = span; _idSpanMap[span->_pageId + span->_n - 1] = span; //將該span設(shè)置為未被使用的狀態(tài) span->_isUse = false; }
說明一下,直接向堆申請內(nèi)存時我們調(diào)用的接口是VirtualAlloc,與之對應(yīng)的將內(nèi)存釋放給堆的接口叫做VirtualFree,而Linux下的brk和mmap對應(yīng)的釋放接口叫做sbrk和unmmap。此時我們也可以將這些釋放接口封裝成一個叫做SystemFree的接口,當我們需要將內(nèi)存釋放給堆時直接調(diào)用SystemFree即可。
//直接將內(nèi)存還給堆 inline static void SystemFree(void* ptr) { #ifdef _WIN32 VirtualFree(ptr, 0, MEM_RELEASE); #else //linux下sbrk unmmap等 #endif }
簡單測試
下面我們對大于256KB的申請釋放流程進行簡單的測試:
//找page cache申請 void* p1 = ConcurrentAlloc(257 * 1024); //257KB ConcurrentFree(p1, 257 * 1024); //找堆申請 void* p2 = ConcurrentAlloc(129 * 8 * 1024); //129頁 ConcurrentFree(p2, 129 * 8 * 1024);
當申請257KB的內(nèi)存時,由于257KB的內(nèi)存按頁向上對齊后是33頁,并沒有大于128頁,因此不會直接向堆進行申請,會向page cache申請內(nèi)存,但此時page cache當中實際是沒有內(nèi)存的,最終page cache就會向堆申請一個128頁的span,將其切分成33頁的span和95頁的span,并將33頁的span進行返回。
而在釋放內(nèi)存時,由于該對象的大小大于了256KB,因此不會將其還給thread cache,而是直接調(diào)用的page cache當中的釋放接口。
由于該對象的大小是33頁,不大于128頁,因此page cache也不會直接將該對象還給堆,而是嘗試對其進行合并,最終就會把這個33頁的span和之前剩下的95頁的span進行合并,最終將合并后的128頁的span掛到第128號桶中。
當申請129頁的內(nèi)存時,由于是大于256KB的,于是還是調(diào)用的page cache對應(yīng)的申請接口,但此時申請的內(nèi)存同時也大于128頁,因此會直接向堆申請。在申請后還會建立該span與其起始頁號之間的映射,便于釋放時可以通過頁號找到該span。
在釋放內(nèi)存時,通過對象的地址找到其對應(yīng)的span,從span結(jié)構(gòu)中得知釋放內(nèi)存的大小大于128頁,于是會將該內(nèi)存直接還給堆。
使用定長內(nèi)存池配合脫離使用new
tcmalloc是要在高并發(fā)場景下替代malloc進行內(nèi)存申請的,因此tcmalloc在實現(xiàn)的時,其內(nèi)部是不能調(diào)用malloc函數(shù)的,我們當前的代碼中存在通過new獲取到的內(nèi)存,而new在底層實際上就是封裝了malloc。
為了完全脫離掉malloc函數(shù),此時我們之前實現(xiàn)的定長內(nèi)存池就起作用了,代碼中使用new時基本都是為Span結(jié)構(gòu)的對象申請空間,而span對象基本都是在page cache層創(chuàng)建的,因此我們可以在PageCache類當中定義一個_spanPool,用于span對象的申請和釋放。
//單例模式 class PageCache { public: //... private: ObjectPool<Span> _spanPool; };
然后將代碼中使用new的地方替換為調(diào)用定長內(nèi)存池當中的New函數(shù),將代碼中使用delete的地方替換為調(diào)用定長內(nèi)存池當中的Delete函數(shù)。
//申請span對象 Span* span = _spanPool.New(); //釋放span對象 _spanPool.Delete(span);
注意,當使用定長內(nèi)存池當中的New函數(shù)申請Span對象時,New函數(shù)通過定位new也是對Span對象進行了初始化的。
此外,每個線程第一次申請內(nèi)存時都會創(chuàng)建其專屬的thread cache,而這個thread cache目前也是new出來的,我們也需要對其進行替換。
//通過TLS,每個線程無鎖的獲取自己專屬的ThreadCache對象 if (pTLSThreadCache == nullptr) { static std::mutex tcMtx; static ObjectPool<ThreadCache> tcPool; tcMtx.lock(); pTLSThreadCache = tcPool.New(); tcMtx.unlock(); }
這里我們將用于申請ThreadCache類對象的定長內(nèi)存池定義為靜態(tài)的,保持全局只有一個,讓所有線程創(chuàng)建自己的thread cache時,都在個定長內(nèi)存池中申請內(nèi)存就行了。
但注意在從該定長內(nèi)存池中申請內(nèi)存時需要加鎖,防止多個線程同時申請自己的ThreadCache對象而導(dǎo)致線程安全問題。
最后在SpanList的構(gòu)造函數(shù)中也用到了new,因為SpanList是帶頭循環(huán)雙向鏈表,所以在構(gòu)造期間我們需要申請一個span對象作為雙鏈表的頭結(jié)點。
//帶頭雙向循環(huán)鏈表 class SpanList { public: SpanList() { _head = _spanPool.New(); _head->_next = _head; _head->_prev = _head; } private: Span* _head; static ObjectPool<Span> _spanPool; };
由于每個span雙鏈表只需要一個頭結(jié)點,因此將這個定長內(nèi)存池定義為靜態(tài)的,保持全局只有一個,讓所有span雙鏈表在申請頭結(jié)點時,都在一個定長內(nèi)存池中申請內(nèi)存就行了。
釋放對象時優(yōu)化為不傳對象大小
當我們使用malloc函數(shù)申請內(nèi)存時,需要指明申請內(nèi)存的大??;而當我們使用free函數(shù)釋放內(nèi)存時,只需要傳入指向這塊內(nèi)存的指針即可。
而我們目前實現(xiàn)的內(nèi)存池,在釋放對象時除了需要傳入指向該對象的指針,還需要傳入該對象的大小。
原因如下:
- 如果釋放的是大于256KB的對象,需要根據(jù)對象的大小來判斷這塊內(nèi)存到底應(yīng)該還給page cache,還是應(yīng)該直接還給堆。
- 如果釋放的是小于等于256KB的對象,需要根據(jù)對象的大小計算出應(yīng)該還給thread cache的哪一個哈希桶。
如果我們也想做到,在釋放對象時不用傳入對象的大小,那么我們就需要建立對象地址與對象大小之間的映射。由于現(xiàn)在可以通過對象的地址找到其對應(yīng)的span,而span的自由鏈表中掛的都是相同大小的對象。
因此我們可以在Span結(jié)構(gòu)中再增加一個_objSize成員,該成員代表著這個span管理的內(nèi)存塊被切成的一個個對象的大小。
//管理以頁為單位的大塊內(nèi)存 struct Span { PAGE_ID _pageId = 0; //大塊內(nèi)存起始頁的頁號 size_t _n = 0; //頁的數(shù)量 Span* _next = nullptr; //雙鏈表結(jié)構(gòu) Span* _prev = nullptr; size_t _objSize = 0; //切好的小對象的大小 size_t _useCount = 0; //切好的小塊內(nèi)存,被分配給thread cache的計數(shù) void* _freeList = nullptr; //切好的小塊內(nèi)存的自由鏈表 bool _isUse = false; //是否在被使用 };
而所有的span都是從page cache中拿出來的,因此每當我們調(diào)用NewSpan獲取到一個k頁的span時,就應(yīng)該將這個span的_objSize保存下來。
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size)); span->_objSize = size;
代碼中有兩處,一處是在central cache中獲取非空span時,如果central cache對應(yīng)的桶中沒有非空的span,此時會調(diào)用NewSpan獲取一個k頁的span;另一處是當申請大于256KB內(nèi)存時,會直接調(diào)用NewSpan獲取一個k頁的span。
此時當我們釋放對象時,就可以直接從對象的span中獲取到該對象的大小,準確來說獲取到的是對齊以后的大小。
static void ConcurrentFree(void* ptr) { Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr); size_t size = span->_objSize; if (size > MAX_BYTES) //大于256KB的內(nèi)存釋放 { PageCache::GetInstance()->_pageMtx.lock(); PageCache::GetInstance()->ReleaseSpanToPageCache(span); PageCache::GetInstance()->_pageMtx.unlock(); } else { assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size); } }
讀取映射關(guān)系時的加鎖問題
我們將頁號與span之間的映射關(guān)系是存儲在PageCache類當中的,當我們訪問這個映射關(guān)系時是需要加鎖的,因為STL容器是不保證線程安全的。
對于當前代碼來說,如果我們此時正在page cache進行相關(guān)操作,那么訪問這個映射關(guān)系是安全的,因為當進入page cache之前是需要加鎖的,因此可以保證此時只有一個線程在進行訪問。
但如果我們是在central cache訪問這個映射關(guān)系,或是在調(diào)用ConcurrentFree函數(shù)釋放內(nèi)存時訪問這個映射關(guān)系,那么就存在線程安全的問題。因為此時可能其他線程正在page cache當中進行某些操作,并且該線程此時可能也在訪問這個映射關(guān)系,因此當我們在page cache外部訪問這個映射關(guān)系時是需要加鎖的。
實際就是在調(diào)用page cache對外提供訪問映射關(guān)系的函數(shù)時需要加鎖,這里我們可以考慮使用C++當中的unique_lock,當然你也可以用普通的鎖。
//獲取從對象到span的映射 Span* PageCache::MapObjectToSpan(void* obj) { PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //頁號 std::unique_lock<std::mutex> lock(_pageMtx); //構(gòu)造時加鎖,析構(gòu)時自動解鎖 auto ret = _idSpanMap.find(id); if (ret != _idSpanMap.end()) { return ret->second; } else { assert(false); return nullptr; } }
多線程環(huán)境下對比malloc測試
之前我們只是對代碼進行了一些基礎(chǔ)的單元測試,下面我們在多線程場景下對比malloc進行測試。
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds) { std::vector<std::thread> vthread(nworks); std::atomic<size_t> malloc_costtime = 0; std::atomic<size_t> free_costtime = 0; for (size_t k = 0; k < nworks; ++k) { vthread[k] = std::thread([&, k]() { std::vector<void*> v; v.reserve(ntimes); for (size_t j = 0; j < rounds; ++j) { size_t begin1 = clock(); for (size_t i = 0; i < ntimes; i++) { v.push_back(malloc(16)); //v.push_back(malloc((16 + i) % 8192 + 1)); } size_t end1 = clock(); size_t begin2 = clock(); for (size_t i = 0; i < ntimes; i++) { free(v[i]); } size_t end2 = clock(); v.clear(); malloc_costtime += (end1 - begin1); free_costtime += (end2 - begin2); } }); } for (auto& t : vthread) { t.join(); } printf("%u個線程并發(fā)執(zhí)行%u輪次,每輪次malloc %u次: 花費:%u ms\n", nworks, rounds, ntimes, malloc_costtime); printf("%u個線程并發(fā)執(zhí)行%u輪次,每輪次free %u次: 花費:%u ms\n", nworks, rounds, ntimes, free_costtime); printf("%u個線程并發(fā)malloc&free %u次,總計花費:%u ms\n", nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime); } void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds) { std::vector<std::thread> vthread(nworks); std::atomic<size_t> malloc_costtime = 0; std::atomic<size_t> free_costtime = 0; for (size_t k = 0; k < nworks; ++k) { vthread[k] = std::thread([&]() { std::vector<void*> v; v.reserve(ntimes); for (size_t j = 0; j < rounds; ++j) { size_t begin1 = clock(); for (size_t i = 0; i < ntimes; i++) { v.push_back(ConcurrentAlloc(16)); //v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1)); } size_t end1 = clock(); size_t begin2 = clock(); for (size_t i = 0; i < ntimes; i++) { ConcurrentFree(v[i]); } size_t end2 = clock(); v.clear(); malloc_costtime += (end1 - begin1); free_costtime += (end2 - begin2); } }); } for (auto& t : vthread) { t.join(); } printf("%u個線程并發(fā)執(zhí)行%u輪次,每輪次concurrent alloc %u次: 花費:%u ms\n", nworks, rounds, ntimes, malloc_costtime); printf("%u個線程并發(fā)執(zhí)行%u輪次,每輪次concurrent dealloc %u次: 花費:%u ms\n", nworks, rounds, ntimes, free_costtime); printf("%u個線程并發(fā)concurrent alloc&dealloc %u次,總計花費:%u ms\n", nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime); } int main() { size_t n = 10000; cout << "==========================================================" << endl; BenchmarkConcurrentMalloc(n, 4, 10); cout << endl << endl; BenchmarkMalloc(n, 4, 10); cout << "==========================================================" << endl; return 0; }
其中測試函數(shù)各個參數(shù)的含義如下:
- ntimes:單輪次申請和釋放內(nèi)存的次數(shù)。
- nworks:線程數(shù)。
- rounds:輪次。
在測試函數(shù)中,我們通過clock函數(shù)分別獲取到每輪次申請和釋放所花費的時間,然后將其對應(yīng)累加到malloc_costtime和free_costtime上。最后我們就得到了,nworks個線程跑rounds輪,每輪申請和釋放ntimes次,這個過程申請所消耗的時間、釋放所消耗的時間、申請和釋放總共消耗的時間。
注意,我們創(chuàng)建線程時讓線程執(zhí)行的是lambda表達式,而我們這里在使用lambda表達式時,以值傳遞的方式捕捉了變量k,以引用傳遞的方式捕捉了其他父作用域中的變量,因此我們可以將各個線程消耗的時間累加到一起。
我們將所有線程申請內(nèi)存消耗的時間都累加到malloc_costtime上, 將釋放內(nèi)存消耗的時間都累加到free_costtime上,此時malloc_costtime和free_costtime可能被多個線程同時進行累加操作的,所以存在線程安全的問題。鑒于此,我們在定義這兩個變量時使用了atomic類模板,這時對它們的操作就是原子操作了。
固定大小內(nèi)存的申請和釋放
我們先來測試一下固定大小內(nèi)存的申請和釋放:
v.push_back(malloc(16)); v.push_back(ConcurrentAlloc(16));
此時4個線程執(zhí)行10輪操作,每輪申請釋放10000次,總共申請釋放了40萬次,運行后可以看到,malloc的效率還是更高的。
由于此時我們申請釋放的都是固定大小的對象,每個線程申請釋放時訪問的都是各自thread cache的同一個桶,當thread cache的這個桶中沒有對象或?qū)ο筇嘁獨w還時,也都會訪問central cache的同一個桶。此時central cache中的桶鎖就不起作用了,因為我們讓central cache使用桶鎖的目的就是為了,讓多個thread cache可以同時訪問central cache的不同桶,而此時每個thread cache訪問的卻都是central cache中的同一個桶。
不同大小內(nèi)存的申請和釋放
下面我們再來測試一下不同大小內(nèi)存的申請和釋放:
v.push_back(malloc((16 + i) % 8192 + 1)); v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
運行后可以看到,由于申請和釋放內(nèi)存的大小是不同的,此時central cache當中的桶鎖就起作用了,ConcurrentAlloc的效率也有了較大增長,但相比malloc來說還是差一點點。
復(fù)雜問題的調(diào)試技巧
多線程調(diào)試比單線程調(diào)試要復(fù)雜得多,調(diào)試時各個線程之間會相互切換,并且每次調(diào)試切換的時機也是不固定的,這就使得調(diào)試過程變得非常難以控制。
下面給出三個調(diào)試時的小技巧:
1、條件斷點
一般情況下我們可以直接運行程序,通過報錯來查找問題。如果此時報的是斷言錯誤,那么我們可以直接定位到報錯的位置,然后將此處的斷言改為與斷言條件相反的if判斷,在if語句里面打上一個斷點,但注意空語句是無法打斷點的,這時我們隨便在if里面加上一句代碼就可以打斷點了。
此外,條件斷點也可以通過右擊普通斷點來進行設(shè)置。
右擊后即可設(shè)置相應(yīng)的條件,程序運行到此處時如果滿足該條件則會停下來。
運行到條件斷點處后,我們就可以對當前程序進行進一步分析,找出斷言錯誤的被觸發(fā)原因。
2、查看函數(shù)棧幀
當程序運行到斷點處時,我們需要對當前位置進行分析,如果檢查后發(fā)現(xiàn)當前函數(shù)是沒有問題的,這時可能需要回到調(diào)用該函數(shù)的地方進行進一步分析,此時我們可以依次點擊“調(diào)試→窗口→調(diào)用堆棧”。
此時我們就可以看到當前函數(shù)棧幀的調(diào)用情況,其中黃色箭頭指向的是當前所在的函數(shù)棧幀。
雙擊函數(shù)棧幀中的其他函數(shù),就可以跳轉(zhuǎn)到該函數(shù)對應(yīng)的棧幀,此時淺灰色箭頭指向的就是當前跳轉(zhuǎn)到的函數(shù)棧幀。
需要注意的是,監(jiān)視窗口只能查看當前棧幀中的變量。如果要查看此時其他函數(shù)棧幀中變量的情況,就可以通過函數(shù)棧幀跳轉(zhuǎn)來查看。
3、疑似死循環(huán)時中斷程序
當你在某個地方設(shè)置斷點后,如果遲遲沒有運行到斷點處,而程序也沒有崩潰,這時有可能是程序進入到某個死循環(huán)了。
這時我們可以依次點擊“調(diào)試→全部中斷”。
這時程序就會在當前運行的地方停下來。
性能瓶頸分析
經(jīng)過前面的測試可以看到,我們的代碼此時與malloc之間還是有差距的,此時我們就應(yīng)該分析分析我們當前項目的瓶頸在哪里,但這不能簡單的憑感覺,我們應(yīng)該用性能分析的工具來進行分析。
VS編譯器下性能分析的操作步驟
VS編譯器中就帶有性能分析的工具的,我們可以依次點擊“調(diào)試→性能和診斷”進行性能分析,注意該操作要在Debug模式下進行。
同時我們將代碼中n的值由10000調(diào)成了1000,否則該分析過程可能會花費較多時間,并且將malloc的測試代碼進行了屏蔽,因為我們要分析的是我們實現(xiàn)的高并發(fā)內(nèi)存池。
int main() { size_t n = 1000; cout << "==========================================================" << endl; BenchmarkConcurrentMalloc(n, 4, 10); cout << endl << endl; //BenchmarkMalloc(n, 4, 10); cout << "==========================================================" << endl; return 0; }
在點擊了“調(diào)試→性能和診斷”后會彈出一個提示框,我們直接點擊“開始”進行了。
然后會彈出一個選項框,這里我們選擇的是第二個,因為我們要分析的是各個函數(shù)的用時時間,然后點擊下一步。
出現(xiàn)以下選項框繼續(xù)點擊下一步。
最后點擊完成,就可以等待分析結(jié)果了。
分析性能瓶頸
通過分析結(jié)果可以看到,光是Deallocate和MapObjectToSpan這兩個函數(shù)就占用了一半多的時間。
而在Deallocate函數(shù)中,調(diào)用ListTooLong函數(shù)時消耗的時間是最多的。
繼續(xù)往下看,在ListTooLong函數(shù)中,調(diào)用ReleaseListToSpans函數(shù)時消耗的時間是最多的。
再進一步看,在ReleaseListToSpans函數(shù)中,調(diào)用MapObjectToSpan函數(shù)時消耗的時間是最多的。
也就是說,最終消耗時間最多的實際就是MapObjectToSpan函數(shù),我們這時再來看看為什么調(diào)用MapObjectToSpan函數(shù)會消耗這么多時間。通過觀察我們最終發(fā)現(xiàn),調(diào)用該函數(shù)時會消耗這么多時間就是因為鎖的原因。
因此當前項目的瓶頸點就在鎖競爭上面,需要解決調(diào)用MapObjectToSpan函數(shù)訪問映射關(guān)系時的加鎖問題。tcmalloc當中針對這一點使用了基數(shù)樹進行優(yōu)化,使得在讀取這個映射關(guān)系時可以做到不加鎖。
針對性能瓶頸使用基數(shù)樹進行優(yōu)化
基數(shù)樹實際上就是一個分層的哈希表,根據(jù)所分層數(shù)不同可分為單層基數(shù)樹、二層基數(shù)樹、三層基數(shù)樹等。
單層基數(shù)樹
單層基數(shù)樹實際采用的就是直接定址法,每一個頁號對應(yīng)span的地址就存儲數(shù)組中在以該頁號為下標的位置。
最壞的情況下我們需要建立所有頁號與其span之間的映射關(guān)系,因此這個數(shù)組中元素個數(shù)應(yīng)該與頁號的數(shù)目相同,數(shù)組中每個位置存儲的就是對應(yīng)span的指針。
//單層基數(shù)樹 template <int BITS> class TCMalloc_PageMap1 { public: typedef uintptr_t Number; explicit TCMalloc_PageMap1() { size_t size = sizeof(void*) << BITS; //需要開辟數(shù)組的大小 size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT); //按頁對齊后的大小 array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); //向堆申請空間 memset(array_, 0, size); //對申請到的內(nèi)存進行清理 } void* get(Number k) const { if ((k >> BITS) > 0) //k的范圍不在[0, 2^BITS-1] { return NULL; } return array_[k]; //返回該頁號對應(yīng)的span } void set(Number k, void* v) { assert((k >> BITS) == 0); //k的范圍必須在[0, 2^BITS-1] array_[k] = v; //建立映射 } private: void** array_; //存儲映射關(guān)系的數(shù)組 static const int LENGTH = 1 << BITS; //頁的數(shù)目 };
此時當我們需要建立映射時就調(diào)用set函數(shù),需要讀取映射關(guān)系時,就調(diào)用get函數(shù)就行了。
代碼中的非類型模板參數(shù)BITS表示存儲頁號最多需要比特位的個數(shù)。在32位下我們傳入的是32-PAGE_SHIFT,在64位下傳入的是64-PAGE_SHIFT。而其中的LENGTH成員代表的就是頁號的數(shù)目,即 2 B I T S 2^{BITS} 2BITS。
比如32位平臺下,以一頁大小為8K為例,此時頁的數(shù)目就是 2 32 ÷ 2 13 = 2 19 2^{32}\div2^{13}=2^{19} 232÷213=219,因此存儲頁號最多需要19個比特位,此時傳入非類型模板參數(shù)的值就是 32 − 13 = 19 32-13=19 32−13=19。由于32位平臺下指針的大小是4字節(jié),因此該數(shù)組的大小就是 2 19 × 4 = 2 21 = 2 M 2^{19}\times4=2^{21}=2M 219×4=221=2M,內(nèi)存消耗不大,是可行的。但如果是在64位平臺下,此時該數(shù)組的大小是 2 51 × 8 = 2 54 = 2 24 G 2^{51}\times8=2^{54}=2^{24}G 251×8=254=224G,這顯然是不可行的,實際上對于64位的平臺,我們需要使用三層基數(shù)樹。
二層基數(shù)樹
這里還是以32位平臺下,一頁的大小為8K為例來說明,此時存儲頁號最多需要19個比特位。而二層基數(shù)樹實際上就是把這19個比特位分為兩次進行映射。
比如用前5個比特位在基數(shù)樹的第一層進行映射,映射后得到對應(yīng)的第二層,然后用剩下的比特位在基數(shù)樹的第二層進行映射,映射后最終得到該頁號對應(yīng)的span指針。
在二層基數(shù)樹中,第一層的數(shù)組占用 2 5 × 4 = 2 7 B y t e 2^{5}\times4=2^{7}Byte 25×4=27Byte空間,第二層的數(shù)組最多占用 2 5 × 2 14 × 4 = 2 21 = 2 M 2^{5}\times2^{14}\times4=2^{21}=2M 25×214×4=221=2M。二層基數(shù)樹相比一層基數(shù)樹的好處就是,一層基數(shù)樹必須一開始就把 2 M 2M 2M的數(shù)組開辟出來,而二層基數(shù)樹一開始時只需將第一層的數(shù)組開辟出來,當需要進行某一頁號映射時再開辟對應(yīng)的第二層的數(shù)組就行了。
//二層基數(shù)樹 template <int BITS> class TCMalloc_PageMap2 { private: static const int ROOT_BITS = 5; //第一層對應(yīng)頁號的前5個比特位 static const int ROOT_LENGTH = 1 << ROOT_BITS; //第一層存儲元素的個數(shù) static const int LEAF_BITS = BITS - ROOT_BITS; //第二層對應(yīng)頁號的其余比特位 static const int LEAF_LENGTH = 1 << LEAF_BITS; //第二層存儲元素的個數(shù) //第一層數(shù)組中存儲的元素類型 struct Leaf { void* values[LEAF_LENGTH]; }; Leaf* root_[ROOT_LENGTH]; //第一層數(shù)組 public: typedef uintptr_t Number; explicit TCMalloc_PageMap2() { memset(root_, 0, sizeof(root_)); //將第一層的空間進行清理 PreallocateMoreMemory(); //直接將第二層全部開辟 } void* get(Number k) const { const Number i1 = k >> LEAF_BITS; //第一層對應(yīng)的下標 const Number i2 = k & (LEAF_LENGTH - 1); //第二層對應(yīng)的下標 if ((k >> BITS) > 0 || root_[i1] == NULL) //頁號值不在范圍或沒有建立過映射 { return NULL; } return root_[i1]->values[i2]; //返回該頁號對應(yīng)span的指針 } void set(Number k, void* v) { const Number i1 = k >> LEAF_BITS; //第一層對應(yīng)的下標 const Number i2 = k & (LEAF_LENGTH - 1); //第二層對應(yīng)的下標 assert(i1 < ROOT_LENGTH); root_[i1]->values[i2] = v; //建立該頁號與對應(yīng)span的映射 } //確保映射[start,start_n-1]頁號的空間是開辟好了的 bool Ensure(Number start, size_t n) { for (Number key = start; key <= start + n - 1;) { const Number i1 = key >> LEAF_BITS; if (i1 >= ROOT_LENGTH) //頁號超出范圍 return false; if (root_[i1] == NULL) //第一層i1下標指向的空間未開辟 { //開辟對應(yīng)空間 static ObjectPool<Leaf> leafPool; Leaf* leaf = (Leaf*)leafPool.New(); memset(leaf, 0, sizeof(*leaf)); root_[i1] = leaf; } key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //繼續(xù)后續(xù)檢查 } return true; } void PreallocateMoreMemory() { Ensure(0, 1 << BITS); //將第二層的空間全部開辟好 } };
因此在二層基數(shù)樹中有一個Ensure函數(shù),當需要建立某一頁號與其span之間的映射關(guān)系時,需要先調(diào)用該Ensure函數(shù)確保用于映射該頁號的空間是開辟了的,如果沒有開辟則會立即開辟。
而在32位平臺下,就算將二層基數(shù)樹第二層的數(shù)組全部開辟出來也就消耗了 2 M 2M 2M的空間,內(nèi)存消耗也不算太多,因此我們可以在構(gòu)造二層基數(shù)樹時就把第二層的數(shù)組全部開辟出來。
三層基數(shù)樹
上面一層基數(shù)樹和二層基數(shù)樹都適用于32位平臺,而對于64位的平臺就需要用三層基數(shù)樹了。三層基數(shù)樹與二層基數(shù)樹類似,三層基數(shù)樹實際上就是把存儲頁號的若干比特位分為三次進行映射。
此時只有當要建立某一頁號的映射關(guān)系時,再開辟對應(yīng)的數(shù)組空間,而沒有建立映射的頁號就可以不用開辟其對應(yīng)的數(shù)組空間,此時就能在一定程度上節(jié)省內(nèi)存空間。
//三層基數(shù)樹 template <int BITS> class TCMalloc_PageMap3 { private: static const int INTERIOR_BITS = (BITS + 2) / 3; //第一、二層對應(yīng)頁號的比特位個數(shù) static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS; //第一、二層存儲元素的個數(shù) static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS; //第三層對應(yīng)頁號的比特位個數(shù) static const int LEAF_LENGTH = 1 << LEAF_BITS; //第三層存儲元素的個數(shù) struct Node { Node* ptrs[INTERIOR_LENGTH]; }; struct Leaf { void* values[LEAF_LENGTH]; }; Node* NewNode() { static ObjectPool<Node> nodePool; Node* result = nodePool.New(); if (result != NULL) { memset(result, 0, sizeof(*result)); } return result; } Node* root_; public: typedef uintptr_t Number; explicit TCMalloc_PageMap3() { root_ = NewNode(); } void* get(Number k) const { const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS); //第一層對應(yīng)的下標 const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二層對應(yīng)的下標 const Number i3 = k & (LEAF_LENGTH - 1); //第三層對應(yīng)的下標 //頁號超出范圍,或映射該頁號的空間未開辟 if ((k >> BITS) > 0 || root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) { return NULL; } return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3]; //返回該頁號對應(yīng)span的指針 } void set(Number k, void* v) { assert(k >> BITS == 0); const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS); //第一層對應(yīng)的下標 const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二層對應(yīng)的下標 const Number i3 = k & (LEAF_LENGTH - 1); //第三層對應(yīng)的下標 Ensure(k, 1); //確保映射第k頁頁號的空間是開辟好了的 reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v; //建立該頁號與對應(yīng)span的映射 } //確保映射[start,start+n-1]頁號的空間是開辟好了的 bool Ensure(Number start, size_t n) { for (Number key = start; key <= start + n - 1;) { const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS); //第一層對應(yīng)的下標 const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二層對應(yīng)的下標 if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH) //下標值超出范圍 return false; if (root_->ptrs[i1] == NULL) //第一層i1下標指向的空間未開辟 { //開辟對應(yīng)空間 Node* n = NewNode(); if (n == NULL) return false; root_->ptrs[i1] = n; } if (root_->ptrs[i1]->ptrs[i2] == NULL) //第二層i2下標指向的空間未開辟 { //開辟對應(yīng)空間 static ObjectPool<Leaf> leafPool; Leaf* leaf = leafPool.New(); if (leaf == NULL) return false; memset(leaf, 0, sizeof(*leaf)); root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf); } key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //繼續(xù)后續(xù)檢查 } return true; } void PreallocateMoreMemory() {} };
因此當我們要建立某一頁號的映射關(guān)系時,需要先確保存儲該頁映射的數(shù)組空間是開辟好了的,也就是調(diào)用代碼中的Ensure函數(shù),如果對應(yīng)數(shù)組空間未開辟則會立馬開辟對應(yīng)的空間。
使用基數(shù)樹進行優(yōu)化代碼實現(xiàn)
代碼更改
現(xiàn)在我們用基數(shù)樹對代碼進行優(yōu)化,此時將PageCache類當中的unorder_map用基數(shù)樹進行替換即可,由于當前是32位平臺,因此這里隨便用幾層基數(shù)樹都可以。
//單例模式 class PageCache { public: //... private: //std::unordered_map<PAGE_ID, Span*> _idSpanMap; TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap; };
此時當我們需要建立頁號與span的映射時,就調(diào)用基數(shù)樹當中的set函數(shù)。
_idSpanMap.set(span->_pageId, span);
而當我們需要讀取某一頁號對應(yīng)的span時,就調(diào)用基數(shù)樹當中的get函數(shù)。
Span* ret = (Span*)_idSpanMap.get(id);
并且現(xiàn)在PageCache類向外提供的,用于讀取映射關(guān)系的MapObjectToSpan函數(shù)內(nèi)部就不需要加鎖了。
//獲取從對象到span的映射 Span* PageCache::MapObjectToSpan(void* obj) { PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //頁號 Span* ret = (Span*)_idSpanMap.get(id); assert(ret != nullptr); return ret; }
為什么讀取基數(shù)樹映射關(guān)系時不需要加鎖?
當某個線程在讀取映射關(guān)系時,可能另外一個線程正在建立其他頁號的映射關(guān)系,而此時無論我們用的是C++當中的map還是unordered_map,在讀取映射關(guān)系時都是需要加鎖的。
因為C++中map的底層數(shù)據(jù)結(jié)構(gòu)是紅黑樹,unordered_map的底層數(shù)據(jù)結(jié)構(gòu)是哈希表,而無論是紅黑樹還是哈希表,當我們在插入數(shù)據(jù)時其底層的結(jié)構(gòu)都有可能會發(fā)生變化。比如紅黑樹在插入數(shù)據(jù)時可能會引起樹的旋轉(zhuǎn),而哈希表在插入數(shù)據(jù)時可能會引起哈希表擴容。此時要避免出現(xiàn)數(shù)據(jù)不一致的問題,就不能讓插入操作和讀取操作同時進行,因此我們在讀取映射關(guān)系的時候是需要加鎖的。
而對于基數(shù)樹來說就不一樣了,基數(shù)樹的空間一旦開辟好了就不會發(fā)生變化,因此無論什么時候去讀取某個頁的映射,都是對應(yīng)在一個固定的位置進行讀取的。并且我們不會同時對同一個頁進行讀取映射和建立映射的操作,因為我們只有在釋放對象時才需要讀取映射,而建立映射的操作都是在page cache進行的。也就是說,讀取映射時讀取的都是對應(yīng)span的_useCount不等于0的頁,而建立映射時建立的都是對應(yīng)span的_useCount等于0的頁,所以說我們不會同時對同一個頁進行讀取映射和建立映射的操作。
再次對比malloc進行測試
還是同樣的代碼,只不過我們用基數(shù)樹對代碼進行了優(yōu)化,這時測試固定大小內(nèi)存的申請和釋放的結(jié)果如下:
可以看到,這時就算申請釋放的是固定大小的對象,其效率都是malloc的兩倍。下面在申請釋放不同大小的對象時,由于central cache的桶鎖起作用了,其效率更是變成了malloc的好幾倍。
打包成動靜態(tài)庫
實際Google開源的tcmalloc是會直接用于替換malloc的,不同平臺替換的方式不同。比如基于Unix的系統(tǒng)上的glibc,使用了weak alias的方式替換;而對于某些其他平臺,需要使用hook的鉤子技術(shù)來做。
對于我們當前實現(xiàn)的項目,可以考慮將其打包成靜態(tài)庫或動態(tài)庫。我們先右擊解決方案資源管理器當中的項目名稱,然后選擇屬性。
此時會彈出該選項卡,按照以下圖示就可以選擇將其打包成靜態(tài)庫或動態(tài)庫了。
項目源碼
Github:https://github.com/chenlong-xcy/standard-project/tree/main/ConcurrentMemoryPool
到此這篇關(guān)于C++高并發(fā)內(nèi)存池的實現(xiàn)的文章就介紹到這了,更多相關(guān)C++高并發(fā)內(nèi)存池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C++運算符重載實例代碼詳解(調(diào)試環(huán)境 Visual Studio 2019)
這篇文章主要介紹了C++運算符重載實例(調(diào)試環(huán)境 Visual Studio 2019),本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-03-03Linux搭建C++開發(fā)調(diào)試環(huán)境的方法步驟
這篇文章主要介紹了Linux搭建C++開發(fā)調(diào)試環(huán)境的方法步驟,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-10-10C語言詳解用char實現(xiàn)大小寫字母的轉(zhuǎn)換
這篇文章主要給大家介紹了關(guān)于C語言實現(xiàn)大小寫字母轉(zhuǎn)換的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-05-05OpenCV霍夫變換(Hough Transform)直線檢測詳解
這篇文章主要為大家詳細介紹了OpenCV霍夫變換直線檢測的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-12-12