国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Elasticsearch底層原理分析——新建、索引文檔

這篇具有很好參考價(jià)值的文章主要介紹了Elasticsearch底層原理分析——新建、索引文檔。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

es版本

8.1.0

重要概念回顧

Elasticsearch Node的角色

與下文流程相關(guān)的角色介紹:

Node Roles 配置 主要功能說(shuō)明
master node.roles: [ master ] 有資格參與選舉成為master節(jié)點(diǎn),從而進(jìn)行集群范圍的管理工作,如創(chuàng)建或刪除索引、跟蹤哪些節(jié)點(diǎn)是集群的一部分以及決定將哪些分片分配給哪些節(jié)點(diǎn)等
data node.roles: [ data ] 數(shù)據(jù)節(jié)點(diǎn)保存已索引的文檔的分片。處理數(shù)據(jù)相關(guān)操作,例如 CRUD、搜索和聚合。
node.roles: [ ] 節(jié)點(diǎn)不填任何角色,則是協(xié)調(diào)節(jié)點(diǎn),換言之每個(gè)節(jié)點(diǎn),也都有協(xié)調(diào)節(jié)點(diǎn)功能。具備路由請(qǐng)求、對(duì)搜索結(jié)果合并和分發(fā)批量索引等功能。本質(zhì)上,協(xié)調(diào)節(jié)點(diǎn)的行為就像智能負(fù)載均衡器

詳見:https://www.elastic.co/guide/en/elasticsearch/reference/8.9/modules-node.html

分片

  • 一個(gè)分片是一個(gè) Lucene 的實(shí)例,是一個(gè)完整的搜索引擎
  • 主分片的數(shù)量決定了索引最多能存儲(chǔ)多少數(shù)據(jù),多分片機(jī)制,帶來(lái)存儲(chǔ)量級(jí)提升
  • 主分片數(shù)不可更改,和數(shù)據(jù)路由算法有關(guān)
  • 副本分片可以防止硬件故障導(dǎo)致的數(shù)據(jù)丟失,同時(shí)可以提供讀請(qǐng)求,增大能處理的搜索吞吐量
  • 對(duì)文檔的新建、索引和刪除請(qǐng)求都是寫操作,必須在主分片上面完成,之后才能被復(fù)制到相關(guān)的副本分片

https://www.elastic.co/guide/cn/elasticsearch/guide/current/_add-an-index.html

新建、索引和刪除文檔

以官網(wǎng)(https://www.elastic.co/guide/cn/elasticsearch/guide/current/distrib-write.html)例子分析,es集群有3個(gè)節(jié)點(diǎn),其中有個(gè)索引有兩分片(P0、P1),兩副本(P0、R0、R0,P1、R1、R1),如創(chuàng)建索引時(shí):

PUT /blogs
{
   "settings" : {
      "number_of_shards" : 2,
      "number_of_replicas" : 2
   }
}

Elasticsearch底層原理分析——新建、索引文檔,elasticsearch,搜索引擎

再對(duì)一些前提知識(shí)回顧一下:

  • 每個(gè)節(jié)點(diǎn)都具備協(xié)調(diào)節(jié)點(diǎn)功能,也即路由請(qǐng)求、對(duì)搜索結(jié)果合并和分發(fā)批量索引等功能
  • 對(duì)文檔的新建、索引和刪除請(qǐng)求等寫操作,必須在主分片上面完成,之后才能被復(fù)制到相關(guān)的副本分片

這個(gè)例子中的兩個(gè)假設(shè):

  • 請(qǐng)求集群時(shí),es采用的是隨機(jī)輪詢方法進(jìn)行負(fù)載均衡,每個(gè)節(jié)點(diǎn)都有可能被請(qǐng)求到。在這個(gè)例子中,假設(shè)先請(qǐng)求到node1
  • 節(jié)點(diǎn)使用文檔的 _id 確定文檔屬于分片 0

所以(直接引用官網(wǎng)步驟):

  • 客戶端向 Node 1 發(fā)送新建、索引或者刪除請(qǐng)求。
  • 節(jié)點(diǎn)使用文檔的 _id 確定文檔屬于分片 0 。請(qǐng)求會(huì)被轉(zhuǎn)發(fā)到 Node 3,因?yàn)榉制?0 的主分片目前被分配在 Node 3 上。
  • Node 3 在主分片上面執(zhí)行請(qǐng)求。如果成功了,它將請(qǐng)求并行轉(zhuǎn)發(fā)到 Node 1 和 Node 2 的副本分片上。一旦所有的副本分片都報(bào)告成功, Node 3 將向協(xié)調(diào)節(jié)點(diǎn)報(bào)告成功,協(xié)調(diào)節(jié)點(diǎn)向客戶端報(bào)告成功。

源碼理解

如何確定文檔屬于哪個(gè)分片,請(qǐng)求轉(zhuǎn)發(fā)哪個(gè)節(jié)點(diǎn)

獲取分片ID是從TransportBulkAction類中開始調(diào)用開始

int shardId = docWriteRequest.route(indexRouting);

具體實(shí)現(xiàn)在IndexRouting類中。簡(jiǎn)述步驟就是:

  1. 對(duì)routing值進(jìn)行Murmur3Hash運(yùn)算(如果沒有設(shè)置routing,值默認(rèn)是doc id值)
  2. 對(duì)hash后的值進(jìn)行取模運(yùn)算,routingNumShards默認(rèn)1024,routingFactor默認(rèn)512
protected int shardId(String id, @Nullable String routing) {
    return hashToShardId(effectiveRoutingToHash(routing == null ? id : routing));
}

protected final int hashToShardId(int hash) {
   return Math.floorMod(hash, routingNumShards) / routingFactor;
}

private static int effectiveRoutingToHash(String effectiveRouting) {
    return Murmur3HashFunction.hash(effectiveRouting);
}
為何需要路由,以及路由帶來(lái)什么問(wèn)題
  1. 為何需要路由
    總的來(lái)說(shuō),就是多分片設(shè)計(jì),可以承載更大量級(jí)數(shù)據(jù),而分片預(yù)分配設(shè)計(jì),可以簡(jiǎn)單的獲取文檔位置,能減少數(shù)據(jù)分裂風(fēng)險(xiǎn),以及對(duì)數(shù)據(jù)重新索引友好
    https://www.elastic.co/guide/cn/elasticsearch/guide/current/overallocation.html
  2. 帶來(lái)的問(wèn)題:

創(chuàng)建索引的時(shí)候就需要確定好主分片的數(shù)量,并且永遠(yuǎn)不會(huì)改變這個(gè)數(shù)量。因?yàn)槿绻麛?shù)量變化了,那么所有之前路由的值都會(huì)無(wú)效,文檔也再也找不到了。
https://www.elastic.co/guide/cn/elasticsearch/guide/current/routing-value.html

如何根據(jù)分片ID確定節(jié)點(diǎn)

代碼在TransportReplicationAction#doRun方法中,簡(jiǎn)單概括就是state中存有集群信息,通過(guò)傳入分片ID,先獲取主分片信息,再通過(guò)主分片節(jié)點(diǎn)ID,獲取對(duì)應(yīng)節(jié)點(diǎn)信息。

final ShardRouting primary = state.getRoutingTable().shardRoutingTable(request.shardId()).primaryShard();
if (primary == null || primary.active() == false) {
    logger.trace(
        "primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], "
            + "cluster state version [{}]",
        request.shardId(),
        actionName,
        request,
        state.version()
    );
    retryBecauseUnavailable(request.shardId(), "primary shard is not active");
    return;
}
if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
    logger.trace(
        "primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], "
            + "cluster state version [{}]",
        request.shardId(),
        primary.currentNodeId(),
        actionName,
        request,
        state.version()
    );
    retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
    return;
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
    performLocalAction(state, primary, node, indexMetadata);
} else {
    performRemoteAction(state, primary, node);
}
主分片執(zhí)行流程
1. 寫一致性

默認(rèn)寫成功一個(gè)主分片即可,源碼在ActiveShardCount#enoughShardsActive方法中

https://www.elastic.co/guide/en/elasticsearch/client/curator/current/option_wait_for_active_shards.html

    public boolean enoughShardsActive(final IndexShardRoutingTable shardRoutingTable) {
        final int activeShardCount = shardRoutingTable.activeShards().size();
        if (this == ActiveShardCount.ALL) {
            // adding 1 for the primary in addition to the total number of replicas,
            // which gives us the total number of shard copies
            return activeShardCount == shardRoutingTable.replicaShards().size() + 1;
        } else if (this == ActiveShardCount.DEFAULT) {
            return activeShardCount >= 1;
        } else {
            return activeShardCount >= value;
        }
    }
2. 具體寫流程

參考官網(wǎng)(https://www.elastic.co/guide/cn/elasticsearch/guide/current/translog.html)理解。圖片所示是一個(gè)lucene索引,lucene索引下面有三個(gè)段(segment),圖中Searchable表示從內(nèi)存(In-memory buffer,也叫Indexing Buffer)刷新到磁盤,寫入物理文件,不可更改,其中fsync操作將新文檔刷新到磁盤的操作,性能代價(jià)是很大的。所以會(huì)先將文檔寫入文件系統(tǒng)緩存中,也即圖中In-memory buffer中,對(duì)應(yīng)的是 Indexing Buffer(https://www.elastic.co/guide/en/elasticsearch/reference/8.10/indexing-buffer.html)。
所以流程是:

  • 將文檔寫入Indexing Buffer中
  • 將操作追加寫入 translog 中,以便確保即便在刷盤時(shí)異常,也能從失敗中恢復(fù)數(shù)據(jù)
  • 將內(nèi)存中的數(shù)據(jù)刷新持久化到磁盤中(默認(rèn)情況下每個(gè)分片會(huì)每秒自動(dòng)刷新一次)
  • 在刷新(flush)之后,段被全量提交,并且事務(wù)日志被清空

Elasticsearch底層原理分析——新建、索引文檔,elasticsearch,搜索引擎

Elasticsearch底層原理分析——新建、索引文檔,elasticsearch,搜索引擎

相關(guān)源碼

主要在InternalEngine類中:

  • index方法包含寫入In-memory buffer對(duì)應(yīng)生成IndexResult(?)和寫translog;
  • 將內(nèi)存中的數(shù)據(jù)刷新持久化到磁盤中在refresh方法;
  • 段被全量提交,和事務(wù)日志被清空在flush方法。
index方法
    public IndexResult index(Index index) throws IOException {
        // 確保傳入的文檔的唯一標(biāo)識(shí)是 IdFieldMapper
        assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
        // 檢查 index 的來(lái)源是否不是恢復(fù)操作
        final boolean doThrottle = index.origin().isRecovery() == false;
        // 獲取讀鎖
        try (ReleasableLock releasableLock = readLock.acquire()) {
            // 確保引擎處于打開狀態(tài)
            ensureOpen();
            // 斷言傳入的 index 的序列號(hào)符合預(yù)期
            assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
            int reservedDocs = 0;
            try (
                Releasable ignored = versionMap.acquireLock(index.uid().bytes());
                Releasable indexThrottle = doThrottle ? throttle.acquireThrottle() : () -> {}
            ) {
                lastWriteNanos = index.startTime();
                // 代碼中有一段注釋,描述了關(guān)于追加(append-only)優(yōu)化的注意事項(xiàng)。根據(jù)注釋所述,如果引擎接收到一個(gè)帶有自動(dòng)生成的ID的文檔,
                // 可以優(yōu)化處理并直接使用 addDocument 而不是 updateDocument,從而跳過(guò)版本和索引查找。此外,還使用文檔的時(shí)間戳來(lái)檢測(cè)是否可能已經(jīng)添加過(guò)該文檔。
                // 獲取索引策略
                final IndexingStrategy plan = indexingStrategyForOperation(index);
                reservedDocs = plan.reservedDocs;

                final IndexResult indexResult;
                if (plan.earlyResultOnPreFlightError.isPresent()) {
                    assert index.origin() == Operation.Origin.PRIMARY : index.origin();
                    indexResult = plan.earlyResultOnPreFlightError.get();
                    assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
                } else {
                    // generate or register sequence number
                    // 生成或注冊(cè)文檔的序列號(hào)。對(duì)于主分片的操作,會(huì)生成新的序列號(hào)。
                    if (index.origin() == Operation.Origin.PRIMARY) {
                        index = new Index(
                            index.uid(),
                            index.parsedDoc(),
                            // 生成新的序列號(hào)
                            generateSeqNoForOperationOnPrimary(index),
                            index.primaryTerm(),
                            index.version(),
                            index.versionType(),
                            index.origin(),
                            index.startTime(),
                            index.getAutoGeneratedIdTimestamp(),
                            index.isRetry(),
                            index.getIfSeqNo(),
                            index.getIfPrimaryTerm()
                        );

                        // 檢查了當(dāng)前操作是否應(yīng)該追加到 Lucene 索引中
                        final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
                        if (toAppend == false) {
                            // 更新主分片的最大序列號(hào)
                            advanceMaxSeqNoOfUpdatesOnPrimary(index.seqNo());
                        }
                    } else {
                        // 對(duì)于副本分片的操作,會(huì)標(biāo)記已經(jīng)見過(guò)的序列號(hào),序列號(hào)已經(jīng)被使用。
                        markSeqNoAsSeen(index.seqNo());
                    }

                    assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();

                    if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
                        // 寫到 Lucene 中
                        indexResult = indexIntoLucene(index, plan);
                    } else {
                        indexResult = new IndexResult(
                            plan.versionForIndexing,
                            index.primaryTerm(),
                            index.seqNo(),
                            plan.currentNotFoundOrDeleted
                        );
                    }
                }
                // 判斷索引操作是否來(lái)自 Translog。如果是來(lái)自 Translog 的操作,就不再處理,因?yàn)檫@已經(jīng)是一個(gè)已經(jīng)被記錄的操作
                if (index.origin().isFromTranslog() == false) {
                    final Translog.Location location;
                    if (indexResult.getResultType() == Result.Type.SUCCESS) {
                        // 如果索引操作成功, 將該操作添加到 Translog 中,并獲取 Translog 的位置
                        location = translog.add(new Translog.Index(index, indexResult));
                    } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                        // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
                        // 如果索引操作失敗,并且具有序列號(hào), 則將失敗的操作記錄為一個(gè) no-op 操作
                        final NoOp noOp = new NoOp(
                            indexResult.getSeqNo(),
                            index.primaryTerm(),
                            index.origin(),
                            index.startTime(),
                            indexResult.getFailure().toString()
                        );
                        location = innerNoOp(noOp).getTranslogLocation();
                    } else {
                        // 如果索引操作失敗,并且沒有序列號(hào),將 location 設(shè)置為 null
                        location = null;
                    }
                    // 設(shè)置Translog 位置
                    indexResult.setTranslogLocation(location);
                }
                // 如果索引操作成功且需要寫入 Lucene, 則獲取 Translog 的位置信息,用于更新版本映射
                if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
                    final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
                    versionMap.maybePutIndexUnderLock(
                        index.uid().bytes(),
                        new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm())
                    );
                }
                // 本地 Checkpoint 的更新, 標(biāo)記當(dāng)前序列號(hào)已經(jīng)被處理
                localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo());
                if (indexResult.getTranslogLocation() == null) {
                    // the op is coming from the translog (and is hence persisted already) or it does not have a sequence number
                    // 如果 Translog 的位置信息為 null,說(shuō)明該操作來(lái)自于 Translog,已經(jīng)被持久化,或者該操作沒有序列號(hào)。
                    // 在這種情況下,標(biāo)記當(dāng)前序列號(hào)已經(jīng)被持久化
                    assert index.origin().isFromTranslog() || indexResult.getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO;
                    localCheckpointTracker.markSeqNoAsPersisted(indexResult.getSeqNo());
                }
                indexResult.setTook(System.nanoTime() - index.startTime());
                // 將操作結(jié)果凍結(jié),確保其不可變
                indexResult.freeze();
                return indexResult;
            } finally {
                releaseInFlightDocs(reservedDocs);
            }
        } catch (RuntimeException | IOException e) {
            try {
                if (e instanceof AlreadyClosedException == false && treatDocumentFailureAsTragicError(index)) {
                    failEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
                } else {
                    maybeFailEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
                }
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw e;
        }
    }
自動(dòng)sync條件translog條件:

相關(guān)配置:

  • index.translog.sync_interval: 默認(rèn)5s
  • index.translog.durability:默認(rèn)配置的是request,即每次寫請(qǐng)求完成之后執(zhí)行(e.g. index, delete, update, bulk)
  • index.translog.flush_threshold_size:默認(rèn)512mb

https://www.elastic.co/guide/cn/elasticsearch/guide/current/translog.html
https://www.elastic.co/guide/en/elasticsearch/reference/8.11/index-modules-translog.html

private void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
    if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST
        && indexShard.getLastSyncedGlobalCheckpoint() < indexShard.getLastKnownGlobalCheckpoint()) {
        indexShard.sync();
    }
}
refresh源碼:
    final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException {
        // both refresh types will result in an internal refresh but only the external will also
        // pass the new reader reference to the external reader manager.
        // 獲取當(dāng)前的本地檢查點(diǎn)
        final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
        boolean refreshed;
        try {
            // refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way.
            // 嘗試增加存儲(chǔ)的引用計(jì)數(shù),以確保在刷新期間沒有人關(guān)閉存儲(chǔ)
            if (store.tryIncRef()) {
                // increment the ref just to ensure nobody closes the store during a refresh
                try {
                    // even though we maintain 2 managers we really do the heavy-lifting only once.
                    // the second refresh will only do the extra work we have to do for warming caches etc.
                    ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
                    // it is intentional that we never refresh both internal / external together
                    if (block) {
                        referenceManager.maybeRefreshBlocking();
                        refreshed = true;
                    } else {
                        refreshed = referenceManager.maybeRefresh();
                    }
                } finally {
                    // 減少存儲(chǔ)的引用計(jì)數(shù)
                    store.decRef();
                }
                if (refreshed) {
                    lastRefreshedCheckpointListener.updateRefreshedCheckpoint(localCheckpointBeforeRefresh);
                }
            } else {
                refreshed = false;
            }
        } catch (AlreadyClosedException e) {
            failOnTragicEvent(e);
            throw e;
        } catch (Exception e) {
            try {
                failEngine("refresh failed source[" + source + "]", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw new RefreshFailedEngineException(shardId, e);
        }
        assert refreshed == false || lastRefreshedCheckpoint() >= localCheckpointBeforeRefresh
            : "refresh checkpoint was not advanced; "
                + "local_checkpoint="
                + localCheckpointBeforeRefresh
                + " refresh_checkpoint="
                + lastRefreshedCheckpoint();
        // TODO: maybe we should just put a scheduled job in threadPool?
        // We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
        // for a long time:
        maybePruneDeletes();
        mergeScheduler.refreshConfig();
        return refreshed;
    }
flush源碼:

執(zhí)行條件主要在這段注釋里面:

// Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the
// newly created commit points to a different translog generation (can free translog),
// or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries.
    @Override
    public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
        // 確保引擎是打開的
        ensureOpen();
        if (force && waitIfOngoing == false) {
            // 如果強(qiáng)制執(zhí)行 flush 但不等待正在進(jìn)行的 flush 操作,拋出異常
            assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing;
            throw new IllegalArgumentException(
                "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing
            );
        }
        // 獲取讀鎖
        try (ReleasableLock lock = readLock.acquire()) {
            ensureOpen();
            if (flushLock.tryLock() == false) {
                // if we can't get the lock right away we block if needed otherwise barf
                if (waitIfOngoing == false) {
                    return;
                }
                logger.trace("waiting for in-flight flush to finish");
                flushLock.lock();
                logger.trace("acquired flush lock after blocking");
            } else {
                logger.trace("acquired flush lock immediately");
            }
            try {
                /**
                 * 1. Lucene 有未提交的文檔: 如果 Lucene 索引中存在未提交的文檔,即有尚未寫入磁盤的更改。
                 * 2. 被調(diào)用者強(qiáng)制執(zhí)行: 如果調(diào)用者明確要求執(zhí)行 flush 操作,即 force 參數(shù)為 true。
                 * 3. 新創(chuàng)建的提交指向不同的 translog 生成: 當(dāng)新創(chuàng)建的提交(commit)指向不同的 translog 生成時(shí),執(zhí)行 flush 操作。
                 * 這可能是因?yàn)?translog 已經(jīng)占用了一定的空間,需要釋放這些舊的 translog。
                 * 4. 上一次提交的本地檢查點(diǎn)信息已過(guò)期: 如果上一次提交的段信息中的本地檢查點(diǎn)信息已過(guò)期,這可能會(huì)導(dǎo)致未來(lái)的恢復(fù)操作變慢。
                 * 因此,需要執(zhí)行 flush 操作來(lái)更新本地檢查點(diǎn)信息。
                 */
                // 檢查 Lucene 是否有未提交的更改。
                boolean hasUncommittedChanges = indexWriter.hasUncommittedChanges();
                // 檢查是否應(yīng)定期執(zhí)行 flush 操作
                boolean shouldPeriodicallyFlush = shouldPeriodicallyFlush();
                if (hasUncommittedChanges
                    || force
                    || shouldPeriodicallyFlush
                    // 檢查是否本地檢查點(diǎn)信息在上一次提交的段信息中過(guò)期,如果是,則觸發(fā) flush
                    || getProcessedLocalCheckpoint() > Long.parseLong(
                        lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
                    )) {
                    ensureCanFlush();
                    try {
                        // 滾動(dòng) translog 的生成
                        translog.rollGeneration();
                        logger.trace("starting commit for flush; commitTranslog=true");
                        // 提交索引寫入器,包括在 Lucene 中提交未提交的文檔,并將 translog 提交到持久存儲(chǔ)。
                        commitIndexWriter(indexWriter, translog);
                        logger.trace("finished commit for flush");

                        // a temporary debugging to investigate test failure - issue#32827. Remove when the issue is resolved
                        logger.debug(
                            "new commit on flush, hasUncommittedChanges:{}, force:{}, shouldPeriodicallyFlush:{}",
                            hasUncommittedChanges,
                            force,
                            shouldPeriodicallyFlush
                        );

                        // we need to refresh in order to clear older version values
                        // 強(qiáng)制刷新索引以清除舊的版本信息。
                        refresh("version_table_flush", SearcherScope.INTERNAL, true);
                        translog.trimUnreferencedReaders();
                    } catch (AlreadyClosedException e) {
                        failOnTragicEvent(e);
                        throw e;
                    } catch (Exception e) {
                        throw new FlushFailedEngineException(shardId, e);
                    }
                    // 刷新最后提交的段信息
                    refreshLastCommittedSegmentInfos();

                }
            } catch (FlushFailedEngineException ex) {
                maybeFailEngine("flush", ex);
                throw ex;
            } finally {
                flushLock.unlock();
            }
        }
        // We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
        // (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
        if (engineConfig.isEnableGcDeletes()) {
            pruneDeletedTombstones();
        }
    }
    protected void commitIndexWriter(final IndexWriter writer, final Translog translog) throws IOException {
        // 確保引擎的狀態(tài)是允許刷新的
        ensureCanFlush();
        try {
            // 獲取已處理的本地檢查點(diǎn)
            final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
            writer.setLiveCommitData(() -> {
                
                final Map<String, String> commitData = new HashMap<>(8);
                // 添加 translog 的 UUID 到提交數(shù)據(jù)中
                commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
                // 添加本地檢查點(diǎn)到提交數(shù)據(jù)中
                commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
                // 添加最大序列號(hào)到提交數(shù)據(jù)中
                commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
                // 添加最大不安全自動(dòng)生成的 ID 時(shí)間戳到提交數(shù)據(jù)中
                commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
                // 添加歷史 UUID 到提交數(shù)據(jù)中
                commitData.put(HISTORY_UUID_KEY, historyUUID);
                final String currentForceMergeUUID = forceMergeUUID;
                if (currentForceMergeUUID != null) {
                    //  如果強(qiáng)制合并 UUID 存在,則添加到提交數(shù)據(jù)中
                    commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
                }
                // 添加最小保留序列號(hào)到提交數(shù)據(jù)中
                commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
                commitData.put(ES_VERSION, Version.CURRENT.toString());
                logger.trace("committing writer with commit data [{}]", commitData);
                return commitData.entrySet().iterator();
            });
            shouldPeriodicallyFlushAfterBigMerge.set(false);
            // 調(diào)用Lucene 會(huì)將所有未提交的文檔寫入磁盤,生成新的段
            writer.commit();
        } catch (final Exception ex) {
            try {
                failEngine("lucene commit failed", ex);
            } catch (final Exception inner) {
                ex.addSuppressed(inner);
            }
            throw ex;
        } catch (final AssertionError e) {
            /*
             * If assertions are enabled, IndexWriter throws AssertionError on commit if any files don't exist, but tests that randomly
             * throw FileNotFoundException or NoSuchFileException can also hit this.
             */
            if (ExceptionsHelper.stackTrace(e).contains("org.apache.lucene.index.IndexWriter.filesExist")) {
                final EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
                try {
                    failEngine("lucene commit failed", engineException);
                } catch (final Exception inner) {
                    engineException.addSuppressed(inner);
                }
                throw engineException;
            } else {
                throw e;
            }
        }
    }
寫副本

副本在寫入數(shù)據(jù)到 translog 后就可以返回了。源碼主要在ReplicationOperation類中

@Override
public void tryAction(ActionListener<ReplicaResponse> listener) {
    replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener);
}

處理結(jié)束給協(xié)調(diào)節(jié)點(diǎn)返回消息

 @Override
 public void onResponse(Void aVoid) {
     successfulShards.incrementAndGet();
     try {
         updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
     } finally {
         decPendingAndFinishIfNeeded();
     }
 }

參考:
https://www.elastic.co/guide/cn/elasticsearch/guide/current/translog.html
https://www.golangblogs.com/read/elasticsearch/date-2023.05.24.16.58.36?wd=Elasticsearch
《Elasticsearch源碼解析與優(yōu)化實(shí)戰(zhàn)》文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-759385.html

到了這里,關(guān)于Elasticsearch底層原理分析——新建、索引文檔的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • elasticSearch創(chuàng)建索引庫(kù)、映射、文檔

    elasticSearch創(chuàng)建索引庫(kù)、映射、文檔

    創(chuàng)建索引庫(kù) 使用postman或curl這樣的工具創(chuàng)建 參數(shù): number_of_shards:設(shè)置分片的數(shù)量,在集群中通常設(shè)置多個(gè)分片,表示一個(gè)索引庫(kù)將拆分成多片分別存儲(chǔ)不同的結(jié)點(diǎn),提高了ES的處理能力和高可用性,入門程序使用單機(jī)環(huán)境,這里設(shè)置為1。 number_of_replicas:設(shè)置副本的數(shù)量,設(shè)

    2024年02月04日
    瀏覽(26)
  • elasticsearch索引、文檔、映射等概念

    1、文檔(document) 文檔是存儲(chǔ)在Elasticsearch中的一個(gè)JSON格式的字符串。它就像在關(guān)系數(shù)據(jù)庫(kù)中表的一行。每個(gè)存儲(chǔ)在索引中的一個(gè)文檔都有一個(gè)類型和一個(gè)ID,每個(gè)文檔都是一個(gè)JSON對(duì)象,存儲(chǔ)了零個(gè)或多個(gè)字段或鍵值對(duì)。原始的JSON文檔被存儲(chǔ)在一個(gè)叫做_source的字段中,當(dāng)搜

    2023年04月17日
    瀏覽(18)
  • ElasticSearch索引庫(kù)、文檔、RestClient操作

    ElasticSearch索引庫(kù)、文檔、RestClient操作

    es中的索引是指相同類型的文檔集合 ,即mysql中表的概念 映射:索引中文檔字段的約束,比如名稱、類型 mapping映射是對(duì)索引庫(kù)中文檔的約束。類似mysql對(duì)表單字段的約束 type :字段數(shù)據(jù)類型,常見的類型有: 字符串:text(可分詞的文本)、keyword(不可分詞的文本,例如國(guó)家

    2024年02月10日
    瀏覽(24)
  • Elasticsearch:使用 pipelines 路由文檔到想要的 Elasticsearch 索引中去

    Elasticsearch:使用 pipelines 路由文檔到想要的 Elasticsearch 索引中去

    當(dāng)應(yīng)用程序需要向 Elasticsearch 添加文檔時(shí),它們首先要知道目標(biāo)索引是什么。在很多的應(yīng)用案例中,特別是針對(duì)時(shí)序數(shù)據(jù),我們想把每個(gè)月的數(shù)據(jù)寫入到一個(gè)特定的索引中。一方面便于管理索引,另外一方面在將來(lái)搜索的時(shí)候可以按照每個(gè)月的索引來(lái)進(jìn)行搜索,這樣速度更快

    2023年04月09日
    瀏覽(25)
  • ElasticSearch底層讀寫工作原理

    ElasticSearch底層讀寫工作原理

    目錄 ES底層讀寫工作原理分析 ES寫入數(shù)據(jù)的過(guò)程 ES讀取數(shù)據(jù)的過(guò)程 根據(jù)id查詢數(shù)據(jù)的過(guò)程 根據(jù)查詢數(shù)據(jù)的過(guò)程 寫數(shù)據(jù)底層原理 ? ? ? ? ?寫請(qǐng)求是寫入 primary shard,然后同步給所有的 replica shard;讀請(qǐng)求可以從 primary shard 或 replica shard 讀取,采用的是隨機(jī)輪詢算法。

    2024年02月21日
    瀏覽(17)
  • ElasticSearch 底層讀寫原理

    ElasticSearch 底層讀寫原理

    ? 寫請(qǐng)求是寫入 primary shard,然后同步給所有的 replica shard;讀請(qǐng)求可以從 primary shard 或 replica shard 讀取,采用的是隨機(jī)輪詢算法。 1.選擇任意一個(gè)DataNode發(fā)送請(qǐng)求,例如:node2。此時(shí),node2就成為一個(gè)coordinating node(協(xié)調(diào)節(jié)點(diǎn)) 2.計(jì)算得到文檔要寫入的分片 shard = hash(routing)

    2024年04月12日
    瀏覽(28)
  • 【SpringBoot】整合Elasticsearch 操作索引及文檔

    【SpringBoot】整合Elasticsearch 操作索引及文檔

    官網(wǎng)操作文檔:Elasticsearch Clients | Elastic ????? ???????? 踩坑太多了。。。這里表明一下Spring Boot2.4以上版本可能會(huì)出現(xiàn)問(wèn)題,所以我降到了2.2.1.RELEASE。對(duì)于現(xiàn)在2023年6月而言,Es版本已經(jīng)到了8.8,而SpringBoot版本已經(jīng)到了3.x版本。如果是高版本的Boot在配置類的時(shí)候會(huì)發(fā)現(xiàn)

    2024年02月09日
    瀏覽(58)
  • Elasticsearch如何創(chuàng)建索引,添加,刪除,更新文檔

    Elasticsearch如何創(chuàng)建索引,添加,刪除,更新文檔

    了解es基本概念:elasticsearch(es)背景故事與基本概念 安裝es:Linux安裝Elasticsearch詳細(xì)教程 安裝kibana:Linux安裝Kibana詳細(xì)教程 熟悉Json 熟悉REST接口 檢查 es 及 Kibana 是否運(yùn)行正常 創(chuàng)建一個(gè)名為 twitter 的索引(index),并插入一個(gè)文檔(document) 在關(guān)系型數(shù)據(jù)庫(kù)中,需要使用DDL語(yǔ)

    2023年04月08日
    瀏覽(29)
  • ElasticSearch-索引和文檔的創(chuàng)建修改刪除

    ElasticSearch-索引和文檔的創(chuàng)建修改刪除

    目錄 一、創(chuàng)建索引 二、查看索引 三、索引是否存在 四、刪除索引 五、創(chuàng)建文檔 六、查看文檔 七、更新文檔 八、文檔是否存在 九、刪除文檔 結(jié)果: 語(yǔ)法: GET 索引名稱 ?從這兩個(gè)索引信息來(lái)看,es_db2是標(biāo)準(zhǔn)分詞器的索引,es_db3是ik分詞器索引。 語(yǔ)法:HEAD 索引名稱 ? ?在

    2024年01月19日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包