国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka入門(五):kafka生產(chǎn)者發(fā)送消息

這篇具有很好參考價(jià)值的文章主要介紹了kafka入門(五):kafka生產(chǎn)者發(fā)送消息。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

構(gòu)建消息

構(gòu)建消息,即創(chuàng)建 ProduceRecord 對(duì)象。

(1) kafka發(fā)送消息,最常見(jiàn)的構(gòu)造方法是:

public ProducerRecord(String topic,V value)

topic 表示主題, value 表示值。

(2) kafka發(fā)送消息指定key,ProducerRecord 的 key ,既可以作為消息的唯一id,也可以用來(lái)決定消息該被寫到主題的哪個(gè)分區(qū)。擁有相同key 的消息,將被寫到同一個(gè)分區(qū)。

public ProducerRecord(String topic,K key,V value)

(3) kafka發(fā)送消息指定分區(qū),如下:

public ProducerRecord(String topic,Integer partition,K key,V value)

發(fā)送消息的模式

創(chuàng)建生產(chǎn)者實(shí)例和構(gòu)建消息之后,就可以開始發(fā)送消息了。

發(fā)送消息主要有三種模式:發(fā)后即忘、同步、異步。

發(fā)后即忘:

就是直接調(diào)用 生產(chǎn)者的 send方法發(fā)送。

發(fā)后即完,只管往 kafka中發(fā)送消息,而不關(guān)心消息是否正確到達(dá)。

這種發(fā)送方式的性能最高,可靠性也最差。

producer.send(record);

具體代碼如下:

public class KafkaDemoProducer {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "myTopic1";

    public static void main(String[] args) {
        //屬性配置
        Properties properties = getProperties(BROKER_LIST);
        //生產(chǎn)者初始化
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "hello kafka");
        //發(fā)送消息
        try {
            producer.send(record);
            System.out.println("========>producer.send(record).");
        } catch (Exception e) {
            System.out.println("send error." + e);
        }
        producer.close();
    }


    private static Properties getProperties(String brokerList) {
        Properties properties = new Properties();
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);
        return properties;
    }

}

同步發(fā)送:

try {
	producer.send(record).get();
} catch (ExecutionException | InterruptedException e) {
	log.error("send record get error", e);
}

同步發(fā)送的方式可靠性最高,要么消息發(fā)送成功,要么發(fā)生異常。如果發(fā)生異常,會(huì)catch并處理異常。

同步發(fā)送的性能會(huì)差一些,需要阻塞等待一條消息發(fā)送完,才能發(fā)送下一條。

異步發(fā)送:

異步發(fā)送,就是在 send 方法里指定一下 Callback 的回調(diào)函數(shù)。

消息發(fā)送成功后,會(huì)收到成功的回調(diào)。參數(shù) metadata ,為發(fā)送成功的消息,相關(guān)的信息

如果發(fā)送失敗,也會(huì)收到回調(diào),包含失敗的異常信息 exception。

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
    	if (exception != null) {
    		log.error("send onCompletion error." , exception);
   		} else {
  			log.info(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
  	     }
    }
});

kafka入門文章

https://blog.csdn.net/sinat_32502451/category_12465196.html

kafka入門文章

https://blog.csdn.net/sinat_32502451/category_12465196.html

參考資料:

《深入理解Kafka 核心設(shè)計(jì)與實(shí)踐原理》文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-798348.html

到了這里,關(guān)于kafka入門(五):kafka生產(chǎn)者發(fā)送消息的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    01. Kafka 分區(qū)的作用 分區(qū)的作用就是提供負(fù)載均衡的能力,或者說(shuō)對(duì)數(shù)據(jù)進(jìn)行分區(qū)的主要原因,就是為了實(shí)現(xiàn)系統(tǒng)的高伸縮性。不同的分區(qū)能夠被放置到不同節(jié)點(diǎn)的機(jī)器上,而數(shù)據(jù)的讀寫操作也都是針對(duì)分區(qū)這個(gè)粒度而進(jìn)行的,這樣每個(gè)節(jié)點(diǎn)的機(jī)器都能獨(dú)立地執(zhí)行各自分區(qū)的

    2024年02月13日
    瀏覽(32)
  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    不管是把Kafka作為消息隊(duì)列、消息總線還是數(shù)據(jù)存儲(chǔ)平臺(tái),總是需要一個(gè)可以往Kafka寫入數(shù)據(jù)的生產(chǎn)者、一個(gè)可以從Kafka讀取數(shù)據(jù)的消費(fèi)者,或者一個(gè)兼具兩種角色的應(yīng)用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應(yīng)用程序,它們負(fù)責(zé)將消息發(fā)送到 Kafka 集群中的一個(gè)或多

    2024年02月13日
    瀏覽(28)
  • kafka服務(wù)端允許生產(chǎn)者發(fā)送最大消息體大小

    ????????server.properties中加上的message.max.bytes配置,我目前設(shè)置為5242880,即5MB,可以根據(jù)實(shí)際情況增大。 ????????在生產(chǎn)者端配置max.request.size,這是單個(gè)消息最大字節(jié)數(shù),根據(jù)實(shí)際調(diào)整,max.request.size 必須小于 message.max.bytes 以及消費(fèi)者的 max.partition.fetch.bytes。這樣消息

    2024年02月15日
    瀏覽(24)
  • 07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費(fèi)者(演示 監(jiān)聽消息)

    07、Kafka ------ 消息生產(chǎn)者(演示 發(fā)送消息) 和 消息消費(fèi)者(演示 監(jiān)聽消息)

    簡(jiǎn)單來(lái)說(shuō),就是一個(gè)數(shù)據(jù)項(xiàng)。 ▲ 消息就是 Kafka 所記錄的數(shù)據(jù)節(jié)點(diǎn),消息在 Kafka 中又被稱為記錄(record)或事件(event)。 從存儲(chǔ)上來(lái)看,消息就是存儲(chǔ)在分區(qū)文件(有點(diǎn)類似于List)中的一個(gè)數(shù)據(jù)項(xiàng),消息具有 key、value、時(shí)間戳 和 可選的元數(shù)據(jù)頭。 ▲ 下面是一個(gè)示例事件

    2024年01月20日
    瀏覽(45)
  • 【注意】Kafka生產(chǎn)者異步發(fā)送消息仍有可能阻塞

    Kafka是常用的消息中間件。在Spring Boot項(xiàng)目中,使用KafkaTemplate作為生產(chǎn)者發(fā)送消息。有時(shí),為了不影響主業(yè)務(wù)流程,會(huì)采用 異步 發(fā)送的方式,如下所示。 本以為采用異步發(fā)送,必然不會(huì)影響到主業(yè)務(wù)流程。但實(shí)際使用時(shí)發(fā)現(xiàn),在第一次發(fā)送消息時(shí),如果Kafka Broker連接失敗,

    2023年04月13日
    瀏覽(25)
  • Kafka中的生產(chǎn)者如何處理消息發(fā)送失敗的情況?

    在Kafka中,生產(chǎn)者可以通過(guò)以下方式處理消息發(fā)送失敗的情況: 同步發(fā)送模式(Sync Mode):在同步發(fā)送模式下,生產(chǎn)者發(fā)送消息后會(huì)阻塞等待服務(wù)器的響應(yīng)。如果發(fā)送失敗,生產(chǎn)者會(huì)拋出異常(例如 ProducerRecord 發(fā)送異常)或返回錯(cuò)誤信息。開發(fā)者可以捕獲異常并根據(jù)需要進(jìn)行

    2024年02月06日
    瀏覽(23)
  • kafka入門,生產(chǎn)者異步發(fā)送、回調(diào)函數(shù),同步發(fā)送(四)

    引入依賴 回調(diào)函數(shù)會(huì)在producer收到ack時(shí)調(diào)用,該方法有兩個(gè)參數(shù),分別是元數(shù)據(jù)信息(RecordMetadata)和異常信息(Exception),如果Exception為null,說(shuō)明信息發(fā)送失敗 注意:消息發(fā)送失敗會(huì)自動(dòng)重試,不需要我們?cè)诨卣{(diào)函數(shù)中手動(dòng)重試。 只需在異步發(fā)送的基礎(chǔ)上,再調(diào)用一下 get(

    2024年02月11日
    瀏覽(28)
  • kafka生產(chǎn)者發(fā)送消息報(bào)錯(cuò) Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

    kafka生產(chǎn)者發(fā)送消息報(bào)錯(cuò) Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

    報(bào)這個(gè)錯(cuò)誤是因?yàn)閗afka里的配置要修改下 在config目錄下 server.properties配置文件 這下發(fā)送消息就不會(huì)一直等待,就可以發(fā)送成功了

    2024年02月06日
    瀏覽(23)
  • RabbitMq生產(chǎn)者發(fā)送消息確認(rèn)

    RabbitMq生產(chǎn)者發(fā)送消息確認(rèn)

    一般情況下RabbitMq的生產(chǎn)者能夠正常的把消息投遞到交換機(jī)Exchange,Exchange能夠根據(jù)路由鍵routingKey把消息投遞到隊(duì)列Queue,但是一旦出現(xiàn)消息無(wú)法投遞到交換機(jī)Exchange,或無(wú)法路由到Queue的這種特殊情況下,則需要對(duì)生產(chǎn)者的消息進(jìn)行緩存或者保存到數(shù)據(jù)庫(kù),后續(xù)在調(diào)查完RabbitM

    2024年02月04日
    瀏覽(25)
  • Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)

    Kafka學(xué)習(xí)---2、kafka生產(chǎn)者、異步和同步發(fā)送API、分區(qū)、生產(chǎn)經(jīng)驗(yàn)

    1.1 生產(chǎn)者消息發(fā)送流程 1.1.1 發(fā)送原理 在消息發(fā)生的過(guò)程中,設(shè)計(jì)到了兩個(gè)線程——main線程和Sender線程。在main線程中創(chuàng)建了一個(gè)雙端隊(duì)列RecordAccumulator。main線程將消息發(fā)給RecordAccumulator,Sender線程不斷從RecordAccumulator中拉取消息發(fā)送到Kafka Broker。 batch.size:只有數(shù)據(jù)積累到bat

    2024年02月09日
    瀏覽(22)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包