I. Add the dependency
<!-- kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.1</version>
</dependency>
II. Producer
Custom partitioner (optional; you can skip this and use the default partitioner).
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyPatitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Route messages whose value contains "a" to partition 1, everything else to partition 0.
        String msgStr = value.toString();
        if (msgStr.contains("a")) {
            return 1;
        }
        return 0;
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No custom configuration needed.
    }
}
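The hardcoded 0/1 routing above assumes the topic has at least two partitions. If you want a partitioner that scales with the topic, a minimal sketch along these lines could hash the value across whatever partitions the cluster reports (the class name HashByValuePartitioner is only an illustration, not part of the original article):

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

// Hypothetical example: spread records over all partitions of the topic based on the value's hash.
public class HashByValuePartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Assumes topic metadata is already available, which is the case on the producer's send path.
        int partitionCount = cluster.partitionCountForTopic(topic);
        // Mask the sign bit instead of Math.abs, which overflows for Integer.MIN_VALUE.
        return (value.hashCode() & Integer.MAX_VALUE) % partitionCount;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}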
1. Plain messages
public static void main(String[] args) throws ExecutionException, InterruptedException {
    // Configuration
    Properties properties = new Properties();
    // Connection settings
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");
    // Serializers
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Register the custom partitioner (optional)
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");
    // Tuning parameters (optional)
    // Buffer memory: 30 MB (the default is 32 MB)
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);
    // Batch size: 16 KB
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
    // linger.ms
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
    // Compression
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    // acks=-1 (all in-sync replicas must acknowledge)
    properties.put(ProducerConfig.ACKS_CONFIG, "-1");
    // Number of retries
    properties.put(ProducerConfig.RETRIES_CONFIG, 3);
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    // Asynchronous sends
    for (int i = 0; i < 10; i++) {
        // Send to the "first" topic
        kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i));
        // Asynchronous send with a callback
        kafkaProducer.send(new ProducerRecord<String, String>("first", "hello2" + i), new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    System.out.println("topic: " + recordMetadata.topic() + " partition: " + recordMetadata.partition());
                }
            }
        });
        kafkaProducer.send(new ProducerRecord<String, String>("first", "a" + i), new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    System.out.println("topic: " + recordMetadata.topic() + " partition: " + recordMetadata.partition() + " a");
                }
            }
        });
        Thread.sleep(500);
    }
    // Synchronous sends
    for (int i = 0; i < 10; i++) {
        // Send to the "first" topic and block until the send is acknowledged
        kafkaProducer.send(new ProducerRecord<String, String>("first", "sync_hello" + i)).get();
    }
    // Release resources
    kafkaProducer.close();
}
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
a0
hello0
hello20
a1
hello1
hello21
a2
hello2
hello22
a3
hello3
hello23
a4
hello4
hello24
a5
hello5
hello25
a6
hello6
hello26
a7
hello7
hello27
a8
hello8
hello28
a9
hello9
hello29
sync_hello0
sync_hello1
sync_hello2
sync_hello3
sync_hello4
sync_hello5
sync_hello6
sync_hello7
sync_hello8
sync_hello9
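Two things are worth noting about the synchronous loop above: send(...).get() blocks with no upper bound, and any broker-side failure surfaces as an ExecutionException. A minimal sketch of a bounded variant is shown below; the helper name sendWithTimeout and the 10-second limit are illustrative, not from the original code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedSend {

    // Illustrative helper: send one record and wait at most 10 seconds for the acknowledgment.
    static RecordMetadata sendWithTimeout(KafkaProducer<String, String> producer,
                                          String topic, String value)
            throws ExecutionException, InterruptedException, TimeoutException {
        return producer.send(new ProducerRecord<>(topic, value)).get(10, TimeUnit.SECONDS);
    }
}

The caller can then handle TimeoutException separately from broker-side errors instead of blocking forever.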
2. Transactional messages
public static void main(String[] args) throws ExecutionException, InterruptedException {
    // Configuration
    Properties properties = new Properties();
    // Connection settings
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");
    // Serializers
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Register the custom partitioner (optional)
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");
    // Tuning parameters (optional)
    // Buffer memory: 30 MB (the default is 32 MB)
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);
    // Batch size: 16 KB
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
    // linger.ms
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
    // Compression
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    // acks=-1 (all in-sync replicas must acknowledge)
    properties.put(ProducerConfig.ACKS_CONFIG, "-1");
    // Number of retries
    properties.put(ProducerConfig.RETRIES_CONFIG, 3);
    // Transactional ID (required for transactions)
    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id_01");
    // Idempotence must be enabled when using transactions
    properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    // Create the producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    // Initialize transactions
    kafkaProducer.initTransactions();
    // Begin the transaction
    kafkaProducer.beginTransaction();
    try {
        kafkaProducer.send(new ProducerRecord<String, String>("first", "Transactions")).get();
        // Commit the transaction
        kafkaProducer.commitTransaction();
    } catch (Exception e) {
        // Abort the transaction on failure
        kafkaProducer.abortTransaction();
    } finally {
        // Release resources
        kafkaProducer.close();
    }
}
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
Transactions
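One caveat on the consumer side: consumers read transactional topics with isolation.level set to read_uncommitted by default, so records from aborted transactions could also show up. As a minimal sketch under that assumption (the class name, group id, and factory method are illustrative, not from the original article), the setting that restricts reads to committed data looks like this:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

// Hypothetical example: a consumer that only sees records from committed transactions.
public class ReadCommittedConsumerConfig {
    public static KafkaConsumer<String, String> create() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "first-consumer-group"); // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Skip records from aborted or still-open transactions.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        return new KafkaConsumer<>(props);
    }
}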
That concludes this walkthrough of working with data using kafka-clients in Java.