一、創(chuàng)建Maven項目
引入依賴
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
二、異步發(fā)送
public static void main(String[] args) throws InterruptedException, ExecutionException {
//1、創(chuàng)建kafka生產(chǎn)者的配置對象
Properties properties=new Properties();
//2、給kafka配置對象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
//key,value序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//3、創(chuàng)建kafka生產(chǎn)者對象
KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);
//4.調(diào)用send發(fā)送消息
for (int i = 0; i < 100; i++) {
//異步發(fā)送 不帶回調(diào)函數(shù)
kafkaProducer.send(new ProducerRecord<>("first", "kafka " + i));
}
//關閉資源
kafkaProducer.close();
}
三、回調(diào)函數(shù)
回調(diào)函數(shù)會在producer收到ack時調(diào)用,該方法有兩個參數(shù),分別是元數(shù)據(jù)信息(RecordMetadata)和異常信息(Exception),如果Exception為null,說明信息發(fā)送失敗注意:消息發(fā)送失敗會自動重試,不需要我們在回調(diào)函數(shù)中手動重試。
文章來源:http://www.zghlxwxcb.cn/news/detail-506942.html
public static void main(String[] args) throws InterruptedException, ExecutionException {
//1、創(chuàng)建kafka生產(chǎn)者的配置對象
Properties properties=new Properties();
//2、給kafka配置對象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
//key,value序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//3、創(chuàng)建kafka生產(chǎn)者對象
KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);
//4.調(diào)用send發(fā)送消息
for (int i = 0; i < 100; i++) {
//異步發(fā)送帶回調(diào)函數(shù)
kafkaProducer.send(new ProducerRecord<>("first", "kafka " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e==null){
//沒用一次輸出,輸出信息到控制臺
System.out.println(String.format("主題:%s,分區(qū):%s",metadata.topic(),metadata.partition()));
}else {
e.printStackTrace();
}
}
});
//延遲一會,數(shù)據(jù)會發(fā)送到不同分區(qū),發(fā)送太快則可能會到同個分區(qū) 16K 0ms
Thread.sleep(2);
}
//關閉資源
kafkaProducer.close();
}
}
四、同步發(fā)送
只需在異步發(fā)送的基礎上,再調(diào)用一下 get()方法即可文章來源地址http://www.zghlxwxcb.cn/news/detail-506942.html
public static void main(String[] args) throws InterruptedException, ExecutionException {
//1、創(chuàng)建kafka生產(chǎn)者的配置對象
Properties properties=new Properties();
//2、給kafka配置對象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
//key,value序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//3、創(chuàng)建kafka生產(chǎn)者對象
KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);
//4.調(diào)用send發(fā)送消息
for (int i = 0; i < 100; i++) {
//同步發(fā)送
kafkaProducer.send(new ProducerRecord<>("first", "kafka " + i)).get();
}
//關閉資源
kafkaProducer.close();
}
到了這里,關于kafka入門,生產(chǎn)者異步發(fā)送、回調(diào)函數(shù),同步發(fā)送(四)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!