国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka-生產(chǎn)者

這篇具有很好參考價(jià)值的文章主要介紹了Kafka-生產(chǎn)者。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

Kafka在實(shí)際應(yīng)用中,經(jīng)常被用作高性能、可擴(kuò)展的消息中間件。

Kafka自定義了一套網(wǎng)絡(luò)協(xié)議,只要遵守這套協(xié)議的格式,就可以向Kafka發(fā)送消息,也可以從Kafka中拉取消息。

在實(shí)踐生產(chǎn)過程中,一套API封裝良好、靈活易用的客戶端可以避免開發(fā)人員重復(fù)勞動(dòng),提高開發(fā)效率,也可以提高程序的健壯性和可靠性。

Kafka提供了Java版本的生產(chǎn)者的實(shí)現(xiàn)——KafkaProducer,使用KafkaProducer的API可以輕松實(shí)現(xiàn)同步/異步發(fā)送消息、批量發(fā)送、超時(shí)重發(fā)等復(fù)雜的功能,在業(yè)務(wù)模塊向Kafka寫入消息時(shí),KafkaProducer就顯得必不可少。

現(xiàn)在,Kafka的愛好者已經(jīng)使用多種語言(諸如C++、Java、Python、Go等)實(shí)現(xiàn)了Kafka的客戶端。

如果讀者使用其他語言,可以到Kafka官方網(wǎng)站的wiki(https://cwiki.apache.org/confluence/display/KAFKA/Clients)查找相關(guān)資料。

在Kafka core模塊的kafka.producer包中,新版本的生產(chǎn)者客戶端實(shí)現(xiàn)KafkaProducer(Java實(shí)現(xiàn))在Kafka clients模塊的org.apache.kafka.clients.producer包中。

KafkaProducer分析

在圖中簡(jiǎn)略描述了KafkaProducer發(fā)送消息的整個(gè)流程。

Kafka-生產(chǎn)者,隊(duì)列,kafka,分布式
下面簡(jiǎn)述圖中每個(gè)步驟的操作:

  1. Producerlnterceptors對(duì)消息進(jìn)行攔截。
  2. Serializer對(duì)消息的key和value進(jìn)行序列化。
  3. Partitioner為消息選擇合適的Partition。
  4. RecordAccumulator收集消息,實(shí)現(xiàn)批量發(fā)送。
  5. Sender從RecordAccumulator獲取消息。
  6. 構(gòu)造ClientRequest。
  7. 將ClientRequest交給NetworkClient,準(zhǔn)備發(fā)送。
  8. NetworkClient將請(qǐng)求放入KafkaChannel的緩存。
  9. 執(zhí)行網(wǎng)絡(luò)I/O,發(fā)送請(qǐng)求。
  10. 收到響應(yīng),調(diào)用ClientRequest的回調(diào)函數(shù)。
  11. 調(diào)用RecordBatch的回調(diào)函數(shù),最終調(diào)用每個(gè)消息上注冊(cè)的回調(diào)函數(shù)。

消息發(fā)送的過程中,涉及兩個(gè)線程協(xié)同工作。主線程首先將業(yè)務(wù)數(shù)據(jù)封裝成ProducerRecord對(duì)象,之后調(diào)用send方法將消息放入RecordAccumulator(消息收集器,也可以理解為主線程與Sender線程之間的緩沖區(qū))中暫存。

Sender線程負(fù)責(zé)將消息信息構(gòu)成請(qǐng)求,并最終執(zhí)行網(wǎng)絡(luò)IVO的線程,它從RecordAccumulator中取出消息并批量發(fā)送出去。

需要注意的是,KafkaProducer是線程安全的,多個(gè)線程間可以共享使用同一個(gè)KafkaProducer對(duì)象。

KafkaProducer實(shí)現(xiàn)了Producer接口,在Producer接口中定義KafkaProducer對(duì)外提供的API,分為四類方法。

  • send方法:發(fā)送消息,實(shí)際是將消息放入RecordAccumulator暫存,等待發(fā)送。
  • flush方法:刷新操作,等待RecordAccumulator中所有消息發(fā)送完成,在刷新完成之前會(huì)阻塞調(diào)用的線程。
  • partitionsFor方法:在KafkaProducer中維護(hù)了一個(gè)Metadata對(duì)象用于存儲(chǔ)Kafka集群的元數(shù)據(jù),Metadata中的元數(shù)據(jù)會(huì)定期更新。partitionsFor方法負(fù)責(zé)從Metadata中獲取指定Topic中的分區(qū)信息。
  • close方法:關(guān)閉此Producer對(duì)象,主要操作是設(shè)置close標(biāo)志,等待RecordAccumulator中的消息清空,關(guān)閉Sender線程。
    還有一個(gè)metrics方法,用于記錄統(tǒng)計(jì)信息,與消息發(fā)送的流程無關(guān),我們不做詳細(xì)分析。
    了解了Producer接口的功能之后,我們下面就來分析KafkaProducer的具體實(shí)現(xiàn)。

首先,介紹KafkaProducer中比較重要的字段,在后面分析過程中,會(huì)逐個(gè)進(jìn)行分析,如圖所示。

Kafka-生產(chǎn)者,隊(duì)列,kafka,分布式

  • PRODUCER_CLIENT_ID_SEQUENCE:clientld的生成器,如果沒有明確指定client的Id,則使用字段生成一個(gè)ID。
  • clientld:此生產(chǎn)者的唯一標(biāo)識(shí)。
  • partitioner:分區(qū)選擇器,根據(jù)一定的策略,將消息路由到合適的分區(qū)。
  • maxRequestSize:消息的最大長(zhǎng)度,這個(gè)長(zhǎng)度包含了消息頭、序列化后的key和序列化后的value的長(zhǎng)度。
  • totalMemorySize:發(fā)送單個(gè)消息的緩沖區(qū)大小。
  • accumulator:RecordAccumulator,用于收集并緩存消息,等待Sender線程發(fā)送。
  • sender:發(fā)送消息的Sender任務(wù),實(shí)現(xiàn)了Runnable接口,在ioThread線程中執(zhí)行。
  • ioThread:執(zhí)行Sender任務(wù)發(fā)送消息的線程,稱為“Sender線程”。
  • compressionType:壓縮算法,可選項(xiàng)有none、gzip、snappy、lz4。這是針對(duì)RecordAccumulator中多條消息進(jìn)行的壓縮,所以消息越多,壓縮效果越好。
  • keySerializer:key的序列化器。
  • valueSerializer:value的序列化器。
  • Metadata metadata:整個(gè)Kafka集群的元數(shù)據(jù)。
  • maxBlockTimeMs:等待更新Kafka集群元數(shù)據(jù)的最大時(shí)長(zhǎng)。
  • requestTimeoutMs:消息的超時(shí)時(shí)間,也就是從消息發(fā)送到收到ACK響應(yīng)的最長(zhǎng)時(shí)長(zhǎng)。
  • interceptors:Producerlnterceptor集合,Producerlnterceptor可以在消息發(fā)送之前對(duì)其進(jìn)行攔截或修改;也可以先于用戶的Callback,對(duì)ACK響應(yīng)進(jìn)行預(yù)處理。
  • producerConfig:配置對(duì)象,使用反射初始化KafkaProducer配置的相對(duì)對(duì)象。

KafkaProducer構(gòu)造完成之后,我們來關(guān)注KafkaProducer的send方法。圖展示了整個(gè)send方法的調(diào)用流程。

Kafka-生產(chǎn)者,隊(duì)列,kafka,分布式

Producerlnterceptors&Producerlnterceptor

Producerlnterceptors是一個(gè)Producerlnterceptor集合,其onSend方法、onAcknowledgement方法、onSendEror方法,實(shí)際上是循環(huán)調(diào)用其封裝的Producerlnterceptor集合的對(duì)應(yīng)方法。

Producerlnterceptor對(duì)象可以在消息發(fā)送之前對(duì)其進(jìn)行攔截或修改,也可以先于用戶的Callback,對(duì)ACK響應(yīng)進(jìn)行預(yù)處理。

如果熟悉Java Web開發(fā),可以將其與Filter的功能做類比。

如果要使用自定義Producerlnterceptor類,只要實(shí)現(xiàn)Producerlnterceptor接口,創(chuàng)建其對(duì)象并添加到Producerlnterceptors中即可。

Producerlnterceptors與ProducerInterceptor之間的關(guān)系如圖所示。

Kafka-生產(chǎn)者,隊(duì)列,kafka,分布式

Kafka集群元數(shù)據(jù)

每個(gè)Topic中有多個(gè)分區(qū),這些分區(qū)的Leader副本可以分配在集群中不同的Broker上。

我們站在生產(chǎn)者的角度來看,分區(qū)的數(shù)量以及Leader副本的分布是動(dòng)態(tài)變化的。

通過簡(jiǎn)單的示例說明這種動(dòng)態(tài)變化:在運(yùn)行過程中,Leader副本隨時(shí)都有可能出現(xiàn)故障進(jìn)而導(dǎo)致Leader副本的重新選舉,新的Leader副本會(huì)在其他Broker上繼續(xù)對(duì)外提供服務(wù)。

當(dāng)需要提高某Topic的并行處理消息的能力時(shí),我們可以通過增加其分區(qū)的數(shù)量來實(shí)現(xiàn)。

當(dāng)然,還有別的方式導(dǎo)致這種動(dòng)態(tài)變化,例如,手動(dòng)觸發(fā)“優(yōu)先副本”選舉等。

我們創(chuàng)建的ProducerRecord中只指定了Topic的名稱,并未明確指定分區(qū)編號(hào)。

KafkaProducer要將此消息追加到指定Topic的某個(gè)分區(qū)的Leader副本中,首先需要知道Topic的分區(qū)數(shù)量,經(jīng)過路由后確定目標(biāo)分區(qū),之后KafkaProducer需要知道目標(biāo)分區(qū)的Leader副本所在服務(wù)器的地址、端口等信息,才能建立連接,將消息發(fā)送到Kafka中。

因此,在KafkaProducer中維護(hù)了Kafka集群的元數(shù)據(jù),這些元數(shù)據(jù)記錄了:某個(gè)Topic中有哪幾個(gè)分區(qū),每個(gè)分區(qū)的Leader副本分配哪個(gè)節(jié)點(diǎn)上,F(xiàn)ollower副本分配哪些節(jié)點(diǎn)上,哪些副本在ISR集合中以及這些節(jié)點(diǎn)的網(wǎng)絡(luò)地址、端口。

在KafkaProducer中,使用Node、TopicPartition、PartitionInfo這三個(gè)類封裝了Kafka集群的相關(guān)元數(shù)據(jù),其主要字段如圖所示。

Kafka-生產(chǎn)者,隊(duì)列,kafka,分布式

  • Node表示集群中的一個(gè)節(jié)點(diǎn),Node記錄這個(gè)節(jié)點(diǎn)的host、ip、port等信息。
  • TopicPartition表示某Topic的一個(gè)分區(qū),其中的topic字段是Topic的名稱,partition字段則此分區(qū)在Topic中的分區(qū)編號(hào)(ID)。
  • PartitionInfo表示一個(gè)分區(qū)的詳細(xì)信息。其中topic字段和partition字段的含義與TopicPartition中的相同,除此之外,leader字段記錄了Leader副本所在節(jié)點(diǎn)的id,replica字段記錄了全部副本所在的節(jié)點(diǎn)信息,inSyncReplicas字段記錄了ISR集合中所有副本所在的節(jié)點(diǎn)信息。

通過這三個(gè)類的組合,我們可以完整表示出KafkaProducer需要的集群元數(shù)據(jù)。

這些元數(shù)據(jù)保存在了Cluster這個(gè)類中,并按照不同的映射方式進(jìn)行存放,方便查詢。Cluster類的核心字段如圖所示。

Kafka-生產(chǎn)者,隊(duì)列,kafka,分布式

  • nodes:Kafka集群中節(jié)點(diǎn)信息列表。
  • nodesById:Brokerld與Node節(jié)點(diǎn)之間對(duì)應(yīng)關(guān)系,方便按照Brokerld進(jìn)行索引。
  • partitionsBy TopicPartition:記錄了TopicPartition與PartitionInfo的映射關(guān)系。
  • partitionsByTopic:記錄了Topic名稱和Partitionlnfo的映射關(guān)系,可以按照Topic名稱查詢其中全部分區(qū)的詳細(xì)信息。
  • availablePartitionsByTopic:Topic與Partitionlnfo的映射關(guān)系,這里的List中存放的分區(qū)必須是有Leader副本的Partition,而partitionsByTopic中記錄的分區(qū)則不一定有Leader副本,因?yàn)槟承┲虚g狀態(tài),例如Leader副本宕機(jī)而觸發(fā)的選舉過程中,分區(qū)不一定有Leader副本。
  • partitionsByNode:記錄了Node與PartitionInfo的映射關(guān)系,可以按照節(jié)點(diǎn)Id查詢其上分布的全部分區(qū)的詳細(xì)信息。

Metadata中封裝了Cluster對(duì)象,并保存Cluster數(shù)據(jù)的最后更新時(shí)間、版本號(hào)(version)、是否需要更新等待信息,如圖所示。

Kafka-生產(chǎn)者,隊(duì)列,kafka,分布式

  • topics:記錄了當(dāng)前已知的所有topic,在cluster字段中記錄了Topic最新的元數(shù)據(jù)。
  • version:表示Kafka集群元數(shù)據(jù)的版本號(hào)。Kafka集群元數(shù)據(jù)每更新成功一次,version字段的值增1。通過新舊版本號(hào)的比較,判斷集群元數(shù)據(jù)是否更新完成。
  • metadataExpireMs:每隔多久,更新一次。默認(rèn)是300×1000,也就是5分種。
  • refreshBackoffMs:兩次發(fā)出更新Cluster保存的元數(shù)據(jù)信息的最小時(shí)間差,默認(rèn)為100ms。這是為了防止更新操作過于頻繁而造成網(wǎng)絡(luò)阻塞和增加服務(wù)端壓力。在Kafka中與重試操作有關(guān)的操作中,都有這種“退避(backoff)時(shí)間”設(shè)計(jì)的身影。
  • lastRefreshMs:記錄上一次更新元數(shù)據(jù)的時(shí)間戳(也包含更新失敗的情況)。
  • lastSuccessfulRefreshMs:上一次成功更新的時(shí)間戳。如果每次都成功,則lastSuccessfulRefreshMs、lastRefreshMs相等。 否則,lastRefreshMs>lastSuccessulRefreshMs。
  • cluster:記錄Kafka集群的元數(shù)據(jù)。
  • needUpdate:標(biāo)識(shí)是否強(qiáng)制更新Cluster,這是觸發(fā)Sender線程更新集群元數(shù)據(jù)的條件之一。
  • listeners:監(jiān)聽Metadata更新的監(jiān)聽器集合。自定義Metadata監(jiān)聽實(shí)現(xiàn)Metadata.Listener.onMetadataUpdate方法即可,在更新Metadata中的cluster字段之前,會(huì)通知listener集合中全部Listener對(duì)象。
  • needMetadataForAllTopics:是否需要更新全部Topic的元數(shù)據(jù),一般情況下,KafkaProducer只維護(hù)它用到的Topic的元數(shù)據(jù),是集群中全部Topic的子集。

Metadata的方法比較簡(jiǎn)單,主要是操縱上面的幾個(gè)字段,這里著重介紹主線程用到的requestUpdate方法和awaitUpdate方法。

requestUpdate()方法將needUpdate字段修改為true,這樣當(dāng)Sender線程運(yùn)行時(shí)會(huì)更新Metadata記錄的集群元數(shù)據(jù),然后返回version字段的值。

awaitUpdate方法主要是通過version版本號(hào)來判斷元數(shù)據(jù)是否更新完成,更新未完成則阻塞等待。

Kafka-生產(chǎn)者,隊(duì)列,kafka,分布式

下面回到KafkaProducer.waitOnMetadata方法的分析,它負(fù)責(zé)觸發(fā)Kafka集群元數(shù)據(jù)的更新,并阻塞主線程等待更新完畢。它的主要步驟是:

  1. 檢測(cè)Metadata中是否包含指定Topic的元數(shù)據(jù),若不包含,則將Topic添加到topics集合中,下次更新時(shí)會(huì)從服務(wù)端獲取指定Topic的元數(shù)據(jù)。
  2. 嘗試獲取Topic中分區(qū)的詳細(xì)信息,失敗后會(huì)調(diào)用requestUpdate)方法設(shè)置Metadata.needUpdate字段,并得到當(dāng)前元數(shù)據(jù)版本號(hào)。
  3. 喚醒Sender線程,由Sender線程更新Metadata中保存的Kafka集群元數(shù)據(jù)。
  4. 主線程調(diào)用awaitUpdate()方法,等待Sender線程完成更新。
  5. 從Metadata中獲取指定Topic分區(qū)的詳細(xì)信息(即PartitionInfo集合)。若失敗,則回到步驟2繼續(xù)嘗試,若等待時(shí)間超時(shí),則拋出異常。

waitOnMetadata()方法的具體實(shí)現(xiàn)如下:

Kafka-生產(chǎn)者,隊(duì)列,kafka,分布式

Serializer&Deserializer

客戶端發(fā)送的消息的key和value都是byte數(shù)組,Serializer和Deserializer接口提供了將Java對(duì)象序列化(反序列化)為byte數(shù)組的功能。在KafkaProducer中,根據(jù)配置文件,使用合適的Serializer。

圖展示了Serializer和Deserializer接口以及它們的實(shí)現(xiàn)類。

Kafka-生產(chǎn)者,隊(duì)列,kafka,分布式
Kafka已經(jīng)為我們提供了Java基本類型的Serializer實(shí)現(xiàn)和Deserializer實(shí)現(xiàn),我們也可以對(duì)Java復(fù)雜類型的自定義Serializer和Deserializer實(shí)現(xiàn),只要實(shí)現(xiàn)Serializer或Deserializer接口即可。

下面簡(jiǎn)單介紹Serializer,Deserializer是其逆操作。

在Serializer接口中,configure()方法是在執(zhí)行序列化操作之前的配置,例如,在StringSerializer.configure()方法中會(huì)選擇合適的編碼類型(encoding),默認(rèn)是UTF-8;IntegerSerializer.configure()方法則是空實(shí)現(xiàn)。

serializer方法是真正進(jìn)行序列化的地方,將傳入的Java對(duì)象序列化為byte[]。

close方法是在其后的關(guān)閉方法,多為空實(shí)現(xiàn)。

Partitioner

KafkaProducer.send()方法的下一步操作是選擇消息的分區(qū)。

在有的應(yīng)用場(chǎng)景中,由業(yè)務(wù)邏輯控制每個(gè)消息追加到合適的分區(qū)中,而有時(shí)候業(yè)務(wù)邏輯并不關(guān)心分區(qū)的選擇。

在KafkaProducer.partition方法中,優(yōu)先根據(jù)ProducerRecord中partition字段指定的序號(hào)選擇分區(qū),如果ProducerRecord.partition字段沒有明確指定分區(qū)編號(hào),則通過Partitioner.partition()方法選擇Partition。

Kafka-生產(chǎn)者,隊(duì)列,kafka,分布式

Kafka提供了Partitioner接口的一個(gè)默認(rèn)實(shí)現(xiàn)——DefaultPartitioner,繼承結(jié)構(gòu)如圖(左)所示,可以看到上面介紹的ProducerInterceptor接口也繼承了Configurable接口。

在創(chuàng)建KafkaProducer時(shí)傳人的key/value配置項(xiàng)會(huì)保存到AbstractConfig的originals字段中,如圖(右)所示。AbstractConfig的核心方法是getConfiguredInstance方法,其主要功能是通過反射機(jī)制實(shí)例化originals字段中指定的類。在前面分析KafkaProducer的構(gòu)造函數(shù)時(shí),也看到過此方法的調(diào)用。

DefaultPartitioner.partition方法負(fù)責(zé)在ProduceRecord中沒有明確指定分區(qū)編號(hào)的時(shí)候,為其選擇合適的分區(qū):如果消息沒有key,會(huì)根據(jù)counter與Partition個(gè)數(shù)取模來確定分區(qū)編號(hào),count不斷遞增,確保消息不會(huì)都發(fā)到同一個(gè)Partition里;如果消息有key的話,則對(duì)key進(jìn)行hash(使用的是murmur2這種高效率低碰撞的Hash算法),然后與分區(qū)數(shù)量取模,來確定key所在的分區(qū)達(dá)到負(fù)載均衡。文章來源地址http://www.zghlxwxcb.cn/news/detail-807113.html

到了這里,關(guān)于Kafka-生產(chǎn)者的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Kafka生產(chǎn)者原理 kafka生產(chǎn)者發(fā)送流程 kafka消息發(fā)送到集群步驟 kafka如何發(fā)送消息 kafka詳解

    Kafka生產(chǎn)者原理 kafka生產(chǎn)者發(fā)送流程 kafka消息發(fā)送到集群步驟 kafka如何發(fā)送消息 kafka詳解

    kafka尚硅谷視頻: 10_尚硅谷_Kafka_生產(chǎn)者_(dá)原理_嗶哩嗶哩_bilibili ? ???? 1. producer初始化:加載默認(rèn)配置,以及配置的參數(shù),開啟網(wǎng)絡(luò)線程 ???? 2. 攔截器攔截 ???? 3. 序列化器進(jìn)行消息key, value序列化 ???? 4. 進(jìn)行分區(qū) ???? 5. kafka broker集群 獲取metaData ???? 6. 消息緩存到

    2024年02月11日
    瀏覽(21)
  • 三、Kafka生產(chǎn)者1---Kafka生產(chǎn)者初始化-new KafkaProducer

    概述 本文主要是分享Kafka初始化生產(chǎn)者的 大體過程 初始化過程中會(huì)新建很多對(duì)象,本文暫先分享部分對(duì)象 1.分區(qū)器---Partitioner partitioner 2.重試時(shí)間---long retryBackoffMs 3.序列化器---SerializerK keySerializer,SerializerV valueSerializer 4.攔截器--- ListProducerInterceptorK, V interceptorList 5.累加器-

    2024年03月14日
    瀏覽(37)
  • Apache Kafka - 重識(shí)Kafka生產(chǎn)者

    Apache Kafka - 重識(shí)Kafka生產(chǎn)者

    Kafka 生產(chǎn)者是 Apache Kafka 中的一個(gè)重要組件,它負(fù)責(zé)將數(shù)據(jù)發(fā)送到 Kafka 集群中。在實(shí)時(shí)數(shù)據(jù)處理和流式處理應(yīng)用程序中,Kafka 生產(chǎn)者扮演著非常重要的角色。 這里我們將介紹 Kafka 生產(chǎn)者的概念、工作原理以及如何使用 Kafka 生產(chǎn)者。 Kafka 生產(chǎn)者是一種用于將數(shù)據(jù)發(fā)送到 Kafk

    2024年02月05日
    瀏覽(25)
  • [kafka消息生產(chǎn)被阻塞] - 如何解決Kafka生產(chǎn)者阻塞的問題

    [kafka消息生產(chǎn)被阻塞] - 如何解決Kafka生產(chǎn)者阻塞的問題 Kafka是一個(gè)高度可擴(kuò)展的分布式流平臺(tái),用于構(gòu)建實(shí)時(shí)數(shù)據(jù)管道和流處理應(yīng)用程序。作為一個(gè)廣泛使用的消息代理系統(tǒng),Kafka在數(shù)據(jù)傳輸方面表現(xiàn)出色,但是在極端情況下,它可能會(huì)出現(xiàn)生產(chǎn)者阻塞的問題。這可能會(huì)導(dǎo)致

    2024年02月11日
    瀏覽(22)
  • kafka入門(五):kafka生產(chǎn)者發(fā)送消息

    構(gòu)建消息,即創(chuàng)建 ProduceRecord 對(duì)象。 (1) kafka發(fā)送消息,最常見的構(gòu)造方法是: topic 表示主題, value 表示值。 (2) kafka發(fā)送消息指定key,ProducerRecord 的 key ,既可以作為消息的唯一id,也可以用來決定消息該被寫到主題的哪個(gè)分區(qū)。擁有相同key 的消息,將被寫到同一個(gè)分區(qū)。

    2024年01月17日
    瀏覽(41)
  • Kafka-生產(chǎn)者

    Kafka-生產(chǎn)者

    Kafka在實(shí)際應(yīng)用中,經(jīng)常被用作高性能、可擴(kuò)展的消息中間件。 Kafka自定義了一套網(wǎng)絡(luò)協(xié)議,只要遵守這套協(xié)議的格式,就可以向Kafka發(fā)送消息,也可以從Kafka中拉取消息。 在實(shí)踐生產(chǎn)過程中,一套API封裝良好、靈活易用的客戶端可以避免開發(fā)人員重復(fù)勞動(dòng),提高開發(fā)效率,也

    2024年01月20日
    瀏覽(24)
  • (三)Kafka 生產(chǎn)者

    (三)Kafka 生產(chǎn)者

    創(chuàng)建一個(gè) ProducerRecord 對(duì)象,需要包含目標(biāo)主題和要發(fā)送的內(nèi)容,還可以指定鍵、分區(qū)、時(shí)間戳或標(biāo)頭。 在發(fā)送 ProducerRecord 對(duì)象時(shí),生產(chǎn)者需要先把鍵和值對(duì)象序列化成字節(jié)數(shù)組,這樣才能在網(wǎng)絡(luò)上傳輸。 如果沒有顯式地指定分區(qū),那么數(shù)據(jù)將被傳給分區(qū)器。分區(qū)器通常會(huì)基

    2024年02月09日
    瀏覽(21)
  • 三、Kafka生產(chǎn)者

    三、Kafka生產(chǎn)者

    1 發(fā)送原理 在消息發(fā)送的過程中,涉及到了兩個(gè)線程——main 線程和 Sender 線程。在 main 線程中創(chuàng)建了一個(gè)雙端隊(duì)列 RecordAccumulator。main 線程將消息發(fā)送給 RecordAccumulator,Sender 線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka Broker 【RecordAccumulator緩沖的結(jié)構(gòu): 每一個(gè)分區(qū)對(duì)應(yīng)一

    2024年02月12日
    瀏覽(21)
  • Kafka(生產(chǎn)者)

    Kafka(生產(chǎn)者)

    目 前 企 業(yè) 中 比 較 常 見 的 消 息 隊(duì) 列 產(chǎn) 品 主 要 有 Kafka(在大數(shù)據(jù)場(chǎng)景主要采用 Kafka 作為消息隊(duì)列。) ActiveMQ RabbitMQ RocketMQ 1.1.1 傳統(tǒng)消息隊(duì)列的應(yīng)用場(chǎng)景 傳統(tǒng)的消息隊(duì)列的主要應(yīng)用場(chǎng)景包括: 緩存/消峰 、 解耦 和 異步通信 。 緩沖/消峰: 有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過

    2024年02月11日
    瀏覽(27)
  • 「Kafka」生產(chǎn)者篇

    「Kafka」生產(chǎn)者篇

    在消息發(fā)送的過程中,涉及到了 兩個(gè)線程 —— main 線程 和 Sender 線程 。 在 main 線程中創(chuàng)建了 一個(gè) 雙端隊(duì)列 RecordAccumulator 。 main線程將消息發(fā)送給RecordAccumulator,Sender線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 Kafka Broker。 main線程創(chuàng)建 Producer 對(duì)象,調(diào)用 send 函數(shù)發(fā)送消息,

    2024年01月19日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包