国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka入門,生產(chǎn)者異步發(fā)送、回調(diào)函數(shù),同步發(fā)送(四)

這篇具有很好參考價值的文章主要介紹了kafka入門,生產(chǎn)者異步發(fā)送、回調(diào)函數(shù),同步發(fā)送(四)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、創(chuàng)建Maven項目

引入依賴

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>

二、異步發(fā)送

 public static void main(String[] args) throws InterruptedException, ExecutionException {
        //1、創(chuàng)建kafka生產(chǎn)者的配置對象
        Properties properties=new Properties();
        //2、給kafka配置對象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //3、創(chuàng)建kafka生產(chǎn)者對象
        KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);
        //4.調(diào)用send發(fā)送消息
        for (int i = 0; i < 100; i++) {
            //異步發(fā)送 不帶回調(diào)函數(shù)
            kafkaProducer.send(new ProducerRecord<>("first", "kafka " + i));
        }
        //關閉資源
        kafkaProducer.close();
    }
    

三、回調(diào)函數(shù)

回調(diào)函數(shù)會在producer收到ack時調(diào)用,該方法有兩個參數(shù),分別是元數(shù)據(jù)信息(RecordMetadata)和異常信息(Exception),如果Exception為null,說明信息發(fā)送失敗
注意:消息發(fā)送失敗會自動重試,不需要我們在回調(diào)函數(shù)中手動重試。

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //1、創(chuàng)建kafka生產(chǎn)者的配置對象
        Properties properties=new Properties();
        //2、給kafka配置對象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //3、創(chuàng)建kafka生產(chǎn)者對象
        KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);
        //4.調(diào)用send發(fā)送消息
        for (int i = 0; i < 100; i++) {
           
            //異步發(fā)送帶回調(diào)函數(shù)
            kafkaProducer.send(new ProducerRecord<>("first", "kafka " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if(e==null){
                        //沒用一次輸出,輸出信息到控制臺
                        System.out.println(String.format("主題:%s,分區(qū):%s",metadata.topic(),metadata.partition()));
                    }else {
                        e.printStackTrace();
                    }
                }
            });
            //延遲一會,數(shù)據(jù)會發(fā)送到不同分區(qū),發(fā)送太快則可能會到同個分區(qū) 16K 0ms
            Thread.sleep(2);
        }
        //關閉資源
        kafkaProducer.close();
    }
}

四、同步發(fā)送

只需在異步發(fā)送的基礎上,再調(diào)用一下 get()方法即可文章來源地址http://www.zghlxwxcb.cn/news/detail-506942.html

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //1、創(chuàng)建kafka生產(chǎn)者的配置對象
        Properties properties=new Properties();
        //2、給kafka配置對象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //3、創(chuàng)建kafka生產(chǎn)者對象
        KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);
        //4.調(diào)用send發(fā)送消息
        for (int i = 0; i < 100; i++) {
            //同步發(fā)送
            kafkaProducer.send(new ProducerRecord<>("first", "kafka " + i)).get();
        }
        //關閉資源
        kafkaProducer.close();
    }

到了這里,關于kafka入門,生產(chǎn)者異步發(fā)送、回調(diào)函數(shù),同步發(fā)送(四)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Kafka 入門到起飛系列 - 生產(chǎn)者發(fā)送消息流程解析

    Kafka 入門到起飛系列 - 生產(chǎn)者發(fā)送消息流程解析

    生產(chǎn)者通過 producerRecord 對象封裝消息主題、消息的value(內(nèi)容)、timestamp(時間戳)等 生產(chǎn)者通過 send() 方法發(fā)送消息,send()方法會經(jīng)過如下幾步 1. 首先將消息交給 攔截器(Interceptor) 處理, 攔截器對生產(chǎn)者而言,對所有消息都是生效的,攔截器也支持鏈式編程(責任器鏈)的

    2024年02月16日
    瀏覽(24)
  • Kafka生產(chǎn)者原理 kafka生產(chǎn)者發(fā)送流程 kafka消息發(fā)送到集群步驟 kafka如何發(fā)送消息 kafka詳解

    Kafka生產(chǎn)者原理 kafka生產(chǎn)者發(fā)送流程 kafka消息發(fā)送到集群步驟 kafka如何發(fā)送消息 kafka詳解

    kafka尚硅谷視頻: 10_尚硅谷_Kafka_生產(chǎn)者_原理_嗶哩嗶哩_bilibili ? ???? 1. producer初始化:加載默認配置,以及配置的參數(shù),開啟網(wǎng)絡線程 ???? 2. 攔截器攔截 ???? 3. 序列化器進行消息key, value序列化 ???? 4. 進行分區(qū) ???? 5. kafka broker集群 獲取metaData ???? 6. 消息緩存到

    2024年02月11日
    瀏覽(21)
  • 多圖詳解 kafka 生產(chǎn)者消息發(fā)送過程

    多圖詳解 kafka 生產(chǎn)者消息發(fā)送過程

    生產(chǎn)者客戶端代碼 KafkaProducer 通過解析 producer.propeties 文件里面的屬性來構造自己。例如 :分區(qū)器、Key 和 Value 序列化器、攔截器、 RecordAccumulator消息累加器 、 元信息更新器 、啟動發(fā)送請求的后臺線程 生產(chǎn)者元信息更新器 我們之前有講過. 客戶端都會保存集群的元信息,例如

    2023年04月09日
    瀏覽(30)
  • kafka服務端允許生產(chǎn)者發(fā)送最大消息體大小

    ????????server.properties中加上的message.max.bytes配置,我目前設置為5242880,即5MB,可以根據(jù)實際情況增大。 ????????在生產(chǎn)者端配置max.request.size,這是單個消息最大字節(jié)數(shù),根據(jù)實際調(diào)整,max.request.size 必須小于 message.max.bytes 以及消費者的 max.partition.fetch.bytes。這樣消息

    2024年02月15日
    瀏覽(24)
  • 分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    不管是把Kafka作為消息隊列、消息總線還是數(shù)據(jù)存儲平臺,總是需要一個可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個可以從Kafka讀取數(shù)據(jù)的消費者,或者一個兼具兩種角色的應用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應用程序,它們負責將消息發(fā)送到 Kafka 集群中的一個或多

    2024年02月13日
    瀏覽(29)
  • 分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    01. Kafka 分區(qū)的作用 分區(qū)的作用就是提供負載均衡的能力,或者說對數(shù)據(jù)進行分區(qū)的主要原因,就是為了實現(xiàn)系統(tǒng)的高伸縮性。不同的分區(qū)能夠被放置到不同節(jié)點的機器上,而數(shù)據(jù)的讀寫操作也都是針對分區(qū)這個粒度而進行的,這樣每個節(jié)點的機器都能獨立地執(zhí)行各自分區(qū)的

    2024年02月13日
    瀏覽(32)
  • 分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    分布式 - 消息隊列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    不管是把Kafka作為消息隊列、消息總線還是數(shù)據(jù)存儲平臺,總是需要一個可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個可以從Kafka讀取數(shù)據(jù)的消費者,或者一個兼具兩種角色的應用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應用程序,它們負責將消息發(fā)送到 Kafka 集群中的一個或多

    2024年02月13日
    瀏覽(28)
  • Kafka中的生產(chǎn)者如何處理消息發(fā)送失敗的情況?

    在Kafka中,生產(chǎn)者可以通過以下方式處理消息發(fā)送失敗的情況: 同步發(fā)送模式(Sync Mode):在同步發(fā)送模式下,生產(chǎn)者發(fā)送消息后會阻塞等待服務器的響應。如果發(fā)送失敗,生產(chǎn)者會拋出異常(例如 ProducerRecord 發(fā)送異常)或返回錯誤信息。開發(fā)者可以捕獲異常并根據(jù)需要進行

    2024年02月06日
    瀏覽(23)
  • 07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費者(演示 監(jiān)聽消息)

    07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費者(演示 監(jiān)聽消息)

    簡單來說,就是一個數(shù)據(jù)項。 ▲ 消息就是 Kafka 所記錄的數(shù)據(jù)節(jié)點,消息在 Kafka 中又被稱為記錄(record)或事件(event)。 從存儲上來看,消息就是存儲在分區(qū)文件(有點類似于List)中的一個數(shù)據(jù)項,消息具有 key、value、時間戳 和 可選的元數(shù)據(jù)頭。 ▲ 下面是一個示例事件

    2024年01月20日
    瀏覽(45)
  • 大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者)

    大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者)

    Kafka是一個分布式的基于發(fā)布/訂閱模式的消息隊列,主要應用于大數(shù)據(jù)實時處理領域。 發(fā)布/訂閱:消息的發(fā)布者不會將消息直接發(fā)送給特定的訂閱者,而是將發(fā)布的消息分為不同的類別,訂閱者只接收感興趣的消息。 目前企業(yè)中比較常見的消息隊列產(chǎn)品主要有Kafka、ActiveM

    2024年01月19日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包