構(gòu)建消息
構(gòu)建消息,即創(chuàng)建 ProduceRecord 對(duì)象。
(1) kafka發(fā)送消息,最常見(jiàn)的構(gòu)造方法是:
public ProducerRecord(String topic,V value)
topic 表示主題, value 表示值。
(2) kafka發(fā)送消息指定key,ProducerRecord 的 key ,既可以作為消息的唯一id,也可以用來(lái)決定消息該被寫到主題的哪個(gè)分區(qū)。擁有相同key 的消息,將被寫到同一個(gè)分區(qū)。
public ProducerRecord(String topic,K key,V value)
(3) kafka發(fā)送消息指定分區(qū),如下:
public ProducerRecord(String topic,Integer partition,K key,V value)
發(fā)送消息的模式
創(chuàng)建生產(chǎn)者實(shí)例和構(gòu)建消息之后,就可以開始發(fā)送消息了。
發(fā)送消息主要有三種模式:發(fā)后即忘、同步、異步。
發(fā)后即忘:
就是直接調(diào)用 生產(chǎn)者的 send方法發(fā)送。
發(fā)后即完,只管往 kafka中發(fā)送消息,而不關(guān)心消息是否正確到達(dá)。
這種發(fā)送方式的性能最高,可靠性也最差。
producer.send(record);
具體代碼如下:
public class KafkaDemoProducer {
public static final String BROKER_LIST = "localhost:9092";
public static final String TOPIC = "myTopic1";
public static void main(String[] args) {
//屬性配置
Properties properties = getProperties(BROKER_LIST);
//生產(chǎn)者初始化
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "hello kafka");
//發(fā)送消息
try {
producer.send(record);
System.out.println("========>producer.send(record).");
} catch (Exception e) {
System.out.println("send error." + e);
}
producer.close();
}
private static Properties getProperties(String brokerList) {
Properties properties = new Properties();
properties.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers", brokerList);
return properties;
}
}
同步發(fā)送:
try {
producer.send(record).get();
} catch (ExecutionException | InterruptedException e) {
log.error("send record get error", e);
}
同步發(fā)送的方式可靠性最高,要么消息發(fā)送成功,要么發(fā)生異常。如果發(fā)生異常,會(huì)catch并處理異常。
同步發(fā)送的性能會(huì)差一些,需要阻塞等待一條消息發(fā)送完,才能發(fā)送下一條。
異步發(fā)送:
異步發(fā)送,就是在 send 方法里指定一下 Callback 的回調(diào)函數(shù)。
消息發(fā)送成功后,會(huì)收到成功的回調(diào)。參數(shù) metadata ,為發(fā)送成功的消息,相關(guān)的信息
如果發(fā)送失敗,也會(huì)收到回調(diào),包含失敗的異常信息 exception。
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
log.error("send onCompletion error." , exception);
} else {
log.info(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
}
}
});
kafka入門文章
https://blog.csdn.net/sinat_32502451/category_12465196.html文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-798348.html
kafka入門文章
https://blog.csdn.net/sinat_32502451/category_12465196.html
參考資料:
《深入理解Kafka 核心設(shè)計(jì)與實(shí)踐原理》文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-798348.html
到了這里,關(guān)于kafka入門(五):kafka生產(chǎn)者發(fā)送消息的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!