国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Elasticsearch分布式一致性原理剖析(三)-Data篇

這篇具有很好參考價值的文章主要介紹了Elasticsearch分布式一致性原理剖析(三)-Data篇。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

本文首發(fā)于云棲社區(qū)(Elasticsearch分布式一致性原理剖析(三)-Data篇-博客-云棲社區(qū)-阿里云),由原作者轉(zhuǎn)載。

前言

“Elasticsearch分布式一致性原理剖析”系列將會對Elasticsearch的分布式一致性原理進行詳細的剖析,介紹其實現(xiàn)方式、原理以及其存在的問題等(基于6.2版本)。前兩篇文章介紹了ES中集群如何組成,master選舉算法,master更新meta的流程等,并分析了選舉、Meta更新中的一致性問題。本文會分析ES中的數(shù)據(jù)流,包括其寫入流程、算法模型PacificA、SequenceNumber與Checkpoint等,并比較ES的實現(xiàn)與標(biāo)準(zhǔn)PacificA算法的異同。目錄如下:

  1. 問題背景
  2. 數(shù)據(jù)寫入流程
  3. PacificA算法
  4. SequenceNumber、Checkpoint與故障恢復(fù)
  5. ES與PacificA的比較
  6. 小結(jié)

問題背景

用過ES的同學(xué)都知道,ES中每個Index會劃分為多個Shard,Shard分布在不同的Node上,以此來實現(xiàn)分布式的存儲和查詢,支撐大規(guī)模的數(shù)據(jù)集。對于每個Shard,又會有多個Shard的副本,其中一個為Primary,其余的一個或多個為Replica。數(shù)據(jù)在寫入時,會先寫入Primary,由Primary將數(shù)據(jù)再同步給Replica。在讀取時,為了提高讀取能力,Primary和Replica都會接受讀請求。

Elasticsearch分布式一致性原理剖析(三)-Data篇,elasticsearch,分布式,大數(shù)據(jù)

在這種模型下,我們能夠感受到ES具有這樣的一些特性,比如:

  1. 數(shù)據(jù)高可靠:數(shù)據(jù)具有多個副本。
  2. 服務(wù)高可用:Primary掛掉之后,可以從Replica中選出新的Primary提供服務(wù)。
  3. 讀能力擴展:Primary和Replica都可以承擔(dān)讀請求。
  4. 故障恢復(fù)能力:Primary或Replica掛掉都會導(dǎo)致副本數(shù)不足,此時可以由新的Primary通過復(fù)制數(shù)據(jù)產(chǎn)生新的副本。

另外,我們也可以想到一些問題,比如:

  1. 數(shù)據(jù)怎么從Primary復(fù)制到Replica?
  2. 一次寫入要求所有副本都成功嗎?
  3. Primary掛掉會丟數(shù)據(jù)嗎?
  4. 數(shù)據(jù)從Replica讀,總是能讀到最新數(shù)據(jù)嗎?
  5. 故障恢復(fù)時,需要拷貝Shard下的全部數(shù)據(jù)嗎?

可以看到,對于ES中的數(shù)據(jù)一致性,雖然我們可以很容易的了解到其大概原理,但是對其細節(jié)我們還有很多的困惑。那么本文就從ES的寫入流程,采用的一致性算法,SequenceId和Checkpoint的設(shè)計等方面來介紹ES如何工作,進而回答上述這些問題。需要注意的是,本文基于ES6.2版本進行分析,可能很多內(nèi)容并不適用于ES之前的版本,比如2.X的版本等。

數(shù)據(jù)寫入流程

首先我們來看一下數(shù)據(jù)的寫入流程,讀者也可以閱讀這篇文章來詳細了解:Elasticsearch內(nèi)核解析 - 寫入篇 - 知乎。

Replication角度: Primary -> Replica

我們從大的角度來看,ES寫入流程為先寫入Primary,再并發(fā)寫入Replica,最后應(yīng)答客戶端,流程如下:

  • 檢查Active的Shard數(shù)。
final String activeShardCountFailure = checkActiveShardCount();
  • 寫入Primary。
primaryResult = primary.perform(request);
  • 并發(fā)的向所有Replicate發(fā)起寫入請求
performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
  • 等所有Replicate返回或者失敗后,返回給Client。
private void decPendingAndFinishIfNeeded() {
     assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]";
     if (pendingActions.decrementAndGet() == 0) {
         finish();
     }
 }

上述過程在ReplicationOperation類的execute函數(shù)中,完整代碼如下:

public void execute() throws Exception {
        final String activeShardCountFailure = checkActiveShardCount();
        final ShardRouting primaryRouting = primary.routingEntry();
        final ShardId primaryId = primaryRouting.shardId();
        if (activeShardCountFailure != null) {
            finishAsFailed(new UnavailableShardsException(primaryId,
                "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
            return;
        }

        totalShards.incrementAndGet();
        pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
        primaryResult = primary.perform(request);
        primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
        final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
        if (replicaRequest != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
            }

            // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
            // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
            // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
            // we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
            // is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
            // of the sampled replication group, and advanced further than what the given replication group would allow it to.
            // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
            final long globalCheckpoint = primary.globalCheckpoint();
            final ReplicationGroup replicationGroup = primary.getReplicationGroup();
            markUnavailableShardsAsStale(replicaRequest, replicationGroup.getInSyncAllocationIds(), replicationGroup.getRoutingTable());
            performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
        }

        successfulShards.incrementAndGet();  // mark primary as successful
        decPendingAndFinishIfNeeded();
    }

下面我們針對這個流程,來分析幾個問題:

1. 為什么第一步要檢查Active的Shard數(shù)?

ES中有一個參數(shù),叫做waitforactiveshards,這個參數(shù)是Index的一個setting,也可以在請求中帶上這個參數(shù)。這個參數(shù)的含義是,在每次寫入前,該shard至少具有的active副本數(shù)。假設(shè)我們有一個Index,其每個Shard有3個Replica,加上Primary則總共有4個副本。如果配置waitforactiveshards為3,那么允許最多有一個Replica掛掉,如果有兩個Replica掛掉,則Active的副本數(shù)不足3,此時不允許寫入。

這個參數(shù)默認是1,即只要Primary在就可以寫入,起不到什么作用。如果配置大于1,可以起到一種保護的作用,保證寫入的數(shù)據(jù)具有更高的可靠性。但是這個參數(shù)只在寫入前檢查,并不保證數(shù)據(jù)一定在至少這些個副本上寫入成功,所以并不是嚴格保證了最少寫入了多少個副本。關(guān)于這一點,可參考以下官方文檔:

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html
...It is important to note that this setting greatly reduces the chances of the write operation not writing to the requisite number of shard copies, but it does not completely eliminate the possibility, because this check occurs before the write operation commences. Once the write operation is underway, it is still possible for replication to fail on any number of shard copies but still succeed on the primary. The _shards section of the write operation’s response reveals the number of shard copies on which replication succeeded/failed.

2. 寫入Primary完成后,為何要等待所有Replica響應(yīng)(或連接失敗)后返回

在更早的ES版本,Primary和Replica之間是允許異步復(fù)制的,即寫入Primary成功即可返回。但是這種模式下,如果Primary掛掉,就有丟數(shù)據(jù)的風(fēng)險,而且從Replica讀數(shù)據(jù)也很難保證能讀到最新的數(shù)據(jù)。所以后來ES就取消異步模式了,改成Primary等Replica返回后再返回給客戶端。

因為Primary要等所有Replica返回才能返回給客戶端,那么延遲就會受到最慢的Replica的影響,這確實是目前ES架構(gòu)的一個弊端。之前曾誤認為這里是等waitforactive_shards個副本寫入成功即可返回,但是后來讀源碼發(fā)現(xiàn)是等所有Replica返回的。

https://github.com/elastic/elasticsearch/blob/master/docs/reference/docs/data-replication.asciidoc
... Once all replicas have successfully performed the operation and responded to the primary, the primary acknowledges the successful completion of the request to the client.

如果Replica寫入失敗,ES會執(zhí)行一些重試邏輯等,但最終并不強求一定要在多少個節(jié)點寫入成功。在返回的結(jié)果中,會包含數(shù)據(jù)在多少個shard中寫入成功了,多少個失敗了:

{
    "_shards" : {
        "total" : 2,
        "failed" : 0,
        "successful" : 2
    }
}

3. 如果某個Replica持續(xù)寫失敗,用戶是否會經(jīng)常查到舊數(shù)據(jù)?

這個問題是說,假如一個Replica持續(xù)寫入失敗,那么這個Replica上的數(shù)據(jù)可能落后Primary很多。我們知道ES中Replica也是可以承擔(dān)讀請求的,那么用戶是否會讀到這個Replica上的舊數(shù)據(jù)呢?

答案是如果一個Replica寫失敗了,Primary會將這個信息報告給Master,然后Master會在Meta中更新這個Index的InSyncAllocations配置,將這個Replica從中移除,移除后它就不再承擔(dān)讀請求。在Meta更新到各個Node之前,用戶可能還會讀到這個Replica的數(shù)據(jù),但是更新了Meta之后就不會了。所以這個方案并不是非常的嚴格,考慮到ES本身就是一個近實時系統(tǒng),數(shù)據(jù)寫入后需要refresh才可見,所以一般情況下,在短期內(nèi)讀到舊數(shù)據(jù)應(yīng)該也是可接受的。

ReplicationOperation.java,寫入Replica失敗的OnFailure函數(shù):

            public void onFailure(Exception replicaException) {
                logger.trace(
                    (org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
                        "[{}] failure while performing [{}] on replica {}, request [{}]",
                        shard.shardId(),
                        opType,
                        shard,
                        replicaRequest),
                    replicaException);
                if (TransportActions.isShardNotAvailableException(replicaException)) {
                    decPendingAndFinishIfNeeded();
                } else {
                    RestStatus restStatus = ExceptionsHelper.status(replicaException);
                    shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
                        shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
                    String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
                    replicasProxy.failShardIfNeeded(shard, message,
                            replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
                            ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
                }
            }

調(diào)用failShardIfNeeded:

        public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
                                      Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {

            logger.warn((org.apache.logging.log4j.util.Supplier<?>)
                    () -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
            shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception,
                    createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
        }

shardStateAction.remoteShardFailed向Master發(fā)送請求,執(zhí)行該Replica的ShardFailed邏輯,將Shard從InSyncAllocation中移除。

    public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) {
        if (failedShard.active() && unassignedInfo.getReason() != UnassignedInfo.Reason.NODE_LEFT) {
            removeAllocationId(failedShard);

            if (failedShard.primary()) {
                Updates updates = changes(failedShard.shardId());
                if (updates.firstFailedPrimary == null) {
                    // more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...)
                    updates.firstFailedPrimary = failedShard;
                }
            }
        }

        if (failedShard.active() && failedShard.primary()) {
            increasePrimaryTerm(failedShard.shardId());
        }
    }

ES中維護InSyncAllocation的做法,是遵循的PacificA算法,下一節(jié)會詳述。

Primary自身角度

從Primary自身的角度,一次寫入請求會先寫入Lucene,然后寫入translog。具體流程可以看這篇文章:Elasticsearch內(nèi)核解析 - 寫入篇 - 知乎?。

1. 為什么要寫translog?

translog類似于數(shù)據(jù)庫中的commitlog,或者binlog。只要translog寫入成功并flush,那么這筆數(shù)據(jù)就落盤了,數(shù)據(jù)安全性有了保證,Segment就可以晚一點落盤。因為translog是append方式寫入,寫入性能也會比隨機寫更高。

另一方面是,translog記錄了每一筆數(shù)據(jù)更改,以及數(shù)據(jù)更改的順序,所以translog也可以用于數(shù)據(jù)恢復(fù)。數(shù)據(jù)恢復(fù)包含兩方面,一方面是節(jié)點重啟后,從translog中恢復(fù)重啟前還未落盤的Segment數(shù)據(jù),另一方面是用于Primary和新的Replica之間的數(shù)據(jù)同步,即Replica逐步追上Primary數(shù)據(jù)的過程。

2. 為什么先寫Lucene,再寫translog?

寫Lucene是寫入內(nèi)存,寫入后在內(nèi)存中refresh即可讀到,寫translog是落盤,為了數(shù)據(jù)持久化以及恢復(fù)。正常來講,分布式系統(tǒng)中是先寫commitLog進行數(shù)據(jù)持久化,再在內(nèi)存中apply這次更改,那么ES為什么要反其道而行之呢?主要原因大概是寫入Lucene時,Lucene會再對數(shù)據(jù)進行一些檢查,有可能出現(xiàn)寫入Lucene失敗的情況。如果先寫translog,那么就要處理寫入translog成功但是寫入Lucene一直失敗的問題,所以ES采用了先寫Lucene的方式。

PacificA算法

PacificA是微軟亞洲研究院提出的一種用于日志復(fù)制系統(tǒng)的分布式一致性算法,論文發(fā)表于2008年(PacificA paper)。ES官方明確提出了其Replication模型基于該算法:

https://github.com/elastic/elasticsearch/blob/master/docs/reference/docs/data-replication.asciidoc
Elasticsearch’s data replication model is based on the primary-backup model and is described very well in the PacificA paper of Microsoft Research. That model is based on having a single copy from the replication group that acts as the primary shard. The other copies are called replica shards. The primary serves as the main entry point for all indexing operations. It is in charge of validating them and making sure they are correct. Once an index operation has been accepted by the primary, the primary is also responsible for replicating the operation to the other copies.

網(wǎng)上講解這個算法的文章較少,因此本文根據(jù)PacificA的論文,簡單介紹一下這個算法。該算法具有以下幾個特點:

  1. 強一致性。
  2. 單Primary向多Secondary的數(shù)據(jù)同步模式。
  3. 使用額外的一致性組件維護Configuration。
  4. 少數(shù)派Replica可用時仍可寫入。

一些名詞

首先我們介紹一下算法中的一些名詞:

  1. Replica Group:一個互為副本的數(shù)據(jù)集合叫做Replica Group,每個副本是一個Replica。一個Replica Group中只有一個副本是Primary,其余為Secondary。
  2. Configuration:一個Replica Group的Configuration描述了這個Replica Group包含哪些副本,其中Primary是誰等。
  3. Configuration Version:Configuration的版本號,每次Configuration發(fā)生變更時加1。
  4. Configuration Manager: 管理Configuration的全局組件,其保證Configuration數(shù)據(jù)的一致性。Configuration變更會由某個Replica發(fā)起,帶著Version發(fā)送給Configuration Manager,Configuration Manager會檢查Version是否正確,如果不正確則拒絕更改。
  5. Query & Update:對一個Replica Group的操作分為兩種,Query和Update,Query不會改變數(shù)據(jù),Update會更改數(shù)據(jù)。
  6. Serial Number(sn):代表每個Update操作執(zhí)行的順序,每次Update操作加1,為連續(xù)的數(shù)字。
  7. Prepared List:Update操作的準(zhǔn)備序列。
  8. Committed List:Update操作的提交序列,提交序列中的操作一定不會丟失(除非全部副本掛掉)。在同一個Replica上,Committed List一定是Prepared List的前綴。

Primary Invariant

在PacificA算法中,要求采用某種錯誤檢測機制來滿足以下不變式:

Primary Invariant:?任何時候,當(dāng)一個Replica認為自己是Primary時,Configuration Manager中維護的Configuration也認為其是當(dāng)前的Primary。任何時候,最多只有一個Replica認為自己是這個Replica Group的Primary。

Primary Invariant保證了當(dāng)一個節(jié)點認為自己是Primary時,其肯定是當(dāng)前的Primary。如果不能滿足Primary Invariant,那么Query請求就可能發(fā)送給Old Primary,讀到舊的數(shù)據(jù)。

怎么保證滿足Primary Invariant呢?論文給出的一種方法是通過Lease機制,這也是分布式系統(tǒng)中常用的一種方式。具體來說,Primary會定期獲取一個Lease,獲取之后認為某段時間內(nèi)自己肯定是Primary,一旦超過這個時間還未獲取到新的Lease就退出Primary狀態(tài)。只要各個機器的CPU不出現(xiàn)較大的時鐘漂移,那么就能夠保證Lease機制的有效性。

論文中實現(xiàn)Lease機制的方式是,Primary定期向所有Secondary發(fā)送心跳來獲取Lease,而不是所有節(jié)點都向某個中心化組件獲取Lease。這樣的好處是分散了壓力,不會出現(xiàn)中心化組件故障而導(dǎo)致所有節(jié)點失去Lease的情況。

Query

Query流程比較簡單,Query只能發(fā)送給Primary,Primary根據(jù)最新commit的數(shù)據(jù),返回對應(yīng)的值。由于算法要求滿足Primary Invariant,所以Query總是能讀到最新commit的數(shù)據(jù)。

Update

Update流程如下:

  1. Primary分配一個Serial Number(簡稱sn)給一個UpdateRequest。
  2. Primary將這個UpdateRequest加入自己的Prepared List,同時向所有Secondary發(fā)送Prepare請求,要求將這個UpdateRequest加入Prepared List。
  3. 當(dāng)所有Replica都完成了Prepare,即所有Replica的Prepared List中都包含了該Update請求時,Primary開始Commit這個請求,即將這個UpdateRequest放入Committed List中,同時Apply這個Update。需要注意的是,同一個Replica上,Committed List永遠是Prepared List的前綴,所以Primary實際上是提高Committed Point,把這個Update Request包含進來。
  4. 返回客戶端,Update操作成功。

當(dāng)下一次Primary向Secondary發(fā)送請求時,會帶上Primary當(dāng)前的Committed Point,此時Secondary才會提高自己的Committed Point。

從Update流程我們可以得出以下不變式:

Commited Invariant

我們把某一個Secondary的Committed List記為SecondaryCommittedList,其Prepared List記為SecondaryPreparedList,把Primary的Committed List記為PrimaryCommittedList。

Commited Invariant:SecondaryCommittedList一定是PrimaryCommittedList的前綴,PrimaryCommittedList一定是SecondaryPreparedList的前綴。

Reconfiguration:Secondary故障,Primary故障,新加節(jié)點

1. Secondary故障

當(dāng)一個Secondary故障時,Primary向Configuration Manager發(fā)起Reconfiguration,將故障節(jié)點從Replica Group中刪除。一旦移除這個Replica,它就不屬于這個Replica Group了,所有請求都不會再發(fā)給它。

假設(shè)某個Primary和Secondary發(fā)生了網(wǎng)絡(luò)分區(qū),但是都可以連接Configuration Manager。這時候Primary會檢測到Secondary沒有響應(yīng)了,Secondary也會檢測到Primary沒有響應(yīng)。此時兩者都會試圖發(fā)起Reconfiguration,將對方從Replica Group中移除,這里的策略是First Win的原則,誰先到Configuration Manager中更改成功,誰就留在Replica Group里,而另外一個已經(jīng)不屬于Replica Group了,也就無法再更新Configuration了。由于Primary會向Secondary請求一個Lease,在Lease有效期內(nèi)Secondary不會執(zhí)行Reconfiguration,而Primary的探測間隔必然是小于Lease時間的,所以我認為這種情況下總是傾向于Primary先進行Reconfiguration,將Secondary剔除。

2. Primary故障

當(dāng)一個Primary故障時,Secondary會收不到Primary的心跳,如果超過Lease的時間,那么Secondary就會發(fā)起Reconfiguration,將Primary剔除,這里也是First Win的原則,哪個Secondary先成功,就會變成Primary。

當(dāng)一個Secondary變成Primary后,需要先經(jīng)過一個叫做Reconciliation的階段才能提供服務(wù)。由于上述的Commited Invariant,所以原先的Primary的Committed List一定是新的Primary的Prepared List的前綴,那么我們將新的Primary的Prepared List中的內(nèi)容與當(dāng)前Replica Group中的其他節(jié)點對齊,相當(dāng)于把該節(jié)點上未Commit的記錄在所有節(jié)點上再Commit一次,那么就一定包含之前所有的Commit記錄。即以下不變式:

Reconfiguration Invariant:當(dāng)一個新的Primary在T時刻完成Reconciliation時,那么T時刻之前任何節(jié)點(包括原Primary)的Commited List都是新Primary當(dāng)前Commited List的前綴。

Reconfiguration Invariant表明了已經(jīng)Commit的數(shù)據(jù)在Reconfiguration過程中不會丟。

3. 新加節(jié)點

新加的節(jié)點需要先成為Secondary Candidate,這時候Primary就開始向其發(fā)送Prepare請求,此時這個節(jié)點還會追之前未同步過來的記錄,一旦追平,就申請成為一個Secondary,然后Primary向Configuration Manager發(fā)起配置變更,將這個節(jié)點加入Replica Group。

還有一種情況時,如果一個節(jié)點曾經(jīng)在Replica Group中,由于臨時發(fā)生故障被移除,現(xiàn)在需要重新加回來。此時這個節(jié)點上的Commited List中的數(shù)據(jù)肯定是已經(jīng)被Commit的了,但是Prepared List中的數(shù)據(jù)未必被Commit,所以應(yīng)該將未Commit的數(shù)據(jù)移除,從Committed Point開始向Primary請求數(shù)據(jù)。

算法總結(jié)

PacificA是一個讀寫都滿足強一致性的算法,它把數(shù)據(jù)的一致性與配置(Configuration)的一致性分開,使用額外的一致性組件(Configuration Manager)維護配置的一致性,在數(shù)據(jù)的可用副本數(shù)少于半數(shù)時,仍可以寫入新數(shù)據(jù)并保證強一致性。

ES在設(shè)計上參考了PacificA算法,其通過Master維護Index的Meta,類似于論文中的Configuration Manager維護Configuration。其IndexMeta中的InSyncAllocationIds代表了當(dāng)前可用的Shard,類似于論文中維護Replica Group。下一節(jié)我們會介紹ES中的SequenceNumber和Checkpoint,這兩個類似于PacificA算法中的Serial Number和Committed Point,在這一節(jié)之后,會再有一節(jié)來比較ES的實現(xiàn)與PacificA的異同。

SequenceNumber、Checkpoint與故障恢復(fù)

上面介紹了ES的一致性算法模型PacificA,該算法很重要的一點是每個Update操作都會有一個對應(yīng)的Serial Number,表示執(zhí)行的順序。在之前的ES版本中,每個寫入操作并沒有類似Serial Number的東西,所以很多事情做不了。在15年的時候,ES官方開始規(guī)劃給每個寫操作加入SequenceNumber,并設(shè)想了很多應(yīng)用場景。具體信息可以參考以下兩個鏈接:

Add Sequence Numbers to write operations #10708

Sequence IDs: Coming Soon to an Elasticsearch Cluster Near You文章來源地址http://www.zghlxwxcb.cn/news/detail-820897.html

下面我們簡單介紹一下Sequence、Checkpoint是什么,以及其應(yīng)用場景。

Term和SequenceNumber

每個寫操作都會分配兩個值,Term和SequenceNumber。Term在每次Primary變更時都會加1,類似于PacificA論文中的Configuration Version。SequenceNumber在每次操作后加1,類似于PacificA論文中的Serial Number。

由于寫請求總是發(fā)給Primary,所以Term和SequenceNumber會由Primary分配,在向Replica發(fā)送同步請求時,會帶上這兩個值。

LocalCheckpoint和GlobalCheckpoint

LocalCheckpoint代表本Shard中所有小于該值的請求都已經(jīng)處理完畢。

GlobalCheckpoint代表所有小于該值的請求在所有的Replica上都處理完畢。GlobalCheckpoint會由Primary進行維護,每個Replica會向Primary匯報自己的LocalCheckpoint,Primary根據(jù)這些信息來提升GlobalCheckpoint。

GlobalCheckpoint是一個全局的安全位置,代表其前面的請求都被所有Replica正確處理了,可以應(yīng)用在節(jié)點故障恢復(fù)后的數(shù)據(jù)回補。另一方面,GlobalCheckpoint也可以用于Translog的GC,因為之前的操作記錄可以不保存了。不過ES中Translog的GC策略是按照大小或者時間,好像并沒有使用GlobalCheckpoint。

快速故障恢復(fù)

當(dāng)一個Replica故障時,ES會將其移除,當(dāng)故障超過一定時間,ES會分配一個新的Replica到新的Node上,此時需要全量同步數(shù)據(jù)。但是如果之前故障的Replica回來了,就可以只回補故障之后的數(shù)據(jù),追平后加回來即可,實現(xiàn)快速故障恢復(fù)。實現(xiàn)快速故障恢復(fù)的條件有兩個,一個是能夠保存故障期間所有的操作以及其順序,另一個是能夠知道從哪個點開始同步數(shù)據(jù)。第一個條件可以通過保存一定時間的Translog實現(xiàn),第二個條件可以通過Checkpoint實現(xiàn),所以就能夠?qū)崿F(xiàn)快速的故障恢復(fù)。這是SequenceNumber和Checkpoint的第一個重要應(yīng)用場景。

ES與PacificA的比較

相同點

  1. Meta一致性和Data一致性分開處理:PacificA中通過Configuration Manager維護Configuration的一致性,ES中通過Master維護Meta的一致性。
  2. 維護同步中的副本集合:PacificA中維護Replica Group,ES中維護InSyncAllocationIds。
  3. SequenceNumber:在PacificA和ES中,寫操作都具有SequenceNumber,記錄操作順序。

不同點

不同點主要體現(xiàn)在ES雖然遵循PacificA,但是目前其實現(xiàn)還有很多地方不滿足算法要求,所以不能保證嚴格的強一致性。主要有以下幾點:

  1. Meta一致性:上一篇中分析了ES中Meta一致性的問題,可以看到ES并不能完全保證Meta一致性,因此也必然無法嚴格保證Data的一致性。
  2. Prepare階段:PacificA中有Prepare階段,保證數(shù)據(jù)在所有節(jié)點Prepare成功后才能Commit,保證Commit的數(shù)據(jù)不丟,ES中沒有這個階段,數(shù)據(jù)會直接寫入。
  3. 讀一致性:ES中所有InSync的Replica都可讀,提高了讀能力,但是可能讀到舊數(shù)據(jù)。另一方面是即使只能讀Primary,ES也需要Lease機制等避免讀到Old Primary。因為ES本身是近實時系統(tǒng),所以讀一致性要求可能并不嚴格。

小結(jié)

本文分析了ES中數(shù)據(jù)流的一致性問題,可以看到ES最近幾年在這一塊有很多進展,但也存在許多問題。本文是Elasticsearch分布式一致性原理剖析的最后一篇文章,該系列文章是對ES的一個調(diào)研分析總結(jié),逐步分析了ES中的節(jié)點發(fā)現(xiàn)、Master選舉、Meta一致性、Data一致性等,對能夠讀完該系列文章的同學(xué)說一聲感謝,期待與大家的交流。

Reference

Index API | Elasticsearch Reference [6.2]

Reading and Writing documents | Elasticsearch Reference [6.2]

PacificA: Replication in Log-Based Distributed Storage Systems

Add Sequence Numbers to write operations #10708

Sequence IDs: Coming Soon to an Elasticsearch Cluster Near You

到了這里,關(guān)于Elasticsearch分布式一致性原理剖析(三)-Data篇的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 聊聊分布式架構(gòu)09——分布式中的一致性協(xié)議

    聊聊分布式架構(gòu)09——分布式中的一致性協(xié)議

    目錄 01從集中式到分布式 系統(tǒng)特點 集中式特點 分布式特點 事務(wù)處理差異 02一致性協(xié)議與Paxos算法 2PC(Two-Phase Commit) 階段一:提交事務(wù)請求 階段二:執(zhí)行事務(wù)提交 優(yōu)缺點 3PC(Three-Phase Commit) 階段一:CanCommit 階段二:PreCommit 階段三:doCommit 優(yōu)缺點 Paxos算法 拜占庭將軍問題

    2024年02月08日
    瀏覽(27)
  • 分布式數(shù)據(jù)庫-事務(wù)一致性

    分布式數(shù)據(jù)庫-事務(wù)一致性

    version: v-2023060601 author: 路__ 分布式數(shù)據(jù)庫的“強一致性”應(yīng)該包含兩個方面: serializability(串行) and linearizability(線性一致) ,上述圖為“Highly Available Transactions: Virtues and Limitations”論文中對于一致性模型的介紹。圖中箭頭表示一致性模型之間的關(guān)系。對于異步網(wǎng)絡(luò)上的分

    2024年02月08日
    瀏覽(28)
  • 【分布式】一致性哈希和哈希槽

    【分布式】一致性哈希和哈希槽

    當(dāng)我們擁有了多臺存儲服務(wù)器之后,現(xiàn)在有多個key,希望可以將這些個key均勻的緩存到這些服務(wù)器上,可以使用哪些方案呢? 1.1 直接哈希取模 這是一種最容易想到的方法,使用取模算法hash(key)% N,對key進行hash運算后取模,N是機器的數(shù)量。key進行hash后的結(jié)果對3取模,得

    2024年02月03日
    瀏覽(28)
  • RocketMQ分布式事務(wù) -> 最終一致性實現(xiàn)

    RocketMQ分布式事務(wù) -> 最終一致性實現(xiàn)

    · 分布式事務(wù)的問題常在業(yè)務(wù)與面試中被提及, 近日摸魚看到這篇文章, 闡述的非常通俗易懂, 固持久化下來我博客中, 也以便于我二刷 轉(zhuǎn)載源 : 基于RocketMQ分布式事務(wù) - 完整示例 本文代碼不只是簡單的demo,考慮到一些異常情況、冪等性消費和死信隊列等情況,盡量向可靠業(yè)務(wù)

    2024年02月15日
    瀏覽(29)
  • 分布式系統(tǒng)架構(gòu)設(shè)計之分布式數(shù)據(jù)存儲的擴展方式、主從復(fù)制以及分布式一致性

    分布式系統(tǒng)架構(gòu)設(shè)計之分布式數(shù)據(jù)存儲的擴展方式、主從復(fù)制以及分布式一致性

    在分布式系統(tǒng)中,數(shù)據(jù)存儲的擴展是為了適應(yīng)業(yè)務(wù)的增長和提高系統(tǒng)的性能。分為水平擴展和垂直擴展兩種方式,這兩種方式在架構(gòu)設(shè)計和應(yīng)用場景上有著不同的優(yōu)勢和局限性。 水平擴展是通過增加節(jié)點或服務(wù)器的數(shù)量來擴大整個系統(tǒng)的容量和性能。在數(shù)據(jù)存儲領(lǐng)域,水平擴

    2024年02月03日
    瀏覽(101)
  • Zookeeper分布式一致性協(xié)議ZAB源碼剖析

    Zookeeper分布式一致性協(xié)議ZAB源碼剖析

    ZAB 協(xié)議全稱:Zookeeper Atomic Broadcast(Zookeeper 原子廣播協(xié)議)。 Zookeeper 是一個為分布式應(yīng)用提供高效且可靠的分布式協(xié)調(diào)服務(wù)。在解決分布式一致性方面,Zookeeper 并沒有使用 Paxos ,而是采用了 ZAB 協(xié)議,ZAB是Paxos算法的一種簡化實現(xiàn)。 ZAB 協(xié)議定義:ZAB 協(xié)議是為分布式協(xié)調(diào)服

    2024年02月07日
    瀏覽(35)
  • 分布式系統(tǒng)共識機制:一致性算法設(shè)計思想

    分布式系統(tǒng)共識機制:一致性算法設(shè)計思想

    這次以一個宏觀的角度去總結(jié) 自己學(xué)習(xí)過的一致性算法。一致性算法的目標(biāo)就是讓分布式系統(tǒng)里的大部分節(jié)點 保持數(shù)據(jù)一致。 區(qū)塊鏈中的共識算法,pow、pos這類就屬于這個范圍,但他們僅僅是在區(qū)塊鏈領(lǐng)域內(nèi)應(yīng)用的,下面介紹一致性算法是在分布式系統(tǒng)中 應(yīng)用廣泛的,當(dāng)然

    2023年04月16日
    瀏覽(29)
  • 本地消息表模式保障分布式系統(tǒng)最終一致性

    本地消息表模式保障分布式系統(tǒng)最終一致性

    訂單表 消息表 process_queue 庫存系統(tǒng) return_queue 說明 成功 失敗 / / / 訂單庫回滾 成功 成功 失敗 / / 訂單系統(tǒng)重發(fā)消息 成功 成功 成功 失敗 / Broker自動重試,注意接口冪等 成功 成功 成功 庫存不足退回 / Broker通知回掉,訂單/消息作廢 成功 成功 成功 成功 失敗 訂單系統(tǒng)重發(fā)消

    2024年04月28日
    瀏覽(35)
  • 分布式一致性算法——Paxos 和 Raft 算法

    分布式一致性算法——Paxos 和 Raft 算法

    本文隸屬于專欄《100個問題搞定大數(shù)據(jù)理論體系》,該專欄為筆者原創(chuàng),引用請注明來源,不足和錯誤之處請在評論區(qū)幫忙指出,謝謝! 本專欄目錄結(jié)構(gòu)和參考文獻請見100個問題搞定大數(shù)據(jù)理論體系 Paxos和Raft算法都是 分布式一致性算法 ,它們的目的都是 在一個分布式系統(tǒng)

    2024年01月20日
    瀏覽(31)
  • 分布式「走進分布式一致性協(xié)議」從2PC、3PC、Paxos 到 ZAB

    分布式「走進分布式一致性協(xié)議」從2PC、3PC、Paxos 到 ZAB

    設(shè)計一個分布式系統(tǒng)必定會遇到一個問題—— 因為分區(qū)容忍性(partition tolerance)的存在,就必定要求我們需要在系統(tǒng)可用性(availability)和數(shù)據(jù)一致性(consistency)中做出權(quán)衡 。這就是著名的 CAP 一致性(Consistency)是指多副本(Replications)問題中的數(shù)據(jù)一致性。關(guān)于分布式

    2024年02月03日
    瀏覽(28)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包