The examples below each pick a single scenario for illustration; they are far from exhaustive. Do not assume that logStartOffset, LogEndOffset, HW, and LW can only be updated in the few scenarios shown here.
Preface
logStartOffset and LogEndOffset need a special note up front, or the rest will be confusing: every section after this preface discusses them at the topic-partition level. (A small sketch at the end of this preface models all four offsets.)
1. logStartOffset
(1) Topic-partition level
- Every replica of every partition (both Leader and Follower) has its own independent value, recorded and managed by the Kafka broker.
- With a single segment, logStartOffset is the first offset of that segment; with multiple segments, it is the first offset of the oldest segment.
- It is affected by producer writes, administrative actions, and so on (e.g., deleting old segments).
(2) Consumer-group level
- Defined per partition within each consumer group: each partition in each consumer group has its own independent value, the greatest offset the group had already consumed before the consumer joined.
- Recorded and managed by the client; it marks the position from which the consumer starts consuming.
- It keeps changing as the consumer makes progress.
2. LogEndOffset
(1) Topic-partition level
- Every replica of every partition (both Leader and Follower) has its own independent value, recorded and managed by the Kafka broker.
- It equals the offset of the last record in the newest segment + 1.
- It is affected by producer writes, administrative actions, and so on.
(2) Consumer-group level
- Defined per partition within each consumer group: each partition in each consumer group has its own independent value, the offset one past the last message the group had consumed before the consumer joined.
- Recorded and managed by the client; it marks how far the consumer has consumed.
- It keeps changing as the consumer makes progress.
3. HighWatermark (HW)
The HW is the smallest topic-partition-level LogEndOffset among the replicas in the Leader's ISR set.
4. LowWatermark (LW)
This is a derived value, not one that is actually stored; see the proof in Section V.
The LW always equals the smallest topic-partition-level logStartOffset among all replicas (AR).
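To make the four offsets concrete, here is a minimal, self-contained sketch (not Kafka code; all names and numbers are made up) modeling one partition with three replicas and deriving HW and LW from the per-replica values:
object OffsetModel {
  // Each replica tracks its own logStartOffset and logEndOffset (LEO)
  case class ReplicaState(brokerId: Int, logStartOffset: Long, logEndOffset: Long)

  // HW = smallest LEO across the ISR; LW = smallest logStartOffset across all replicas (AR)
  def highWatermark(isr: Seq[ReplicaState]): Long = isr.map(_.logEndOffset).min
  def lowWatermark(ar: Seq[ReplicaState]): Long = ar.map(_.logStartOffset).min

  def main(args: Array[String]): Unit = {
    val leader    = ReplicaState(0, logStartOffset = 10, logEndOffset = 100)
    val follower1 = ReplicaState(1, logStartOffset = 10, logEndOffset = 95)
    val follower2 = ReplicaState(2, logStartOffset = 8,  logEndOffset = 80) // lagging, dropped from the ISR
    val ar  = Seq(leader, follower1, follower2)
    val isr = Seq(leader, follower1)
    println(s"HW = ${highWatermark(isr)}") // 95: smallest LEO in the ISR
    println(s"LW = ${lowWatermark(ar)}")   // 8:  smallest logStartOffset across AR
  }
}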
I. The scheduled task
1. While handling the partition-creation (LeaderAndIsr) request, a scheduled task is started whose job is to periodically write the high watermark (HW) to a file.
def becomeLeaderOrFollower(correlationId: Int,
leaderAndIsrRequest: LeaderAndIsrRequest,
onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
    //todo Start the high-watermark checkpoint task: write each partition's HW into the high-watermark checkpoint file
startHighWatermarkCheckPointThread()
}
def startHighWatermarkCheckPointThread(): Unit = {
if (highWatermarkCheckPointThreadStarted.compareAndSet(false, true)) {
      //() => checkpointHighWatermarks() is a Runnable
scheduler.schedule("highwatermark-checkpoint", () => checkpointHighWatermarks(), 0L, config.replicaHighWatermarkCheckpointIntervalMs)
}
}
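The schedule call above runs checkpointHighWatermarks once immediately (initial delay 0) and then at a fixed interval; config.replicaHighWatermarkCheckpointIntervalMs is backed by replica.high.watermark.checkpoint.interval.ms, whose default I believe is 5000 ms. A minimal stand-in for the same pattern (not Kafka's KafkaScheduler) using a plain JDK scheduler:
import java.util.concurrent.{Executors, TimeUnit}

object CheckpointSchedulerSketch {
  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    // Stand-in for checkpointHighWatermarks()
    val task: Runnable = () => println("checkpointing high watermarks...")
    // Run immediately, then every 5000 ms, like scheduler.schedule(...) above
    scheduler.scheduleAtFixedRate(task, 0L, 5000L, TimeUnit.MILLISECONDS)
    Thread.sleep(12000)  // let the task fire a few times for the demo
    scheduler.shutdown()
  }
}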
The scheduled task: checkpointHighWatermarks
def checkpointHighWatermarks(): Unit = {
  //Takes two parameters, logDirToCheckpoints and log, and stores the log's high watermark (highWatermark) into logDirToCheckpoints.
def putHw(logDirToCheckpoints: mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]],
log: UnifiedLog): Unit = {
val checkpoints = logDirToCheckpoints.getOrElseUpdate(log.parentDir,
new mutable.AnyRefMap[TopicPartition, Long]())
checkpoints.put(log.topicPartition, log.highWatermark)
}
  //A mutable map of type mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]], initialized with size allPartitions.size.
val logDirToHws = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]](
allPartitions.size)
  // Iterate over each online partition via onlinePartitionsIterator and call putHw on each partition's log (and future log) to populate logDirToHws.
onlinePartitionsIterator.foreach { partition =>
partition.log.foreach(putHw(logDirToHws, _))
partition.futureLog.foreach(putHw(logDirToHws, _))
}
  //For each (logDir, hws) pair in logDirToHws, look up highWatermarkCheckpoints.get(logDir) and try to write hws to the high-watermark checkpoint file
for ((logDir, hws) <- logDirToHws) {
try highWatermarkCheckpoints.get(logDir).foreach(_.write(hws))
catch {
      //If a KafkaStorageException occurs while writing, log an error
case e: KafkaStorageException =>
error(s"Error while writing to highwatermark file in directory $logDir", e)
}
}
}
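For intuition about what ends up on disk: logDirToHws is keyed by log directory, and each directory has its own replication-offset-checkpoint file. A rough sketch of writing such a file — the version/count/entries layout mirrors Kafka's checkpoint file format as I understand it, so treat the exact layout as an assumption:
import java.nio.file.{Files, Paths}
import scala.jdk.CollectionConverters._

object HwCheckpointSketch {
  // Writes a version line ("0"), an entry count, then one "topic partition hw" line per partition
  def write(path: String, hws: Map[(String, Int), Long]): Unit = {
    val lines = Seq("0", hws.size.toString) ++
      hws.map { case ((topic, partition), hw) => s"$topic $partition $hw" }
    Files.write(Paths.get(path), lines.asJava)
  }

  def main(args: Array[String]): Unit =
    write("replication-offset-checkpoint", Map(("orders", 0) -> 42L, ("orders", 1) -> 17L))
}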
II. Follower replicas fetching data from the Leader
For why the entry point is the fetchRecords method, see the companion source walkthrough of how the Kafka 3.5 broker handles consumer-client fetch requests.
def fetchRecords(
fetchParams: FetchParams,
fetchPartitionData: FetchRequest.PartitionData,
fetchTimeMs: Long,
maxBytes: Int,
minOneMessage: Boolean,
updateFetchState: Boolean
): LogReadInfo = {
  //code omitted
  //Check whether the fetch request comes from a Follower
if (fetchParams.isFromFollower) {
// Check that the request is from a valid replica before doing the read
val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) {
      //code omitted
val logReadInfo = readFromLocalLog(localLog)
(replica, logReadInfo)
}
    //todo After the Follower fetches data, update some state
if (updateFetchState && !logReadInfo.divergingEpoch.isPresent) {
      //If the fetch comes from a broker's replica sync, update the corresponding log end offset
updateFollowerFetchState(
replica,
followerFetchOffsetMetadata = logReadInfo.fetchedData.fetchOffsetMetadata,
followerStartOffset = fetchPartitionData.logStartOffset,
followerFetchTimeMs = fetchTimeMs,
leaderEndOffset = logReadInfo.logEndOffset,
fetchParams.replicaEpoch
)
}
logReadInfo
}
//省略代碼
}
/**
* Update the follower's state in the leader based on the last fetch request. See
* [[Replica.updateFetchState()]] for details.
*
* This method is visible for performance testing (see `UpdateFollowerFetchStateBenchmark`)
*/
def updateFollowerFetchState(
replica: Replica,
followerFetchOffsetMetadata: LogOffsetMetadata,
followerStartOffset: Long,
followerFetchTimeMs: Long,
leaderEndOffset: Long,
brokerEpoch: Long
): Unit = {
// No need to calculate low watermark if there is no delayed DeleteRecordsRequest
  //Decide whether the low watermark (lowWatermarkIfLeader) needs computing by checking for pending delayed DeleteRecordsRequests; if there are none, set oldLeaderLW to -1.
val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
  //The follower replica's previous log end offset
val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset
  //Call replica.updateFetchState to update the replica's fetch state: the follower's fetch offset metadata, start offset, fetch time, the leader's end offset, and the broker epoch.
replica.updateFetchState(
followerFetchOffsetMetadata,
followerStartOffset,
followerFetchTimeMs,
leaderEndOffset,
brokerEpoch
)
  //Check again for pending delayed DeleteRecordsRequests; if none, set newLeaderLW to -1.
val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
// check if the LW of the partition has incremented
// since the replica's logStartOffset may have incremented
  //Check whether the partition's low watermark has advanced, i.e. whether newLeaderLW > oldLeaderLW.
val leaderLWIncremented = newLeaderLW > oldLeaderLW
// Check if this in-sync replica needs to be added to the ISR.
  //Call maybeExpandIsr to check whether this in-sync replica should be added to the ISR.
maybeExpandIsr(replica)
// check if the HW of the partition can now be incremented
// since the replica may already be in the ISR and its LEO has just incremented
  //Check whether the partition's HW can now advance: if the replica's log end offset (replica.stateSnapshot.logEndOffset) has changed,
val leaderHWIncremented = if (prevFollowerEndOffset != replica.stateSnapshot.logEndOffset) {
// the leader log may be updated by ReplicaAlterLogDirsThread so the following method must be in lock of
// leaderIsrUpdateLock to prevent adding new hw to invalid log.
      //try to increment the HW (maybeIncrementLeaderHW), executing under the protection of leaderIsrUpdateLock.
inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
}
} else {
false
}
// some delayed operations may be unblocked after HW or LW changed
  //If either the low or the high watermark changed, try to complete delayed requests (tryCompleteDelayedRequests).
if (leaderLWIncremented || leaderHWIncremented)
tryCompleteDelayedRequests()
debug(s"Recorded replica ${replica.brokerId} log end offset (LEO) position " +
s"${followerFetchOffsetMetadata.messageOffset} and log start offset $followerStartOffset.")
}
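The shape of updateFollowerFetchState is snapshot, update, compare: capture the LW (and the follower's LEO) before applying the fetch state, apply it, then compare to decide whether delayed operations can now complete. A condensed sketch of that pattern with hypothetical names (not Kafka code):
object FollowerFetchBookkeepingSketch {
  // Per-follower state the leader tracks (toy version)
  var replicaLogStartOffsets = Map(1 -> 10L, 2 -> 12L)
  var replicaLeos            = Map(1 -> 90L, 2 -> 95L)

  def lowWatermark: Long = replicaLogStartOffsets.values.min // stand-in for lowWatermarkIfLeader
  def minLeo: Long       = replicaLeos.values.min            // what the HW is derived from

  def onFollowerFetch(brokerId: Int, newStart: Long, newLeo: Long): Unit = {
    val (oldLw, oldHw) = (lowWatermark, minLeo)         // snapshot before the update
    replicaLogStartOffsets += brokerId -> newStart      // apply the follower's fetch state
    replicaLeos            += brokerId -> newLeo
    val lwIncremented = lowWatermark > oldLw
    val hwIncremented = minLeo > oldHw
    if (lwIncremented || hwIncremented)                 // mirrors tryCompleteDelayedRequests()
      println("some delayed DeleteRecords/Fetch requests may now be completable")
  }

  def main(args: Array[String]): Unit =
    onFollowerFetch(1, newStart = 12L, newLeo = 96L)    // both LW and HW advance here
}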
1. Trying to raise the Leader replica's HW
/**
 * Check and maybe increment the high watermark of the partition; this can be triggered when
 * 1. the partition ISR changed
 * 2. any replica's LEO changed
 *
 * The HW is determined by the smallest log end offset among all replicas that are in sync or are considered caught-up.
 * This way, if a replica is considered caught-up but its log end offset is smaller than the HW, we wait for this
 * replica to catch up to the HW before advancing it. This helps when the ISR only contains the leader replica and a
 * follower is trying to catch up: if we did not wait for the follower when advancing the HW, the follower's log end
 * offset might keep falling behind the HW (determined by the leader's log end offset) and it would never be added
 * to the ISR.
 *
 * With the addition of AlterPartition, newly added replicas are also treated as part of the ISR when advancing the HW.
 * The controller has not yet committed these replicas to the ISR, so we could still revert to the previously committed
 * ISR. However, adding replicas to the ISR only makes it more restrictive, and therefore safer. We call this set the
 * "maximal" ISR.
 */
private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, currentTimeMs: Long = time.milliseconds): Boolean = {
  //First get the Leader partition's LogEndOffset
val leaderLogEndOffset = leaderLog.logEndOffsetMetadata
  //Create a candidate new HW; it will not necessarily be used.
var newHighWatermark = leaderLogEndOffset
  //Determine the new high watermark by iterating over every replica in remoteReplicasMap, checking each replica's state snapshot (replica.stateSnapshot) log end offset
remoteReplicasMap.values.foreach { replica =>
// Note here we are using the "maximal", see explanation above
val replicaState = replica.stateSnapshot
    //If the replica's log end offset is smaller than the candidate high watermark, and the replica has caught up with the leader log or its brokerId is in the maximal ISR (in-sync replicas) list,
if (replicaState.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset &&
(replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, currentTimeMs, replicaLagTimeMaxMs)
|| partitionState.maximalIsr.contains(replica.brokerId))) {
      //then assign that replica's log end offset to the candidate high watermark. In effect, the HW becomes the smallest endOffset within the ISR.
newHighWatermark = replicaState.logEndOffsetMetadata
}
}
leaderLog.maybeIncrementHighWatermark(newHighWatermark) match {
case Some(oldHighWatermark) =>
        //Try to update the leader log's high watermark. If the old high watermark was replaced, emit a debug message and return true
debug(s"High watermark updated from $oldHighWatermark to $newHighWatermark")
true
      //code omitted
}
}
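Condensed into a pure function with hypothetical names, the rule above starts from the leader's LEO and lowers the candidate to the LEO of any replica that is either caught up or in the maximal ISR; replicas that are neither cannot hold the HW back:
object MaybeIncrementHwSketch {
  case class Replica(brokerId: Int, leo: Long, caughtUp: Boolean)

  def newHighWatermark(leaderLeo: Long, followers: Seq[Replica], maximalIsr: Set[Int]): Long =
    followers.foldLeft(leaderLeo) { (hw, r) =>
      if (r.leo < hw && (r.caughtUp || maximalIsr.contains(r.brokerId))) r.leo else hw
    }

  def main(args: Array[String]): Unit = {
    val followers = Seq(Replica(1, leo = 95, caughtUp = true),
                        Replica(2, leo = 80, caughtUp = false)) // lagging, outside the maximal ISR
    // Result is 95, not 80: only caught-up or maximal-ISR replicas constrain the HW
    println(newHighWatermark(leaderLeo = 100, followers, maximalIsr = Set(0, 1)))
  }
}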
leaderLog.maybeIncrementHighWatermark is a method with a return value:
/**
 * Update the high watermark to a new value if and only if it is larger than the old value. It is an error to update
 * to a value larger than the log end offset. This method is intended to be used by the leader to update the high
 * watermark after follower fetch offsets have been updated.
 * @return the old high watermark, if updated by the new value
 */
def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
  //code omitted
lock.synchronized {
val oldHighWatermark = fetchHighWatermarkMetadata
    //code omitted
if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
(oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
updateHighWatermarkMetadata(newHighWatermark)
Some(oldHighWatermark)
} else {
None
}
}
}
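Stripped of the offset metadata and segment comparison, the guard reduces to "advance only if strictly larger, and report the old value". A tiny sketch of that contract:
object MonotonicHwSketch {
  private var hw = 0L

  // Returns the old HW if it advanced, None otherwise (mirrors the Option return above)
  def maybeIncrement(newHw: Long): Option[Long] = synchronized {
    if (newHw > hw) { val old = hw; hw = newHw; Some(old) } else None
  }

  def main(args: Array[String]): Unit = {
    println(maybeIncrement(10)) // Some(0): advanced
    println(maybeIncrement(5))  // None: the HW never moves backwards
  }
}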
updateHighWatermarkMetadata, the method that modifies the HW metadata:
//Method that updates the high watermark (HW)
private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
if (newHighWatermark.messageOffset < 0)
throw new IllegalArgumentException("High watermark offset should be non-negative")
lock synchronized {
if (newHighWatermark.messageOffset < highWatermarkMetadata.messageOffset) {
warn(s"Non-monotonic update of high watermark from $highWatermarkMetadata to $newHighWatermark")
}
    //Update the cached HW first, then propagate the update
highWatermarkMetadata = newHighWatermark
producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
logOffsetsListener.onHighWatermarkUpdated(newHighWatermark.messageOffset)
maybeIncrementFirstUnstableOffset()
}
trace(s"Setting high watermark $newHighWatermark")
}
III. Producers pushing data to the broker
For how a producer pushes messages to the broker, see the companion source walkthrough of how the Kafka 3.5 broker receives data sent by producers.
1. logEndOffset increases
private def append(records: MemoryRecords,
origin: AppendOrigin,
interBrokerProtocolVersion: MetadataVersion,
validateAndAssignOffsets: Boolean,
leaderEpoch: Int,
requestLocal: Option[RequestLocal],
ignoreRecordSize: Boolean): LogAppendInfo = {
  //The method that appends the data to the data file, the offset index file, and the time index file
  //This updates LogEndOffset, keeping it at the offset of the newest record + 1
localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.offsetOfMaxTimestamp, validRecords)
  //Update the high watermark using the log end offset
updateHighWatermarkWithLogEndOffset()
}
private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
  //Append records to the data file and, depending on conditions, add entries to the offset index and time index files
segments.activeSegment.append(largestOffset = lastOffset, largestTimestamp = largestTimestamp,
shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp, records = records)
  //Update the log end offset and the recovery point.
updateLogEndOffset(lastOffset + 1)
}
updateLogEndOffset is what actually modifies logEndOffset:
/**
* The offset metadata of the next message that will be appended to the log
*/
private[log] def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
/**
* The offset of the next message that will be appended to the log
*/
private[log] def logEndOffset: Long = nextOffsetMetadata.messageOffset
/**
* Update end offset of the log, and update the recoveryPoint.
* @param endOffset the new end offset of the log
*/
private[log] def updateLogEndOffset(endOffset: Long): Unit = {
nextOffsetMetadata = new LogOffsetMetadata(endOffset, segments.activeSegment.baseOffset, segments.activeSegment.size)
  //If the recovery point is beyond the new end offset, pull it back to the end offset
if (recoveryPoint > endOffset) {
updateRecoveryPoint(endOffset)
}
}
new LogOffsetMetadata creates a fresh object that is assigned to logEndOffsetMetadata, replacing the old one, and logEndOffset becomes the messageOffset of the new logEndOffsetMetadata:
public LogOffsetMetadata(long messageOffset,
long segmentBaseOffset,
int relativePositionInSegment) {
this.messageOffset = messageOffset;
this.segmentBaseOffset = segmentBaseOffset;
this.relativePositionInSegment = relativePositionInSegment;
}
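Putting it together: updateLogEndOffset(lastOffset + 1) preserves the invariant that the LEO is the offset the next record will receive. A toy append loop (not Kafka code) that illustrates the arithmetic:
object LeoSketch {
  def main(args: Array[String]): Unit = {
    var logEndOffset = 0L
    def append(batchSize: Int): Unit = {
      val lastOffset = logEndOffset + batchSize - 1 // offsets assigned to this batch
      logEndOffset = lastOffset + 1                 // same rule as updateLogEndOffset(lastOffset + 1)
      println(s"appended offsets ${lastOffset - batchSize + 1}..$lastOffset, LEO is now $logEndOffset")
    }
    append(3) // offsets 0..2, LEO = 3
    append(2) // offsets 3..4, LEO = 5
  }
}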
IV. Segment expiry: deleting the oldest segments
1. logStartOffset increases
deleteSegments usually runs because segments have expired; deletion always starts from the oldest segment, which is why LogStartOffset only ever increases.
private def deleteSegments(deletable: Iterable[LogSegment], reason: SegmentDeletionReason): Int = {
maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
val numToDelete = deletable.size
if (numToDelete > 0) {
// we must always have at least one segment, so if we are going to delete all the segments, create a new one first
var segmentsToDelete = deletable
if (localLog.segments.numberOfSegments == numToDelete) {
val newSegment = roll()
if (deletable.last.baseOffset == newSegment.baseOffset) {
warn(s"Empty active segment at ${deletable.last.baseOffset} was deleted and recreated due to $reason")
segmentsToDelete = deletable.dropRight(1)
}
}
localLog.checkIfMemoryMappedBufferClosed()
// remove the segments for lookups
localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
deleteProducerSnapshots(deletable, asyncDelete = true)
maybeIncrementLogStartOffset(localLog.segments.firstSegmentBaseOffset.get, LogStartOffsetIncrementReason.SegmentDeletion)
}
numToDelete
}
}
While deleting segments, maybeIncrementLogStartOffset is called, which tries to advance LogStartOffset:
/**
 * Increment the log start offset if the provided offset is larger.
 * If the log start offset changed, this method also updates a few key offsets so that
 * `logStartOffset <= logStableOffset <= highWatermark`. The leader epoch cache is also updated
 * so that all offsets referenced by that component point to valid offsets in this log.
 * @throws OffsetOutOfRangeException if the log start offset is greater than the high watermark
 * @return true if the log start offset was updated; otherwise false
 */
def maybeIncrementLogStartOffset(newLogStartOffset: Long, reason: LogStartOffsetIncrementReason): Boolean = {
var updatedLogStartOffset = false
maybeHandleIOException(s"Exception while increasing log start offset for $topicPartition to $newLogStartOffset in dir ${dir.getParent}") {
lock synchronized {
if (newLogStartOffset > highWatermark)
throw new OffsetOutOfRangeException(s"Cannot increment the log start offset to $newLogStartOffset of partition $topicPartition " +
s"since it is larger than the high watermark $highWatermark")
localLog.checkIfMemoryMappedBufferClosed()
if (newLogStartOffset > logStartOffset) {
        //Update LogStartOffset
updatedLogStartOffset = true
updateLogStartOffset(newLogStartOffset)
_localLogStartOffset = newLogStartOffset
info(s"Incremented log start offset to $newLogStartOffset due to $reason")
leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
maybeIncrementFirstUnstableOffset()
}
}
}
updatedLogStartOffset
}
//The method that updates LogStartOffset
private def updateLogStartOffset(offset: Long): Unit = {
logStartOffset = offset
if (highWatermark < offset) {
updateHighWatermark(offset)
}
if (localLog.recoveryPoint < offset) {
localLog.updateRecoveryPoint(offset)
}
}
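A toy retention pass over segment base offsets (hypothetical, not Kafka code) shows the effect: deleting the oldest segments moves logStartOffset up to the base offset of the first surviving segment, just like the maybeIncrementLogStartOffset(localLog.segments.firstSegmentBaseOffset.get, ...) call above:
object RetentionSketch {
  def main(args: Array[String]): Unit = {
    // Base offsets of the on-disk segments, oldest first
    var segments = Vector(0L, 100L, 200L, 300L)
    var logStartOffset = 0L
    val deletable = segments.take(2)        // pretend the two oldest segments expired
    segments = segments.drop(deletable.size)
    val newStart = segments.head            // first base offset still on disk
    if (newStart > logStartOffset)          // "increment only", like maybeIncrementLogStartOffset
      logStartOffset = newStart
    println(s"logStartOffset is now $logStartOffset") // 200
  }
}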
V. Proof that LW is never actually stored in the code
1. UnifiedLog.scala has no field that stores the LW
By contrast, HW, logStartOffset, and logEndOffset all exist as fields:
class UnifiedLog(@volatile var logStartOffset: Long,
private val localLog: LocalLog,
brokerTopicStats: BrokerTopicStats,
val producerIdExpirationCheckIntervalMs: Int,
@volatile var leaderEpochCache: Option[LeaderEpochFileCache],
val producerStateManager: ProducerStateManager,
@volatile private var _topicId: Option[Uuid],
val keepPartitionMetadataFile: Boolean,
val remoteStorageSystemEnable: Boolean = false,
remoteLogManager: Option[RemoteLogManager] = None,
@volatile private var logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER) extends Logging {
//logStartOffset
@volatile private[kafka] var _localLogStartOffset: Long = logStartOffset
//highWatermark
def highWatermark: Long = highWatermarkMetadata.messageOffset
//logEndOffset
def logEndOffset: Long = localLog.logEndOffset
}
2. The method that obtains the lowWatermark
Essentially everywhere the lowWatermark is needed, the LW is obtained via lowWatermarkIfLeader, and internally the method derives it by taking the smallest logStartOffset across all replicas:
/**
 * The low watermark offset value, calculated only if the local replica is the partition leader.
 * It is only used by the leader broker to decide when a DeleteRecordsRequest is satisfied. Its value is the
 * minimum logStartOffset of all live replicas. The low watermark will increase when the leader broker receives
 * either a FetchRequest or a DeleteRecordsRequest.
 */
def lowWatermarkIfLeader: Long = {
  //Throw if this replica is not the Leader
if (!isLeader)
throw new NotLeaderOrFollowerException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
  //lowWatermarkIfLeader may be called many times while a DeleteRecordsRequest is outstanding; this code takes care to avoid generating unnecessary collections
  //Start from the Leader's own logStartOffset
var lowWaterMark = localLogOrException.logStartOffset
  //Iterate over all replicas; if any replica's logStartOffset is smaller than the current minimum, the LW is reset to that smaller value
remoteReplicas.foreach { replica =>
val logStartOffset = replica.stateSnapshot.logStartOffset
if (metadataCache.hasAliveBroker(replica.brokerId) && logStartOffset < lowWaterMark) {
lowWaterMark = logStartOffset
}
}
  //If a future log (futureLog) exists, compare the current minimum with the future log's start offset and take the smaller as the final low watermark; otherwise return the current minimum directly.
futureLog match {
case Some(partitionFutureLog) =>
Math.min(lowWaterMark, partitionFutureLog.logStartOffset)
case None =>
lowWaterMark
}
}
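A condensed sketch of the same derivation with hypothetical types: start from the leader's logStartOffset, lower it for every alive follower, then fold in the future log if one exists. Nothing is persisted anywhere; the LW is recomputed on demand, which is exactly the sense in which it is not stored:
object LowWatermarkSketch {
  case class Replica(brokerId: Int, logStartOffset: Long)

  def lowWatermark(leaderStart: Long, remotes: Seq[Replica],
                   aliveBrokers: Set[Int], futureLogStart: Option[Long]): Long = {
    // Lower the minimum for every replica whose broker is still alive
    val remoteMin = remotes.foldLeft(leaderStart) { (lw, r) =>
      if (aliveBrokers.contains(r.brokerId) && r.logStartOffset < lw) r.logStartOffset else lw
    }
    // Fold in the future log's start offset, if there is one
    futureLogStart.fold(remoteMin)(math.min(remoteMin, _))
  }

  def main(args: Array[String]): Unit =
    println(lowWatermark(10L, Seq(Replica(1, 8L), Replica(2, 3L)),
                         aliveBrokers = Set(1), futureLogStart = None)) // 8: broker 2 is not alive
}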