原項(xiàng)目地址: 高并發(fā)內(nèi)存池項(xiàng)目: 高并發(fā)內(nèi)存池項(xiàng)目的課堂板書+代碼 (gitee.com)
寫在前面
本打算利用五一假期的時(shí)間將這個(gè)項(xiàng)目一口氣開發(fā)完成,但由于本人的懈怠,這個(gè)項(xiàng)目最終只完成了80%。于是利用長(zhǎng)假后的一天假期,將這個(gè)項(xiàng)目的框架搭建完成。本以為這個(gè)項(xiàng)目就此結(jié)束,但是調(diào)試項(xiàng)目所花費(fèi)的時(shí)間遠(yuǎn)遠(yuǎn)超出了預(yù)期,直到現(xiàn)在這個(gè)項(xiàng)目還是有著未知的bug。修好了一個(gè)bug,測(cè)試用例終于能跑通時(shí)。以為終于結(jié)束,換個(gè)測(cè)試用例,下個(gè)bug又馬上到來(lái)。比起上個(gè)項(xiàng)目的順利,這個(gè)項(xiàng)目中的無(wú)數(shù)問(wèn)題(大多是運(yùn)行時(shí)內(nèi)存崩潰)也算得上是C++的一種“魅力”吧。學(xué)了這么久C++,我也總會(huì)體會(huì)到了它的“魅力”。
之所以提前寫這篇文章,是因?yàn)槲也恢肋@個(gè)項(xiàng)目的調(diào)試何時(shí)能完成。故文章中列出的代碼可能是有問(wèn)題的,因此這篇文章的重點(diǎn)不在代碼,重點(diǎn)在于項(xiàng)目的整體框架。并且在開發(fā)項(xiàng)目的過(guò)程中,記錄下的類似草稿的文字(包括代碼)已經(jīng)超過(guò)了40000個(gè)字符,當(dāng)項(xiàng)目完成時(shí),我想我應(yīng)該沒(méi)有勇氣再將這些筆記整理成文章了。因此這里就先整理一部分的筆記,后續(xù)應(yīng)該還有調(diào)試與總結(jié)的文章。
剛才說(shuō)到項(xiàng)目還在調(diào)試,而調(diào)試項(xiàng)目需要對(duì)整體的框架有著清晰的認(rèn)識(shí),大到類的結(jié)構(gòu),小到變量的名稱,這些都要熟悉。所以總結(jié)這篇文章也是為了更好的調(diào)試。
最后,再次強(qiáng)調(diào):這篇文章是一個(gè)未完成項(xiàng)目的邏輯梳理,文中的代碼可能含有bug,若讀者能夠指出錯(cuò)誤,還望與我理性討論,我將感激不盡。
我的命名規(guī)則
- 類名:大駝峰
- 函數(shù)名與變量名:unix風(fēng)格的下劃線
- 未區(qū)分構(gòu)造函數(shù)的形參與成員變量,類成員統(tǒng)一以下劃線開頭
malloc是語(yǔ)言提供的一個(gè)內(nèi)存分配機(jī)制,其擁有自己的緩存機(jī)制以提高分配內(nèi)存的效率。但malloc畢竟是一個(gè)通用的接口,為了通用性,其效率必然不高。因此,STL為頻繁申請(qǐng)小塊內(nèi)存的操作定制了一個(gè)內(nèi)存池,該內(nèi)存池解決了多次申請(qǐng)小塊內(nèi)存導(dǎo)致的內(nèi)存碎片問(wèn)題,并且提高了小塊內(nèi)存的分配效率。而高并發(fā)內(nèi)存池則是針對(duì)多線程場(chǎng)景下的優(yōu)化,其目的在于解決內(nèi)存碎片問(wèn)題與提高多線程申請(qǐng)內(nèi)存時(shí)的效率。
定長(zhǎng)內(nèi)存池的實(shí)現(xiàn)
為什么要實(shí)現(xiàn)定長(zhǎng)內(nèi)存池?它是高并發(fā)內(nèi)存池的子結(jié)構(gòu),同時(shí)通過(guò)它我們可以了解內(nèi)存池的簡(jiǎn)單機(jī)制。定長(zhǎng)內(nèi)存池不考慮內(nèi)存碎片的問(wèn)題,即假設(shè)用戶每次申請(qǐng)的內(nèi)存長(zhǎng)度固定。
與STL的空間配置器對(duì)比,為了追求極致的效率與減少內(nèi)存碎片的產(chǎn)生,空間配置器的內(nèi)存池不是定長(zhǎng)的,用戶可以申請(qǐng)的內(nèi)存長(zhǎng)度為8B、16B…、128B,它們都是8的倍數(shù),當(dāng)然了,配置器需要將用戶申請(qǐng)的內(nèi)存向8的倍數(shù)對(duì)齊。如果不使用配置器,用戶直接調(diào)用malloc申請(qǐng)沒(méi)有對(duì)齊過(guò)的小內(nèi)存塊,很容易導(dǎo)致內(nèi)存碎片的問(wèn)題。所以空間配置器不僅要保證極致的效率還要減少內(nèi)存碎片的產(chǎn)生。這也是空間配置器和定長(zhǎng)內(nèi)存池的自由鏈表類型不同的緣由,這個(gè)結(jié)構(gòu)將在之后看到??臻g配置器的自由鏈表是一個(gè)哈希桶,存儲(chǔ)的是二級(jí)指針。而定長(zhǎng)內(nèi)存池的自由鏈表是一個(gè)單鏈表,存儲(chǔ)的是一級(jí)指針。從結(jié)構(gòu)與原理的角度看,定長(zhǎng)內(nèi)存池的實(shí)現(xiàn)更為簡(jiǎn)單。
定長(zhǎng)內(nèi)存池的結(jié)構(gòu)
首先,定長(zhǎng)內(nèi)存池以一個(gè)類模板的形式呈現(xiàn)。關(guān)于它的模板參數(shù),可以是表示內(nèi)存塊大小的非類型模板參數(shù)。也可以是一個(gè)類型模板參數(shù),此時(shí)內(nèi)存塊的大小就是該類型的大小。我用模板參數(shù)實(shí)現(xiàn)內(nèi)存池,每次調(diào)用內(nèi)存申請(qǐng)接口獲取的內(nèi)存塊大小,等于該參數(shù)類型的大小。
接著是內(nèi)存池的結(jié)構(gòu):與空間配置器一樣,定長(zhǎng)內(nèi)存池用兩個(gè)指針start和finish維護(hù)內(nèi)存池。為什么不能只是用start指針呢?若只使用start指針,則無(wú)法確定內(nèi)存池的使用情況:是否使用完了,還剩多少內(nèi)存沒(méi)有使用?這將導(dǎo)致內(nèi)存池內(nèi)存耗盡而我們卻不知情,進(jìn)而導(dǎo)致內(nèi)存的越界訪問(wèn)。
由于內(nèi)存池是連續(xù)的,用戶申請(qǐng)與歸還內(nèi)存塊的順序大概率是不同的,所以我們無(wú)法只用內(nèi)存池維護(hù)用戶歸還的內(nèi)存塊。因此,這里使用自由鏈表維護(hù)用戶歸還的內(nèi)存塊,當(dāng)用戶申請(qǐng)內(nèi)存塊時(shí),優(yōu)先使用自由鏈表中的內(nèi)存塊。因?yàn)閺膬?nèi)存池拿走的內(nèi)存塊被歸還到自由鏈表中了。因此除了一個(gè)內(nèi)存池,我們還需要一個(gè)自由鏈表維護(hù)內(nèi)存塊的申請(qǐng)與歸還。
定長(zhǎng)內(nèi)存池的內(nèi)存操作
定長(zhǎng)內(nèi)存池主要的接口有兩個(gè):new_obj和delete_obj,分別負(fù)責(zé)分配內(nèi)存塊給用戶與歸還用戶的內(nèi)存塊。
- new_obj:優(yōu)先檢查自由鏈表是否為空
- 若不為空,則將鏈表的第一個(gè)內(nèi)存塊返回(頭刪)
- 若為空,則填充內(nèi)存池,并將第一塊內(nèi)存塊返回
- delete_obj:將用戶歸還的內(nèi)存塊頭插到自由鏈表
在這兩個(gè)接口中需要用到一個(gè)操作:獲取節(jié)點(diǎn)的next指針時(shí),需要將內(nèi)存塊強(qiáng)轉(zhuǎn)為void**再解引用得到next指針。我們知道32位系統(tǒng)與64位系統(tǒng)的指針大小是不相同的。在自由鏈表中,內(nèi)存塊需要用4/8字節(jié)的空間存儲(chǔ)下一內(nèi)存塊的地址,類似單鏈表中的next指針。一種簡(jiǎn)單的解決方式是:使用條件編譯判斷當(dāng)前系統(tǒng)的位數(shù),從而確定指針的字節(jié)數(shù)。
另一種方法是使用*(void**),void*作為一個(gè)指針變量,在32位系統(tǒng)下大小為4字節(jié),在64位系統(tǒng)下大小為8字節(jié)。將內(nèi)存塊的地址強(qiáng)轉(zhuǎn)成void**,再通過(guò)解引用該地址訪問(wèn)void*長(zhǎng)度字節(jié)的數(shù)據(jù),這個(gè)操作在32位系統(tǒng)下可以訪問(wèn)4字節(jié)空間,64位系統(tǒng)下可以訪問(wèn)8字節(jié)空間,很好的規(guī)避了條件編譯的繁瑣。當(dāng)然了,*(void**)中的void可以被替換成任意類型,如int,char。(ps:SGI版本的STL中,使用了union聯(lián)合體代替這種強(qiáng)轉(zhuǎn)操作,這也是一種不錯(cuò)的解決方法)
以下是定長(zhǎng)內(nèi)存池的所有實(shí)現(xiàn):
#pragma once
#include "Common.hpp"
#ifdef _WIN32
#include <Windows.h>
#else
// 其他系統(tǒng)的頭文件
#endif
static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* p = VirtualAlloc(0, kpage << 12, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// 其他系統(tǒng)申請(qǐng)內(nèi)存的函數(shù)
#endif
if (p == nullptr)
throw std::bad_alloc();
return p;
}
template <class T>
class ObjectPool
{
private:
char* _start_free = nullptr;
char* _finish_free = nullptr; // 維護(hù)內(nèi)存池的兩個(gè)指針
void* _free_list = nullptr; // 自由鏈表
public:
// 內(nèi)存塊的申請(qǐng)與歸還
T* new_obj();
void delete_obj(T* obj);
};
template <class T>
T* ObjectPool<T>::new_obj()
{
T* obj = nullptr;
// 優(yōu)先向自由鏈表申請(qǐng)內(nèi)存塊
if (_free_list)
{
obj = (T*)_free_list;
_free_list = *(void**)_free_list;
}
// 自由鏈表無(wú)內(nèi)存塊,向內(nèi)存池索要
else
{
// 內(nèi)存池空間不足
if ((_finish_free - _start_free) < sizeof(T))
{
size_t bytes_to_get = sizeof(T) * 1024; // 默認(rèn)每次申請(qǐng)1024個(gè)T的空間
_start_free = (char*)SystemAlloc(bytes_to_get >> 12);
// _start_free = (char*)malloc(bytes_to_get);
if (_start_free == nullptr)
{
throw std::bad_alloc();
}
_finish_free = _start_free + bytes_to_get;
}
size_t need_bytes = sizeof(T) > sizeof(void*) ? sizeof(T) : sizeof(void*);
obj = (T*)_start_free;
_start_free += need_bytes;
}
// 定位new,調(diào)用T的構(gòu)造函數(shù)以進(jìn)行初始化
new(obj)T;
return obj;
}
template <class T>
void ObjectPool<T>::delete_obj(T* obj)
{
obj->~T();
*(void**)obj = _free_list;
_free_list = (void*)obj;
}
其中對(duì)于申請(qǐng)內(nèi)存的操作使用了系統(tǒng)調(diào)用,不再使用malloc。Windows下malloc封裝了VirtualAlloc,Linux下malloc封裝了brk,利用條件編譯使不同的系統(tǒng)調(diào)用相應(yīng)的系統(tǒng)調(diào)用,從而使我們的定長(zhǎng)內(nèi)存池完全脫離malloc。并且,定長(zhǎng)內(nèi)存池的new_obj和delete_obj操作不僅要分配空間,還要進(jìn)行初始化與銷毀工作,所以new_obj的最后需要使用定位new,delete_obj一開始要調(diào)用對(duì)象的析構(gòu)。
最后進(jìn)行效率的測(cè)試,測(cè)試demo:
struct TreeNode
{
int _val;
TreeNode* _left;
TreeNode* _right;
TreeNode()
:_val(0)
, _left(nullptr)
, _right(nullptr)
{}
};
void TestObjectPool()
{
// 申請(qǐng)釋放的輪次
const size_t Rounds = 5;
// 每輪申請(qǐng)釋放多少次
const size_t N = 100000;
std::vector<TreeNode*> v1;
v1.reserve(N);
size_t begin1 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v1.push_back(new TreeNode);
}
for (int i = 0; i < N; ++i)
{
delete v1[i];
}
v1.clear();
}
size_t end1 = clock();
std::vector<TreeNode*> v2;
v2.reserve(N);
ObjectPool<TreeNode> TNPool;
size_t begin2 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v2.push_back(TNPool.new_obj());
}
for (int i = 0; i < N; ++i)
{
TNPool.delete_obj(v2[i]);
}
v2.clear();
}
size_t end2 = clock();
std::cout << "測(cè)試數(shù)量n = " << N << std::endl;
std::cout << "使用new花費(fèi)的時(shí)間:" << end1 - begin1 << std::endl;
std::cout << "使用定長(zhǎng)內(nèi)存池花費(fèi)的時(shí)間:" WWWWWW<< end2 - begin2 << std::endl;
}
測(cè)試結(jié)果:
高并發(fā)內(nèi)存池的整體框架
接著進(jìn)入主題,介紹高并發(fā)內(nèi)存池的整體框架。
一般內(nèi)存池都需要考慮兩個(gè)問(wèn)題:
- 內(nèi)存碎片問(wèn)題
- 性能問(wèn)題
高并發(fā)內(nèi)存池還需要考慮:線程加鎖導(dǎo)致的競(jìng)爭(zhēng)問(wèn)題。高并發(fā)內(nèi)存池使用三層結(jié)構(gòu)以解決這三個(gè)問(wèn)題
- thread cache:每個(gè)線程獨(dú)享一個(gè)thread cache,可以簡(jiǎn)單的理解為thread cache是每個(gè)線程的小內(nèi)存池,由于被線程獨(dú)享,所以不需要加鎖訪問(wèn),這樣能減少鎖的競(jìng)爭(zhēng)
- central cache:當(dāng)thread cache無(wú)可用內(nèi)存時(shí),需要向central cache索取內(nèi)存。當(dāng)thread cache閑置內(nèi)存過(guò)多時(shí),需要向central cache歸還內(nèi)存,以實(shí)現(xiàn)每個(gè)thread cache的均衡。由于central cache是多線程共享的,所以需要加鎖訪問(wèn)
- page cache:當(dāng)central cache無(wú)可用內(nèi)存時(shí),需要從page cache索取內(nèi)存,page cache會(huì)通過(guò)系統(tǒng)調(diào)用獲取堆資源。當(dāng)central cache的閑置內(nèi)存過(guò)多時(shí),需要向page cache歸還內(nèi)存,以合并小塊內(nèi)存得到大塊內(nèi)存
所以,線程獲取的內(nèi)存資源總是通過(guò)調(diào)用系統(tǒng)接口獲得,只不過(guò)我們?cè)诰€程和系統(tǒng)接口間建立了三層緩存結(jié)構(gòu),以解決其他問(wèn)題。
thread cache
thread cache是一個(gè)線程獨(dú)享的內(nèi)存池,當(dāng)線程需要內(nèi)存資源時(shí),優(yōu)先向thread cache索取。和文章一開始實(shí)現(xiàn)的定長(zhǎng)內(nèi)存池不同,我們只能向定長(zhǎng)內(nèi)存池索取相同大小的內(nèi)存塊,但是線程的情況很復(fù)雜,需要向thread cache申請(qǐng)不同大小的內(nèi)存塊,每次申請(qǐng)的內(nèi)存塊大小極有可能不相同。
因此thread cache是不定長(zhǎng)的,其自由鏈表也不再是簡(jiǎn)單的單鏈表,而是懸掛單鏈表的哈希桶。哈希桶其實(shí)是一個(gè)指針數(shù)組,其成員指向了和定長(zhǎng)內(nèi)存池一樣的單鏈表。
至于thread cache的操作,主要有兩個(gè):_allocate和_deallocate。
對(duì)于_allocate,我們假設(shè)線程能夠申請(qǐng)的最大內(nèi)存塊大小為256kB,以8B作為桶的基本單位,每個(gè)桶存儲(chǔ)的內(nèi)存塊大小為8B,16B,24B,…,256kB,但是這樣的自由鏈表未免有些笨重(注意,最大內(nèi)存塊為256kB),竟然有32k個(gè)桶,顯然是不合理的。所以我們需要設(shè)置對(duì)齊規(guī)則,使桶的數(shù)量盡可能的合理:
// [1, 128] 8B對(duì)齊 _free_lists[0, 16)
// [128 + 1, 1024] 16B對(duì)齊 _free_lists[16, 72)
// [1024 + 1, 8 * 1024] 128B對(duì)齊 _free_lists[72, 128)
// [8 * 1024 + 1, 64 * 1024] 1024對(duì)齊 _free_lists[128, 184)
// [64 * 1024 + 1, 256 * 1024] 8 * 1024對(duì)齊 _free_lists[184, 208)
對(duì)于不同的區(qū)間設(shè)置不同的對(duì)齊數(shù),若線程申請(qǐng)的內(nèi)存塊大小不是對(duì)齊數(shù)的整數(shù)倍,那么我們應(yīng)該向上對(duì)齊,多分配給線程一些空間,就算線程壓根不會(huì)使用,這是為了結(jié)構(gòu)的簡(jiǎn)單考慮。根據(jù)上面的對(duì)齊規(guī)則,thread cache的自由鏈表只有208個(gè)桶,同時(shí)線程能夠申請(qǐng)的最大內(nèi)存塊大小為256kB,在這樣的規(guī)則下,空間的浪費(fèi)只有10%左右,這是相當(dāng)不錯(cuò)的了。
剛才提到了由向上取整導(dǎo)致的空間浪費(fèi)問(wèn)題,這樣的空間浪費(fèi)有一個(gè)名字:內(nèi)碎片。通常我們說(shuō)的內(nèi)存碎片是指外碎片,外碎片存在于兩個(gè)內(nèi)存塊之間,當(dāng)我們需要連續(xù)的大塊空間時(shí),由于過(guò)多的外碎片,可能導(dǎo)致內(nèi)存不足的情況,但這些外碎片大小的總和卻大于我們需要的空間,所以此時(shí)的內(nèi)存不足并不是真正的內(nèi)存不足??傊?,外碎片具有不連續(xù),可利用性低的特點(diǎn),如果內(nèi)存中存在大量外碎片,系統(tǒng)的性能將受到極大的影響。而向上取整導(dǎo)致的內(nèi)碎片是無(wú)法避免的,這是為了追求極致的效率而付出的代價(jià)。但是,用可控的內(nèi)碎片減少不可控的外碎片的產(chǎn)生,也是一種問(wèn)題解決辦法。
綜上,實(shí)現(xiàn)thread cache的allocate之前要實(shí)現(xiàn)兩個(gè)主要操作:內(nèi)存的向上取整和桶號(hào)映射。這里我們需要對(duì)自由鏈表的子結(jié)構(gòu)進(jìn)行封裝:
// 桶的最多數(shù)量與內(nèi)存塊的最大字節(jié)數(shù)
static const size_t NFREELIST = 208;
static const size_t MAX_SIZE = 256 * 1024;
// 提取出obj的指針字段,以引用的形式返回
static void*& next(void* obj)
{
return *(void**)obj;
}
// 自由鏈表的子結(jié)構(gòu):?jiǎn)捂湵?/span>
class FreeList
{
private:
void* _head = nullptr;
public:
void _push_front(void* obj);
void* _pop_front();
bool _empty();
};
bool FreeList::_empty()
{
return _head == nullptr;
}
void FreeList::_push_front(void* obj)
{
if (obj)
{
next(obj) = _head;
_head = (void*)obj;
}
}
void* FreeList::_pop_front()
{
if (_head != nullptr)
{
void* ret = _head;
_head = next(_head);
return ret;
}
return nullptr;
}
剛才說(shuō)過(guò)thread cache的自由鏈表的本質(zhì)是一個(gè)哈希桶,哈希桶也是一個(gè)單鏈表,其成員也懸掛著單鏈表,也就是FreeList結(jié)構(gòu)。這個(gè)結(jié)構(gòu)對(duì)外暴露三個(gè)接口:判空、元素的插入與刪除。
然后是封裝一個(gè)類SizeClass,以實(shí)現(xiàn)向上取整與桶號(hào)映射,這個(gè)類對(duì)外暴露兩個(gè)接口,分別是:
- 字節(jié)數(shù)的向上取整
- 字節(jié)數(shù)和桶號(hào)映射
class SizeClass
{
private:
static inline size_t __round_up(size_t bytes, size_t align_num) { return (bytes + (align_num - 1)) & ~(align_num - 1); }
static inline size_t __get_index(size_t bytes, size_t align) { return ((bytes + (1 << align) - 1) >> align) - 1; }
public:
static inline size_t _round_up(size_t bytes);
static inline size_t _get_index(size_t bytes);
};
size_t SizeClass::_round_up(size_t bytes)
{
int ret = 0;
if (bytes > 0)
{
if (bytes <= 128)
ret = __round_up(bytes, 8);
else if (bytes <= 1024)
ret = __round_up(bytes, 16);
else if (bytes <= 8 * 1204)
ret = __round_up(bytes, 128);
else if (bytes <= 64 * 1024)
ret = __round_up(bytes, 1024);
else if (bytes <= 256 * 1024)
ret = __round_up(bytes, 8 * 1024);
else
std::cerr << "__round_up::bytes illegal" << std::endl;
}
return ret;
}
size_t SizeClass::_get_index(size_t bytes)
{
int ret = 0;
static int bucket_count[4] = { 16, 72, 128, 184 };
if (bytes > 0)
{
if (bytes <= 128)
ret = __get_index(bytes, 3);
else if (bytes <= 1024)
ret = __get_index(bytes - 128, 4) + bucket_count[0];
else if (bytes <= 8 * 1204)
ret = __get_index(bytes - 1024, 7) + bucket_count[1];
else if (bytes <= 64 * 1024)
ret = __get_index(bytes - 8 * 1024, 10) + bucket_count[2];
else if (bytes <= 256 * 1024)
ret = __get_index(bytes - 64 * 1024, 13) + bucket_count[3];
else
std::cerr << "__get_index::bytes illegal" << std::endl;
}
return ret;
}
以上兩個(gè)類都定義在Common.hpp中。其中,向上取整和桶號(hào)映射使用了位運(yùn)算,這樣做可以讓算法變得簡(jiǎn)潔與高效,這也是經(jīng)常會(huì)用到的兩個(gè)位運(yùn)算操作。
回到thread cache的_allocate:
- 我們需要將線程索取的內(nèi)存塊大小映射為桶號(hào)
- 根據(jù)桶號(hào)在哈希桶中找到這個(gè)桶
- 判斷這個(gè)桶是否為空
- 如果為空,那么thread cache就要向central cache索取內(nèi)存(這個(gè)操作之后實(shí)現(xiàn))
- 如果不為空,那么thread cache就從桶中取出一塊內(nèi)存塊(FreeList的頭刪操作)
至于thread cache的_deallocate:
- 我們也需要將線程索取的內(nèi)存塊大小映射為桶號(hào)
- 根據(jù)桶號(hào)在哈希桶中找到這個(gè)桶
- 將內(nèi)存塊放回桶中(FreeList的頭插操作)
ThreadCache的實(shí)現(xiàn):
#include "Common.hpp"
class ThreadCache
{
private:
FreeList _free_lists[NFREELIST];
public:
// 內(nèi)存塊的分配與歸還
void* _allocate(size_t bytes);
void _deallocate(void* obj, size_t bytes);
};
// TLS
#ifdef _WIN32
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
#else
// 其他系統(tǒng)
#endif
void* ThreadCache::_allocate(size_t bytes)
{
void* obj = nullptr;
if (bytes <= MAX_SIZE)
{
size_t need_bytes = SizeClass::_round_up(bytes);
size_t bucket_index = SizeClass::_get_index(bytes);
if (_free_lists[bucket_index]._empty())
{
// TODO內(nèi)存不足,向CentralCache索取內(nèi)存
}
else
{
obj = _free_lists[bucket_index]._pop_front();
}
}
return obj;
}
void ThreadCache::_deallocate(void* obj, size_t bytes)
{
if (obj && bytes <= MAX_SIZE)
{
size_t bucket_index = SizeClass::_get_index(block_size);
_free_lists[bucket_index]._push_front(obj);
}
}
其中使用到了:TLS,thread local storage,線程本地存儲(chǔ)。因?yàn)槊總€(gè)線程需要獨(dú)享thread cache,所以每個(gè)線程都需要有一個(gè)自己的ThreadCache對(duì)象。我們用一個(gè)指針指向ThreadCache對(duì)象的地址,并且這個(gè)指針是線程獨(dú)享的,當(dāng)線程需要申請(qǐng)內(nèi)存時(shí),首先檢查這個(gè)指針是否為空,若為空則需要new一個(gè)ThreadCache對(duì)象并保存其指針,然后通過(guò)這個(gè)指針調(diào)用ThreadCache的_allocate和_deallocate。
所以我們?cè)俜庋b兩個(gè)函數(shù),這兩個(gè)函數(shù)對(duì)標(biāo)malloc和free。對(duì)外只暴露這兩個(gè)函數(shù),它們會(huì)調(diào)用thread cache的_allocate和_deallocate,但在那之前會(huì)先獲取TLS對(duì)象,也就是ThreadCache對(duì)象的指針,通過(guò)該指針調(diào)用ThreadCache的兩個(gè)成員函數(shù)。
// ConcurrentAlloc.hpp
#pragma once
#include "Common.hpp"
#include "ThreadCache.hpp"
static void* tc_allocate(size_t bytes)
{
if (pTLSThreadCache == nullptr)
pTLSThreadCache = new ThreadCache;
return pTLSThreadCache->_allocate(bytes);
}
static void tc_deallocate(void* obj, size_t bytes)
{
if (pTLSThreadCache)
pTLSThreadCache->_deallocate(obj, bytes);
}
這樣的話,對(duì)于線程的動(dòng)態(tài)資源獲取與釋放的操作,只要調(diào)用tc_allocate和tc_deallocate就行了。
對(duì)于thread cache的_allocate實(shí)現(xiàn)中,還有一個(gè)操作沒(méi)有實(shí)現(xiàn):某一鏈表中沒(méi)有內(nèi)存塊時(shí),ThreadCache應(yīng)該向CentralCache索取內(nèi)存,要實(shí)現(xiàn)這個(gè)操作就要先實(shí)現(xiàn)central cache的整體結(jié)構(gòu)。
central cache
對(duì)于central cache,我們需要完成其節(jié)點(diǎn)Span,桶(存儲(chǔ)Span的雙向鏈表SpanList),以及哈希桶的實(shí)現(xiàn)。也就是說(shuō),哈希桶作為指針數(shù)組,其成員指向類型為SpanList的雙向鏈表,雙向鏈表的節(jié)點(diǎn)類型是Span。
對(duì)于Span:
- Span通過(guò)保存起始頁(yè)號(hào)以及擁有的頁(yè)的數(shù)量存儲(chǔ)連續(xù)的內(nèi)存頁(yè),這里需要用到兩個(gè)變量
- 其次它是一個(gè)節(jié)點(diǎn),需要有prev和next兩個(gè)指針
- 再者,因?yàn)镾pan跨越了多個(gè)內(nèi)存頁(yè),而一個(gè)內(nèi)存頁(yè)的大小通常是4kB,因此central cache需要切分內(nèi)存頁(yè),得到合適的連續(xù)的內(nèi)存塊,再分配給thread cache。這里用一個(gè)單鏈表保存切分好的內(nèi)存塊,但該鏈表不復(fù)用FreeList結(jié)構(gòu),Span只是保存鏈表的頭指針。至于為什么,之后就知道了
對(duì)于SpanList:Span只是一個(gè)節(jié)點(diǎn),我們需要用它來(lái)構(gòu)成雙鏈表SpanList。central cache的哈希桶需要包含208個(gè)SpanList(和thread cache的FreeList數(shù)量一樣)。這些雙鏈表在哈希桶中的映射規(guī)則和thread cache的哈希桶一樣,當(dāng)thread cache的某一桶號(hào)下的FreeList沒(méi)有內(nèi)存塊可用時(shí),我們只需要到central cache的相同桶號(hào)下的SpanList中,獲取一個(gè)可用的Span,將其切分好的內(nèi)存塊給thread cache使用即可。
總結(jié)一下:
- Span作為雙鏈表的節(jié)點(diǎn),存儲(chǔ)了多個(gè)連續(xù)的頁(yè)的信息,并且將這些頁(yè)切成了小內(nèi)存塊
- SpanList是將Span作為節(jié)點(diǎn)的雙鏈表,懸掛了多個(gè)Span
- central cache的本質(zhì)是由208個(gè)SpanList構(gòu)成的一個(gè)哈希桶,桶的映射規(guī)則與thread cache是對(duì)應(yīng)的
先實(shí)現(xiàn)Span結(jié)構(gòu):
// 類似于單鏈表中的節(jié)點(diǎn)
// 其跨越/存儲(chǔ)了多個(gè)內(nèi)存頁(yè)
struct Span
{
Span* _prev = nullptr;
Span* _next = nullptr;
// 存儲(chǔ)的內(nèi)存頁(yè)其實(shí)id與數(shù)量
page_t _id = 0;
size_t _n = 0;
// Span維護(hù)的多個(gè)內(nèi)存塊
void* _free_list = nullptr;
};
因?yàn)槲也幌雽憳?gòu)造函數(shù),所以給每個(gè)參數(shù)帶上了缺省值。
再來(lái)實(shí)現(xiàn)SpanList結(jié)構(gòu),這是一個(gè)帶頭節(jié)點(diǎn)的雙鏈表,對(duì)外提供insert和erase操作:
// 由Span作為節(jié)點(diǎn)的雙向鏈表
// SpanList是CenctralCache的一個(gè)桶
class SpanList
{
public:
SpanList();
~SpanList();
// 在pos前插入節(jié)點(diǎn)
void _insert(Span* obj, Span* pos);
// 刪除pos節(jié)點(diǎn)
void _erase(Span* pos);
// 為什么需要一把鎖,這個(gè)等等就解釋
std::mutex _mtx;
private:
Span* _head;
};
SpanList::SpanList()
{
_head = new Span;
_head->_prev = _head;
_head->_next = _head;
}
SpanList::~SpanList()
{
delete _head;
}
void SpanList::_insert(Span* obj, Span* pos)
{
Span* prev = pos->_prev;
prev->_next = obj;
obj->_prev = prev;
pos->_prev = obj;
obj->_next = pos;
}
void SpanList::_erase(Span* pos)
{
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
這是很簡(jiǎn)單的數(shù)據(jù)結(jié)構(gòu)的實(shí)現(xiàn),沒(méi)啥好說(shuō)的。
最后是CentralCache的單例:因?yàn)閏entral cache只需要有一個(gè),所以被設(shè)計(jì)成單例。又因?yàn)槎鄠€(gè)thread cache可以同時(shí)訪問(wèn)central cache,所以還需要對(duì)central cache加鎖。但是考慮到加鎖粒度的問(wèn)題:由于thread cache的哈希桶映射規(guī)則和central cache一樣,因此thread cache向central cache索取內(nèi)存塊時(shí),只會(huì)訪問(wèn)其中一個(gè)SpanList。也就是說(shuō),獲取內(nèi)存塊時(shí),只需要對(duì)SpanList加鎖,而不用對(duì)整個(gè)哈希桶加鎖。所以SpanList還需要擁有一把鎖,當(dāng)thread cache訪問(wèn)SpanList前,需要先獲取這把鎖。
central cache的結(jié)構(gòu):
#include "Common.hpp"
class CentralCache
{
private:
SpanList _span_lists[MAX_SIZE];
public:
static CentralCache* _get_instance() { return &_instance; }
private:
CentralCache() {}
CentralCache(const CentralCache& x) = delete;
static CentralCache _instance;
};
CentralCache CentralCache::_instance;
這個(gè)單例沒(méi)有必要設(shè)計(jì)成懶漢,為了程序的簡(jiǎn)單,這里就設(shè)計(jì)成餓漢。
聊完了central cache的具體結(jié)構(gòu),回到最開始要解決的問(wèn)題:實(shí)現(xiàn)某一鏈表中沒(méi)有內(nèi)存塊時(shí),ThreadCache向CentralCache索取內(nèi)存的操作。
首先,由于線程池中可申請(qǐng)的內(nèi)存塊大小極值相差過(guò)大,我們需要設(shè)計(jì)一個(gè)慢啟動(dòng)算法(類似擁塞控制中的慢啟動(dòng))使thread cache每次索取的內(nèi)存塊數(shù)量是合理的。
為thread cache的FreeList結(jié)構(gòu)添加一個(gè)變量_fetch_count,該變量存儲(chǔ)了某一FreeList因?yàn)閮?nèi)存塊不足而向central cache索取的次數(shù),但這個(gè)數(shù)一開始是1不是0。每次索取內(nèi)存塊后,該數(shù)都會(huì)自增。修改后的FreeList結(jié)構(gòu),添加一個(gè)成員變量:
// 自由鏈表的子結(jié)構(gòu):?jiǎn)捂湵?/span>
class FreeList
{
private:
void* _head = nullptr;
size_t _fetch_count = 1; // 添加變量
public:
void _push(void* obj);
void* _pop();
bool _empty();
// 關(guān)于_fetch_count的操作
inline size_t _get_fetch_count() { return _fetch_count; }
inline void _add_fetch_count() { ++_fetch_count; }
};
然后再設(shè)計(jì)一個(gè)算法:對(duì)于小的內(nèi)存塊,F(xiàn)reeList索取得多一些。對(duì)于大的內(nèi)存塊,F(xiàn)reeList索取得少一些,使得總體上索取的內(nèi)存大小是差不多的。計(jì)算過(guò)程很簡(jiǎn)單:將內(nèi)存塊大小的最大值(256kB) / FreeList對(duì)應(yīng)的內(nèi)存塊大?。?em>STL的空間配置器中,由于可申請(qǐng)的最大內(nèi)存塊大小為128B,最小為8B,兩者相差不大。因此直接默認(rèn)給20塊內(nèi)存塊,但是在我們的內(nèi)存池中,可申請(qǐng)的最大內(nèi)存塊大小為256kB,對(duì)于不同大小的內(nèi)存塊,肯定不能統(tǒng)一對(duì)待,多給幾塊256kB內(nèi)存塊可不是開玩笑的)。但是該算式得到的結(jié)果在極端情況下,對(duì)于小內(nèi)存塊來(lái)說(shuō)很大,對(duì)于大內(nèi)存塊來(lái)說(shuō)很小,所以這里需要控制一下上下限:
size_t SizeClass::_adapt_counts(size_t rounded_bytes)
{
size_t nums = MAX_SIZE / rounded_bytes;
if (nums < 2) nums = 2;
else if (nums > 512) nums = 512;
return nums;
}
拉低上限,提高下限,將結(jié)果控制在一個(gè)合理的范圍。
回到一開始說(shuō)的慢啟動(dòng):我們無(wú)法保證thread cache可以使用完得到的內(nèi)存塊,可能得到了10塊內(nèi)存塊,但thread cache只使用了1塊。我們可以先給thread cache分配少量的內(nèi)存塊,如果thread cache還要內(nèi)存塊,我們?cè)俳o它多一些。所以central cache給thread cache的內(nèi)存塊數(shù)量要從_fetch_count和_adapt_nums函數(shù)返回的結(jié)果中取最小值。一開始thread cache只能獲取一塊內(nèi)存塊,之后獲取的內(nèi)存塊數(shù)量將線性增長(zhǎng)。直到_fetch_count的值超過(guò)了_adapt_nums返回的結(jié)果,之后thread cache可以得到的內(nèi)存塊數(shù)量就等于_adapt_nums返回的結(jié)果。
我們將central cache分配內(nèi)存塊給thread cache的行為封裝成函數(shù)_fetch_blocks,該函數(shù)將訪問(wèn)對(duì)應(yīng)SpanList下的可用Span,然后將其維護(hù)的內(nèi)存塊盡可能多的返回:
// 將Span維護(hù)的內(nèi)存塊返回,以兩個(gè)指針指向內(nèi)存塊的首尾
void CentralCache::_fetch_blocks(void*& begin, void*& end, size_t& njobs, size_t block_size)
{
size_t bucket_index = SizeClass::_get_index(block_size);
std::unique_lock<std::mutex> guard(_span_lists[bucket_index]._mtx);
// TODO
Span* span = TODO();
if (span)
{
begin = span->_free_list;
end = begin;
int real_jobs = 1;
// 找到自由鏈表的尾,并且修改njobs的值
for (size_t i = 0; next(end) && i < njobs - 1; ++i)
{
end = next(end);
++real_jobs;
}
njobs = real_jobs;
// 將begin和end間的內(nèi)存塊從Span中刪除
span->_free_list = next(end);
next(end) = nullptr;
}
}
由于需要使thread cache互斥訪問(wèn)的SpanList,所以需要加鎖。對(duì)于TODO函數(shù):該函數(shù)會(huì)獲取SpanList中的一個(gè)Span,這個(gè)之后再來(lái)實(shí)現(xiàn)。其中需要注意的是:begin和end分別指向多塊內(nèi)存塊中的第一塊與最后一塊,一開始end等于begin,所以至少能返回一塊,即real_jobs的值為1。之后的循環(huán)結(jié)束條件不是i < njobs而是i < njobs - 1。
總結(jié)下,對(duì)于_fetch_blocks:該接口會(huì)修改begin和end的值,表示其獲取的內(nèi)存塊區(qū)間,同時(shí)也會(huì)修改njobs的值表示真正獲取到的內(nèi)存塊數(shù)量。
thread cache的某一鏈表中沒(méi)有內(nèi)存塊時(shí),會(huì)向central cache索取內(nèi)存。central cache提供給thread cache的接口已經(jīng)實(shí)現(xiàn),現(xiàn)在要實(shí)現(xiàn)thread cache的接口:FreeList需要將從_fetch_blocks得到的多個(gè)內(nèi)存塊中的第一塊返回,并將剩下的懸掛到自己的FreeList中,其實(shí)涉及到一開始說(shuō)的慢啟動(dòng)算法。該接口的實(shí)現(xiàn):
void* ThreadCache::_fetch_from_central_cache(size_t bucket_index, size_t block_size)
{
// 慢啟動(dòng),取兩者的較小值
size_t adapt_count = SizeClass::_adapt_count(block_size);
size_t fetch_count = _free_lists[bucket_index]._get_fetch_count();
size_t njobs = fetch_count < adapt_num ? fetch_count : adapt_num;
// 自增_fetch_count
if (njobs == fetch_count)
_free_lists[bucket_index]._add_fetch_count();
void* begin = nullptr;
void* end = nullptr;
CentralCache::_get_instance()->_fetch_blocks(begin, end, njobs, block_size);
if (njobs > 1)
_free_lists[bucket_index]._range_push(next(begin), end);
return begin;
}
可以看到,只有最后得到的值和_fetch_count相同時(shí),該變量才會(huì)自增,所以準(zhǔn)確的說(shuō)這個(gè)變量表示的不是該鏈表申請(qǐng)內(nèi)存塊的次數(shù),因?yàn)楫?dāng)該變量的值超過(guò)adapt_count后,自增就會(huì)停止。
其中涉及到“_range_push:將多塊內(nèi)存塊懸掛到自己的FreeList中”的操作,這里再為FreeList封裝一個(gè)操作以實(shí)現(xiàn)該操作:
void FreeList::_range_push(void* begin, void* end)
{
if (begin && end)
{
next(end) = _head;
_head = begin;
}
}
至此,thread cache的接口基本實(shí)現(xiàn)完成,以下是到目前為止它的所有實(shí)現(xiàn):
#pragma once
#include "Common.hpp"
#include "CentralCache.hpp"
class ThreadCache
{
private:
FreeList _free_lists[NFREELIST];
// 內(nèi)存不足時(shí),向CentralCache索取
void* _fetch_from_central_cache(size_t bucket_index, size_t block_size);
public:
// 內(nèi)存塊的分配與歸還
void* _allocate(size_t bytes);
void _deallocate(void* obj, size_t bytes);
};
// TLS
#ifdef _WIN32
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
#else
// 其他系統(tǒng)
#endif
void* ThreadCache::_allocate(size_t bytes)
{
void* obj = nullptr;
if (bytes <= MAX_SIZE)
{
size_t need_bytes = SizeClass::_round_up(bytes);
size_t bucket_index = SizeClass::_get_index(bytes);
if (_free_lists[bucket_index]._empty())
{
obj = _fetch_from_central_cache(bucket_index, need_bytes);
}
else
{
obj = _free_lists[bucket_index]._pop_front();
}
}
return obj;
}
void ThreadCache::_deallocate(void* obj, size_t bytes)
{
if (obj && bytes <= MAX_SIZE)
{
size_t bucket_index = SizeClass::_get_index(bytes);
_free_lists[bucket_index]._push_front(obj);
}
}
void* ThreadCache::_fetch_from_central_cache(size_t bucket_index, size_t block_size)
{
// 慢啟動(dòng),取兩者的較小值
size_t adapt_num = SizeClass::_adapt_counts(block_size);
size_t fetch_count = _free_lists[bucket_index]._get_fetch_count();
size_t njobs = fetch_count < adapt_num ? fetch_count : adapt_num;
// 自增_fetch_count
if (njobs == fetch_count)
_free_lists[bucket_index]._add_fetch_count();
void* begin = nullptr;
void* end = nullptr;
CentralCache::_get_instance()->_fetch_blocks(begin, end, njobs, block_size);
if (njobs > 1)
_free_lists[bucket_index]._range_push(next(begin), end);
return begin;
}
而剛才實(shí)現(xiàn)的接口,central cache中的_fetch_blocks中還有一個(gè)接口沒(méi)有實(shí)現(xiàn):從SpanList中獲取Span。
當(dāng)SpanList下沒(méi)有一個(gè)可用Span時(shí),就要向page cache申請(qǐng)一個(gè)span(該接口也是之后實(shí)現(xiàn)),而SpanList得到的新的span需要被切分成對(duì)應(yīng)大小的內(nèi)存塊后才可用使用。內(nèi)存塊的切分由這個(gè)接口負(fù)責(zé):
Span* CentralCache::_get_span(SpanList& span_list, size_t block_size)
{
Span* cur = span_list._begin();
Span* end = span_list._end();
// 遍歷SpanList,查找是否有可用的Span
while (cur != end)
{
if (cur->_free_list)
{
// 注意不要erase(cur)
return cur;
}
cur = cur->_next;
}
// 先解桶鎖,使后續(xù)的thread cache可用訪問(wèn)(可能歸還,可能索?。?/span>
span_list._mtx.unlock();
// TODO::無(wú)可用Span,向page cache求助
Span* ret_span = TODO();
if (ret_span)
{
// 獲取到span后,切分span
void* start = (void*)(ret_span->_id << PAGE_SHIFT);
void* finish = (void*)((ret_span->_id + ret_span->_n) << PAGE_SHIFT);
ret_span->_free_list = start;
page_t pending = (ret_span->_id << PAGE_SHIFT) + block_size;
while (pending < (page_t)finish - block_size)
{
next(start) = (void*)pending;
pending += block_size;
start = next(start);
}
next(start) = nullptr;
}
// 此時(shí)要返回到_fetch_blocks函數(shù),需要加鎖
span_list._mtx.lock();
return ret_span;
}
_get_span首先會(huì)查找SpanList下是否有可用span,若一個(gè)可用span都沒(méi)有則會(huì)向page cache求助,得到新的span,將其切分后返回。
關(guān)于切分算法的實(shí)現(xiàn):
- 先定義兩個(gè)指向頭尾的void*指針,此時(shí)span的內(nèi)存地址為[start, finish)
- 然后定義一個(gè)整數(shù)pending,切分內(nèi)存塊時(shí),該數(shù)表示下一個(gè)內(nèi)存塊的地址
- 將start作為_free_list的值,然后開始將start和pending進(jìn)行鏈接
- pending每次增加一個(gè)內(nèi)存塊的大小,當(dāng)pending的值在[finish - block_size, finish)之間時(shí),以pending開始的內(nèi)存塊將越過(guò)finish,使用這塊內(nèi)存塊將導(dǎo)致非法訪問(wèn),所以此時(shí)將停止循環(huán)
- 最后,將span的最后一塊內(nèi)存塊的下一指針置空,表示鏈表的結(jié)束
其中要注意的是:void*的加減法和普通整數(shù)的加減法不同,所以pending使用page_t類型定義。
至此,將目前為止實(shí)現(xiàn)的CentralCache展示出來(lái):
#pragma once
#include "Common.hpp"
#include "PageCache.hpp"
class CentralCache
{
private:
SpanList _span_lists[MAX_SIZE];
public:
static CentralCache* _get_instance() { return &_instance; }
// 將Span維護(hù)的內(nèi)存塊返回,以兩個(gè)指針指向內(nèi)存塊的首尾
void _fetch_blocks(void*& begin, void*& end, size_t& njobs, size_t block_size);
// 獲取一個(gè)Span
Span* _get_span(SpanList& span_list, size_t block_size);
private:
CentralCache() {}
CentralCache(const CentralCache& x) = delete;
static CentralCache _instance;
};
CentralCache CentralCache::_instance;
void CentralCache::_fetch_blocks(void*& begin, void*& end, size_t& njobs, size_t block_size)
{
size_t bucket_index = SizeClass::_get_index(block_size);
_span_lists[bucket_index]._mtx.lock();
Span* span = _get_span(_span_lists[bucket_index], block_size);
if (span && span->_free_list)
{
begin = span->_free_list;
end = begin;
int real_jobs = 1;
// 找到自由鏈表的尾,并且修改njobs的值
for (size_t i = 0; next(end) && i < njobs - 1; ++i)
{
end = next(end);
++real_jobs;
}
njobs = real_jobs;
span->_used_count += njobs;
// 將begin和end間的內(nèi)存塊從Span中刪除
span->_free_list = next(end);
next(end) = nullptr;
}
_span_lists[bucket_index]._mtx.unlock();
}
Span* CentralCache::_get_span(SpanList& span_list, size_t block_size)
{
Span* cur = span_list._begin();
Span* end = span_list._end();
// 遍歷SpanList,查找是否有可用的Span
while (cur != end)
{
if (cur->_free_list)
{
// 注意不要erase(cur)
return cur;
}
cur = cur->_next;
}
// 先解桶鎖,使后續(xù)的thread cache可用訪問(wèn)(可能歸還,可能索取)
span_list._mtx.unlock();
// TODO::無(wú)可用Span,向page cache求助
Span* ret_span = TODO();
if (ret_span)
{
// 獲取到span后,切分span
void* start = (void*)(ret_span->_id << PAGE_SHIFT);
void* finish = (void*)((ret_span->_id + ret_span->_n) << PAGE_SHIFT);
ret_span->_free_list = start;
page_t pending = (ret_span->_id << PAGE_SHIFT) + block_size;
while (pending < (page_t)finish - block_size)
{
next(start) = (void*)pending;
pending += block_size;
start = next(start);
}
next(start) = nullptr;
}
// 此時(shí)要返回到_fetch_blocks函數(shù),需要加鎖
span_list._mtx.lock();
return ret_span;
}
和thread cache一樣,central cache也有一個(gè)等待實(shí)現(xiàn)的接口,因?yàn)檫@個(gè)接口也涉及page cache的實(shí)現(xiàn),所以我們需要先實(shí)現(xiàn)page cache。
page cache
page cache用來(lái)做什么?當(dāng)central cache無(wú)內(nèi)存可用時(shí),就會(huì)向page cache索取內(nèi)存。對(duì)于thread cache,當(dāng)其需要內(nèi)存塊時(shí),central cache會(huì)將內(nèi)存塊返回,而central cache向page cache申請(qǐng)的卻是span,所以central cache自己承擔(dān)了將span切分為內(nèi)存塊的工作。這個(gè)接口剛剛也實(shí)現(xiàn)過(guò)了,可以看出central cache是一個(gè)承上啟下的結(jié)構(gòu)。
關(guān)于page cache,其結(jié)構(gòu)也是一個(gè)哈希桶,也就是一個(gè)數(shù)組,數(shù)組成員指向SpanList,和central cache的結(jié)構(gòu)一樣。但是映射規(guī)則和central不一樣,它有129個(gè)桶,第1桶不使用,從第二個(gè)桶開始使用,也就是從下標(biāo)為1的位置使用數(shù)組。每個(gè)下標(biāo)對(duì)應(yīng)的是span的頁(yè)數(shù),比如下標(biāo)為5的位置,該位置的SpanList懸掛的都是大小為5頁(yè)的span。并且page cache也是一個(gè)單例,central cache也需要互斥訪問(wèn)它,總之,先搭建個(gè)大概的結(jié)構(gòu):
class PageCache
{
private:
PageCache() {}
PageCache(const PageCache& x) = delete;
public:
static PageCache* _get_instance() { return &_instance; }
private:
static PageCache _instance;
SpanList _span_lists[NPAGES];
std::recursive_mutex _rmtx;
};
PageCache PageCache::_instance;
當(dāng)central向page索取span時(shí),需要告知page自己要索取的span的頁(yè)數(shù),然后page就會(huì)找到對(duì)應(yīng)的桶,檢查該SpanList下是否有span可用。
- 若沒(méi)有span,page cache不會(huì)直接向堆區(qū)申請(qǐng)內(nèi)存,而是往后查找是否有更大的span可以使用
- 若找到了更大的span就進(jìn)行切分,將切好的span返回,剩下的span重新放入哈希桶中
- 若沒(méi)找到更大span,才會(huì)向堆區(qū)申請(qǐng)內(nèi)存
Span* PageCache::_fetch_kspan(size_t k)
{
// 獲取PageCache的span時(shí)需要加鎖
std::unique_lock<std::recursive_mutex> guard(_rmtx);
if (!_span_lists[k]._empty())
{
Span* ret_span = _span_lists[k]._pop_front();
return ret_span;
}
else
{
for (int i = k + 1; i < NPAGES; ++i)
{
// 往后找更大的span,進(jìn)行切分
if (!_span_lists[i]._empty())
{
// 找到不為空的SpanList,獲取第一個(gè)Span,該Span的大小為i個(gè)page
Span* old_span = _span_lists[i]._pop_front();
Span* ret_span = new Span;
ret_span->_n = k;
ret_span->_id = old_span->_id;
old_span->_n -= k;
old_span->_id += k;
_span_lists[old_span->_n]._push_front(old_span);
return ret_span;
}
}
// 沒(méi)有更大的Span可以用,此時(shí)向堆區(qū)申請(qǐng)一塊128page的空間
void* ptr = SystemAlloc(128);
Span* max_span = new Span;
max_span->_id = (page_t)ptr >> PAGE_SHIFT;
max_span->_n = 128;
_span_lists[128]._push_front(max_span);
return _fetch_kspan(k);
}
}
關(guān)于切分算法實(shí)現(xiàn):向后找到更大的未使用的span后,需要new一個(gè)新的span保存切分后得到的合適大小的span。切分需要修改新舊span的信息,比如起始頁(yè)號(hào)、跨越的頁(yè)數(shù)。最后將修改后的舊span重新插入到SpanList中,將切好的span返回。
當(dāng)page cache山窮水盡(沒(méi)有更大內(nèi)存塊可用時(shí)),就會(huì)調(diào)用VirtualAlloc向堆區(qū)申請(qǐng)資源(其他系統(tǒng)的調(diào)用接口不同),申請(qǐng)到一個(gè)128頁(yè)的span,將其插入到哈希桶中。本著代碼重用的原則,我們可以使函數(shù)遞歸調(diào)用自己。這個(gè)遞歸會(huì)繼續(xù)申請(qǐng)k頁(yè)的span,也就是會(huì)將128頁(yè)的span進(jìn)行切分,一個(gè)span被重新插入,一個(gè)span被返回。
所以,到現(xiàn)在這個(gè)內(nèi)存池的基本框架已經(jīng)搭建起來(lái),總結(jié)一下:
- 內(nèi)存池分為三層緩存,從堆區(qū)到線程依次是:page cache、central cache、thread cache
- 線程獲取和釋放內(nèi)存不再調(diào)用malloc和free,而是調(diào)用我們封裝的接口:tc_allocate和tc_deallocate
- tc_allocate的邏輯已經(jīng)實(shí)現(xiàn):
- 調(diào)用thread cache的_allocate申請(qǐng)內(nèi)存,該接口將返回一塊內(nèi)存塊,其中會(huì)產(chǎn)生內(nèi)碎片
- 若thread cache內(nèi)存塊不足,將調(diào)用_fetch_from_central_cache向central cache申請(qǐng)內(nèi)存塊
- central cache作為一個(gè)承上啟下的結(jié)構(gòu),其存儲(chǔ)的不是內(nèi)存塊,而是span
- central cache將通過(guò)_fetch_blocks接口響應(yīng)thread cache的_fetch_from_central_cache請(qǐng)求,將內(nèi)存塊返回
- 當(dāng)central cache無(wú)可用span,將會(huì)調(diào)用_get_span接口,向page cache申請(qǐng)span,該接口同時(shí)負(fù)責(zé)將申請(qǐng)到的span切分成內(nèi)存塊的操作
- page cache通過(guò)_fetch_kspan接口響應(yīng)central cache的_get_span請(qǐng)求,將一個(gè)k頁(yè)的span返回
- 當(dāng)page cache也無(wú)可用span時(shí),會(huì)調(diào)用系統(tǒng)接口,向系統(tǒng)申請(qǐng)堆區(qū)資源
當(dāng)然了,以上梳理的只是一個(gè)大致的流程,其中還有很多細(xì)節(jié)沒(méi)有展示。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-449018.html
這篇文章主要整理高并發(fā)內(nèi)存池的內(nèi)存申請(qǐng)邏輯,考慮到文章字?jǐn)?shù)已經(jīng)很多了,關(guān)于內(nèi)存釋放邏輯將在下一文章中展示。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-449018.html
到了這里,關(guān)于關(guān)于一個(gè)C++項(xiàng)目:高并發(fā)內(nèi)存池的開發(fā)過(guò)程(一)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!