国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

java使用kafka-clients集成0.10.0.0版本kafka(一)

這篇具有很好參考價值的文章主要介紹了java使用kafka-clients集成0.10.0.0版本kafka(一)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

一.版本兼容的問題

因?yàn)槟硞€功能需要對接的kafka是一個上古版本0.10.0.0,公司項(xiàng)目又是springcloud項(xiàng)目,導(dǎo)致版本兼容性的問題很頭大

1.kafka的版本號

kafka-clients,kafka,java,kafka,開發(fā)語言
下載的windows版kafka如:kafka_2.10-0.10.0.0
2.10標(biāo)識編譯kafka集群的scala版本號,kafka的服務(wù)端編碼語言為scala
0.10.0.0標(biāo)識kafka真正的版本號
kafka的版本號從1.0開始由四位版本號改為了三位,既類似0.9.0.0–>1.0.0。

2.java對接kafka一般有以下的方式

  • spring-cloud-stream/spring-cloud-stream-binder-kafka
    各個版本的官方文檔:spring-could-stream
    scs中也引入了 spring kafka,kafka client也有對應(yīng)關(guān)系在官網(wǎng)中可以看到
  • kafka-clients
  <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
  </dependency>

第二這種會引入兩個依賴jar,不使用 scala api可以用第一種
kafka-clients-0.10.2.0.jar
kafka_2.11-0.10.2.0.jar

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.0</version>
        </dependency>

  • spring-kafka
    官方文檔:spring-kafka
    spring kafka的版本和spring-boot-starter-parent要匹配
    spring-kafka中引入了kafka-client的版本對照關(guān)系如下
    kafka-clients,kafka,java,kafka,開發(fā)語言
    kafka-clients,kafka,java,kafka,開發(fā)語言
    此處有個坑就是他強(qiáng)制要求springboot的版本和spring-kafka對應(yīng)
//https://blog.csdn.net/lzx1991610/article/details/100777040
 <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
 </dependency>

二.實(shí)現(xiàn)訂閱和發(fā)布消息代碼

Kafka消費(fèi)者通過groupId消費(fèi)指定topic的,
以groupId區(qū)分不同的消費(fèi)者,即不同的groupId消費(fèi)相同的topic,對于topic而言,就是不同的消費(fèi)者,
同時,消費(fèi)者需要記錄消費(fèi)到的offset,以便下次啟動時定位到具體的位置,消費(fèi)消息。
這里,配置的offset策略為:latest,即每次重啟消費(fèi)者時,從最新的offset開始消費(fèi)(上次記錄的offset之后的一個,如果上次消費(fèi)沒有記錄,則從當(dāng)前offset之后開始消費(fèi))。
offset的重置這樣理解: 當(dāng)前topic寫入數(shù)據(jù)有4條,offset從0到3,
如果,offset重設(shè)為earliest,則每次重啟消費(fèi)者,offset都會從0開始消費(fèi)數(shù)據(jù);
如果,offset重設(shè)為latest,則,每次消費(fèi)從上次消費(fèi)的offset下一個開始消費(fèi),如果上次消費(fèi)的offset為3,則,重啟后,
從4開始消費(fèi)數(shù)據(jù)。 原文鏈接:https://blog.csdn.net/Xin_101/article/details/126154171

參考博客: https://www.jianshu.com/p/1f9e18e926f6

public class KafkaUtil {

    final static String  url = "localhost:9092";

    public static void receiveBPMessage(){
        Properties props = new Properties();
        //183.240.87.230:9092為消息服務(wù)器開放的TCP端口
        props.put("bootstrap.servers", KafkaUtil.url);
        //0為消費(fèi)者所在的用戶組,同一個組對于消息的消費(fèi)只能有一次,不同組可以共同消費(fèi)同一條消息
        props.put("group.id", "0");
        //指定了消費(fèi)者是否自動提交偏移量,默認(rèn)值是 true,自動提交
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        //server.keystore.jks證書所在路徑,以及密碼。由消息服務(wù)器頒發(fā)。
//        props.put("ssl.keystore.location","/root/securityCA/server.keystore.jks");
//        props.put("ssl.keystore.password", "123456");
//        props.put("security.protocol","SSL");
//        props.put("ssl.truststore.type", "JKS");
//        props.put("ssl.keystore.type", "JKS");
        //client.truststore.jks證書所在路徑,以及密碼。由消息服務(wù)器頒發(fā)。
//        props.put("ssl.truststore.location","/root/securityCA/client.truststore.jks");
//        props.put("ssl.truststore.password", "123456");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        //建立consumer連接
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //訂閱主題
        consumer.subscribe(Collections.singletonList("test"));
        //消息輪詢是消費(fèi)者的核心,通過輪詢向服務(wù)器請求數(shù)據(jù)
        try {
            while (true) {
                //消費(fèi)消息
                ConsumerRecords<String, String> records = consumer.poll(500);
//                for (ConsumerRecord<String, String> record : records) {
//                    // 每條記錄都包含了記錄所屬主題的信息、記錄所在分區(qū)的信息、記錄在分區(qū)里的偏移量,以及記錄的鍵值對。
//                    System.out.println(String.format("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",
//                            record.topic(), record.partition(), record.offset(),record.key(), record.value()));
//                }
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords){
                        //對消息做簡單地打印操作
                        System.out.println(String.format("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",
                                record.topic(), record.partition(), record.offset(),record.key(), record.value()));
                    }
                    long lastOffset=partitionRecords.get(partitionRecords.size() - 1).offset();
                    //提交消息消費(fèi)的offset
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            // 關(guān)閉消費(fèi)者,網(wǎng)絡(luò)連接和 socket 也會隨之關(guān)閉,并立即觸發(fā)一次再均衡
            consumer.close();
        }

    }

    public static void sendBPMessage(JSONObject object){
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KafkaUtil.url);
        //server.keystore.jks證書所在路徑。由消息服務(wù)器頒發(fā)。
//        producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,"/root/securityCA/server.keystore.jks");
//        //server.keystore.jks證書的密碼。由消息服務(wù)器提供。
//        producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,"medstarMessageServer");
//        //client.truststore.jks證書所在路徑。由消息服務(wù)器頒發(fā)。
//        producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"/root/securityCA/client.truststore.jks");
//        //client.truststore.jks證書的密碼。由消息服務(wù)器提供。
//        producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,"medstarMessageServer");
//        producerProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
//        producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //根據(jù)配置文件創(chuàng)建生產(chǎn)者連接
        KafkaProducer producer = new KafkaProducer(producerProps);
        //發(fā)送消息,該實(shí)例中,為循環(huán)發(fā)送test數(shù)據(jù)100次,可以根據(jù)實(shí)際情況,遍歷列表中的數(shù)據(jù),拼接成規(guī)定的消息格式進(jìn)行發(fā)送,一般,同一個機(jī)構(gòu)的消息發(fā)送通道是固定的,通道會由消息服務(wù)器產(chǎn)生并分配給對應(yīng)機(jī)構(gòu)
        for (int i = 0; i < 10; i++) {
            //新建ProducerRecord類型的數(shù)據(jù),第一個參數(shù)為發(fā)送的通道,第二個參數(shù)為發(fā)送消息的內(nèi)容
            ProducerRecord<String,String> r = new ProducerRecord<String,String>("test","key-"+i,"中文-"+i);
            producer.send(r);
            System.err.println("發(fā)送消息");
        }
        //關(guān)閉消息服務(wù)器連接,可以在消息全部發(fā)送完畢的時候關(guān)閉連接
        producer.close();
    }


}

三.安裝windows版kafka進(jìn)行測試

參考博客: https://blog.csdn.net/marquis0/article/details/126525221
命令參考

//啟動內(nèi)置zk
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
//啟動kafka服務(wù)
.\bin\windows\kafka-server-start.bat .\config\server.properties
//創(chuàng)建一個名稱為test的topic 類似于數(shù)據(jù)庫的表  
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 -replication-factor 1 --partitions 1 --topic test
//創(chuàng)建一個生產(chǎn)者
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
//創(chuàng)建一個消費(fèi)者
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic test --from-beginning --zookeeper localhost:2181   
不同版本的kafka命令會不一樣 以下參考
舊版本
##創(chuàng)建topic
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic xxoo
 
#查看topic
./kafka-topics.sh --list --bootstrap-server localhost:9092
 
# topic 描述
./kafka-topics.sh --describe --zookeeper localhost:2181  --topic xxoo
 
# producer(控制臺向topic生產(chǎn)數(shù)據(jù))
./kafka-console-producer.sh --broker-list localhost:9092 --topic xxoo
>this is a message
>this is another message
 
##consumer(控制臺消費(fèi)topic的數(shù)據(jù)2)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xxoo --from-beginning
this is a message
this is another message
 
## 查看某一個topic對應(yīng)的消息數(shù)量
./kafka-run-class.sh  kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic xxoo --time -1
 
 
 
 
 
 
## 新版本的消費(fèi)者組名和它要消費(fèi)的那個topic的offset信息就會被記錄在broker服務(wù)器上,老版本存在zookeeper上
 
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
 
./kafka-consumer-groups.sh --bootstrap-server  kafka01.qq.cn:9092,kafka02.qq.cn:9092,kafka03.qq.cn:9092 --list
 
 
##刪除消費(fèi)組
./kafka-consumer-groups.sh --bootstrap-server 192.168.100.11:9092 --delete --group py-test
 
##查看消費(fèi)組的的列表
./kafka-consumer-groups.sh  --list  --bootstrap-server 192.168.100.11:9092
或者
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 xxoo --list
 
## 查看特定消費(fèi)組的情況
./kafka-consumer-groups.sh --bootstrap-server 192.168.100.11:9092 --group py-test --describe
-- 舊版本Kafka命令行參數(shù)(kafka_scala2.11-2.0.0 為例)
# 查看topic
./kafka-topics.sh --list --zookeeper localhost:2181
 
## topic描述
./kafka-topics.sh --describe --zookeeper localhost:2181  --topic xxoo
 
## 創(chuàng)建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic xxoo
 
# topic 查看信息
# /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic xx
 
# 分區(qū)擴(kuò)展
# /usr/local/kafka/bin/kafka-topics.sh --alter  --topic xx --zookeeper localhost:2181 --partitions 24
 
## consumer(控制臺消費(fèi)topic的數(shù)據(jù)2)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xxoo --from-beginning
 
# 指定消費(fèi)組消費(fèi)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xxoo --group xx-group
 
### 生產(chǎn)數(shù)據(jù)
./kafka-console-producer.sh --broker-list localhost:9092 --topic xxoo
 
## 查看某一個topic對應(yīng)的消息數(shù)
./kafka-run-class.sh  kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic xxoo --time -1
 
## delete topic
./kafka-topics --delete --zookeeper localhost:2181 --topic javadaemon 
 
 
# 查看消費(fèi)組列表
./kafka-consumer-groups.sh --list  --bootstrap-server localhost:9092
./kafka-consumer-groups.sh --list  --bootstrap-server kafka01.car.cn:9092
 
# 查看指定消費(fèi)組以及連接的ip地址
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.2:9092 --describe --group  vmsOperationLogGroup|grep vms-road_fee
 
 
##  查看指定消費(fèi)組的堆積情況
./kafka-consumer-groups.sh  --bootstrap-server kafka01.car.cn:9092 --describe --group knight_group
 
## 查看指定分區(qū)的信息
# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper IP:2181  --topic test
 
清理openapi-AccessLog-Rest指定保留2天
# /usr/local/kafka/bin/kafka-configs.sh --zookeeper IP:2181 --entity-type topics --entity-name test --alter --add-config  retention.ms=172800000

測試在生產(chǎn)者命令窗口發(fā)布消息,發(fā)現(xiàn)消費(fèi)者命令窗口打印顯示,并且項(xiàng)目main方法調(diào)用執(zhí)行消費(fèi)者后,也會收到消息
測試使用java接口發(fā)布消息,kafka客戶端也能接受到消息
kafka-clients,kafka,java,kafka,開發(fā)語言
kafka-clients,kafka,java,kafka,開發(fā)語言文章來源地址http://www.zghlxwxcb.cn/news/detail-729583.html

到了這里,關(guān)于java使用kafka-clients集成0.10.0.0版本kafka(一)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 在Spring Boot微服務(wù)集成Kafka客戶端(kafka-clients)操作Kafka

    記錄 :459 場景 :在Spring Boot微服務(wù)集成Kafka客戶端kafka-clients-3.0.0操作Kafka。使用kafka-clients的原生KafkaProducer操作Kafka生產(chǎn)者Producer。使用kafka-clients的原生KafkaConsumer操作Kafka的消費(fèi)者Consumer。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安裝 :https://blog.csdn.ne

    2024年02月12日
    瀏覽(91)
  • SpringBoot集成Elasticsearch8.x(6)|(新版本Java API Client使用)

    章節(jié) 章節(jié) 第一章鏈接: SpringBoot集成Elasticsearch7.x(1)|(增刪改查功能實(shí)現(xiàn)) 第二章鏈接: SpringBoot集成Elasticsearch7.x(2)|(復(fù)雜查詢) 第三章鏈接: SpringBoot集成Elasticsearch7.x(3)|(aggregations之指標(biāo)聚合查詢) 第四章鏈接: SpringBoot集成Elasticsearch7.x(4)|(aggregations之分桶聚合

    2024年02月08日
    瀏覽(29)
  • # SpringBoot集成Elasticsearch8.5.x(5)|( 新版本Java API Client使用)

    章節(jié) 章節(jié) 第一章鏈接: SpringBoot集成Elasticsearch7.x(1)|(增刪改查功能實(shí)現(xiàn)) 第二章鏈接: SpringBoot集成Elasticsearch7.x(2)|(復(fù)雜查詢) 第三章鏈接: SpringBoot集成Elasticsearch7.x(3)|(aggregations之指標(biāo)聚合查詢) 第四章鏈接: SpringBoot集成Elasticsearch7.x(4)|(aggregations之分桶聚合

    2023年04月13日
    瀏覽(25)
  • SpringBoot集成Elasticsearch8.x(7)|(新版本Java API Client使用完整示例)

    章節(jié) 第一章鏈接: SpringBoot集成Elasticsearch7.x(1)|(增刪改查功能實(shí)現(xiàn)) 第二章鏈接: SpringBoot集成Elasticsearch7.x(2)|(復(fù)雜查詢) 第三章鏈接: SpringBoot集成Elasticsearch7.x(3)|(aggregations之指標(biāo)聚合查詢) 第四章鏈接: SpringBoot集成Elasticsearch7.x(4)|(aggregations之分桶聚合查詢)

    2024年02月16日
    瀏覽(28)
  • 通過Java client訪問Kafka

    1. Install Kafka 1) download kafka binary from https://kafka.apache.org/downloads 2) extract binary 2. Start Kafka 1) start zookeeper in daemon mode 2) start kafka server in daemon mode 3. Test Kafka 1) create a topic 2) producer events 3) consumer events 4. Access Kafka from Java client 1) download kafka client binary from https://jar-download.com/artifacts

    2024年02月05日
    瀏覽(24)
  • SpringBoot集成Kafka版本不兼容導(dǎo)致出現(xiàn)錯誤

    1、系統(tǒng)報錯 2、排查與解決 出錯原因:springboot集成spring-kafka的時候需要注意兩者之間的版本對應(yīng)關(guān)系,因?yàn)榘姹静患嫒輰?dǎo)致出現(xiàn)錯誤 解決:kafka-clients : 是springboot集成的spring-kafka,spring-kafka中引入了kafka-client的版本 參考:https://spring.io/projects/spring-kafka 參考:https://stackover

    2024年02月14日
    瀏覽(20)
  • Java與es8實(shí)戰(zhàn)之二:Springboot集成es8的Java Client

    配置springboot的application.yml 配置es的自簽證書 執(zhí)行如下命令將es容器中的crt文件復(fù)制到本地 docker cp 容器名稱:/usr/share/elasticsearch/config/certs/http_ca.crt . 將crt文件放至springboot項(xiàng)目的resource路徑下

    2024年02月12日
    瀏覽(21)
  • kafka:java集成 kafka(springboot集成、客戶端集成)

    kafka:java集成 kafka(springboot集成、客戶端集成)

    摘要 對于java的kafka集成,一般選用springboot集成kafka,但可能由于對接方kafka老舊、kafka不安全等問題導(dǎo)致kafak版本與spring版本不兼容,這個時候就得自己根據(jù)kafka客戶端api集成了。 一、springboot集成kafka 具體官方文檔地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    瀏覽(94)
  • 使用flink的sql-client.sh,測試mysql-->kafka-->kafka-->mysql實(shí)時流

    使用flink的sql-client.sh,測試mysql-->kafka-->kafka-->mysql實(shí)時流

    目錄 1. 環(huán)境介紹 2. mysql建表 3. flinksql建表 3.1 進(jìn)入flinksql客戶端? ?3.2 配置輸出格式 ?3.3 flink建表 3.4 任務(wù)流配置 4. 測試 4.1 插入測試數(shù)據(jù) 4.2 查看結(jié)果表數(shù)據(jù)? 4.3 新增測試數(shù)據(jù) 4.4 再次查看結(jié)果表數(shù)據(jù) 服務(wù) 版本 zookeeper 3.8.0 kafka 3.3.1 flink 1.13.5 mysql 5.7.34 jdk 1.8 scala 2.12 連接器

    2024年02月11日
    瀏覽(24)
  • 【Java】IDE集成開發(fā)環(huán)境工具IntelliJ安裝和使用

    【Java】IDE集成開發(fā)環(huán)境工具IntelliJ安裝和使用

    歡迎來到《小5講堂》 大家好,我是全棧小5。 這是《Java》序列文章,每篇文章將以博主理解的角度展開講解, 特別是針對知識點(diǎn)的概念進(jìn)行敘說,大部分文章將會對這些概念進(jìn)行實(shí)際例子驗(yàn)證,以此達(dá)到加深對知識點(diǎn)的理解和掌握。 溫馨提示:博主能力有限,理解水平有限

    2024年01月18日
    瀏覽(30)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包