国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka 3.5 主題分區(qū)的高水位線HW,低水位線LW,logStartOffset,LogEndOffset什么情況下會(huì)更新源碼

這篇具有很好參考價(jià)值的文章主要介紹了kafka 3.5 主題分區(qū)的高水位線HW,低水位線LW,logStartOffset,LogEndOffset什么情況下會(huì)更新源碼。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

下面的例子只是各拿一個(gè)做舉例,不是全部場(chǎng)景,不要以為logStartOffset,LogEndOffset,HW,LW只有三個(gè)場(chǎng)景可以修改

前言

這里需要針對(duì)logStartOffsetLogEndOffset做特殊說明,要不會(huì)讓大家腦袋混亂,并且前言后的章節(jié)講的都是主題分區(qū)級(jí)別

1、logStartOffset

(1)主題分區(qū)級(jí)別

  1. 對(duì)于每個(gè)分區(qū)中每一個(gè)副本(包括Leader和Follower)都有一個(gè)獨(dú)立的值,Kafka服務(wù)器記錄并管理
  2. 如果是單個(gè)segment,那logStartOffset是segment的第一個(gè)offset位點(diǎn),如果是多個(gè)segment,則是最舊的segment的第一個(gè)offset位點(diǎn),
  3. 會(huì)受到生產(chǎn)者的寫入、管理員的管理等因素的影響(刪除oldSegment等)

(2)消費(fèi)分組級(jí)別

  1. 針對(duì)每個(gè)消費(fèi)者組內(nèi)的每個(gè)分區(qū)而言的,對(duì)于每個(gè)消費(fèi)者組內(nèi)的每個(gè)分區(qū)都有一個(gè)獨(dú)立的值,表示消費(fèi)者在加入消費(fèi)組之前已經(jīng)消費(fèi)的最大偏移量。
  2. 客戶端記錄并管理的,它表示該消費(fèi)者在消費(fèi)時(shí)的起始位置
  3. 會(huì)根據(jù)消費(fèi)者消費(fèi)的情況而不斷變化

2、LogEndOffset

(1)主題分區(qū)級(jí)別

  1. 對(duì)于每個(gè)分區(qū)每一個(gè)副本(包括Leader和Follower)都有一個(gè)獨(dú)立的值,Kafka服務(wù)器記錄并管理
  2. 最新的segemnt的最新的offset位點(diǎn)+1
  3. 會(huì)受到生產(chǎn)者的寫入、管理員的管理等因素的影響

(2)消費(fèi)分組級(jí)別

  1. 針對(duì)每個(gè)消費(fèi)者組內(nèi)的每個(gè)分區(qū)而言的,對(duì)于每個(gè)消費(fèi)者組內(nèi)的每個(gè)分區(qū)都有一個(gè)獨(dú)立的值,表示消費(fèi)者在加入消費(fèi)組之前已經(jīng)消費(fèi)過的最后一條消息的下一個(gè)偏移量。
  2. 客戶端記錄并管理的,它表示該消費(fèi)者在消費(fèi)時(shí)的已消費(fèi)消息的位置
  3. 會(huì)根據(jù)消費(fèi)者消費(fèi)的情況而不斷變化

3、HighWatermark(HW)

在Leader副本中的ISR集合中,最小的主題分區(qū)級(jí)別的LogEndOffset中為HW

4、LowWatermark(LW)

這個(gè)數(shù)據(jù)是虛線的值,不是實(shí)際存儲(chǔ)的值,可以參考第五章節(jié)證明環(huán)節(jié),
但是LW和所有副本(AR)中最小的主題分區(qū)級(jí)別的logStartOffset是一致的

一、定時(shí)任務(wù)

1、在處理創(chuàng)建分區(qū)請(qǐng)求時(shí),會(huì)啟動(dòng)定時(shí)任務(wù),主要用于把高水位線HW定時(shí)寫入到文件中

def becomeLeaderOrFollower(correlationId: Int,
                             leaderAndIsrRequest: LeaderAndIsrRequest,
                             onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
          //todo 啟動(dòng)高水位線定時(shí)任務(wù),目的是把每一個(gè)分區(qū)的高水位線的數(shù)據(jù)寫入到高水位標(biāo)記文件中
          startHighWatermarkCheckPointThread()
}          
def startHighWatermarkCheckPointThread(): Unit = {
    if (highWatermarkCheckPointThreadStarted.compareAndSet(false, true)) {
      //() => checkpointHighWatermarks() 是一個(gè)runner
      scheduler.schedule("highwatermark-checkpoint", () => checkpointHighWatermarks(), 0L, config.replicaHighWatermarkCheckpointIntervalMs)
    }
  }

定時(shí)任務(wù)checkpointHighWatermarks

def checkpointHighWatermarks(): Unit = {
    //該函數(shù)接受兩個(gè)參數(shù):logDirToCheckpoints和log。函數(shù)的作用是將log的高水位標(biāo)記(highWatermark)存儲(chǔ)到logDirToCheckpoints中。
    def putHw(logDirToCheckpoints: mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]],
              log: UnifiedLog): Unit = {
      val checkpoints = logDirToCheckpoints.getOrElseUpdate(log.parentDir,
        new mutable.AnyRefMap[TopicPartition, Long]())
      checkpoints.put(log.topicPartition, log.highWatermark)
    }
    //它是一個(gè)mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]]類型的可變映射。初始化大小為allPartitions.size。
    val logDirToHws = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]](
      allPartitions.size)
   // 使用onlinePartitionsIterator迭代器遍歷每個(gè)分區(qū),并對(duì)每個(gè)分區(qū)的日志調(diào)用putHw函數(shù)來更新logDirToHws。
    onlinePartitionsIterator.foreach { partition =>
      partition.log.foreach(putHw(logDirToHws, _))
      partition.futureLog.foreach(putHw(logDirToHws, _))
    }
    //使用logDirToHws中的每個(gè)(logDir, hws)對(duì)調(diào)用highWatermarkCheckpoints.get(logDir),并嘗試將hws寫入高水位標(biāo)記文件
    for ((logDir, hws) <- logDirToHws) {
      try highWatermarkCheckpoints.get(logDir).foreach(_.write(hws))
      catch {
        //如果寫入過程中發(fā)生KafkaStorageException異常,則打印錯(cuò)誤日志
        case e: KafkaStorageException =>
          error(s"Error while writing to highwatermark file in directory $logDir", e)
      }
    }
  }

二、副本Follower在向Leader副本Fetch數(shù)據(jù)

至于為什么是fetchRecords方法,你可以看一下kakfa 3.5 kafka服務(wù)端處理消費(fèi)者客戶端拉取數(shù)據(jù)請(qǐng)求源碼

def fetchRecords(
    fetchParams: FetchParams,
    fetchPartitionData: FetchRequest.PartitionData,
    fetchTimeMs: Long,
    maxBytes: Int,
    minOneMessage: Boolean,
    updateFetchState: Boolean
  ): LogReadInfo = {
    //省略代碼
    //判斷獲取數(shù)據(jù)的請(qǐng)求是否來自Follower
    if (fetchParams.isFromFollower) {
      // Check that the request is from a valid replica before doing the read
      val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) {
        //省略代碼
        val logReadInfo = readFromLocalLog(localLog)
        (replica, logReadInfo)
      }
      //todo Follower副本在fetch數(shù)據(jù)后,修改一些信息
      if (updateFetchState && !logReadInfo.divergingEpoch.isPresent) {
        //如果 fetch 來自 broker 的副本同步,那么就更新相關(guān)的 log end offset
        updateFollowerFetchState(
          replica,
          followerFetchOffsetMetadata = logReadInfo.fetchedData.fetchOffsetMetadata,
          followerStartOffset = fetchPartitionData.logStartOffset,
          followerFetchTimeMs = fetchTimeMs,
          leaderEndOffset = logReadInfo.logEndOffset,
          fetchParams.replicaEpoch
        )
      }

      logReadInfo
    } 
  //省略代碼  
  }
/**
   * Update the follower's state in the leader based on the last fetch request. See
   * [[Replica.updateFetchState()]] for details.
   *
   * This method is visible for performance testing (see `UpdateFollowerFetchStateBenchmark`)
   */
  def updateFollowerFetchState(
    replica: Replica,
    followerFetchOffsetMetadata: LogOffsetMetadata,
    followerStartOffset: Long,
    followerFetchTimeMs: Long,
    leaderEndOffset: Long,
    brokerEpoch: Long
  ): Unit = {
    // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
    //通過判斷是否存在延遲的DeleteRecordsRequest來確定是否需要計(jì)算低水位(lowWatermarkIfLeader)。如果沒有延遲的DeleteRecordsRequest,則將oldLeaderLW設(shè)為-1。
    val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
    //獲取副本的先前的跟隨者日志結(jié)束偏移量
    val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset
    //調(diào)用replica.updateFetchState方法來更新副本的抓取狀態(tài),包括跟隨者的抓取偏移量元數(shù)據(jù)、起始偏移量、抓取時(shí)間、領(lǐng)導(dǎo)者的結(jié)束偏移量和代理節(jié)點(diǎn)的時(shí)期。
    replica.updateFetchState(
      followerFetchOffsetMetadata,
      followerStartOffset,
      followerFetchTimeMs,
      leaderEndOffset,
      brokerEpoch
    )
    //再次判斷是否存在延遲的DeleteRecordsRequest,如果沒有則將newLeaderLW設(shè)為-1。
    val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
    // check if the LW of the partition has incremented
    // since the replica's logStartOffset may have incremented
    //檢查分區(qū)的低水位是否增加,即新的低水位(newLeaderLW)是否大于舊的低水位(oldLeaderLW)。
    val leaderLWIncremented = newLeaderLW > oldLeaderLW

    // Check if this in-sync replica needs to be added to the ISR.
    //調(diào)用maybeExpandIsr方法來檢查是否需要將該同步副本添加到ISR(In-Sync Replicas)中。
    maybeExpandIsr(replica)

    // check if the HW of the partition can now be incremented
    // since the replica may already be in the ISR and its LEO has just incremented
    //檢查分區(qū)的高水位是否可以增加。如果副本的日志結(jié)束偏移量(replica.stateSnapshot.logEndOffset)發(fā)生變化,
    val leaderHWIncremented = if (prevFollowerEndOffset != replica.stateSnapshot.logEndOffset) {
      // the leader log may be updated by ReplicaAlterLogDirsThread so the following method must be in lock of
      // leaderIsrUpdateLock to prevent adding new hw to invalid log.
      //嘗試增加高水位(maybeIncrementLeaderHW方法),并在leaderIsrUpdateLock鎖的保護(hù)下執(zhí)行該操作。
      inReadLock(leaderIsrUpdateLock) {
        leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
      }
    } else {
      false
    }

    // some delayed operations may be unblocked after HW or LW changed
    //如果低水位或高水位發(fā)生變化,則嘗試完成延遲請(qǐng)求(tryCompleteDelayedRequests方法)。
    if (leaderLWIncremented || leaderHWIncremented)
      tryCompleteDelayedRequests()

    debug(s"Recorded replica ${replica.brokerId} log end offset (LEO) position " +
      s"${followerFetchOffsetMetadata.messageOffset} and log start offset $followerStartOffset.")
  }

1、嘗試升高Leader副本的HW

 /**
   * 檢查并可能增加分區(qū)的高水位線;
   * 1. Partition ISR changed 1.分區(qū) ISR 已更改
   * 2. Any replica's LEO changed 2。任何副本的 LEO 已更改
   *
   * HW由同步或被視為已捕獲的所有副本中的最小日志結(jié)束偏移量確定。
   * 這樣,如果一個(gè)副本被視為已捕獲,但其對(duì)數(shù)結(jié)束偏移小于HW,我們將等待此副本趕上HW,然后再推進(jìn)HW。
   * 這有助于 ISR 僅包含領(lǐng)導(dǎo)者副本且從屬者試圖趕上的情況。
   * 如果我們?cè)谇斑M(jìn)HW時(shí)不等待跟隨者,則跟隨者的對(duì)數(shù)結(jié)束偏移量可能會(huì)一直落后于HW(由領(lǐng)導(dǎo)者的對(duì)數(shù)結(jié)束偏移量決定),因此永遠(yuǎn)不會(huì)添加到 ISR 中。
   * 隨著 AlterPartition 的添加,我們還在推進(jìn)硬件時(shí)將新添加的副本視為 ISR 的一部分。
   * 控制器尚未將這些副本提交到 ISR,因此我們可以恢復(fù)到之前提交的 ISR。
   * 但是,向 ISR 添加其他副本會(huì)使其更具限制性,因此更安全。我們將此集合稱為“最大”ISR。
   */
  private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, currentTimeMs: Long = time.milliseconds): Boolean = {
    //首先獲得Leader分區(qū)的LogEndOffset
    val leaderLogEndOffset = leaderLog.logEndOffsetMetadata
    //創(chuàng)建一個(gè)新的Hw。不一定用上,
    var newHighWatermark = leaderLogEndOffset
    //首先,代碼通過迭代remoteReplicasMap中的每個(gè)副本(replica)來確定新的高水位線。對(duì)于每個(gè)副本,它檢查副本的狀態(tài)快照(replica.stateSnapshot)的日志結(jié)束偏移
    remoteReplicasMap.values.foreach { replica =>
      // Note here we are using the "maximal", see explanation above
      val replicaState = replica.stateSnapshot
      //如果副本的日志結(jié)束偏移小于新的高水位線,并且副本已經(jīng)追趕上了領(lǐng)導(dǎo)者日志,或者副本的brokerId包含在最大ISR(in-sync replicas)列表中,則將該日志結(jié)束偏移賦值給新的高水位線。
      if (replicaState.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset &&
        (replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, currentTimeMs, replicaLagTimeMaxMs)
          || partitionState.maximalIsr.contains(replica.brokerId))) {
        //則將該副本日志結(jié)束偏移賦值給新的高水位線。相當(dāng)于HW設(shè)置為ISR中endOffset最小的那一個(gè)
        newHighWatermark = replicaState.logEndOffsetMetadata
      }
    }
    leaderLog.maybeIncrementHighWatermark(newHighWatermark) match {
      case Some(oldHighWatermark) =>
        //嘗試更新領(lǐng)導(dǎo)者日志的高水位線。如果成功更新了舊的高水位線,則會(huì)輸出一條調(diào)試信息,并返回true
        debug(s"High watermark updated from $oldHighWatermark to $newHighWatermark")
        true
      //省略代碼
    }
  }

leaderLog.maybeIncrementHighWatermark這個(gè)是一個(gè)方法,有返回值

 /**
   *當(dāng)且僅當(dāng)高水位線大于舊值時(shí),才將其更新為新值。更新為大于日志結(jié)束偏移量的值是錯(cuò)誤的。
   *此方法旨在由領(lǐng)導(dǎo)者在更新追隨者提取偏移量后更新高水位線。
   * @return the old high watermark, if updated by the new value
   */
  def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
   //省略代碼
    lock.synchronized {
      val oldHighWatermark = fetchHighWatermarkMetadata
	 //省略代碼
      if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
        (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
        updateHighWatermarkMetadata(newHighWatermark)
        Some(oldHighWatermark)
      } else {
        None
      }
    }
  }

updateHighWatermarkMetadata修改HW元數(shù)據(jù)的方法

//更新HW高水位線的方法
  private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
    if (newHighWatermark.messageOffset < 0)
      throw new IllegalArgumentException("High watermark offset should be non-negative")

    lock synchronized {
      if (newHighWatermark.messageOffset < highWatermarkMetadata.messageOffset) {
        warn(s"Non-monotonic update of high watermark from $highWatermarkMetadata to $newHighWatermark")
      }
      //先更新HW緩存,再更新數(shù)據(jù)
      highWatermarkMetadata = newHighWatermark
      producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
      logOffsetsListener.onHighWatermarkUpdated(newHighWatermark.messageOffset)
      maybeIncrementFirstUnstableOffset()
    }
    trace(s"Setting high watermark $newHighWatermark")
  }

三、生產(chǎn)者把數(shù)據(jù)推送到服務(wù)端

至于生產(chǎn)者推送消息到服務(wù)端,可以參考kafka 3.5 kafka服務(wù)端接收生產(chǎn)者發(fā)送的數(shù)據(jù)源碼

1、logEndOffset升高

private def append(records: MemoryRecords,
                     origin: AppendOrigin,
                     interBrokerProtocolVersion: MetadataVersion,
                     validateAndAssignOffsets: Boolean,
                     leaderEpoch: Int,
                     requestLocal: Option[RequestLocal],
                     ignoreRecordSize: Boolean): LogAppendInfo = {
             
              //把數(shù)據(jù)追加到數(shù)據(jù)文件、索引文件、時(shí)間索引文件的方法
              //這里會(huì)修改LogEndOffset,保證這個(gè)點(diǎn)是最新數(shù)據(jù)的位點(diǎn)+1
              localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.offsetOfMaxTimestamp, validRecords)
              //修改高水位線最后一個(gè)日志的偏移量
              updateHighWatermarkWithLogEndOffset()

  }
 private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
    //給數(shù)據(jù)文件增加數(shù)據(jù),并且根據(jù)條件判斷是否給索引文件和時(shí)間索引文件增加數(shù)據(jù)
    segments.activeSegment.append(largestOffset = lastOffset, largestTimestamp = largestTimestamp,
      shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp, records = records)
    //更新日志的結(jié)束偏移量,并更新恢復(fù)點(diǎn)。
    updateLogEndOffset(lastOffset + 1)
  }

其中updateLogEndOffset會(huì)修改logEndOffset

/**
   * The offset metadata of the next message that will be appended to the log
   */
  private[log] def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
  /**
   * The offset of the next message that will be appended to the log
   */
  private[log] def logEndOffset: Long = nextOffsetMetadata.messageOffset
  /**
   * Update end offset of the log, and update the recoveryPoint.
   *更新日志的結(jié)束偏移量,并更新恢復(fù)點(diǎn)。
   * @param endOffset the new end offset of the log
   */
  private[log] def updateLogEndOffset(endOffset: Long): Unit = {
    nextOffsetMetadata = new LogOffsetMetadata(endOffset, segments.activeSegment.baseOffset, segments.activeSegment.size)
    //恢復(fù)點(diǎn)設(shè)置為上次的endOffset
    if (recoveryPoint > endOffset) {
      updateRecoveryPoint(endOffset)
    }
  }

new LogOffsetMetadata會(huì)創(chuàng)建一個(gè)新的給logEndOffsetMetadata,覆蓋掉舊的,并且logEndOffset變成了新的logEndOffsetMetadata中的messageOffset

  public LogOffsetMetadata(long messageOffset,
                             long segmentBaseOffset,
                             int relativePositionInSegment) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePositionInSegment = relativePositionInSegment;
    }

四、segment過期執(zhí)行刪除最早創(chuàng)建的segment

1、logStartOffset升高

執(zhí)行deleteSegments一般是segment過期執(zhí)行刪除操作,都是從時(shí)間最久的segment開始刪除,所以LogStartOffset才會(huì)遞增

private def deleteSegments(deletable: Iterable[LogSegment], reason: SegmentDeletionReason): Int = {
    maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
      val numToDelete = deletable.size
      if (numToDelete > 0) {
        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
        var segmentsToDelete = deletable
        if (localLog.segments.numberOfSegments == numToDelete) {
          val newSegment = roll()
          if (deletable.last.baseOffset == newSegment.baseOffset) {
            warn(s"Empty active segment at ${deletable.last.baseOffset} was deleted and recreated due to $reason")
            segmentsToDelete = deletable.dropRight(1)
          }
        }
        localLog.checkIfMemoryMappedBufferClosed()
        // remove the segments for lookups
        localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
        deleteProducerSnapshots(deletable, asyncDelete = true)
        maybeIncrementLogStartOffset(localLog.segments.firstSegmentBaseOffset.get, LogStartOffsetIncrementReason.SegmentDeletion)
      }
      numToDelete
    }
  }

在刪除segment時(shí),會(huì)調(diào)用maybeIncrementLogStartOffset 會(huì)嘗試修改LogStartOffset

 /**
  
   *如果提供的偏移量較大,則遞增日志開始偏移量。
   *如果日志開始偏移量發(fā)生更改,則此方法還會(huì)更新一些鍵偏移量,以便“l(fā)ogStartOffset <= logStableOffset <= highWatermark”。
   * 前導(dǎo)紀(jì)元緩存也會(huì)更新,以便該組件中引用的所有偏移都指向此日志中的有效偏移。
   * @throws OffsetOutOfRangeException if the log start offset is greater than the high watermark
   * @return true if the log start offset was updated; otherwise false
   */
  def maybeIncrementLogStartOffset(newLogStartOffset: Long, reason: LogStartOffsetIncrementReason): Boolean = {
    var updatedLogStartOffset = false
    maybeHandleIOException(s"Exception while increasing log start offset for $topicPartition to $newLogStartOffset in dir ${dir.getParent}") {
      lock synchronized {
        if (newLogStartOffset > highWatermark)
          throw new OffsetOutOfRangeException(s"Cannot increment the log start offset to $newLogStartOffset of partition $topicPartition " +
            s"since it is larger than the high watermark $highWatermark")

        localLog.checkIfMemoryMappedBufferClosed()
        if (newLogStartOffset > logStartOffset) {
         //修改LogStartOffset
          updatedLogStartOffset = true
          updateLogStartOffset(newLogStartOffset)
          _localLogStartOffset = newLogStartOffset
          info(s"Incremented log start offset to $newLogStartOffset due to $reason")
          leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
          producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
          maybeIncrementFirstUnstableOffset()
        }
      }
    }

    updatedLogStartOffset
  }
//修改LogStartOffset
  private def updateLogStartOffset(offset: Long): Unit = {
    logStartOffset = offset

    if (highWatermark < offset) {
      updateHighWatermark(offset)
    }

    if (localLog.recoveryPoint < offset) {
      localLog.updateRecoveryPoint(offset)
    }
  }

五、證明LW在代碼中不實(shí)際存儲(chǔ)

1、UnifiedLog.scala沒有字段表示存儲(chǔ)的是LW

但是比如HW,logStartOffset、logEndOffset都存在

class UnifiedLog(@volatile var logStartOffset: Long,
                 private val localLog: LocalLog,
                 brokerTopicStats: BrokerTopicStats,
                 val producerIdExpirationCheckIntervalMs: Int,
                 @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
                 val producerStateManager: ProducerStateManager,
                 @volatile private var _topicId: Option[Uuid],
                 val keepPartitionMetadataFile: Boolean,
                 val remoteStorageSystemEnable: Boolean = false,
                 remoteLogManager: Option[RemoteLogManager] = None,
                 @volatile private var logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER) extends Logging {
   //logStartOffset
  @volatile private[kafka] var _localLogStartOffset: Long = logStartOffset
  //highWatermark            
  def highWatermark: Long = highWatermarkMetadata.messageOffset
  //logEndOffset
  def logEndOffset: Long =  localLog.logEndOffset

}

2、獲得lowWatermark方法

基本獲得lowWatermark的地方都是用lowWatermarkIfLeader獲得LW,而方法內(nèi)部是通過遍歷所有副本中最小的logStartOffset得到的文章來源地址http://www.zghlxwxcb.cn/news/detail-709432.html

/**
   * 低水位線偏移值,僅當(dāng)本地副本是分區(qū)前導(dǎo)符時(shí)才計(jì)算 它僅由領(lǐng)導(dǎo)代理用于決定何時(shí)滿足 DeleteRecordsRequest。
   * 它的值是所有活動(dòng)副本的最小 logStartOffset 當(dāng)領(lǐng)導(dǎo)者代理收到 FetchRequest 或 DeleteRecordsRequest 時(shí),低水位線將增加。
   */
  def lowWatermarkIfLeader: Long = {
  	//不是Leader拋異常
    if (!isLeader)
      throw new NotLeaderOrFollowerException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
    //當(dāng) DeleteRecordsRequest 未完成時(shí),lowWatermarkIfLeader 可能會(huì)被多次調(diào)用,已注意避免在此代碼中生成不必要的集合
    //首先獲得Leader的logStartOffset
    var lowWaterMark = localLogOrException.logStartOffset
    //遍歷所有的副本,如果有一個(gè)副本的logStartOffset小于當(dāng)前Leader的logStartOffset,則LW則重新設(shè)置為最小的那個(gè)
    remoteReplicas.foreach { replica =>
      val logStartOffset = replica.stateSnapshot.logStartOffset
      if (metadataCache.hasAliveBroker(replica.brokerId) && logStartOffset < lowWaterMark) {
        lowWaterMark = logStartOffset
      }
    }
    //如果存在未來的日志(futureLog),則將最小水位線與未來日志的起始偏移量進(jìn)行比較,取較小值作為最終的最小水位線;如果不存在未來的日志,則直接返回最小水位線。
    futureLog match {
      case Some(partitionFutureLog) =>
        Math.min(lowWaterMark, partitionFutureLog.logStartOffset)
      case None =>
        lowWaterMark
    }
  }

到了這里,關(guān)于kafka 3.5 主題分區(qū)的高水位線HW,低水位線LW,logStartOffset,LogEndOffset什么情況下會(huì)更新源碼的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【FLink】水位線(Watermark)

    【FLink】水位線(Watermark)

    目錄 1、關(guān)于時(shí)間語義 1.1事件時(shí)間 1.2處理時(shí)間?編輯 2、什么是水位線 2.1 順序流和亂序流 2.2亂序數(shù)據(jù)的處理 2.3 水位線的特性 3 、水位線的生成 3.1 生成水位線的總體原則 3.2 水位線生成策略 3.3 Flink內(nèi)置水位線 3.3.1?有序流中內(nèi)置水位線設(shè)置 3.4.2?斷點(diǎn)式水位線生成器(Punc

    2024年02月21日
    瀏覽(19)
  • Flink-水位線和時(shí)間語義

    Flink-水位線和時(shí)間語義

    在實(shí)際應(yīng)用中,事件時(shí)間語義會(huì)更為常見。一般情況下,業(yè)務(wù)日志數(shù)據(jù)中都會(huì)記錄數(shù)據(jù)生成的時(shí)間戳(timestamp),它就可以作為事件時(shí)間的判斷基礎(chǔ)。 在Flink中,由于處理時(shí)間比較簡(jiǎn)單,早期版本默認(rèn)的時(shí)間語義是處理時(shí)間;而考慮到事件時(shí)間在實(shí)際應(yīng)用中更為廣泛,從Fli

    2024年02月04日
    瀏覽(34)
  • Flink-【時(shí)間語義、窗口、水位線】

    Flink-【時(shí)間語義、窗口、水位線】

    ??:可樂 可樂的生產(chǎn)日期?= 事件時(shí)間(可樂產(chǎn)生的時(shí)間); 可樂被喝的時(shí)間 = 處理時(shí)間(可樂被處理【喝掉=處理】的時(shí)間)。 機(jī)器時(shí)間:可能不準(zhǔn)確(例如:A可樂廠的時(shí)鐘比較慢,B可樂廠的時(shí)鐘比較快,但實(shí)際上B產(chǎn)生可樂的時(shí)間比A產(chǎn)生可樂的時(shí)間慢,卻被先處理了)

    2024年02月01日
    瀏覽(21)
  • Flink之Watermark水印、水位線

    在Apache Flink中,Watermark(水?。┦且环N用于處理事件時(shí)間(eventtime)的時(shí)間指示器。它模擬了事件流中事件時(shí)間進(jìn)展的概念。 事件時(shí)間是指事件實(shí)際發(fā)生的時(shí)間,在分布式流處理中經(jīng)常用于處理無序事件流。然而,由于網(wǎng)絡(luò)延遲、亂序事件的到達(dá)以及分布式處理的特點(diǎn),事件

    2024年02月08日
    瀏覽(22)
  • Flink-水位線的設(shè)置以及傳遞

    Flink-水位線的設(shè)置以及傳遞

    6.2.1 概述 分類 有序流 無序流 判斷的時(shí)間延遲 延遲時(shí)間判定 6.2.2 水位線的設(shè)置 分析 DataStream下的assignTimstampsAndWatermarks方法,返回SingleOutputStreamOperator本質(zhì)還是個(gè)算子,傳入的參數(shù)是WatermarkStrategy的生成策略 但是WatermarkStrategy是一個(gè)接口 有序流 因此調(diào)用靜態(tài)方法forMonotonousT

    2023年04月15日
    瀏覽(27)
  • flink水位線傳播及任務(wù)事件時(shí)間

    flink水位線傳播及任務(wù)事件時(shí)間

    本文來講解一下flink的水位線傳播及對(duì)其對(duì)任務(wù)事件時(shí)間的影響 首先flink是通過從源頭生成水位線記錄的方式來實(shí)現(xiàn)水位線傳播的,也就是說水位線是嵌入在正常的記錄流中的特殊記錄,攜帶者水位線的時(shí)間戳,以下我們就通過圖片的方式來講解下水位線是如何傳播以及更新

    2024年02月16日
    瀏覽(21)
  • 【入門Flink】- 09Flink水位線Watermark

    【入門Flink】- 09Flink水位線Watermark

    在 窗口的處理過程 中,基于數(shù)據(jù)的時(shí)間戳,自定義一個(gè) “邏輯時(shí)鐘” 。這個(gè)時(shí)鐘的時(shí)間不會(huì)自動(dòng)流逝;它的時(shí)間進(jìn)展,就是靠著新到數(shù)據(jù)的時(shí)間戳來推動(dòng)的。 用來衡量 事件時(shí)間 進(jìn)展的標(biāo)記,就被稱作 “水位線”(Watermark) 。 具體實(shí)現(xiàn)上,水位線可以看作一條 特殊的數(shù)

    2024年01月17日
    瀏覽(23)
  • Flink詳解系列之五--水位線(watermark)

    Flink詳解系列之五--水位線(watermark)

    1、概念 在Flink中,水位線是一種衡量Event Time進(jìn)展的機(jī)制,用來處理實(shí)時(shí)數(shù)據(jù)中的亂序問題的,通常是水位線和窗口結(jié)合使用來實(shí)現(xiàn)。 從設(shè)備生成實(shí)時(shí)流事件,到Flink的source,再到多個(gè)oparator處理數(shù)據(jù),過程中會(huì)受到網(wǎng)絡(luò)延遲、背壓等多種因素影響造成數(shù)據(jù)亂序。在進(jìn)行窗口處

    2024年02月13日
    瀏覽(20)
  • Elasticsearch--解決磁盤使用率超過警戒水位線

    Elasticsearch--解決磁盤使用率超過警戒水位線

    原文網(wǎng)址:Elasticsearch--解決磁盤使用率超過警戒水位線_IT利刃出鞘的博客-CSDN博客 本文介紹如何解決ES磁盤使用率超過警戒水位線的問題。 當(dāng)客戶端向 Elasticsearch 寫入文檔時(shí)候報(bào)錯(cuò): 在 elasticsearch 的日志文件中報(bào)錯(cuò)如下: 出現(xiàn)如上問題多半是:磁盤使用量超過警戒水位線,

    2024年02月05日
    瀏覽(21)
  • 源碼解析FlinkKafkaConsumer支持周期性水位線發(fā)送

    當(dāng)flink消費(fèi)kafka的消息時(shí),我們經(jīng)常會(huì)用到FlinkKafkaConsumer進(jìn)行水位線的發(fā)送,本文就從源碼看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位線發(fā)送的流程 1.首先從Fetcher類開始,創(chuàng)建Fetcher類的時(shí)候會(huì)構(gòu)建一個(gè)周期性的水位線發(fā)送線程并啟動(dòng) 2.隨后,PeriodicWatermarkEmitter中

    2024年02月08日
    瀏覽(14)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包