Kafka在實(shí)際應(yīng)用中,經(jīng)常被用作高性能、可擴(kuò)展的消息中間件。
Kafka自定義了一套網(wǎng)絡(luò)協(xié)議,只要遵守這套協(xié)議的格式,就可以向Kafka發(fā)送消息,也可以從Kafka中拉取消息。
在實(shí)踐生產(chǎn)過程中,一套API封裝良好、靈活易用的客戶端可以避免開發(fā)人員重復(fù)勞動(dòng),提高開發(fā)效率,也可以提高程序的健壯性和可靠性。
Kafka提供了Java版本的生產(chǎn)者的實(shí)現(xiàn)——KafkaProducer,使用KafkaProducer的API可以輕松實(shí)現(xiàn)同步/異步發(fā)送消息、批量發(fā)送、超時(shí)重發(fā)等復(fù)雜的功能,在業(yè)務(wù)模塊向Kafka寫入消息時(shí),KafkaProducer就顯得必不可少。
現(xiàn)在,Kafka的愛好者已經(jīng)使用多種語言(諸如C++、Java、Python、Go等)實(shí)現(xiàn)了Kafka的客戶端。
如果讀者使用其他語言,可以到Kafka官方網(wǎng)站的wiki(https://cwiki.apache.org/confluence/display/KAFKA/Clients)查找相關(guān)資料。
在Kafka core模塊的kafka.producer包中,新版本的生產(chǎn)者客戶端實(shí)現(xiàn)KafkaProducer(Java實(shí)現(xiàn))在Kafka clients模塊的org.apache.kafka.clients.producer包中。
KafkaProducer分析
在圖中簡(jiǎn)略描述了KafkaProducer發(fā)送消息的整個(gè)流程。
下面簡(jiǎn)述圖中每個(gè)步驟的操作:
- Producerlnterceptors對(duì)消息進(jìn)行攔截。
- Serializer對(duì)消息的key和value進(jìn)行序列化。
- Partitioner為消息選擇合適的Partition。
- RecordAccumulator收集消息,實(shí)現(xiàn)批量發(fā)送。
- Sender從RecordAccumulator獲取消息。
- 構(gòu)造ClientRequest。
- 將ClientRequest交給NetworkClient,準(zhǔn)備發(fā)送。
- NetworkClient將請(qǐng)求放入KafkaChannel的緩存。
- 執(zhí)行網(wǎng)絡(luò)I/O,發(fā)送請(qǐng)求。
- 收到響應(yīng),調(diào)用ClientRequest的回調(diào)函數(shù)。
- 調(diào)用RecordBatch的回調(diào)函數(shù),最終調(diào)用每個(gè)消息上注冊(cè)的回調(diào)函數(shù)。
消息發(fā)送的過程中,涉及兩個(gè)線程協(xié)同工作。主線程首先將業(yè)務(wù)數(shù)據(jù)封裝成ProducerRecord對(duì)象,之后調(diào)用send方法將消息放入RecordAccumulator(消息收集器,也可以理解為主線程與Sender線程之間的緩沖區(qū))中暫存。
Sender線程負(fù)責(zé)將消息信息構(gòu)成請(qǐng)求,并最終執(zhí)行網(wǎng)絡(luò)IVO的線程,它從RecordAccumulator中取出消息并批量發(fā)送出去。
需要注意的是,KafkaProducer是線程安全的,多個(gè)線程間可以共享使用同一個(gè)KafkaProducer對(duì)象。
KafkaProducer實(shí)現(xiàn)了Producer接口,在Producer接口中定義KafkaProducer對(duì)外提供的API,分為四類方法。
- send方法:發(fā)送消息,實(shí)際是將消息放入RecordAccumulator暫存,等待發(fā)送。
- flush方法:刷新操作,等待RecordAccumulator中所有消息發(fā)送完成,在刷新完成之前會(huì)阻塞調(diào)用的線程。
- partitionsFor方法:在KafkaProducer中維護(hù)了一個(gè)Metadata對(duì)象用于存儲(chǔ)Kafka集群的元數(shù)據(jù),Metadata中的元數(shù)據(jù)會(huì)定期更新。partitionsFor方法負(fù)責(zé)從Metadata中獲取指定Topic中的分區(qū)信息。
- close方法:關(guān)閉此Producer對(duì)象,主要操作是設(shè)置close標(biāo)志,等待RecordAccumulator中的消息清空,關(guān)閉Sender線程。
還有一個(gè)metrics方法,用于記錄統(tǒng)計(jì)信息,與消息發(fā)送的流程無關(guān),我們不做詳細(xì)分析。
了解了Producer接口的功能之后,我們下面就來分析KafkaProducer的具體實(shí)現(xiàn)。
首先,介紹KafkaProducer中比較重要的字段,在后面分析過程中,會(huì)逐個(gè)進(jìn)行分析,如圖所示。
- PRODUCER_CLIENT_ID_SEQUENCE:clientld的生成器,如果沒有明確指定client的Id,則使用字段生成一個(gè)ID。
- clientld:此生產(chǎn)者的唯一標(biāo)識(shí)。
- partitioner:分區(qū)選擇器,根據(jù)一定的策略,將消息路由到合適的分區(qū)。
- maxRequestSize:消息的最大長(zhǎng)度,這個(gè)長(zhǎng)度包含了消息頭、序列化后的key和序列化后的value的長(zhǎng)度。
- totalMemorySize:發(fā)送單個(gè)消息的緩沖區(qū)大小。
- accumulator:RecordAccumulator,用于收集并緩存消息,等待Sender線程發(fā)送。
- sender:發(fā)送消息的Sender任務(wù),實(shí)現(xiàn)了Runnable接口,在ioThread線程中執(zhí)行。
- ioThread:執(zhí)行Sender任務(wù)發(fā)送消息的線程,稱為“Sender線程”。
- compressionType:壓縮算法,可選項(xiàng)有none、gzip、snappy、lz4。這是針對(duì)RecordAccumulator中多條消息進(jìn)行的壓縮,所以消息越多,壓縮效果越好。
- keySerializer:key的序列化器。
- valueSerializer:value的序列化器。
- Metadata metadata:整個(gè)Kafka集群的元數(shù)據(jù)。
- maxBlockTimeMs:等待更新Kafka集群元數(shù)據(jù)的最大時(shí)長(zhǎng)。
- requestTimeoutMs:消息的超時(shí)時(shí)間,也就是從消息發(fā)送到收到ACK響應(yīng)的最長(zhǎng)時(shí)長(zhǎng)。
- interceptors:Producerlnterceptor集合,Producerlnterceptor可以在消息發(fā)送之前對(duì)其進(jìn)行攔截或修改;也可以先于用戶的Callback,對(duì)ACK響應(yīng)進(jìn)行預(yù)處理。
- producerConfig:配置對(duì)象,使用反射初始化KafkaProducer配置的相對(duì)對(duì)象。
KafkaProducer構(gòu)造完成之后,我們來關(guān)注KafkaProducer的send方法。圖展示了整個(gè)send方法的調(diào)用流程。
Producerlnterceptors&Producerlnterceptor
Producerlnterceptors是一個(gè)Producerlnterceptor集合,其onSend方法、onAcknowledgement方法、onSendEror方法,實(shí)際上是循環(huán)調(diào)用其封裝的Producerlnterceptor集合的對(duì)應(yīng)方法。
Producerlnterceptor對(duì)象可以在消息發(fā)送之前對(duì)其進(jìn)行攔截或修改,也可以先于用戶的Callback,對(duì)ACK響應(yīng)進(jìn)行預(yù)處理。
如果熟悉Java Web開發(fā),可以將其與Filter的功能做類比。
如果要使用自定義Producerlnterceptor類,只要實(shí)現(xiàn)Producerlnterceptor接口,創(chuàng)建其對(duì)象并添加到Producerlnterceptors中即可。
Producerlnterceptors與ProducerInterceptor之間的關(guān)系如圖所示。
Kafka集群元數(shù)據(jù)
每個(gè)Topic中有多個(gè)分區(qū),這些分區(qū)的Leader副本可以分配在集群中不同的Broker上。
我們站在生產(chǎn)者的角度來看,分區(qū)的數(shù)量以及Leader副本的分布是動(dòng)態(tài)變化的。
通過簡(jiǎn)單的示例說明這種動(dòng)態(tài)變化:在運(yùn)行過程中,Leader副本隨時(shí)都有可能出現(xiàn)故障進(jìn)而導(dǎo)致Leader副本的重新選舉,新的Leader副本會(huì)在其他Broker上繼續(xù)對(duì)外提供服務(wù)。
當(dāng)需要提高某Topic的并行處理消息的能力時(shí),我們可以通過增加其分區(qū)的數(shù)量來實(shí)現(xiàn)。
當(dāng)然,還有別的方式導(dǎo)致這種動(dòng)態(tài)變化,例如,手動(dòng)觸發(fā)“優(yōu)先副本”選舉等。
我們創(chuàng)建的ProducerRecord中只指定了Topic的名稱,并未明確指定分區(qū)編號(hào)。
KafkaProducer要將此消息追加到指定Topic的某個(gè)分區(qū)的Leader副本中,首先需要知道Topic的分區(qū)數(shù)量,經(jīng)過路由后確定目標(biāo)分區(qū),之后KafkaProducer需要知道目標(biāo)分區(qū)的Leader副本所在服務(wù)器的地址、端口等信息,才能建立連接,將消息發(fā)送到Kafka中。
因此,在KafkaProducer中維護(hù)了Kafka集群的元數(shù)據(jù),這些元數(shù)據(jù)記錄了:某個(gè)Topic中有哪幾個(gè)分區(qū),每個(gè)分區(qū)的Leader副本分配哪個(gè)節(jié)點(diǎn)上,F(xiàn)ollower副本分配哪些節(jié)點(diǎn)上,哪些副本在ISR集合中以及這些節(jié)點(diǎn)的網(wǎng)絡(luò)地址、端口。
在KafkaProducer中,使用Node、TopicPartition、PartitionInfo這三個(gè)類封裝了Kafka集群的相關(guān)元數(shù)據(jù),其主要字段如圖所示。
- Node表示集群中的一個(gè)節(jié)點(diǎn),Node記錄這個(gè)節(jié)點(diǎn)的host、ip、port等信息。
- TopicPartition表示某Topic的一個(gè)分區(qū),其中的topic字段是Topic的名稱,partition字段則此分區(qū)在Topic中的分區(qū)編號(hào)(ID)。
- PartitionInfo表示一個(gè)分區(qū)的詳細(xì)信息。其中topic字段和partition字段的含義與TopicPartition中的相同,除此之外,leader字段記錄了Leader副本所在節(jié)點(diǎn)的id,replica字段記錄了全部副本所在的節(jié)點(diǎn)信息,inSyncReplicas字段記錄了ISR集合中所有副本所在的節(jié)點(diǎn)信息。
通過這三個(gè)類的組合,我們可以完整表示出KafkaProducer需要的集群元數(shù)據(jù)。
這些元數(shù)據(jù)保存在了Cluster這個(gè)類中,并按照不同的映射方式進(jìn)行存放,方便查詢。Cluster類的核心字段如圖所示。
- nodes:Kafka集群中節(jié)點(diǎn)信息列表。
- nodesById:Brokerld與Node節(jié)點(diǎn)之間對(duì)應(yīng)關(guān)系,方便按照Brokerld進(jìn)行索引。
- partitionsBy TopicPartition:記錄了TopicPartition與PartitionInfo的映射關(guān)系。
- partitionsByTopic:記錄了Topic名稱和Partitionlnfo的映射關(guān)系,可以按照Topic名稱查詢其中全部分區(qū)的詳細(xì)信息。
- availablePartitionsByTopic:Topic與Partitionlnfo的映射關(guān)系,這里的List中存放的分區(qū)必須是有Leader副本的Partition,而partitionsByTopic中記錄的分區(qū)則不一定有Leader副本,因?yàn)槟承┲虚g狀態(tài),例如Leader副本宕機(jī)而觸發(fā)的選舉過程中,分區(qū)不一定有Leader副本。
- partitionsByNode:記錄了Node與PartitionInfo的映射關(guān)系,可以按照節(jié)點(diǎn)Id查詢其上分布的全部分區(qū)的詳細(xì)信息。
Metadata中封裝了Cluster對(duì)象,并保存Cluster數(shù)據(jù)的最后更新時(shí)間、版本號(hào)(version)、是否需要更新等待信息,如圖所示。
- topics:記錄了當(dāng)前已知的所有topic,在cluster字段中記錄了Topic最新的元數(shù)據(jù)。
- version:表示Kafka集群元數(shù)據(jù)的版本號(hào)。Kafka集群元數(shù)據(jù)每更新成功一次,version字段的值增1。通過新舊版本號(hào)的比較,判斷集群元數(shù)據(jù)是否更新完成。
- metadataExpireMs:每隔多久,更新一次。默認(rèn)是300×1000,也就是5分種。
- refreshBackoffMs:兩次發(fā)出更新Cluster保存的元數(shù)據(jù)信息的最小時(shí)間差,默認(rèn)為100ms。這是為了防止更新操作過于頻繁而造成網(wǎng)絡(luò)阻塞和增加服務(wù)端壓力。在Kafka中與重試操作有關(guān)的操作中,都有這種“退避(backoff)時(shí)間”設(shè)計(jì)的身影。
- lastRefreshMs:記錄上一次更新元數(shù)據(jù)的時(shí)間戳(也包含更新失敗的情況)。
- lastSuccessfulRefreshMs:上一次成功更新的時(shí)間戳。如果每次都成功,則lastSuccessfulRefreshMs、lastRefreshMs相等。 否則,lastRefreshMs>lastSuccessulRefreshMs。
- cluster:記錄Kafka集群的元數(shù)據(jù)。
- needUpdate:標(biāo)識(shí)是否強(qiáng)制更新Cluster,這是觸發(fā)Sender線程更新集群元數(shù)據(jù)的條件之一。
- listeners:監(jiān)聽Metadata更新的監(jiān)聽器集合。自定義Metadata監(jiān)聽實(shí)現(xiàn)Metadata.Listener.onMetadataUpdate方法即可,在更新Metadata中的cluster字段之前,會(huì)通知listener集合中全部Listener對(duì)象。
- needMetadataForAllTopics:是否需要更新全部Topic的元數(shù)據(jù),一般情況下,KafkaProducer只維護(hù)它用到的Topic的元數(shù)據(jù),是集群中全部Topic的子集。
Metadata的方法比較簡(jiǎn)單,主要是操縱上面的幾個(gè)字段,這里著重介紹主線程用到的requestUpdate方法和awaitUpdate方法。
requestUpdate()方法將needUpdate字段修改為true,這樣當(dāng)Sender線程運(yùn)行時(shí)會(huì)更新Metadata記錄的集群元數(shù)據(jù),然后返回version字段的值。
awaitUpdate方法主要是通過version版本號(hào)來判斷元數(shù)據(jù)是否更新完成,更新未完成則阻塞等待。
下面回到KafkaProducer.waitOnMetadata方法的分析,它負(fù)責(zé)觸發(fā)Kafka集群元數(shù)據(jù)的更新,并阻塞主線程等待更新完畢。它的主要步驟是:
- 檢測(cè)Metadata中是否包含指定Topic的元數(shù)據(jù),若不包含,則將Topic添加到topics集合中,下次更新時(shí)會(huì)從服務(wù)端獲取指定Topic的元數(shù)據(jù)。
- 嘗試獲取Topic中分區(qū)的詳細(xì)信息,失敗后會(huì)調(diào)用requestUpdate)方法設(shè)置Metadata.needUpdate字段,并得到當(dāng)前元數(shù)據(jù)版本號(hào)。
- 喚醒Sender線程,由Sender線程更新Metadata中保存的Kafka集群元數(shù)據(jù)。
- 主線程調(diào)用awaitUpdate()方法,等待Sender線程完成更新。
- 從Metadata中獲取指定Topic分區(qū)的詳細(xì)信息(即PartitionInfo集合)。若失敗,則回到步驟2繼續(xù)嘗試,若等待時(shí)間超時(shí),則拋出異常。
waitOnMetadata()方法的具體實(shí)現(xiàn)如下:
Serializer&Deserializer
客戶端發(fā)送的消息的key和value都是byte數(shù)組,Serializer和Deserializer接口提供了將Java對(duì)象序列化(反序列化)為byte數(shù)組的功能。在KafkaProducer中,根據(jù)配置文件,使用合適的Serializer。
圖展示了Serializer和Deserializer接口以及它們的實(shí)現(xiàn)類。
Kafka已經(jīng)為我們提供了Java基本類型的Serializer實(shí)現(xiàn)和Deserializer實(shí)現(xiàn),我們也可以對(duì)Java復(fù)雜類型的自定義Serializer和Deserializer實(shí)現(xiàn),只要實(shí)現(xiàn)Serializer或Deserializer接口即可。
下面簡(jiǎn)單介紹Serializer,Deserializer是其逆操作。
在Serializer接口中,configure()方法是在執(zhí)行序列化操作之前的配置,例如,在StringSerializer.configure()方法中會(huì)選擇合適的編碼類型(encoding),默認(rèn)是UTF-8;IntegerSerializer.configure()方法則是空實(shí)現(xiàn)。
serializer方法是真正進(jìn)行序列化的地方,將傳入的Java對(duì)象序列化為byte[]。
close方法是在其后的關(guān)閉方法,多為空實(shí)現(xiàn)。
Partitioner
KafkaProducer.send()方法的下一步操作是選擇消息的分區(qū)。
在有的應(yīng)用場(chǎng)景中,由業(yè)務(wù)邏輯控制每個(gè)消息追加到合適的分區(qū)中,而有時(shí)候業(yè)務(wù)邏輯并不關(guān)心分區(qū)的選擇。
在KafkaProducer.partition方法中,優(yōu)先根據(jù)ProducerRecord中partition字段指定的序號(hào)選擇分區(qū),如果ProducerRecord.partition字段沒有明確指定分區(qū)編號(hào),則通過Partitioner.partition()方法選擇Partition。
Kafka提供了Partitioner接口的一個(gè)默認(rèn)實(shí)現(xiàn)——DefaultPartitioner,繼承結(jié)構(gòu)如圖(左)所示,可以看到上面介紹的ProducerInterceptor接口也繼承了Configurable接口。
在創(chuàng)建KafkaProducer時(shí)傳人的key/value配置項(xiàng)會(huì)保存到AbstractConfig的originals字段中,如圖(右)所示。AbstractConfig的核心方法是getConfiguredInstance方法,其主要功能是通過反射機(jī)制實(shí)例化originals字段中指定的類。在前面分析KafkaProducer的構(gòu)造函數(shù)時(shí),也看到過此方法的調(diào)用。文章來源:http://www.zghlxwxcb.cn/news/detail-807113.html
DefaultPartitioner.partition方法負(fù)責(zé)在ProduceRecord中沒有明確指定分區(qū)編號(hào)的時(shí)候,為其選擇合適的分區(qū):如果消息沒有key,會(huì)根據(jù)counter與Partition個(gè)數(shù)取模來確定分區(qū)編號(hào),count不斷遞增,確保消息不會(huì)都發(fā)到同一個(gè)Partition里;如果消息有key的話,則對(duì)key進(jìn)行hash(使用的是murmur2這種高效率低碰撞的Hash算法),然后與分區(qū)數(shù)量取模,來確定key所在的分區(qū)達(dá)到負(fù)載均衡。文章來源地址http://www.zghlxwxcb.cn/news/detail-807113.html
到了這里,關(guān)于Kafka-生產(chǎn)者的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!