国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

第3、4章 Kafka 生產(chǎn)者 和 消費者 ——向 Kafka 寫入數(shù)據(jù) 和讀取數(shù)據(jù)

這篇具有很好參考價值的文章主要介紹了第3、4章 Kafka 生產(chǎn)者 和 消費者 ——向 Kafka 寫入數(shù)據(jù) 和讀取數(shù)據(jù)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

3.1 消息傳遞模型

3.1.1 點對點模型

重要的特性:

    1. 消息通過 隊列來進行交換
    1. 每條消息僅會傳遞給一個消費者
    1. 消息傳遞有先后順序,消息被消費后從隊列刪除(除非使用了消息優(yōu)先級)
    1. 生產(chǎn)者或者消費者可以動態(tài)加入

傳送模型:

    1. 異步即發(fā)即棄:生產(chǎn)者發(fā)送一條消息,不會等待收到一個響應
    1. 異步請求、應答:生產(chǎn)者發(fā)送一條消息,阻塞等待應答隊列,應答隊列等待消費者響應

分類

    1. 單工通信:數(shù)據(jù)智能單向傳輸,有固定發(fā)送者和接收者。如:遙控器
    1. 半雙工通信:數(shù)據(jù)可以雙向傳輸,但不能同時。如:對講機
    1. 全雙工通信:數(shù)據(jù)可以同時雙向傳輸。如:電話

3.1.2 發(fā)布、訂閱模型

示例:
當我們在瀏覽視頻或者博客論壇之類的網(wǎng)站時,遇到感興趣的up主或者博主, 我們通常會選擇去訂閱他們的頻道或者內容。 這樣一來,每當他們發(fā)布一個新的內容, 網(wǎng)站平臺方就會通過某種渠道來通知我們, 我們便可以在第一時間了解到這一訊息, 至于是否選擇第一時間閱讀, 則卻決于我們自己,而這就是一個典型的訂閱發(fā)布模式。

當使用點對點傳輸模型,當我們向了解特定信息,需要定期去信息平臺去查看是否更新了新的內容,這樣就會產(chǎn)生一些不必要的時間開支

重要的特性:

  1. 消息通過一個稱為主題的虛擬通道進行交換。
  2. 每條消息都會傳送給稱為訂閱者的多個消息消費者。訂閱者有許多類型,包括持久性、非持久性和動態(tài)性。
  3. 發(fā)布者通常不會知道、也也意識不到哪一個訂閱者正在接收主題消息。
  4. 消息被推送給消費者,這意味著消息會傳送給消費者,而無需請求。消息通過一個稱為主題的虛擬通道進行交換。主題就是生產(chǎn)者發(fā)布消息和訂閱者消費消息的目的地。傳送給一個主題的消息被自動推送給所有合格的消費者。
  5. 生產(chǎn)者和消費者之間沒有耦合。訂閱者和發(fā)布者可以在運行時動態(tài)添加,這使得系統(tǒng)的復雜性可以隨時間的推移而增長。
  6. 訂閱一個主題的每個客戶端都會接收到發(fā)布該主題的消息副本。發(fā)布者生產(chǎn)的單條消息可以復制并分發(fā)給成百上千的訂閱者。

3.1.3 主題模型

主題模型是一種更加靈活和強大的消息模型,它可以根據(jù)消息的主題進行過濾和匹配。在主題模型中,消息發(fā)送者將消息發(fā)布到一個交換機,而消費者通過創(chuàng)建綁定到該交換機的隊列,并指定自己感興趣的主題(由路由鍵來表示)。交換機根據(jù)消息的路由鍵將消息發(fā)送到匹配的隊列中,只有訂閱了相應主題的消費者才會接收到消息。

主題模型的特點是可以根據(jù)消息的主題進行靈活的消息過濾和匹配。這種模型適用于處理復雜的消息路由需求,可以根據(jù)多個屬性、標簽或關鍵字對消息進行分類和投遞。例如,商品訂閱系統(tǒng)可以根據(jù)用戶的興趣分類推送商品信息。

3.1.4 總結

  1. 點對點模型:適用于一對一的消息傳遞,具有高可靠性。

  2. 發(fā)布/訂閱模型:適用于廣播消息給多個消費者,實現(xiàn)消息的廣播。

  3. 主題模型:適用于根據(jù)消息的主題進行靈活的過濾和匹配,處理復雜的消息路由需求。

3.2 kafka 術語

第3、4章 Kafka 生產(chǎn)者 和 消費者 ——向 Kafka 寫入數(shù)據(jù) 和讀取數(shù)據(jù),Kafka,kafka,數(shù)據(jù)庫,分布式

  1. topic: 發(fā)布訂閱的對象稱為主題
  2. producer:生產(chǎn)者,用于向一個或者多個主題發(fā)送消息
  3. consumer Group:消費者組,訂閱主題并接收主題的消息,可以同時訂閱多個主題
    • consumer:每個消費者屬于一個特定的group
  4. broker: 物理概念,一個kafka集群通常由多個broker組成,broker用來接收和處理客戶端請求,并對消息進行持久化處理。通常多個broker分布于多個機器用以提高系統(tǒng)可用性。
  5. replication:備份機制
    • replica: 副本,通常情況下被拷貝到不同的機架或者不同的機器
      -Leader replica: 領導者副本,對外提供服務,直接與客戶端交互。生產(chǎn)者總是向領導者副本寫消息,消費者總是向領導者副本讀消息。
    • Follower replica: 跟隨者副本,不對外提供服務,領導副本的追隨者。只向領導者副本發(fā)送消息同步請求,領導者副本將最新消息發(fā)送給追隨者副本之后,兩者保持同步。
  6. partition:分區(qū),一個主題可以劃分多個分區(qū)。生產(chǎn)者生產(chǎn)的每一條消息只會出現(xiàn)在一個分區(qū)中。每個分區(qū)配置多個副本,通常一個領導者副本,多個追隨者副本。
  7. offset: 又稱consumer offset 消費者位移,每個消費者都有自己的位移,保存在broker內部的topic中

3.3 kafka 系統(tǒng)架構

第3、4章 Kafka 生產(chǎn)者 和 消費者 ——向 Kafka 寫入數(shù)據(jù) 和讀取數(shù)據(jù),Kafka,kafka,數(shù)據(jù)庫,分布式
三層消息架構

  • 主題層:每個主題設置M個分區(qū),每個分區(qū)配置N個副本
  • 分區(qū)層:每個分區(qū)配置N個副本,只有一個領導者副本對外提供服務
  • 消息層:分區(qū)中包含多條消息,每條消息從位移0開始依次遞增

客戶端和服務器端的通信,是基于簡單、高性能且與編程語言無關的TCP協(xié)議

3.4 kafka 生產(chǎn)者

第3、4章 Kafka 生產(chǎn)者 和 消費者 ——向 Kafka 寫入數(shù)據(jù) 和讀取數(shù)據(jù),Kafka,kafka,數(shù)據(jù)庫,分布式

kafka生產(chǎn)流程:

  1. 首先創(chuàng)建一個ProducerRecord對象,它需要包含目標主題和要發(fā)送的內容,還可以指定鍵或分區(qū)。在發(fā)送ProducerRecord對象時,生產(chǎn)者要先把鍵和值對象序列化成字節(jié)數(shù)組,這樣它們才能夠在網(wǎng)絡上傳輸。(生產(chǎn)者的消息先被放到緩存里,然后使用單獨的線程發(fā)送到服務器)

  2. 接下來,數(shù)據(jù)被傳給分區(qū)器,如果之前在ProducerRecord對象里指定了分區(qū),那么分區(qū)器就不會再做任何事情,直接把指定的分區(qū)返回。如果沒有指定分區(qū),那么分區(qū)器會根據(jù)ProducerRecord對象的鍵來選擇一個分區(qū)。選好分區(qū)以后,生產(chǎn)者就知道該往哪個主題和分區(qū)發(fā)送這條記錄了。緊接著,這條記錄被添加到一個記錄批次里,這個批次里的所有消息回被發(fā)送到相同的主題和分區(qū)上。有一個獨立的線程負責把這些記錄彼此發(fā)送到相應的broker上。

  3. 服務器在收到這些消息時會返回一個響應,如果消息成功寫入kafka,就返回一個RecordMetaData對象,它包含了主題和分區(qū)信息,以及記錄在分區(qū)里的偏移量。如果寫入失敗,則返回一個錯誤。生趁著在收到錯誤之后會嘗試重新發(fā)送消息,幾次之后如果還是返回失敗,那么就返回錯誤信息。

3.5 編寫生產(chǎn)者客戶端

3.5.1 引入pom

  <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>2.4.1</version>
   </dependency>

3.5.2 生產(chǎn)者代碼

public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers","127.0.0.1:9092");
        prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("acks","all");
        prop.put("retries",0);
        prop.put("batch.size",16384);
        prop.put("linger.ms",1);
        prop.put("buffer.memory",33554432);
        String topic ="test";
        String sendValue = "發(fā)送消息";
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        producer.send(new ProducerRecord<String,String>(topic,Integer.toString(2),sendValue));
        producer.close();
    }

打開消費者客戶端監(jiān)聽:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

參數(shù)說明:

  1. acks
  • 如果 acks=0,生產(chǎn)者在成功寫入消息之前不會等待任何來自服務器的響應。也就是說,如果當中出現(xiàn)了問題,導致服務器沒有收到消息,那么生產(chǎn)者就無從得知,消息也就丟失了。不過,因為生產(chǎn)者不需要等待服務器的響應,所以它可以以網(wǎng)絡能夠支持的最大速度發(fā)送消息,從而達到很高的吞吐量。

  • 如果 acks=1,只要集群的首領節(jié)點收到消息,生產(chǎn)者就會收到一個來自服務器的成功響應。如果消息無法到達首領節(jié)點(比如首領節(jié)點崩潰,新的首領還沒有被選舉出來),生產(chǎn)者會收到一個錯誤響應,為了避免數(shù)據(jù)丟失,生產(chǎn)者會重發(fā)消息。不過,如果一個沒有收到消息的節(jié)點成為新首領,消息還是會丟失。這個時候的吞吐量取決于使用的是同步發(fā)送還是異步發(fā)送。如果讓發(fā)送客戶端等待服務器的響應(通過調用 Future 對象的 get() 方法),顯然會增加延遲(在網(wǎng)絡上傳輸一個來回的延遲)。如果客戶端使用回調,延遲問題就可以得到緩解,不過吞吐量還是會受發(fā)送中消息數(shù)量的限制(比如,生產(chǎn)者在收到服務器響應之前可以發(fā)送多少個消息)。

  • 如果 acks=all,只有當所有參與復制的節(jié)點全部收到消息時,生產(chǎn)者才會收到一個來自服務器的成功響應。這種模式是最安全的,它可以保證不止一個服務器收到消息,就算有服務器發(fā)生崩潰,整個集群仍然可以運行(第 5 章將討論更多的細節(jié))。不過,它的延遲比 acks=1 時更高,因為我們要等待不只一個服務器節(jié)點接收消息。

  1. buffer.memory
    設置生產(chǎn)者緩存區(qū)大小,生產(chǎn)者用它緩沖要發(fā)送到服務器的消息。如果應用發(fā)送速度過快,會導致生產(chǎn)者空間不足,這時候send() 要么被阻塞、要么拋出異常,取決于max.block.ms,表示拋出異常之前可以阻塞多久。

  2. compression.type
    默認情況,消息不會被壓縮。該參數(shù)可以設置為 snappy、gzip 或 lz4,它指定了消息被發(fā)送給 broker 之前使用哪一種壓縮算法進行壓縮。snappy 壓縮算法由 Google 發(fā)明,它占用較少的 CPU,卻能提供較好的性能和相當可觀的壓縮比,如果比較關注性能和網(wǎng)絡帶寬,可以使用這種算法。gzip 壓縮算法一般會占用較多的 CPU,但會提供更高的壓縮比,所以如果網(wǎng)絡帶寬比較有限,可以使用這種算法。使用壓縮可以降低網(wǎng)絡傳輸開銷和存儲開銷,而這往往是向 Kafka 發(fā)送消息的瓶頸所在。

  3. retries
    生產(chǎn)者從服務器收到的錯誤有可能是臨時性的錯誤(比如分區(qū)找不到首領)。在這種情況下,retries 參數(shù)的值決定了生產(chǎn)者可以重發(fā)消息的次數(shù),如果達到這個次數(shù),生產(chǎn)者會放棄重試并返回錯誤。默認情況下,生產(chǎn)者會在每次重試之間等待 100ms,不過可以通過 retry.backoff.ms 參數(shù)來改變這個時間間隔。建議在設置重試次數(shù)和重試時間間隔之前,先測試一下恢復一個崩潰節(jié)點需要多少時間(比如所有分區(qū)選舉出首領需要多長時間),讓總的重試時間比 Kafka 集群從崩潰中恢復的時間長,否則生產(chǎn)者會過早地放棄重試。不過有些錯誤不是臨時性錯誤,沒辦法通過重試來解決(比如“消息太大”錯誤)。一般情況下,因為生產(chǎn)者會自動進行重試,所以就沒必要在代碼邏輯里處理那些可重試的錯誤。你只需要處理那些不可重試的錯誤或重試次數(shù)超出上限的情況。

  4. batch.size
    當有多個消息需要被發(fā)送到同一個分區(qū)時,生產(chǎn)者會把它們放在同一個批次里。該參數(shù)指定了一個批次可以使用的內存大小,按照字節(jié)數(shù)計算(而不是消息個數(shù))。當批次被填滿,批次里的所有消息會被發(fā)送出去。不過生產(chǎn)者并不一定都會等到批次被填滿才發(fā)送,半滿的批次,甚至只包含一個消息的批次也有可能被發(fā)送。所以就算把批次大小設置得很大,也不會造成延遲,只是會占用更多的內存而已。但如果設置得太小,因為生產(chǎn)者需要更頻繁地發(fā)送消息,會增加一些額外的開銷。

  5. linger.ms
    該參數(shù)指定了生產(chǎn)者在發(fā)送批次之前等待更多消息加入批次的時間。KafkaProducer 會在批次填滿或 linger.ms 達到上限時把批次發(fā)送出去。默認情況下,只要有可用的線程,生產(chǎn)者就會把消息發(fā)送出去,就算批次里只有一個消息。把 linger.ms 設置成比 0 大的數(shù),讓生產(chǎn)者在發(fā)送批次之前等待一會兒,使更多的消息加入到這個批次。雖然這樣會增加延遲,但也會提升吞吐量(因為一次性發(fā)送更多的消息,每個消息的開銷就變小了)

  6. client.id
    該參數(shù)可以是任意的字符串,服務器會用它來識別消息的來源,還可以用在日志和配額指標里

  7. max.in.flight.requests.per.connection
    該參數(shù)指定了生產(chǎn)者在收到服務器響應之前可以發(fā)送多少個消息。它的值越高,就會占用越多的內存,不過也會提升吞吐量。把它設為 1 可以保證消息是按照發(fā)送的順序寫入服務器的,即使發(fā)生了重試。

  8. imeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms
    request.timeout.ms 指定了生產(chǎn)者在發(fā)送數(shù)據(jù)時等待服務器返回響應的時間,metadata.fetch.timeout.ms 指定了生產(chǎn)者在獲取元數(shù)據(jù)(比如目標分區(qū)的首領是誰)時等待服務器返回響應的時間。如果等待響應超時,那么生產(chǎn)者要么重試發(fā)送數(shù)據(jù),要么返回一個錯誤(拋出異常或執(zhí)行回調)。timeout.ms 指定了 broker 等待同步副本返回消息確認的時間,與 asks 的配置相匹配——如果在指定時間內沒有收到同步副本的確認,那么 broker 就會返回一個錯誤。

  9. max.block.ms
    該參數(shù)指定了在調用 send() 方法或使用 partitionsFor() 方法獲取元數(shù)據(jù)時生產(chǎn)者的阻塞時間。當生產(chǎn)者的發(fā)送緩沖區(qū)已滿,或者沒有可用的元數(shù)據(jù)時,這些方法就會阻塞。在阻塞時間達到 max.block.ms 時,生產(chǎn)者會拋出超時異常。

  10. max.request.size
    該參數(shù)用于控制生產(chǎn)者發(fā)送的請求大小。它可以指能發(fā)送的單個消息的最大值,也可以指單個請求里所有消息總的大小。例如,假設這個值為 1MB,那么可以發(fā)送的單個最大消息為 1MB,或者生產(chǎn)者可以在單個請求里發(fā)送一個批次,該批次包含了 1000 個消息,每個消息大小為 1KB。另外,broker 對可接收的消息最大值也有自己的限制(message.max.bytes),所以兩邊的配置最好可以匹配,避免生產(chǎn)者發(fā)送的消息被 broker 拒絕。

  11. receive.buffer.bytes 和 send.buffer.bytes
    這兩個參數(shù)分別指定了 TCP socket 接收和發(fā)送數(shù)據(jù)包的緩沖區(qū)大小。如果它們被設為 -1,就使用操作系統(tǒng)的默認值。如果生產(chǎn)者或消費者與 broker 處于不同的數(shù)據(jù)中心,那么可以適當增大這些值,因為跨數(shù)據(jù)中心的網(wǎng)絡一般都有比較高的延遲和比較低的帶寬。

3.5.3 消費者代碼

public static void main(String[] args) {
        try {
            Properties prop = new Properties();
            prop.put("bootstrap.servers","127.0.0.1:9092");
            prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            prop.put("group.id","con-1"); // 消費者群組
            prop.put("auto.offset.reset","latest");
            //自動提交偏移量
            prop.put("auto.commit.intervals.ms","true");
            //自動提交時間
            prop.put("auto.commit.interval.ms","1000");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
            ArrayList<String> topics = new ArrayList<>();
            //可以訂閱多個消息
            topics.add("test");
            consumer.subscribe(topics);
            while(true){
                ConsumerRecords<String,String> poll = consumer.poll(Duration.ofSeconds(20));
                for(ConsumerRecord<String,String> consumerRecord :poll){
                    System.out.println(consumerRecord);
                }
            }
        }catch (Exception e) {
            System.out.println("error:" + e.getMessage());
        }
    }

打開生產(chǎn)者客戶端發(fā)送消息:

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

參數(shù)說明:

  1. fetch.min.bytes
    該屬性指定了消費者從服務器獲取記錄的最小字節(jié)數(shù)。broker 在收到消費者的數(shù)據(jù)請求時,如果可用的數(shù)據(jù)量小于 fetch.min.bytes 指定的大小,那么它會等到有足夠的可用數(shù)據(jù)時才把它返回給消費者。這樣可以降低消費者和 broker 的工作負載,因為它們在主題不是很活躍的時候(或者一天里的低谷時段)就不需要來來回回地處理消息。如果沒有很多可用數(shù)據(jù),但消費者的 CPU 使用率卻很高,那么就需要把該屬性的值設得比默認值大。如果消費者的數(shù)量比較多,把該屬性的值設置得大一點可以降低 broker 的工作負載。

  2. fetch.max.wait.ms
    我們通過 fetch.min.bytes 告訴 Kafka,等到有足夠的數(shù)據(jù)時才把它返回給消費者。而 feth.max.wait.ms 則用于指定 broker 的等待時間,默認是 500ms。如果沒有足夠的數(shù)據(jù)流入“Kafka,消費者獲取最小數(shù)據(jù)量的要求就得不到滿足,最終導致 500ms 的延遲。如果要降低潛在的延遲(為了滿足 SLA),可以把該參數(shù)值設置得小一些。如果 fetch.max.wait.ms 被設為 100ms,并且 fetch.min.bytes 被設為 1MB,那么 Kafka 在收到消費者的請求后,要么返回 1MB 數(shù)據(jù),要么在 100ms 后返回所有可用的數(shù)據(jù),就看哪個條件先得到滿足。

  3. max.partition.fetch.bytes
    該屬性指定了服務器從每個分區(qū)里返回給消費者的最大字節(jié)數(shù)。它的默認值是 1MB,也就是說,KafkaConsumer.poll() 方法從每個分區(qū)里返回的記錄最多不超過 max.partition.fetch.bytes 指定的字節(jié)。如果一個主題有 20 個分區(qū)和 5 個消費者,那么每個消費者需要至少 4MB 的可用內存來接收記錄。在為消費者分配內存時,可以給它們多分配一些,因為如果群組里有消費者發(fā)生崩潰,剩下的消費者需要處理更多的分區(qū)。max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節(jié)數(shù)(通過 max.message.size 屬性配置)大,否則消費者可能無法讀取這些消息,導致消費者一直掛起重試。在設置該屬性時,另一個需要考慮的因素是消費者處理數(shù)據(jù)的時間。消費者需要頻繁調用 poll() 方法來避免會話過期和發(fā)生分區(qū)再均衡,如果單次調用 poll() 返回的數(shù)據(jù)太多,消費者需要更多的時間來處理,可能無法及時進行下一個輪詢來避免會話過期。如果出現(xiàn)這種情況,可以把 max.partition.fetch.bytes 值改小,或者延長會話過期時間。

  4. session.timeout.ms 指定的時間內發(fā)送心跳給群組協(xié)調器,就被認為已經(jīng)死亡,協(xié)調器就會觸發(fā)再均衡,把它的分區(qū)分配給群組里的其他消費者。該屬性與 heartbeat.interval.ms 緊密相關。heartbeat.interval.ms 指定了 poll() 方法向協(xié)調器發(fā)送心跳的頻率,session.timeout.ms 則指定了消費者可以多久不發(fā)送心跳。所以,一般需要同時修改這兩個屬性,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 應該是 1s。把 session.timeout.ms 值設得比默認值小,可以更快地檢測和恢復崩潰的節(jié)點,不過長時間的輪詢或垃圾收集可能導致非預期的再均衡。把該屬性的值設置得大一些,可以減少意外的再均衡,不過檢測節(jié)點崩潰需要更長的時間。

  5. auto.offset.reset
    該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下(因消費者長時間失效,包含偏移量的記錄已經(jīng)過時并被刪除)該作何處理。它的默認值是 latest,意思是說,在偏移量無效的情況下,消費者將從最新的記錄開始讀取數(shù)據(jù)(在消費者啟動之后生成的記錄)。另一個值是 earliest,意思是說,在偏移量無效的情況下,消費者將從起始位置讀取分區(qū)的記錄。

  6. enable.auto.commit
    我們稍后將介紹幾種不同的提交偏移量的方式。該屬性指定了消費者是否自動提交偏移量,默認值是 true。為了盡量避免出現(xiàn)重復數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設為 false,由自己控制何時提交偏移量。如果把它設為 true,還可以通過配置 auto.commit.interval.ms 屬性來控制提交的頻率。

  7. partition.assignment.strategy
    我們知道,分區(qū)會被分配給群組里的消費者。PartitionAssignor 根據(jù)給定的消費者和主題,決定哪些分區(qū)應該被分配給哪個消費者。Kafka 有兩個默認的分配策略。
    8.client.id
    該屬性可以是任意字符串,broker 用它來標識從客戶端發(fā)送過來的消息,通常被用在日志、度量指標和配額里。

  8. max.poll.records
    該屬性用于控制單次調用 call() 方法能夠返回的記錄數(shù)量,可以幫你控制在輪詢里需要處理的數(shù)據(jù)量。

  9. receive.buffer.bytes 和 send.buffer.bytes
    socket 在讀寫數(shù)據(jù)時用到的 TCP 緩沖區(qū)也可以設置大小。如果它們被設為 -1,就使用操作系統(tǒng)的默認值。如果生產(chǎn)者或消費者與 broker 處于不同的數(shù)據(jù)中心內,可以適當增大這些值,因為跨數(shù)據(jù)中心的網(wǎng)絡一般都有比較高的延遲和比較低的帶寬。文章來源地址http://www.zghlxwxcb.cn/news/detail-829169.html

到了這里,關于第3、4章 Kafka 生產(chǎn)者 和 消費者 ——向 Kafka 寫入數(shù)據(jù) 和讀取數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Java輕松使用Kafka生產(chǎn)者,消費者

    Java輕松使用Kafka生產(chǎn)者,消費者 一、環(huán)境說明 項目中需要下面的依賴: ( 版本自定義 ) 2. yml配置文件設置 1. 簡單生產(chǎn)者的書寫: 1. 簡單消費者的書寫: ? 注:多消費者時,需要對應kafka中配置的分區(qū);多少的Partition就有多少個消費者,以免資源浪費

    2024年02月15日
    瀏覽(29)
  • Kafka生產(chǎn)者與消費者api示例

    Kafka生產(chǎn)者與消費者api示例

    ? 一個正常的生產(chǎn)邏輯需要具備以下幾個步驟 配置生產(chǎn)者參數(shù)及創(chuàng)建相應的生產(chǎn)者實例 構建待發(fā)送的消息 發(fā)送消息 關閉生產(chǎn)者實例 采用默認分區(qū)方式將消息散列的發(fā)送到各個分區(qū)當中 ? ?對于properties配置的第二種寫法,相對來說不會出錯,簡單舉例: ? 1.kafka的生產(chǎn)者可

    2024年02月07日
    瀏覽(24)
  • 筆記:配置多個kafka生產(chǎn)者和消費者

    如果只有一個kafka,那么使用自帶的KafkaAutoConfiguration配置類即可,對應已有屬性類KafkaProperties,屬性前綴為spring.kafka.xxx; 本文記錄配置多個kafka的情況,即在KafkaAutoConfiguration的基礎上,自定義額外的kafka生產(chǎn)者和消費者。 適用場景:需要消費來源于不同kafka的消息、需要在不

    2024年02月15日
    瀏覽(32)
  • kafka生產(chǎn)者和消費者(python版)

    生產(chǎn)者 消費者 消費者中的組名主要用戶針對主題的偏移量進行更改,也涉及到主題中分區(qū)的問題, kafka工具類 此工具類基本上拿過去就可以用 疑問 當消費者鏈接kafka時發(fā)現(xiàn)topic沒有未讀的消息怎樣退出呢,默認是在一直等待,但是我期望沒有要讀的消息的時候直接退出即可

    2024年02月16日
    瀏覽(21)
  • Kafka官方生產(chǎn)者和消費者腳本簡單使用

    怎樣使用Kafka官方生產(chǎn)者和消費者腳本進行消費生產(chǎn)和消費?這里假設已經(jīng)下載了kafka官方文件,并已經(jīng)解壓. 這就可以見到測試kafka對應topic了.

    2024年02月04日
    瀏覽(23)
  • Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費者

    Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費者

    1.創(chuàng)建主題 2.查看所有主題 3.查看詳細主題 序號從0開始計算 Partition:分區(qū)數(shù),該主題有3個分區(qū) Replica:副本數(shù),該主題有3個副本 Leader:副本數(shù)中的主的序號,生產(chǎn)消費的對象 1.修改分區(qū)數(shù) 修改的分區(qū)數(shù)量不可以小于或者等于當前主題分區(qū)的數(shù)量,否則會報錯 在根目錄kaf

    2024年02月11日
    瀏覽(32)
  • Kafka系列:查看Topic列表、消息消費情況、模擬生產(chǎn)者消費者

    Kafka系列:查看Topic列表、消息消費情況、模擬生產(chǎn)者消費者

    執(zhí)行topic刪除命令時,出現(xiàn)提示 這條命令其實并不執(zhí)行刪除動作,僅僅是在zookeeper上標記該topic要被刪除而已,同時也提醒用戶一定要提前打開delete.topic.enable開關,否則刪除動作是不會執(zhí)行的。 解決辦法: a)在server.properties中設置delete.topic.enable參數(shù)為ture b)如下操作: 1.登

    2023年04月26日
    瀏覽(29)
  • 探究:kafka生產(chǎn)者/消費者與多線程安全

    探究:kafka生產(chǎn)者/消費者與多線程安全

    目錄 1. 多線程安全 1.1. 生產(chǎn)者是多線程安全的么? 1.1. 消費者是多線程安全的么? 2. 消費者規(guī)避多線程安全方案 2.1. 每個線程維護一個kafkaConsumer 2.2. [單/多]kafkaConsumer實例 + 多worker線程 2.3.方案優(yōu)缺點對比 ????????Kafka生產(chǎn)者是 線程安全 的,可以在多個線程中共享一個

    2023年04月26日
    瀏覽(24)
  • Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費者

    Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費者

    1.創(chuàng)建安裝目錄/usr/local/kafka mkdir /usr/local/kafka 2.進入安裝包目錄 cd?/usr/local/kafka? 3.下載安裝包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解壓安裝包 tar -zxvf kafka_2.12-3.3.1.tgz 5.進入cd kafka_2.12-3.3.1目錄 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    瀏覽(29)
  • Kafka 架構深度解析:生產(chǎn)者(Producer)和消費者(Consumer)

    Kafka 架構深度解析:生產(chǎn)者(Producer)和消費者(Consumer)

    Apache Kafka 作為分布式流處理平臺,其架構中的生產(chǎn)者和消費者是核心組件,負責實現(xiàn)高效的消息生產(chǎn)和消費。本文將深入剖析 Kafka 架構中生產(chǎn)者和消費者的工作原理、核心概念以及高級功能。 1 發(fā)送消息到 Kafka Kafka 生產(chǎn)者負責將消息發(fā)布到指定的主題。以下是一個簡單的生

    2024年02月03日
    瀏覽(50)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包