国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka使用詳解、最佳實(shí)踐和問(wèn)題排查

這篇具有很好參考價(jià)值的文章主要介紹了kafka使用詳解、最佳實(shí)踐和問(wèn)題排查。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

kafka是一個(gè)常用的分布式消息中間件,與RabbitMQ對(duì)比,特點(diǎn)是可以無(wú)限橫向擴(kuò)容,并保持高可靠性、高吞吐量和低延遲,因此比RabbitMQ有更高的市場(chǎng)占有率(網(wǎng)上搜了一下,kafka大約41%,RabbitMQ大約29%)。

一、kafka常見(jiàn)概念

一般正常的開(kāi)發(fā),了解到前6個(gè)概念就好,其余的概念更多用于kafka運(yùn)維配置,或問(wèn)題排查。

1、producer生產(chǎn)者

指生產(chǎn)消息,并把消息投遞到kafka的外部應(yīng)用程序 ,它不是kafka的組成部分

2、consumer消費(fèi)者

指連接到kafka,接收/訂閱消息,并進(jìn)行后續(xù)邏輯處理的外部應(yīng)用程序,它也不是kafka的組成部分。
一個(gè)消費(fèi)者可以同時(shí)消費(fèi)kafka的多個(gè)隊(duì)列(主題)

3、Consumer Group消費(fèi)者組

連接到kafka的消費(fèi)者,必須指定消費(fèi)者組,多個(gè)消費(fèi)者可以指定相同的消費(fèi)者組,這樣可以避免同一個(gè)消息被重復(fù)消費(fèi)。
如果2個(gè)消費(fèi)者綁定了同一個(gè)隊(duì)列(主題),指定了不同的消費(fèi)者組,則每條消息,都會(huì)同時(shí)投遞給這2個(gè)消費(fèi)者。

4、topic 主題

kafka里,收發(fā)消息的邏輯集合,每個(gè)主題,都可以認(rèn)為是一個(gè)隊(duì)列;
生產(chǎn)者和消費(fèi)者,都是通過(guò)連接主題 來(lái)處理消息。

5、partition分區(qū)

kafka里存儲(chǔ)消息的物理集合,一個(gè)主題可以劃分為1個(gè)或多個(gè)分區(qū),可以理解為子隊(duì)列;
每個(gè)分區(qū)只屬于一個(gè)主題,且只能被一個(gè)消費(fèi)者消費(fèi)(同一個(gè)組)。
該主題收到的所有消息,會(huì)根據(jù)消息的key選擇對(duì)應(yīng)的分區(qū)進(jìn)行投遞;
如果消息未指定key,且沒(méi)有定義分區(qū)規(guī)則時(shí),則kafka會(huì)隨機(jī)平均投遞到主題的多個(gè)分區(qū)里。
注意:每個(gè)分區(qū)里的消息,一定是按隊(duì)列的規(guī)則,保證先進(jìn)先出;但是不同的分區(qū)的消息,則無(wú)法保證。
因此,如果要確保消費(fèi)者能按消息的投遞順序進(jìn)行消費(fèi):

  • 每個(gè)主題只建一個(gè)分區(qū)(這個(gè)不推薦)
  • 同一批需要保證順序的消息,指定相同的key,比如使用用戶ID作為消息的key,相同key的消息會(huì)投遞到同一個(gè)分區(qū)

6、offset偏移量

指每個(gè)分區(qū)里的消息的唯一編號(hào),并且是從0開(kāi)始遞增的。主題+分區(qū)+偏移量,可以唯一定位一條消息。
注:每個(gè)分區(qū)里的每條消息,offset一定是不同的;不同分區(qū)的offset是會(huì)重復(fù)的。
消費(fèi)者會(huì)也記錄每次消費(fèi)的offset值,來(lái)標(biāo)識(shí)自己當(dāng)前處理到哪一條消息了,以便斷開(kāi)重連時(shí),繼續(xù)消費(fèi),消費(fèi)者的offset也是存儲(chǔ)在kafka中。

7、broker集群節(jié)點(diǎn)

kafka集群里的某個(gè)節(jié)點(diǎn),通常是一臺(tái)服務(wù)器上的kafka實(shí)例。

8、replica副本

主題的每個(gè)分區(qū),可以指定多個(gè)副本,每個(gè)副本都存儲(chǔ)一份完全相同的消息數(shù)據(jù)。
一般建議同一個(gè)分區(qū)的不同副本,要保存在不同的broker上,避免broker故障導(dǎo)致該分區(qū)數(shù)據(jù)丟失。

9、leader/follower

主題的每個(gè)分區(qū),如果有多個(gè)副本,那么其中一個(gè)副本會(huì)作為leader對(duì)外提供讀寫(xiě)服務(wù),其余的作為follower只同步數(shù)據(jù)。
如果leader出現(xiàn)故障時(shí),會(huì)從follower中選舉一個(gè)副本作為leader重新提供服務(wù)。

10、ISR(in-sync replicas)

分區(qū)的同步副本集合,每個(gè)分區(qū)都會(huì)維護(hù)一個(gè)ISR列表,內(nèi)容是那些與leader保持同步的follower清單。
如果某個(gè)follower跟不上同步的進(jìn)度,或無(wú)法保持同步時(shí),會(huì)從ISR列表中移除。
只有在ISR列表里的follower,才有機(jī)會(huì)提升為leader.
注:已經(jīng)成為leader的副本,也在ISR中

11、LEO日志末端偏移量

LEO指Log End Offset,即當(dāng)前分區(qū)中下一條待寫(xiě)入消息的偏移量,該條消息是未指向具體的消息。
分區(qū)的每個(gè)副本,都有自己的LEO。

12、HW高水位線

HW指High Watermark Offset,即當(dāng)前分區(qū)中已經(jīng)被提交并復(fù)制到所有副本的最高消息偏移量(offset),
leader接收到消息,但是還未同步完時(shí),不會(huì)更新HW值。
leader會(huì)比對(duì)自己和所有follower的LEO,用其中的較小值,來(lái)更新HW值。

13、LAG滯后消息數(shù)

一個(gè)消費(fèi)者組,在消費(fèi) 主題的每個(gè)分區(qū)時(shí),每個(gè)分區(qū)都會(huì)計(jì)算一個(gè)LAG值,指該分區(qū)的消息總數(shù)與消費(fèi)者已消費(fèi)的消息數(shù)的差值。
通常是 該分區(qū)的HW 減 消費(fèi)者組的offset。
實(shí)踐中,運(yùn)維人員應(yīng)當(dāng)對(duì)LAG進(jìn)行監(jiān)控,比如超出10000時(shí)進(jìn)行告警和處理。

理解了這些概念,網(wǎng)上找了一張kafka工作原理圖:
kafka使用詳解、最佳實(shí)踐和問(wèn)題排查

二、kafka與RabbitMQ對(duì)比

相對(duì)于RabbitMQ,Kafka有如下特點(diǎn):

  • kafka的消息消費(fèi)完并不會(huì)立即刪除,而是保存一定時(shí)間后才刪除,默認(rèn)是7天。而RabbitMQ是消費(fèi)完就刪除。
    kafka不刪消息這點(diǎn)我很喜歡,尤其是排查問(wèn)題需要數(shù)據(jù)恢復(fù)時(shí)。
  • kafka消費(fèi)者只支持pull模式,不支持push模式,即消費(fèi)者只能主動(dòng)輪詢kafka獲取消息,默認(rèn)是每500ms拉一次,每次最多拉500條數(shù)據(jù),輪詢的優(yōu)點(diǎn)就是靈活,缺點(diǎn)就是沒(méi)消息時(shí)空耗性能。
    RabbitMQ默認(rèn)只支持push模式,主動(dòng)推送消息給消費(fèi)者,實(shí)時(shí)性更好。
  • kafka通過(guò)topic主題連接生產(chǎn)者和消費(fèi)者,多個(gè)消費(fèi)者連接到同一個(gè)topic,即可消費(fèi)該主題的所有消息,如果有不需要的消息,也只能由消費(fèi)者自行判斷和拋棄處理。
    RabbitMQ則是通過(guò)Exchange接收消息,再通過(guò)指定的規(guī)則轉(zhuǎn)發(fā)到具體的Queue,由消費(fèi)者消費(fèi),可以參考我以前寫(xiě)的文章:https://youbl.blog.csdn.net/article/details/80401945
    RabbitMQ可以通過(guò)配置很多路由,避免消息投遞給不必要的消費(fèi)者,
    不過(guò)RabbitMQ也支持直接通過(guò)Queue接收和投遞消息。
  • 性能上,RabbitMQ是單線程模型,大數(shù)據(jù)上會(huì)有瓶頸;而kafka可以幾乎無(wú)限擴(kuò)展。
  • 有序性,kafka對(duì)于主題的每個(gè)分區(qū),因?yàn)橛星抑荒苡幸粋€(gè)消費(fèi)者,所以能保證消息的有序性,不同分區(qū)則無(wú)法保證;
    而RabbitMQ在多消費(fèi)者時(shí),會(huì)平均分配消息,無(wú)法保證有序,并且在消息消費(fèi)失敗重新投遞時(shí),也會(huì)破壞消息順序。

三、kafka最佳實(shí)踐

1、生產(chǎn)者配置

  • 生產(chǎn)者發(fā)送時(shí),有個(gè)acks配置,說(shuō)明如下:

    • 為0,生產(chǎn)者發(fā)消息后,不等borker響應(yīng)就返回成功,性能最高,丟數(shù)據(jù)概率也最高;
    • 為1,生產(chǎn)者發(fā)消息后,leader節(jié)點(diǎn)返回成功,就算成功;但是leader未同步給其它副本前就掛了,也會(huì)丟數(shù)據(jù);
    • 為all 或 -1,則必須等所有副本都同步成功,才返回成功,保證數(shù)據(jù)不丟失,但性能最低。
      生產(chǎn)環(huán)境,建議配置為-1,另外2個(gè)配置都有丟數(shù)據(jù)的可能性。
  • min.insync.replicas 最小副本數(shù)要求,默認(rèn)值1,建議為2(當(dāng)然要求每個(gè)topic副本數(shù)都在3以上)
    因?yàn)槿绻渲脼?,當(dāng)leader收到數(shù)據(jù),還未同步就故障了,會(huì)丟失數(shù)據(jù)。

  • retries: 重試次數(shù),設(shè)置為較大值,默認(rèn)值為Integer.MAX_VALUE,確保發(fā)送成功。
    注:雖然重試次數(shù)默認(rèn)很大,但是重試還受到另一個(gè)時(shí)間配置的影響:delivery.timeout.ms(默認(rèn)2分鐘),retries還沒(méi)用完,這個(gè)超時(shí)時(shí)間就到了,也會(huì)中斷發(fā)送。
    另外,如果設(shè)置了較大的retries,請(qǐng)使用異步發(fā)消息的方式,避免同步操作導(dǎo)致線程堵塞,影響用戶體驗(yàn),或其它業(yè)務(wù)問(wèn)題。

  • 配置參考:

spring:
  kafka:
    producer:
      bootstrap-servers: 10.1.1.1:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 10000
    properties:
      delivery.timeout.ms: 2000 # 發(fā)送消息上報(bào)成功或失敗的最大時(shí)間,默認(rèn)120000,兩分鐘
      linger.ms: 0              # 生產(chǎn)者把數(shù)據(jù)組合到一個(gè)批處理進(jìn)行請(qǐng)求的最大延遲時(shí)間,默認(rèn)0
      # 參考 https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
      request.timeout.ms: 1000  # 批處理就緒后到響應(yīng)的等待時(shí)長(zhǎng),含網(wǎng)絡(luò)+服務(wù)器復(fù)制時(shí)間
      batch.size: 1000

2、消費(fèi)者配置

  • 為了避免消息丟失,消費(fèi)者需要開(kāi)啟手動(dòng)ack,消息業(yè)務(wù)邏輯處理完成再提交偏移量
  • 參考后面的死循環(huán)問(wèn)題,建議使用String反序列化器
  • 根據(jù)業(yè)務(wù)情況,配置合適的批量拉取數(shù)量max-poll-records,默認(rèn)值500
  • 根據(jù)業(yè)務(wù)情況,配置合適的auto-offset-reset值,默認(rèn)值latest
    • latest:消費(fèi)者在消費(fèi)主題的某個(gè)分區(qū)時(shí),如果沒(méi)有之前的消費(fèi)記錄(以前提交的偏移量),則只拉取最新消息,忽略歷史消息。
    • earliest:與latest相反,沒(méi)有之前的消費(fèi)記錄時(shí),從最早的消息開(kāi)始處理。
    • none:沒(méi)有之前的消費(fèi)記錄時(shí),拋出異常。
  • 配置參考:
spring:
  kafka:
    consumer:
      bootstrap-servers: 10.1.1.1:9092
      max-poll-records: 100
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest
    listener:
      type: batch
      ack-mode: manual_immediate

3、其它

  • 配置多個(gè) bootstrap server url,避免單節(jié)點(diǎn)故障,導(dǎo)致連接失敗
  • 如果是異步發(fā)消息,不要在kafkaTemplate的成功回調(diào)和失敗回調(diào)方法里有太多業(yè)務(wù)邏輯,回調(diào)方法是單線程處理,里面的業(yè)務(wù)邏輯會(huì)占用delivery.timeout.ms的超時(shí)時(shí)間配置,可能導(dǎo)致后續(xù)消息發(fā)送超時(shí)。
  • 同理,消費(fèi)者也是單線程處理,消費(fèi)邏輯太重的話,可能導(dǎo)致session.timeout.ms超時(shí),從而被認(rèn)為消費(fèi)者離線,導(dǎo)致問(wèn)題

四、kafka工具介紹

1、圖形化工具

推薦 OffsetExplorer,下載地址:https://www.kafkatool.com/download.html
kafka使用詳解、最佳實(shí)踐和問(wèn)題排查

2、命令行工具

kafka安裝包內(nèi)置了很多腳本工具,可以方便的查詢kafka的狀態(tài),這些工具只需要下載就可以使用,無(wú)需安裝。

  • 下載地址: https://kafka.apache.org/downloads
    下載后解壓,在bin目錄下有很多sh文件,這些是在linux上使用的;
    如果在Windows下使用,要用 bin\windows\ 下的那些bat文件。
    下面用windows的bat文件命令舉例(linux改用對(duì)應(yīng)的sh文件執(zhí)行即可)
  • 使用說(shuō)明,請(qǐng)參考官方文檔:https://kafka.apache.org/documentation/

查詢某個(gè)消費(fèi)者組下有哪些消費(fèi)者,以及這些消費(fèi)者對(duì)主題的消費(fèi)狀態(tài):

d:\kafka_2.13-3.4.0\bin\windows\kafka-consumer-groups.bat --describe --group=cb_consumers --bootstrap-server=10.0.0.1:9092
kafka使用詳解、最佳實(shí)踐和問(wèn)題排查
字段說(shuō)明:

  • GROUP 消費(fèi)者分組
  • TOPIC 消費(fèi)的主題
  • PARTITION 消費(fèi)的分區(qū)
  • CURRENT-OFFSET 當(dāng)前消費(fèi)到的消息偏移量
  • LOG-END-OFFSET 當(dāng)前分區(qū)的最大消息偏移量
  • LAG 滯后消息條數(shù)
  • CONSUMER-ID 消費(fèi)者的ID
  • HOST 消費(fèi)者所在的主機(jī)
  • CLIENT-ID 客戶端ID
    注:LAG可以簡(jiǎn)單理解為 LOG-END-OFFSET 減 CURRENT-OFFSET,但是實(shí)際上LAG=HW 減 CURRENT-OFFSET

四、springboot項(xiàng)目使用

1、生產(chǎn)者Demo代碼:

1.1、添加pom依賴:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

1.2、添加application.yml配置:

spring:
  kafka:
    producer:
      bootstrap-servers: 10.1.1.1:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 2  # 失敗重發(fā)次數(shù)

1.3、Java發(fā)送代碼:

private final KafkaTemplate kafkaTemplate; // 注入的Bean

// 同步發(fā)送消息
String topic = "beinetTest111";
Object result = kafkaTemplate.send(topic, "我是key", objData).get();

2、消費(fèi)者Demo代碼:

2.1、添加pom依賴:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.2、添加application.yml配置:

spring:
  kafka:
    consumer:
      bootstrap-servers: 10.1.1.1:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
    listener:
      type: batch
      ack-mode: manual_immediate

2.3、Java發(fā)送代碼:

@KafkaListener(topics = "${kafka-topic.reports}")
public void consumerCreateTask(List<ConsumerRecord<String, Object>> consumerRecordList, Acknowledgment ack) {
    if (consumerRecordList == null || consumerRecordList.size() <= 0)
        return;

    long start = System.nanoTime();
    ConsumerRecord lastRecord = consumerRecordList.get(0);
    try {
        // 轉(zhuǎn)換dto,并進(jìn)行業(yè)務(wù)邏輯處理

        long elapsedTime = System.nanoTime() - start;
        log.debug("Topic:{} 分區(qū):{} 偏移:{} 條數(shù):{} 耗時(shí):{}ns",
                lastRecord.topic(),
                lastRecord.partition(),
                lastRecord.offset(),
                dtos.size(),
                elapsedTime);
    } catch (Exception exp) {
        long elapsedTime = System.nanoTime() - start;
        log.error("Topic:{} 分區(qū):{} 偏移:{} 耗時(shí):{}ns 出錯(cuò):",
                lastRecord.topic(),
                lastRecord.partition(),
                lastRecord.offset(),
                elapsedTime,
                exp);
    } finally {
        // 不論成敗,都提交,避免出錯(cuò)導(dǎo)致死循環(huán),避免丟消息的邏輯,可以在catch里備份
        ack.acknowledge();
    }
}

五、kafka常見(jiàn)問(wèn)題

1、有多個(gè)消費(fèi)者,但是總會(huì)有一個(gè)消費(fèi)者拿不到消息數(shù)據(jù)

對(duì)一個(gè)消費(fèi)者組而言,一個(gè)主題有幾個(gè)分區(qū),就最多接受幾個(gè)消費(fèi)者;
比如主題有2個(gè)分區(qū),那么每個(gè)分區(qū)只能分配給組里的一個(gè)消費(fèi)者,最多只有2個(gè)消費(fèi)者連接上來(lái),如果組里有3個(gè)消費(fèi)者,那么肯定會(huì)有一個(gè)消費(fèi)者處于空閑狀態(tài),沒(méi)活干。
如果主題有2個(gè)分區(qū),但是組里只有一個(gè)消費(fèi)者,那么2個(gè)分區(qū)的消息,都會(huì)投遞給這一個(gè)消費(fèi)者。

2、主題的分區(qū)分配策略是怎么樣的?

當(dāng)主題存在多個(gè)分區(qū)和多個(gè)消費(fèi)者時(shí),kafka的源碼實(shí)現(xiàn)里有如下幾種分區(qū)分配策略:

  • Range策略(默認(rèn)策略):
    把當(dāng)前消費(fèi)者組消費(fèi)的每個(gè)主題的所有分區(qū),逐個(gè)分配給消費(fèi)者,注意是每個(gè)主題單獨(dú)處理,所以會(huì)出現(xiàn)不均衡的情況。
    例如:a主題有3個(gè)分區(qū)a0/a1/a2,b主題3個(gè)分區(qū)b0/b1/b2,有2個(gè)消費(fèi)者C0/C1,分配過(guò)程大致是:
    第1步分配a主題: a0->C0, a1->C1, a2->C0
    第2步分配b主題: b0->C0, b1->C1, b2->C0
    這樣可以看出,【消費(fèi)者C0要維護(hù)4個(gè)分區(qū)的數(shù)據(jù),而C1只要維護(hù)2個(gè)分區(qū)的數(shù)據(jù)】,出現(xiàn)了明顯的不均衡問(wèn)題。
  • Round-Robin策略:
    把所有的分區(qū),排序后,輪詢方式逐一分配給所有的消費(fèi)者,
    例如:a主題有3個(gè)分區(qū)a0/a1/a2,b主題3個(gè)分區(qū)b0/b1/b2,有2個(gè)消費(fèi)者C0/C1,分配過(guò)程大致是:
    第1步分配a主題: a0->C0, a1->C1, a2->C0
    第2步分配b主題: b0->C1, b1->C0, b2->C1
    注意:第2步不是從頭開(kāi)始,而是接著第1步,繼續(xù)分配,所以排除了Range方案的不均衡問(wèn)題,
    最終的分配結(jié)果是【2個(gè)消費(fèi)者,各自負(fù)責(zé)3個(gè)分區(qū)】。
    但是,如果2個(gè)消費(fèi)者消費(fèi)的主題,只有部分交集,并不完全相同時(shí),還是會(huì)出現(xiàn)不均衡的情況。
    如果希望改用這種策略,目前暫時(shí)沒(méi)有配置方法,要在代碼里修改partition.assignment.strategy屬性,參考代碼:
@Configuration
@RequiredArgsConstructor
public class KafkaConfiguration {
    private final KafkaProperties kafkaProperties;
    private final ConcurrentKafkaListenerContainerFactory<String, Object> kafkaFactory;

    @Bean("myKafkaFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory() {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        kafkaFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        return kafkaFactory;
    }
}

然后在消費(fèi)者代碼上指定使用這個(gè)工廠Bean:

 @KafkaListener(id = "beinetHandler1", groupId = "beinetGroup", topicPattern = "beinetTest.*",
            containerFactory = "myKafkaFactory")
    public void msgHandler(List<ConsumerRecord> message, Acknowledgment ack) {
  • 在kafka的源碼里還有2種實(shí)現(xiàn),本文暫不深入介紹:
    org.apache.kafka.clients.consumer.CooperativeStickyAssignor
    org.apache.kafka.clients.consumer.StickyAssignor

3、消費(fèi)者加入或退出時(shí),消息還能正常消費(fèi)嗎?

結(jié)論:只要有存活的消費(fèi)者存在,那么所有的消息都能正常消費(fèi)。
當(dāng)有新的消費(fèi)者加入組,或組中有消費(fèi)者下線/退出,都會(huì)觸發(fā)消費(fèi)者重新平衡的動(dòng)作,就是重新為所有的消費(fèi)者分配分區(qū)。
重平衡發(fā)生時(shí),默認(rèn)停止所有消費(fèi)者工作,直到分配結(jié)束。

4、有兄弟說(shuō)已經(jīng)往kafka寫(xiě)入消息了,但是消費(fèi)者那邊沒(méi)有數(shù)據(jù)入庫(kù)

  • 首先,確認(rèn)kafka有消息,用上面的圖形化工具offset explorer,去對(duì)應(yīng)的topic主題查找數(shù)據(jù),發(fā)現(xiàn)確實(shí)有數(shù)據(jù)
  • 再在工具里,查看Consumers下的對(duì)應(yīng)Group,發(fā)現(xiàn)Lag為0,說(shuō)明消息已經(jīng)被正常消費(fèi)了
  • 查看消費(fèi)者的應(yīng)用日志,沒(méi)有消費(fèi)日志產(chǎn)生
  • 再繼續(xù)排查消費(fèi)者的應(yīng)用日志,發(fā)現(xiàn)有如下日志:cb_consumers: partitions assigned: []
    這表示,該消費(fèi)者沒(méi)有分配到分區(qū),不在工作中。
    初步判斷,是不是有人在其它地方啟動(dòng)了消費(fèi)者,把數(shù)據(jù)給消費(fèi)掉了。
  • offset explorer這個(gè)工具,不能展示消費(fèi)者IP信息,只能使用上面的命令kafka-consumer-groups.bat查看消費(fèi)者IP,
    再找運(yùn)維看看這個(gè)IP是誰(shuí)的。
  • 最后定位到是測(cè)試環(huán)境配置錯(cuò)誤,把開(kāi)發(fā)環(huán)境的數(shù)據(jù)消費(fèi)掉了。

5、反序列化失敗,導(dǎo)致消費(fèi)者死循環(huán)問(wèn)題

某天發(fā)布到測(cè)試環(huán)境后,發(fā)現(xiàn)程序啟動(dòng)后,一直拋如下異常,且持續(xù)幾十分鐘也不會(huì)中斷:

<#6d8d6458> j.l.IllegalStateException: No type information in headers and no default type provided
    at o.s.util.Assert.state(Assert.java:76)
    at o.s.k.s.s.JsonDeserializer.deserialize(JsonDeserializer.java:535)
    at o.a.k.c.c.i.Fetcher.parseRecord(Fetcher.java:1387)
    at o.a.k.c.c.i.Fetcher.access$3400(Fetcher.java:133)
    at o.a.k.c.c.i.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1618)
    at o.a.k.c.c.i.Fetcher$CompletedFetch.access$1700(Fetcher.java:1454)
    at o.a.k.c.c.i.Fetcher.fetchRecords(Fetcher.java:687)
    at o.a.k.c.c.i.Fetcher.fetchedRecords(Fetcher.java:638)
    at o.a.k.c.c.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272)
    at o.a.k.c.c.KafkaConsumer.poll(KafkaConsumer.java:1233)
    at o.a.k.c.c.KafkaConsumer.poll(KafkaConsumer.java:1206)
    at j.i.r.GeneratedMethodAccessor109.invoke(Unknown Source)
    at j.i.r.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at j.l.reflect.Method.invoke(Unknown Source)
    at o.s.a.s.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
    at o.s.a.f.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208)
    at c.s.proxy.$Proxy186.poll(Unknown Source)
    at o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1413)
    at o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1250)
    at o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1162)
    at j.u.c.Executors$RunnableAdapter.call(Unknown Source)
    at j.u.c.FutureTask.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

放google搜索了一下,說(shuō)是反序列化找不到類型信息導(dǎo)致的,并建議不要使用JSON反序列化。
查了一下配置變化記錄,確實(shí)加了一個(gè)kafka的反序列化配置變更:

spring: 
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

把spring.consumer.value-deserializer 改成:org.apache.kafka.common.serialization.StringDeserializer 就恢復(fù)了。
了解了一下,同事希望在消費(fèi)者的方法參數(shù)上,直接使用對(duì)象,而不是使用String,所以做了這個(gè)修改。
而正好出錯(cuò)的這個(gè)消費(fèi)者,消費(fèi)的是其它項(xiàng)目生產(chǎn)的消息,里面并不包含類型信息。

而這個(gè)異常,是在spring的底層拋出的,業(yè)務(wù)代碼上無(wú)法進(jìn)行try 捕捉,代碼同時(shí)又設(shè)置了手工提交ack,導(dǎo)致代碼進(jìn)入了死循環(huán)。
為了避免這種問(wèn)題,還是建議使用StringDeserializer反序列化,自己在代碼里反序列化比較好。

6、broker單個(gè)故障,導(dǎo)致消費(fèi)者無(wú)法提交偏移量的問(wèn)題

生產(chǎn)環(huán)境,為了性能和故障轉(zhuǎn)移,部署了6個(gè)broker,某天有一個(gè)broker故障下線了,按理應(yīng)該會(huì)自動(dòng)切換,實(shí)際上,所有的消費(fèi)者都開(kāi)始拋異常:“error when storing group assignment during syncgroup”
直到人工恢復(fù)broker并上線,故障才恢復(fù)。
最終排查結(jié)果:

  • 運(yùn)維把kafka的一個(gè)內(nèi)部topic: __consumer_offsets 副本數(shù)配置為2,
  • 同時(shí)配置 min.insync.replicas=2 ,該配置的含義是ISR列表最小同步副本數(shù)不得少于2個(gè)
  • 故障下線的broker,正好包含該topic: __consumer_offsets的一個(gè)副本,導(dǎo)致該主題的副本數(shù)只剩下一個(gè),不符合 min.insync.replicas=2配置要求,從而停止工作
  • topic: __consumer_offsets的作用,是接收并存儲(chǔ)所有消費(fèi)者組的消費(fèi)偏移量,該主題不工作,就會(huì)導(dǎo)致消費(fèi)者無(wú)法提交偏移量,從而導(dǎo)致所有消費(fèi)不正常,會(huì)重復(fù)消費(fèi)數(shù)據(jù)。

知道問(wèn)題了,調(diào)整就是把 topic: __consumer_offsets的副本數(shù)調(diào)整為3(默認(rèn)值就是3,運(yùn)維改錯(cuò)了)

7、所有消費(fèi)者都不消費(fèi)任何消息

如果消費(fèi)者先啟動(dòng),然后才創(chuàng)建topic,會(huì)導(dǎo)致消費(fèi)者消費(fèi)不到數(shù)據(jù),可以重啟消費(fèi)者試試

8、spring中的kafka,是否存在線程安全問(wèn)題

生產(chǎn)者使用的KafkaTemplate是線程安全的,經(jīng)測(cè)試,都是使用同一個(gè)線程進(jìn)行消息發(fā)送。
同樣,消費(fèi)者也是線程安全的,每個(gè)消費(fèi)者也是單線程處理所有接收到的消息。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-495742.html

9、kafka消息堵塞怎么處理

  • 如果消息不重要,可以直接刪除主題重建,該主題下的所有消息自然就沒(méi)了,注意需要重建topic,然后重啟消費(fèi)者;
  • 如果所有消息都需要消費(fèi),基于kafka的分區(qū)特性,每個(gè)分區(qū)只能一個(gè)消費(fèi)者,因此無(wú)法簡(jiǎn)單通過(guò)增加消費(fèi)者來(lái)解決問(wèn)題
    • 確認(rèn)消費(fèi)者有沒(méi)有出現(xiàn)異常,需要注意的是,有些開(kāi)發(fā)人員會(huì)吞掉異常,導(dǎo)致你認(rèn)為消費(fèi)者正常的,可以通過(guò)業(yè)務(wù)數(shù)據(jù)是否持續(xù)增長(zhǎng)來(lái)判別,如果是消費(fèi)者異常了,修復(fù)bug即可。
    • 如果消費(fèi)正常,再確認(rèn)是否突發(fā)消息數(shù)據(jù)增長(zhǎng),簡(jiǎn)單判斷就是主題的LAG是否在持續(xù)按正常速率降低,觀察幾分鐘,如果持續(xù)降低,基于判斷是突發(fā)消息,可以耐心等待。
    • 如果消息不會(huì)正常下降,基本判斷是消費(fèi)速度慢,
      • 先用工具確認(rèn)該主題下每個(gè)分區(qū)的LAG,如果只是某個(gè)分區(qū)的LAG特別高,其它分區(qū)正常(未堵塞),那么應(yīng)該是消息分配不均衡,這個(gè)分區(qū)的消息特別多,考慮調(diào)整生產(chǎn)者的消息key,確保所有分區(qū)的消息數(shù)量均衡;
      • 如果對(duì)消息時(shí)序無(wú)特別要求,可以代碼里通過(guò)線程池異步處理消息,注意要控制線程池?cái)?shù)量,不要導(dǎo)致應(yīng)用oom了;
      • 考慮增加幾個(gè)分區(qū),再增加幾個(gè)消費(fèi)者,這樣新生產(chǎn)的消息會(huì)重新分散到不同分區(qū),降低舊消費(fèi)者的壓力;
      • 堵塞的消息,考慮加個(gè)新消費(fèi)者,消費(fèi)到另外的臨時(shí)主題,臨時(shí)主題多加個(gè)幾倍的分區(qū)和消費(fèi)者,進(jìn)行快速消費(fèi),注意不要對(duì)下游或數(shù)據(jù)庫(kù)造成沖擊,導(dǎo)致其它問(wèn)題。

到了這里,關(guān)于kafka使用詳解、最佳實(shí)踐和問(wèn)題排查的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 深入理解 Kafka 集群管理與最佳實(shí)踐

    深入理解 Kafka 集群管理與最佳實(shí)踐

    構(gòu)建和管理一個(gè)穩(wěn)定、高性能的Kafka集群對(duì)于實(shí)現(xiàn)可靠的消息傳遞至關(guān)重要。本文將深入研究Kafka集群的各個(gè)方面,包括集群搭建、節(jié)點(diǎn)配置、分區(qū)與副本管理、安全性與監(jiān)控,為讀者提供全面的指導(dǎo)和實(shí)例代碼。 1.1 Broker 節(jié)點(diǎn) 在Kafka集群中,Broker節(jié)點(diǎn)是核心組件,負(fù)責(zé)消息的

    2024年02月03日
    瀏覽(26)
  • 深入理解Kafka:架構(gòu)、設(shè)計(jì)原則及最佳實(shí)踐

    Kafka是一款由Apache開(kāi)發(fā)的分布式流處理平臺(tái),它最初是由LinkedIn公司在2010年開(kāi)發(fā)的。從最初的消息隊(duì)列到如今的分布式流處理平臺(tái)Kafka經(jīng)歷了一個(gè)逐步演化的過(guò)程。 Kafka最開(kāi)始的設(shè)計(jì)目的是解決LinkedIn內(nèi)部存在的海量數(shù)據(jù)傳輸問(wèn)題,在其不斷的發(fā)展中Kafka逐漸發(fā)展成為一種可持

    2024年02月07日
    瀏覽(33)
  • 深入解析 Kafka生產(chǎn)者:關(guān)鍵特性與最佳實(shí)踐

    引言 Apache Kafka作為一個(gè)高度可擴(kuò)展且具有高效性的消息中間件,已經(jīng)成為現(xiàn)代大數(shù)據(jù)生態(tài)系統(tǒng)中的核心組件之一。在本文中,我們將專注于Kafka中的一個(gè)重要角色——生產(chǎn)者(Producer),探討其核心功能、工作原理及其關(guān)鍵配置項(xiàng),旨在幫助讀者更好地理解和優(yōu)化Kafka生產(chǎn)者的

    2024年03月17日
    瀏覽(34)
  • 基于Canal+kafka監(jiān)聽(tīng)數(shù)據(jù)庫(kù)變化的最佳實(shí)踐

    基于Canal+kafka監(jiān)聽(tīng)數(shù)據(jù)庫(kù)變化的最佳實(shí)踐

    1、前言 ??????? 工作中,我們很多時(shí)候需要根據(jù)某些狀態(tài)的變化更新另一個(gè)業(yè)務(wù)的邏輯,比如訂單的生成,成交等,需要更新或者通知其他的業(yè)務(wù)。我們通常的操作通過(guò)業(yè)務(wù)埋點(diǎn)、接口的調(diào)用或者中間件完成。 ????????但是狀態(tài)變化的入口比較多的時(shí)候,就很容易漏掉

    2023年04月08日
    瀏覽(37)
  • Kafka 最佳實(shí)踐:構(gòu)建可靠、高性能的分布式消息系統(tǒng)

    Kafka 最佳實(shí)踐:構(gòu)建可靠、高性能的分布式消息系統(tǒng)

    Apache Kafka 是一個(gè)強(qiáng)大的分布式消息系統(tǒng),被廣泛應(yīng)用于實(shí)時(shí)數(shù)據(jù)流處理和事件驅(qū)動(dòng)架構(gòu)。為了充分發(fā)揮 Kafka 的優(yōu)勢(shì),需要遵循一些最佳實(shí)踐,確保系統(tǒng)在高負(fù)載下穩(wěn)定運(yùn)行,數(shù)據(jù)可靠傳遞。本文將深入探討 Kafka 的一些最佳實(shí)踐,并提供豐富的示例代碼,幫助讀者更好地應(yīng)用

    2024年02月03日
    瀏覽(43)
  • Kafka 問(wèn)題排查

    Kafka 問(wèn)題排查

    事情的起因是用戶在 app 上查不到訂單了,而訂單數(shù)據(jù)是從 mysql 的 order_search 表查詢的,order_search 表的數(shù)據(jù)是從 oracle 的 order 表同步過(guò)來(lái)的,查不到說(shuō)明同步有問(wèn)題 首先重啟,同步數(shù)據(jù),問(wèn)題解決,然后查找原因。首先看日志,有如下兩種情況 有的容器消費(fèi)消息的日志正常

    2024年01月22日
    瀏覽(14)
  • Kafka 消息不能正常消費(fèi)問(wèn)題排查

    Kafka 消息不能正常消費(fèi)問(wèn)題排查

    事情的起因是用戶在 app 上查不到訂單了,而訂單數(shù)據(jù)是從 mysql 的 order_search 表查詢的,order_search 表的數(shù)據(jù)是從 oracle 的 order 表同步過(guò)來(lái)的,查不到說(shuō)明同步有問(wèn)題 首先重啟,同步數(shù)據(jù),問(wèn)題解決,然后查找原因。首先看日志,有如下兩種情況 有的容器消費(fèi)消息的日志正常

    2024年01月18日
    瀏覽(20)
  • 線上FullGC問(wèn)題排查實(shí)踐——手把手教你排查線上問(wèn)題

    線上FullGC問(wèn)題排查實(shí)踐——手把手教你排查線上問(wèn)題

    作者:京東科技 韓國(guó)凱 問(wèn)題起因是我們收到了jdos的容器CPU告警,CPU使用率已經(jīng)達(dá)到104% 觀察該機(jī)器日志發(fā)現(xiàn),此時(shí)有很多線程在執(zhí)行跑批任務(wù)。正常來(lái)說(shuō),跑批任務(wù)是低CPU高內(nèi)存型,所以此時(shí)考慮是FullGC引起的大量CPU占用(之前有類似情況,告知用戶后重啟應(yīng)用后解決問(wèn)題

    2024年02月02日
    瀏覽(29)
  • Kafka重復(fù)消費(fèi)、Dubbo重復(fù)調(diào)用問(wèn)題排查

    Kafka重復(fù)消費(fèi)、Dubbo重復(fù)調(diào)用問(wèn)題排查

    ????????本業(yè)務(wù)為車機(jī)流量充值業(yè)務(wù),大致流程為:收到微信、支付寶端用戶支付成功回調(diào)后,將用戶訂單信息發(fā)送至kafka中;消費(fèi)者接收到kafka中信息后進(jìn)行解析,處理用戶訂單信息,為用戶訂購(gòu)相關(guān)流量包(調(diào)用電信相關(guān)接口),訂購(gòu)成功/失敗后會(huì)通過(guò)MQTT發(fā)送訂購(gòu)成功

    2024年03月24日
    瀏覽(18)
  • RabbitMQ系統(tǒng)監(jiān)控、問(wèn)題排查和性能優(yōu)化實(shí)踐

    一、系統(tǒng)監(jiān)控:RabbitMQ的各項(xiàng)性能指標(biāo)及監(jiān)控 Message Rates:消息率包含了publish,deliver/get,ack等方面的數(shù)據(jù),反映了消息在系統(tǒng)中流轉(zhuǎn)的情況。 Queue Length:隊(duì)列長(zhǎng)度反映了系統(tǒng)當(dāng)前的負(fù)載情況。如果隊(duì)列中的消息過(guò)多,可能需要增加消費(fèi)者來(lái)處理消息,或者檢查消費(fèi)者是否出

    2024年04月11日
    瀏覽(25)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包