国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

多圖詳解 kafka 生產(chǎn)者消息發(fā)送過程

這篇具有很好參考價(jià)值的文章主要介紹了多圖詳解 kafka 生產(chǎn)者消息發(fā)送過程。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

多圖詳解 kafka 生產(chǎn)者消息發(fā)送過程

生產(chǎn)者客戶端代碼

public?class?SzzTestSend?{
  
????public?static?final?String?bootStrap?=?"xxxxxx:9090";????public?static?final?String?topic?=?"t_3_1";
????public?static?void?main(String[]?args)?{
  ????????Properties?properties?=?new?Properties();????????properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootStrap);????????//?序列化協(xié)議??下面兩種寫法都可以????????properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,?StringSerializer.class.getName());????????properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");????????//過濾器?可配置多個(gè)用逗號(hào)隔開????????properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"org.apache.kafka.clients.producer.SzzProducerInterceptorsTest");????????//構(gòu)造?KafkaProducer????????KafkaProducer?producer?=?new?KafkaProducer(properties);????????//??發(fā)送消息,?并設(shè)置?回調(diào)(回調(diào)函數(shù)也可以不要)????????ProducerRecord<String,String>?record?=?new?ProducerRecord(topic,"Hello?World!");????????try?{
  ????????????producer.send(record,new?SzzTestCallBack(record.topic(),?record.key(),?record.value()));????????}catch?(Exception?e){
  ????????????e.printStackTrace();????????}????}
????/**?????*?發(fā)送成功回調(diào)類?????*/????public?static?class?SzzTestCallBack?implements?Callback{
  ????????private?static?final?Logger?log?=?LoggerFactory.getLogger(SzzTestCallBack.class);????????private?String?topic;????????private?String?key;????????private?String?value;
????????public?SzzTestCallBack(String?topic,?String?key,?String?value)?{
  ????????????this.topic?=?topic;????????????this.key?=?key;????????????this.value?=?value;
????????}????????public?void?onCompletion(RecordMetadata?metadata,?Exception?e)?{
  ????????????if?(e?!=?null)?{
  ????????????????log.error("Error?when?sending?message?to?topic?{}?with?key:?{},?value:?{}?with?error:",????????????????????????topic,?key,value,?e);????????????}else?{
  ????????????????log.info("send?message?to?topic?{}?with?key:?{}?value:{}?success,?partiton:{}?offset:{}",????????????????????????topic,?key,value,metadata.partition(),metadata.offset());????????????}????????}????}}

1 構(gòu)造 KafkaProducer

KafkaProducer 通過解析producer.propeties文件里面的屬性來(lái)構(gòu)造自己。例如 :分區(qū)器、Key 和 Value 序列化器、攔截器、RecordAccumulator消息累加器 、元信息更新器、啟動(dòng)發(fā)送請(qǐng)求的后臺(tái)線程

????????//構(gòu)造?KafkaProducer????????KafkaProducer?producer?=?new?KafkaProducer(properties);

生產(chǎn)者元信息更新器

我們之前有講過. 客戶端都會(huì)保存集群的元信息,例如生產(chǎn)者的元信息是 ProducerMetadata. 消費(fèi)組的是 ConsumerMetadata 。

?

相關(guān)的 Producer 配置有:

雖然 Producer 元信息會(huì)自動(dòng)更新, 但是有可能在生產(chǎn)者發(fā)送消息的時(shí)候,發(fā)現(xiàn)某個(gè) TopicPartition 不存在,這個(gè)時(shí)候可能就需要立刻發(fā)起一個(gè)元信息更新了。

集群資源變更監(jiān)聽器

org.apache.kafka.common.ClusterResourceListener

在構(gòu)造 KafkaConsumer 的時(shí)候, 還會(huì)構(gòu)造一個(gè) 集群資源變更監(jiān)聽器 ClusterResourceListener

當(dāng)用戶希望收到有關(guān)集群元數(shù)據(jù)更改的通知時(shí),可以實(shí)現(xiàn)回調(diào)接口。

需要在攔截器指標(biāo)采樣器、序列化器反序列化器 中訪問集群元數(shù)據(jù)的用戶可以實(shí)現(xiàn)此接口。

public?interface?ClusterResourceListener?{
  ????/**?????*?用戶可以實(shí)現(xiàn)以獲取?ClusterResource?更新的回調(diào)方法。?????*?@param?clusterResource?cluster?metadata?????*/????void?onUpdate(ClusterResource?clusterResource);}

下面描述了每種類型的方法調(diào)用順序。

Clients

在每個(gè)元數(shù)據(jù)響應(yīng)之后都會(huì)調(diào)用一次 onUpdate(ClusterResource)

當(dāng)在org.apache.kafka.clients.producer.ProducerInterceptor實(shí)現(xiàn)的 ClusterResourceListener 的時(shí)候

調(diào)用順序?yàn)?/strong>: ProducerInterceptor.onSend() -> onUpdate(ClusterResource) -> ProducerInterceptor.onAcknowledgement()

當(dāng)在org.apache.kafka.clients.consumer.ConsumerInterceptor實(shí)現(xiàn)的 ClusterResourceListener 的時(shí)候

調(diào)用順序?yàn)?/strong>:onUpdate() - > ConsumerInterce文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-407479.html

到了這里,關(guān)于多圖詳解 kafka 生產(chǎn)者消息發(fā)送過程的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    不管是把Kafka作為消息隊(duì)列、消息總線還是數(shù)據(jù)存儲(chǔ)平臺(tái),總是需要一個(gè)可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個(gè)可以從Kafka讀取數(shù)據(jù)的消費(fèi)者,或者一個(gè)兼具兩種角色的應(yīng)用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應(yīng)用程序,它們負(fù)責(zé)將消息發(fā)送到 Kafka 集群中的一個(gè)或多

    2024年02月13日
    瀏覽(28)
  • Kafka 入門到起飛系列 - 生產(chǎn)者發(fā)送消息流程解析

    Kafka 入門到起飛系列 - 生產(chǎn)者發(fā)送消息流程解析

    生產(chǎn)者通過 producerRecord 對(duì)象封裝消息主題、消息的value(內(nèi)容)、timestamp(時(shí)間戳)等 生產(chǎn)者通過 send() 方法發(fā)送消息,send()方法會(huì)經(jīng)過如下幾步 1. 首先將消息交給 攔截器(Interceptor) 處理, 攔截器對(duì)生產(chǎn)者而言,對(duì)所有消息都是生效的,攔截器也支持鏈?zhǔn)骄幊蹋ㄘ?zé)任器鏈)的

    2024年02月16日
    瀏覽(24)
  • kafka服務(wù)端允許生產(chǎn)者發(fā)送最大消息體大小

    ????????server.properties中加上的message.max.bytes配置,我目前設(shè)置為5242880,即5MB,可以根據(jù)實(shí)際情況增大。 ????????在生產(chǎn)者端配置max.request.size,這是單個(gè)消息最大字節(jié)數(shù),根據(jù)實(shí)際調(diào)整,max.request.size 必須小于 message.max.bytes 以及消費(fèi)者的 max.partition.fetch.bytes。這樣消息

    2024年02月15日
    瀏覽(24)
  • 07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費(fèi)者(演示 監(jiān)聽消息)

    07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費(fèi)者(演示 監(jiān)聽消息)

    簡(jiǎn)單來(lái)說(shuō),就是一個(gè)數(shù)據(jù)項(xiàng)。 ▲ 消息就是 Kafka 所記錄的數(shù)據(jù)節(jié)點(diǎn),消息在 Kafka 中又被稱為記錄(record)或事件(event)。 從存儲(chǔ)上來(lái)看,消息就是存儲(chǔ)在分區(qū)文件(有點(diǎn)類似于List)中的一個(gè)數(shù)據(jù)項(xiàng),消息具有 key、value、時(shí)間戳 和 可選的元數(shù)據(jù)頭。 ▲ 下面是一個(gè)示例事件

    2024年01月20日
    瀏覽(45)
  • 【注意】Kafka生產(chǎn)者異步發(fā)送消息仍有可能阻塞

    Kafka是常用的消息中間件。在Spring Boot項(xiàng)目中,使用KafkaTemplate作為生產(chǎn)者發(fā)送消息。有時(shí),為了不影響主業(yè)務(wù)流程,會(huì)采用 異步 發(fā)送的方式,如下所示。 本以為采用異步發(fā)送,必然不會(huì)影響到主業(yè)務(wù)流程。但實(shí)際使用時(shí)發(fā)現(xiàn),在第一次發(fā)送消息時(shí),如果Kafka Broker連接失敗,

    2023年04月13日
    瀏覽(25)
  • Kafka中的生產(chǎn)者如何處理消息發(fā)送失敗的情況?

    在Kafka中,生產(chǎn)者可以通過以下方式處理消息發(fā)送失敗的情況: 同步發(fā)送模式(Sync Mode):在同步發(fā)送模式下,生產(chǎn)者發(fā)送消息后會(huì)阻塞等待服務(wù)器的響應(yīng)。如果發(fā)送失敗,生產(chǎn)者會(huì)拋出異常(例如 ProducerRecord 發(fā)送異常)或返回錯(cuò)誤信息。開發(fā)者可以捕獲異常并根據(jù)需要進(jìn)行

    2024年02月06日
    瀏覽(23)
  • kafka生產(chǎn)者發(fā)送消息報(bào)錯(cuò) Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

    kafka生產(chǎn)者發(fā)送消息報(bào)錯(cuò) Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

    報(bào)這個(gè)錯(cuò)誤是因?yàn)閗afka里的配置要修改下 在config目錄下 server.properties配置文件 這下發(fā)送消息就不會(huì)一直等待,就可以發(fā)送成功了

    2024年02月06日
    瀏覽(23)
  • RabbitMq生產(chǎn)者發(fā)送消息確認(rèn)

    RabbitMq生產(chǎn)者發(fā)送消息確認(rèn)

    一般情況下RabbitMq的生產(chǎn)者能夠正常的把消息投遞到交換機(jī)Exchange,Exchange能夠根據(jù)路由鍵routingKey把消息投遞到隊(duì)列Queue,但是一旦出現(xiàn)消息無(wú)法投遞到交換機(jī)Exchange,或無(wú)法路由到Queue的這種特殊情況下,則需要對(duì)生產(chǎn)者的消息進(jìn)行緩存或者保存到數(shù)據(jù)庫(kù),后續(xù)在調(diào)查完RabbitM

    2024年02月04日
    瀏覽(25)
  • kafka入門,生產(chǎn)者異步發(fā)送、回調(diào)函數(shù),同步發(fā)送(四)

    引入依賴 回調(diào)函數(shù)會(huì)在producer收到ack時(shí)調(diào)用,該方法有兩個(gè)參數(shù),分別是元數(shù)據(jù)信息(RecordMetadata)和異常信息(Exception),如果Exception為null,說(shuō)明信息發(fā)送失敗 注意:消息發(fā)送失敗會(huì)自動(dòng)重試,不需要我們?cè)诨卣{(diào)函數(shù)中手動(dòng)重試。 只需在異步發(fā)送的基礎(chǔ)上,再調(diào)用一下 get(

    2024年02月11日
    瀏覽(28)
  • Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)

    Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)

    1.1 生產(chǎn)者消息發(fā)送流程 1.1.1 發(fā)送原理 在消息發(fā)生的過程中,設(shè)計(jì)到了兩個(gè)線程——main線程和Sender線程。在main線程中創(chuàng)建了一個(gè)雙端隊(duì)列RecordAccumulator。main線程將消息發(fā)給RecordAccumulator,Sender線程不斷從RecordAccumulator中拉取消息發(fā)送到Kafka Broker。 batch.size:只有數(shù)據(jù)積累到bat

    2024年02月09日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包