3.1 消息傳遞模型
3.1.1 點對點模型
重要的特性:
-
- 消息通過 隊列來進行交換
-
- 每條消息僅會傳遞給一個消費者
-
- 消息傳遞有先后順序,消息被消費后從隊列刪除(除非使用了消息優(yōu)先級)
-
- 生產(chǎn)者或者消費者可以動態(tài)加入
傳送模型:
-
- 異步即發(fā)即棄:生產(chǎn)者發(fā)送一條消息,不會等待收到一個響應
-
- 異步請求、應答:生產(chǎn)者發(fā)送一條消息,阻塞等待應答隊列,應答隊列等待消費者響應
分類
-
- 單工通信:數(shù)據(jù)智能單向傳輸,有固定發(fā)送者和接收者。如:遙控器
-
- 半雙工通信:數(shù)據(jù)可以雙向傳輸,但不能同時。如:對講機
-
- 全雙工通信:數(shù)據(jù)可以同時雙向傳輸。如:電話
3.1.2 發(fā)布、訂閱模型
示例:
當我們在瀏覽視頻或者博客論壇之類的網(wǎng)站時,遇到感興趣的up主或者博主, 我們通常會選擇去訂閱他們的頻道或者內容。 這樣一來,每當他們發(fā)布一個新的內容, 網(wǎng)站平臺方就會通過某種渠道來通知我們, 我們便可以在第一時間了解到這一訊息, 至于是否選擇第一時間閱讀, 則卻決于我們自己,而這就是一個典型的訂閱發(fā)布模式。
當使用點對點傳輸模型,當我們向了解特定信息,需要定期去信息平臺去查看是否更新了新的內容,這樣就會產(chǎn)生一些不必要的時間開支
重要的特性:
- 消息通過一個稱為主題的虛擬通道進行交換。
- 每條消息都會傳送給稱為訂閱者的多個消息消費者。訂閱者有許多類型,包括持久性、非持久性和動態(tài)性。
- 發(fā)布者通常不會知道、也也意識不到哪一個訂閱者正在接收主題消息。
- 消息被推送給消費者,這意味著消息會傳送給消費者,而無需請求。消息通過一個稱為主題的虛擬通道進行交換。主題就是生產(chǎn)者發(fā)布消息和訂閱者消費消息的目的地。傳送給一個主題的消息被自動推送給所有合格的消費者。
- 生產(chǎn)者和消費者之間沒有耦合。訂閱者和發(fā)布者可以在運行時動態(tài)添加,這使得系統(tǒng)的復雜性可以隨時間的推移而增長。
- 訂閱一個主題的每個客戶端都會接收到發(fā)布該主題的消息副本。發(fā)布者生產(chǎn)的單條消息可以復制并分發(fā)給成百上千的訂閱者。
3.1.3 主題模型
主題模型是一種更加靈活和強大的消息模型,它可以根據(jù)消息的主題進行過濾和匹配。在主題模型中,消息發(fā)送者將消息發(fā)布到一個交換機,而消費者通過創(chuàng)建綁定到該交換機的隊列,并指定自己感興趣的主題(由路由鍵來表示)。交換機根據(jù)消息的路由鍵將消息發(fā)送到匹配的隊列中,只有訂閱了相應主題的消費者才會接收到消息。
主題模型的特點是可以根據(jù)消息的主題進行靈活的消息過濾和匹配。這種模型適用于處理復雜的消息路由需求,可以根據(jù)多個屬性、標簽或關鍵字對消息進行分類和投遞。例如,商品訂閱系統(tǒng)可以根據(jù)用戶的興趣分類推送商品信息。
3.1.4 總結
-
點對點模型:適用于一對一的消息傳遞,具有高可靠性。
-
發(fā)布/訂閱模型:適用于廣播消息給多個消費者,實現(xiàn)消息的廣播。
-
主題模型:適用于根據(jù)消息的主題進行靈活的過濾和匹配,處理復雜的消息路由需求。
3.2 kafka 術語
- topic: 發(fā)布訂閱的對象稱為主題
- producer:生產(chǎn)者,用于向一個或者多個主題發(fā)送消息
- consumer Group:消費者組,訂閱主題并接收主題的消息,可以同時訂閱多個主題
- consumer:每個消費者屬于一個特定的group
- broker: 物理概念,一個kafka集群通常由多個broker組成,broker用來接收和處理客戶端請求,并對消息進行持久化處理。通常多個broker分布于多個機器用以提高系統(tǒng)可用性。
- replication:備份機制
- replica: 副本,通常情況下被拷貝到不同的機架或者不同的機器
-Leader replica: 領導者副本,對外提供服務,直接與客戶端交互。生產(chǎn)者總是向領導者副本寫消息,消費者總是向領導者副本讀消息。 - Follower replica: 跟隨者副本,不對外提供服務,領導副本的追隨者。只向領導者副本發(fā)送消息同步請求,領導者副本將最新消息發(fā)送給追隨者副本之后,兩者保持同步。
- replica: 副本,通常情況下被拷貝到不同的機架或者不同的機器
- partition:分區(qū),一個主題可以劃分多個分區(qū)。生產(chǎn)者生產(chǎn)的每一條消息只會出現(xiàn)在一個分區(qū)中。每個分區(qū)配置多個副本,通常一個領導者副本,多個追隨者副本。
- offset: 又稱consumer offset 消費者位移,每個消費者都有自己的位移,保存在broker內部的topic中
3.3 kafka 系統(tǒng)架構
三層消息架構
- 主題層:每個主題設置M個分區(qū),每個分區(qū)配置N個副本
- 分區(qū)層:每個分區(qū)配置N個副本,只有一個領導者副本對外提供服務
- 消息層:分區(qū)中包含多條消息,每條消息從位移0開始依次遞增
客戶端和服務器端的通信,是基于簡單、高性能且與編程語言無關的TCP協(xié)議
3.4 kafka 生產(chǎn)者
kafka生產(chǎn)流程:
-
首先創(chuàng)建一個ProducerRecord對象,它需要包含目標主題和要發(fā)送的內容,還可以指定鍵或分區(qū)。在發(fā)送ProducerRecord對象時,生產(chǎn)者要先把鍵和值對象序列化成字節(jié)數(shù)組,這樣它們才能夠在網(wǎng)絡上傳輸。(生產(chǎn)者的消息先被放到緩存里,然后使用單獨的線程發(fā)送到服務器)
-
接下來,數(shù)據(jù)被傳給分區(qū)器,如果之前在ProducerRecord對象里指定了分區(qū),那么分區(qū)器就不會再做任何事情,直接把指定的分區(qū)返回。如果沒有指定分區(qū),那么分區(qū)器會根據(jù)ProducerRecord對象的鍵來選擇一個分區(qū)。選好分區(qū)以后,生產(chǎn)者就知道該往哪個主題和分區(qū)發(fā)送這條記錄了。緊接著,這條記錄被添加到一個記錄批次里,這個批次里的所有消息回被發(fā)送到相同的主題和分區(qū)上。有一個獨立的線程負責把這些記錄彼此發(fā)送到相應的broker上。
-
服務器在收到這些消息時會返回一個響應,如果消息成功寫入kafka,就返回一個RecordMetaData對象,它包含了主題和分區(qū)信息,以及記錄在分區(qū)里的偏移量。如果寫入失敗,則返回一個錯誤。生趁著在收到錯誤之后會嘗試重新發(fā)送消息,幾次之后如果還是返回失敗,那么就返回錯誤信息。
3.5 編寫生產(chǎn)者客戶端
3.5.1 引入pom
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
3.5.2 生產(chǎn)者代碼
public static void main(String[] args) {
Properties prop = new Properties();
prop.put("bootstrap.servers","127.0.0.1:9092");
prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("acks","all");
prop.put("retries",0);
prop.put("batch.size",16384);
prop.put("linger.ms",1);
prop.put("buffer.memory",33554432);
String topic ="test";
String sendValue = "發(fā)送消息";
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
producer.send(new ProducerRecord<String,String>(topic,Integer.toString(2),sendValue));
producer.close();
}
打開消費者客戶端監(jiān)聽:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
參數(shù)說明:
- acks
-
如果 acks=0,生產(chǎn)者在成功寫入消息之前不會等待任何來自服務器的響應。也就是說,如果當中出現(xiàn)了問題,導致服務器沒有收到消息,那么生產(chǎn)者就無從得知,消息也就丟失了。不過,因為生產(chǎn)者不需要等待服務器的響應,所以它可以以網(wǎng)絡能夠支持的最大速度發(fā)送消息,從而達到很高的吞吐量。
-
如果 acks=1,只要集群的首領節(jié)點收到消息,生產(chǎn)者就會收到一個來自服務器的成功響應。如果消息無法到達首領節(jié)點(比如首領節(jié)點崩潰,新的首領還沒有被選舉出來),生產(chǎn)者會收到一個錯誤響應,為了避免數(shù)據(jù)丟失,生產(chǎn)者會重發(fā)消息。不過,如果一個沒有收到消息的節(jié)點成為新首領,消息還是會丟失。這個時候的吞吐量取決于使用的是同步發(fā)送還是異步發(fā)送。如果讓發(fā)送客戶端等待服務器的響應(通過調用 Future 對象的 get() 方法),顯然會增加延遲(在網(wǎng)絡上傳輸一個來回的延遲)。如果客戶端使用回調,延遲問題就可以得到緩解,不過吞吐量還是會受發(fā)送中消息數(shù)量的限制(比如,生產(chǎn)者在收到服務器響應之前可以發(fā)送多少個消息)。
-
如果 acks=all,只有當所有參與復制的節(jié)點全部收到消息時,生產(chǎn)者才會收到一個來自服務器的成功響應。這種模式是最安全的,它可以保證不止一個服務器收到消息,就算有服務器發(fā)生崩潰,整個集群仍然可以運行(第 5 章將討論更多的細節(jié))。不過,它的延遲比 acks=1 時更高,因為我們要等待不只一個服務器節(jié)點接收消息。
-
buffer.memory
設置生產(chǎn)者緩存區(qū)大小,生產(chǎn)者用它緩沖要發(fā)送到服務器的消息。如果應用發(fā)送速度過快,會導致生產(chǎn)者空間不足,這時候send() 要么被阻塞、要么拋出異常,取決于max.block.ms,表示拋出異常之前可以阻塞多久。 -
compression.type
默認情況,消息不會被壓縮。該參數(shù)可以設置為 snappy、gzip 或 lz4,它指定了消息被發(fā)送給 broker 之前使用哪一種壓縮算法進行壓縮。snappy 壓縮算法由 Google 發(fā)明,它占用較少的 CPU,卻能提供較好的性能和相當可觀的壓縮比,如果比較關注性能和網(wǎng)絡帶寬,可以使用這種算法。gzip 壓縮算法一般會占用較多的 CPU,但會提供更高的壓縮比,所以如果網(wǎng)絡帶寬比較有限,可以使用這種算法。使用壓縮可以降低網(wǎng)絡傳輸開銷和存儲開銷,而這往往是向 Kafka 發(fā)送消息的瓶頸所在。 -
retries
生產(chǎn)者從服務器收到的錯誤有可能是臨時性的錯誤(比如分區(qū)找不到首領)。在這種情況下,retries 參數(shù)的值決定了生產(chǎn)者可以重發(fā)消息的次數(shù),如果達到這個次數(shù),生產(chǎn)者會放棄重試并返回錯誤。默認情況下,生產(chǎn)者會在每次重試之間等待 100ms,不過可以通過 retry.backoff.ms 參數(shù)來改變這個時間間隔。建議在設置重試次數(shù)和重試時間間隔之前,先測試一下恢復一個崩潰節(jié)點需要多少時間(比如所有分區(qū)選舉出首領需要多長時間),讓總的重試時間比 Kafka 集群從崩潰中恢復的時間長,否則生產(chǎn)者會過早地放棄重試。不過有些錯誤不是臨時性錯誤,沒辦法通過重試來解決(比如“消息太大”錯誤)。一般情況下,因為生產(chǎn)者會自動進行重試,所以就沒必要在代碼邏輯里處理那些可重試的錯誤。你只需要處理那些不可重試的錯誤或重試次數(shù)超出上限的情況。 -
batch.size
當有多個消息需要被發(fā)送到同一個分區(qū)時,生產(chǎn)者會把它們放在同一個批次里。該參數(shù)指定了一個批次可以使用的內存大小,按照字節(jié)數(shù)計算(而不是消息個數(shù))。當批次被填滿,批次里的所有消息會被發(fā)送出去。不過生產(chǎn)者并不一定都會等到批次被填滿才發(fā)送,半滿的批次,甚至只包含一個消息的批次也有可能被發(fā)送。所以就算把批次大小設置得很大,也不會造成延遲,只是會占用更多的內存而已。但如果設置得太小,因為生產(chǎn)者需要更頻繁地發(fā)送消息,會增加一些額外的開銷。 -
linger.ms
該參數(shù)指定了生產(chǎn)者在發(fā)送批次之前等待更多消息加入批次的時間。KafkaProducer 會在批次填滿或 linger.ms 達到上限時把批次發(fā)送出去。默認情況下,只要有可用的線程,生產(chǎn)者就會把消息發(fā)送出去,就算批次里只有一個消息。把 linger.ms 設置成比 0 大的數(shù),讓生產(chǎn)者在發(fā)送批次之前等待一會兒,使更多的消息加入到這個批次。雖然這樣會增加延遲,但也會提升吞吐量(因為一次性發(fā)送更多的消息,每個消息的開銷就變小了) -
client.id
該參數(shù)可以是任意的字符串,服務器會用它來識別消息的來源,還可以用在日志和配額指標里 -
max.in.flight.requests.per.connection
該參數(shù)指定了生產(chǎn)者在收到服務器響應之前可以發(fā)送多少個消息。它的值越高,就會占用越多的內存,不過也會提升吞吐量。把它設為 1 可以保證消息是按照發(fā)送的順序寫入服務器的,即使發(fā)生了重試。 -
imeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms
request.timeout.ms 指定了生產(chǎn)者在發(fā)送數(shù)據(jù)時等待服務器返回響應的時間,metadata.fetch.timeout.ms 指定了生產(chǎn)者在獲取元數(shù)據(jù)(比如目標分區(qū)的首領是誰)時等待服務器返回響應的時間。如果等待響應超時,那么生產(chǎn)者要么重試發(fā)送數(shù)據(jù),要么返回一個錯誤(拋出異常或執(zhí)行回調)。timeout.ms 指定了 broker 等待同步副本返回消息確認的時間,與 asks 的配置相匹配——如果在指定時間內沒有收到同步副本的確認,那么 broker 就會返回一個錯誤。 -
max.block.ms
該參數(shù)指定了在調用 send() 方法或使用 partitionsFor() 方法獲取元數(shù)據(jù)時生產(chǎn)者的阻塞時間。當生產(chǎn)者的發(fā)送緩沖區(qū)已滿,或者沒有可用的元數(shù)據(jù)時,這些方法就會阻塞。在阻塞時間達到 max.block.ms 時,生產(chǎn)者會拋出超時異常。 -
max.request.size
該參數(shù)用于控制生產(chǎn)者發(fā)送的請求大小。它可以指能發(fā)送的單個消息的最大值,也可以指單個請求里所有消息總的大小。例如,假設這個值為 1MB,那么可以發(fā)送的單個最大消息為 1MB,或者生產(chǎn)者可以在單個請求里發(fā)送一個批次,該批次包含了 1000 個消息,每個消息大小為 1KB。另外,broker 對可接收的消息最大值也有自己的限制(message.max.bytes),所以兩邊的配置最好可以匹配,避免生產(chǎn)者發(fā)送的消息被 broker 拒絕。 -
receive.buffer.bytes 和 send.buffer.bytes
這兩個參數(shù)分別指定了 TCP socket 接收和發(fā)送數(shù)據(jù)包的緩沖區(qū)大小。如果它們被設為 -1,就使用操作系統(tǒng)的默認值。如果生產(chǎn)者或消費者與 broker 處于不同的數(shù)據(jù)中心,那么可以適當增大這些值,因為跨數(shù)據(jù)中心的網(wǎng)絡一般都有比較高的延遲和比較低的帶寬。
3.5.3 消費者代碼
public static void main(String[] args) {
try {
Properties prop = new Properties();
prop.put("bootstrap.servers","127.0.0.1:9092");
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.put("group.id","con-1"); // 消費者群組
prop.put("auto.offset.reset","latest");
//自動提交偏移量
prop.put("auto.commit.intervals.ms","true");
//自動提交時間
prop.put("auto.commit.interval.ms","1000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
ArrayList<String> topics = new ArrayList<>();
//可以訂閱多個消息
topics.add("test");
consumer.subscribe(topics);
while(true){
ConsumerRecords<String,String> poll = consumer.poll(Duration.ofSeconds(20));
for(ConsumerRecord<String,String> consumerRecord :poll){
System.out.println(consumerRecord);
}
}
}catch (Exception e) {
System.out.println("error:" + e.getMessage());
}
}
打開生產(chǎn)者客戶端發(fā)送消息:
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
參數(shù)說明:
-
fetch.min.bytes
該屬性指定了消費者從服務器獲取記錄的最小字節(jié)數(shù)。broker 在收到消費者的數(shù)據(jù)請求時,如果可用的數(shù)據(jù)量小于 fetch.min.bytes 指定的大小,那么它會等到有足夠的可用數(shù)據(jù)時才把它返回給消費者。這樣可以降低消費者和 broker 的工作負載,因為它們在主題不是很活躍的時候(或者一天里的低谷時段)就不需要來來回回地處理消息。如果沒有很多可用數(shù)據(jù),但消費者的 CPU 使用率卻很高,那么就需要把該屬性的值設得比默認值大。如果消費者的數(shù)量比較多,把該屬性的值設置得大一點可以降低 broker 的工作負載。 -
fetch.max.wait.ms
我們通過 fetch.min.bytes 告訴 Kafka,等到有足夠的數(shù)據(jù)時才把它返回給消費者。而 feth.max.wait.ms 則用于指定 broker 的等待時間,默認是 500ms。如果沒有足夠的數(shù)據(jù)流入“Kafka,消費者獲取最小數(shù)據(jù)量的要求就得不到滿足,最終導致 500ms 的延遲。如果要降低潛在的延遲(為了滿足 SLA),可以把該參數(shù)值設置得小一些。如果 fetch.max.wait.ms 被設為 100ms,并且 fetch.min.bytes 被設為 1MB,那么 Kafka 在收到消費者的請求后,要么返回 1MB 數(shù)據(jù),要么在 100ms 后返回所有可用的數(shù)據(jù),就看哪個條件先得到滿足。 -
max.partition.fetch.bytes
該屬性指定了服務器從每個分區(qū)里返回給消費者的最大字節(jié)數(shù)。它的默認值是 1MB,也就是說,KafkaConsumer.poll() 方法從每個分區(qū)里返回的記錄最多不超過 max.partition.fetch.bytes 指定的字節(jié)。如果一個主題有 20 個分區(qū)和 5 個消費者,那么每個消費者需要至少 4MB 的可用內存來接收記錄。在為消費者分配內存時,可以給它們多分配一些,因為如果群組里有消費者發(fā)生崩潰,剩下的消費者需要處理更多的分區(qū)。max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節(jié)數(shù)(通過 max.message.size 屬性配置)大,否則消費者可能無法讀取這些消息,導致消費者一直掛起重試。在設置該屬性時,另一個需要考慮的因素是消費者處理數(shù)據(jù)的時間。消費者需要頻繁調用 poll() 方法來避免會話過期和發(fā)生分區(qū)再均衡,如果單次調用 poll() 返回的數(shù)據(jù)太多,消費者需要更多的時間來處理,可能無法及時進行下一個輪詢來避免會話過期。如果出現(xiàn)這種情況,可以把 max.partition.fetch.bytes 值改小,或者延長會話過期時間。 -
session.timeout.ms 指定的時間內發(fā)送心跳給群組協(xié)調器,就被認為已經(jīng)死亡,協(xié)調器就會觸發(fā)再均衡,把它的分區(qū)分配給群組里的其他消費者。該屬性與 heartbeat.interval.ms 緊密相關。heartbeat.interval.ms 指定了 poll() 方法向協(xié)調器發(fā)送心跳的頻率,session.timeout.ms 則指定了消費者可以多久不發(fā)送心跳。所以,一般需要同時修改這兩個屬性,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 應該是 1s。把 session.timeout.ms 值設得比默認值小,可以更快地檢測和恢復崩潰的節(jié)點,不過長時間的輪詢或垃圾收集可能導致非預期的再均衡。把該屬性的值設置得大一些,可以減少意外的再均衡,不過檢測節(jié)點崩潰需要更長的時間。
-
auto.offset.reset
該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下(因消費者長時間失效,包含偏移量的記錄已經(jīng)過時并被刪除)該作何處理。它的默認值是 latest,意思是說,在偏移量無效的情況下,消費者將從最新的記錄開始讀取數(shù)據(jù)(在消費者啟動之后生成的記錄)。另一個值是 earliest,意思是說,在偏移量無效的情況下,消費者將從起始位置讀取分區(qū)的記錄。 -
enable.auto.commit
我們稍后將介紹幾種不同的提交偏移量的方式。該屬性指定了消費者是否自動提交偏移量,默認值是 true。為了盡量避免出現(xiàn)重復數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設為 false,由自己控制何時提交偏移量。如果把它設為 true,還可以通過配置 auto.commit.interval.ms 屬性來控制提交的頻率。 -
partition.assignment.strategy
我們知道,分區(qū)會被分配給群組里的消費者。PartitionAssignor 根據(jù)給定的消費者和主題,決定哪些分區(qū)應該被分配給哪個消費者。Kafka 有兩個默認的分配策略。
8.client.id
該屬性可以是任意字符串,broker 用它來標識從客戶端發(fā)送過來的消息,通常被用在日志、度量指標和配額里。 -
max.poll.records
該屬性用于控制單次調用 call() 方法能夠返回的記錄數(shù)量,可以幫你控制在輪詢里需要處理的數(shù)據(jù)量。文章來源:http://www.zghlxwxcb.cn/news/detail-829169.html -
receive.buffer.bytes 和 send.buffer.bytes
socket 在讀寫數(shù)據(jù)時用到的 TCP 緩沖區(qū)也可以設置大小。如果它們被設為 -1,就使用操作系統(tǒng)的默認值。如果生產(chǎn)者或消費者與 broker 處于不同的數(shù)據(jù)中心內,可以適當增大這些值,因為跨數(shù)據(jù)中心的網(wǎng)絡一般都有比較高的延遲和比較低的帶寬。文章來源地址http://www.zghlxwxcb.cn/news/detail-829169.html
到了這里,關于第3、4章 Kafka 生產(chǎn)者 和 消費者 ——向 Kafka 寫入數(shù)據(jù) 和讀取數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!