国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

使用C++20協(xié)程實(shí)現(xiàn)Raft一致性算法 | Raft算法C++20實(shí)戰(zhàn)

通過充分利用C++20的強(qiáng)大功能,演示了在分布式系統(tǒng)關(guān)鍵組件中構(gòu)建現(xiàn)代且高效的方法。文章深入探討了使用C++20協(xié)程在復(fù)雜編程環(huán)境中構(gòu)建Raft服務(wù)器時(shí)所遇到的挑戰(zhàn)和解決方案。

本文介紹了如何在C++20中實(shí)現(xiàn)Raft服務(wù)器一致性模塊,而無需使用任何額外的庫。文章分為三個(gè)主要部分:

  1. Raft算法的全面概述

  2. Raft服務(wù)器開發(fā)的詳細(xì)說明

  3. 基于協(xié)程的自定義網(wǎng)絡(luò)庫的描述

該實(shí)現(xiàn)利用了C++20的強(qiáng)大功能,特別是協(xié)程,以提供一種有效且現(xiàn)代的方法來構(gòu)建分布式系統(tǒng)的關(guān)鍵組件。這篇文章不僅展示了在復(fù)雜編程環(huán)境中使用C++20協(xié)程的實(shí)際應(yīng)用和好處,還深入探討了從零開始構(gòu)建共識(shí)模塊(如Raft服務(wù)器)時(shí)遇到的挑戰(zhàn)和解決方案。Raft服務(wù)器和網(wǎng)絡(luò)庫的存儲(chǔ)庫(miniraft-cpp和coroio)可供進(jìn)一步探索和實(shí)際應(yīng)用。

引言

在深入研究Raft算法的復(fù)雜性之前,讓我們考慮一個(gè)現(xiàn)實(shí)世界的例子。我們的目標(biāo)是開發(fā)一個(gè)網(wǎng)絡(luò)鍵值存儲(chǔ)(K/V)系統(tǒng)。在C++中,可以通過使用unordered_map<string, string>輕松實(shí)現(xiàn)這一目標(biāo)。然而,在實(shí)際應(yīng)用中,對(duì)容錯(cuò)存儲(chǔ)系統(tǒng)的需求增加了復(fù)雜性。一種看似簡單的方法可能涉及部署三臺(tái)(或更多)機(jī)器,每臺(tái)機(jī)器托管此服務(wù)的副本。期望是用戶管理數(shù)據(jù)復(fù)制和一致性。然而,這種方法可能導(dǎo)致不可預(yù)測的行為。例如,可能使用特定鍵更新數(shù)據(jù),然后稍后檢索較舊版本。

用戶真正想要的是一個(gè)分布式系統(tǒng),可能分布在多臺(tái)機(jī)器上,運(yùn)行得就像單臺(tái)主機(jī)系統(tǒng)一樣流暢。為了滿足這一需求,通常在K/V存儲(chǔ)(或任何類似的服務(wù),以下簡稱為“狀態(tài)機(jī)”)的前面放置一個(gè)共識(shí)模塊。此配置確保所有用戶與狀態(tài)機(jī)的交互都通過共識(shí)模塊進(jìn)行,而不是直接訪問。在這個(gè)背景下,讓我們看看如何使用Raft算法作為示例來實(shí)現(xiàn)這樣一個(gè)共識(shí)模塊。

Raft概述

在Raft算法中,有一個(gè)奇數(shù)個(gè)被稱為節(jié)點(diǎn)的參與者。每個(gè)節(jié)點(diǎn)保留自己的記錄日志。有一個(gè)節(jié)點(diǎn)是領(lǐng)導(dǎo)者,其他的是追隨者。用戶將所有請(qǐng)求(讀取和寫入)直接發(fā)送給領(lǐng)導(dǎo)者。當(dāng)收到更改狀態(tài)機(jī)的寫入請(qǐng)求時(shí),領(lǐng)導(dǎo)者首先記錄它,然后將其轉(zhuǎn)發(fā)給追隨者,追隨者也記錄它。一旦大多數(shù)節(jié)點(diǎn)成功響應(yīng),領(lǐng)導(dǎo)者將認(rèn)為此條目已提交,將其應(yīng)用于狀態(tài)機(jī),并通知用戶成功。

在Raft中,“Term”是一個(gè)關(guān)鍵概念,它只能增長。Term在系統(tǒng)發(fā)生變化時(shí)(例如領(lǐng)導(dǎo)變更)會(huì)發(fā)生變化。Raft的日志具有特定的結(jié)構(gòu),每個(gè)條目包括一個(gè)Term和一個(gè)Payload。Term指的是寫入初始條目的領(lǐng)導(dǎo)者。Payload表示要對(duì)狀態(tài)機(jī)進(jìn)行的更改。Raft確保具有相同索引和Term的兩個(gè)條目是相同的。Raft日志不是僅追加的,可能會(huì)被截?cái)?。例如,在下面的場景中,領(lǐng)導(dǎo)者S1在崩潰之前復(fù)制了兩個(gè)條目。S2接管并開始復(fù)制條目,S1的日志與S2和S3的不同。因此,S1日志的最后一個(gè)條目將被移除并替換為新的條目。

具有相同索引和術(shù)語的兩個(gè)條目是相同的

Raft RPC API

讓我們來研究一下Raft RPC。值得注意的是,Raft API相當(dāng)簡單,只有兩個(gè)調(diào)用。我們將首先查看領(lǐng)導(dǎo)者選舉API。值得注意的是,Raft確保每個(gè)任期只能有一個(gè)領(lǐng)導(dǎo)者。還可能存在沒有領(lǐng)導(dǎo)者的任期,例如選舉失敗的情況。為確保只發(fā)生一次選舉,每個(gè)節(jié)點(diǎn)將其選票保存在一個(gè)稱為VotedFor的持久變量中。選舉RPC稱為RequestVote,具有三個(gè)參數(shù):Term、LastLogIndex和LastLogTerm。響應(yīng)包含Term和VoteGranted。值得注意的是,每個(gè)請(qǐng)求都包含Term,在Raft中,只有當(dāng)節(jié)點(diǎn)的Terms是兼容的時(shí),它們才能有效通信。

當(dāng)節(jié)點(diǎn)發(fā)起選舉時(shí),它向其他節(jié)點(diǎn)發(fā)送RequestVote請(qǐng)求并收集它們的選票。如果大多數(shù)響應(yīng)是積極的,節(jié)點(diǎn)將晉升為領(lǐng)導(dǎo)者角色。

現(xiàn)在讓我們看一下AppendEntries請(qǐng)求。它接受參數(shù),如Term、PrevLogIndex、PrevLogTerm和Entries,響應(yīng)包含Term和Success。如果請(qǐng)求中的Entries字段為空,則它充當(dāng)心跳。

當(dāng)收到AppendEntries請(qǐng)求時(shí),追隨者檢查Term的PrevLogIndex。如果它與PrevLogTerm匹配,追隨者將Entries添加到其日志,從PrevLogIndex + 1開始(如果存在PrevLogIndex之后的條目,則將其刪除):

AppendEntries請(qǐng)求被接收的流程

AppendEntries請(qǐng)求被接收的流程

如果Terms不匹配,追隨者返回Success=false。在這種情況下,領(lǐng)導(dǎo)者會(huì)重試發(fā)送請(qǐng)求,將PrevLogIndex減少一個(gè)。

領(lǐng)導(dǎo)者重試發(fā)送請(qǐng)求,將PrevLogIndex減少一個(gè)

領(lǐng)導(dǎo)者重試發(fā)送請(qǐng)求,將PrevLogIndex減少一個(gè)

當(dāng)節(jié)點(diǎn)收到RequestVote請(qǐng)求時(shí),它將其LastTerm和LastLogIndex對(duì)比到最近的日志條目。如果這對(duì)是小于或等于請(qǐng)求者的值,節(jié)點(diǎn)返回VoteGranted=true。

Raft中的狀態(tài)轉(zhuǎn)換

Raft的狀態(tài)轉(zhuǎn)換如下。每個(gè)節(jié)點(diǎn)開始時(shí)都處于Follower狀態(tài)。如果Follower在設(shè)定的超時(shí)內(nèi)未收到AppendEntries,它會(huì)增加其Term并移動(dòng)到Candidate狀態(tài),觸發(fā)選舉。節(jié)點(diǎn)可以從Candidate狀態(tài)轉(zhuǎn)到Leader狀態(tài),如果它贏得選舉,或者如果它收到AppendEntries請(qǐng)求,則返回到Follower狀態(tài)。候選者還可以在超時(shí)期內(nèi)未轉(zhuǎn)換為Follower或Leader時(shí),重新變?yōu)镃andidate。如果處于任何狀態(tài)的節(jié)點(diǎn)收到具有比當(dāng)前Term大的RPC請(qǐng)求,則它將轉(zhuǎn)到Follower狀態(tài)。

提交

現(xiàn)在讓我們考慮一個(gè)示例,演示了Raft并不像看起來那么簡單。我從Diego Ongaro的博士論文中選取了這個(gè)例子。在第2個(gè)任期中,S1是領(lǐng)導(dǎo)者,在崩潰之前復(fù)制了兩個(gè)條目。隨后,S5在第3個(gè)任期中領(lǐng)導(dǎo),添加了一個(gè)條目,然后崩潰。接下來,S2在第4個(gè)任期中接管領(lǐng)導(dǎo),復(fù)制了來自第2個(gè)任期的條目,為第4個(gè)任期添加了自己的條目,然后崩潰。這導(dǎo)致了兩種可能的結(jié)果:S5重新獲得領(lǐng)導(dǎo)權(quán)并截?cái)嗟?個(gè)任期的條目,或者S1重新獲得領(lǐng)導(dǎo)權(quán)并提交第2個(gè)任期的條目。第2個(gè)任期的條目只有在它們被新領(lǐng)導(dǎo)者的后續(xù)條目覆蓋后才會(huì)被安全提交。

Raft算法在動(dòng)態(tài)且經(jīng)常不可預(yù)測的情況下的操作方式

Raft算法在動(dòng)態(tài)且經(jīng)常不可預(yù)測的情況下的操作方式

這個(gè)例子展示了Raft算法在動(dòng)態(tài)且經(jīng)常不可預(yù)測的情況下的操作方式。事件的順序,包括多個(gè)領(lǐng)導(dǎo)者和崩潰,展示了在分布式系統(tǒng)中維護(hù)一致狀態(tài)的復(fù)雜性。這種復(fù)雜性不會(huì)立即顯現(xiàn)出來,但在涉及領(lǐng)導(dǎo)者更改和系統(tǒng)故障的情況下變得重要。該示例強(qiáng)調(diào)了在處理這種復(fù)雜性時(shí)穩(wěn)健和深思熟慮方法的重要性,而這正是Raft試圖解決的問題。

額外資料

為了進(jìn)一步學(xué)習(xí)和更深入地了解Raft,我推薦以下材料:原始的Raft論文,非常適合實(shí)現(xiàn)。Diego Ongaro的博士論文提供了更深入的見解。Maxim Babenko的講座則更加詳細(xì)深入。

Raft實(shí)現(xiàn)

現(xiàn)在讓我們轉(zhuǎn)向Raft服務(wù)器的實(shí)現(xiàn),我認(rèn)為它在很大程度上受益于C++20協(xié)程。在我的實(shí)現(xiàn)中,持久狀態(tài)存儲(chǔ)在內(nèi)存中。然而,在實(shí)際場景中,它應(yīng)該保存到磁盤上。稍后我會(huì)詳細(xì)介紹MessageHolder。它的功能類似于shared_ptr,但專門設(shè)計(jì)用于處理Raft消息,確保對(duì)這些通信的高效管理和處理。

struct TState {
    uint64_t CurrentTerm = 1;
    uint32_t VotedFor = 0;
    std::vector<TMessageHolder<TLogEntry>> Log;
};

在Volatile狀態(tài)中,我使用了"L"表示"leader"或"F"表示"follower"來標(biāo)記條目,以澄清它們的用途。CommitIndex表示最后一個(gè)被提交的日志條目。相反,LastApplied是應(yīng)用于狀態(tài)機(jī)的最新日志條目,它始終小于或等于CommitIndex。NextIndex很重要,因?yàn)樗鼧?biāo)識(shí)要發(fā)送給同行的下一個(gè)日志條目。類似地,MatchIndex跟蹤最后一個(gè)發(fā)現(xiàn)匹配的日志條目。Votes部分包含投票給我的同行的ID。超時(shí)管理是一個(gè)重要方面:HeartbeatDue和RpcDue管理領(lǐng)導(dǎo)者的超時(shí),而ElectionDue處理追隨者的超時(shí)。

using TTime = std::chrono::time_point<std::chrono::steady_clock>;
struct TVolatileState {
    uint64_t CommitIndex = 0; // L,F
    uint64_t LastApplied = 0; // L,F
    std::unordered_map<uint32_t, uint64_t> NextIndex; // L
    std::unordered_map<uint32_t, uint64_t> MatchIndex; // L
    std::unordered_set<uint32_t> Votes; // C
    std::unordered_map<uint32_t, TTime> HeartbeatDue; // L
    std::unordered_map<uint32_t, TTime> RpcDue; // L
    TTime ElectionDue; // F
};

Raft API

我的Raft算法實(shí)現(xiàn)有兩個(gè)類。第一個(gè)是INode,表示一個(gè)同行節(jié)點(diǎn)。這個(gè)類包括兩個(gè)方法:Send,將出站消息存儲(chǔ)在內(nèi)部緩沖區(qū)中,和Drain,處理實(shí)際的消息分發(fā)。Raft是第二個(gè)類,它管理當(dāng)前節(jié)點(diǎn)的狀態(tài)。它還包括兩個(gè)方法:Process,處理傳入的連接,和ProcessTimeout,必須定期調(diào)用以處理超時(shí),例如領(lǐng)導(dǎo)者選舉超時(shí)。使用這些類的用戶應(yīng)根據(jù)需要使用Process、ProcessTimeout和Drain方法。INode的Send方法在Raft類內(nèi)部被調(diào)用,確保消息處理和狀態(tài)管理在Raft框架中無縫集成。

struct INode {
    virtual ~INode() = default;
    virtual void Send(TMessageHolder<TMessage> message) = 0;
    virtual void Drain() = 0;
};
class TRaft {
public:
    TRaft(uint32_t node,
        const std::unordered_map<uint32_t, std::shared_ptr<INode>>& nodes);
    void Process(TTime now,
        TMessageHolder<TMessage> message,
        const std::shared_ptr<INode>& replyTo = {});
    void ProcessTimeout(TTime now);
};

Raft消息

現(xiàn)在讓我們看看如何發(fā)送和讀取Raft消息。我沒有使用序列化庫,而是以TLV格式讀取和發(fā)送原始結(jié)構(gòu)。這是消息頭的樣子:

struct TMessage {
    uint32_t Type;
    uint32_t Len;
    char Value[0];
};

為了更方便,我引入了第二級(jí)頭:

struct TMessageEx: public TMessage {
    uint32_t Src = 0;
    uint32_t Dst = 0;
    uint64_t Term = 0;
};

這包括每個(gè)消息中發(fā)送者和接收者的ID。除了LogEntry之外,所有消息都繼承自TMessageEx。LogEntry和AppendEntries的實(shí)現(xiàn)如下:

struct TLogEntry: public TMessage {
    static constexpr EMessageType MessageType = EMessageType::LOG_ENTRY;
    uint64_t Term = 1;
    char Data[0];
};
struct TAppendEntriesRequest: public TMessageEx {
    static constexpr EMessageType MessageType
        = EMessageType::APPEND_ENTRIES_REQUEST;
    uint64_t PrevLogIndex = 0;
    uint64_t PrevLogTerm = 0;
    uint32_t Nentries = 0;
};

為了簡化消息處理,我使用了一個(gè)名為MessageHolder的類,類似于shared_ptr:

template<typename T>
requires std::derived_from<T, TMessage>
struct TMessageHolder {
    T* Mes;
    std::shared_ptr<char[]> RawData;
    uint32_t PayloadSize;
    std::shared_ptr<TMessageHolder<TMessage>[]> Payload;
    template<typename U>
    requires std::derived_from<U, T>
    TMessageHolder<U> Cast() {...}
    template<typename U>
    requires std::derived_from<U, T>
    auto Maybe() { ... }
};

這個(gè)類包括一個(gè)包含消息本身的char數(shù)組。它還可能包括一個(gè)Payload(僅用于AppendEntry),以及用于安全地將基本類型消息轉(zhuǎn)換為特定類型消息的方法(Maybe方法)和不安全的轉(zhuǎn)換方法(Cast方法)。這是使用MessageHolder的典型例子:

void SomeFunction(TMessageHolder<TMessage> message) {
    auto maybeAppendEntries = message.Maybe<TAppendEntriesRequest>();
    if (maybeAppendEntries) {
        auto appendEntries = maybeAppendEntries.Cast();
    }
    // 如果我們確定
    auto appendEntries = message.Cast<TAppendEntriesRequest>();
    // 使用重載的operator->
    auto term = appendEntries->Term;
    auto nentries = appendEntries->Nentries;
    // ...
}

以及在Candidate狀態(tài)處理程序中的實(shí)際示例:

void TRaft::Candidate(TTime now, TMessageHolder<TMessage> message) {
    if (auto maybeResponseVote = message.Maybe<TRequestVoteResponse>()) {
        OnRequestVote(std::move(maybeResponseVote.Cast()));
    } else
    if (auto maybeRequestVote = message.Maybe<TRequestVoteRequest>()) {
        OnRequestVote(now, std::move(maybeRequestVote.Cast()));
    } else
    if (auto maybeAppendEntries = message.Maybe<TAppendEntriesRequest>()) {
        OnAppendEntries(now, std::move(maybeAppendEntries.Cast()));
    }
}

這種設(shè)計(jì)方法提高了Raft實(shí)現(xiàn)中消息處理的效率和靈活性。

Raft 服務(wù)器

讓我們討論一下 Raft 服務(wù)器的實(shí)現(xiàn)。Raft 服務(wù)器將為網(wǎng)絡(luò)交互設(shè)置協(xié)程。首先,我們將看一下處理消息讀寫的協(xié)程。這些協(xié)程使用的原語稍后在文章中討論,同時(shí)還會(huì)對(duì)網(wǎng)絡(luò)庫進(jìn)行分析。寫協(xié)程負(fù)責(zé)將消息寫入套接字,而讀協(xié)程稍微復(fù)雜一些。為了讀取,它必須首先檢索 Type 和 Len 變量,然后分配一個(gè)長度為 Len 字節(jié)的數(shù)組,最后讀取剩余的消息。這種結(jié)構(gòu)有助于在 Raft 服務(wù)器內(nèi)部有效而高效地管理網(wǎng)絡(luò)通信。

template<typename TSocket>
TValueTask<void>
TMessageWriter<TSocket>::Write(TMessageHolder<TMessage> message) {
    co_await TByteWriter(Socket).Write(message.Mes, message->Len);
    auto payload = std::move(message.Payload);
    for (uint32_t i = 0; i < message.PayloadSize; ++i) {
        co_await Write(std::move(payload[i]));
    }
    co_return;
}
template<typename TSocket>
TValueTask<TMessageHolder<TMessage>> TMessageReader<TSocket>::Read() {
    decltype(TMessage::Type) type; decltype(TMessage::Len) len;
    auto s = co_await Socket.ReadSome(&type, sizeof(type));
    if (s != sizeof(type)) { /* throw */ }
    s = co_await Socket.ReadSome(&len, sizeof(len));
    if (s != sizeof(len)) { /* throw */}
    auto mes = NewHoldedMessage<TMessage>(type, len);
    co_await TByteReader(Socket).Read(mes->Value, len - sizeof(TMessage));
    auto maybeAppendEntries = mes.Maybe<TAppendEntriesRequest>();
    if (maybeAppendEntries) {
        auto appendEntries = maybeAppendEntries.Cast();
        auto nentries = appendEntries->Nentries; mes.InitPayload(nentries);
        for (uint32_t i = 0; i < nentries; i++) mes.Payload[i] = co_await Read();
    }
    co_return mes;
}

要啟動(dòng) Raft 服務(wù)器,創(chuàng)建 RaftServer 類的實(shí)例并調(diào)用 Serve 方法。Serve 方法啟動(dòng)兩個(gè)協(xié)程。Idle 協(xié)程負(fù)責(zé)定期處理超時(shí),而 InboundServe 管理傳入的連接。

class TRaftServer {
public:
    void Serve() {
        Idle();
        InboundServe();
    }
private:
    TVoidTask InboundServe();
    TVoidTask InboundConnection(TSocket socket);
    TVoidTask Idle();
}

通過 accept 調(diào)用接收傳入的連接。然后啟動(dòng) InboundConnection 協(xié)程,該協(xié)程讀取傳入的消息并將其轉(zhuǎn)發(fā)給 Raft 實(shí)例進(jìn)行處理。這個(gè)配置確保了 Raft 服務(wù)器能夠高效處理內(nèi)部超時(shí)和外部通信。

TVoidTask InboundServe() {
    while (true) {
        auto client = co_await Socket.Accept();
        InboundConnection(std::move(client));
    }
    co_return;
}
TVoidTask InboundConnection(TSocket socket) {
    while (true) {
        auto mes = co_await TMessageReader(client->Sock()).Read();
        Raft->Process(std::chrono::steady_clock::now(), std::move(mes),
            client);
        Raft->ProcessTimeout(std::chrono::steady_clock::now());
        DrainNodes();
    }
    co_return;
}

Idle 協(xié)程的工作方式如下:每隔 sleep 秒調(diào)用一次 ProcessTimeout 方法。值得注意的是,這個(gè)協(xié)程使用了異步的 sleep。這種設(shè)計(jì)使得 Raft 服務(wù)器能夠有效地管理對(duì)時(shí)間敏感的操作,而不阻塞其他進(jìn)程,提高服務(wù)器的整體響應(yīng)性和性能。

while (true) {
    Raft->ProcessTimeout(std::chrono::steady_clock::now());
    DrainNodes();
    auto t1 = std::chrono::steady_clock::now();
    if (t1 > t0 + dt) {
        DebugPrint();
        t0 = t1;
    }
    co_await Poller.Sleep(t1 + sleep);
}

為發(fā)送出站消息創(chuàng)建的協(xié)程設(shè)計(jì)得非常簡單。它在循環(huán)中重復(fù)發(fā)送所有累積的消息到套接字。在發(fā)生錯(cuò)誤時(shí),它啟動(dòng)另一個(gè)協(xié)程,負(fù)責(zé)連接(通過 connect 函數(shù))。這種結(jié)構(gòu)確保了出站消息的順利和高效處理,同時(shí)通過錯(cuò)誤處理和連接管理保持了魯棒性。

try {
    while (!Messages.empty()) {
        auto tosend = std::move(Messages); Messages.clear();
        for (auto&& m : tosend) {
            co_await TMessageWriter(Socket).Write(std::move(m));
        }
    }
} catch (const std::exception& ex) {
    Connect();
}
co_return;

有了 Raft 服務(wù)器的實(shí)現(xiàn),這些例子展示了協(xié)程是如何極大地簡化開發(fā)的。雖然我還沒有深入研究 Raft 的實(shí)現(xiàn)(相信我,它比 Raft 服務(wù)器復(fù)雜得多),但總體算法不僅簡單,而且設(shè)計(jì)緊湊。

接下來,我們將看一些 Raft 服務(wù)器的例子。之后,我將描述我從頭開始為 Raft 服務(wù)器創(chuàng)建的網(wǎng)絡(luò)庫。這個(gè)庫對(duì)于在 Raft 框架內(nèi)實(shí)現(xiàn)高效的網(wǎng)絡(luò)通信至關(guān)重要。

以下是啟動(dòng)三個(gè)節(jié)點(diǎn)的 Raft 集群的示例。每個(gè)實(shí)例都接收自己的 ID 作為參數(shù),以及

其他實(shí)例的地址和 ID。在這種情況下,客戶端只與領(lǐng)導(dǎo)者通信。它發(fā)送隨機(jī)字符串,同時(shí)保持固定數(shù)量的在途消息并等待它們的提交。這個(gè)配置描述了客戶端在多節(jié)點(diǎn) Raft 環(huán)境中與領(lǐng)導(dǎo)者之間的交互,演示了算法處理分布式數(shù)據(jù)和共識(shí)的能力。

$ ./server --id 1 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3
...
Candidate, Term: 2, Index: 0, CommitIndex: 0,
...
Leader, Term: 3, Index: 1080175, CommitIndex: 1080175, Delay: 2:0 3:0
        MatchIndex: 2:1080175 3:1080175 NextIndex: 2:1080176 3:1080176
....
$ ./server --id 2 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3
...
$ ./server --id 3 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3
...
Follower, Term: 3, Index: 1080175, CommitIndex: 1080175,
...
$ dd if=/dev/urandom | base64 | pv -l | ./client --node 127.0.0.1:8001:1 >log1
 198k 0:00:03 [159.2k/s] [        <=>

我測量了 3 節(jié)點(diǎn)和 5 節(jié)點(diǎn)集群配置下的提交延遲。正如預(yù)期的那樣,5 節(jié)點(diǎn)設(shè)置的延遲更高:

  • 3 節(jié)點(diǎn)

    50 百分位數(shù)(中位數(shù)):292,872 ns

    80 百分位數(shù):407,561 ns

    90 百分位數(shù):569,164 ns

    99 百分位數(shù):40,279,001 ns

  • 5 節(jié)點(diǎn)

    50 百分位數(shù)(中位數(shù)):425,194 ns

    80 百分位數(shù):672,541 ns

    90 百分位數(shù):1,027,669 ns

    99 百分位數(shù):38,578,749 ns

I/O庫

現(xiàn)在讓我們來看一下我從頭開始創(chuàng)建并在Raft服務(wù)器實(shí)現(xiàn)中使用的I/O庫。我從cppreference.com的以下示例開始,這是一個(gè)回顯服務(wù)器的實(shí)現(xiàn):

task<> tcp_echo_server() {
    char data[1024];
    while (true) {
        std::size_t n = co_await socket.async_read_some(buffer(data));
        co_await async_write(socket, buffer(data, n));
    }
}

我的庫需要事件循環(huán)、套接字原語以及像`read_some`/`write_some`(在我的庫中稱為`ReadSome`/`WriteSome`)這樣的方法,以及更高級(jí)的包裝器,比如`async_write`/`async_read`(在我的庫中稱為`TByteReader`/`TByteWriter`)。

為了實(shí)現(xiàn)套接字的`ReadSome`方法,我需要?jiǎng)?chuàng)建一個(gè)`Awaitable`,如下所示:

auto ReadSome(char* buf, size_t size) {
    struct TAwaitable {
        bool await_ready() { return false; /*總是掛起*/ }
        void await_suspend(std::coroutine_handle<> h) {
            poller->AddRead(fd, h);
        }
        int await_resume() {
            return read(fd, b, s);
        }
        TSelect* poller; int fd; char* b; size_t s;
    };
    return TAwaitable{Poller_, Fd_, buf, size};
}

當(dāng)調(diào)用`co_await`時(shí),協(xié)程會(huì)暫停,因?yàn)閌await_ready`返回`false`。在`await_suspend`中,我們捕獲協(xié)程句柄,并將其與套接字句柄一起傳遞給輪詢器。當(dāng)套接字準(zhǔn)備好時(shí),輪詢器調(diào)用協(xié)程句柄來重新啟動(dòng)協(xié)程。在恢復(fù)時(shí),調(diào)用`await_resume`,它執(zhí)行讀取并將讀取的字節(jié)數(shù)返回給協(xié)程。`WriteSome`、`Accept`和`Connect`方法的實(shí)現(xiàn)方式相似。

輪詢器的設(shè)置如下:

struct TEvent {
    int Fd; int Type; // READ = 1, WRITE = 2;
    std::coroutine_handle<> Handle;
};
class TSelect {
    void Poll() {
        for (const auto& ch : Events) { /* FD_SET(ReadFds); FD_SET(WriteFds);*/ }
        pselect(Size, ReadFds, WriteFds, nullptr, ts, nullptr);
        for (int k = 0; k < Size; ++k) {
            if (FD_ISSET(k, WriteFds)) {
                Events[k].Handle.resume();
            }
            // ...
        }
    }
    std::vector<TEvent> Events;
    // ...
};

我保持了一組成對(duì)的(套接字描述符,協(xié)程句柄)數(shù)組,用于初始化輪詢器后端的結(jié)構(gòu)(在本例中為select)。當(dāng)與就緒套接字相對(duì)應(yīng)的協(xié)程喚醒時(shí),將調(diào)用`resume`。

在主函數(shù)中應(yīng)用如下:

TSimpleTask task(TSelect& poller) {
    TSocket socket(0, poller);
    char buffer[1024];
    while (true) {
        auto readSize = co_await socket.ReadSome(buffer, sizeof(buffer));
    }
}
int main() {
    TSelect poller;
    task(poller);
    while (true) { poller.Poll(); }
}

我們啟動(dòng)了一個(gè)協(xié)程(或多個(gè)協(xié)程),該協(xié)程通過`co_await`進(jìn)入睡眠模式,然后控制被傳遞到一個(gè)無限循環(huán)中,該循環(huán)調(diào)用輪詢器機(jī)制。如果在輪詢器中套接字變?yōu)榫途w,那么相應(yīng)的協(xié)程將被觸發(fā)并執(zhí)行,直到下一個(gè)`co_await`。

為了讀寫Raft消息,我需要?jiǎng)?chuàng)建對(duì)`ReadSome`/`WriteSome`進(jìn)行高級(jí)封裝的函數(shù),類似于:

TValueTask<T> Read() {
    T res; size_t size = sizeof(T);
    char* p = reinterpret_cast<char*>(&res);
    while (size != 0) {
        auto readSize = co_await Socket.ReadSome(p, size);
        p += readSize;
        size -= readSize;
    }
    co_return res;
}
// 用法
T t = co_await Read<T>();

為了實(shí)現(xiàn)這些,我需要?jiǎng)?chuàng)建一個(gè)同時(shí)充當(dāng)`Awaitable`的協(xié)程。協(xié)程由一對(duì)協(xié)程句柄和promise組成。協(xié)程句柄用于從外部管理協(xié)程,而promise用于內(nèi)部管理。協(xié)程句柄可以包含`Awaitable`方法,允許使用`co_await`等待協(xié)程的結(jié)果。promise可以

用于存儲(chǔ)`co_return`返回的結(jié)果,并喚醒調(diào)用協(xié)程。

在`coroutine_handle`中,在`await_suspend`方法中,我們存儲(chǔ)了調(diào)用協(xié)程的協(xié)程句柄。其值將保存在promise中:

template<typename T>
struct TValueTask : std::coroutine_handle<> {
    bool await_ready() { return !!this->promise().Value; }
    void await_suspend(std::coroutine_handle<> caller) {
        this->promise().Caller = caller;
    }
    T await_resume() { return *this->promise().Value; }
    using promise_type = TValuePromise<T>;
};

在promise本身中,`return_value`方法將存儲(chǔ)返回的值。使用可等待對(duì)象喚醒調(diào)用協(xié)程,在`final_suspend`中返回該對(duì)象。這是因?yàn)榫幾g器在`co_return`后會(huì)調(diào)用`final_suspend`上的`co_await`。

template<typename T>
struct TValuePromise {
    void return_value(const T& t) { Value = t; }
    std::suspend_never initial_suspend() { return {}; }
    // 在這里恢復(fù)調(diào)用者
    TFinalSuspendContinuation<T> final_suspend() noexcept;
    std::optional<T> Value;
    std::coroutine_handle<> Caller = std::noop_coroutine();
};

在`await_suspend`中,調(diào)用協(xié)程可以被返回,并將自動(dòng)喚醒。需要注意的是,被調(diào)用的協(xié)程現(xiàn)在將處于睡眠狀態(tài),它的協(xié)程句柄必須在銷毀時(shí)通過`destroy`進(jìn)行銷毀,以避免內(nèi)存泄漏。例如,可以在`TValueTask`的析構(gòu)函數(shù)中完成此操作。

template<typename T>
struct TFinalSuspendContinuation {
    bool await_ready() noexcept { return false; }
    std::coroutine_handle<> await_suspend(
        std::coroutine_handle<TValuePromise<T>> h) noexcept
    {
        return h.promise().Caller;
    }
    void await_resume() noexcept { }
};

隨著庫描述的完成,我將其移植到libevent基準(zhǔn)測試中,以確保其性能。該基準(zhǔn)測試生成一個(gè)包含N個(gè)Unix管道的鏈,每個(gè)管道都連接到下一個(gè)。然后,它初始化對(duì)鏈的100次寫入操作,這將持續(xù)到總共進(jìn)行1000次寫入調(diào)用。下面的圖像展示了該基準(zhǔn)測試的運(yùn)行時(shí),作為我的庫(coroio)與libevent的不同后端的函數(shù)。

我的庫 (coroio) 與 libevent 的各種后端的 Benchmark 運(yùn)行時(shí)作為 N 的函數(shù)

我的庫 (coroio) 與 libevent 的各種后端的 Benchmark 運(yùn)行時(shí)作為 N 的函數(shù)

結(jié)論

總的來說,本文描述了使用C++20協(xié)程實(shí)現(xiàn)Raft服務(wù)器的過程,強(qiáng)調(diào)了這一現(xiàn)代C++特性提供的便利和效率。自定義的I/O庫是此實(shí)現(xiàn)的關(guān)鍵,因?yàn)樗行У靥幚砹水惒絀/O操作。該庫的性能通過與libevent基準(zhǔn)測試的驗(yàn)證得到了證實(shí),展示了其競爭力。

對(duì)于那些對(duì)學(xué)習(xí)或使用這些工具感興趣的人,I/O庫可在coroio處找到,Raft庫可在miniraft-cpp處找到(在文章開頭提供鏈接)。這兩個(gè)存儲(chǔ)庫提供了使用C++20協(xié)程構(gòu)建強(qiáng)大、高性能分布式系統(tǒng)的詳細(xì)信息。文章來源地址http://www.zghlxwxcb.cn/article/705.html

到此這篇關(guān)于使用C++20協(xié)程實(shí)現(xiàn)Raft一致性算法 | Raft算法C++20實(shí)戰(zhàn)的文章就介紹到這了,更多相關(guān)內(nèi)容可以在右上角搜索或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

原文地址:http://www.zghlxwxcb.cn/article/705.html

如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)聯(lián)系站長進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 分布式一致性算法——Paxos 和 Raft 算法

    分布式一致性算法——Paxos 和 Raft 算法

    本文隸屬于專欄《100個(gè)問題搞定大數(shù)據(jù)理論體系》,該專欄為筆者原創(chuàng),引用請(qǐng)注明來源,不足和錯(cuò)誤之處請(qǐng)?jiān)谠u(píng)論區(qū)幫忙指出,謝謝! 本專欄目錄結(jié)構(gòu)和參考文獻(xiàn)請(qǐng)見100個(gè)問題搞定大數(shù)據(jù)理論體系 Paxos和Raft算法都是 分布式一致性算法 ,它們的目的都是 在一個(gè)分布式系統(tǒng)

    2024年01月20日
    瀏覽(29)
  • 分布式一致性算法Paxos、Raft 及 Zookeeper ZAB

    分布式一致性算法Paxos、Raft 及 Zookeeper ZAB

    國科大學(xué)習(xí)生活(期末復(fù)習(xí)資料、課程大作業(yè)解析、學(xué)習(xí)文檔等): 文章專欄(點(diǎn)擊跳轉(zhuǎn)) 大數(shù)據(jù)開發(fā)學(xué)習(xí)文檔(分布式文件系統(tǒng)的實(shí)現(xiàn),大數(shù)據(jù)生態(tài)圈學(xué)習(xí)文檔等): 文章專欄(點(diǎn)擊跳轉(zhuǎn)) 分布式一致性算法是用于在分布式系統(tǒng)中 確保數(shù)據(jù)一致性 的一類算法。在分布式計(jì)

    2024年02月04日
    瀏覽(23)
  • 分布式系統(tǒng)中的那些一致性(CAP、BASE、2PC、3PC、Paxos、ZAB、Raft)

    分布式系統(tǒng)中的那些一致性(CAP、BASE、2PC、3PC、Paxos、ZAB、Raft)

    本文介紹 CAP、BASE理論的正確理解、Paxos 算法如何保證一致性及死循環(huán)問題、ZAB 協(xié)議中原子廣播及崩潰恢復(fù)以及 Raft 算法的動(dòng)態(tài)演示。 下面還有投票,一起參與進(jìn)來吧?? 工作過幾年的同學(xué),尤其是這幾年,大家或多或少都參與過分布式系統(tǒng)的開發(fā),遇到過各式各樣“分布式

    2024年02月05日
    瀏覽(27)
  • 一致性哈希算法優(yōu)勢在哪?如何實(shí)現(xiàn)?

    1.1 簡介Hash 哈希算法即散列算法,是一種從任意文件中創(chuàng)造小的數(shù)字「指紋」的方法。與指紋一樣,散列算法就是一種以較短的信息來保證文件唯一性的標(biāo)志,這種標(biāo)志與文件的每一個(gè)字節(jié)都相關(guān),而且難以找到逆向規(guī)律。因此,當(dāng)原有文件發(fā)生改變時(shí),其標(biāo)志值也會(huì)發(fā)生改

    2024年02月03日
    瀏覽(34)
  • mysql使用redis+canal實(shí)現(xiàn)緩存一致性

    mysql使用redis+canal實(shí)現(xiàn)緩存一致性

    目錄 一、開啟binlog日志 1.首先查看是否開啟了binlog 2、開啟binlog日志,并重啟mysql服務(wù) 二、授權(quán) canal 鏈接 MySQL 賬號(hào)具有作為 MySQL slave 的權(quán)限 三、下載配置canal 1、下載 canal, 訪問?release?頁面 , 選擇需要的包下載, 如以 1.0.17 版本為例 2、?修改confexample文件夾下instance.propert

    2024年02月13日
    瀏覽(32)
  • Sharding-JDBC 自定義一致性哈希算法 + 虛擬節(jié)點(diǎn) 實(shí)現(xiàn)數(shù)據(jù)庫分片策略

    分片操作是分片鍵 + 分片算法,也就是分片策略。目前Sharding-JDBC 支持多種分片策略: 標(biāo)準(zhǔn)分片策略 對(duì)應(yīng)StandardShardingStrategy。提供對(duì)SQL語句中的=, IN和BETWEEN AND的分片操作支持。 復(fù)合分片策略 對(duì)應(yīng)ComplexShardingStrategy。復(fù)合分片策略。提供對(duì)SQL語句中的=, IN和BETWEEN AND的分片操作

    2024年02月02日
    瀏覽(93)
  • 談?wù)勔恢滦怨K惴? decoding=
  • 07. 算法之一致性哈希算法介紹

    07. 算法之一致性哈希算法介紹

    哈希算法在程序開發(fā)中的很多地方都能看到他的身影,但是哈希有他的局限性,比如如果兩個(gè)key哈希到同一個(gè)位置的時(shí)候,此時(shí)就不好處理。本節(jié)我們介紹一下常規(guī)處理方式。 哈希算法將任意長度的二進(jìn)制值映射為較短的固定長度的二進(jìn)制值,這個(gè)小的二進(jìn)制值稱為哈希值。

    2024年02月06日
    瀏覽(24)
  • 【負(fù)載均衡——一致性哈希算法】

    【負(fù)載均衡——一致性哈希算法】

    一致性哈希算法就很好地解決了分布式系統(tǒng)在擴(kuò)容或者縮容時(shí),發(fā)生過多的數(shù)據(jù)遷移的問題。 一致哈希算法也用了取模運(yùn)算,但與哈希算法不同的是,哈希算法是對(duì)節(jié)點(diǎn)的數(shù)量進(jìn)行取模運(yùn)算,而一致 哈希算法 是對(duì) 2^32 進(jìn)行取模運(yùn)算,是一個(gè)固定的值。 一致性哈希要進(jìn)行兩步

    2024年04月10日
    瀏覽(25)
  • 區(qū)塊鏈:哈希算法與一致性哈希算法

    本篇主要介紹區(qū)塊鏈中常用到的哈希算法。 1 哈希算法 1.1 定義及特性 ??哈希算法是指通過哈希函數(shù)(Hash Function)對(duì)任意長度的輸入數(shù)據(jù)(比如文件、消息、數(shù)字等)進(jìn)行轉(zhuǎn)換,生成一個(gè)固定長度的哈希值(Hash Value)的過程。 ??在區(qū)塊鏈中,哈希算法常用于區(qū)塊驗(yàn)證及安全性保

    2024年02月17日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包