国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

消息隊(duì)列-kafka-消息發(fā)送流程(源碼跟蹤) 與消息可靠性

這篇具有很好參考價(jià)值的文章主要介紹了消息隊(duì)列-kafka-消息發(fā)送流程(源碼跟蹤) 與消息可靠性。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

官方網(wǎng)址

源碼:https://kafka.apache.org/downloads
快速開始:https://kafka.apache.org/documentation/#gettingStarted
springcloud整合

發(fā)送消息流程

消息隊(duì)列-kafka-消息發(fā)送流程(源碼跟蹤) 與消息可靠性,消息隊(duì)列,kafka,消息隊(duì)列
主線程:主線程只負(fù)責(zé)組織消息,如果是同步發(fā)送會(huì)阻塞,如果是異步發(fā)送需要傳入一個(gè)回調(diào)函數(shù)。
Map集合:存儲(chǔ)了主線程的消息。
Sender線程:真正的發(fā)送其實(shí)是sender去發(fā)送到broker中。

源碼閱讀

1 首先打開Producer.send()可以看到里面的內(nèi)容

// 返回值是一個(gè) Future 參數(shù)為ProducerRecord
Future<RecordMetadata> send(ProducerRecord<K, V> record);
// ProducerRecord定義了這些信息
// 主題
private final String topic;
// 分區(qū)
private final Integer partition;
// header
private final Headers headers;
private final K key;
private final V value;
// 時(shí)間戳
private final Long timestamp;

2 發(fā)送之前的前置處理

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
     // intercept the record, which can be potentially modified; this method does not throw exceptions
     // 這里給開發(fā)者提供了前置處理的勾子
     ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
     // 我們最終發(fā)送的是經(jīng)過處理后的消息 并且如果是異步發(fā)送會(huì)有callback 這個(gè)是用戶定義的
     return doSend(interceptedRecord, callback);
 }

3 進(jìn)入真正的發(fā)送邏輯Future doSend()

  • 由于是網(wǎng)絡(luò)通信,所以我們要序列化,在這個(gè)函數(shù)里面就做了序列化的內(nèi)容。
try {
     serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
 } catch (ClassCastException cce) {
     throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
             " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
             " specified in key.serializer", cce);
 }
 byte[] serializedValue;
 try {
     serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
 } catch (ClassCastException cce) {
     throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
             " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
             " specified in value.serializer", cce);
 }
  • 然后我們獲取分區(qū)
// 然后這里又是一個(gè)策略者模式 也是由用戶可以配置的  DefaultPartitioner UniformStickyPartitioner RoundRobinPartitioner 提供了這樣三個(gè)分區(qū)器
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
   Integer partition = record.partition();
   return partition != null ?
           partition :
           partitioner.partition(
                   record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

4 到了我們的RecordAccumulator,也就是先由主線程發(fā)送到了RecordAccumulator

// 也就是對(duì)圖中的Map集合
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                 serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

我們發(fā)現(xiàn)里面是用一個(gè)MAP存儲(chǔ)的一個(gè)分區(qū)和ProducerBatch 是講這個(gè)消息寫到內(nèi)存里面MemoryRecordsBuilder 通過這個(gè)進(jìn)行寫入

// 可以看到是一個(gè)鏈表實(shí)現(xiàn)的雙向隊(duì)列,也就是消息會(huì)按append的順序?qū)懙?內(nèi)存記錄中去
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

5 接著我們看,我們append了以后,會(huì)有一個(gè)判斷去喚醒sender線程,見下面的注釋

// 如果說哦我們當(dāng)前的 這個(gè)batch滿了或者 我們創(chuàng)建了一個(gè)新的batch 這個(gè)時(shí)候喚醒 sender線程去發(fā)送數(shù)據(jù)
if (result.batchIsFull || result.newBatchCreated) {
      log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
      // 喚醒sender 去發(fā)送數(shù)據(jù)
      this.sender.wakeup();
  }
// 實(shí)現(xiàn)了Runnable 所以我們?nèi)タ匆幌翿UN方法的邏輯
public class Sender implements Runnable 

好上來就是一個(gè)循環(huán)

while (running) {
    try {
        runOnce();
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

接著進(jìn)入runOnece方法,直接看核心邏輯

// 從RecordAccumulator 拿數(shù)據(jù) 然后發(fā)送
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
      addToInflightBatches(batches);
// 中間省去了非核心邏輯
sendProduceRequests(batches, now);

如果繼續(xù)跟蹤的話最終是走到了selector.send()里面:

Send send = request.toSend(destination, header);
 InFlightRequest inFlightRequest = new InFlightRequest(
         clientRequest,
         header,
         isInternalRequest,
         request,
         send,
         now);
 this.inFlightRequests.add(inFlightRequest);
 selector.send(send);

6 接著我們就要看返回邏輯了,可以看到在sendRequest里面sendProduceRequest方法是通過傳入了一個(gè)回調(diào)函數(shù)處理返回的。

RequestCompletionHandler callback = new RequestCompletionHandler() {
          public void onComplete(ClientResponse response) {
              handleProduceResponse(response, recordsByPartition, time.milliseconds());
          }
      };
// 如果有返回
if (response.hasResponse()) {
          ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
          for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
              TopicPartition tp = entry.getKey();
              ProduceResponse.PartitionResponse partResp = entry.getValue();
              ProducerBatch batch = batches.get(tp);
              completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
          }
          this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
      } 

追蹤到ProducerBatch

if (this.finalState.compareAndSet(null, tryFinalState)) {
        completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
        return true;
    }
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
       // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
       produceFuture.set(baseOffset, logAppendTime, exception);

       // execute callbacks
       for (Thunk thunk : thunks) {
           try {
               if (exception == null) {
                   RecordMetadata metadata = thunk.future.value();
                   if (thunk.callback != null)
                       thunk.callback.onCompletion(metadata, null);
               } else {
                   if (thunk.callback != null)
                       thunk.callback.onCompletion(null, exception);
               }
           } catch (Exception e) {
               log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
           }
       }

       produceFuture.done();
   }

Thunk 這個(gè)其實(shí)就是我們?cè)贏ppend的時(shí)候的回調(diào):
消息隊(duì)列-kafka-消息發(fā)送流程(源碼跟蹤) 與消息可靠性,消息隊(duì)列,kafka,消息隊(duì)列
至此整個(gè)流程就完成了,從發(fā)送消息,到響應(yīng)后回調(diào)我們的函數(shù)。

消息可靠性

// 所有消費(fèi)者的配置都在ProducerConfig 里面
public static final String ACKS_CONFIG = "acks";

acks = 0:異步形式,單向發(fā)送,不會(huì)等待 broker 的響應(yīng)
acks = 1:主分區(qū)保存成功,然后就響應(yīng)了客戶端,并不保證所有的副本分區(qū)保存成功
acks = all 或 -1:等待 broker 的響應(yīng),然后 broker 等待副本分區(qū)的響應(yīng),總之?dāng)?shù)據(jù)落地到所有的分區(qū)后,才能給到producer 一個(gè)響應(yīng)

在可靠性的保證下,假設(shè)一些故障:

  • Broker 收到消息后,同步 ISR 異常:只要在 -1 的情況下,其實(shí)不會(huì)造成消息的丟失,因?yàn)橛兄卦嚈C(jī)制
  • Broker 收到消息,并同步 ISR 成功,但是響應(yīng)超時(shí):只要在 -1 的情況下,其實(shí)不會(huì)造成消息的丟失,因?yàn)橛兄卦嚈C(jī)制

可靠性能保證哪些,不能保障哪些?文章來源地址http://www.zghlxwxcb.cn/news/detail-838011.html

  • 保證了消息不會(huì)丟失
  • 不保證消息一定不會(huì)重復(fù)(消息有重復(fù)的概率,需要消費(fèi)者有冪等性控制機(jī)制)

到了這里,關(guān)于消息隊(duì)列-kafka-消息發(fā)送流程(源碼跟蹤) 與消息可靠性的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 四種策略確保 RabbitMQ 消息發(fā)送可靠性!你用哪種?

    四種策略確保 RabbitMQ 消息發(fā)送可靠性!你用哪種?

    如果能確認(rèn)這兩步,那么我們就可以認(rèn)為消息發(fā)送成功了。 如果這兩步中任一步驟出現(xiàn)問題,那么消息就沒有成功送達(dá),此時(shí)我們可能要通過重試等方式去重新發(fā)送消息,多次重試之后,如果消息還是不能到達(dá),則可能就需要人工介入了。 經(jīng)過上面的分析,我們可以確認(rèn),

    2024年04月12日
    瀏覽(21)
  • 防止消息丟失與消息重復(fù)——Kafka可靠性分析及優(yōu)化實(shí)踐

    防止消息丟失與消息重復(fù)——Kafka可靠性分析及優(yōu)化實(shí)踐

    上手第一關(guān),手把手教你安裝kafka與可視化工具kafka-eagle Kafka是什么,以及如何使用SpringBoot對(duì)接Kafka 架構(gòu)必備能力——kafka的選型對(duì)比及應(yīng)用場(chǎng)景 Kafka存取原理與實(shí)現(xiàn)分析,打破面試難關(guān) 在上一章內(nèi)容中,我們解析了Kafka在讀寫層面上的原理,介紹了很多Kafka在讀出與寫入時(shí)的

    2024年02月08日
    瀏覽(26)
  • 【學(xué)習(xí)日記2023.6.19】 之 RabbitMQ服務(wù)異步通信_(tái)消息可靠性_死信交換機(jī)_惰性隊(duì)列_MQ集群

    【學(xué)習(xí)日記2023.6.19】 之 RabbitMQ服務(wù)異步通信_(tái)消息可靠性_死信交換機(jī)_惰性隊(duì)列_MQ集群

    消息隊(duì)列在使用過程中,面臨著很多實(shí)際問題需要思考: 消息從發(fā)送,到消費(fèi)者接收,會(huì)經(jīng)歷多個(gè)過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時(shí)丟失: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 consumer接收

    2024年02月11日
    瀏覽(98)
  • Kafka消息隊(duì)列實(shí)現(xiàn)消息的發(fā)送和接收

    Kafka消息隊(duì)列實(shí)現(xiàn)消息的發(fā)送和接收

    消息在Kafka消息隊(duì)列中發(fā)送和接收過程如下圖所示: 消息生產(chǎn)者Producer產(chǎn)生消息數(shù)據(jù),發(fā)送到Kafka消息隊(duì)列中,一臺(tái)Kafka節(jié)點(diǎn)只有一個(gè)Broker,消息會(huì)存儲(chǔ)在Kafka的Topic(主題中),不同類型的消息數(shù)據(jù)會(huì)存儲(chǔ)在不同的Topic中,可以利用Topic實(shí)現(xiàn)消息的分類,消息消費(fèi)者Consumer會(huì)訂閱

    2024年02月11日
    瀏覽(21)
  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    不管是把Kafka作為消息隊(duì)列、消息總線還是數(shù)據(jù)存儲(chǔ)平臺(tái),總是需要一個(gè)可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個(gè)可以從Kafka讀取數(shù)據(jù)的消費(fèi)者,或者一個(gè)兼具兩種角色的應(yīng)用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應(yīng)用程序,它們負(fù)責(zé)將消息發(fā)送到 Kafka 集群中的一個(gè)或多

    2024年02月13日
    瀏覽(29)
  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    01. Kafka 分區(qū)的作用 分區(qū)的作用就是提供負(fù)載均衡的能力,或者說對(duì)數(shù)據(jù)進(jìn)行分區(qū)的主要原因,就是為了實(shí)現(xiàn)系統(tǒng)的高伸縮性。不同的分區(qū)能夠被放置到不同節(jié)點(diǎn)的機(jī)器上,而數(shù)據(jù)的讀寫操作也都是針對(duì)分區(qū)這個(gè)粒度而進(jìn)行的,這樣每個(gè)節(jié)點(diǎn)的機(jī)器都能獨(dú)立地執(zhí)行各自分區(qū)的

    2024年02月13日
    瀏覽(32)
  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    不管是把Kafka作為消息隊(duì)列、消息總線還是數(shù)據(jù)存儲(chǔ)平臺(tái),總是需要一個(gè)可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個(gè)可以從Kafka讀取數(shù)據(jù)的消費(fèi)者,或者一個(gè)兼具兩種角色的應(yīng)用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應(yīng)用程序,它們負(fù)責(zé)將消息發(fā)送到 Kafka 集群中的一個(gè)或多

    2024年02月13日
    瀏覽(28)
  • rabbitmq消息可靠性之消息回調(diào)機(jī)制

    rabbitmq消息可靠性之消息回調(diào)機(jī)制

    rabbitmq消息可靠性之消息回調(diào)機(jī)制 rabbitmq在消息的發(fā)送與接收中,會(huì)經(jīng)過上面的流程,這些流程中每一步都有可能導(dǎo)致消息丟失,或者消費(fèi)失敗甚至直接是服務(wù)器宕機(jī)等,這是我們服務(wù)接受不了的,為了保證消息的可靠性,rabbitmq提供了以下幾種機(jī)制 生產(chǎn)者確認(rèn)機(jī)制 消息持久

    2024年02月08日
    瀏覽(37)
  • RabbitMQ --- 消息可靠性

    RabbitMQ --- 消息可靠性

    消息隊(duì)列在使用過程中,面臨著很多實(shí)際問題需要思考: ?? ? 消息從發(fā)送,到消費(fèi)者接收,會(huì)經(jīng)理多個(gè)過程: 其中的每一步都可能導(dǎo)致消息丟失,常見的丟失原因包括: 發(fā)送時(shí)丟失: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 co

    2024年02月14日
    瀏覽(29)
  • Kafka消息發(fā)送流程

    Kafka消息發(fā)送流程

    我們通過創(chuàng)建 KafkaProducer 對(duì)象來發(fā)送消息,KafkaProducer有兩個(gè)線程 Producer主線程:把消息發(fā)送到內(nèi)存緩沖區(qū) Sender線程:把內(nèi)存緩沖區(qū)的消息發(fā)送到 broker Producer主線程 Producer 主線程的的流程如圖所示 拉取元數(shù)據(jù):每個(gè) topic 有多個(gè)分區(qū),需要知道對(duì)應(yīng)的broker地址 序列化器:將

    2023年04月17日
    瀏覽(16)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包