A Kafka cluster contains topics, and each topic is split into partitions. To guard against data loss, each partition keeps a Leader replica and one or more Follower replicas. So how does a partition's Follower keep its data in sync with the Leader? That is what this article walks through.
First, keep in mind that a Follower pulls data from the Leader asynchronously through a fetcher thread; if that is unfamiliar, see the earlier article Kafka——副本(Replica)機(jī)制.
I. The broker API that handles a partition's Leader and Follower roles
In KafkaApis.scala:
// Handle Leader, Follower and ISR requests
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)

def handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit = {
  val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
  // Read the correlation ID from the request header
  // and the LeaderAndIsrRequest object from the request body
  val correlationId = request.header.correlationId
  val leaderAndIsrRequest = request.body[LeaderAndIsrRequest]
  // Authorize the cluster action for this request
  authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
  // Check whether the broker epoch in the request is stale
  if (zkSupport.isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch, leaderAndIsrRequest.isKRaftController)) {
    // code omitted
  } else {
    // Delegate to replicaManager.becomeLeaderOrFollower to produce the response
    val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest,
      RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _))
    requestHelper.sendResponseExemptThrottle(request, response)
  }
}
After these checks pass, the handler calls becomeLeaderOrFollower to obtain the response.
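Before stepping into that method, it helps to know roughly what the controller sends for each partition. Below is a simplified sketch of the per-partition state carried by a LeaderAndIsr request; this is not the real generated protocol class, and the field set is abbreviated:
// Simplified sketch of the per-partition payload of a LeaderAndIsr request;
// the real generated class carries more fields (partition epoch, isNew, ...)
case class LeaderAndIsrPartitionStateSketch(
  topicName: String,
  partitionIndex: Int,
  leader: Int,        // broker id of the designated leader
  leaderEpoch: Int,   // bumped on every leader change
  isr: List[Int],     // current in-sync replica set
  replicas: List[Int] // full replica assignment
)
The leader and replicas fields are exactly what becomeLeaderOrFollower uses below to decide whether the local replica should become a Leader or a Follower.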
II. The handling logic for a partition replica becoming Leader or Follower
def becomeLeaderOrFollower(correlationId: Int,
                           leaderAndIsrRequest: LeaderAndIsrRequest,
                           onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
  // Record the start timestamp of the method
  val startMs = time.milliseconds()
  // Lock guarding replica state changes
  replicaStateChangeLock synchronized {
    // Pull the needed request fields, including the controller ID and the partition states
    val controllerId = leaderAndIsrRequest.controllerId
    val requestPartitionStates = leaderAndIsrRequest.partitionStates.asScala
    val response = {
      // First check whether the request's controller epoch is stale
      if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
        // code omitted
      } else {
        val responseMap = new mutable.HashMap[TopicPartition, Errors]
        controllerEpoch = leaderAndIsrRequest.controllerEpoch
        val partitions = new mutable.HashSet[Partition]()
        // Partitions that are to become leaders
        val partitionsToBeLeader = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
        // Partitions that are to become followers
        val partitionsToBeFollower = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
        val topicIdUpdateFollowerPartitions = new mutable.HashSet[Partition]()
        // Iterate over requestPartitionStates, the partition states sent by the controller
        requestPartitionStates.foreach { partitionState =>
          val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
          // For each partition state, look the partition up first; create it if it does not exist yet
          val partitionOpt = getPartition(topicPartition) match {
            case HostedPartition.Offline =>
              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
                s"controller $controllerId with correlation id $correlationId " +
                s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
                "partition is in an offline log directory")
              responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
              None
            case HostedPartition.Online(partition) =>
              Some(partition)
            case HostedPartition.None =>
              val partition = Partition(topicPartition, time, this)
              allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
              Some(partition)
          }
          // Next, validate the topic ID and the leader epoch
          partitionOpt.foreach { partition =>
            val currentLeaderEpoch = partition.getLeaderEpoch
            val requestLeaderEpoch = partitionState.leaderEpoch
            val requestTopicId = topicIdFromRequest(topicPartition.topic)
            val logTopicId = partition.topicId
            if (!hasConsistentTopicId(requestTopicId, logTopicId)) {
              // If the topic IDs do not match, log an error and record it in responseMap
              stateChangeLogger.error(s"Topic ID in memory: ${logTopicId.get} does not" +
                s" match the topic ID for partition $topicPartition received: " +
                s"${requestTopicId.get}.")
              responseMap.put(topicPartition, Errors.INCONSISTENT_TOPIC_ID)
            }
            // If the request's leader epoch is newer than the current one, record the controller
            // epoch and add the partition to the to-be-leader or to-be-follower set
            else if (requestLeaderEpoch > currentLeaderEpoch) {
              // Only partitions whose replica set contains this broker are added to
              // partitionsToBeLeader or partitionsToBeFollower:
              // if the controller names this broker as the leader, the partition goes to
              // partitionsToBeLeader, otherwise it goes to partitionsToBeFollower.
              // This guarantees the steps below only touch replicas hosted on this broker.
              if (partitionState.replicas.contains(localBrokerId)) {
                partitions += partition
                if (partitionState.leader == localBrokerId) {
                  partitionsToBeLeader.put(partition, partitionState)
                } else {
                  partitionsToBeFollower.put(partition, partitionState)
                }
              }
              // code omitted ...
            }
          }
        }
        // Create the lazy high-watermark checkpoints
        val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
        // If partitionsToBeLeader is non-empty, call makeLeaders to turn those partitions into
        // leaders and return that set; otherwise return the empty set
        val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
          // Leader-side handling
          makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
            highWatermarkCheckpoints, topicIdFromRequest)
        else
          Set.empty[Partition]
        // If partitionsToBeFollower is non-empty, call makeFollowers to turn those replicas into
        // followers and return that set; otherwise return the empty set
        val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
          // Follower-side handling
          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
            highWatermarkCheckpoints, topicIdFromRequest)
        else
          Set.empty[Partition]
        // Derive the set of topics with follower partitions and update the related metrics
        val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
        updateLeaderAndFollowerMetrics(followerTopicSet)
        // If topicIdUpdateFollowerPartitions is non-empty, update the topic IDs of those follower partitions
        if (topicIdUpdateFollowerPartitions.nonEmpty)
          updateTopicIdForFollowers(controllerId, controllerEpoch, topicIdUpdateFollowerPartitions, correlationId, topicIdFromRequest)
        // Start the high-watermark checkpoint thread
        startHighWatermarkCheckPointThread()
        // Add log-dir fetchers where needed
        maybeAddLogDirFetchers(partitions, highWatermarkCheckpoints, topicIdFromRequest)
        // Shut down idle fetcher threads
        replicaFetcherManager.shutdownIdleFetcherThreads()
        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
        // code omitted ...
      }
    }
    // code omitted ...
  }
}
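To make that branching concrete, here is a minimal, self-contained sketch of the classification rule; PartitionState and the topic-partition strings are hypothetical stand-ins, not the Kafka classes:
case class PartitionState(leader: Int, replicas: Seq[Int])

def classify(localBrokerId: Int,
             states: Map[String, PartitionState]): (Set[String], Set[String]) = {
  // Only partitions whose replica set includes this broker are relevant
  val local = states.filter { case (_, s) => s.replicas.contains(localBrokerId) }
  // This broker leads exactly the partitions whose designated leader is itself
  val leaders = local.collect { case (tp, s) if s.leader == localBrokerId => tp }.toSet
  // Every other local replica becomes a follower
  (leaders, local.keySet -- leaders)
}

// classify(1, Map("t-0" -> PartitionState(leader = 1, replicas = Seq(1, 2)),
//                 "t-1" -> PartitionState(leader = 2, replicas = Seq(1, 2))))
// returns (Set("t-0"), Set("t-1"))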
Because this article is about how the Follower pulls data, we only need to follow makeFollowers from the code above.
1. If the replica is to be a Follower, prepare to create a fetcher thread that pulls from the Leader asynchronously
/*
 * Make the current broker a follower for the given partitions by:
 * 1. Removing these partitions from the leader partitions set.
 * 2. Marking the replicas as followers so that no more data can be appended from produce requests.
 * 3. Stopping the existing fetchers for these partitions so that no more data is appended by the
 *    replica fetcher threads.
 * 4. Checkpointing the offsets for these partitions, truncating the log first if truncation is needed.
 * 5. Clearing the produce and fetch requests in the purgatory.
 * 6. If the broker is not shutting down, starting replica fetcher threads towards the new leaders
 *    of these partitions.
 *
 * Executing the steps in this order ensures that a replica in transition will not accept any more
 * messages before its offset is checkpointed, so all messages before the checkpoint are guaranteed
 * to be flushed to disk.
 *
 * If an unexpected error is thrown in this function, it is propagated up to KafkaApis, which sets
 * the error on every partition since we do not know which partition caused it. Otherwise, the
 * method returns the set of partitions that became followers as a result of this call.
 */
private def makeFollowers(controllerId: Int,
                          controllerEpoch: Int,
                          partitionStates: Map[Partition, LeaderAndIsrPartitionState],
                          correlationId: Int,
                          responseMap: mutable.Map[TopicPartition, Errors],
                          highWatermarkCheckpoints: OffsetCheckpoints,
                          topicIds: String => Option[Uuid]) : Set[Partition] = {
  val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
  // code omitted ...
  // Mutable set collecting the partitions that actually become followers
  val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
  try {
    partitionStates.forKeyValue { (partition, partitionState) =>
      // For each partition, change its state depending on whether the new leader is alive
      val newLeaderBrokerId = partitionState.leader
      try {
        if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
          // Only change partition state when the leader is available: turn the partition into
          // a follower and record it in partitionsToMakeFollower
          if (partition.makeFollower(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName))) {
            partitionsToMakeFollower += partition
          }
        } else {
          // code omitted ...
        }
      } catch {
        // code omitted ...
      }
    }
    // Remove the existing fetcher threads for the partitions in partitionsToMakeFollower
    replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
    stateChangeLogger.info(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
      s"epoch $controllerEpoch with correlation id $correlationId for ${partitionsToMakeFollower.size} partitions")
    // For each partition, complete any delayed fetch or produce requests
    partitionsToMakeFollower.foreach { partition =>
      completeDelayedFetchOrProduceRequests(partition.topicPartition)
    }
    // If the server is shutting down, skip adding fetchers
    if (isShuttingDown.get()) {
      // code omitted
    } else {
      // For each partition, resolve the leader endpoint and the initial fetch offset,
      // building the partitionsToMakeFollowerWithLeaderAndOffset map
      val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
        val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache.
          getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode())
        val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port())
        val log = partition.localLogOrException
        val fetchOffset = initialFetchOffset(log)
        partition.topicPartition -> InitialFetchState(topicIds(partition.topic), leader, partition.getLeaderEpoch, fetchOffset)
      }.toMap
      // Add fetchers for the partitions in partitionsToMakeFollowerWithLeaderAndOffset
      replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
    }
  } catch {
    // code omitted
  }
  // code omitted ...
}
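For reference, every entry that makeFollowers hands to addFetcherForPartitions pairs a partition with where to start fetching. A simplified sketch of that shape, with hypothetical classes and values mirroring the real BrokerEndPoint / InitialFetchState:
case class BrokerEndPointSketch(id: Int, host: String, port: Int)
case class InitialFetchStateSketch(topicId: Option[String],
                                   leader: BrokerEndPointSketch,
                                   currentLeaderEpoch: Int,
                                   initOffset: Long)

// e.g. the follower replica of "payments-0" should fetch from broker 2, starting at offset 4200
val initialFetchStates = Map(
  "payments-0" -> InitialFetchStateSketch(Some("demo-topic-id"), BrokerEndPointSketch(2, "kafka-2", 9092), 7, 4200L)
)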
2. For each Follower partition, check whether a fetcher thread to the target broker already exists; reuse it if it does, otherwise create one
Next, replicaFetcherManager.addFetcherForPartitions registers the partition information with the appropriate fetcher thread:
// To be defined in subclass to create a specific fetcher
def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): T

// Adds the given partitions and their initial offsets to the appropriate fetcher threads
def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState]): Unit = {
  lock synchronized {
    // First group partitionAndOffsets by BrokerAndFetcherId
    val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicPartition, brokerAndInitialFetchOffset) =>
      BrokerAndFetcherId(brokerAndInitialFetchOffset.leader, getFetcherId(topicPartition))
    }

    def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId,
                                 brokerIdAndFetcherId: BrokerIdAndFetcherId): T = {
      // Create the fetcher thread
      val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
      // Register it in fetcherThreadMap
      fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread)
      // Start the thread
      fetcherThread.start()
      fetcherThread
    }

    for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {
      val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
      // Look up an existing fetcher thread for this broker/fetcher id, or create one
      val fetcherThread = fetcherThreadMap.get(brokerIdAndFetcherId) match {
        case Some(currentFetcherThread) if currentFetcherThread.leader.brokerEndPoint() == brokerAndFetcherId.broker =>
          // A thread already exists and points at the same leader endpoint: reuse the fetcher thread
          currentFetcherThread
        case Some(f) =>
          // A thread exists but targets a different endpoint: shut it down, then create and start a new one
          f.shutdown()
          addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
        case None =>
          // No thread yet: create and start a new one
          addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
      }
      // Add the partitions to the chosen fetcher thread
      // (failed partitions are removed when added partitions to thread)
      addPartitionsToFetcherThread(fetcherThread, initialFetchOffsets)
    }
  }
}
A natural question here: why reuse fetcher threads at all? The reason is that a broker does not start one fetcher thread per follower partition. Towards a given target broker it starts at most num.replica.fetchers threads, and the fetcher thread a topic-partition is assigned to is computed from the topic name and partition id, as the implementation shows:
// Visibility for testing
private[server] def getFetcherId(topicPartition: TopicPartition): Int = {
  lock synchronized {
    Utils.abs(31 * topicPartition.topic.hashCode() + topicPartition.partition) % numFetchersPerBroker
  }
}
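As a quick illustration of this hashing, here is a standalone sketch; the topic name and fetcher count are hypothetical, and the bit mask emulates Kafka's Utils.abs:
object FetcherIdDemo extends App {
  val numFetchersPerBroker = 4 // stands in for num.replica.fetchers

  // Mirrors the expression in getFetcherId above
  def getFetcherId(topic: String, partition: Int): Int =
    ((31 * topic.hashCode + partition) & 0x7fffffff) % numFetchersPerBroker

  for (p <- 0 until 8)
    println(s"orders-$p -> fetcher ${getFetcherId("orders", p)}")
}
Partitions that hash to the same fetcher id share one thread towards a given target broker, which is why addFetcherForPartitions may find an existing thread to reuse.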
Continuing downward, the implementation of createFetcherThread is shown next.
3. The implementation that creates the fetcher thread
class ReplicaFetcherManager(brokerConfig: KafkaConfig,
                            protected val replicaManager: ReplicaManager,
                            metrics: Metrics,
                            time: Time,
                            threadNamePrefix: Option[String] = None,
                            quotaManager: ReplicationQuotaManager,
                            metadataVersionSupplier: () => MetadataVersion,
                            brokerEpochSupplier: () => Long)
  extends AbstractFetcherManager[ReplicaFetcherThread](
    name = "ReplicaFetcherManager on broker " + brokerConfig.brokerId,
    clientId = "Replica",
    numFetchers = brokerConfig.numReplicaFetchers) {

  override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
    val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
    val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
    val logContext = new LogContext(s"[ReplicaFetcher replicaId=${brokerConfig.brokerId}, leaderId=${sourceBroker.id}, " +
      s"fetcherId=$fetcherId] ")
    val endpoint = new BrokerBlockingSender(sourceBroker, brokerConfig, metrics, time, fetcherId,
      s"broker-${brokerConfig.brokerId}-fetcher-$fetcherId", logContext)
    val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
    val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig,
      replicaManager, quotaManager, metadataVersionSupplier, brokerEpochSupplier)
    // Create a ReplicaFetcherThread; its constructor receives the collaborators used to fetch and manage replicas
    new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager,
      quotaManager, logContext.logPrefix, metadataVersionSupplier)
  }

  def shutdown(): Unit = {
    info("shutting down")
    closeAllFetchers()
    info("shutdown completed")
  }
}
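The thread name assembled above is what appears in broker logs and thread dumps. For a hypothetical fetcher id 1 pulling from broker 2, with no thread-name prefix configured:
// Hypothetical example of the name format built in createFetcherThread
val fetcherId = 1
val sourceBrokerId = 2
val threadName = s"ReplicaFetcherThread-$fetcherId-$sourceBrokerId"
// => "ReplicaFetcherThread-1-2"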
Here new ReplicaFetcherThread returns the newly created fetcher thread:
class ReplicaFetcherThread(name: String,
                           leader: LeaderEndPoint,
                           brokerConfig: KafkaConfig,
                           failedPartitions: FailedPartitions,
                           replicaMgr: ReplicaManager,
                           quota: ReplicaQuota,
                           logPrefix: String,
                           metadataVersionSupplier: () => MetadataVersion)
  extends AbstractFetcherThread(name = name,
    clientId = name,
    leader = leader,
    failedPartitions,
    fetchTierStateMachine = new ReplicaFetcherTierStateMachine(leader, replicaMgr),
    fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
    isInterruptible = false,
    replicaMgr.brokerTopicStats) {

  override def doWork(): Unit = {
    super.doWork()
    completeDelayedFetchRequests()
  }
}
ReplicaFetcherThread extends the AbstractFetcherThread class, which in turn extends ShutdownableThread; the run method of ShutdownableThread is the thread's execution entry point.
abstract class AbstractFetcherThread(name: String,
                                     clientId: String,
                                     val leader: LeaderEndPoint,
                                     failedPartitions: FailedPartitions,
                                     val fetchTierStateMachine: TierStateMachine,
                                     fetchBackOffMs: Int = 0,
                                     isInterruptible: Boolean = true,
                                     val brokerTopicStats: BrokerTopicStats) // BrokerTopicStats's lifecycle managed by ReplicaManager
  extends ShutdownableThread(name, isInterruptible) with Logging {

  override def doWork(): Unit = {
    maybeTruncate()
    maybeFetch()
  }
}
public abstract class ShutdownableThread extends Thread {
    // code omitted
    public abstract void doWork();

    public void run() {
        isStarted = true;
        log.info("Starting");
        try {
            while (isRunning())
                doWork();
        } catch (FatalExitError e) {
            shutdownInitiated.countDown();
            shutdownComplete.countDown();
            log.info("Stopped");
            Exit.exit(e.statusCode());
        } catch (Throwable e) {
            if (isRunning())
                log.error("Error due to", e);
        } finally {
            shutdownComplete.countDown();
        }
        log.info("Stopped");
    }
}
The run method of ShutdownableThread repeatedly invokes the subclass's doWork(), and for a replica fetcher each doWork() pass executes in the following order:
//是否截?cái)?/span>
maybeTruncate()
//抓取
maybeFetch()
//處理延時(shí)抓取請求
completeDelayedFetchRequests()
Among these, maybeFetch() assembles the fetch request and sends it to the target broker, where it is dispatched through case ApiKeys.FETCH => handleFetchRequest(request):
private def maybeFetch(): Unit = {
  // Build the fetch request while holding the partition-map lock
  val fetchRequestOpt = inLock(partitionMapLock) {
    val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = leader.buildFetch(partitionStates.partitionStateMap.asScala)
    handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
    if (fetchRequestOpt.isEmpty) {
      trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
    }
    fetchRequestOpt
  }
  fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>
    processFetchRequest(sessionPartitions, fetchRequest)
  }
}
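Putting run(), doWork() and maybeFetch() together, the mechanism boils down to a loop like the condensed sketch below. The types and names are hypothetical stand-ins, not Kafka code: build a request under the lock, back off when nothing is fetchable, otherwise process the request outside the lock:
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

abstract class SketchFetcherThread(name: String) extends Thread(name) {
  @volatile private var running = true
  private val partitionMapLock = new ReentrantLock()
  private val partitionMapCond = partitionMapLock.newCondition()
  private val fetchBackOffMs = 1000L

  // Stand-ins for leader.buildFetch(...) and processFetchRequest(...)
  def buildFetch(): Option[String]
  def processFetchRequest(request: String): Unit

  private def doWork(): Unit = {
    partitionMapLock.lock()
    val requestOpt =
      try {
        val req = buildFetch()
        if (req.isEmpty)
          partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) // back off instead of spinning
        req
      } finally partitionMapLock.unlock()
    requestOpt.foreach(processFetchRequest) // send and process outside the lock
  }

  override def run(): Unit =
    try while (running) doWork()
    catch { case _: InterruptedException => } // shutdown() interrupts a blocked await

  def shutdown(): Unit = { running = false; interrupt() }
}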
That concludes this walkthrough of how, in Kafka 3.5, a topic partition's Follower creates a fetcher thread and pulls data from the Leader.