国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

通過Java client訪問Kafka

這篇具有很好參考價值的文章主要介紹了通過Java client訪問Kafka。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

1. Install Kafka
1) download kafka binary from https://kafka.apache.org/downloads
2) extract binary

$ tar xvf kafka_2.13-3.3.1.tgz -C ~/bigdata/

2. Start Kafka
1) start zookeeper in daemon mode

$ cd ~/bigdata/kafka_2.13-3.3.1
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ netstat -lnpt | grep -i TCP | grep `jps | grep -w QuorumPeerMain | awk '{print $1}'` | grep "LISTEN"
tcp6       0      0 :::45835                :::*                    LISTEN      568684/java         
tcp6       0      0 :::2181                 :::*                    LISTEN      568684/java

2) start kafka server in daemon mode

$ bin/kafka-server-start.sh -daemon config/server.properties
$ netstat -lnpt | grep -i TCP | grep `jps | grep -w Kafka | awk '{print $1}'` | grep "LISTEN"
tcp6       0      0 :::33011                :::*                    LISTEN      569177/java         
tcp6       0      0 :::9092                 :::*                    LISTEN      569177/java

3. Test Kafka
1) create a topic

$ bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
Created topic test.
$ bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
Topic: test	TopicId: oLdPl33IR7KZFGmrURMKFw	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

2) producer events

$ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
>hello world
>good morning
>cheer
>...
input ^C to break

3) consumer events

$ bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092 
hello world
good morning
cheer
...
input ^C to break

4. Access Kafka from Java client
1) download kafka client binary from https://jar-download.com/artifacts/org.apache.kafka/kafka-clients/3.3.1

$ tar xvf kafka-client-3.3.1.tgz -C ~/learn/java/java8/lib/

2) Write the kafka Java client

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class Kafka {
    static class KfkProducer {
        Producer<String, String> producer;

        KfkProducer(String host, int port) {
            Properties props = new Properties();
            props.put("bootstrap.servers", String.format("%s:%d", host, port));
//            props.put("linger.ms", 1);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(props);
        }

        void close() {
            producer.close();
        }

        RecordMetadata send(String topic, String key, String value) {
            Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, key, value));
            RecordMetadata meta = null;
            try {
                meta = result.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            return meta;
        }
    }

    static class KfkConsumer {
        KafkaConsumer<String, String> consumer;

        KfkConsumer(String host, int port, List<String> topics) {
            Properties props = new Properties();
            props.put("bootstrap.servers", String.format("%s:%d", host, port));
            props.put("group.id", "group01");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(topics);
        }

        void close() {
            consumer.close();
        }

        List<List<Object>> poll(int num) {
            List<List<Object>> result = new ArrayList<>();
            while (result.size() < num) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    result.add(Arrays.asList(record.offset(), record.key(), record.value()));
                }
            }
            return result;
        }
    }

    public static void main(String... argv) {
        KfkProducer producer = new KfkProducer("localhost", 9092);
        for (int i = 0; i < 5; i ++) {
            System.out.println(producer.send("test", "" + (i % 5), Integer.toString(i)));
        }
        producer.close();

        KfkConsumer consumer = new KfkConsumer("localhost", 9092, Arrays.asList("test"));
        List<List<Object>> records = consumer.poll(5);
        for (List<Object> record: records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    (long)(record.get(0)), record.get(1), record.get(2));
        }
        consumer.close();
    }
}

3) compile the client

$ javac -cp "lib/kafka-client-3.3.1/*" Kafka.java

4) run the client

$ java -cp "lib/kafka-client-3.3.1/*:." Kafka
test-0@8
test-0@9
test-0@10
test-0@11
test-0@12
offset = 8, key = 0, value = 0
offset = 9, key = 1, value = 1
offset = 10, key = 2, value = 2
offset = 11, key = 3, value = 3
offset = 12, key = 4, value = 4

Note: following scripts can be used to stop servers and clean all created events as need

$ bin/kafka-server-stop.sh
$ bin/zookeeper-server-stop.sh
$ rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs

Reference: https://kafka.apache.org/quickstart文章來源地址http://www.zghlxwxcb.cn/news/detail-447647.html

到了這里,關(guān)于通過Java client訪問Kafka的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 聊聊kafka client性能調(diào)優(yōu)及kafka最佳實踐

    聊聊kafka client性能調(diào)優(yōu)及kafka最佳實踐

    這里是 weihubeats ,覺得文章不錯可以關(guān)注公眾號 小奏技術(shù) ,文章首發(fā)。拒絕營銷號,拒絕標題黨 最近在使用 kafka 的時候遇到了一些性能問題。 所以就打算研究下 kafka 相關(guān)的性能優(yōu)化方案。 client 主要分兩個 producer consumer producer 主要是有兩個核心參數(shù) batch.size linger.ms batch.s

    2024年02月03日
    瀏覽(19)
  • kerberos認證Flink的kafka connector和kafka client配置

    kerberos認證Flink的kafka connector和kafka client配置

    1. kafka配置文件 kafka jaas必須配置,如果缺少,則報一下錯誤。 對于Flink只能通過配置 java.security.auth.login.config 的方式。 jaas配置 1.1 方式一: System.setProperty配置系統(tǒng)變量: kafka_client_jaas_keytab.conf文件內(nèi)容如下: 1.2 方法二:在IDEA中添加jvm參數(shù): 注意:將參數(shù)添加至kafka 的pr

    2024年02月04日
    瀏覽(24)
  • 在Spring Boot微服務(wù)集成Kafka客戶端(kafka-clients)操作Kafka

    記錄 :459 場景 :在Spring Boot微服務(wù)集成Kafka客戶端kafka-clients-3.0.0操作Kafka。使用kafka-clients的原生KafkaProducer操作Kafka生產(chǎn)者Producer。使用kafka-clients的原生KafkaConsumer操作Kafka的消費者Consumer。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安裝 :https://blog.csdn.ne

    2024年02月12日
    瀏覽(91)
  • kafka client for go

    關(guān)于 go 的 kafka client 有很多開源項目,例如 sarama: 具有完整協(xié)議支持的純 Go 實現(xiàn)。包括消費者和生產(chǎn)者實施,支持 GZIP 和 Snappy 壓縮。 confluent-kafka-go: Confluent 的 Golang Kafka 客戶端包裝了 librdkafka C 庫,提供完整的 Kafka 協(xié)議支持,具有出色的性能和可靠性。提供了高級生產(chǎn)者和

    2024年02月03日
    瀏覽(18)
  • 在Spring Boot微服務(wù)集成kafka-clients操作Kafka集群

    記錄 :463 場景 :在Spring Boot微服務(wù)集成kafka-clients-3.0.0操作Kafka集群。使用kafka-clients的原生KafkaProducer操作Kafka集群生產(chǎn)者Producer。使用kafka-clients的原生KafkaConsumer操作Kafka集群的消費者Consumer。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka集群安裝 :https://bl

    2024年02月09日
    瀏覽(94)
  • Kafka傳輸數(shù)據(jù)到Spark Streaming通過編寫程序java、scala程序?qū)崿F(xiàn)操作

    Kafka傳輸數(shù)據(jù)到Spark Streaming通過編寫程序java、scala程序?qū)崿F(xiàn)操作

    現(xiàn)有一電商網(wǎng)站數(shù)據(jù)文件,名為buyer_favorite1,記錄了用戶對商品的收藏數(shù)據(jù),數(shù)據(jù)以“t”鍵分割,數(shù)據(jù)內(nèi)容及數(shù)據(jù)格式如下: 項目環(huán)境說明 開啟hadoop集群,zookeeper服務(wù),開啟kafka服務(wù)。再另開啟一個窗口,在/apps/kafka/bin目錄下創(chuàng)建一個topic。 1、新創(chuàng)一個文件folder命名為li

    2024年02月13日
    瀏覽(23)
  • 使用flink的sql-client.sh,測試mysql-->kafka-->kafka-->mysql實時流

    使用flink的sql-client.sh,測試mysql-->kafka-->kafka-->mysql實時流

    目錄 1. 環(huán)境介紹 2. mysql建表 3. flinksql建表 3.1 進入flinksql客戶端? ?3.2 配置輸出格式 ?3.3 flink建表 3.4 任務(wù)流配置 4. 測試 4.1 插入測試數(shù)據(jù) 4.2 查看結(jié)果表數(shù)據(jù)? 4.3 新增測試數(shù)據(jù) 4.4 再次查看結(jié)果表數(shù)據(jù) 服務(wù) 版本 zookeeper 3.8.0 kafka 3.3.1 flink 1.13.5 mysql 5.7.34 jdk 1.8 scala 2.12 連接器

    2024年02月11日
    瀏覽(24)
  • Ubuntu22.04 install Kafka

    kafka quickstart? install kafka

    2024年02月09日
    瀏覽(25)
  • Failed to construct kafka producer at org.apache.kafka.clients.producer.KafkaProducer

    報錯信息如下: 在網(wǎng)上找了很久的解決方案,也沒找到個所以然,可能是我能力不足沒理解到,后來我嘗試clean下項目,竟然報錯了 提示我pom.xml中有錯誤,我看了看,唯一有可能的是新導(dǎo)入的一個依賴去掉了版本號,我加上版本號后又重新clean下,成功了,, 然后,我重啟

    2024年02月05日
    瀏覽(18)
  • kafka消費報錯, org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since

    問題: 在有大量消息需要消費時,消費端出現(xiàn)報錯:org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which t

    2024年03月23日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包