I. Source
- MIT-6.824 2020 課程官網(wǎng)
- Lab3: KVRaft 實驗主頁
- simviso 精品付費翻譯 MIT 6.824 課程
- Paper - Raft extended version
II. My Code
- source code 的 Gitee 地址
- Lab3B: KVRaft with log compaction的 Gitee 地址
課程官網(wǎng)提供的 Lab 代碼下載地址,我沒有訪問成功,于是我從 Github 其他用戶那里 clone 到干凈的源碼,有需要可以訪問我的 Gitee 獲取
III. Motivation
Lab3A: KVRaft without log compaction 解決了分布式 KV 存儲數(shù)據(jù)庫問題,如果需要請移步此博文。而 Lab3B: KVRaft with log compaction 主要是解決存儲的日志量過大的問題,原先的 KV Server 只要一直在運行,那么 Raft 層存儲的日志條目就會一直增加。一直增加的態(tài)勢是非常糟糕的,因為內(nèi)存是有限的,撐爆內(nèi)存只是早晚的事
所以,我們設(shè)計出一種壓縮日志的機制,即是日志增長到一定程度之后,通過 snapshot 的手段記錄下 kv 數(shù)據(jù)庫的狀態(tài),并丟棄 Raft 層已存儲的日志條目,從而達到壓縮日志,節(jié)省空間的目的
其二,通過 snapshot 手段能夠使落后的 follower 更快地跟上 leader 最新的狀態(tài),leader 向落后的 follower 發(fā)送 snapshot,讓 follower 直接復(fù)制最新的 kv 表,而不是讓落后的 follower 慢慢迭代成最新的狀態(tài)。這樣可以大大提高同步的速度
IV. Solution
先梳理一下 primary server 的 snapshot 流程。首先,client 會向 primary server 發(fā)送 Get/Put 請求,server 收到請求后會將請求轉(zhuǎn)為日志條目,并追加至 Raft 層的日志中;待 Raft 集群同步完成之后,servers 會判斷 Raft 層的日志是否已達到截斷閾值(日志太長了)
這里請注意,不論是 primary server 還是 secondary server,它們都有資格制作 snapshot。在正常的同步流程中,都是讓每位 follower 自己制作 snapshot 的,而不是等待 leader 發(fā)來的 InstallSnapshot RPC 進行被動同步。這樣做的好處是減少集群中通信的開銷,只有在 follower 的 snapshot 與 leader 的相差較遠(yuǎn)的時候,leader 才會向其發(fā)送 InstallSnapshot RPC 進行 snapshot 同步
回歸正題,如果 Raft 層的日志條目太多了,則將此時的 kv 表抓取下來制作成 snapshot,并告知 Raft 層:kv 層已生成最新的 snapshot,Raft 層在收到 snapshot 之后,就可以丟掉 snapshot 點之前的日志條目了
另外,集群中的其他 secondary servers 在收到 leader 發(fā)來的 InstallSnapshot RPC 時,將會根據(jù)自己的狀態(tài)選擇是否更新 kv 表,其中也包含一些丟棄 snapshot 點之前的過期條目之類的操作
S1 - server制作/接收snapshot
和 Lab3A: KVRaft without log compaction 一樣,代碼的結(jié)構(gòu)并沒變化,只是在 kvraft/server.go:loop()
中增加了一些接收和發(fā)送 snapshot 的業(yè)務(wù)代碼,
func (kv *KVServer) loop() {
for !kv.killed() {
msg := <-kv.applyCh /* Raft 集群已同步 */
if !msg.CommandValid { /* follower 的 kv 層要抄 leader 送來的作業(yè) */
r := bytes.NewBuffer(msg.Snapshot)
d := gob.NewDecoder(r)
kv.mu.Lock()
kv.db = make(map[string]string)
kv.ack = make(map[int64]int)
d.Decode(&kv.db)
d.Decode(&kv.ack)
kv.mu.Unlock()
} else {
op := msg.Command.(Op) /* 將 Command 空接口部分強制轉(zhuǎn)換為 Op*/
idx := msg.CommandIndex /* 這是第幾條命令 */
kv.mu.Lock()
/* 準(zhǔn)備將該命令應(yīng)用到狀態(tài)機 */
if kv.isUp2Date(op.ClntId, op.CmdId) { /* 不執(zhí)行過期的命令 */
kv.updateDB(op)
kv.ack[op.ClntId] = op.CmdId /* ack 跟蹤最新的命令編號 */
}
if kv.maxraftstate != -1 && kv.rf.GetPersisterSize() > kv.maxraftstate {
w := new(bytes.Buffer)
e := gob.NewEncoder(w)
e.Encode(kv.db)
e.Encode(kv.ack)
data := w.Bytes()
kv.rf.StartSnapshot(idx, data) /* 這里不能用 goroutine,否則 server->raft 的 snapshot 不及時,進而導(dǎo)致 statesize 過大 */
}
/*
* 分流,回應(yīng) client,即繼續(xù) Get 或 PutAppend 當(dāng)中的流程,
* 最后再回復(fù) client,不然會導(dǎo)致 leader 和 follower 制作 snapshot 不同步
*/
ch, ok := kv.results[idx]
if ok { /* RPC Handler 已經(jīng)準(zhǔn)備好讀取已同步的命令了 */
select {
case <-kv.results[idx]:
default:
}
ch <- op
}
kv.mu.Unlock()
}
}
}
日志條目在 Raft 層同步完成之后,server 會判斷該條目( Raft 層傳來的命令)到底是要其接收 OR 制作 snapshot。何時接收 snapshot?無非是在 primary server 覺得 secondary server 有點落后的時候
leader 會在 Raft 層向 follower 發(fā)送 InstallSnapshot RPC,follower 收到之后進行一系列的 snapshot 更新操作,然后通過 kv 層的 secondary:目前收到了來自 primary 的最新 snapshot,望你取走,更新你的 kv 表,對應(yīng)第 5 ~ 15 行
何時制作 snapshot?在日志條目達到一定數(shù)量時,超過 maxraftstate
閾值就開始考慮制作,對應(yīng)第 26 ~ 33 行。且 maxraftstate
不為 -1,這在 Lab3: KVRaft 實驗主頁 上有過提示,
If
maxraftstate
is -1, you do not have to snapshot.maxraftstate
applies to the GOB-encoded bytes your Raft passes topersister.SaveRaftState()
.
其中的 GetPersisterSize()
是我自己增加的輔助函數(shù),定義在 raft/snapshot.go
中,
/* 輔助函數(shù),用于 kv 層感知 snapshot 閾值 */
func (rf *Raft) GetPersisterSize() int {
return rf.persister.RaftStateSize()
}
其實就是調(diào)用 persister
對象的 RaftStateSize()
方法,讓 kv 層能夠感知到當(dāng)前 Raft 層已持久化了多少日志,好做進一步的制作 snapshot 判斷
第 32 行必須立刻將制作好的 snapshot 發(fā)往 Raft 層,如果采用 goroutine 異步的手段,會有 snapshot 延遲發(fā)送的情況發(fā)生,進而導(dǎo)致日志條目數(shù)量與實際制作 snapshot 時不符
另外,制作和發(fā)送 snapshot 這件事必須在回應(yīng) client 請求(第 39 ~ 46 行)之前完成,這個順序不能亂。用正常的邏輯思考這個問題也很容易理順,即是只有在自己和集群中其他服務(wù)器同步好了之后,才會去回應(yīng)外部的請求。其中,“同步完成” 如何定義?那自然包含制作和發(fā)送 snapshot 這一套子流程啦
S2 - raft層保存snapshot
server 在將 snapshot 從 kv 層發(fā)往 Raft 層后,后者是通過 StartSnapshot()
來接收 snapshot,定義在 raft/snapshot.go
中,
func (rf *Raft) StartSnapshot(idx int, snapshot []byte) {
if rf.killed() {
return
}
rf.mu.Lock()
defer rf.mu.Unlock()
baseIdx := rf.log[0].Idx
/* 已 snapshoted 不予處理 OR 超前的錯誤 snapshot */
if idx <= baseIdx || idx > rf.commitIdx { /* 何為超前? */
return
}
/* 設(shè)置 snapshot 截斷點 */
lastIncludedIdx := idx
lastIncludedTerm := rf.log[idx-baseIdx].Term
/* 把截斷點之前的日志丟掉 */
newLog := make([]LogEntry, 0)
newLog = append(newLog, LogEntry{Idx: lastIncludedIdx, Term: lastIncludedTerm})
for i := lastIncludedIdx - baseIdx + 1; i < len(rf.log); i++ {
newLog = append(newLog, rf.log[i])
}
rf.log = newLog
/* Lab2C 持久化操作 */
rf.persist()
rf.persister.SaveSnapshot(snapshot)
}
主要的邏輯即是保存 kv 層發(fā)來的 snapshot,首先會記錄下 snapshot 的截斷點(第 16 ~ 18 行),然后將截斷點之前的日志條目統(tǒng)統(tǒng)丟掉(第 21 ~ 28 行),其中需要注意邏輯編號和物理下標(biāo)之間的轉(zhuǎn)換,這點非常重要,即 i := lastIncludedIdx-baseIdx+1
。最后,持久化一下日志條目和 snapshot 即可
S3 - leader定向發(fā)送InstallSnapshot RPC
leader 在廣播 AE 包時如果發(fā)現(xiàn)該 follower 比較落后(體現(xiàn)在 nextIdx 上),會向其發(fā)送 InstallSnapshot RPC,要求該 follower 同步最新的 snapshot。具體的業(yè)務(wù)邏輯還是 Lab2C: Persist 的 raft/raft.go:boatcastAE()
框架,
func (rf *Raft) boatcastAE() {
rf.mu.Lock()
defer rf.mu.Unlock()
/* 所有 peers 應(yīng)該收到相同的 AE 包 */
for i, _ := range rf.peers {
if i == rf.me || rf.role != Leader {
continue
}
baseIdx := rf.log[0].Idx
if rf.nextIdxs[i]-1 < baseIdx {
targs := InstallSnapshotArgs{
Term: rf.curTerm,
LeaderId: rf.me,
LastIncludedIdx: rf.log[0].Idx,
LastIncludedTerm: rf.log[0].Term,
Data: rf.persister.snapshot,
}
treply := InstallSnapshotReply{}
go func(id int, args InstallSnapshotArgs, reply InstallSnapshotReply) {
rf.sendInstallSnapshot(id, &args, &reply)
}(i, targs, treply)
} else {
/* 這一塊 RPC 初始化操作必須寫在 goroutine 之外,因為在 goroutine 內(nèi)部,鎖是失效的 */
targs := AppendEntriesArgs{
Term: rf.curTerm,
LeaderId: rf.me,
LeaderCommit: rf.commitIdx,
}
/*------------Lab2B Log Replication----------------*/
/* nextIdxs 和 prevLogIdx 都是邏輯編號 */
if rf.nextIdxs[i]-1 <= rf.lastLogIdx() { /* backup test 不加以限制可能會越界 */
targs.PrevLogIdx = rf.nextIdxs[i] - 1
} else {
targs.PrevLogIdx = rf.lastLogIdx() /* 意味著發(fā)送不含日志條目的心跳包 */
}
targs.PrevLogTerm = rf.log[targs.PrevLogIdx-baseIdx].Term
if rf.nextIdxs[i] <= rf.lastLogIdx() { /* rejoin test 不加以限制可能會越界 */
targs.Entries = make([]LogEntry, len(rf.log[rf.nextIdxs[i]-baseIdx:]))
copy(targs.Entries, rf.log[rf.nextIdxs[i]-baseIdx:])
}
treply := AppendEntriesReply{}
go func(id int, args AppendEntriesArgs, reply AppendEntriesReply) {
rf.sendAppendEntries(id, &args, &reply)
}(i, targs, treply)
}
}
}
主要是新增了發(fā)送 snapshot 的邏輯,對應(yīng)第 13 ~ 26 行。同步之前的準(zhǔn)備也很簡單,就是做一些簡單的封裝,包括 snapshot 截斷點及對應(yīng)的 term,對應(yīng)變量 tagrs
,然后就是調(diào)用 raft/snapshot.go:sendInstallSnapshot()
RPC 方法將其發(fā)走
這里需要注意的是如果是常規(guī)的發(fā)送 AE 包,接收方( appendEntries.go:AppendEntries )需要添加一段 snapshot 截斷點的邏輯,
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
/*------------Lab2C Persist---------------*/
defer rf.persist()
reply.Success = false
reply.Term = rf.curTerm
if args.Term < rf.curTerm {
return
}
/* 主要為了讓舊 leader 收到了新 leader 的心跳包后而被迫退位 */
if args.Term > rf.curTerm {
...
}
/* 心跳包只對 follower 和 candidate 管用,leader 是不會響應(yīng)它的 */
rf.heartBeatCh <- struct{}{}
rf.votedFor = args.LeaderId
/*------------Lab2B Log Replication----------------*/
baseIdx := rf.log[0].Idx
/*------------Lab3B Log Compaction----------------*/
if baseIdx > args.PrevLogIdx { /* snapshot 之前的日志條目已提交了,請勿覆蓋 */
reply.XTerm = XTermCommitted
reply.XIdx = baseIdx + 1
return
}
/* 已提交的日志條目不允許覆蓋 */
if args.PrevLogIdx < rf.commitIdx {
...
return
}
if rf.lastLogIdx() < args.PrevLogIdx { /* 違法下標(biāo),越界了 */
...
return
}
...
}
通過第 27 ~ 31 行的代碼判斷 snapshot 截斷點之前的日志條目是否已經(jīng)提交了。翻譯一下,即是如果 leader 發(fā)來的條目是 follower 已經(jīng)提交過的,那么 follower 直接拒絕并告知 leader:下次發(fā)新的來( snapshot 截斷點之后的)
S4 - follower回應(yīng)InstallSnapshot RPC請求
在 follower 收到 InstallSnapshot RPC 請求后,首先會做一些常規(guī)的判斷,包括 term 的比較等等,這些和 Lab2B: Log Replication 的 AppendEntries RPC 請求的處理邏輯相同,在這就不再贅述。定義在 raft/snapshot.go
中,
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.curTerm
if args.Term < rf.curTerm {
return
}
/* 主要為了讓舊 leader 收到了新 leader 的心跳包后而被迫退位 */
if args.Term > rf.curTerm {
rf.curTerm = args.Term
rf.role = Follower
rf.votedFor = NoBody
}
/* 心跳包只對 follower 和 candidate 管用,leader 是不會響應(yīng)它的 */
rf.heartBeatCh <- struct{}{}
rf.votedFor = args.LeaderId
baseIdx := rf.log[0].Idx
/* 版本較低的 snapshot 不予理會 */
if args.LastIncludedIdx <= baseIdx {
return
}
rf.truncateLog(args.LastIncludedIdx, args.LastIncludedTerm)
msg := ApplyMsg{CommandValid: false, Snapshot: args.Data}
rf.persist()
/* follower 保存 leader 發(fā)來的快照 */
rf.persister.SaveSnapshot(args.Data)
/* 不用提交 snapshot 之前的日志條目啦,但請為之后的提交做好準(zhǔn)備 */
rf.commitIdx = args.LastIncludedIdx
rf.appliedIdx = args.LastIncludedIdx
/* 將 snapshot 直接交給 kv 層,無需 commit */
rf.applyCh <- msg
}
重點講解第 21 行之后的邏輯,首先將上次 snapshot 截斷點的標(biāo)號記為 baseIdx
,如果請求所包含的 snapshot 比當(dāng)前的 baseIdx
落后,則不予理會;反之就需要考慮如何同步請求發(fā)來的 snapshot 了
當(dāng)然,第一步就是截斷 snapshot 點之前的日志條目,在這里我封裝了 truncateLog()
方法以便復(fù)用,
func (rf *Raft) truncateLog(lastIncludedIdx int, lastIncludedTerm int) {
idx := 0
/* 從日志后面往前掃,尋找 lastIncluded 條目所在的位置 */
for i := len(rf.log) - 1; i >= 0; i-- {
if rf.log[i].Idx == lastIncludedIdx && rf.log[i].Term == lastIncludedTerm {
idx = i
break
}
}
newLog := make([]LogEntry, 0)
newLog = append(newLog, LogEntry{Idx: lastIncludedIdx, Term: lastIncludedTerm})
if idx != 0 { /* 有找到 lastIncluded 條目 */
for i := idx + 1; i < len(rf.log); i++ {
newLog = append(newLog, rf.log[i])
}
}
rf.log = newLog
}
邏輯很簡單,就是從日志后面往前掃,尋找 lastIncluded
條目所在的位置。然后,以該點為開端將之后的日志條目順起來就可以了。額外需要考慮的情況是原來的日志條目中可能并不存在 lastIncluded
的條目,這個時候只需更新日志下標(biāo) 0 處的條目即可( snapshot 截斷點)
另外,為什么要新建 newLog
切片?主要是為了實現(xiàn)垃圾回收功能,這是 Golang 的 gc 機制的特色。如果還在引用 rf.log
,那么就回收不了 snapshot 截斷點之前的日志條目的空間。所以,我們必須另起爐灶(惡心的一比)
言歸正傳,follower 在截斷之后就可以將 snapshot 保存在本地了。這里需要注意的是不用再提交 snapshot 截斷點之前的日志條目了,但請為之后的 commit 做好準(zhǔn)備,對應(yīng)代碼在第 37 和 38 行更新 appliedIdx
和 commitIdx
操作。最后,就可以將包含來自于 leader 的 snapshot 數(shù)據(jù)打包發(fā)往 kv 層了
S5 - leader收到InstallSnapshot Reply
leader 在收到 follower 關(guān)于 InstallSnapshot RPC 回應(yīng)之后,同樣也是做一些常規(guī)的判斷,如同 Lab2B: Log Replication 的 AppendEntries Reply,
func (rf *Raft) sendInstallSnapshot(server int, args *InstallSnapshotArgs, reply *InstallSnapshotReply) bool {
DPrintf("[%v->%v] send install rpc in snapshot.go:sendInstallSnapshot\n", rf.me, server)
ok := rf.peers[server].Call("Raft.InstallSnapshot", args, reply)
rf.mu.Lock()
defer rf.mu.Unlock()
if !ok {
return ok
}
term := rf.curTerm
/* 自身過期的情況下 */
if rf.role != Leader || args.Term != term {
return ok
}
/* 僅僅是被動退位,不涉及到需要投票給誰 */
if reply.Term > term {
rf.curTerm = reply.Term
rf.role = Follower /* 主動回滾至 follower */
rf.votedFor = NoBody
rf.persist()
return ok
}
rf.nextIdxs[server] = args.LastIncludedIdx + 1
rf.matchIdxs[server] = rf.nextIdxs[server] - 1
return ok
}
需要關(guān)注的即是第 27 和 28 行新增的 nextIdx 調(diào)整的邏輯部分,leader 下一次再向該 follower 發(fā)送 snapshot 截斷點下一個日志條目即可
S6 - server啟動初始化
kv server 和 Raft 一樣,需要保證持久化機制,所以在 kvraft/server.go:StartKVServer()
中添加重啟之后讀取 snapshot 的操作,
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
// call labgob.Register on structures you want
// Go's RPC library to marshall/unmarshall.
labgob.Register(Op{})
kv := new(KVServer)
kv.me = me
kv.maxraftstate = maxraftstate
// You may need initialization code here.
/* applyCh 要是異步的,不然會阻塞 */
kv.applyCh = make(chan raft.ApplyMsg, 100)
kv.rf = raft.Make(servers, me, persister, kv.applyCh)
// You may need initialization code here.
kv.db = make(map[string]string)
kv.ack = make(map[int64]int)
kv.results = make(map[int]chan Op)
/*------------Lab3B Log Compaction----------------*/
kv.readSnapshot(kv.rf.Persister().ReadSnapshot())
go kv.loop()
return kv
}
對應(yīng)第 22 行 readSnapshot()
,它也定義在 kvraft/server.go
中,
/* 輔助函數(shù),讀取已持久化的 snapshot */
func (kv *KVServer) readSnapshot(data []byte) {
if data == nil || len(data) == 0 {
return
}
r := bytes.NewBuffer(data)
d := gob.NewDecoder(r)
var db map[string]string
var ack map[int64]int
if d.Decode(&db) != nil || d.Decode(&ack) != nil {
DPrintf("%v readSnapshot err in server.go:readSnapshot\n", kv.rf.GetId())
} else {
kv.db = db
kv.ack = ack
}
}
其中的 rf.Persister()
是我定義在 raft/raft.go
中的輔助函數(shù),它返回 Raft 的 persister 對象,
func (rf *Raft) Persister() *Persister {
return rf.persister
}
最后,還需要注意的是 Raft 層重啟初始化時需要更新 appliedIdx
的值為 snapshot 截斷點,在 raft/raft.go:readPersister
中,
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
r := bytes.NewBuffer(data)
d := gob.NewDecoder(r)
var curTerm int
var votedFor int
var log []LogEntry
if d.Decode(&curTerm) != nil || d.Decode(&votedFor) != nil || d.Decode(&log) != nil {
DPrintf("read persist fail\n")
} else {
rf.curTerm = curTerm
rf.votedFor = votedFor
rf.log = log
}
/* restart 之后一定要將 appliedIdx 重置成 snapshot 點 */
rf.appliedIdx = rf.log[0].Idx
}
意在告訴 Raft 請勿再提交已提交過的日志條目了!至此,梳理完 KVRaft with log compaction 的整套流程
V. Result
golang 比較麻煩,它有 GOPATH 模式,也有 GOMODULE 模式,6.824-golabs-2020 采用的是 GOPATH,所以在運行之前,需要將 golang 默認(rèn)的 GOMODULE 關(guān)掉,
$ export GO111MODULE="off"
隨后,就可以進入 src/kvraft
中開始運行測試程序,
$ go test -run 3B
僅此一次的測試遠(yuǎn)遠(yuǎn)不夠,可以通過 shell 循環(huán),讓測試跑個一百次就差不多了
$ for i in {1..100}; go test -run 3B
這樣,如果還沒錯誤,那應(yīng)該是真的通過了。分布式的很多 bug 需要通過反復(fù)模擬才能復(fù)現(xiàn)出來的,它不像單線程程序那樣,永遠(yuǎn)是冪等的情況。也可以用我寫的腳本 test_3b.py,文章來源:http://www.zghlxwxcb.cn/news/detail-617259.html
import os
ntests = 100
nfails = 0
noks = 0
if __name__ == "__main__":
for i in range(ntests):
print("*************ROUND " + str(i+1) + "/" + str(ntests) + "*************")
filename = "out" + str(i+1)
os.system("go test -run 3B | tee " + filename)
with open(filename) as f:
if 'PASS' in f.read():
noks += 1
print("??ok, " + str(noks) + "/" + str(ntests))
os.system("rm " + filename)
else:
nfails += 1
print("??fails, " + str(nfails) + "/" + str(ntests))
continue
我已經(jīng)跑過一百次,無一 FAIL文章來源地址http://www.zghlxwxcb.cn/news/detail-617259.html
到了這里,關(guān)于「實驗記錄」MIT 6.824 KVRaft Lab3B With Log Compaction的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!