国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka 3.5 主題分區(qū)的Follower創(chuàng)建Fetcher線程從Leader拉取數(shù)據(jù)源碼

這篇具有很好參考價(jià)值的文章主要介紹了kafka 3.5 主題分區(qū)的Follower創(chuàng)建Fetcher線程從Leader拉取數(shù)據(jù)源碼。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

Kakfa集群有主題,每一個(gè)主題下又有很多分區(qū),為了保證防止丟失數(shù)據(jù),在分區(qū)下分Leader副本和Follower副本,而kafka的某個(gè)分區(qū)的Leader和Follower數(shù)據(jù)如何同步呢?下面就是講解的這個(gè)

首先要知道,F(xiàn)ollower的數(shù)據(jù)是通過Fetch線程異步從Leader拉取的數(shù)據(jù),不懂的可以看一下Kafka——副本(Replica)機(jī)制

一、Broker接收處理分區(qū)的Leader和Follower的API

kafkaApis.scala

     //處理Leader和follower,ISR的請求
     case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
 def handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit = {
    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
    //從請求頭中獲取關(guān)聯(lián)ID
    //從請求體中獲取LeaderAndIsrRequest對象。
    val correlationId = request.header.correlationId
    val leaderAndIsrRequest = request.body[LeaderAndIsrRequest]
    //對請求進(jìn)行集群操作的授權(quán)驗(yàn)證。
    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
    //檢查broker的代數(shù)是否過時(shí)。
    if (zkSupport.isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch, leaderAndIsrRequest.isKRaftController)) {
    //省略代碼
    } else {
      //調(diào)用replicaManager.becomeLeaderOrFollower方法處理請求,獲取響應(yīng)
      val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest,
        RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _))
      requestHelper.sendResponseExemptThrottle(request, response)
    }
  }

經(jīng)過一些檢驗(yàn)后,調(diào)用becomeLeaderOrFollower獲得響應(yīng)結(jié)果,

二、針對分區(qū)副本的Leader和Follower的處理邏輯

def becomeLeaderOrFollower(correlationId: Int,
                             leaderAndIsrRequest: LeaderAndIsrRequest,
                             onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
    //首先記錄了方法開始的時(shí)間戳
    val startMs = time.milliseconds()
    //副本狀態(tài)改變鎖
    replicaStateChangeLock synchronized {
      //從leaderAndIsrRequest中獲取一些請求信息,包括controller的ID、分區(qū)狀態(tài)等
      val controllerId = leaderAndIsrRequest.controllerId
      val requestPartitionStates = leaderAndIsrRequest.partitionStates.asScala
   

      val response = {
        //處理過程中,會(huì)檢查請求的controller epoch是否過時(shí)
        if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
       		//省略
        } else {
          val responseMap = new mutable.HashMap[TopicPartition, Errors]
          controllerEpoch = leaderAndIsrRequest.controllerEpoch

          val partitions = new mutable.HashSet[Partition]()
          //要成功Leader分區(qū)的集合
          val partitionsToBeLeader = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
          //要成為Follower分區(qū)的集合
          val partitionsToBeFollower = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
          val topicIdUpdateFollowerPartitions = new mutable.HashSet[Partition]()

          //遍歷requestPartitionStates,其中包含了來自控制器(controller)的分區(qū)狀態(tài)請求
          requestPartitionStates.foreach { partitionState =>
            val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
            //對于每個(gè)分區(qū)狀態(tài)請求,首先檢查分區(qū)是否存在,如果不存在,則創(chuàng)建一個(gè)新的分區(qū)。
            val partitionOpt = getPartition(topicPartition) match {
              case HostedPartition.Offline =>
                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
                  s"controller $controllerId with correlation id $correlationId " +
                  s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
                  "partition is in an offline log directory")
                responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
                None

              case HostedPartition.Online(partition) =>
                Some(partition)

              case HostedPartition.None =>
                val partition = Partition(topicPartition, time, this)
                allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
                Some(partition)
            }

            //接下來,檢查分區(qū)的主題ID和Leader的epoch(版本號(hào))等信息。
            partitionOpt.foreach { partition =>
              val currentLeaderEpoch = partition.getLeaderEpoch
              val requestLeaderEpoch = partitionState.leaderEpoch
              val requestTopicId = topicIdFromRequest(topicPartition.topic)
              val logTopicId = partition.topicId

              if (!hasConsistentTopicId(requestTopicId, logTopicId)) {
                //如果主題ID不一致,則記錄錯(cuò)誤并將其添加到響應(yīng)映射(responseMap)中。
                stateChangeLogger.error(s"Topic ID in memory: ${logTopicId.get} does not" +
                  s" match the topic ID for partition $topicPartition received: " +
                  s"${requestTopicId.get}.")
                responseMap.put(topicPartition, Errors.INCONSISTENT_TOPIC_ID)
              } 
              //如果Leader的epoch大于當(dāng)前的epoch,則記錄控制器(controller)的epoch,并將分區(qū)添加到要成為Leader或Follower的集合中。 
              else if (requestLeaderEpoch > currentLeaderEpoch) {              
                //分區(qū)副本確定是當(dāng)前broker的,則添加到partitionsToBeLeader或者partitionsToBeFollower
                //如果分區(qū)副本是leader并且broker是當(dāng)前broker,則加入partitionsToBeLeader
                //其他的放入到partitionsToBeFollower
                //這樣保證后續(xù)操作partitionsToBeLeader或者partitionsToBeFollower只操作當(dāng)前broker的
                if (partitionState.replicas.contains(localBrokerId)) {
                  partitions += partition
                  if (partitionState.leader == localBrokerId) {
                    partitionsToBeLeader.put(partition, partitionState)
                  } else {
                    partitionsToBeFollower.put(partition, partitionState)
                  }
                } 
                //省略代碼.....
          }
          //創(chuàng)建高水位線檢查點(diǎn)
          val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
          //如果partitionsToBeLeader非空,則調(diào)用makeLeaders方法將指定的分區(qū)設(shè)置為Leader,并返回這些分區(qū)的集合,否則返回空集合
          val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
          //這個(gè)是處理Leader的邏輯
            makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
              highWatermarkCheckpoints, topicIdFromRequest)
          else
            Set.empty[Partition]
          //如果partitionsToBeFollower非空,則調(diào)用makeFollowers方法將指定的分區(qū)副本設(shè)置為Follower,并返回這些分區(qū)的集合,否則返回空集合。
          val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
          //這個(gè)是處理Follower的邏輯
            makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
              highWatermarkCheckpoints, topicIdFromRequest)
          else
            Set.empty[Partition]
          //根據(jù)partitionsBecomeFollower集合獲取Follower分區(qū)的主題集合,并更新相關(guān)指標(biāo)。
          val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
          updateLeaderAndFollowerMetrics(followerTopicSet)
          //如果topicIdUpdateFollowerPartitions非空,則調(diào)用updateTopicIdForFollowers方法更新Follower分區(qū)的主題ID。
          if (topicIdUpdateFollowerPartitions.nonEmpty)
            updateTopicIdForFollowers(controllerId, controllerEpoch, topicIdUpdateFollowerPartitions, correlationId, topicIdFromRequest)    
          //啟動(dòng)高水位檢查點(diǎn)線程。
          startHighWatermarkCheckPointThread()
          //根據(jù)參數(shù)初始化日志目錄獲取器
          maybeAddLogDirFetchers(partitions, highWatermarkCheckpoints, topicIdFromRequest)
          //關(guān)閉空閑的副本獲取器線程
          //todo FetcherThreads
          replicaFetcherManager.shutdownIdleFetcherThreads()
          replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
		//省略代碼....		
        }
      }
      //省略代碼....		
    }
  }

因?yàn)檫@篇文章主要寫Follower如何拉取數(shù)據(jù),所以只需要關(guān)注上面代碼中的makeFollowers就可以了

1、如果是Follower,則準(zhǔn)備創(chuàng)建Fetcher線程,好異步執(zhí)行向Leader拉取數(shù)據(jù)

/*
    1. 從領(lǐng)導(dǎo)者分區(qū)集中刪除這些分區(qū)。
    2. 將這些 partition 標(biāo)記為 follower,之后這些 partition 就不會(huì)再接收 produce 的請求了
    3. 停止對這些 partition 的副本同步,這樣這些副本就不會(huì)再有(來自副本請求線程)的數(shù)據(jù)進(jìn)行追加了
    4.對這些 partition 的 offset 進(jìn)行 checkpoint,如果日志需要截?cái)嗑瓦M(jìn)行截?cái)嗖僮鳎?    5.  清空 purgatory 中的 produce 和 fetch 請求
    6.如果代理未關(guān)閉,向這些 partition 的新 leader 啟動(dòng)副本同步線程
   * 執(zhí)行這些步驟的順序可確保轉(zhuǎn)換中的副本在檢查點(diǎn)偏移之前不會(huì)再接收任何消息,以便保證檢查點(diǎn)之前的所有消息都刷新到磁盤
   *如果此函數(shù)中拋出意外錯(cuò)誤,它將被傳播到 KafkaAPIS,其中將在每個(gè)分區(qū)上設(shè)置錯(cuò)誤消息,因?yàn)槲覀儾恢朗悄膫€(gè)分區(qū)導(dǎo)致了它。否則,返回由于此方法而成為追隨者的分區(qū)集
   */
  private def makeFollowers(controllerId: Int,
                            controllerEpoch: Int,
                            partitionStates: Map[Partition, LeaderAndIsrPartitionState],
                            correlationId: Int,
                            responseMap: mutable.Map[TopicPartition, Errors],
                            highWatermarkCheckpoints: OffsetCheckpoints,
                            topicIds: String => Option[Uuid]) : Set[Partition] = {
    val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
   	//省略代碼。。。。
    //創(chuàng)建一個(gè)可變的Set[Partition]對象partitionsToMakeFollower,用于統(tǒng)計(jì)follower的集合
    val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
    try {
      partitionStates.forKeyValue { (partition, partitionState) =>
        //遍歷partitionStates中的每個(gè)分區(qū),根據(jù)分區(qū)的leader是否可用來改變分區(qū)的狀態(tài)。
        val newLeaderBrokerId = partitionState.leader
        try {
          if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
            //如果分區(qū)的leader可用,將分區(qū)設(shè)置為follower,并將其添加到partitionsToMakeFollower中。
            // Only change partition state when the leader is available
            if (partition.makeFollower(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName))) {
              partitionsToMakeFollower += partition
            }
          } else {
          //省略代碼。。
        } catch {
      		//省略代碼。。。
        }
      }
      //刪除針對partitionsToMakeFollower中 partition 的副本同步線程
      replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
      stateChangeLogger.info(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
        s"epoch $controllerEpoch with correlation id $correlationId for ${partitionsToMakeFollower.size} partitions")
      //對于每個(gè)分區(qū),完成延遲的抓取或生產(chǎn)請求。
      partitionsToMakeFollower.foreach { partition =>
        completeDelayedFetchOrProduceRequests(partition.topicPartition)
      }
      //如果正在關(guān)閉服務(wù)器,跳過添加抓取器的步驟。
      if (isShuttingDown.get()) {
      	//省略代碼
      } else {
        //對于每個(gè)分區(qū),獲取分區(qū)的leader和抓取偏移量,并構(gòu)建partitionsToMakeFollowerWithLeaderAndOffset映射。
        val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
          val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache.
            getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode())
          val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port())
          val log = partition.localLogOrException
          val fetchOffset = initialFetchOffset(log)
          partition.topicPartition -> InitialFetchState(topicIds(partition.topic), leader, partition.getLeaderEpoch, fetchOffset)
        }.toMap
        //添加抓取器以獲取partitionsToMakeFollowerWithLeaderAndOffset中的分區(qū)。
        replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
      }
    } catch {
     	//省略代碼
    }
	 //省略代碼。。。
  }

2、遍歷分區(qū)Follower副本,判斷是否有向目標(biāo)broker現(xiàn)成的Fetcher,如果是則復(fù)用,否則創(chuàng)建

之后執(zhí)行replicaFetcherManager.addFetcherForPartitions把信息添加到指定的Fetcher線程中

 // to be defined in subclass to create a specific fetcher
  def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): T
//主要目的是將分區(qū)和偏移量添加到相應(yīng)的Fetcher線程中
  def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState]): Unit = {
    lock synchronized {
      //首先對partitionAndOffsets進(jìn)行分組,按照BrokerAndFetcherId來分組
      val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicPartition, brokerAndInitialFetchOffset) =>
        BrokerAndFetcherId(brokerAndInitialFetchOffset.leader, getFetcherId(topicPartition))
      }

      def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId,
                                   brokerIdAndFetcherId: BrokerIdAndFetcherId): T = {
        //創(chuàng)建Fetcher線程                           
        val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
        //把線程放入到fetcherThreadMap
        fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread)
        //線程啟動(dòng)
        fetcherThread.start()
        fetcherThread
      }

      for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {

        val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
        //將啟動(dòng)的FetcherThread線程添加到fetcherThreadMap中
        val fetcherThread = fetcherThreadMap.get(brokerIdAndFetcherId) match {
          // //檢查是否已經(jīng)存在一個(gè)與當(dāng)前broker和fetcher id相匹配的Fetcher線程。
          case Some(currentFetcherThread) if currentFetcherThread.leader.brokerEndPoint() == brokerAndFetcherId.broker =>
            // reuse the fetcher thread
            //如果存在,則重用該線程
            currentFetcherThread
          case Some(f) =>//如果之前有,fetcher線程,則先關(guān)閉在創(chuàng)建一個(gè)新的Fetcher線程并啟動(dòng)
            f.shutdown()
            addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
          case None =>//創(chuàng)建一個(gè)新的Fetcher線程,并啟動(dòng)
            addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
        }
        //將分區(qū)添加到相應(yīng)的Fetcher線程中
        // failed partitions are removed when added partitions to thread
        addPartitionsToFetcherThread(fetcherThread, initialFetchOffsets)
      }
    }
  }

上面可能有疑問,為什么有重用fetcher線程?
答案:是broker并不一定會(huì)為每一個(gè)主題分區(qū)的Follower都啟動(dòng)一個(gè) fetcher 線程,對于一個(gè)目的 broker,只會(huì)啟動(dòng) num.replica.fetchers 個(gè)線程,具體這個(gè) topic-partition 會(huì)分配到哪個(gè) fetcher 線程上,是根據(jù) topic 名和 partition id 進(jìn)行計(jì)算得到,實(shí)現(xiàn)所示:

  // Visibility for testing
  private[server] def getFetcherId(topicPartition: TopicPartition): Int = {
    lock synchronized {
      Utils.abs(31 * topicPartition.topic.hashCode() + topicPartition.partition) % numFetchersPerBroker
    }
  }

繼續(xù)往下,其中createFetcherThread的實(shí)現(xiàn)是下面

3、創(chuàng)建Fetcher線程的實(shí)現(xiàn)

class ReplicaFetcherManager(brokerConfig: KafkaConfig,
                            protected val replicaManager: ReplicaManager,
                            metrics: Metrics,
                            time: Time,
                            threadNamePrefix: Option[String] = None,
                            quotaManager: ReplicationQuotaManager,
                            metadataVersionSupplier: () => MetadataVersion,
                            brokerEpochSupplier: () => Long)
      extends AbstractFetcherManager[ReplicaFetcherThread](
        name = "ReplicaFetcherManager on broker " + brokerConfig.brokerId,
        clientId = "Replica",
        numFetchers = brokerConfig.numReplicaFetchers) {

  override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
    val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
    val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
    val logContext = new LogContext(s"[ReplicaFetcher replicaId=${brokerConfig.brokerId}, leaderId=${sourceBroker.id}, " +
      s"fetcherId=$fetcherId] ")
    val endpoint = new BrokerBlockingSender(sourceBroker, brokerConfig, metrics, time, fetcherId,
      s"broker-${brokerConfig.brokerId}-fetcher-$fetcherId", logContext)
    val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
    val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig,
      replicaManager, quotaManager, metadataVersionSupplier, brokerEpochSupplier)
   // 創(chuàng)建了一個(gè)ReplicaFetcherThread對象,它的構(gòu)造函數(shù)接受多個(gè)參數(shù),用于副本的獲取和管理。
    new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager,
      quotaManager, logContext.logPrefix, metadataVersionSupplier)
  }

  def shutdown(): Unit = {
    info("shutting down")
    closeAllFetchers()
    info("shutdown completed")
  }
}

其中new ReplicaFetcherThread返回一個(gè)創(chuàng)建的線程

class ReplicaFetcherThread(name: String,
                           leader: LeaderEndPoint,
                           brokerConfig: KafkaConfig,
                           failedPartitions: FailedPartitions,
                           replicaMgr: ReplicaManager,
                           quota: ReplicaQuota,
                           logPrefix: String,
                           metadataVersionSupplier: () => MetadataVersion)
  extends AbstractFetcherThread(name = name,
                                clientId = name,
                                leader = leader,
                                failedPartitions,
                                fetchTierStateMachine = new ReplicaFetcherTierStateMachine(leader, replicaMgr),
                                fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
                                isInterruptible = false,
                                replicaMgr.brokerTopicStats) {

override def doWork(): Unit = {
    super.doWork()
    completeDelayedFetchRequests()
  }




}

ReplicaFetcherThread繼承的AbstractFetcherThread類,AbstractFetcherThread又繼承自ShutdownableThread類,其中ShutdownableThread中的run方法是線程的執(zhí)行函數(shù)

abstract class AbstractFetcherThread(name: String,
                                     clientId: String,
                                     val leader: LeaderEndPoint,
                                     failedPartitions: FailedPartitions,
                                     val fetchTierStateMachine: TierStateMachine,
                                     fetchBackOffMs: Int = 0,
                                     isInterruptible: Boolean = true,
                                     val brokerTopicStats: BrokerTopicStats) //BrokerTopicStats's lifecycle managed by ReplicaManager
  extends ShutdownableThread(name, isInterruptible) with Logging {
 override def doWork(): Unit = {
    maybeTruncate()
    maybeFetch()
  }
}
public abstract class ShutdownableThread extends Thread {
	//省略代碼
 public abstract void doWork();
  public void run() {
        isStarted = true;
        log.info("Starting");
        try {
            while (isRunning())
                doWork();
        } catch (FatalExitError e) {
            shutdownInitiated.countDown();
            shutdownComplete.countDown();
            log.info("Stopped");
            Exit.exit(e.statusCode());
        } catch (Throwable e) {
            if (isRunning())
                log.error("Error due to", e);
        } finally {
            shutdownComplete.countDown();
        }
        log.info("Stopped");
    }
}    

ShutdownableThread中的run函數(shù)調(diào)用子類的doWork()
而doWork中的執(zhí)行順序如下

//是否截?cái)?/span>
 maybeTruncate()
 //抓取
 maybeFetch()
 //處理延時(shí)抓取請求
 completeDelayedFetchRequests()

其中 maybeFetch(),就是正常拼接fetch請求,并向目標(biāo)broker發(fā)送請求,調(diào)用brokercase ApiKeys.FETCH => handleFetchRequest(request)文章來源地址http://www.zghlxwxcb.cn/news/detail-697005.html

private def maybeFetch(): Unit = {
	//分區(qū)映射鎖
    val fetchRequestOpt = inLock(partitionMapLock) {
      val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = leader.buildFetch(partitionStates.partitionStateMap.asScala)

      handlePartitionsWithErrors(partitionsWithError, "maybeFetch")

      if (fetchRequestOpt.isEmpty) {
        trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
        partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
      }

      fetchRequestOpt
    }

    fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>
      processFetchRequest(sessionPartitions, fetchRequest)
    }
  }

到了這里,關(guān)于kafka 3.5 主題分區(qū)的Follower創(chuàng)建Fetcher線程從Leader拉取數(shù)據(jù)源碼的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • kafka 3.5 主題分區(qū)的高水位線HW,低水位線LW,logStartOffset,LogEndOffset什么情況下會(huì)更新源碼

    下面的例子只是各拿一個(gè)做舉例,不是全部場景,不要以為logStartOffset,LogEndOffset,HW,LW只有三個(gè)場景可以修改 這里需要針對 logStartOffset 和 LogEndOffset 做特殊說明,要不會(huì)讓大家腦袋混亂,并且前言后的章節(jié)講的都是 主題分區(qū)級別 的 (1)主題分區(qū)級別 對于每個(gè)分區(qū)中每一個(gè)

    2024年02月09日
    瀏覽(17)
  • Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費(fèi)者

    Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費(fèi)者

    1.創(chuàng)建主題 2.查看所有主題 3.查看詳細(xì)主題 序號(hào)從0開始計(jì)算 Partition:分區(qū)數(shù),該主題有3個(gè)分區(qū) Replica:副本數(shù),該主題有3個(gè)副本 Leader:副本數(shù)中的主的序號(hào),生產(chǎn)消費(fèi)的對象 1.修改分區(qū)數(shù) 修改的分區(qū)數(shù)量不可以小于或者等于當(dāng)前主題分區(qū)的數(shù)量,否則會(huì)報(bào)錯(cuò) 在根目錄kaf

    2024年02月11日
    瀏覽(32)
  • 05、Kafka ------ 各個(gè)功能的作用解釋(主題和分區(qū) 詳解,用命令行和圖形界面創(chuàng)建主題和查看主題)

    05、Kafka ------ 各個(gè)功能的作用解釋(主題和分區(qū) 詳解,用命令行和圖形界面創(chuàng)建主題和查看主題)

    Kafka 主題雖然也叫 topic,但它和 Pub-Sub 消息模型中 topic 主題及 AMQP 的 topic 都不同(AMQP 的 topic 只是 Exchange 的類型)。 Kafka 的主題只是盛裝消息的邏輯容器(注意是邏輯容器),主題之下會(huì)分為若干個(gè)分區(qū),分區(qū)才是盛裝消息的物理容器。 ▲ 消息組織方式實(shí)際上是三級結(jié)構(gòu)

    2024年02月03日
    瀏覽(32)
  • kafka中topic的部分分區(qū)leader為none,怎樣解決?

    kafka中topic的部分分區(qū)leader為none,怎樣解決?

    ? (以Hadoop的topic為例) 進(jìn)入Zookeeper客戶端查看kafka存儲(chǔ)的信息,/kafka/brokers/topics/hadoop/partitions/1/state get /kafka/brokers/topics/hadoop/partitions/1/state 查看到 {\\\"controller_epoch\\\":33,\\\"leader\\\":-1,\\\"version\\\":1,\\\"leader_epoch\\\":25,\\\"isr\\\":[3]}? leader為-1,固分區(qū)的leader為none 修改/kafka/brokers/topics/hadoop/partitions/

    2024年02月03日
    瀏覽(21)
  • 【基礎(chǔ)】Kafka -- 主題與分區(qū)

    【基礎(chǔ)】Kafka -- 主題與分區(qū)

    主題管理包括創(chuàng)建主題、查看主題消息、修改主題以及刪除主題等操作,Kafka 提供的 kafka-topics.sh 腳本來執(zhí)行這些操作,腳本位于 $KAFKA_HOME/bin/ 目錄下,該腳本實(shí)際上是調(diào)用了 kafka.admin.TopicCommand 類來執(zhí)行主題管理的操作。 簡單創(chuàng)建與查看 若 broker 端的配置參數(shù) auto.create.top

    2023年04月22日
    瀏覽(17)
  • k8s部署zookeeper集群(3節(jié)點(diǎn),1個(gè)leader,2個(gè)follower)

    環(huán)境: centos 7.9 k8s集群 在k8s上面安裝zookeeper集群,我們還是按照k8s的官方文檔來安裝吧,這樣比較好,網(wǎng)上有各種各樣的安裝方式,這里使用 https://kubernetes.io/docs/tutorials/stateful-application/zookeeper/ k8s的官方文檔來安裝。

    2024年02月13日
    瀏覽(62)
  • zookeeper4==zookeeper源碼閱讀,F(xiàn)OLLOWER收到了需要LEADER執(zhí)行的命令后各節(jié)點(diǎn)會(huì)執(zhí)行什么

    zookeeper4==zookeeper源碼閱讀,F(xiàn)OLLOWER收到了需要LEADER執(zhí)行的命令后各節(jié)點(diǎn)會(huì)執(zhí)行什么

    上面已經(jīng)閱讀并觀察了節(jié)點(diǎn)確定自己的身份后會(huì)做些什么,大致就是比對雙方信息然后完成同步。 本篇閱讀, FOLLOWER收到了需要LEADER執(zhí)行的命令后,怎么同步給LEADER的,并且LEADER會(huì)執(zhí)行什么操作。 源碼啟動(dòng)zkCli用于測試 將原本的代碼拷貝一份用IDEA打開后,找到org.apache.zook

    2024年02月03日
    瀏覽(15)
  • 解密Kafka主題的分區(qū)策略:提升實(shí)時(shí)數(shù)據(jù)處理的關(guān)鍵

    解密Kafka主題的分區(qū)策略:提升實(shí)時(shí)數(shù)據(jù)處理的關(guān)鍵

    大家好,我是哪吒。 Kafka幾乎是當(dāng)今時(shí)代背景下數(shù)據(jù)管道的首選,無論你是做后端開發(fā)、還是大數(shù)據(jù)開發(fā),對它可能都不陌生。開源軟件Kafka的應(yīng)用越來越廣泛。 面對Kafka的普及和學(xué)習(xí)熱潮,哪吒想分享一下自己多年的開發(fā)經(jīng)驗(yàn),帶領(lǐng)讀者比較輕松地掌握Kafka的相關(guān)知識(shí)。 上

    2024年02月05日
    瀏覽(26)
  • [AIGC_coze] Kafka 的主題分區(qū)之間的關(guān)系

    [AIGC_coze] Kafka 的主題分區(qū)之間的關(guān)系

    在 Kafka 中,主題(Topics)和分區(qū)(Partitions)是兩個(gè)重要的概念,它們之間存在著密切的關(guān)系。 主題是 Kafka 中用于數(shù)據(jù)發(fā)布和訂閱的邏輯單元。每個(gè)主題可以包含多個(gè)分區(qū),每個(gè)分區(qū)都是一個(gè)獨(dú)立的有序數(shù)據(jù)集。生產(chǎn)者將數(shù)據(jù)發(fā)送到特定的主題,而消費(fèi)者通過訂閱主題來接收

    2024年02月19日
    瀏覽(20)
  • JAVA實(shí)時(shí)獲取kafka各個(gè)主題下分區(qū)消息的消費(fèi)情況

    通過指定 主題 和 消費(fèi)者組 調(diào)用方法,實(shí)時(shí)查看主題下分區(qū)消息的消費(fèi)情況(消息總數(shù)量、消費(fèi)消息數(shù)量、未消費(fèi)的消息數(shù)量)。

    2024年02月13日
    瀏覽(27)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包