国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

使用kafka-clients操作數(shù)據(jù)(java)

這篇具有很好參考價(jià)值的文章主要介紹了使用kafka-clients操作數(shù)據(jù)(java)。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一、添加依賴

     <!--    kafka-clients-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.5.1</version>
        </dependency>

二、生產(chǎn)者

自定義分區(qū),可忽略文章來源地址http://www.zghlxwxcb.cn/news/detail-615966.html

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyPatitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        String msgStr = value.toString();
        if(msgStr.contains("a")){
            return 1;
        }
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

1、普通消息

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        //配置
        Properties properties = new Properties();
        //連接參數(shù)
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");
        //序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //關(guān)聯(lián)自定義分區(qū)器 可選
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");

        //優(yōu)化參數(shù) 可選
        //緩沖器大小 32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);
        //批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
        //Linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        //壓縮
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        //acks
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        //重試次數(shù)
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);


        //創(chuàng)建生產(chǎn)者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        //異步發(fā)送數(shù)據(jù)
        for (int i = 0; i < 10; i++) {
            //給first主題發(fā)消息
            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i));
            //回調(diào)異步發(fā)送
            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello2" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主題:" + recordMetadata.topic() + "分區(qū):" + recordMetadata.partition());
                    }
                }
            });
            kafkaProducer.send(new ProducerRecord<String, String>("first", "a" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主題:" + recordMetadata.topic() + "分區(qū)" + recordMetadata.partition() + "a");
                    }
                }
            });
            Thread.sleep(500);
        }

        //同步
        for (int i = 0; i < 10; i++) {
            //給first主題發(fā)消息
            kafkaProducer.send(new ProducerRecord<String, String>("first", "sync_hello" + i)).get();
        }

        //關(guān)閉資源
        kafkaProducer.close();
    }
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
a0
hello0
hello20
a1
hello1
hello21
a2
hello2
hello22
a3
hello3
hello23
a4
hello4
hello24
a5
hello5
hello25
a6
hello6
hello26
a7
hello7
hello27
a8
hello8
hello28
a9
hello9
hello29
sync_hello0
sync_hello1
sync_hello2
sync_hello3
sync_hello4
sync_hello5
sync_hello6
sync_hello7
sync_hello8
sync_hello9

2、事務(wù)消息

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        //配置
        Properties properties = new Properties();
        //連接參數(shù)
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");
        //序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //關(guān)聯(lián)自定義分區(qū)器 可選
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");

        //優(yōu)化參數(shù) 可選
        //緩沖器大小 32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);
        //批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
        //Linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        //壓縮
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        //acks
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        //重試次數(shù)
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);

        //指定事務(wù)ID
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactional_id_01");
        properties.put("enable.idempotence", "true");

        //創(chuàng)建生產(chǎn)者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        //事務(wù)消息 初始化
        kafkaProducer.initTransactions();
        //開始事務(wù)
        kafkaProducer.beginTransaction();
        try {
            kafkaProducer.send(new ProducerRecord<String, String>("first", "Transactions")).get();
            //提交事務(wù)
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            //終止事務(wù)
            kafkaProducer.abortTransaction();
        } finally {
            //關(guān)閉資源
            kafkaProducer.close();
        }
    }
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
Transactions

到了這里,關(guān)于使用kafka-clients操作數(shù)據(jù)(java)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 在Spring Boot微服務(wù)集成Kafka客戶端(kafka-clients)操作Kafka

    記錄 :459 場景 :在Spring Boot微服務(wù)集成Kafka客戶端kafka-clients-3.0.0操作Kafka。使用kafka-clients的原生KafkaProducer操作Kafka生產(chǎn)者Producer。使用kafka-clients的原生KafkaConsumer操作Kafka的消費(fèi)者Consumer。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安裝 :https://blog.csdn.ne

    2024年02月12日
    瀏覽(91)
  • 通過Java client訪問Kafka

    1. Install Kafka 1) download kafka binary from https://kafka.apache.org/downloads 2) extract binary 2. Start Kafka 1) start zookeeper in daemon mode 2) start kafka server in daemon mode 3. Test Kafka 1) create a topic 2) producer events 3) consumer events 4. Access Kafka from Java client 1) download kafka client binary from https://jar-download.com/artifacts

    2024年02月05日
    瀏覽(25)
  • 大數(shù)據(jù)之Kafka————java來實(shí)現(xiàn)kafka相關(guān)操作

    一、在java中配置pom 二、生產(chǎn)者方法 (1)、Producer Java中寫在生產(chǎn)者輸入內(nèi)容在kafka中可以讓消費(fèi)者提取 [root@kb144 config]# kafka-console-consumer.sh --bootstrap-server 192.168.153.144:9092 --topic kb22 (2)、Producer進(jìn)行多線程操作 ? 生產(chǎn)者多線程是一種常見的技術(shù)實(shí)踐,可以提高消息生產(chǎn)的并發(fā)性

    2024年02月11日
    瀏覽(19)
  • C#:了解LINQ,簡化數(shù)據(jù)查詢和操作的強(qiáng)大工具

    C#:了解LINQ,簡化數(shù)據(jù)查詢和操作的強(qiáng)大工具

    以下是 LINQ(Language Integrated Query)中常見的及其作用,并給出一個(gè)示例以展示其執(zhí)行結(jié)果: from :用于指定數(shù)據(jù)源,可以是集合、數(shù)組、數(shù)據(jù)庫表等。 示例: where :用于篩選滿足指定條件的元素。 示例: select :用于選擇返回的結(jié)果集。 示例: orderby :用于對結(jié)果集

    2024年02月12日
    瀏覽(30)
  • 使用spring-kafka的Java API操作Kafka集群的Topic

    記錄 :462 場景 :在Spring Boot微服務(wù)集成spring-kafka-2.8.2操作Kafka集群的Topic的創(chuàng)建和刪除。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安裝 :https://blog.csdn.net/zhangbeizhen18/article/details/131156084 1.微服務(wù)中 配置Kafka信息 1.1在pom.xml添加依賴 pom.xml文件: 解析:

    2024年02月10日
    瀏覽(28)
  • 使用Kafka客戶端(spring-kafka)的Java API操作Kafka的Topic

    記錄 :458 場景 :在Spring Boot微服務(wù)集成Kafka客戶端spring-kafka-2.8.2操作Kafka的Topic的創(chuàng)建和刪除。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka安裝 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服務(wù)中 配置Kafka信息 1.1在pom.xml添加依賴 pom.xml文件: 解析

    2024年02月09日
    瀏覽(21)
  • 使用flink的sql-client.sh,測試mysql-->kafka-->kafka-->mysql實(shí)時(shí)流

    使用flink的sql-client.sh,測試mysql-->kafka-->kafka-->mysql實(shí)時(shí)流

    目錄 1. 環(huán)境介紹 2. mysql建表 3. flinksql建表 3.1 進(jìn)入flinksql客戶端? ?3.2 配置輸出格式 ?3.3 flink建表 3.4 任務(wù)流配置 4. 測試 4.1 插入測試數(shù)據(jù) 4.2 查看結(jié)果表數(shù)據(jù)? 4.3 新增測試數(shù)據(jù) 4.4 再次查看結(jié)果表數(shù)據(jù) 服務(wù) 版本 zookeeper 3.8.0 kafka 3.3.1 flink 1.13.5 mysql 5.7.34 jdk 1.8 scala 2.12 連接器

    2024年02月11日
    瀏覽(24)
  • Kafka傳輸數(shù)據(jù)到Spark Streaming通過編寫程序java、scala程序?qū)崿F(xiàn)操作

    Kafka傳輸數(shù)據(jù)到Spark Streaming通過編寫程序java、scala程序?qū)崿F(xiàn)操作

    現(xiàn)有一電商網(wǎng)站數(shù)據(jù)文件,名為buyer_favorite1,記錄了用戶對商品的收藏?cái)?shù)據(jù),數(shù)據(jù)以“t”鍵分割,數(shù)據(jù)內(nèi)容及數(shù)據(jù)格式如下: 項(xiàng)目環(huán)境說明 開啟hadoop集群,zookeeper服務(wù),開啟kafka服務(wù)。再另開啟一個(gè)窗口,在/apps/kafka/bin目錄下創(chuàng)建一個(gè)topic。 1、新創(chuàng)一個(gè)文件folder命名為li

    2024年02月13日
    瀏覽(23)
  • 最新版ES8的client API操作 Elasticsearch Java API client 8.0

    最新版ES8的client API操作 Elasticsearch Java API client 8.0

    作者:ChenZhen 本人不常看網(wǎng)站消息,有問題通過下面的方式聯(lián)系: 郵箱:1583296383@qq.com vx: ChenZhen_7 我的個(gè)人博客地址:https://www.chenzhen.space/?? 版權(quán):本文為博主的原創(chuàng)文章,本文版權(quán)歸作者所有,轉(zhuǎn)載請附上原文出處鏈接及本聲明。?? 如果對你有幫助,請給一個(gè)小小的s

    2024年02月04日
    瀏覽(29)
  • Elasticsearch Java REST Client 批量操作(Bulk API)

    上一篇:Elasticsearch Java REST Client Term Vectors API 下一篇:Elasticsearch Java REST Client Search APIs 查詢 BulkRequest可用于使用單個(gè)請求執(zhí)行多個(gè)索引、更新和/或刪除操作。 它需要至少一個(gè)操作添加到 Bulk 請求中: multiGetAPI 在單個(gè) http 請求中并行執(zhí)行多個(gè)請求get 。 MultiGetRequest,添加 `M

    2024年02月11日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包