VersionSet
在 LevelDB 中,VersionSet
類是一個(gè)關(guān)鍵的內(nèi)部組件,負(fù)責(zé)管理數(shù)據(jù)庫的不同版本。這個(gè)類跟蹤了所有的 SSTables(排序字符串表)和它們?cè)跀?shù)據(jù)庫中的布局。每次對(duì)數(shù)據(jù)庫進(jìn)行修改時(shí)(如添加、刪除數(shù)據(jù)),LevelDB 會(huì)創(chuàng)建一個(gè)新的 Version
對(duì)象,這個(gè)對(duì)象由 VersionSet
管理。VersionSet
通過維護(hù)這些 Version
對(duì)象,可以快速地切換到數(shù)據(jù)庫的不同歷史狀態(tài),從而支持 LevelDB 的快照和壓縮操作。簡而言之,VersionSet
在 LevelDB 中扮演著版本控制和數(shù)據(jù)組織的角色,確保了數(shù)據(jù)庫操作的高效和數(shù)據(jù)的一致性。
VersionSet 接口概覽
class VersionSet {
public:
VersionSet(const std::string& dbname, const Options* options, TableCache* table_cache,
const InternalKeyComparator*);
VersionSet(const VersionSet&) = delete;
VersionSet& operator=(const VersionSet&) = delete;
~VersionSet();
// 將 VersionEdit 應(yīng)用到當(dāng)前 Version,生成新的 Version,
// 并將新 Version 加入到 VersionSet 中。
// Version N + VersionEdit = Version N+1。
Status LogAndApply(VersionEdit* edit, port::Mutex* mu) EXCLUSIVE_LOCKS_REQUIRED(mu);
// 從manifest文件中恢復(fù)數(shù)據(jù)庫的狀態(tài)。
// 在LevelDB啟動(dòng)時(shí),會(huì)調(diào)用這個(gè)方法來加載數(shù)據(jù)庫的當(dāng)前狀態(tài)。
Status Recover(bool* save_manifest);
// 返回當(dāng)前的 Version。
Version* current() const { return current_; }
// 返回當(dāng)前正在使用的 MANIFEST 文件編號(hào)。
uint64_t ManifestFileNumber() const { return manifest_file_number_; }
// 生成一個(gè)新的文件編號(hào)。
uint64_t NewFileNumber() { return next_file_number_++; }
// VersionSet::ReuseFileNumber方法用于在某些情況下重復(fù)使用SST文件編號(hào)。
// 這通常在創(chuàng)建新的SST文件但未實(shí)際使用它,然后決定刪除它的情況下發(fā)生。
// 例如,當(dāng)LevelDB進(jìn)行壓縮操作時(shí),它會(huì)創(chuàng)建新的SST文件來存儲(chǔ)壓縮后的數(shù)據(jù)。
// 然而,如果在創(chuàng)建新文件后,壓縮操作由于某種原因(如錯(cuò)誤或異常)被中斷,
// 那么新創(chuàng)建的SST文件可能就不再需要了。在這種情況下,LevelDB可以通過
// 調(diào)用VersionSet::ReuseFileNumber方法來重復(fù)使用該SST文件的編號(hào),
// 而不是浪費(fèi)一個(gè)新的編號(hào)。
void ReuseFileNumber(uint64_t file_number) {
if (next_file_number_ == file_number + 1) {
next_file_number_ = file_number;
}
}
// 返回某一層上的 SST 文件數(shù)量。
int NumLevelFiles(int level) const;
// 返回某一層上 SST 文件的總大小。
int64_t NumLevelBytes(int level) const;
// 返回當(dāng)前數(shù)據(jù)庫中最大的 Sequence Number。
uint64_t LastSequence() const { return last_sequence_; }
// 設(shè)置當(dāng)前數(shù)據(jù)庫中最大的 Sequence Number。
void SetLastSequence(uint64_t s) {
assert(s >= last_sequence_);
last_sequence_ = s;
}
// 標(biāo)記某個(gè)文件編號(hào)已經(jīng)被使用。
void MarkFileNumberUsed(uint64_t number);
// 返回當(dāng)前正在使用的 WAL 文件編號(hào)。
uint64_t LogNumber() const { return log_number_; }
// prev_log_number_ 記錄了當(dāng)前正在進(jìn)行 Compaction 的 WAL 文件編號(hào)。
// 當(dāng)開始 Compaction 時(shí),當(dāng)前的 WAL 可能仍有一些正在寫入的數(shù)據(jù)。為了
// 確保這些數(shù)據(jù)不會(huì)丟失,LevelDB 并不會(huì)立即刪除 WAL 文件。相反,它開始寫入
// 一個(gè)新的 WAL 文件,并在prev_log_number_中保留對(duì)舊日志文件(正在被壓縮的文件)的引用。
// 這樣,即使在 Compaction 過程中發(fā)生崩潰,LevelDB 仍然可以從舊的日志文件中恢復(fù)數(shù)據(jù)。
// 一旦 Compaction 完成并且舊 WAL 文件中的所有數(shù)據(jù)都安全地寫入到SST文件中,
// 舊的 WAL 文件就會(huì)被刪除,prev_log_number_被設(shè)置為零。
uint64_t PrevLogNumber() const { return prev_log_number_; }
// 選擇一個(gè)合適的 Level 和 SST 文件集合進(jìn)行 Compaction,
// 用 Compaction 對(duì)象表示這次 Compaction 所需的信息。
Compaction* PickCompaction();
// 指定 Level 和一個(gè)范圍 [begin, end],返回一個(gè) Compaction 對(duì)象,
Compaction* CompactRange(int level, const InternalKey* begin, const InternalKey* end);
// 對(duì)每一層計(jì)算 level-i 與 level-i+1 之間的 overlap bytes。
// 返回最大的 overlap bytes。
int64_t MaxNextLevelOverlappingBytes();
// 讀取 Compaction 里包含的輸入文件,創(chuàng)建一個(gè)可以遍歷這些文件的 Iterator。
Iterator* MakeInputIterator(Compaction* c);
// 判斷當(dāng)前是否需要進(jìn)行 Compaction。
bool NeedsCompaction() const {
Version* v = current_;
return (v->compaction_score_ >= 1) || (v->file_to_compact_ != nullptr);
}
// 將所有活躍的 SST 文件編號(hào)添加到 live 中。
// 活躍的 SST 指正在參與 compaction 或者未過期的 SST。
// 有些 SST 由于創(chuàng)建了快照,compaction 時(shí)沒有將其刪除。
// 快照釋放后,這些 SST 就過期了,不屬于任何一個(gè) level,但是仍然存在于磁盤上。
void AddLiveFiles(std::set<uint64_t>* live);
// 計(jì)算 key 在指定版本中的大致偏移量。
// 假設(shè)該版本的數(shù)據(jù)庫狀態(tài)如下:
// +----------------------------+
// Level-0 | SST0 SST1 SST2 |
// +----------------------------+
// Level-1 | SST3 SST4 SST5 |
// +----------------------------+
// Level-2 | ... |
// +----------------------------+
// Level-3 | ... |
// +----------------------------+
// Level-4 | ... |
// +----------------------------+
//
// 假設(shè)目標(biāo) key 是 SST4 中的第一個(gè),每個(gè) SST 的大小為 4KB,
// 則`ApproximateOffsetOf`返回的 offset 為 4 * 4KB = 16KB。
uint64_t ApproximateOffsetOf(Version* v, const InternalKey& key);
struct LevelSummaryStorage {
char buffer[100];
};
// 以可視化的方式打印每個(gè) level 的 SST 數(shù)量。
// 比如 scratch->buffer = "files[ 1, 5, 7, 9, 0, 0, 0]"
// 表示 Level-0 有 1 個(gè) SST,Level-1 有 5 個(gè) SST,以此類推。
const char* LevelSummary(LevelSummaryStorage* scratch) const;
};
VersionSet 中各個(gè)接口的實(shí)現(xiàn)
VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu)
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
// has_log_number_ 表示該 edit 是否有對(duì)應(yīng)的 WAL
// log_number_ 為 edit 對(duì)應(yīng)的 WAL 編號(hào)
if (edit->has_log_number_) {
// 如果該 edit 是否有對(duì)應(yīng)的 WAL,
// 檢查其 WAL 編號(hào)是否合法。
assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ < next_file_number_);
} else {
// 如果該 edit 沒有對(duì)應(yīng)的 WAL,
// 那么就使用當(dāng)前的 WAL 編號(hào)。
edit->SetLogNumber(log_number_);
}
// 如果該 edit 沒有對(duì)應(yīng)的 prev WAL 編號(hào),
// 那么就使用當(dāng)前的 prev WAL 編號(hào)。
if (!edit->has_prev_log_number_) {
edit->SetPrevLogNumber(prev_log_number_);
}
// 將 next_file_number_ 和 last_sequence_ 保存到 edit 中,
// 這樣 apply edit 的時(shí)候,new version 才能包含這兩個(gè)值。
edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);
// 創(chuàng)建一個(gè)新的 Version,利用 Builder,基于當(dāng)前版本 current_,
// 把 edit 應(yīng)用到新的 Version 中。
// 簡單來說: current_ + edit = new version
Version* v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v);
// 每產(chǎn)生一個(gè)新的 Version,都需要?jiǎng)?chuàng)建一個(gè)對(duì)應(yīng)的 New MANIFEST。
std::string new_manifest_file;
Status s;
// 當(dāng)數(shù)據(jù)庫是第一次被打開時(shí),descriptor_log_ 為 nullptr。
if (descriptor_log_ == nullptr) {
assert(descriptor_file_ == nullptr);
// 生成新的 MANIFEST 文件名
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
// 此處的 edit->SetNextFile(next_file_number_); 是多余的,
// 之前已經(jīng) set 過了。
edit->SetNextFile(next_file_number_);
// 由于數(shù)據(jù)庫是第一次打開,需要先把當(dāng)前狀態(tài)寫入到新的 MANIFEST 文件中,作為 base version。
// 狀態(tài)指的是每層 Level 都有哪些 SST 文件。
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok()) {
descriptor_log_ = new log::Writer(descriptor_file_);
// 將數(shù)據(jù)庫的當(dāng)前狀態(tài)寫入到新的 MANIFEST 文件中。
s = WriteSnapshot(descriptor_log_);
}
}
{
// 只有以下 3 種情況會(huì)讀取 MANIFEST 和 CURRETN 文件:
// - 數(shù)據(jù)庫啟動(dòng)時(shí)
// - Recover 時(shí)
// - Compaction 后調(diào)用 VersionSet::LogAndApply
// 此處可以 unlock mu,是因?yàn)樯厦孢@ 3 種情況不會(huì)有 2 種同時(shí)發(fā)生。
mu->Unlock();
// 往 MANIFEST 中添加 edit。
if (s.ok()) {
std::string record;
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync();
}
if (!s.ok()) {
Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
}
}
// 如果 MANIFEST 是新創(chuàng)建的,還需要更新 CURRENT。
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}
mu->Lock();
}
if (s.ok()) {
// 將生成的新 Version 加入 VersionSet。
AppendVersion(v);
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
} else {
delete v;
if (!new_manifest_file.empty()) {
delete descriptor_log_;
delete descriptor_file_;
descriptor_log_ = nullptr;
descriptor_file_ = nullptr;
env_->RemoveFile(new_manifest_file);
}
}
return s;
}
VersionSet::Builder 的實(shí)現(xiàn)
VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu)
中通過 Builder 將edit
應(yīng)用到當(dāng)前版本current_
,生成新的 Version。
Version* v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v);
我們來看下 Builder 是如何根據(jù)edit
與current_
生成新 Version 的。
VersionSet::Builder 的構(gòu)造
class VersionSet::Builder {
private:
// 排序函數(shù),用于比較兩個(gè) FileMetaData 的大小。
struct BySmallestKey {
const InternalKeyComparator* internal_comparator;
bool operator()(FileMetaData* f1, FileMetaData* f2) const {
int r = internal_comparator->Compare(f1->smallest, f2->smallest);
if (r != 0) {
return (r < 0);
} else {
// Break ties by file number
return (f1->number < f2->number);
}
}
};
// 一個(gè)基于 BySmallestKey 比較函數(shù)的集合。
typedef std::set<FileMetaData*, BySmallestKey> FileSet;
struct LevelState {
std::set<uint64_t> deleted_files;
FileSet* added_files;
};
// vset_ 和 base_ 均由構(gòu)造函數(shù)初始化
VersionSet* vset_;
Version* base_;
// levels_ 中記錄了各個(gè) level 中新增與移除了哪些 SST 文件
LevelState levels_[config::kNumLevels];
public:
// 構(gòu)造函數(shù)中主要是初始化各個(gè) level 的 added_files 集合。
Builder(VersionSet* vset, Version* base) : vset_(vset), base_(base) {
base_->Ref();
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
/* 為每一層 level 初始化 FileSet */
for (int level = 0; level < config::kNumLevels; level++) {
levels_[level].added_files = new FileSet(cmp);
}
}
};
VersionSet::Builder::Apply(VersionEdit* edit)
解析edit
中的內(nèi)容,每個(gè) Level 都有哪些新增與移除的 SST 文件,然后更新VersionSet::Builder::levels_
中的deleted_files
和added_files
,等到調(diào)用VersionSet::Builder::SaveTo(Version* v)
時(shí)將VersionSet::Builder::levels_
中的內(nèi)容保存到版本v
中。
void Apply(VersionEdit* edit) {
// vset_->compact_pointer_[i] 記錄 level-i 下一次 Compaction 的起始 InternalKey。
// 下面這段 for 循環(huán)只是把 VersionEdit::compact_pointers_ 中的內(nèi)容轉(zhuǎn)換成 VersionSet::compact_pointer_。
for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
const int level = edit->compact_pointers_[i].first;
// vset_->compact_pointer_[i] 記錄 level-i 下一次 Compaction 的起始 InternalKey
vset_->compact_pointer_[level] = edit->compact_pointers_[i].second.Encode().ToString();
}
// 將 deleted files 記錄到 levels_ 中。
for (const auto& deleted_file_set_kvp : edit->deleted_files_) {
const int level = deleted_file_set_kvp.first;
const uint64_t number = deleted_file_set_kvp.second;
levels_[level].deleted_files.insert(number);
}
// 將 new files 記錄到 levels_ 中。
for (size_t i = 0; i < edit->new_files_.size(); i++) {
const int level = edit->new_files_[i].first;
FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
f->refs = 1;
// 設(shè)置每個(gè) New SST 文件的 allowed_seeks。
// allowed_seeks 表示該 SST 文件允許被無效查找的次數(shù),無效查找是指
// 對(duì)該 SST 進(jìn)行了查找,但是沒有找到目標(biāo) Key。
// 每被無效查找一次,allowed_seeks 就減 1。
// allowed_seeks 為 0 時(shí),該 SST 文件就會(huì)被加入到 compaction 調(diào)度中。
// allowed_seeks 的計(jì)算方式如下: max(100, file_size / 16KB)。
f->allowed_seeks = static_cast<int>((f->file_size / 16384U));
if (f->allowed_seeks < 100) f->allowed_seeks = 100;
levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
}
}
VersionSet::Builder::SaveTo(Version* v)
將VersionSet::Builder::levels_
中的內(nèi)容保存到版本v
中。
void SaveTo(Version* v) {
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
// base_files 是該層上原有的 SST,added_files 是該層上新增的 SST。
// 將 base_files 與 added_files 合并后,存放到 v->files_[level] 中。
//
// 對(duì)于每一層,先準(zhǔn)備好 base_files 和 added_files。
// 比如說,base_files = [1, 3, 5],added_files = [2, 4]
// 那么就應(yīng)該按照 [1, 2, 3, 4, 5] 的順序調(diào)用 MaybeAddFile 方法,將 SST 添加
// 到 v->files_[level] 中。
const std::vector<FileMetaData*>& base_files = base_->files_[level];
std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
const FileSet* added_files = levels_[level].added_files;
v->files_[level].reserve(base_files.size() + added_files->size());
for (const auto& added_file : *added_files) {
for (std::vector<FileMetaData*>::const_iterator bpos =
std::upper_bound(base_iter, base_end, added_file, cmp);
base_iter != bpos; ++base_iter) {
MaybeAddFile(v, level, *base_iter);
}
MaybeAddFile(v, level, added_file);
}
for (; base_iter != base_end; ++base_iter) {
MaybeAddFile(v, level, *base_iter);
}
// 如果是 DEBUG 模式
#ifndef NDEBUG
if (level > 0) {
// 如果 level > 0,則檢查該層上的 SST 是否有 overlap。
for (uint32_t i = 1; i < v->files_[level].size(); i++) {
const InternalKey& prev_end = v->files_[level][i - 1]->largest;
const InternalKey& this_begin = v->files_[level][i]->smallest;
if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
std::fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
prev_end.DebugString().c_str(),
this_begin.DebugString().c_str());
std::abort();
}
}
}
#endif
}
}
VersionSet::Builder::MaybeAddFile(Version* v, int level, FileMetaData* f)
如果 f 出現(xiàn)在 deleted_files 中,說明該 SST 文件已經(jīng)被刪除了,不需要添加到 v->files_[level] 中。
否則的話,將 f 添加到 v->files_[level] 中。
void MaybeAddFile(Version* v, int level, FileMetaData* f) {
if (levels_[level].deleted_files.count(f->number) > 0) {
// 如果 f 出現(xiàn)在 deleted_files 中,說明該 SST 文件已經(jīng)被刪除了,不需要添加到 v->files_[level] 中。
} else {
// 如果 f 沒有出現(xiàn)在 deleted_files 中,則將其添加到 v->files_[level] 中。
std::vector<FileMetaData*>* files = &v->files_[level];
if (level > 0 && !files->empty()) {
// Must not overlap
assert(vset_->icmp_.Compare((*files)[files->size() - 1]->largest, f->smallest) < 0);
}
f->refs++;
files->push_back(f);
}
}
VersionSet::Recover(bool* save_manifest)
VersionSet::Recover
會(huì)通過讀取 CURRENT 文件,找到當(dāng)前正在使用的 MANIFEST 文件,然后讀取 MANIFEST 文件,將數(shù)據(jù)庫恢復(fù)到上次關(guān)閉前的狀態(tài)。
如果 MANIFEST 文件大小超過閾值,無法繼續(xù)使用了,save_manifest
會(huì)被設(shè)為true
。表示當(dāng)前的 MANIFEST 文件需要被保存。
// 從數(shù)據(jù)庫中讀取 CURRENT 文件,解析 MANIFEST文件,
// 恢復(fù)數(shù)據(jù)庫的狀態(tài)(每層 Level 都有哪些 SST 文件)。
Status VersionSet::Recover(bool* save_manifest) {
struct LogReporter : public log::Reader::Reporter {
Status* status;
void Corruption(size_t bytes, const Status& s) override {
if (this->status->ok()) *this->status = s;
}
};
// 讀取 CURRENT 文件的內(nèi)容,存放到 std::string current 中。
std::string current;
Status s = ReadFileToString(env_, CurrentFileName(dbname_), ¤t);
if (!s.ok()) {
return s;
}
// 如果 CURRENT 文件為空,或者沒有以 '\n' 結(jié)尾,說明 CURRENT 文件有問題,
// 返回失敗。
if (current.empty() || current[current.size() - 1] != '\n') {
return Status::Corruption("CURRENT file does not end with newline");
}
// 去掉 current 中的 '\n'。
current.resize(current.size() - 1);
// current 中存放的是 MANIFEST 的文件名,例如 MANIFEST-000001。
// dscname 為 MANIFEST 的完整路徑。
std::string dscname = dbname_ + "/" + current;
SequentialFile* file;
// 以順序讀取的方式打開 MANIFEST 文件。
s = env_->NewSequentialFile(dscname, &file);
if (!s.ok()) {
if (s.IsNotFound()) {
return Status::Corruption("CURRENT points to a non-existent file", s.ToString());
}
return s;
}
bool have_log_number = false;
bool have_prev_log_number = false;
bool have_next_file = false;
bool have_last_sequence = false;
uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t log_number = 0;
uint64_t prev_log_number = 0;
Builder builder(this, current_);
int read_records = 0;
{
// 構(gòu)造一個(gè) reader,用于讀取 MANIFEST 文件。
LogReporter reporter;
reporter.status = &s;
log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
Slice record;
std::string scratch;
// 讀取 MANIFEST 文件中的每一條 Record,
// 把 Record 解碼成一個(gè) VersionEdit,
// 然后調(diào)用 builder.Apply(&edit) 將 edit 應(yīng)用到 builder 中。
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
++read_records;
// 將 record 解碼成 VersionEdit。
VersionEdit edit;
s = edit.DecodeFrom(record);
if (s.ok()) {
// 檢查解碼出來的 VersionEdit.comparator 與當(dāng)前數(shù)據(jù)庫的 comparator 是否一致。
if (edit.has_comparator_ && edit.comparator_ != icmp_.user_comparator()->Name()) {
s = Status::InvalidArgument(
edit.comparator_ + " does not match existing comparator ",
icmp_.user_comparator()->Name());
}
}
// 將 VersionEdit 應(yīng)用到 builder 中。
if (s.ok()) {
builder.Apply(&edit);
}
// 記錄最新狀態(tài)下的 WAL 編號(hào)。
if (edit.has_log_number_) {
log_number = edit.log_number_;
have_log_number = true;
}
// 記錄最新狀態(tài)下的 prev WAL 編號(hào)。
if (edit.has_prev_log_number_) {
prev_log_number = edit.prev_log_number_;
have_prev_log_number = true;
}
// 記錄最新狀態(tài)下的 next_file 編號(hào)。
if (edit.has_next_file_number_) {
next_file = edit.next_file_number_;
have_next_file = true;
}
// 記錄最新狀態(tài)下的 last_sequence 編號(hào)。
if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
}
}
}
// 讀取 MANIFEST 文件完畢,關(guān)閉文件。
delete file;
file = nullptr;
if (s.ok()) {
if (!have_next_file) {
s = Status::Corruption("no meta-nextfile entry in descriptor");
} else if (!have_log_number) {
s = Status::Corruption("no meta-lognumber entry in descriptor");
} else if (!have_last_sequence) {
s = Status::Corruption("no last-sequence-number entry in descriptor");
}
if (!have_prev_log_number) {
prev_log_number = 0;
}
// 標(biāo)記 next_file_number_ 和 log_number 都已經(jīng)被占用了。
MarkFileNumberUsed(prev_log_number);
MarkFileNumberUsed(log_number);
}
if (s.ok()) {
// 基于 MANIFEST 里的 VersionEdit列表,構(gòu)造一個(gè)新的 Version。
Version* v = new Version(this);
builder.SaveTo(v);
// 把 New Version 作為當(dāng)前 Version。
Finalize(v);
AppendVersion(v);
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1;
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;
// 檢查是否可以繼續(xù)使用當(dāng)前的 MANIFEST 文件。
if (ReuseManifest(dscname, current)) {
// No need to save new manifest
} else {
// 如果不能繼續(xù)使用的話,需要把當(dāng)前的 MANIFEST 進(jìn)行保存。
*save_manifest = true;
}
} else {
std::string error = s.ToString();
Log(options_->info_log, "Error recovering version set with %d records: %s", read_records,
error.c_str());
}
return s;
}
edit.DecodeFrom(record)
的實(shí)現(xiàn)可參考大白話解析LevelDB: VersionEdit
VersionSet::current()
VersionSet::current()
的實(shí)現(xiàn)沒啥好說的,直接返回當(dāng)前正在使用的 Version,即current_
。
Version* current() const { return current_; }
VersionSet::ManifestFileNumber()
VersionSet::ManifestFileNumber()
的實(shí)現(xiàn)沒啥好說的,直接返回當(dāng)前正在使用的 MANIFEST 文件編號(hào),即manifest_file_number_
。
uint64_t ManifestFileNumber() const { return manifest_file_number_; }
VersionSet::NewFileNumber()
VersionSet::NewFileNumber()
的實(shí)現(xiàn)沒啥好說的,直接返回一個(gè)新的文件編號(hào),即next_file_number_
。
uint64_t NewFileNumber() { return next_file_number_++; }
VersionSet::ReuseFileNumber(uint64_t file_number)
VersionSet::ReuseFileNumber
方法用于在某些情況下回收 SST 文件編號(hào)。
這通常在創(chuàng)建新的SST文件但未實(shí)際使用它,然后決定刪除它的情況下發(fā)生。
例如,當(dāng)LevelDB進(jìn)行壓縮操作時(shí),它會(huì)創(chuàng)建新的SST文件來存儲(chǔ)壓縮后的數(shù)據(jù)。然而,如果在創(chuàng)建新文件后,壓縮操作由于某種原因(如錯(cuò)誤或異常)被中斷,那么新創(chuàng)建的SST文件可能就不再需要了。
在這種情況下,LevelDB可以通過調(diào)用VersionSet::ReuseFileNumber
方法來回收該 SST 文件編號(hào),而不是浪費(fèi)一個(gè)新的編號(hào)。
void ReuseFileNumber(uint64_t file_number) {
// 假設(shè)上一次 NewFileNumer() 分配的編號(hào)為 100,同時(shí)
// next_file_number_ 會(huì)更新為 101。
// 上次分出去的編號(hào) 100 用不到了,使用 ReuseFileNumber(100)
// 進(jìn)行回收,就把 next_file_number_ 回退為 100 就行。
// 這樣下次 NewFileNumer() 又會(huì)把 100 分出去。
if (next_file_number_ == file_number + 1) {
next_file_number_ = file_number;
}
}
VersionSet::NumLevelFiles(int level)
VersionSet::current_->files
中存儲(chǔ)了每一層 SST 文件的 MetaData,只需要讀取current_->files_[level].size()
即可得到該層 SST 文件的數(shù)量。
int VersionSet::NumLevelFiles(int level) const {
assert(level >= 0);
assert(level < config::kNumLevels);
return current_->files_[level].size();
}
VersionSet::NumLevelBytes(int level)
通過調(diào)用TotalFileSize(current_->files_[level])
來獲取指定level
上 SST 文件的總大小。
int64_t VersionSet::NumLevelBytes(int level) const {
assert(level >= 0);
assert(level < config::kNumLevels);
return TotalFileSize(current_->files_[level]);
}
我們繼續(xù)看TotalFileSize
的實(shí)現(xiàn):
很直觀,就是遍歷files
,將每個(gè) SST 文件的大小累加起來。
static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
int64_t sum = 0;
for (size_t i = 0; i < files.size(); i++) {
sum += files[i]->file_size;
}
return sum;
}
VersionSet::LastSequence()
VersionSet::LastSequence()
的實(shí)現(xiàn)沒啥好說的,直接返回當(dāng)前數(shù)據(jù)庫中最大的 Sequence Number,即last_sequence_
。
uint64_t LastSequence() const { return last_sequence_; }
VersionSet::SetLastSequence(uint64_t s)
VersionSet::SetLastSequence
的實(shí)現(xiàn)沒啥好說的,直接設(shè)置當(dāng)前數(shù)據(jù)庫中最大的 Sequence Number,即last_sequence_
。
void SetLastSequence(uint64_t s) {
assert(s >= last_sequence_);
last_sequence_ = s;
}
VersionSet::MarkFileNumberUsed(uint64_t number)
該函數(shù)的作用是標(biāo)記某個(gè)文件編號(hào)number
已經(jīng)被使用。
如果number
小于next_file_number_
,那number
肯定已經(jīng)被分出去使用了,所以啥也不需要干。
只有number > = next_file_number_
時(shí),為了防止將來NewFileNumber()
返回number
,需要把next_file_number_
撥到number + 1
,這樣 NewFileNumber()
后面就永遠(yuǎn)不會(huì)返回number
。
void VersionSet::MarkFileNumberUsed(uint64_t number) {
if (next_file_number_ <= number) {
next_file_number_ = number + 1;
}
}
VersionSet::LogNumber()
返回當(dāng)前正在使用的 WAL 編號(hào)。
實(shí)現(xiàn)沒啥好講的,直接返回log_number_
。
uint64_t LogNumber() const { return log_number_; }
VersionSet::PrevLogNumber()
和VersionSet::LogNumber
不太一樣,VersionSet::PrevLogNumber
返回的是當(dāng)前正在進(jìn)行 Compaction 的 WAL 編號(hào)。
為什么說 PrevLogNumber 是當(dāng)前正在進(jìn)行 Compaction 的 WAL 編號(hào)呢?
當(dāng)開始 Compaction 時(shí),當(dāng)前的 WAL 可能仍有一些正在寫入的數(shù)據(jù)。為了確保這些數(shù)據(jù)不會(huì)丟失,LevelDB 并不會(huì)立即刪除 WAL 文件。
相反,它開始寫入一個(gè)新的 WAL 文件,并在prev_log_number_
中保留對(duì)舊日志文件(正在被壓縮的文件)的引用。
這樣,即使在 Compaction 過程中發(fā)生崩潰,LevelDB 仍然可以從舊的日志文件中恢復(fù)數(shù)據(jù)。
一旦 Compaction 完成并且舊 WAL 文件中的所有數(shù)據(jù)都安全地寫入到SST文件中,舊的 WAL 文件就會(huì)被刪除,prev_log_number_
被設(shè)置為零。
uint64_t PrevLogNumber() const { return prev_log_number_; }
Compaction* VersionSet::PickCompaction()
返回一個(gè)Compaction
對(duì)象來表示這次 Compaction 所需的信息,主要包含:
- 要對(duì)哪個(gè) Level 進(jìn)行 Compaction
- 參與 Compaction 的 SST 文件集合
Compaction* VersionSet::PickCompaction() {
Compaction* c;
// 要進(jìn)行 Compaction 的 level。
int level;
// current_->compaction_score_ 為所有 level 中最大的 score。
// score = 當(dāng)前 level 所有 SST 總大小 / 此 level 閾值。
// current_->compaction_score 的值是在調(diào)用`VersionSet::Finalize(Version* v)`
// 的時(shí)候計(jì)算出來的。
// current->file_to_compact_ 表示某個(gè) SST 被讀取的次數(shù)已經(jīng)達(dá)到了閾值,需要進(jìn)行
// Compaction。
// current->file_to_compact_ 的值是在`Version::UpdateStats`的時(shí)候更新的。
const bool size_compaction = (current_->compaction_score_ >= 1);
const bool seek_compaction = (current_->file_to_compact_ != nullptr);
// 優(yōu)先級(jí): size_compaction > seek_compaction 。
// 如果滿足 size_compaction,那么優(yōu)先執(zhí)行 size_compaction。
if (size_compaction) {
// 獲取需要進(jìn)行 Compaction 的 level。
level = current_->compaction_level_;
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
// 創(chuàng)建一個(gè) Compaction 對(duì)象 c。
c = new Compaction(options_, level);
// 從 files_[level] 里找到 compact_pointer_[level] 之后的第一個(gè) SST,
// 作為本次需要進(jìn)行 Compaction 的 SST,放到 c->inputs_[0] 中。
// compact_pointer_[level] 的含義是 level 層上一次參與 Compaction
// 的 SST 的最大 InternalKey。
for (size_t i = 0; i < current_->files_[level].size(); i++) {
FileMetaData* f = current_->files_[level][i];
if (compact_pointer_[level].empty() ||
icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
c->inputs_[0].push_back(f);
break;
}
}
// 上面找不到 SST,表示 level 層上還沒有 SST 參與過 Compaction,
// 那本次就選擇 level 層上的第一個(gè) SST 參與 Compaction。
if (c->inputs_[0].empty()) {
// Wrap-around to the beginning of the key space
c->inputs_[0].push_back(current_->files_[level][0]);
}
} else if (seek_compaction) {
// 不滿足 size_compaction,但滿足 seek_compaction,那么執(zhí)行 seek_compaction。
// 獲取需要進(jìn)行 Compaction 的 level。
level = current_->file_to_compact_level_;
// 創(chuàng)建一個(gè) Compaction 對(duì)象 c。
c = new Compaction(options_, level);
// 把要進(jìn)行 Compaction 的 SST 放到 c->inputs_[0] 中。
c->inputs_[0].push_back(current_->file_to_compact_);
} else {
// 不滿足 size_compaction,也不滿足 seek_compaction,
// 所以不需要進(jìn)行 Compaction。
return nullptr;
}
// 把需要進(jìn)行 Compaction 的 version 設(shè)置到 c->input_version_ 中。
c->input_version_ = current_;
// 把 c->input_version_ 的引用計(jì)數(shù)加 1,防止被銷毀。
c->input_version_->Ref();
// 如果要進(jìn)行 Compaction 的 level 是 0,那么需要把 level 0 中所有與
// 本次要進(jìn)行 Compaction 的 SST 有重疊的 SST 都放到 c->inputs_[0] 中,
// 做為本次 Compaction 的輸入。
if (level == 0) {
// 取得 inputs_[0] 中所有 SST 里的最小 InternalKey 和最大 InternalKey
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
// 在 level-0 層中查找哪些 SST 與 [smallest, largest] 有重疊,
// 把它們都放到 c->inputs_[0] 中。
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
assert(!c->inputs_[0].empty());
}
// c->inputs_[0] 表示 level K 將要進(jìn)行 Compact 的 sst files,
// c->inputs_[1] 表示 level K+1 將要進(jìn)行 Compact 的 sst files。
// SetupOtherInputs(c) 會(huì)根據(jù) c->inputs_[0] 來確定 c->inputs_[1]。
SetupOtherInputs(c);
return c;
}
我們繼續(xù)來看下SetupOtherInputs(c)
的實(shí)現(xiàn):
void VersionSet::SetupOtherInputs(Compaction* c) {
const int level = c->level();
InternalKey smallest, largest;
// 擴(kuò)大 c->inputs_[0] 里的 SST 集合。
// 規(guī)則如下:
// 假設(shè) c->inputs_[0] 里有兩個(gè) SST,分別是 SST-1 和 SST-2。
// 如果 level 上存在一個(gè) SST,它的 smallest_user_key 與
// c_inputs_[0] 的 largest_user_key 相等,那么把這個(gè) SST
// 稱為 Boundary SST。
// 不斷的把 Boundary SST 加入到 c->inputs_[0] 中,再繼續(xù)查找
// 新的 Boundary SST,直到找不到為止。
AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);
// 獲取 c->inputs_[0] 的 key 范圍,也就是 level 層要進(jìn)行 Compaction 的范圍。
GetRange(c->inputs_[0], &smallest, &largest);
// 在 level+1 層查找與 c->inputs_[0] 有重疊的 SST,放到 c->inputs_[1] 中。
current_->GetOverlappingInputs(level + 1, &smallest, &largest, &c->inputs_[1]);
// 以同樣的方式,擴(kuò)大 c->inputs_[1] 里的 SST 集合。
AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]);
// 獲取整體的 compaction 范圍, [all_start, all_limit]
InternalKey all_start, all_limit;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
// 如果 level+1 層有需要 Compaction 的 SST,那么就嘗試把 level 層的
// Compaction SST 擴(kuò)大,但是不能導(dǎo)致 level+1 層的 Compaction SST 數(shù)量變多。
if (!c->inputs_[1].empty()) {
std::vector<FileMetaData*> expanded0;
// 找出 level 層所有與 [all_start, all_limit] 有重疊的 SST,
// 放到 expanded0 中。
current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
// 在 level 層尋找 expanded0 的 Boundary SST,放到 expanded0 中。
AddBoundaryInputs(icmp_, current_->files_[level], &expanded0);
// 計(jì)算 level 層的 Compaction SST 總大小.
const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
// 計(jì)算 level+1 層的 Compaction SST 總大小.
const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
// 計(jì)算 expanded0 的總大小。
const int64_t expanded0_size = TotalFileSize(expanded0);
// 如果 expanded0 的總大小比 c->inputs_[0] 的總大小大,并且
// expanded0 和 c->inputs_[1] 的 SST 數(shù)量總和小于閾值,
// 就可以考慮把 expanded0 作為 level 層的 Compaction SST。
if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size < ExpandedCompactionByteSizeLimit(options_)) {
InternalKey new_start, new_limit;
// 獲取 expanded0 的 key 范圍, [new_start, new_limit]。
GetRange(expanded0, &new_start, &new_limit);
// 根據(jù) expanded0 的范圍,獲取 level+1 層的 Compaction SST,
// 作為 expanded1。
std::vector<FileMetaData*> expanded1;
current_->GetOverlappingInputs(level + 1, &new_start, &new_limit, &expanded1);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &expanded1);
// 如果 expanded1 里 SST 的數(shù)量和最開始算出來的 level+1 層的
// Compaction SST 數(shù)量一樣,那么就可以把 expanded0 和 expanded1
// 分別作為 level 和 level+1 層的 Compaction SST。
if (expanded1.size() == c->inputs_[1].size()) {
Log(options_->info_log,
"Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld "
"bytes)\n",
level, int(c->inputs_[0].size()), int(c->inputs_[1].size()), long(inputs0_size),
long(inputs1_size), int(expanded0.size()), int(expanded1.size()),
long(expanded0_size), long(inputs1_size));
smallest = new_start;
largest = new_limit;
c->inputs_[0] = expanded0;
c->inputs_[1] = expanded1;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
}
}
}
// 計(jì)算 level+2 層所有與 [all_start, all_limit] 有重疊的 SST,
// 放到 c->grandparents_ 中。
if (level + 2 < config::kNumLevels) {
current_->GetOverlappingInputs(level + 2, &all_start, &all_limit, &c->grandparents_);
}
// 設(shè)置 compact_pointer_[level],表示 level 層下一次 Compaction 的起始位置。
compact_pointer_[level] = largest.Encode().ToString();
c->edit_.SetCompactPointer(level, largest);
}
Compaction* VersionSet::CompactRange(int level, const InternalKey* begin, const InternalKey* end)
指定 Level 和一個(gè)范圍 [begin, end],返回一個(gè) Compaction 對(duì)象。
Compaction* VersionSet::CompactRange(int level, const InternalKey* begin, const InternalKey* end) {
// 獲取 level 層所有與 [begin, end] 有重疊的 SST,放到 inputs 中。
std::vector<FileMetaData*> inputs;
current_->GetOverlappingInputs(level, begin, end, &inputs);
if (inputs.empty()) {
return nullptr;
}
// 對(duì)于非 level-0 層,需要檢查下 inputs 里 SST 文件大小的總和。
// 如果 inputs 里 SST 文件大小的總和超過了閾值,那么就需要把 inputs
// 進(jìn)行裁剪,只保留一部分 SST。
if (level > 0) {
const uint64_t limit = MaxFileSizeForLevel(options_, level);
uint64_t total = 0;
for (size_t i = 0; i < inputs.size(); i++) {
uint64_t s = inputs[i]->file_size;
total += s;
if (total >= limit) {
inputs.resize(i + 1);
break;
}
}
}
// 把 inputs 里的 SST 放到 c->inputs_[0] 中。
Compaction* c = new Compaction(options_, level);
c->input_version_ = current_;
c->input_version_->Ref();
c->inputs_[0] = inputs;
// 根據(jù) c->inputs_[0] 來填充 c->inputs_[1]。
SetupOtherInputs(c);
return c;
}
VersionSet::MaxNextLevelOverlappingBytes()
計(jì)算每個(gè) level 上的每個(gè) SST 文件與下一層 level 的 overlap bytes,返回最大的那個(gè) overlap bytes。
int64_t VersionSet::MaxNextLevelOverlappingBytes() {
int64_t result = 0;
std::vector<FileMetaData*> overlaps;
// 計(jì)算每個(gè) level 上的每個(gè) SST 文件與下一層 level 的 overlap bytes,
// 返回最大的那個(gè) overlap bytes。
for (int level = 1; level < config::kNumLevels - 1; level++) {
for (size_t i = 0; i < current_->files_[level].size(); i++) {
const FileMetaData* f = current_->files_[level][i];
current_->GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
const int64_t sum = TotalFileSize(overlaps);
if (sum > result) {
result = sum;
}
}
}
return result;
}
VersionSet::MakeInputIterator(Compaction* c)
讀取Compaction* c
里包含的輸入文件,創(chuàng)建一個(gè)可以遍歷這些文件的 Iterator。
Iterator* VersionSet::MakeInputIterator(Compaction* c) {
ReadOptions options;
options.verify_checksums = options_->paranoid_checks;
options.fill_cache = false;
// 如果要進(jìn)行 Compaction 的 level 是 0,那么需要把 level 0 中每個(gè) SST 都
// 要?jiǎng)?chuàng)建一個(gè) Iterator,level + 1 層只需要?jiǎng)?chuàng)建一個(gè) Concatenating Iterator。
// 所以需要的 Iterator 數(shù)量為 c->inputs_[0].size() + 1。
// 如果要進(jìn)行 Compaction 的 level 不是 0,那么只需要 2 個(gè) Concatenating
// Iterator。
const int space = (c->level() == 0 ? c->inputs_[0].size() + 1 : 2);
Iterator** list = new Iterator*[space];
int num = 0;
for (int which = 0; which < 2; which++) {
if (!c->inputs_[which].empty()) {
if (c->level() + which == 0) {
// 如果 Compaction level 是 0,并且 which 為 0。
// 為每個(gè) level 0 的 SST 創(chuàng)建一個(gè) Iterator。
const std::vector<FileMetaData*>& files = c->inputs_[which];
for (size_t i = 0; i < files.size(); i++) {
list[num++] =
table_cache_->NewIterator(options, files[i]->number, files[i]->file_size);
}
} else {
// 為當(dāng)前 level 創(chuàng)建一個(gè) Concatenating Iterator。
list[num++] = NewTwoLevelIterator(
new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]), &GetFileIterator,
table_cache_, options);
}
}
}
assert(num <= space);
// 把 list 里的 Iterator 合并成一個(gè) MergingIterator。
Iterator* result = NewMergingIterator(&icmp_, list, num);
delete[] list;
return result;
}
VersionSet::NeedsCompaction()
判斷當(dāng)前是否需要進(jìn)行 Compaction。
只需要看下current_->compaction_score_
是否大于等于 1,或者current_->file_to_compact_
是否不為空即可。
bool NeedsCompaction() const {
Version* v = current_;
return (v->compaction_score_ >= 1) || (v->file_to_compact_ != nullptr);
}
VersionSet::AddLiveFiles(std::set<uint64_t>* live)
遍歷所有 Version,將每個(gè) level 上的 SST 文件編號(hào)都加入到live
中。
void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
for (Version* v = dummy_versions_.next_; v != &dummy_versions_; v = v->next_) {
for (int level = 0; level < config::kNumLevels; level++) {
const std::vector<FileMetaData*>& files = v->files_[level];
for (size_t i = 0; i < files.size(); i++) {
live->insert(files[i]->number);
}
}
}
}
VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& key)
計(jì)算key
在指定版本v
中的大致偏移量。
假設(shè)該版本的數(shù)據(jù)庫狀態(tài)如下:文章來源:http://www.zghlxwxcb.cn/news/detail-803665.html
+----------------------------+
Level-0 | SST0 SST1 SST2 |
+----------------------------+
Level-1 | SST3 SST4 SST5 |
+----------------------------+
Level-2 | ... |
+----------------------------+
Level-3 | ... |
+----------------------------+
Level-4 | ... |
+----------------------------+
假設(shè)目標(biāo)key
是 SST4 中的第一個(gè),每個(gè) SST 的大小為 4KB,則 offset 為
4
?
4
K
B
=
16
K
B
4 * 4KB = 16KB
4?4KB=16KB。文章來源地址http://www.zghlxwxcb.cn/news/detail-803665.html
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
uint64_t result = 0;
for (int level = 0; level < config::kNumLevels; level++) {
const std::vector<FileMetaData*>& files = v->files_[level];
for (size_t i = 0; i < files.size(); i++) {
// 遍歷版本 v 的每個(gè) SST file
if (icmp_.Compare(files[i]->largest, ikey) <= 0) {
// 當(dāng)前 SST 文件所有的 key 都小于 ikey,所以直接把
// 整個(gè) SST 文件的大小加到 result 中。
result += files[i]->file_size;
} else if (icmp_.Compare(files[i]->smallest, ikey) > 0) {
// 當(dāng)前 SST 文件所有的 key 都大于 ikey,忽略該文件。
if (level > 0) {
// 如果當(dāng)前在非 level-0 上,則該 level 上的 SST 是有序且不重合的,
// 碰到一個(gè) SST 已經(jīng)整體大于 ikey,那么后面的 SST 都不會(huì)包含 ikey,
// 可以不用往后看了。
break;
}
} else {
// 當(dāng)前 SST 文件包含 ikey,計(jì)算 ikey 在 SST 文件中的大致偏移量。
Table* tableptr;
Iterator* iter = table_cache_->NewIterator(ReadOptions(), files[i]->number,
files[i]->file_size, &tableptr);
if (tableptr != nullptr) {
result += tableptr->ApproximateOffsetOf(ikey.Encode());
}
delete iter;
}
}
}
return result;
}
到了這里,關(guān)于大白話解析LevelDB: VersionSet的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!