一.版本兼容的問題
因?yàn)槟硞€功能需要對接的kafka是一個上古版本0.10.0.0,公司項(xiàng)目又是springcloud項(xiàng)目,導(dǎo)致版本兼容性的問題很頭大
1.kafka的版本號
下載的windows版kafka如:kafka_2.10-0.10.0.0
2.10標(biāo)識編譯kafka集群的scala版本號,kafka的服務(wù)端編碼語言為scala
0.10.0.0標(biāo)識kafka真正的版本號
kafka的版本號從1.0開始由四位版本號改為了三位,既類似0.9.0.0–>1.0.0。
2.java對接kafka一般有以下的方式
-
spring-cloud-stream/spring-cloud-stream-binder-kafka
各個版本的官方文檔:spring-could-stream
scs中也引入了 spring kafka,kafka client也有對應(yīng)關(guān)系在官網(wǎng)中可以看到 - kafka-clients
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
第二這種會引入兩個依賴jar,不使用 scala api可以用第一種
kafka-clients-0.10.2.0.jar
kafka_2.11-0.10.2.0.jar
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.0</version>
</dependency>
-
spring-kafka
官方文檔:spring-kafka
spring kafka的版本和spring-boot-starter-parent要匹配
spring-kafka中引入了kafka-client的版本對照關(guān)系如下
此處有個坑就是他強(qiáng)制要求springboot的版本和spring-kafka對應(yīng)
//https://blog.csdn.net/lzx1991610/article/details/100777040
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
二.實(shí)現(xiàn)訂閱和發(fā)布消息代碼
Kafka消費(fèi)者通過groupId消費(fèi)指定topic的,
以groupId區(qū)分不同的消費(fèi)者,即不同的groupId消費(fèi)相同的topic,對于topic而言,就是不同的消費(fèi)者,
同時,消費(fèi)者需要記錄消費(fèi)到的offset,以便下次啟動時定位到具體的位置,消費(fèi)消息。
這里,配置的offset策略為:latest,即每次重啟消費(fèi)者時,從最新的offset開始消費(fèi)(上次記錄的offset之后的一個,如果上次消費(fèi)沒有記錄,則從當(dāng)前offset之后開始消費(fèi))。
offset的重置這樣理解: 當(dāng)前topic寫入數(shù)據(jù)有4條,offset從0到3,
如果,offset重設(shè)為earliest,則每次重啟消費(fèi)者,offset都會從0開始消費(fèi)數(shù)據(jù);
如果,offset重設(shè)為latest,則,每次消費(fèi)從上次消費(fèi)的offset下一個開始消費(fèi),如果上次消費(fèi)的offset為3,則,重啟后,
從4開始消費(fèi)數(shù)據(jù)。 原文鏈接:https://blog.csdn.net/Xin_101/article/details/126154171
參考博客: https://www.jianshu.com/p/1f9e18e926f6
public class KafkaUtil {
final static String url = "localhost:9092";
public static void receiveBPMessage(){
Properties props = new Properties();
//183.240.87.230:9092為消息服務(wù)器開放的TCP端口
props.put("bootstrap.servers", KafkaUtil.url);
//0為消費(fèi)者所在的用戶組,同一個組對于消息的消費(fèi)只能有一次,不同組可以共同消費(fèi)同一條消息
props.put("group.id", "0");
//指定了消費(fèi)者是否自動提交偏移量,默認(rèn)值是 true,自動提交
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
//server.keystore.jks證書所在路徑,以及密碼。由消息服務(wù)器頒發(fā)。
// props.put("ssl.keystore.location","/root/securityCA/server.keystore.jks");
// props.put("ssl.keystore.password", "123456");
// props.put("security.protocol","SSL");
// props.put("ssl.truststore.type", "JKS");
// props.put("ssl.keystore.type", "JKS");
//client.truststore.jks證書所在路徑,以及密碼。由消息服務(wù)器頒發(fā)。
// props.put("ssl.truststore.location","/root/securityCA/client.truststore.jks");
// props.put("ssl.truststore.password", "123456");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//建立consumer連接
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//訂閱主題
consumer.subscribe(Collections.singletonList("test"));
//消息輪詢是消費(fèi)者的核心,通過輪詢向服務(wù)器請求數(shù)據(jù)
try {
while (true) {
//消費(fèi)消息
ConsumerRecords<String, String> records = consumer.poll(500);
// for (ConsumerRecord<String, String> record : records) {
// // 每條記錄都包含了記錄所屬主題的信息、記錄所在分區(qū)的信息、記錄在分區(qū)里的偏移量,以及記錄的鍵值對。
// System.out.println(String.format("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",
// record.topic(), record.partition(), record.offset(),record.key(), record.value()));
// }
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords){
//對消息做簡單地打印操作
System.out.println(String.format("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",
record.topic(), record.partition(), record.offset(),record.key(), record.value()));
}
long lastOffset=partitionRecords.get(partitionRecords.size() - 1).offset();
//提交消息消費(fèi)的offset
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
// 關(guān)閉消費(fèi)者,網(wǎng)絡(luò)連接和 socket 也會隨之關(guān)閉,并立即觸發(fā)一次再均衡
consumer.close();
}
}
public static void sendBPMessage(JSONObject object){
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KafkaUtil.url);
//server.keystore.jks證書所在路徑。由消息服務(wù)器頒發(fā)。
// producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,"/root/securityCA/server.keystore.jks");
// //server.keystore.jks證書的密碼。由消息服務(wù)器提供。
// producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,"medstarMessageServer");
// //client.truststore.jks證書所在路徑。由消息服務(wù)器頒發(fā)。
// producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"/root/securityCA/client.truststore.jks");
// //client.truststore.jks證書的密碼。由消息服務(wù)器提供。
// producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,"medstarMessageServer");
// producerProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
// producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//根據(jù)配置文件創(chuàng)建生產(chǎn)者連接
KafkaProducer producer = new KafkaProducer(producerProps);
//發(fā)送消息,該實(shí)例中,為循環(huán)發(fā)送test數(shù)據(jù)100次,可以根據(jù)實(shí)際情況,遍歷列表中的數(shù)據(jù),拼接成規(guī)定的消息格式進(jìn)行發(fā)送,一般,同一個機(jī)構(gòu)的消息發(fā)送通道是固定的,通道會由消息服務(wù)器產(chǎn)生并分配給對應(yīng)機(jī)構(gòu)
for (int i = 0; i < 10; i++) {
//新建ProducerRecord類型的數(shù)據(jù),第一個參數(shù)為發(fā)送的通道,第二個參數(shù)為發(fā)送消息的內(nèi)容
ProducerRecord<String,String> r = new ProducerRecord<String,String>("test","key-"+i,"中文-"+i);
producer.send(r);
System.err.println("發(fā)送消息");
}
//關(guān)閉消息服務(wù)器連接,可以在消息全部發(fā)送完畢的時候關(guān)閉連接
producer.close();
}
}
三.安裝windows版kafka進(jìn)行測試
參考博客: https://blog.csdn.net/marquis0/article/details/126525221
命令參考文章來源:http://www.zghlxwxcb.cn/news/detail-729583.html
//啟動內(nèi)置zk
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
//啟動kafka服務(wù)
.\bin\windows\kafka-server-start.bat .\config\server.properties
//創(chuàng)建一個名稱為test的topic 類似于數(shù)據(jù)庫的表
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 -replication-factor 1 --partitions 1 --topic test
//創(chuàng)建一個生產(chǎn)者
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
//創(chuàng)建一個消費(fèi)者
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning --zookeeper localhost:2181
不同版本的kafka命令會不一樣 以下參考
舊版本
##創(chuàng)建topic
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic xxoo
#查看topic
./kafka-topics.sh --list --bootstrap-server localhost:9092
# topic 描述
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic xxoo
# producer(控制臺向topic生產(chǎn)數(shù)據(jù))
./kafka-console-producer.sh --broker-list localhost:9092 --topic xxoo
>this is a message
>this is another message
##consumer(控制臺消費(fèi)topic的數(shù)據(jù)2)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xxoo --from-beginning
this is a message
this is another message
## 查看某一個topic對應(yīng)的消息數(shù)量
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic xxoo --time -1
## 新版本的消費(fèi)者組名和它要消費(fèi)的那個topic的offset信息就會被記錄在broker服務(wù)器上,老版本存在zookeeper上
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
./kafka-consumer-groups.sh --bootstrap-server kafka01.qq.cn:9092,kafka02.qq.cn:9092,kafka03.qq.cn:9092 --list
##刪除消費(fèi)組
./kafka-consumer-groups.sh --bootstrap-server 192.168.100.11:9092 --delete --group py-test
##查看消費(fèi)組的的列表
./kafka-consumer-groups.sh --list --bootstrap-server 192.168.100.11:9092
或者
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 xxoo --list
## 查看特定消費(fèi)組的情況
./kafka-consumer-groups.sh --bootstrap-server 192.168.100.11:9092 --group py-test --describe
-- 舊版本Kafka命令行參數(shù)(kafka_scala2.11-2.0.0 為例)
# 查看topic
./kafka-topics.sh --list --zookeeper localhost:2181
## topic描述
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic xxoo
## 創(chuàng)建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic xxoo
# topic 查看信息
# /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic xx
# 分區(qū)擴(kuò)展
# /usr/local/kafka/bin/kafka-topics.sh --alter --topic xx --zookeeper localhost:2181 --partitions 24
## consumer(控制臺消費(fèi)topic的數(shù)據(jù)2)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xxoo --from-beginning
# 指定消費(fèi)組消費(fèi)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xxoo --group xx-group
### 生產(chǎn)數(shù)據(jù)
./kafka-console-producer.sh --broker-list localhost:9092 --topic xxoo
## 查看某一個topic對應(yīng)的消息數(shù)
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic xxoo --time -1
## delete topic
./kafka-topics --delete --zookeeper localhost:2181 --topic javadaemon
# 查看消費(fèi)組列表
./kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
./kafka-consumer-groups.sh --list --bootstrap-server kafka01.car.cn:9092
# 查看指定消費(fèi)組以及連接的ip地址
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.2:9092 --describe --group vmsOperationLogGroup|grep vms-road_fee
## 查看指定消費(fèi)組的堆積情況
./kafka-consumer-groups.sh --bootstrap-server kafka01.car.cn:9092 --describe --group knight_group
## 查看指定分區(qū)的信息
# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper IP:2181 --topic test
清理openapi-AccessLog-Rest指定保留2天
# /usr/local/kafka/bin/kafka-configs.sh --zookeeper IP:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=172800000
測試在生產(chǎn)者命令窗口發(fā)布消息,發(fā)現(xiàn)消費(fèi)者命令窗口打印顯示,并且項(xiàng)目main方法調(diào)用執(zhí)行消費(fèi)者后,也會收到消息
測試使用java接口發(fā)布消息,kafka客戶端也能接受到消息文章來源地址http://www.zghlxwxcb.cn/news/detail-729583.html
到了這里,關(guān)于java使用kafka-clients集成0.10.0.0版本kafka(一)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!