Raft Algorithm: Log Replication
1. Overview of the log replication flow
During Leader election the cluster eventually elects one Leader node, and the remaining nodes become Followers. Besides sending heartbeats to the Followers, the Leader handles client requests. It appends each client update to its own log and sends it to all Followers in Append Entries messages. Each Follower appends the received entries to its log and replies to the Leader. Once an entry is stored on a majority of the cluster (the Leader itself counts), the Leader considers it committed, applies it to its state machine and answers the client. The new commit index reaches the Followers in subsequent Append Entries messages (heartbeats included). The Followers then apply the entry to their own state machines as well.
Reference: https://blog.csdn.net/qq_43949280/article/details/122669244
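The following is a minimal Go sketch that makes the majority rule concrete. It is an illustration, not etcd's code, and `committedIndex` is a hypothetical name. Given the match index of every voter (the Leader's own included), it returns the highest index stored on a majority. etcd's raft performs a comparable calculation in its quorum package. Raft also only advances the commit index this way for entries from the Leader's current term; the sketch leaves that check out.

```go
package main

import (
	"fmt"
	"sort"
)

// committedIndex returns the largest log index stored on a majority of
// voters. match holds one match index per voter, including the Leader.
// Hypothetical helper for illustration only.
func committedIndex(match []uint64) uint64 {
	if len(match) == 0 {
		return 0
	}
	srt := append([]uint64(nil), match...)
	// Sort in descending order.
	sort.Slice(srt, func(i, j int) bool { return srt[i] > srt[j] })
	// With n voters a majority is n/2+1 nodes, so the value at 0-based
	// position n/2 is present on at least a majority.
	return srt[len(srt)/2]
}

func main() {
	// The Leader has index 7; the four Followers have 7, 5, 3 and 2.
	fmt.Println(committedIndex([]uint64{7, 7, 5, 3, 2})) // 5
}
```

With five voters, index 5 is on three nodes (7, 7, 5), which is a majority. Index 7 is on only two nodes.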
2. Log replication in etcd's raft module
2.1 Sending messages
As mentioned above, the Leader handles client update requests, so that is where we start reading the code.
Besides the raft module, the etcd repository contains a raftexample module (under contrib/raftexample) that demonstrates how to use raft. After going through its files, kvstore.go looks like the entry point for data storage. It defines a newKVStore(...) function, and any code that uses the kvstore struct has to call newKVStore(...) first.
Looking at the call sites of this function, there is only one, in main.go:
// contrib/raftexample/main.go
func main() {
	cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
	id := flag.Int("id", 1, "node ID")
	kvport := flag.Int("port", 9121, "key-value server port")
	join := flag.Bool("join", false, "join an existing cluster")
	flag.Parse()
	proposeC := make(chan string)
	defer close(proposeC)
	confChangeC := make(chan raftpb.ConfChange)
	defer close(confChangeC)
	// raft provides a commit stream for the proposals from the http api
	var kvs *kvstore // declare the kvstore
	getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
	commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
	kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC) // ref-1: create the kvstore
	// the key-value http handler will propose updates to raft
	serveHttpKVAPI(kvs, *kvport, confChangeC, errorC) // ref-2: use the kvstore
}
Next, let's look at serveHttpKVAPI(...), the function at ref-2 that uses the kvstore:
// contrib/raftexample/httpapi.go
// serveHttpKVAPI starts a key-value server with a GET/PUT API and listens.
func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
	srv := http.Server{ // create the HTTP server
		Addr: ":" + strconv.Itoa(port),
		Handler: &httpKVAPI{
			store:       kv, // the kvstore from above becomes httpKVAPI's store field
			confChangeC: confChangeC,
		},
	}
	go func() { // start the HTTP server
		if err := srv.ListenAndServe(); err != nil {
			log.Fatal(err)
		}
	}()
	// exit when raft goes down
	if err, ok := <-errorC; ok {
		log.Fatal(err)
	}
}
The key type is now httpKVAPI, which handles HTTP requests on top of the raft-backed key-value store. Its definition:
// contrib/raftexample/httpapi.go
// Handler for a http based key-value store backed by raft
type httpKVAPI struct {
	store       *kvstore
	confChangeC chan<- raftpb.ConfChange
}

func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	key := r.RequestURI
	defer r.Body.Close()
	switch {
	case r.Method == "PUT": // ref-3: key-value pairs are set with PUT, as described in the module's README
		v, err := ioutil.ReadAll(r.Body) // read the request body sent by the client
		if err != nil {
			log.Printf("Failed to read on PUT (%v)\n", err)
			http.Error(w, "Failed on PUT", http.StatusBadRequest)
			return
		}
		h.store.Propose(key, string(v)) // ref-4: the kvstore stores the key-value pair
		// Optimistic-- no waiting for ack from raft. Value is not yet
		// committed so a subsequent GET on the key may return old value
		w.WriteHeader(http.StatusNoContent)
	case r.Method == "GET":
		if v, ok := h.store.Lookup(key); ok {
			w.Write([]byte(v))
		} else {
			http.Error(w, "Failed to GET", http.StatusNotFound)
		}
	case r.Method == "POST":
		url, err := ioutil.ReadAll(r.Body)
		if err != nil {
			log.Printf("Failed to read on POST (%v)\n", err)
			http.Error(w, "Failed on POST", http.StatusBadRequest)
			return
		}
		nodeId, err := strconv.ParseUint(key[1:], 0, 64)
		if err != nil {
			log.Printf("Failed to convert ID for conf change (%v)\n", err)
			http.Error(w, "Failed on POST", http.StatusBadRequest)
			return
		}
		cc := raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  nodeId,
			Context: url,
		}
		h.confChangeC <- cc
		// As above, optimistic that raft will apply the conf change
		w.WriteHeader(http.StatusNoContent)
	case r.Method == "DELETE":
		nodeId, err := strconv.ParseUint(key[1:], 0, 64)
		if err != nil {
			log.Printf("Failed to convert ID for conf change (%v)\n", err)
			http.Error(w, "Failed on DELETE", http.StatusBadRequest)
			return
		}
		cc := raftpb.ConfChange{
			Type:   raftpb.ConfChangeRemoveNode,
			NodeID: nodeId,
		}
		h.confChangeC <- cc
		// As above, optimistic that raft will apply the conf change
		w.WriteHeader(http.StatusNoContent)
	default:
		w.Header().Set("Allow", "PUT")
		w.Header().Add("Allow", "GET")
		w.Header().Add("Allow", "POST")
		w.Header().Add("Allow", "DELETE")
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
	}
}
At ref-4 we finally reach the kvstore's entry point for storing a key-value pair: the Propose(...) method. Its details:
// contrib/raftexample/kvstore.go
func (s *kvstore) Propose(k string, v string) {
	var buf bytes.Buffer
	// gob-encode the key-value pair into buf
	if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
		log.Fatal(err)
	}
	s.proposeC <- buf.String() // send the encoded data over the channel
}
Propose writes the encoded data to the proposeC channel. The question now is: which code reads from this channel?
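The sketch below illustrates what travels over proposeC. It repeats the gob encoding from Propose and decodes the result on the receiving side.

Some of it is assumed:
- The `kv` field names `Key` and `Val` are assumptions. Propose only shows the positional literal `kv{k, v}`, and gob needs exported fields.
- The goroutine merely stands in for the raft side.

Because proposeC is unbuffered, the send blocks until a reader takes the value.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"log"
)

// kv mirrors the two-string struct that Propose encodes; the field names
// are assumed. gob only encodes exported fields.
type kv struct {
	Key string
	Val string
}

// encode does what Propose does before sending on proposeC.
func encode(k, v string) string {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
		log.Fatal(err)
	}
	return buf.String()
}

// decode reverses encode, as the consumer of committed data must.
func decode(data string) (kv, error) {
	var out kv
	err := gob.NewDecoder(bytes.NewBufferString(data)).Decode(&out)
	return out, err
}

func main() {
	proposeC := make(chan string)
	done := make(chan kv)
	// Stand-in for the raft side: receive the proposal and decode it.
	go func() {
		v, err := decode(<-proposeC)
		if err != nil {
			log.Fatal(err)
		}
		done <- v
	}()
	proposeC <- encode("foo", "bar") // blocks until the reader receives it
	fmt.Println(<-done)              // {foo bar}
}
```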
Start from the type definition that holds the proposeC field and look at where that field is set. It turns out to be a channel passed in when the kvstore value is created.
Following it further, the channel turns out to be a parameter of newKVStore(...), the function we looked at at the beginning.
Back at ref-1, let's see how the call to newKVStore receives this key channel:
// contrib/raftexample/main.go
func main() {
	cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
	id := flag.Int("id", 1, "node ID")
	kvport := flag.Int("port", 9121, "key-value server port")
	join := flag.Bool("join", false, "join an existing cluster")
	flag.Parse()
	proposeC := make(chan string) // create proposeC
	defer close(proposeC)
	confChangeC := make(chan raftpb.ConfChange)
	defer close(confChangeC)
	// raft provides a commit stream for the proposals from the http api
	var kvs *kvstore // declare the kvstore
	getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
	commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC) // ref-5: pass proposeC to the raft node
	kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC) // ref-1: create the kvstore
	// the key-value http handler will propose updates to raft
	serveHttpKVAPI(kvs, *kvport, confChangeC, errorC) // ref-2: use the kvstore
}
The same proposeC is handed to both newRaftNode(...) and newKVStore(...). The kvstore only writes to it, so the reader must be inside newRaftNode(...), called at ref-5:
// contrib/raftexample/raft.go
// newRaftNode initiates a raft instance and returns a committed log entry
// channel and error channel. Proposals for log updates are sent over the
// provided the proposal channel. All log entries are replayed over the
// commit channel, followed by a nil message (to indicate the channel is
// current), then new log entries. To shutdown, close proposeC and read errorC.
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
	confChangeC <-chan raftpb.ConfChange) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) {
	commitC := make(chan *commit)
	errorC := make(chan error)
	rc := &raftNode{
		proposeC:         proposeC, // ref-6: proposeC is stored in the proposeC field
		confChangeC:      confChangeC,
		commitC:          commitC,
		errorC:           errorC,
		id:               id,
		peers:            peers,
		join:             join,
		waldir:           fmt.Sprintf("raftexample-%d", id),
		snapdir:          fmt.Sprintf("raftexample-%d-snap", id),
		getSnapshot:      getSnapshot,
		snapCount:        defaultSnapshotCount,
		stopc:            make(chan struct{}),
		httpstopc:        make(chan struct{}),
		httpdonec:        make(chan struct{}),
		logger:           zap.NewExample(),
		snapshotterReady: make(chan *snap.Snapshotter, 1),
		// rest of structure populated after WAL replay
	}
	go rc.startRaft()
	return commitC, errorC, rc.snapshotterReady
}
Next, follow the uses of raftNode's proposeC field. Only one place reads from it, in serveChannels:
// contrib/raftexample/raft.go
func (rc *raftNode) serveChannels() {
	snap, err := rc.raftStorage.Snapshot()
	if err != nil {
		panic(err)
	}
	rc.confState = snap.Metadata.ConfState
	rc.snapshotIndex = snap.Metadata.Index
	rc.appliedIndex = snap.Metadata.Index
	defer rc.wal.Close()
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	// send proposals over raft
	go func() {
		confChangeCount := uint64(0)
		for rc.proposeC != nil && rc.confChangeC != nil {
			select {
			case prop, ok := <-rc.proposeC: // read an encoded key-value proposal
				if !ok {
					rc.proposeC = nil
				} else {
					// blocks until accepted by raft state machine
					rc.node.Propose(context.TODO(), []byte(prop)) // ref-7: hand the client's key-value write to raft
				}
			case cc, ok := <-rc.confChangeC:
				if !ok {
					rc.confChangeC = nil
				} else {
					confChangeCount++
					cc.ID = confChangeCount
					rc.node.ProposeConfChange(context.TODO(), cc)
				}
			}
		}
		// client closed channel; shutdown raft if not already
		close(rc.stopc)
	}()
	// ... (omitted)
}
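The proposal goroutine above relies on two channel rules. First, a receive of the form `v, ok := <-ch` yields `ok == false` once `ch` is closed. Second, setting the field to nil makes the `for` condition false, so the loop exits and `close(rc.stopc)` runs.

Below is a minimal standalone sketch of that loop. Its names are hypothetical:
- `forwardProposals` is the loop itself.
- The `propose` callback stands in for `rc.node.Propose`.
- Conf changes are reduced to an `int` channel.

```go
package main

import "fmt"

// forwardProposals is a simplified copy of the proposal loop in
// serveChannels: it forwards values until either input channel is closed,
// then closes stopc. propose stands in for rc.node.Propose (hypothetical).
func forwardProposals(proposeC <-chan string, confC <-chan int, propose func(string), stopc chan struct{}) {
	for proposeC != nil && confC != nil {
		select {
		case p, ok := <-proposeC:
			if !ok {
				proposeC = nil // closed: the loop condition now fails
			} else {
				propose(p)
			}
		case _, ok := <-confC:
			if !ok {
				confC = nil
			}
		}
	}
	close(stopc)
}

func main() {
	proposeC := make(chan string)
	confC := make(chan int)
	stopc := make(chan struct{})
	var got []string
	go forwardProposals(proposeC, confC, func(p string) { got = append(got, p) }, stopc)
	proposeC <- "a"
	proposeC <- "b"
	close(proposeC)  // the client side shuts down
	<-stopc          // the loop noticed and exited
	fmt.Println(got) // [a b]
}
```

Reading `got` after `<-stopc` is safe. `close(stopc)` happens after the last `propose` call, and a receive from a closed channel is ordered after the close.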
Next, let's see how the write at ref-7 in serveChannels (rc.node.Propose) is handled. rc.node has the interface type Node, so we need to find the implementation of its Propose(...) method.
The raft module has only one implementation, in node.go:
// raft/node.go
func (n *node) Propose(ctx context.Context, data []byte) error {
	return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}
The data is wrapped in a pb.Entry, and the pb.Message type is set to pb.MsgProp. Next, the stepWait(...) method:
// raft/node.go
func (n *node) stepWait(ctx context.Context, m pb.Message) error {
	return n.stepWithWaitOption(ctx, m, true)
}

// Step advances the state machine using msgs. The ctx.Err() will be returned,
// if any.
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
	if m.Type != pb.MsgProp { // messages other than pb.MsgProp go to recvc
		select {
		case n.recvc <- m:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		case <-n.done:
			return ErrStopped
		}
	}
	ch := n.propc             // proposals go to propc
	pm := msgWithResult{m: m} // wrap the message in a msgWithResult
	if wait {                 // stepWait passes true
		pm.result = make(chan error, 1) // channel that will receive the outcome
	}
	select {
	case ch <- pm: // ref-7: send the wrapped message
		if !wait {
			return nil
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-n.done:
		return ErrStopped
	}
	select {
	case err := <-pm.result: // ref-8: wait for the outcome
		if err != nil {
			return err
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-n.done:
		return ErrStopped
	}
	return nil
}
The case ch <- pm branch (marked ref-7 inside stepWithWaitOption) sends the message on n.propc. So where is that channel read?
Following the uses of propc leads to node.run():
// raft/node.go
func (n *node) run() {
	var propc chan msgWithResult
	var readyc chan Ready
	var advancec chan struct{}
	var rd Ready
	r := n.rn.raft
	lead := None
	for {
		if advancec != nil {
			readyc = nil
		} else if n.rn.HasReady() {
			// Populate a Ready. Note that this Ready is not guaranteed to
			// actually be handled. We will arm readyc, but there's no guarantee
			// that we will actually send on it. It's possible that we will
			// service another channel instead, loop around, and then populate
			// the Ready again. We could instead force the previous Ready to be
			// handled first, but it's generally good to emit larger Readys plus
			// it simplifies testing (by emitting less frequently and more
			// predictably).
			rd = n.rn.readyWithoutAccept()
			readyc = n.readyc
		}
		if lead != r.lead {
			if r.hasLeader() {
				if lead == None {
					r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
				} else {
					r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
				}
				propc = n.propc // assign the node's propc to the local propc: proposals are accepted only while a leader is known
			} else {
				r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
				propc = nil
			}
			lead = r.lead
		}
		select {
		// TODO: maybe buffer the config propose if there exists one (the way
		// described in raft dissertation)
		// Currently it is dropped in Step silently.
		case pm := <-propc: // receive from propc
			m := pm.m // unwrap the pb.Message
			m.From = r.id
			err := r.Step(m) // ref-9
			if pm.result != nil {
				pm.result <- err
				close(pm.result)
			}
		// ... (other cases omitted)
		case <-advancec:
			n.rn.Advance(rd)
			rd = Ready{}
			advancec = nil
		case c := <-n.status:
			c <- getStatus(r)
		case <-n.stop:
			close(n.done)
			return
		}
	}
}
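run() was analyzed in the previous post, so I won't repeat it here. Next, let's see how the r.Step(m) call marked ref-9 in the propc case handles the message inside the Step(...) method: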
// raft/raft.go
func (r *raft) Step(m pb.Message) error {
	// Handle the message term, which may result in our stepping down to a follower.
	switch { // compare the message term with ours
	case m.Term == 0: // the proposal was built without a term, so this case is taken
		// local message
	case m.Term > r.Term:
		// ... (omitted)
	case m.Term < r.Term:
		// ... (omitted)
	}
	switch m.Type {
	case pb.MsgHup:
		// ... (omitted)
	case pb.MsgVote, pb.MsgPreVote:
		// ... (omitted)
	default:
		err := r.step(r, m) // ref-10: role-specific message handling
		if err != nil {
			return err
		}
	}
	return nil
}
Continuing with ref-10: r.step is a function-typed field that is assigned in several places. Since we are analyzing the Leader, the relevant assignment is the one that sets r.step to stepLeader, in becomeLeader:
// raft/raft.go
func (r *raft) becomeLeader() {
	// TODO(xiangli) remove the panic when the raft implementation is stable
	if r.state == StateFollower {
		panic("invalid transition [follower -> leader]")
	}
	r.step = stepLeader // ref-11: install stepLeader as the step function
	r.reset(r.Term)
	r.tick = r.tickHeartbeat
	r.lead = r.id
	r.state = StateLeader
	// ... (omitted)
}
So the key is now the stepLeader function; becomeLeader was also covered in the previous post. Here are the details of stepLeader:
// raft/raft.go
func stepLeader(r *raft, m pb.Message) error {
	// These message types do not require any progress for m.From.
	switch m.Type {
	case pb.MsgBeat:
		// ... (omitted)
		return nil
	case pb.MsgCheckQuorum:
		// ... (omitted)
		return nil
	case pb.MsgProp: // as traced above, the message type is MsgProp, so this branch is taken
		if len(m.Entries) == 0 {
			r.logger.Panicf("%x stepped empty MsgProp", r.id)
		}
		if r.prs.Progress[r.id] == nil {
			// If we are not currently a member of the range (i.e. this node
			// was removed from the configuration while serving as leader),
			// drop any new proposals.
			return ErrProposalDropped
		}
		if r.leadTransferee != None {
			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
			return ErrProposalDropped
		}
		for i := range m.Entries {
			e := &m.Entries[i]
			var cc pb.ConfChangeI
			if e.Type == pb.EntryConfChange { // a configuration change
				var ccc pb.ConfChange
				if err := ccc.Unmarshal(e.Data); err != nil {
					panic(err)
				}
				cc = ccc
			} else if e.Type == pb.EntryConfChangeV2 { // a configuration change in the V2 format
				var ccc pb.ConfChangeV2
				if err := ccc.Unmarshal(e.Data); err != nil {
					panic(err)
				}
				cc = ccc
			}
			if cc != nil {
				alreadyPending := r.pendingConfIndex > r.raftLog.applied
				alreadyJoint := len(r.prs.Config.Voters[1]) > 0
				wantsLeaveJoint := len(cc.AsV2().Changes) == 0
				var refused string
				if alreadyPending {
					refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
				} else if alreadyJoint && !wantsLeaveJoint {
					refused = "must transition out of joint config first"
				} else if !alreadyJoint && wantsLeaveJoint {
					refused = "not in joint state; refusing empty conf change"
				}
				if refused != "" {
					r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
					m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
				} else {
					r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
				}
			}
		}
		if !r.appendEntry(m.Entries...) { // ref-13: append the entries to the raft log
			return ErrProposalDropped
		}
		r.bcastAppend() // ref-12: broadcast the entries to the other nodes
		return nil
	case pb.MsgReadIndex:
		// ... (omitted)
		return nil
	}
	// All other message types require a progress for m.From (pr).
	pr := r.prs.Progress[m.From]
	if pr == nil {
		r.logger.Debugf("%x no progress available for %x", r.id, m.From)
		return nil
	}
	switch m.Type {
	// ... (omitted)
	}
	return nil
}
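Before bcastAppend runs, appendEntry gives each proposed entry its position in the log. As far as I can tell from etcd's source, appendEntry does three things:
- It stamps every entry with the current term and with consecutive indexes after the last log index.
- It appends the entries.
- It records the Leader's own progress, so the Leader counts toward the majority.

The sketch below (hypothetical `appendProposals`) shows only the stamping step. It ignores the uncommitted-size limit that can make appendEntry return false, which leads to ErrProposalDropped.

```go
package main

import "fmt"

// entry keeps only the pb.Entry fields this sketch needs.
type entry struct {
	Term, Index uint64
	Data        string
}

// appendProposals stamps proposals with the current term and consecutive
// indexes after the last one, then appends them. Hypothetical helper.
func appendProposals(ents []entry, term uint64, props ...entry) []entry {
	var last uint64
	if len(ents) > 0 {
		last = ents[len(ents)-1].Index
	}
	for i := range props {
		props[i].Term = term
		props[i].Index = last + 1 + uint64(i)
	}
	return append(ents, props...)
}

func main() {
	ents := []entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
	ents = appendProposals(ents, 3, entry{Data: "a"}, entry{Data: "b"})
	for _, e := range ents[2:] {
		fmt.Printf("index=%d term=%d data=%s\n", e.Index, e.Term, e.Data)
	}
	// index=3 term=3 data=a
	// index=4 term=3 data=b
}
```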
The code at ref-12 is what we care about. Let's see how the entries are broadcast:
// raft/raft.go
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
	// r.prs tracks the replication progress of every node, this one included.
	// Visit iterates over all of them; the callback skips the Leader itself.
	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
		if id == r.id {
			return
		}
		r.sendAppend(id) // ref-14: send the entries to this peer
	})
}
Now let's see how the data is sent to each of the other nodes:
// raft/raft.go
// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
	r.maybeSendAppend(to, true)
}

// maybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
	pr := r.prs.Progress[to]
	if pr.IsPaused() {
		return false
	}
	m := pb.Message{}
	m.To = to
	// Read from r.raftLog the term of the entry just before pr.Next and the
	// entries from pr.Next on: these are the entries appendEntry stored earlier.
	term, errt := r.raftLog.term(pr.Next - 1)
	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
	if len(ents) == 0 && !sendIfEmpty {
		return false
	}
	if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
		// ... (error handling omitted)
	} else {
		// assemble the message
		m.Type = pb.MsgApp // note: the message type is pb.MsgApp
		m.Index = pr.Next - 1
		m.LogTerm = term
		m.Entries = ents
		m.Commit = r.raftLog.committed
		if n := len(m.Entries); n != 0 {
			switch pr.State {
			// optimistically increase the next when in StateReplicate
			case tracker.StateReplicate:
				last := m.Entries[n-1].Index
				pr.OptimisticUpdate(last)
				pr.Inflights.Add(last)
			case tracker.StateProbe:
				pr.ProbeSent = true
			default:
				r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
			}
		}
	}
	r.send(m) // queue the message for sending
	return true
}
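The simplified sketch below shows which entries maybeSendAppend actually puts into a MsgApp. It uses a hypothetical `buildAppend` and assumes an uncompacted log where slice position i holds index i+1.

- Index and LogTerm identify the entry just before pr.Next. The Follower uses them for its consistency check.
- In StateReplicate, Next then moves past the last entry sent without waiting for the reply.
- maxMsgSize and the snapshot fallback are left out.

```go
package main

import "fmt"

// entry keeps only the pb.Entry fields this sketch needs.
type entry struct{ Term, Index uint64 }

// progress is a cut-down tracker.Progress: Match is the highest index known
// to be on the peer, Next is the next index to send it.
type progress struct{ Match, Next uint64 }

// appendMsg holds the MsgApp fields set in maybeSendAppend.
type appendMsg struct {
	Index   uint64 // index of the entry just before Entries (pr.Next - 1)
	LogTerm uint64 // term of that entry
	Commit  uint64 // the Leader's commit index
	Entries []entry
}

// buildAppend mirrors the success branch of maybeSendAppend for a log in
// which slice position i holds index i+1 (hypothetical helper).
func buildAppend(raftLog []entry, committed uint64, pr *progress) appendMsg {
	prevIndex := pr.Next - 1
	var prevTerm uint64 // index 0 is the empty prefix, with term 0
	if prevIndex > 0 {
		prevTerm = raftLog[prevIndex-1].Term
	}
	ents := append([]entry(nil), raftLog[prevIndex:]...)
	if n := len(ents); n > 0 {
		pr.Next = ents[n-1].Index + 1 // like pr.OptimisticUpdate(last)
	}
	return appendMsg{Index: prevIndex, LogTerm: prevTerm, Commit: committed, Entries: ents}
}

func main() {
	raftLog := []entry{{1, 1}, {1, 2}, {2, 3}, {2, 4}}
	pr := &progress{Match: 2, Next: 3}
	m := buildAppend(raftLog, 2, pr)
	fmt.Printf("Index=%d LogTerm=%d Commit=%d Entries=%v Next=%d\n",
		m.Index, m.LogTerm, m.Commit, m.Entries, pr.Next)
	// Index=2 LogTerm=1 Commit=2 Entries=[{2 3} {2 4}] Next=5
}
```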
The key point now is how r.send(m) actually sends the data:
// raft/raft.go
// send schedules persisting state to a stable storage and AFTER that
// sending the message (as part of next Ready message processing).
func (r *raft) send(m pb.Message) {
	if m.From == None {
		m.From = r.id
	}
	if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
		if m.Term == 0 {
			// All {pre-,}campaign messages need to have the term set when
			// sending.
			// - MsgVote: m.Term is the term the node is campaigning for,
			//   non-zero as we increment the term when campaigning.
			// - MsgVoteResp: m.Term is the new r.Term if the MsgVote was
			//   granted, non-zero for the same reason MsgVote is
			// - MsgPreVote: m.Term is the term the node will campaign,
			//   non-zero as we use m.Term to indicate the next term we'll be
			//   campaigning for
			// - MsgPreVoteResp: m.Term is the term received in the original
			//   MsgPreVote if the pre-vote was granted, non-zero for the
			//   same reasons MsgPreVote is
			panic(fmt.Sprintf("term should be set when sending %s", m.Type))
		}
	} else {
		if m.Term != 0 {
			panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
		}
		// do not attach term to MsgProp, MsgReadIndex
		// proposals are a way to forward to the leader and
		// should be treated as local message.
		// MsgReadIndex is also forwarded to leader.
		if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
			m.Term = r.Term
		}
	}
	r.msgs = append(r.msgs, m) // append m to r.msgs; nothing goes over the network yet
}
The message is only appended to r.msgs. So where is r.msgs read? There is just one place: r.msgs is assigned to the Messages field of a Ready.
// raft/node.go
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
	rd := Ready{
		Entries:          r.raftLog.unstableEntries(),
		CommittedEntries: r.raftLog.nextEnts(),
		Messages:         r.msgs, // hand r.msgs over as Messages
	}
	// ... (other processing omitted)
	return rd
}
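The design here is that the raft state machine never does I/O. send only queues messages in r.msgs, and the application receives them through Ready.Messages.

Below is a minimal sketch of this outbox pattern with hypothetical names. The `ready` method collapses building a Ready and accepting it into one step. In the etcd version quoted here, the queued messages are cleared once the Ready has been handed to the application.

```go
package main

import "fmt"

type message struct {
	To   uint64
	Kind string
}

// stateMachine never does I/O itself: send only queues messages, the way
// raft.send appends to r.msgs (hypothetical type).
type stateMachine struct{ msgs []message }

func (s *stateMachine) send(m message) { s.msgs = append(s.msgs, m) }

// ready hands the queued messages to the caller and clears the buffer,
// roughly what building a Ready and accepting it achieve together.
func (s *stateMachine) ready() []message {
	out := s.msgs
	s.msgs = nil
	return out
}

func main() {
	s := &stateMachine{}
	for _, peer := range []uint64{2, 3} {
		s.send(message{To: peer, Kind: "MsgApp"})
	}
	batch := s.ready()
	// The application would persist entries first, then send the batch.
	fmt.Println(len(batch), len(s.msgs)) // 2 0
}
```

Keeping I/O out of the state machine lets the application persist entries (rc.wal.Save, rc.raftStorage.Append) before calling rc.transport.Send, as the comment on raft.send requires.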
The Messages are then sent out in raftexample's serveChannels:
// contrib/raftexample/raft.go
func (rc *raftNode) serveChannels() {
	// ... (omitted)
	// event loop on raft state machine updates
	for {
		select {
		case <-ticker.C:
			rc.node.Tick()
		// store raft entries to wal, then publish over commit channel
		case rd := <-rc.node.Ready():
			rc.wal.Save(rd.HardState, rd.Entries)
			if !raft.IsEmptySnap(rd.Snapshot) {
				rc.saveSnap(rd.Snapshot)
				rc.raftStorage.ApplySnapshot(rd.Snapshot)
				rc.publishSnapshot(rd.Snapshot)
			}
			rc.raftStorage.Append(rd.Entries)
			rc.transport.Send(rd.Messages) // hand the messages to the transport layer (rafthttp, provided under etcd's etcdserver module)
			applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
			if !ok {
				rc.stop()
				return
			}
			rc.maybeTriggerSnapshot(applyDoneC)
			rc.node.Advance()
		case err := <-rc.transport.ErrorC:
			rc.writeError(err)
			return
		case <-rc.stopc:
			rc.stop()
			return
		}
	}
}
2.2 Receiving messages
In the cluster, Followers receive the Leader's messages. Let's go straight to the becomeFollower function:
// raft/raft.go
func (r *raft) becomeFollower(term uint64, lead uint64) {
	r.step = stepFollower // ref-15: install the Follower's message handler
	r.reset(term)
	r.tick = r.tickElection
	r.lead = lead
	r.state = StateFollower
	r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
Next, the key function stepFollower:
// raft/raft.go
func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
	case pb.MsgProp:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
			return ErrProposalDropped
		} else if r.disableProposalForwarding {
			r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
			return ErrProposalDropped
		}
		m.To = r.lead
		r.send(m)
	case pb.MsgApp: // the Leader sent a pb.MsgApp above, so this branch is taken
		r.electionElapsed = 0
		r.lead = m.From
		r.handleAppendEntries(m) // handle the entries carried by the message
	case pb.MsgHeartbeat:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleHeartbeat(m)
	case pb.MsgSnap:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleSnapshot(m)
	case pb.MsgTransferLeader:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
			return nil
		}
		m.To = r.lead
		r.send(m)
	case pb.MsgTimeoutNow:
		r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
		// Leadership transfers never use pre-vote even if r.preVote is true; we
		// know we are not recovering from a partition so there is no need for the
		// extra round trip.
		r.hup(campaignTransfer)
	case pb.MsgReadIndex:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
			return nil
		}
		m.To = r.lead
		r.send(m)
	case pb.MsgReadIndexResp:
		if len(m.Entries) != 1 {
			r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
			return nil
		}
		r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
	}
	return nil
}
Next, the key function handleAppendEntries:
// raft/raft.go
func (r *raft) handleAppendEntries(m pb.Message) {
	// m.Index is below our commit index (a stale or duplicate message):
	// append nothing, just reply with our commit index.
	if m.Index < r.raftLog.committed {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
		return
	}
	// try to append the entries
	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
	} else {
		// ... (omitted)
	}
}
The key step is now r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...):
// raft/log.go
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
	if l.matchTerm(index, logTerm) {
		lastnewi = index + uint64(len(ents))
		ci := l.findConflict(ents)
		switch {
		case ci == 0:
		case ci <= l.committed:
			l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
		default:
			offset := index + 1
			l.append(ents[ci-offset:]...) // ref-16: append from the first conflicting (or new) entry onward
		}
		l.commitTo(min(committed, lastnewi)) // advance the commit index, never past the last new entry
		return lastnewi, true
	}
	return 0, false
}
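The Follower-side logic is easier to follow on a toy log. The sketch below re-implements the steps of maybeAppend on an in-memory slice. It uses a hypothetical `memLog`, where slice position i holds index i+1. The steps are:
1. Reject unless the entry at `index` has term `logTerm`.
2. Find the first conflicting or new entry.
3. Truncate there and append.
4. Raise the commit index to min(committed, lastnewi).

This illustrates the algorithm and is not etcd's implementation. In particular, the unstable/stable split is ignored.

```go
package main

import "fmt"

type entry struct{ Term, Index uint64 }

// memLog is a toy log in which slice position i holds index i+1.
type memLog struct {
	ents      []entry
	committed uint64
}

func (l *memLog) term(i uint64) (uint64, bool) {
	if i == 0 {
		return 0, true
	}
	if i > uint64(len(l.ents)) {
		return 0, false
	}
	return l.ents[i-1].Term, true
}

func (l *memLog) matchTerm(i, t uint64) bool {
	got, ok := l.term(i)
	return ok && got == t
}

// findConflict returns the index of the first entry whose term differs
// from ours (or that we do not have yet), or 0 if all of them match.
func (l *memLog) findConflict(ents []entry) uint64 {
	for _, e := range ents {
		if !l.matchTerm(e.Index, e.Term) {
			return e.Index
		}
	}
	return 0
}

// maybeAppend follows the same steps as raftLog.maybeAppend.
func (l *memLog) maybeAppend(index, logTerm, committed uint64, ents ...entry) (uint64, bool) {
	if !l.matchTerm(index, logTerm) {
		return 0, false
	}
	lastnewi := index + uint64(len(ents))
	if ci := l.findConflict(ents); ci != 0 {
		if ci <= l.committed {
			panic(fmt.Sprintf("entry %d conflicts with a committed entry", ci))
		}
		// Drop our entries from ci onward, then append the new suffix.
		l.ents = append(l.ents[:ci-1], ents[ci-(index+1):]...)
	}
	c := committed
	if lastnewi < c {
		c = lastnewi
	}
	if c > l.committed { // the commit index never moves backwards
		l.committed = c
	}
	return lastnewi, true
}

func main() {
	// Entry 4 was written by a stale term-2 leader.
	l := &memLog{ents: []entry{{1, 1}, {1, 2}, {1, 3}, {2, 4}}, committed: 2}
	// The term-3 leader sends entries 4 and 5 after (index 3, term 1).
	last, ok := l.maybeAppend(3, 1, 4, entry{3, 4}, entry{3, 5})
	fmt.Println(last, ok, l.ents, l.committed) // 5 true [{1 1} {1 2} {1 3} {3 4} {3 5}] 4
	// A delayed duplicate of an older message: everything matches.
	last, ok = l.maybeAppend(2, 1, 5, entry{1, 3})
	fmt.Println(last, ok, len(l.ents), l.committed) // 3 true 5 4
}
```

The second call shows why findConflict matters. A delayed MsgApp whose entries already match truncates nothing, even though its last index (3) is below the log's last index (5).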
The l.append(...) call marked ref-16 in maybeAppend performs the actual append. Its details:
// raft/log.go
func (l *raftLog) append(ents ...pb.Entry) uint64 {
	if len(ents) == 0 {
		return l.lastIndex()
	}
	if after := ents[0].Index - 1; after < l.committed {
		l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
	}
	l.unstable.truncateAndAppend(ents)
	return l.lastIndex()
}

// raft/log_unstable.go
func (u *unstable) truncateAndAppend(ents []pb.Entry) {
	after := ents[0].Index
	switch {
	case after == u.offset+uint64(len(u.entries)):
		// after is the next index in the u.entries
		// directly append
		u.entries = append(u.entries, ents...)
	case after <= u.offset:
		u.logger.Infof("replace the unstable entries from index %d", after)
		// The log is being truncated to before our current offset
		// portion, so set the offset and replace the entries
		u.offset = after
		u.entries = ents
	default:
		// truncate to after and copy to u.entries
		// then append
		u.logger.Infof("truncate the unstable entries before index %d", after)
		u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...)
		u.entries = append(u.entries, ents...)
	}
}
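The three branches of truncateAndAppend can be checked on a toy copy of the unstable segment. In the sketch, which is a simplification and not etcd's type, `entries[i]` holds index `offset+i`, and the method returns which branch ran.

The copy in the default branch mirrors etcd's `append([]pb.Entry{}, ...)`. Presumably it avoids writing into a backing array that an earlier Ready may still reference.

```go
package main

import "fmt"

type entry struct{ Term, Index uint64 }

// unstable is a toy copy of etcd's unstable segment: entries[i] has index
// offset+i. Only the fields needed here are kept.
type unstable struct {
	offset  uint64
	entries []entry
}

// truncateAndAppend applies the same three cases as etcd's version and
// reports which one ran.
func (u *unstable) truncateAndAppend(ents []entry) string {
	after := ents[0].Index
	switch {
	case after == u.offset+uint64(len(u.entries)):
		// The new entries directly follow ours.
		u.entries = append(u.entries, ents...)
		return "append"
	case after <= u.offset:
		// Everything we hold is replaced.
		u.offset = after
		u.entries = ents
		return "replace"
	default:
		// Keep [offset, after) in a fresh slice, then append.
		kept := append([]entry{}, u.entries[:after-u.offset]...)
		u.entries = append(kept, ents...)
		return "truncate"
	}
}

func main() {
	u := &unstable{offset: 5, entries: []entry{{1, 5}, {1, 6}, {1, 7}}}
	for _, batch := range [][]entry{{{1, 8}}, {{2, 7}}, {{3, 4}}} {
		kind := u.truncateAndAppend(batch)
		fmt.Println(kind, u.offset, len(u.entries))
	}
	// append 5 4
	// truncate 5 3
	// replace 4 1
}
```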
That concludes the analysis of the log replication flow.