国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

分布式 - 消息隊列Kafka:Kafka生產(chǎn)者架構(gòu)和配置參數(shù)

這篇具有很好參考價值的文章主要介紹了分布式 - 消息隊列Kafka:Kafka生產(chǎn)者架構(gòu)和配置參數(shù)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

1. kafka 生產(chǎn)者發(fā)送消息整體架構(gòu)

生產(chǎn)者發(fā)送消息流程參考圖1:

分布式 - 消息隊列Kafka:Kafka生產(chǎn)者架構(gòu)和配置參數(shù),【分布式-消息隊列Kafka】,kafka

先從創(chuàng)建一個ProducerRecord對象開始,其中需要包含目標主題和要發(fā)送的內(nèi)容。另外,還可以指定鍵、分區(qū)、時間戳或標頭。在發(fā)送ProducerRecord對象時,生產(chǎn)者需要先把鍵和值對象序列化成字節(jié)數(shù)組,這樣才能在網(wǎng)絡上傳輸。

接下來,如果沒有顯式地指定分區(qū),那么數(shù)據(jù)將被傳給分區(qū)器。分區(qū)器通常會基于ProducerRecord對象的鍵選擇一個分區(qū)。選好分區(qū)以后,生產(chǎn)者就知道該往哪個主題和分區(qū)發(fā)送這條消息了。緊接著,該消息會被添加到一個消息批次里,這個批次里的所有消息都將被發(fā)送給同一個主題和分區(qū)。有一個獨立的線程負責把這些消息批次發(fā)送給目標broker。

broker在收到這些消息時會返回一個響應。如果消息寫入成功,就返回一個RecordMetaData對象,其中包含了主題和分區(qū)信息,以及消息在分區(qū)中的偏移量。如果消息寫入失敗,則會返回一個錯誤。生產(chǎn)者在收到錯誤之后會嘗試重新發(fā)送消息,重試幾次之后如果還是失敗,則會放棄重試,并返回錯誤信息。

生產(chǎn)者發(fā)送消息流程參考圖2:

分布式 - 消息隊列Kafka:Kafka生產(chǎn)者架構(gòu)和配置參數(shù),【分布式-消息隊列Kafka】,kafka

① 主線程和Sender 線程:

整個生產(chǎn)者客戶端由兩個線程協(xié)調(diào)運行,這兩個線程分別為主線程和Sender線程(發(fā)送線程)。在主線程中由KafkaProducer創(chuàng)建消息,然后通過可能的攔截器、序列化器和分區(qū)器的作用之后緩存到消息累加器(RecordAccumulator,也稱為消息收集器)中。Sender 線程負責從RecordAccumulator中獲取消息并將其發(fā)送到Kafka中。

② 消息累加器 RecordAccumulator :

RecordAccumulator 主要用來緩存消息以便 Sender 線程可以批量發(fā)送,進而減少網(wǎng)絡傳輸?shù)馁Y源消耗以提升性能。RecordAccumulator 緩存的大小可以通過生產(chǎn)者客戶端參數(shù)buffer.memory 配置,默認值為 33554432B,即 32MB。如果生產(chǎn)者發(fā)送消息的速度超過發(fā)送到服務器的速度,則會導致生產(chǎn)者空間不足,這個時候KafkaProducer的send()方法調(diào)用要么被阻塞,要么拋出異常,這個取決于參數(shù)max.block.ms的配置,此參數(shù)的默認值為60000,即60秒。

③ 雙端隊列 Deque<ProducerBatch>:

主線程中發(fā)送過來的消息都會被追加到RecordAccumulator的某個雙端隊列(Deque)中,在 RecordAccumulator 的內(nèi)部為每個分區(qū)都維護了一個雙端隊列,隊列中的內(nèi)容就是ProducerBatch,即 Deque<ProducerBatch>。消息寫入緩存時,追加到雙端隊列的尾部;Sender讀取消息時,從雙端隊列的頭部讀取。

④ 消息批次 ProducerBatch:

注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多個 ProducerRecord。通俗地說,ProducerRecord 是生產(chǎn)者中創(chuàng)建的消息,而ProducerBatch是指一個消息批次,ProducerRecord會被包含在ProducerBatch中,這樣可以使字節(jié)的使用更加緊湊。與此同時,將較小的ProducerRecord拼湊成一個較大的ProducerBatch,也可以減少網(wǎng)絡請求的次數(shù)以提升整體的吞吐量。如果生產(chǎn)者客戶端需要向很多分區(qū)發(fā)送消息,則可以將buffer.memory參數(shù)適當調(diào)大以增加整體的吞吐量。

⑤ BufferPool 和 ByteBuffer:

消息在網(wǎng)絡上都是以字節(jié)(Byte)的形式傳輸?shù)模诎l(fā)送之前需要創(chuàng)建一塊內(nèi)存區(qū)域來保存對應的消息。在Kafka生產(chǎn)者客戶端中,通過java.io.ByteBuffer實現(xiàn)消息內(nèi)存的創(chuàng)建和釋放。不過頻繁的創(chuàng)建和釋放是比較耗費資源的,在RecordAccumulator的內(nèi)部還有一個BufferPool,它主要用來實現(xiàn)ByteBuffer的復用,以實現(xiàn)緩存的高效利用。不過BufferPool只針對特定大小的ByteBuffer進行管理,而其他大小的ByteBuffer不會緩存進BufferPool中,這個特定的大小由batch.size參數(shù)來指定,默認值為16384B,即16KB。我們可以適當?shù)卣{(diào)大batch.size參數(shù)以便多緩存一些消息。

為什么使用 BufferPool 呢?

在Kafka中,RecordAccumulator是用來將生產(chǎn)者發(fā)送的消息緩存起來,以便批量發(fā)送到Kafka集群。在RecordAccumulator的內(nèi)部,還有一個BufferPool,它主要用來實現(xiàn)ByteBuffer的復用,以實現(xiàn)緩存的高效利用。

BufferPool是一個ByteBuffer的池子,它維護了一組ByteBuffer,這些ByteBuffer可以被多個線程共享。當一個線程需要一個ByteBuffer時,它可以從BufferPool中獲取一個可用的ByteBuffer,使用完后再將它歸還給BufferPool。這樣可以避免頻繁地創(chuàng)建和銷毀ByteBuffer,提高了內(nèi)存的利用率和性能。

⑥ ProducerBatch的大小和batch.size參數(shù)的關系:

ProducerBatch的大小和batch.size參數(shù)也有著密切的關系。當一條消息(ProducerRecord)流入RecordAccumulator時,會先尋找與消息分區(qū)所對應的雙端隊列(如果沒有則新建),再從這個雙端隊列的尾部獲取一個ProducerBatch(如果沒有則新建),查看 ProducerBatch 中是否還可以寫入這個 ProducerRecord,如果可以則寫入,如果不可以則需要創(chuàng)建一個新的ProducerBatch。在新建ProducerBatch時評估這條消息的大小是否超過batch.size參數(shù)的大小,如果不超過,那么就以 batch.size 參數(shù)的大小來創(chuàng)建ProducerBatch,這樣在使用完這段內(nèi)存區(qū)域之后,可以通過BufferPool 的管理來進行復用;如果超過,那么就以評估的大小來創(chuàng)建ProducerBatch,這段內(nèi)存區(qū)域不會被復用。

batch.size參數(shù)是Producer的一個配置參數(shù),用于控制ProducerBatch的大小。它指定了一批消息的最大大小,單位是字節(jié)。當Producer發(fā)送的消息數(shù)量達到batch.size或者等待時間超過linger.ms時,Producer會將這一批消息打包成一個ProducerBatch并發(fā)送到Kafka集群中的Broker。

因此,batch.size參數(shù)的大小會直接影響ProducerBatch的大小。如果batch.size設置得太小,會導致ProducerBatch的大小也很小,這樣會增加網(wǎng)絡傳輸?shù)拈_銷,降低消息發(fā)送的效率。如果batch.size設置得太大,會導致ProducerBatch的大小過大,可能會占用過多的內(nèi)存,甚至會導致消息發(fā)送超時或失敗。

⑦ 創(chuàng)建 ProduceRequest:

Sender 從 RecordAccumulator 中獲取緩存的消息之后,會進一步將原本<分區(qū),Deque<ProducerBatch>>的保存形式轉(zhuǎn)變成<Node,List<ProducerBatch>的形式,其中Node表示Kafka集群的broker節(jié)點。對于網(wǎng)絡連接來說,生產(chǎn)者客戶端是與具體的broker節(jié)點建立的連接,也就是向具體的broker 節(jié)點發(fā)送消息,而并不關心消息屬于哪一個分區(qū);而對于 KafkaProducer的應用邏輯而言,我們只關注向哪個分區(qū)中發(fā)送哪些消息,所以在這里需要做一個應用邏輯層面到網(wǎng)絡I/O層面的轉(zhuǎn)換。在轉(zhuǎn)換成<Node,List<ProducerBatch>>的形式之后,Sender 還會進一步封裝成<Node,Request>的形式,這樣就可以將Request請求發(fā)往各個Node了,這里的Request是指Kafka的各種協(xié)議請求,對于消息發(fā)送而言就是指具體的ProduceRequest。

⑧ 正在處理的請求 InFlightRequests:

請求在從Sender線程發(fā)往Kafka之前還會保存到InFlightRequests中。InFlightRequests保存對象的具體形式為 Map<NodeId,Deque<Request>>,它的主要作用是緩存了已經(jīng)發(fā)出去但還沒有收到響應的請求(NodeId 是一個 String 類型,表示節(jié)點的 id 編號)。與此同時,InFlightRequests還提供了許多管理類的方法,并且通過配置參數(shù)還可以限制每個連接(也就是客戶端與Node之間的連接)最多緩存的請求數(shù)。這個配置參數(shù)為max.in.flight.requests.per.connection,默認值為 5,即每個連接最多只能緩存 5個未響應的請求,超過該數(shù)值之后就不能再向這個連接發(fā)送更多的請求了,除非有緩存的請求收到了響應(Response)。通過比較Deque<Request>的size與這個參數(shù)的大小來判斷對應的Node中是否已經(jīng)堆積了很多未響應的消息,如果真是如此,那么說明這個 Node 節(jié)點負載較大或網(wǎng)絡連接有問題,再繼續(xù)向其發(fā)送請求會增大請求超時的可能。

2. Kafka 生產(chǎn)者重要參數(shù)配置

01. acks

acks 指定了生產(chǎn)者在多少個分區(qū)副本收到消息的情況下才會認為消息寫入成功。在默認情況下,Kafka會在leader副本收到消息后向客戶端回應消息寫入成功。acks 是生產(chǎn)者客戶端中一個非常重要的參數(shù),它涉及消息的可靠性和吞吐量之間的權衡。acks參數(shù)有3種類型的值(都是字符串類型)。

① acks=0: 生產(chǎn)者在成功寫入消息之前不會等待任何來自服務器的相應。如果在消息從發(fā)送到寫入Kafka的過程中出現(xiàn)某些異常,導致Kafka并沒有收到這條消息,那么生產(chǎn)者也無從得知,消息也就丟失了。不過因為生產(chǎn)者不需要等待服務器響應,所以它可以以網(wǎng)絡能夠支持的最大速度發(fā)送消息,從而達到很高的吞吐量(每秒鐘傳輸?shù)臄?shù)據(jù)量)。

② acks=1:默認值即為1。生產(chǎn)者發(fā)送消息之后,只要分區(qū)的leader副本成功寫入消息,那么它就會收到來自服務端的成功響應。如果消息無法寫入leader副本,比如在leader 副本崩潰、重新選舉新的 leader 副本的過程中,那么生產(chǎn)者就會收到一個錯誤的響應,為了避免消息丟失,生產(chǎn)者可以選擇重發(fā)消息。如果消息寫入leader副本并返回成功響應給生產(chǎn)者,且在被其他follower副本拉取之前l(fā)eader副本崩潰,那么此時消息還是會丟失,因為新選舉的leader副本中并沒有這條對應的消息。acks設置為1,是消息可靠性和吞吐量之間的折中方案。

③ acks=all:生產(chǎn)者在消息發(fā)送之后,需要等待ISR中的所有副本都成功寫入消息之后才能夠收到來自服務端的成功響應。在其他配置環(huán)境相同的情況下,acks 設置為all(或者-1)可以達到最強的可靠性。但這并不意味著消息就一定可靠,因為ISR中可能只有l(wèi)eader副本,這樣就退化成了acks=1的情況。

分區(qū)中的所有副本統(tǒng)稱為AR 。所有與leader副本保持一定程度同步的副本(包括leader副本在內(nèi))組成ISR,ISR集合是AR集合中的一個子集。消息會先發(fā)送到leader副本,然后follower副本才能從leader副本中拉取消息進行同步,同步期間內(nèi)follower副本相對于leader副本而言會有一定程度的滯后。前面所說的“一定程度的同步”是指可忍受的滯后范圍,這個范圍可以通過參數(shù)進行配置。與leader副本同步滯后過多的副本(不包括leader副本)組成OSR,由此可見,AR=ISR+OSR。在正常情況下,所有的 follower 副本都應該與 leader 副本保持一定程度的同步,即AR=ISR,OSR集合為空。

注意:acks參數(shù)配置的是一個字符串類型,而不是整數(shù)類型,如果配置為整數(shù)類型會拋出異常

// 設置 acks=1
properties.put(ProducerConfig.ACKS_CONFIG,"1");

你會發(fā)現(xiàn),為acks設置的值越小,生產(chǎn)者發(fā)送消息的速度就越快。也就是說,我們通過犧牲可靠性來換取較低的生產(chǎn)者延遲。過,端到端延遲是指從消息生成到可供消費者讀取的時間,這對3種配置來說都是一樣的。這是因為為了保持一致性,在消息被寫入所有同步副本之前,Kafka不允許消費者讀取它們。因此,如果你關心的是端到端延遲,而不是生產(chǎn)者延遲,那么就不需要在可靠性和低延遲之間做權衡了:你可以選擇最可靠的配置,但仍然可以獲得相同的端到端延遲。

02. 消息傳遞時間

有幾個參數(shù)可用來控制開發(fā)人員最感興趣的生產(chǎn)者行為:在調(diào)用send()方法后多長時間可以知道消息發(fā)送成功與否。這也是等待Kafka返回成功響應或放棄重試并承認發(fā)送失敗的時間。

從Kafka 2.1開始,我們將ProduceRecord的發(fā)送時間分成如下兩個時間間隔,它們是被分開處理的。

  • 異步調(diào)用send()所花費的時間。在此期間,調(diào)用send()的線程將被阻塞。
  • 從異步調(diào)用send()返回到觸發(fā)回調(diào)(不管是成功還是失敗)的時間,也就是從ProduceRecord被放到批次中直到Kafka成功響應、出現(xiàn)不可恢復異?;虬l(fā)送超時的時間。

如果同步調(diào)用send(),那么發(fā)送線程將持續(xù)阻塞,也就無法知道每個時間間隔是多長。

分布式 - 消息隊列Kafka:Kafka生產(chǎn)者架構(gòu)和配置參數(shù),【分布式-消息隊列Kafka】,kafka

① max.block.ms

這個參數(shù)用于控制在調(diào)用send()或通過partitionsFor()顯式地請求元數(shù)據(jù)時生產(chǎn)者可以發(fā)生阻塞的時間。當生產(chǎn)者的發(fā)送緩沖區(qū)被填滿或元數(shù)據(jù)不可用時,這些方法就可能發(fā)生阻塞。當達到max.block.ms配置的時間時,就會拋出一個超時異常。

RecordAccumulator 主要用來緩存消息以便 Sender 線程可以批量發(fā)送,進而減少網(wǎng)絡傳輸?shù)馁Y源消耗以提升性能。RecordAccumulator 緩存的大小可以通過生產(chǎn)者客戶端參數(shù)buffer.memory 配置,默認值為 33554432B,即 32MB。如果生產(chǎn)者發(fā)送消息的速度超過發(fā)送到服務器的速度,則會導致生產(chǎn)者空間不足,這個時候KafkaProducer的send()方法調(diào)用要么被阻塞,要么拋出異常,這個取決于參數(shù)max.block.ms的配置,此參數(shù)的默認值為60000,即60秒。

// 設置 max.block.ms 為 60s
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,60);

② delivery.timeout.ms

這個參數(shù)用于控制從消息準備好發(fā)送(send()方法成功返回并將消息放入批次中)到broker響應或客戶端放棄發(fā)送(包括重試)所花費的時間。這個時間應該大于linger.ms和request.timeout.ms。如果配置的時間不滿足這一點,則會拋出異常。通常,成功發(fā)送消息的速度要比delivery.timeout.ms快得多。

如果生產(chǎn)者在重試時超出了delivery.timeout.ms,那么將執(zhí)行回調(diào),并會將broker之前返回的錯誤傳給它。如果消息批次還沒有發(fā)送完畢就超出了delivery.timeout.ms,那么也將執(zhí)行回調(diào),并會將超時異常傳給它。

可以將這個參數(shù)配置成你愿意等待的最長時間,通常是幾分鐘,并使用默認的重試次數(shù)(幾乎無限制)。基于這樣的配置,只要生產(chǎn)者還有時間(或者在發(fā)送成功之前),它都會持續(xù)重試。這是一種合理的重試方式。我們的重試策略通常是:“在broker發(fā)生崩潰的情況下,首領選舉通常需要30秒才能完成,因此為了以防萬一,我們會持續(xù)重試120秒?!睘榱吮苊鉄┈嵉嘏渲弥卦嚧螖?shù)和重試時間間隔,只需將delivery.timeout.ms設置為120。

// 設置 delivery.timeout.ms 為 120s
properties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,120);

③ request.timeout.ms

這個參數(shù)用于控制生產(chǎn)者在發(fā)送消息時等待服務器響應的時間。需要注意的是,這是指生產(chǎn)者在放棄之前等待每個請求的時間,不包括重試、發(fā)送之前所花費的時間等。如果設置的值已觸及,但服務器沒有響應,那么生產(chǎn)者將重試發(fā)送,或者執(zhí)行回調(diào),并傳給它一個TimeoutException。

④ retries 和 retry.backoff.ms

當生產(chǎn)者收到來自服務器的錯誤消息時,這個錯誤有可能是暫時的(例如,一個分區(qū)沒有首領)。在這種情況下,retries參數(shù)可用于控制生產(chǎn)者在放棄發(fā)送并向客戶端宣告失敗之前可以重試多少次。在默認情況下,重試時間間隔是100毫秒,但可以通過retry.backoff.ms參數(shù)來控制重試時間間隔。

⑤ 總結(jié):

并不建議在當前版本的Kafka中使用這些參數(shù)。相反,你可以測試一下broker在發(fā)生崩潰之后需要多長時間恢復(也就是直到所有分區(qū)都有了首領副本),并設置合理的delivery.timeout.ms,讓重試時間大于Kafka集群從崩潰中恢復的時間,以免生產(chǎn)者過早放棄重試。

生產(chǎn)者并不會重試所有的錯誤。有些錯誤不是暫時的,生產(chǎn)者就不會進行重試(例如,“消息太大”錯誤)。通常,對于可重試的錯誤,生產(chǎn)者會自動進行重試,所以不需要在應用程序中處理重試邏輯。你要做的是集中精力處理不可重試的錯誤或者當重試次數(shù)達到上限時的情況。

03. linger.ms

這個參數(shù)指定了生產(chǎn)者在發(fā)送消息批次之前等待更多消息加入批次的時間。生產(chǎn)者會在批次被填滿或等待時間達到linger.ms時把消息批次發(fā)送出去。在默認情況下,只要有可用的發(fā)送者線程,生產(chǎn)者都會直接把批次發(fā)送出去,就算批次中只有一條消息。

把linger.ms設置成比0大的數(shù),可以讓生產(chǎn)者在將批次發(fā)送給服務器之前等待一會兒,以使更多的消息加入批次中。雖然這樣會增加一點兒延遲,但也極大地提升了吞吐量。這是因為一次性發(fā)送的消息越多,每條消息的開銷就越小,如果啟用了壓縮,則計算量也更少了。

04. buffer.memory

這個參數(shù)用來設置生產(chǎn)者要發(fā)送給服務器的消息的內(nèi)存緩沖區(qū)大小。如果應用程序調(diào)用send()方法的速度超過生產(chǎn)者將消息發(fā)送給服務器的速度,那么生產(chǎn)者的緩沖空間可能會被耗盡,后續(xù)的send()方法調(diào)用會等待內(nèi)存空間被釋放,如果在max.block.ms之后還沒有可用空間,就拋出異常。需要注意的是,這個異常與其他異常不一樣,它是send()方法而不是Future對象拋出來的。

05. batch.size

當有多條消息被發(fā)送給同一個分區(qū)時,生產(chǎn)者會把它們放在同一個批次里。這個參數(shù)指定了一個批次可以使用的內(nèi)存大小。需要注意的是,該參數(shù)是按照字節(jié)數(shù)而不是消息條數(shù)來計算的。當批次被填滿時,批次里所有的消息都將被發(fā)送出去。但是生產(chǎn)者并不一定都會等到批次被填滿時才將其發(fā)送出去。那些未填滿的批次,甚至只包含一條消息的批次也有可能被發(fā)送出去。所以,就算把批次大小設置得很大,也不會導致延遲,只是會占用更多的內(nèi)存而已。但如果把批次大小設置得太小,則會增加一些額外的開銷,因為生產(chǎn)者需要更頻繁地發(fā)送消息。

06. max.in.flight.requests.per.connection

這個參數(shù)指定了生產(chǎn)者在收到服務器響應之前可以發(fā)送多少個消息批次。它的值越大,占用的內(nèi)存就越多,不過吞吐量也會得到提升。Apache wiki頁面上的實驗數(shù)據(jù)表明,在單數(shù)據(jù)中心環(huán)境中,該參數(shù)被設置為2時可以獲得最佳的吞吐量,但使用默認值5也可以獲得差不多的性能。

07. compression.type

在默認情況下,生產(chǎn)者發(fā)送的消息是未經(jīng)壓縮的。這個參數(shù)可以被設置為snappy、gzip、lz4或zstd,這指定了消息被發(fā)送給broker之前使用哪一種壓縮算法。snappy壓縮算法由谷歌發(fā)明,雖然占用較少的CPU時間,但能提供較好的性能和相當可觀的壓縮比。如果同時有性能和網(wǎng)絡帶寬方面的考慮,那么可以使用這種算法。gzip壓縮算法通常會占用較多的CPU時間,但提供了更高的壓縮比。如果網(wǎng)絡帶寬比較有限,則可以使用這種算法。使用壓縮可以降低網(wǎng)絡傳輸和存儲開銷,而這些往往是向Kafka發(fā)送消息的瓶頸所在。

08. max.request.size

這個參數(shù)用于控制生產(chǎn)者發(fā)送的請求的大小。它限制了可發(fā)送的單條最大消息的大小和單個請求的消息總量的大小。假設這個參數(shù)的值為1 MB,那么可發(fā)送的單條最大消息就是1 MB,或者生產(chǎn)者最多可以在單個請求里發(fā)送一條包含1024個大小為1 KB的消息。另外,broker對可接收的最大消息也有限制(message.max.bytes),其兩邊的配置最好是匹配的,以免生產(chǎn)者發(fā)送的消息被broker拒絕。

09. receive.buffer.bytes和 send.buffer.bytes

這兩個參數(shù)分別指定了TCP socket接收和發(fā)送數(shù)據(jù)包的緩沖區(qū)大小。如果它們被設為–1,就使用操作系統(tǒng)默認值。如果生產(chǎn)者或消費者與broker位于不同的數(shù)據(jù)中心,則可以適當加大它們的值,因為跨數(shù)據(jù)中心網(wǎng)絡的延遲一般都比較高,而帶寬又比較低。

10. enable.idempotence

從0.11版本開始,Kafka支持精確一次性語義。

假設為了最大限度地提升可靠性,你將生產(chǎn)者的acks設置為all,并將delivery.timeout.ms設置為一個比較大的數(shù),允許進行盡可能多的重試。這些配置可以確保每條消息被寫入Kafka至少一次。但在某些情況下,消息有可能被寫入Kafka不止一次。假設一個broker收到了生產(chǎn)者發(fā)送的消息,然后消息被寫入本地磁盤并成功復制給了其他broker。此時,這個broker還沒有向生產(chǎn)者發(fā)送響應就發(fā)生了崩潰。而生產(chǎn)者將一直等待,直到達到request.timeout.ms,然后進行重試。重試發(fā)送的消息將被發(fā)送給新的首領,而這個首領已經(jīng)有這條消息的副本,因為之前寫入的消息已經(jīng)被成功復制給它了?,F(xiàn)在,你就有了一條重復的消息。

為了避免這種情況,可以將enable.idempotence設置為true。當冪等生產(chǎn)者被啟用時,生產(chǎn)者將給發(fā)送的每一條消息都加上一個序列號。如果broker收到具有相同序列號的消息,那么它就會拒絕第二個副本,而生產(chǎn)者則會收到DuplicateSequenceException,這個異常對生產(chǎn)者來說是無害的。

如果要啟用冪等性,那么max.in.flight.requests.per.connection應小于或等于5、retries應大于0,并且acks被設置為all。如果設置了不恰當?shù)闹?,則會拋出ConfigException異常。

3. kafka 生產(chǎn)者中何時發(fā)生QueueFullException?

kafka 生產(chǎn)者客戶端由兩個線程協(xié)調(diào)運行,這兩個線程分別為主線程和Sender線程(發(fā)送線程)。在主線程中由KafkaProducer創(chuàng)建消息,然后通過可能的攔截器、序列化器和分區(qū)器的作用之后緩存到消息累加器(RecordAccumulator,也稱為消息收集器)中。Sender 線程負責從RecordAccumulator中獲取消息并將其發(fā)送到Kafka中。

RecordAccumulator 主要用來緩存消息以便 Sender 線程可以批量發(fā)送,進而減少網(wǎng)絡傳輸?shù)馁Y源消耗以提升性能。RecordAccumulator 緩存的大小可以通過生產(chǎn)者客戶端參數(shù)buffer.memory 配置,默認值為 33554432B,即 32MB。如果生產(chǎn)者發(fā)送消息的速度超過發(fā)送到服務器的速度,則會導致生產(chǎn)者空間不足,這個時候KafkaProducer的send()方法調(diào)用要么被阻塞,要么拋出異常 QueueFullException。

Kafka生產(chǎn)者在發(fā)送消息時,如果發(fā)送的消息數(shù)量超過了生產(chǎn)者內(nèi)部緩沖區(qū)的大小,就會發(fā)生QueueFullException異常。這個緩沖區(qū)的大小由生產(chǎn)者配置參數(shù)buffer.memory來控制,默認值為32MB。當生產(chǎn)者發(fā)送的消息數(shù)量超過了這個緩沖區(qū)的大小時,就會拋出QueueFullException異常。

此外,如果生產(chǎn)者發(fā)送消息的速度超過了Kafka集群的處理速度,也會導致生產(chǎn)者內(nèi)部緩沖區(qū)滿,從而拋出QueueFullException異常。在這種情況下,可以通過調(diào)整生產(chǎn)者的發(fā)送速度或增加Kafka集群的處理能力來解決問題。

4. Kafka Producer是否直接將數(shù)據(jù)發(fā)送到broker的leader節(jié)點?

Kafka Producer會直接將數(shù)據(jù)發(fā)送到broker的leader節(jié)點。在Kafka中,每個分區(qū)都有一個leader節(jié)點和多個follower節(jié)點。Producer會將消息發(fā)送到分區(qū)的leader節(jié)點,然后由leader節(jié)點負責將消息復制到所有的follower節(jié)點。這種設計可以提高Kafka的可靠性和可擴展性,因為如果某個節(jié)點出現(xiàn)故障,Kafka可以自動將leader節(jié)點切換到其他可用的節(jié)點上,從而保證數(shù)據(jù)的可靠性和高可用性。

5. Kafka 如何實現(xiàn)批量發(fā)送?

Kafka 實現(xiàn)批量發(fā)送的方式是通過生產(chǎn)者的 batch.size 參數(shù)來控制的。該參數(shù)指定了生產(chǎn)者在發(fā)送消息之前等待的最大字節(jié)數(shù)。當生產(chǎn)者收集到的消息大小達到 batch.size 時,它會將這些消息一起發(fā)送到 Kafka 集群。

具體來說,當生產(chǎn)者收到一條消息時,它會將該消息添加到一個緩沖區(qū)中。如果緩沖區(qū)中的消息大小達到了 batch.size,或者等待時間超過了 linger.ms(指定了生產(chǎn)者在發(fā)送消息之前等待的最大時間),生產(chǎn)者就會將緩沖區(qū)中的所有消息一起發(fā)送到 Kafka 集群。

通過調(diào)整 batch.size 和 linger.ms 參數(shù),可以控制生產(chǎn)者發(fā)送消息的頻率和批量大小,從而優(yōu)化生產(chǎn)者的性能。

6. Kafka 生產(chǎn)者最佳實踐?

01. 發(fā)送消息

Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
        topic,   //消息主題。
        null,   //分區(qū)編號。建議為null,由Producer分配。
        System.currentTimeMillis(),   //時間戳。
        String.valueOf(value.hashCode()),   //消息鍵。
        value   //消息值。
));

02. Key和Value

Kafka 的消息有以下兩個字段:

  • Key:消息的標識。
  • Value:消息內(nèi)容。

為了便于追蹤,建議為消息設置一個唯一的Key??梢酝ㄟ^Key追蹤某消息,打印發(fā)送日志和消費日志,了解該消息的發(fā)送和消費情況。如果消息發(fā)送量較大,建議不要設置Key,并使用黏性分區(qū)策略。

03. 失敗重試

分布式環(huán)境下,由于網(wǎng)絡等原因偶爾發(fā)送失敗是常見的。導致這種失敗的原因可能是消息已經(jīng)發(fā)送成功,但是ACK失敗,也有可能是確實沒發(fā)送成功。

Kafka 是VIP網(wǎng)絡架構(gòu),長時間不進行通信連接會被主動斷開,因此,不是一直活躍的客戶端會經(jīng)常收到connection reset by peer錯誤,建議重試消息發(fā)送??梢愿鶕?jù)業(yè)務需求,設置以下重試參數(shù):

  • retries:消息發(fā)送失敗時的重試次數(shù)。
  • retry.backoff.ms,消息發(fā)送失敗的重試間隔,建議設置為1000,單位:毫秒。

04. 使用異步發(fā)送方式

05. Acks

Acks的說明如下:

  • acks=0:無需服務端的Response、性能較高、丟數(shù)據(jù)風險較大。
  • acks=1:服務端主節(jié)點寫成功即返回Response、性能中等、丟數(shù)據(jù)風險中等、主節(jié)點宕機可能導致數(shù)據(jù)丟失。
  • acks=all:服務端主節(jié)點寫成功且備節(jié)點同步成功才返回Response、性能較差、數(shù)據(jù)較為安全、主節(jié)點和備節(jié)點都宕機才會導致數(shù)據(jù)丟失。

為了提升發(fā)送性能, 建議設置為acks=1。

06. 提升發(fā)送性能(減少碎片化發(fā)送請求)

一般情況下,一個Kafka Topic 會有多個分區(qū)。Kafka Producer客戶端在向服務端發(fā)送消息時,需要先確認往哪個Topic的哪個分區(qū)發(fā)送。我們給同一個分區(qū)發(fā)送多條消息時,Producer客戶端將相關消息打包成一個Batch,批量發(fā)送到服務端。Producer客戶端在處理Batch時,是有額外開銷的。一般情況下,小Batch會導致Producer客戶端產(chǎn)生大量請求,造成請求隊列在客戶端和服務端的排隊,并造成相關機器的CPU升高,從而整體推高了消息發(fā)送和消費延遲。一個合適的Batch大小,可以減少發(fā)送消息時客戶端向服務端發(fā)起的請求次數(shù),在整體上提高消息發(fā)送的吞吐和延遲。

Batch機制,Kafka Producer端主要通過兩個參數(shù)進行控制:

  • batch.size : 發(fā)往每個分區(qū)(Partition)的消息緩存量(消息內(nèi)容的字節(jié)數(shù)之和,不是條數(shù))。達到設置的數(shù)值時,就會觸發(fā)一次網(wǎng)絡請求,然后Producer客戶端把消息批量發(fā)往服務器。如果batch.size設置過小,有可能影響發(fā)送性能和穩(wěn)定性。建議保持默認值16384。單位:字節(jié)。
  • linger.ms : 每條消息在緩存中的最長時間。若超過這個時間,Producer客戶端就會忽略batch.size的限制,立即把消息發(fā)往服務器。建議根據(jù)業(yè)務場景, 設置linger.ms在100~1000之間。單位:毫秒。

因此,Kafka Producer客戶端什么時候把消息批量發(fā)送至服務器是由batch.sizelinger.ms共同決定的。您可以根據(jù)具體業(yè)務需求進行調(diào)整。為了提升發(fā)送的性能,保障服務的穩(wěn)定性, 建議您設置batch.size=16384linger.ms=1000。

07. 黏性分區(qū)策略

只有發(fā)送到相同分區(qū)的消息,才會被放到同一個Batch中,因此決定一個Batch如何形成的一個因素是Kafka Producer端設置的分區(qū)策略。Kafka Producer允許通過設置Partitioner的實現(xiàn)類來選擇適合自己業(yè)務的分區(qū)。在消息指定Key的情況下,云消息隊列 Kafka 版Producer的默認策略是對消息的Key進行哈希,然后根據(jù)哈希結(jié)果選擇分區(qū),保證相同Key的消息會發(fā)送到同一個分區(qū)。

在消息沒有指定Key的情況下,Kafka 2.4版本之前的默認策略是循環(huán)使用主題的所有分區(qū),將消息以輪詢的方式發(fā)送到每一個分區(qū)上。但是,這種默認策略Batch的效果會比較差,在實際使用中,可能會產(chǎn)生大量的小Batch,從而使得實際的延遲增加。鑒于該默認策略對無Key消息的分區(qū)效率低問題,Kafka 在2.4版本引入了黏性分區(qū)策略(Sticky Partitioning Strategy)。

黏性分區(qū)策略主要解決無Key消息分散到不同分區(qū),造成小Batch問題。其主要策略是如果一個分區(qū)的Batch完成后,就隨機選擇另一個分區(qū),然后后續(xù)的消息盡可能地使用該分區(qū)。這種策略在短時間內(nèi)看,會將消息發(fā)送到同一個分區(qū),如果拉長整個運行時間,消息還是可以均勻地發(fā)布到各個分區(qū)上的。這樣可以避免消息出現(xiàn)分區(qū)傾斜,同時還可以降低延遲,提升服務整體性能。

如果您使用的Kafka Producer客戶端是2.4及以上版本,默認的分區(qū)策略就采用黏性分區(qū)策略。如果您使用的Producer客戶端版本小于2.4,可以根據(jù)黏性分區(qū)策略原理,自行實現(xiàn)分區(qū)策略,然后通過參數(shù)partitioner.class設置指定的分區(qū)策略。文章來源地址http://www.zghlxwxcb.cn/news/detail-639277.html

到了這里,關于分布式 - 消息隊列Kafka:Kafka生產(chǎn)者架構(gòu)和配置參數(shù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 分布式消息隊列Kafka(四)- 消費者

    分布式消息隊列Kafka(四)- 消費者

    1.Kafka消費方式 2.Kafka消費者工作流程 (1)總體工作流程 (2)消費者組工作流程 3.消費者API (1)單個消費者消費 實現(xiàn)代碼 (2)單個消費者指定分區(qū)消費 代碼實現(xiàn): (3)消費者組消費 復制上面CustomConsumer三個,同時去訂閱統(tǒng)一個主題,消費數(shù)據(jù),發(fā)現(xiàn)一個分區(qū)只能被一個

    2023年04月26日
    瀏覽(33)
  • 分布式 - 消息隊列Kafka:Kafka 消費者消息消費與參數(shù)配置

    分布式 - 消息隊列Kafka:Kafka 消費者消息消費與參數(shù)配置

    01. 創(chuàng)建消費者 在讀取消息之前,需要先創(chuàng)建一個KafkaConsumer對象。創(chuàng)建KafkaConsumer對象與創(chuàng)建KafkaProducer對象非常相似——把想要傳給消費者的屬性放在Properties對象里。 為簡單起見,這里只提供4個必要的屬性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    瀏覽(27)
  • 分布式應用之zookeeper集群+消息隊列Kafka

    分布式應用之zookeeper集群+消息隊列Kafka

    ? ? ? ?ZooKeeper是一個分布式的,開放源碼的分布式應用程序協(xié)調(diào)服務,是Google的Chubby一個開源的實現(xiàn),是Hadoop和Hbase的重要組件。它是一個為分布式應用提供一致性服務的軟件,提供的功能包括:配置維護、域名服務、分布式同步、組服務等。為分布式框架提供協(xié)調(diào)服務的

    2024年02月06日
    瀏覽(139)
  • zookeeper+kafka分布式消息隊列集群的部署

    zookeeper+kafka分布式消息隊列集群的部署

    目錄 一、zookeeper 1.Zookeeper 定義 2.Zookeeper 工作機制 3.Zookeeper 特點 4.Zookeeper 數(shù)據(jù)結(jié)構(gòu) 5.Zookeeper 應用場景 (1)統(tǒng)一命名服務 (2)統(tǒng)一配置管理 (3)統(tǒng)一集群管理 (4)服務器動態(tài)上下線 6.Zookeeper 選舉機制 (1)第一次啟動選舉機制 (2)非第一次啟動選舉機制 7.部署zookeepe

    2024年02月14日
    瀏覽(25)
  • 分布式 - 消息隊列Kafka:Kafka 消費者的消費位移

    分布式 - 消息隊列Kafka:Kafka 消費者的消費位移

    01. Kafka 分區(qū)位移 對于Kafka中的分區(qū)而言,它的每條消息都有唯一的offset,用來表示消息在分區(qū)中對應的位置。偏移量從0開始,每個新消息的偏移量比前一個消息的偏移量大1。 每條消息在分區(qū)中的位置信息由一個叫位移(Offset)的數(shù)據(jù)來表征。分區(qū)位移總是從 0 開始,假設一

    2024年02月12日
    瀏覽(27)
  • 分布式 - 消息隊列Kafka:Kafka消費者的分區(qū)分配策略

    分布式 - 消息隊列Kafka:Kafka消費者的分區(qū)分配策略

    Kafka 消費者負載均衡策略? Kafka 消費者分區(qū)分配策略? 1. 環(huán)境準備 創(chuàng)建主題 test 有5個分區(qū),準備 3 個消費者并進行消費,觀察消費分配情況。然后再停止其中一個消費者,再次觀察消費分配情況。 ① 創(chuàng)建主題 test,該主題有5個分區(qū),2個副本: ② 創(chuàng)建3個消費者CustomConsu

    2024年02月13日
    瀏覽(31)
  • 【簡單認識zookeeper+kafka分布式消息隊列集群的部署】

    【簡單認識zookeeper+kafka分布式消息隊列集群的部署】

    Zookeeper是一個開源的分布式的,為分布式框架提供協(xié)調(diào)服務的Apache項目。 Zookeeper從設計模式角度來理解:是一個基于觀察者模式設計的分布式服務管理框架,它負責存儲和管理大家都關心的數(shù)據(jù),然后接受觀察者的注冊,一旦這些數(shù)據(jù)的狀態(tài)發(fā)生變化,Zookeeper就將負責通知已

    2024年02月13日
    瀏覽(24)
  • 分布式 - 消息隊列Kafka:Kafka消費者和消費者組

    分布式 - 消息隊列Kafka:Kafka消費者和消費者組

    1. Kafka 消費者是什么? 消費者負責訂閱Kafka中的主題,并且從訂閱的主題上拉取消息。與其他一些消息中間件不同的是:在Kafka的消費理念中還有一層消費組的概念,每個消費者都有一個對應的消費組。當消息發(fā)布到主題后,只會被投遞給訂閱它的每個消費組中的一個消費者

    2024年02月13日
    瀏覽(29)
  • 分布式 - 消息隊列Kafka:Kafka 消費者消費位移的提交方式

    分布式 - 消息隊列Kafka:Kafka 消費者消費位移的提交方式

    最簡單的提交方式是讓消費者自動提交偏移量,自動提交 offset 的相關參數(shù): enable.auto.commit:是否開啟自動提交 offset 功能,默認為 true; auto.commit.interval.ms:自動提交 offset 的時間間隔,默認為5秒; 如果 enable.auto.commit 被設置為true,那么每過5秒,消費者就會自動提交 poll() 返

    2024年02月12日
    瀏覽(32)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包