国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【注意】Kafka生產(chǎn)者異步發(fā)送消息仍有可能阻塞

這篇具有很好參考價值的文章主要介紹了【注意】Kafka生產(chǎn)者異步發(fā)送消息仍有可能阻塞。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

問題描述

Kafka是常用的消息中間件。在Spring Boot項目中,使用KafkaTemplate作為生產(chǎn)者發(fā)送消息。有時,為了不影響主業(yè)務流程,會采用異步發(fā)送的方式,如下所示。

@Slf4j
@Component
public class KafkaSender {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendAsync(String topic, String message) {
        kafkaTemplate.send(topic, message)
                .addCallback(
                        sendResult -> log.info("Send success"),
                        e -> log.error("Send failed", e));
    }
}

本以為采用異步發(fā)送,必然不會影響到主業(yè)務流程。但實際使用時發(fā)現(xiàn),在第一次發(fā)送消息時,如果Kafka Broker連接失敗,調(diào)用sendAsync()方法的主線程會長時間阻塞。這點是出乎意料的。

原因分析

跟蹤源碼可知,Kafka生產(chǎn)者在第一次發(fā)送消息時,會嘗試從Broker獲取元數(shù)據(jù)Metadata(見KafkaProducerwaitOnMetadata()方法),如果Broker連接失敗,則會一直阻塞于此,循環(huán)嘗試獲取,直至超時(超時時間由max.block.ms定義)。

    /**
     * Wait for cluster metadata including partitions for the given topic to be available.
     * @param topic The topic we want metadata for
     * @param partition A specific partition expected to exist in metadata, or null if there's no preference
     * @param nowMs The current time in ms
     * @param maxWaitMs The maximum time in ms for waiting on the metadata
     * @return The cluster containing topic metadata and the amount of time we waited in ms
     * @throws TimeoutException if metadata could not be refreshed within {@code max.block.ms}
     * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
     */
    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already and reset expiry
        Cluster cluster = metadata.fetch();

        if (cluster.invalidTopics().contains(topic))
            throw new InvalidTopicException(topic);

        metadata.add(topic, nowMs);

        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // Return cached metadata if we have it, and if the record's partition is either undefined
        // or within the known partition range
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);

        long remainingWaitMs = maxWaitMs;
        long elapsed = 0;
        // Issue metadata requests until we have metadata for the topic and the requested partition,
        // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
        // is stale and the number of partitions for this topic has increased in the meantime.
        do {
            if (partition != null) {
                log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
            } else {
                log.trace("Requesting metadata update for topic {}.", topic);
            }
            metadata.add(topic, nowMs + elapsed);
            int version = metadata.requestUpdateForTopic(topic);
            sender.wakeup();
            try {
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException(
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs));
            }
            cluster = metadata.fetch();
            elapsed = time.milliseconds() - nowMs;
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException(partitionsCount == null ?
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs) :
                        String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                                partition, topic, partitionsCount, maxWaitMs));
            }
            metadata.maybeThrowExceptionForTopic(topic);
            remainingWaitMs = maxWaitMs - elapsed;
            partitionsCount = cluster.partitionCountForTopic(topic);
        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

        return new ClusterAndWaitTime(cluster, elapsed);
    }

也就是說,Kafka生產(chǎn)者在發(fā)送消息前,要先獲取到Metadata。對于異步發(fā)送,雖然消息發(fā)送的過程是非阻塞的,但獲取Metadata的過程是阻塞的。如果因為Broker連接失敗、Topic未創(chuàng)建等原因而一直獲取不到Metadata,主線程將長時間阻塞。

解決辦法

解決辦法也很簡單。如果Kafka發(fā)送消息并非關鍵業(yè)務,為了不影響主業(yè)務流程的進行,可以創(chuàng)建線程池來專門執(zhí)行消息發(fā)送工作,保證sendAsync()方法一定是異步執(zhí)行的。注意,線程池大小和工作隊列長度需要合理限定,避免因阻塞任務過多而OOM;拒絕策略可以視情況選擇DiscardPolicy。

另外,還可以考慮指定max.block.ms,來限制獲取Metadata的最大阻塞時間(默認60000ms):

spring:
  kafka:
    producer:
      properties:
        max.block.ms: 1000

實際上,在異步發(fā)送消息的過程中,除了因為獲取不到Metadata而阻塞外,還可能因為消息緩沖池已滿而阻塞(參考:Kafka Producer 異步發(fā)送消息居然也會阻塞?)。這2種阻塞的超時時間均由max.block.ms定義。

總結(jié)

Kafka生產(chǎn)者異步發(fā)送消息的方法(如Spring Boot中的kafkaTemplate.send()),看似異步,實則可能阻塞。由于發(fā)送消息前需要獲取元數(shù)據(jù)Metadata,如果一直獲取失?。赡茉虬˙roker連接失敗、Topic未創(chuàng)建等),將導致長時間阻塞。這點與我們的一般理解不符,需要特別注意。文章來源地址http://www.zghlxwxcb.cn/news/detail-412084.html

到了這里,關于【注意】Kafka生產(chǎn)者異步發(fā)送消息仍有可能阻塞的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權(quán),不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • kafka入門(五):kafka生產(chǎn)者發(fā)送消息

    構(gòu)建消息,即創(chuàng)建 ProduceRecord 對象。 (1) kafka發(fā)送消息,最常見的構(gòu)造方法是: topic 表示主題, value 表示值。 (2) kafka發(fā)送消息指定key,ProducerRecord 的 key ,既可以作為消息的唯一id,也可以用來決定消息該被寫到主題的哪個分區(qū)。擁有相同key 的消息,將被寫到同一個分區(qū)。

    2024年01月17日
    瀏覽(41)
  • 多圖詳解 kafka 生產(chǎn)者消息發(fā)送過程

    多圖詳解 kafka 生產(chǎn)者消息發(fā)送過程

    生產(chǎn)者客戶端代碼 KafkaProducer 通過解析 producer.propeties 文件里面的屬性來構(gòu)造自己。例如 :分區(qū)器、Key 和 Value 序列化器、攔截器、 RecordAccumulator消息累加器 、 元信息更新器 、啟動發(fā)送請求的后臺線程 生產(chǎn)者元信息更新器 我們之前有講過. 客戶端都會保存集群的元信息,例如

    2023年04月09日
    瀏覽(31)
  • 分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    不管是把Kafka作為消息隊列、消息總線還是數(shù)據(jù)存儲平臺,總是需要一個可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個可以從Kafka讀取數(shù)據(jù)的消費者,或者一個兼具兩種角色的應用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應用程序,它們負責將消息發(fā)送到 Kafka 集群中的一個或多

    2024年02月13日
    瀏覽(29)
  • 分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    01. Kafka 分區(qū)的作用 分區(qū)的作用就是提供負載均衡的能力,或者說對數(shù)據(jù)進行分區(qū)的主要原因,就是為了實現(xiàn)系統(tǒng)的高伸縮性。不同的分區(qū)能夠被放置到不同節(jié)點的機器上,而數(shù)據(jù)的讀寫操作也都是針對分區(qū)這個粒度而進行的,這樣每個節(jié)點的機器都能獨立地執(zhí)行各自分區(qū)的

    2024年02月13日
    瀏覽(32)
  • 分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    不管是把Kafka作為消息隊列、消息總線還是數(shù)據(jù)存儲平臺,總是需要一個可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個可以從Kafka讀取數(shù)據(jù)的消費者,或者一個兼具兩種角色的應用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應用程序,它們負責將消息發(fā)送到 Kafka 集群中的一個或多

    2024年02月13日
    瀏覽(28)
  • Kafka 入門到起飛系列 - 生產(chǎn)者發(fā)送消息流程解析

    Kafka 入門到起飛系列 - 生產(chǎn)者發(fā)送消息流程解析

    生產(chǎn)者通過 producerRecord 對象封裝消息主題、消息的value(內(nèi)容)、timestamp(時間戳)等 生產(chǎn)者通過 send() 方法發(fā)送消息,send()方法會經(jīng)過如下幾步 1. 首先將消息交給 攔截器(Interceptor) 處理, 攔截器對生產(chǎn)者而言,對所有消息都是生效的,攔截器也支持鏈式編程(責任器鏈)的

    2024年02月16日
    瀏覽(24)
  • kafka服務端允許生產(chǎn)者發(fā)送最大消息體大小

    ????????server.properties中加上的message.max.bytes配置,我目前設置為5242880,即5MB,可以根據(jù)實際情況增大。 ????????在生產(chǎn)者端配置max.request.size,這是單個消息最大字節(jié)數(shù),根據(jù)實際調(diào)整,max.request.size 必須小于 message.max.bytes 以及消費者的 max.partition.fetch.bytes。這樣消息

    2024年02月15日
    瀏覽(24)
  • 07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費者(演示 監(jiān)聽消息)

    07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費者(演示 監(jiān)聽消息)

    簡單來說,就是一個數(shù)據(jù)項。 ▲ 消息就是 Kafka 所記錄的數(shù)據(jù)節(jié)點,消息在 Kafka 中又被稱為記錄(record)或事件(event)。 從存儲上來看,消息就是存儲在分區(qū)文件(有點類似于List)中的一個數(shù)據(jù)項,消息具有 key、value、時間戳 和 可選的元數(shù)據(jù)頭。 ▲ 下面是一個示例事件

    2024年01月20日
    瀏覽(46)
  • Kafka中的生產(chǎn)者如何處理消息發(fā)送失敗的情況?

    在Kafka中,生產(chǎn)者可以通過以下方式處理消息發(fā)送失敗的情況: 同步發(fā)送模式(Sync Mode):在同步發(fā)送模式下,生產(chǎn)者發(fā)送消息后會阻塞等待服務器的響應。如果發(fā)送失敗,生產(chǎn)者會拋出異常(例如 ProducerRecord 發(fā)送異常)或返回錯誤信息。開發(fā)者可以捕獲異常并根據(jù)需要進行

    2024年02月06日
    瀏覽(23)
  • kafka生產(chǎn)者發(fā)送消息報錯 Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

    kafka生產(chǎn)者發(fā)送消息報錯 Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

    報這個錯誤是因為kafka里的配置要修改下 在config目錄下 server.properties配置文件 這下發(fā)送消息就不會一直等待,就可以發(fā)送成功了

    2024年02月06日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包