整體架構(gòu)
整個生產(chǎn)者客戶端由兩個縣城協(xié)調(diào)運行,這兩個線程分別為主線程和Sender線程(發(fā)送線程)。
主線程中由KafkaProducer創(chuàng)建消息,然后通過可能的攔截器,序列化器和分區(qū)器之后緩存到消息累加器(RecordAccumulator)。Sender線程負(fù)責(zé)從RecordAccumulator中獲取消息并將其發(fā)送到kafka中。
RecordAccumulator 主要用來緩存消息以便 Sender 線程可以批量發(fā)送,進而減少網(wǎng)絡(luò)傳輸?shù)馁Y源消耗以提升性能。RecordAccumulator 緩存的大小可以通過生產(chǎn)者客戶端參數(shù) buffer.memory 配置,默認(rèn)值為 33554432B,即32MB。如果生產(chǎn)者發(fā)送消息的速度超過發(fā)送到服務(wù)器的速度,則會導(dǎo)致生產(chǎn)者空間不足,這個時候 KafkaProducer 的 send() 方法調(diào)用要么被阻塞,要么拋出異常,這個取決于參數(shù) max.block.ms 的配置,此參數(shù)的默認(rèn)值為60000,即60秒。
主線程中發(fā)送過來的消息都會被追加到 RecordAccumulator 的某個雙端隊列(Deque)中,在 RecordAccumulator 的內(nèi)部為每個分區(qū)都維護了一個雙端隊列,隊列中的內(nèi)容就是 ProducerBatch,即 Deque。消息寫入緩存時,追加到雙端隊列的尾部;Sender 讀取消息時,從雙端隊列的頭部讀取。注意 ProducerBatch 不是 ProducerRecord,ProducerBatch 中可以包含一至多個 ProducerRecord。通俗地說,ProducerRecord 是生產(chǎn)者中創(chuàng)建的消息,而 ProducerBatch 是指一個消息批次,ProducerRecord 會被包含在 ProducerBatch 中,這樣可以使字節(jié)的使用更加緊湊。與此同時,將較小的 ProducerRecord 拼湊成一個較大的 ProducerBatch,也可以減少網(wǎng)絡(luò)請求的次數(shù)以提升整體的吞吐量。
Sender 從 RecordAccumulator 中獲取緩存的消息之后,會進一步將原本<分區(qū), Deque< ProducerBatch>> 的保存形式轉(zhuǎn)變成 <Node, List< ProducerBatch> 的形式,其中 Node 表示 Kafka 集群的 broker 節(jié)點。對于網(wǎng)絡(luò)連接來說,生產(chǎn)者客戶端是與具體的 broker 節(jié)點建立的連接,也就是向具體的 broker 節(jié)點發(fā)送消息,而并不關(guān)心消息屬于哪一個分區(qū);而對于 KafkaProducer 的應(yīng)用邏輯而言,我們只關(guān)注向哪個分區(qū)中發(fā)送哪些消息,所以在這里需要做一個應(yīng)用邏輯層面到網(wǎng)絡(luò)I/O層面的轉(zhuǎn)換。
在轉(zhuǎn)換成 <Node, List> 的形式之后,Sender 還會進一步封裝成 <Node, Request> 的形式,這樣就可以將 Request 請求發(fā)往各個 Node 了,這里的 Request 是指 Kafka 的各種協(xié)議請求,對于消息發(fā)送而言就是指具體的 ProduceRequest。
請求在從 Sender 線程發(fā)往 Kafka 之前還會保存到 InFlightRequests 中,InFlightRequests 保存對象的具體形式為 Map<NodeId, Deque>,它的主要作用是緩存了已經(jīng)發(fā)出去但還沒有收到響應(yīng)的請求(NodeId 是一個 String 類型,表示節(jié)點的 id 編號)。與此同時,InFlightRequests 還提供了許多管理類的方法,并且通過配置參數(shù)還可以限制每個連接(也就是客戶端與 Node 之間的連接)最多緩存的請求數(shù)。這個配置參數(shù)為 max.in.flight.requests.per.connection,默認(rèn)值為5,即每個連接最多只能緩存5個未響應(yīng)的請求,超過該數(shù)值之后就不能再向這個連接發(fā)送更多的請求了,除非有緩存的請求收到了響應(yīng)(Response)。通過比較 Deque 的 size 與這個參數(shù)的大小來判斷對應(yīng)的 Node 中是否已經(jīng)堆積了很多未響應(yīng)的消息,如果真是如此,那么說明這個 Node 節(jié)點負(fù)載較大或網(wǎng)絡(luò)連接有問題,再繼續(xù)向其發(fā)送請求會增大請求超時的可能。
前面提及的 InFlightRequests 還可以獲得 leastLoadedNode,即所有 Node 中負(fù)載最小的那一個。這里的負(fù)載最小是通過每個 Node 在 InFlightRequests 中還未確認(rèn)的請求決定的,未確認(rèn)的請求越多則認(rèn)為負(fù)載越大。對于下圖中的 InFlightRequests 來說,圖中展示了三個節(jié)點 Node0、Node1和Node2,很明顯 Node1 的負(fù)載最小。也就是說,Node1 為當(dāng)前的 leastLoadedNode。選擇 leastLoadedNode 發(fā)送請求可以使它能夠盡快發(fā)出,避免因網(wǎng)絡(luò)擁塞等異常而影響整體的進度。leastLoadedNode 的概念可以用于多個應(yīng)用場合,比如元數(shù)據(jù)請求、消費者組播協(xié)議的交互。
生產(chǎn)者重要參數(shù)
acks
- acks=1。(默認(rèn)值):生產(chǎn)者發(fā)送消息之后,只要分區(qū)的 leader 副本成功寫入消息,那么它就會收到來自服務(wù)端的成功響應(yīng)。如果消息無法寫入 leader 副本,比如在 leader 副本崩潰、重新選舉新的 leader 副本的過程中,那么生產(chǎn)者就會收到一個錯誤的響應(yīng),為了避免消息丟失,生產(chǎn)者可以選擇重發(fā)消息。如果消息寫入 leader 副本并返回成功響應(yīng)給生產(chǎn)者,且在被其他 follower 副本拉取之前 leader 副本崩潰,那么此時消息還是會丟失,因為新選舉的 leader 副本中并沒有這條對應(yīng)的消息。acks 設(shè)置為1,是消息可靠性和吞吐量之間的折中方案。
- acks=0。生產(chǎn)者發(fā)送消息之后不需要等待任何服務(wù)端的響應(yīng)。如果在消息從發(fā)送到寫入 Kafka 的過程中出現(xiàn)某些異常,導(dǎo)致 Kafka 并沒有收到這條消息,那么生產(chǎn)者也無從得知,消息也就丟失了。在其他配置環(huán)境相同的情況下,acks 設(shè)置為0可以達到最大的吞吐量。
- acks=-1或acks=all。生產(chǎn)者在消息發(fā)送之后,需要等待 ISR 中的所有副本都成功寫入消息之后才能夠收到來自服務(wù)端的成功響應(yīng)。在其他配置環(huán)境相同的情況下,acks 設(shè)置為 -1(all) 可以達到最強的可靠性。但這并不意味著消息就一定可靠,因為ISR中可能只有 leader 副本,這樣就退化成了 acks=1 的情況。要獲得更高的消息可靠性需要配合 min.insync.replicas 等參數(shù)的聯(lián)動
retries和retry.backoff.ms
retries 參數(shù)用來配置生產(chǎn)者重試的次數(shù),默認(rèn)值為0,即在發(fā)生異常的時候不進行任何重試動作。消息在從生產(chǎn)者發(fā)出到成功寫入服務(wù)器之前可能發(fā)生一些臨時性的異常,比如網(wǎng)絡(luò)抖動、leader 副本的選舉等,這種異常往往是可以自行恢復(fù)的,生產(chǎn)者可以通過配置 retries 大于0的值,以此通過內(nèi)部重試來恢復(fù)而不是一味地將異常拋給生產(chǎn)者的應(yīng)用程序。如果重試達到設(shè)定的次數(shù),那么生產(chǎn)者就會放棄重試并返回異常。不過并不是所有的異常都是可以通過重試來解決的,比如消息太大,超過 max.request.size 參數(shù)配置的值時,這種方式就不可行了。
重試還和另一個參數(shù) retry.backoff.ms 有關(guān),這個參數(shù)的默認(rèn)值為100,它用來設(shè)定兩次重試之間的時間間隔,避免無效的頻繁重試。在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的異常恢復(fù)時間,這樣可以設(shè)定總的重試時間大于這個異?;謴?fù)時間,以此來避免生產(chǎn)者過早地放棄重試。
Kafka 可以保證同一個分區(qū)中的消息是有序的。如果生產(chǎn)者按照一定的順序發(fā)送消息,那么這些消息也會順序地寫入分區(qū),進而消費者也可以按照同樣的順序消費它們。文章來源:http://www.zghlxwxcb.cn/news/detail-757539.html
對于某些應(yīng)用來說,順序性非常重要,比如 MySQL 的 binlog 傳輸,如果出現(xiàn)錯誤就會造成非常嚴(yán)重的后果。如果將 retries 參數(shù)配置為非零值,并且 max.in.flight.requests.per.connection 參數(shù)配置為大于1的值,那么就會出現(xiàn)錯序的現(xiàn)象:如果第一批次消息寫入失敗,而第二批次消息寫入成功,那么生產(chǎn)者會重試發(fā)送第一批次的消息,此時如果第一批次的消息寫入成功,那么這兩個批次的消息就出現(xiàn)了錯序。一般而言,在需要保證消息順序的場合建議把參數(shù) max.in.flight.requests.per.connection 配置為1,而不是把 retries 配置為0,不過這樣也會影響整體的吞吐。文章來源地址http://www.zghlxwxcb.cn/news/detail-757539.html
到了這里,關(guān)于Kafka系列 - 生產(chǎn)者客戶端架構(gòu)以及3個重要參數(shù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!