国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka 02——三個(gè)重要的kafka客戶端

這篇具有很好參考價(jià)值的文章主要介紹了kafka 02——三個(gè)重要的kafka客戶端。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

1. 前言

1.1 關(guān)于 Kafka 的安裝

  • 請參考下面的文章:
    Kafka 01——Kafka的安裝及簡單入門使用.

1.2 常用客戶端簡介

  • AdminClient API:
    允許管理和檢測Topic、Broker以及其他Kafka對象。
  • Producer API:
    發(fā)布消息到一個(gè)或多個(gè)API。
  • Consumer API:
    訂閱一個(gè)或多個(gè)Topic,并處理產(chǎn)生的消息。

1.3 依賴

  • 如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

            <!--kafka客戶端-->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.8.2</version>
            </dependency>
            
    
  • 完整的pom

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.7.6</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.liu.susu</groupId>
        <artifactId>kafka-api</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>kafka-api</name>
        <description>kafka-api</description>
        <properties>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>1.2.11</version>
            </dependency>
    
            <!--kafka客戶端-->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.8.2</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

2. AdminClient

2.1 Admin Configs

  • 關(guān)于配置,可參考官網(wǎng):
    https://kafka.apache.org/documentation/#adminclientconfigs.

2.2 AdminClient API

2.2.1 設(shè)置 AdminClient 對象

  • 詳細(xì)配置請參考官網(wǎng),簡單配置使用,如下:

    package com.liu.susu.admin;
    
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    /**
     * @Description
     * @Author susu
     */
    public class AdminExample1 {
    
        public final static String TOPIC_NAME = "";
    
        /**
         * 1. 創(chuàng)建并設(shè)置 AdminClient 對象
         */
        public static AdminClient getAdminClient(){
            Properties properties = new Properties();
            properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "Kafka服務(wù)IP:9092");
    
            AdminClient adminClient = AdminClient.create(properties);
            return adminClient;
        }
    
        public static void main(String[] args) {
            //1. 測試 創(chuàng)建并設(shè)置 AdminClient 對象
            AdminClient adminClient = AdminExample1.getAdminClient();
            System.out.println("adminClient==>" + adminClient);
        }
    }
    
    

    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

2.2.2 創(chuàng)建 topic + 獲取 topic 列表

  • 如下:

        /**
         * 2. 創(chuàng)建topic
         */
        public static void createTopic(){
            AdminClient adminClient = getAdminClient();
            // 副本因子
            short rs = 1;
            NewTopic newTopic = new NewTopic("new_topic_test", 1, rs);//new_topic_test 是 topic的name
            CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
            System.out.println("創(chuàng)建的新topic為::::" + topics);
        }
    
        /**
         * 3. 獲取已經(jīng)創(chuàng)建的 topic 的列表
         */
        public static ListTopicsResult getTopicList(){
            AdminClient adminClient = getAdminClient();
            ListTopicsResult topicList = adminClient.listTopics();
            return topicList;
        }
    
  • 測試如下:

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //1. 測試 創(chuàng)建并設(shè)置 AdminClient 對象
    //        AdminClient adminClient = AdminExample1.getAdminClient();
    //        System.out.println("adminClient==>" + adminClient);
    
            //2. 測試 創(chuàng)建topic
            createTopic();
    
            //3. 獲取已經(jīng)創(chuàng)建的 topic 的列表
            ListTopicsResult topicList = getTopicList();
            Collection<TopicListing> topicListings = topicList.listings().get();
            for (TopicListing topic : topicListings) {
                System.out.println(topic);
            }
    
        }
    

    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

2.2.3 刪除topic

  • 如下:
        /**
         * 4. 刪除 topic
         */
        public static void deleteTopic(String topicName) throws ExecutionException, InterruptedException {
            AdminClient adminClient = getAdminClient();
            DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(topicName));
            deleteTopicsResult.all().get();
        }
    

2.2.4 查看 topic 的描述信息

  • 如下:

        /**
         * 5. 獲取描述 topic 的信息
         */
        public static void getDescribeTopics(String topicName) throws ExecutionException, InterruptedException {
            AdminClient adminClient = getAdminClient();
            DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList(topicName));
            Map<String, TopicDescription> descriptionMap = result.all().get();
            descriptionMap.forEach((k,v)->{
                System.out.println("k==>"+k +",v===>"+v);
            });
        }
    

    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

    k==>susu-topic,v===>(name=susu-topic, internal=false, partitions=(partition=0, leader=IP:9092 (id: 0 rack: null), replicas=IP:9092 (id: 0 rack: null), isr=IP:9092 (id: 0 rack: null)), authorizedOperations=null)
    

2.2.5 查看 topic 的配置信息

  • 如下:
        /**
         * 6. 獲取 topic 的配置信息
         */
        public static void getDescribeConfig(String topicName) throws ExecutionException, InterruptedException{
            AdminClient adminClient = getAdminClient();
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC,topicName);
            DescribeConfigsResult configsResult = adminClient.describeConfigs(Arrays.asList(resource));
            Map<ConfigResource, Config> configMap = configsResult.all().get();
            configMap.forEach((k,v)->{
                System.out.println("k==>"+k +",v===>"+v);
            });
        }
        
        //查看某一項(xiàng)配置(eg:message.downconversion.enable)的值
        Config config = configMap.get(resource);
        ConfigEntry configEntry = config.get("message.downconversion.enable");
        System.out.println("message.downconversion.enable===>" + configEntry.value());
    
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

2.2.6 修改 topic 的配置信息

  • 如下:
        /**
         * 7. 修改 topic 的配置信息
         *    本例修改 message.downconversion.enable,將默認(rèn)的 true 改為 false
         */
        public static void editConfig(String topicName) throws ExecutionException, InterruptedException {
            AdminClient adminClient = getAdminClient();
            Map<ConfigResource,Config> configMap = new HashMap<>();
    
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,topicName);
    
            String keyName = "message.downconversion.enable";
            String value = "false";
            ConfigEntry configEntry = new ConfigEntry(keyName, value);
            Config config = new Config(Arrays.asList(configEntry));
    
            configMap.put(configResource,config);
    
            AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configMap);
            alterConfigsResult.all().get();
        }
    
  • 效果如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

2.2.7 新增 Partition

2.2.7.1 相關(guān)概念
  • Topic:主題,一個(gè)虛擬的概念,由1到多個(gè) Partitions 組成,可以理解為一個(gè)隊(duì)列,生產(chǎn)者和消費(fèi)者都是面向一個(gè)Topic。
  • Partition:分區(qū),實(shí)際消息存儲單位。為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的 Topic 可以分布到多個(gè) Broker 上,一個(gè)Topic 可以分為多個(gè) Partition,每個(gè) Partition 是一個(gè)有序的隊(duì)列(分區(qū)有序,不能保證全局有序)
  • Producer:消息生產(chǎn)者,向 Kafka 中發(fā)布消息的角色。
  • Consumer:消息消費(fèi)者,從 Kafka 中拉取消息消費(fèi)的客戶端。
  • Broker:經(jīng)紀(jì)人,一臺 Kafka 服務(wù)器就是一個(gè) Broker,一個(gè)集群由多個(gè) Broker 組成,一個(gè) Broker 可以容納多個(gè) Topic。
2.2.7.2 演示
  • 代碼如下:
        /**
         * 8. 增加 topic 的Partitions
         */
        public static void addPartitionNum(String topicName, int partitionNum) throws ExecutionException, InterruptedException {
            AdminClient adminClient = getAdminClient();
            Map<String,NewPartitions> partitionsMap = new HashMap<>() ;
    
            NewPartitions newPartitions = NewPartitions.increaseTo(partitionNum);//增加到的數(shù)量
    
            partitionsMap.put(topicName,newPartitions);
    
            CreatePartitionsResult request = adminClient.createPartitions(partitionsMap);
            request.all().get();
        }
    
  • 效果如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

2.3 附代碼

  • 如下:
    package com.liu.susu.admin;
    
    import org.apache.kafka.clients.admin.*;
    import org.apache.kafka.common.KafkaFuture;
    import org.apache.kafka.common.config.ConfigResource;
    import org.apache.kafka.common.requests.CreatePartitionsRequest;
    
    import java.util.*;
    import java.util.concurrent.ExecutionException;
    
    /**
     * @Description
     * @Author susu
     */
    public class AdminExample1 {
    
        public final static String TOPIC_NAME = "new_topic_test";
    
        /**
         * 1. 創(chuàng)建并設(shè)置 AdminClient 對象
         */
        public static AdminClient getAdminClient(){
            Properties properties = new Properties();
            properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "43.143.190.116:9092");
    
            AdminClient adminClient = AdminClient.create(properties);
            return adminClient;
        }
    
    
        /**
         * 2. 創(chuàng)建topic
         */
        public static void createTopic(){
            AdminClient adminClient = getAdminClient();
            // 副本因子
            short rs = 1;
            NewTopic newTopic = new NewTopic("new_topic_test", 1, rs);
            CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
            System.out.println("創(chuàng)建的新topic為::::" + topics);
        }
    
        /**
         * 3. 獲取已經(jīng)創(chuàng)建的 topic 的列表
         */
        public static ListTopicsResult getTopicList(){
            AdminClient adminClient = getAdminClient();
            ListTopicsResult topicList = adminClient.listTopics();
            return topicList;
        }
    
        /**
         * 4. 刪除 topic
         */
        public static void deleteTopic(String topicName) throws ExecutionException, InterruptedException {
            AdminClient adminClient = getAdminClient();
            DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(topicName));
            deleteTopicsResult.all().get();
        }
    
        /**
         * 5. 獲取描述 topic 的信息
         */
        public static void getDescribeTopics(String topicName) throws ExecutionException, InterruptedException {
            AdminClient adminClient = getAdminClient();
            DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList(topicName));
            Map<String, TopicDescription> descriptionMap = result.all().get();
            descriptionMap.forEach((k,v)->{
                System.out.println("k==>"+k +",v===>"+v);
            });
        }
    
        /**
         * 6. 獲取 topic 的配置信息
         */
        public static void getDescribeConfig(String topicName) throws ExecutionException, InterruptedException{
            AdminClient adminClient = getAdminClient();
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC,topicName);
            DescribeConfigsResult configsResult = adminClient.describeConfigs(Arrays.asList(resource));
            Map<ConfigResource, Config> configMap = configsResult.all().get();
            configMap.forEach((k,v)->{
                System.out.println("\nk==>"+k +",v===>"+v);
            });
    
            //查看某一項(xiàng)配置(eg:message.downconversion.enable)的值
            Config config = configMap.get(resource);
            ConfigEntry configEntry = config.get("message.downconversion.enable");
            System.out.println("message.downconversion.enable===>" + configEntry.value());
        }
    
        /**
         * 7. 修改 topic 的配置信息
         *    本例修改 message.downconversion.enable,將默認(rèn)的 true 改為 false
         */
        public static void editConfig(String topicName) throws ExecutionException, InterruptedException {
            AdminClient adminClient = getAdminClient();
            Map<ConfigResource,Config> configMap = new HashMap<>();
    
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,topicName);
    
            String keyName = "message.downconversion.enable";
            String value = "false";
            ConfigEntry configEntry = new ConfigEntry(keyName, value);
            Config config = new Config(Arrays.asList(configEntry));
    
            configMap.put(configResource,config);
    
            AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configMap);
            alterConfigsResult.all().get();
        }
    
        /**
         * 8. 增加 topic 的Partitions
         */
        public static void addPartitionNum(String topicName, int partitionNum) throws ExecutionException, InterruptedException {
            AdminClient adminClient = getAdminClient();
            Map<String,NewPartitions> partitionsMap = new HashMap<>() ;
    
            NewPartitions newPartitions = NewPartitions.increaseTo(partitionNum);//增加到的數(shù)量
    
            partitionsMap.put(topicName,newPartitions);
    
            CreatePartitionsResult request = adminClient.createPartitions(partitionsMap);
            request.all().get();
        }
    
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //1. 測試 創(chuàng)建并設(shè)置 AdminClient 對象
    //        AdminClient adminClient = AdminExample1.getAdminClient();
    //        System.out.println("adminClient==>" + adminClient);
    
    
            //2. 測試 創(chuàng)建topic
    //        createTopic();
    
            //3. 獲取已經(jīng)創(chuàng)建的 topic 的列表
            ListTopicsResult topicList = getTopicList();
            Collection<TopicListing> topicListings = topicList.listings().get();
            for (TopicListing topic : topicListings) {
                System.out.println(topic);
            }
    
            // 4. 刪除topic
    //        deleteTopic("new_topic_test");
    
            // 5.
    //        getDescribeTopics("susu-topic");
    
            //6. 獲取 topic 的配置信息
    //        getDescribeConfig("susu-topic");
    
            // 7. 修改 topic 的配置信息
    //        editConfig("susu-topic");
    //
    //        System.out.println("\n=============修改之后的配置===========\n");
    //
    //        getDescribeConfig("susu-topic"); //修改之后再查看配置
    
            //8. 增加 topic 的Partitions
            addPartitionNum("susu-topic",2);
            System.out.println("添加完畢");
    
        }
    
    }
    
    

3. 生產(chǎn)者(Producer API)

3.1 Producer Configs

3.1.1 參考官網(wǎng)

  • 關(guān)于 Producer Configs 更多配置,參考官網(wǎng)
    https://kafka.apache.org/documentation/#producerconfigs.

  • 簡單看個(gè)配置,如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

3.1.2 關(guān)于acks 的配置(消息傳遞保障)

關(guān)于 acks 的配置,在考慮請求完成之前,生產(chǎn)者要求領(lǐng)導(dǎo)已收到的確認(rèn)次數(shù)。這控制發(fā)送的記錄的持久性。允許以下設(shè)置:

  • acks=0 ,如果設(shè)置為0,那么生產(chǎn)者將不會等待服務(wù)器的任何確認(rèn)。(即:消息發(fā)送之后就不管了,無論消息是否寫成功)

    • 該記錄將立即添加到套接字緩沖區(qū)并被認(rèn)為已發(fā)送。
    • 在這種情況下,不能保證服務(wù)器已經(jīng)接收到記錄,重試配置將不會生效(因?yàn)榭蛻舳送ǔ2粫廊魏问?。為每條記錄返回的偏移量將始終設(shè)置為-1。
    • 即:這種情況消息發(fā)送之后,要么根本沒收到要么收到一次,所以,最多收到一次消息(收到0次或多次)。
  • acks=1 ,這將意味著leader將記錄寫入其本地日志,但將在不等待所有follower完全確認(rèn)的情況下進(jìn)行響應(yīng)。

    • 在這種情況下,如果leader在確認(rèn)記錄后立即失敗,但在follower復(fù)制它之前,那么記錄將丟失。
    • 兩種情況:
      • 一是,沒收到消息沒有回應(yīng)的重復(fù)發(fā)送,這時(shí)還是收到1次;
      • 二是,收到消息但是回應(yīng)出了問題,即僅沒收到回應(yīng)的重發(fā),這時(shí)就會重復(fù)收到消息,所以多次。
    • 即:這種情況至少收到一次消息(一次或多次)。
  • acks=all(或者acks=-1) ,這意味著leader將等待同步副本的完整集合來確認(rèn)記錄

    • 這保證了只要至少有一個(gè)同步副本保持活動(dòng)狀態(tài),記錄就不會丟失。這是最有力的保證。
    • 即:這種情況下收到消息有且僅有一次,如果重復(fù)發(fā)送會拒收。
  • 注意,啟用冪等性需要這個(gè)配置值為“all”。如果設(shè)置了沖突的配置,并且冪等性沒有顯式啟用,則冪等性被禁用。

3.2 Producer API

3.2.1 異步發(fā)送

  • 代碼如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式
    package com.liu.susu.producer;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    /**
     * @Description
     * @Author susu
     */
    public class ProducerExample1 {
    
        public static Properties getProperties(){
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.RETRIES_CONFIG, "0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16348");
            properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            return properties;
        }
    
        /**
         * 1. 發(fā)送消息(異步發(fā)送)
         *    1.1 一次發(fā)一條消息
         */
        public static void producerSendOne(String topicName){
            Properties properties = getProperties();
            //Producer對象
            Producer<String, String> producer = new KafkaProducer<>(properties);
            //消息對象
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName,"num1","A-10001");
            //發(fā)送消息
            producer.send(producerRecord);
            //所有的通道打開都要記得關(guān)閉
            producer.close();
        }
        /**
         * 1. 發(fā)送消息(異步發(fā)送)
         *    1.2 一次發(fā)多條消息
         */
        public static void producerSendMore(String topicName){
            Properties properties = getProperties();
            //Producer對象
            Producer<String, String> producer = new KafkaProducer<>(properties);
            for (int i = 0; i < 5; i++) {
                //消息對象
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName,"Record-"+i,"R-1000"+i);
                //發(fā)送消息
                producer.send(producerRecord);
            }
            producer.close();
        }
    
        public static void main(String[] args) {
            //1.1 一次發(fā)一條消息
    //        producerSendOne("susu-topic");
    
            //1.2 一次發(fā)多條消息
            producerSendMore("susu-topic");
        }
    
    }
    
    
  • 測試效果如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

3.2.2 異步阻塞發(fā)送(同步發(fā)送)

  • 代碼如下:

    package com.liu.susu.producer;
    
    import org.apache.kafka.clients.producer.*;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    /**
     * @Description
     * @Author susu
     */
    public class ProducerExample2 {
    
        public static Properties getProperties(){
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "43.143.190.116:9092");
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.RETRIES_CONFIG, "0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16348");
            properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            return properties;
        }
        
        /**
         * 1. 異步阻塞發(fā)送(同步發(fā)送)
         */
        public static void producerSendMore(String topicName) throws ExecutionException, InterruptedException {
            Properties properties = getProperties();
            //Producer對象
            Producer<String, String> producer = new KafkaProducer<>(properties);
            for (int i = 0; i < 5; i++) {
                //消息對象
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName,"Z-Record-"+i,"Z-R-1000"+i);
                //發(fā)送消息
    //            producer.send(producerRecord);
                Future<RecordMetadata> send = producer.send(producerRecord);
                RecordMetadata recordMetadata = send.get();//future.get會進(jìn)行阻塞直到返回?cái)?shù)據(jù)表示發(fā)送成功,才會繼續(xù)下一條消息的發(fā)送
    
                System.out.println("Z-Record-"+i + ",partition-->"+recordMetadata.partition() + ",offset-->"+recordMetadata.offset());
    
            }
            producer.close();
        }
    
        public static void main(String[] args) throws ExecutionException, InterruptedException{
            //1. 異步阻塞發(fā)送(同步發(fā)送)
            producerSendMore("susu-topic");
        }
    
    }
    
    
  • 測試如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

3.2.3 異步發(fā)送并回調(diào)

  • 生產(chǎn)者發(fā)消息,發(fā)送完之后不用等待broker給回復(fù),直接執(zhí)行下面的業(yè)務(wù)邏輯??梢蕴峁┗卣{(diào)方法,讓broker異步的調(diào)用callback,告知生產(chǎn)者,消息發(fā)送的結(jié)果。這種方式就不用像異步阻塞那樣,發(fā)送完之后還得阻塞等著。

  • 效果如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

  • 代碼如下:

    package com.liu.susu.producer;
    
    import org.apache.kafka.clients.producer.*;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    /**
     * @Description
     * @Author susu
     */
    public class ProducerExample3 {
    
        public static Properties getProperties(){
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "43.143.190.116:9092");
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.RETRIES_CONFIG, "0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16348");
            properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            return properties;
        }
        
        /**
         * 1. 異步發(fā)送并回調(diào)
         */
        public static void producerSendMore(String topicName) throws ExecutionException, InterruptedException {
            Properties properties = getProperties();
            //Producer對象
            Producer<String, String> producer = new KafkaProducer<>(properties);
            for (int i = 0; i < 5; i++) {
                //消息對象
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName,"H4-Record-"+i,"H4-R-1000"+i);
    
                //1 發(fā)送消息:異步發(fā)送并回調(diào)
                producer.send(producerRecord, (recordMetadata, exception) -> {
                    if(exception == null) {
                        System.out.println("partition-->"+recordMetadata.partition() + ",offset-->"+recordMetadata.offset());
                    }
                    else {
                        exception.printStackTrace();
                    }
                });
    
                //2 發(fā)送消息:異步發(fā)送并回調(diào)
    //            producer.send(producerRecord, new Callback() {
    //                @Override
    //                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    //                    if(e == null) {
    //                        System.out.println("partition-->"+recordMetadata.partition() + ",offset-->"+recordMetadata.offset());
    //                    }
    //                    else {
    //                        e.printStackTrace();
    //                    }
    //                }
    //            });
    
            }
            producer.close();// 要關(guān)閉Producer實(shí)例
        }
        public static void main(String[] args) throws ExecutionException, InterruptedException{
            //1. 異步發(fā)送并回調(diào)
            producerSendMore("susu-topic");
        }
    
    }
    
    

3.2.4 總結(jié) ( 異步阻塞發(fā)送 與 異步發(fā)送)

3.2.3.1 異步阻塞發(fā)送
  • 此方式可理解為同步發(fā)送(即:同步就是逐條發(fā)送。)。
    • 一定是逐條發(fā)送的,第一條響應(yīng)到達(dá)后,才會請求第二條。會對每條消息的結(jié)果進(jìn)行判斷,future.get() 會進(jìn)行阻塞直到返回?cái)?shù)據(jù)表示發(fā)送成功,才會繼續(xù)下一條消息的發(fā)送,可以直到每條信息的發(fā)送情況。
    • 此方式如果發(fā)送失敗會進(jìn)行重試并拋出異常,直至重試達(dá)到retries最大次數(shù),此方式也是最大程度確保數(shù)據(jù)可靠性,可以記錄對應(yīng)的結(jié)果日志。
3.2.3.2 異步發(fā)送
  • 異步就是批量發(fā)送。
    • 如果設(shè)置成異步的模式,可以運(yùn)行生產(chǎn)者以batch的形式push數(shù)據(jù),這樣會極大的提高broker的性能,但是這樣會增加丟失數(shù)據(jù)的風(fēng)險(xiǎn)。
    • 異步方式,可以發(fā)送一條,也可以批量發(fā)送多條,特性是不需等第一次(注意這里單位是次,因?yàn)閱未慰梢允菃螚l,也可以是批量數(shù)據(jù))響應(yīng),就立即發(fā)送第二次。
3.2.3.3 參考
  • Kafka同步發(fā)送與異步發(fā)送消息.

3.3 Producer 自定義Partition分區(qū)規(guī)則(負(fù)載均衡)

3.3.1 把 Partition 增加到3

  • 如下,0 ,1 ,2:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

3.3.2 核心代碼

  • 如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

  • MyPartition.java

    package com.liu.susu.producer;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    
    import java.util.Map;
    
    /**
     * @Description
     * @Author susu
     */
    public class MyPartition implements Partitioner {
    
    
        @Override
        public int partition(String topic, Object key, byte[] bytes, Object value, byte[] bytes1, Cluster cluster) {
            String newsKey = key + "";  //格式:"P-Record-"+i
    
            String newKeyNum = newsKey.substring(newsKey.length()-1);//取最后一位
            int keyNum = Integer.parseInt(newKeyNum);
    
            int partition = keyNum % 3;
    
            System.out.println("newsKey--->"+newsKey + ",newKeyNum-->"+newKeyNum+",partition-->"+partition);
    
            return partition;
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> map) {
    
        }
    
    }
    
    
  • 例子:

    package com.liu.susu.producer;
    
    import org.apache.kafka.clients.producer.*;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    /**
     * @Description
     * @Author susu
     */
    public class ProducerExample4 {
    
        public static Properties getProperties(){
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "43.143.190.116:9092");
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.RETRIES_CONFIG, "0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16348");
            properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
    
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.liu.susu.producer.MyPartition");
    
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            return properties;
        }
        
        /**
         * 1. 異步發(fā)送并回調(diào)
         */
        public static void producerSendMore(String topicName) throws ExecutionException, InterruptedException {
            Properties properties = getProperties();
            //Producer對象
            Producer<String, String> producer = new KafkaProducer<>(properties);
            for (int i = 1; i <= 15; i++) {
                //消息對象
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName,"P-Record-"+i,"P-R-1000"+i);
    
                //發(fā)送消息:異步發(fā)送并回調(diào)
                producer.send(producerRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if(e == null) {
                            System.out.println("partition-->"+recordMetadata.partition() + ",offset-->"+recordMetadata.offset());
                        }
                        else {
                            e.printStackTrace();
                        }
                    }
                });
    
            }
            producer.close();// 要關(guān)閉Producer實(shí)例
        }
        public static void main(String[] args) throws ExecutionException, InterruptedException{
            // 異步發(fā)送并回調(diào)
            producerSendMore("susu-topic");
        }
    
    }
    
    

3.3.3 效果

  • 使用異步發(fā)送并回調(diào),效果如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

4. 消費(fèi)者

4.1 Consumer Configs

  • 參考官網(wǎng):
    https://kafka.apache.org/documentation/#consumerconfigs.
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

4.2 消費(fèi)者消費(fèi)例子

4.2.1 官網(wǎng)參考

  • 如下:
    https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html.
  • 例子可參考 Class KafkaConsumer<K,?V>
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

4.2.2 簡單入門例子——自動(dòng)偏移提交

  • 這種情況下,消費(fèi)過的不會再消費(fèi),代碼如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

    package com.liu.susu.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * @Description
     * @Author susu
     */
    public class ConsumerExample1 {
    
        public static void consumerTest(){
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "IP:9092");
            props.setProperty("group.id", "test");
            props.setProperty("enable.auto.commit", "true");//設(shè)置enable.auto.commit意味著自動(dòng)提交偏移量,其頻率由配置auto.commit.interval.ms控制
            props.setProperty("auto.commit.interval.ms", "1000");
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
            /**
             * 消費(fèi)訂閱哪一個(gè)topic或者哪幾個(gè)topic
             *   我這里:消費(fèi)者訂閱了主題susu-topic和susu-topic-2,作為消費(fèi)者組test的一部分,并配置了group.id。
             */
            consumer.subscribe(Arrays.asList("susu-topic", "susu-topic-2"));
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));//每100毫秒拉取一次
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("topic = %s,partition = %d, offset = %d, key = %s, value = %s%n",
                            record.topic(),record.partition(),record.offset(), record.key(), record.value());
            }
        }
    
        public static void main(String[] args) {
            consumerTest();
        }
    
    }
    
    
  • 效果如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

4.2.2 手動(dòng)偏移控制

4.2.3.1 解釋
  • 用戶還可以控制何時(shí)將記錄視為已消耗記錄,從而提交其偏移量,而不是依賴于消費(fèi)者定期提交所消耗的偏移量。當(dāng)消息的消費(fèi)與一些處理邏輯相結(jié)合時(shí),這很有用,因此在消息完成處理之前不應(yīng)將其視為消費(fèi)。
  • 在本例中,我們將使用一批記錄并在內(nèi)存中批量處理它們。當(dāng)我們有足夠的記錄時(shí),我們將把它們插入到數(shù)據(jù)庫中。如果我們像前面的例子一樣允許偏移量自動(dòng)提交,那么記錄在poll中返回給用戶后就會被認(rèn)為是消耗了。這樣,我們的流程就有可能在對記錄進(jìn)行批處理之后,但在將它們插入數(shù)據(jù)庫之前失敗。
    為了避免這種情況,我們將只在將相應(yīng)的記錄插入數(shù)據(jù)庫之后
    手動(dòng)提交偏移量
    。這使我們能夠精確控制記錄何時(shí)被消費(fèi)。這引發(fā)了相反的可能性:進(jìn)程可能在插入數(shù)據(jù)庫之后但在提交之前的時(shí)間間隔內(nèi)失敗(盡管這可能只有幾毫秒,但這是有可能的)。在這種情況下,接管消費(fèi)的進(jìn)程將從最后提交的偏移量中消費(fèi),并將重復(fù)插入最后一批數(shù)據(jù)。使用這種方式,Kafka提供了通常被稱為==“至少一次”的交付保證==,因?yàn)槊總€(gè)記錄可能只交付一次,但在失敗的情況下可以復(fù)制。
4.2.3.2 代碼
  • 代碼如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

    package com.liu.susu.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    /**
     * @Description 手動(dòng)提交
     * @Author susu
     */
    public class ConsumerExample2 {
    
        public static void consumerTest(){
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "43.143.190.116:9092");
            props.setProperty("group.id", "test");
            props.setProperty("enable.auto.commit", "false");//false 手動(dòng)提交
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
    //        consumer.subscribe(Arrays.asList("susu-topic", "susu-topic-2"));
            consumer.subscribe(Arrays.asList("susu-topic"));
    
            final int minBatchSize = 20;
            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    buffer.add(record);
                    System.out.printf("topic = %s,partition = %d, offset = %d, key = %s, value = %s%n",
                            record.topic(),record.partition(),record.offset(), record.key(), record.value());
                }
    
                if (buffer.size() >= minBatchSize) {
                    System.out.println(buffer);
    
                    try {
                        /**
                         * 這里是業(yè)務(wù)邏輯,把數(shù)據(jù)保存到數(shù)據(jù)庫中
                         *    如果失敗,則回滾
                         */
    //                insertIntoDb(buffer);
    
                        //如果成功,則手動(dòng)通知offset提交
                        consumer.commitSync();//消費(fèi)過之后不會再重復(fù)消費(fèi)
                    } catch (Exception e) {
                        System.out.println("失敗,不提交");//失敗不執(zhí)行commitSync,后續(xù)重復(fù)發(fā)送會消費(fèi)
                        throw new RuntimeException(e);
                    }
    
                    buffer.clear();
                }
            }
    
    
        }
    
        public static void main(String[] args) {
            consumerTest();
        }
    
    }
    
    

4.2.3 每個(gè) partition 單獨(dú)處理

4.2.3.1 解釋
  • 上面的例子使用 commitSync 將所有收到的記錄標(biāo)記為已提交。在某些情況下,你可能希望通過顯式指定偏移量來更好地控制已提交的記錄。在本示例中,我們在處理完每個(gè)分區(qū)中的記錄后提交偏移量。
4.2.3.2 代碼
  • 代碼如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

    package com.liu.susu.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    
    import java.time.Duration;
    import java.util.*;
    
    /**
     * @Description 處理完每個(gè)分區(qū)中的記錄后提交偏移量
     * @Author susu
     */
    public class ConsumerExample3 {
    
        public static void consumerTest(){
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "43.143.190.116:9092");
            props.setProperty("group.id", "test");
            props.setProperty("enable.auto.commit", "false");//false 手動(dòng)提交
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
    //        consumer.subscribe(Arrays.asList("susu-topic", "susu-topic-2"));
            consumer.subscribe(Arrays.asList("susu-topic"));
    
            try {
                while(true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                    // 每個(gè)partition單獨(dú)處理
                    for (TopicPartition partition : records.partitions()) {
    
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
    //                        System.out.println(record.offset() + ": " + record.value());
                            System.out.printf("topic = %s,partition = %d, offset = %d, key = %s, value = %s%n",
                                    record.topic(),record.partition(),record.offset(), record.key(), record.value());
                        }
    
                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        // 循環(huán)一個(gè)partition,提交一次
                        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(lastOffset + 1);
                        consumer.commitSync(Collections.singletonMap(partition, offsetAndMetadata));
                    }
                }
            } finally {
                consumer.close();
            }
    
        }
    
        public static void main(String[] args) {
            consumerTest();
        }
    
    }
    
    
  • 效果如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

4.2.3.3 注意
  • 注意:提交的偏移量應(yīng)該始終是應(yīng)用程序?qū)⒆x取的下一條消息的偏移量。因此,在調(diào)用commitSync(offsets)時(shí),應(yīng)該在最后處理的消息的偏移量上添加一個(gè)。

4.2.4 手動(dòng)控制消費(fèi)哪個(gè)partition(手動(dòng)分區(qū)分配)

4.2.4.1 描述
  • 在前面的例子中,我們訂閱了我們感興趣的主題,并讓Kafka根據(jù)組中活躍的消費(fèi)者動(dòng)態(tài)地為這些主題分配公平的分區(qū)份額。但是,在某些情況下,您可能需要對分配的特定分區(qū)進(jìn)行更好的控制。例如:
    • 如果進(jìn)程正在維護(hù)與該分區(qū)相關(guān)的某種本地狀態(tài)(比如本地磁盤上的鍵值存儲),那么它應(yīng)該只獲取它在磁盤上維護(hù)的分區(qū)的記錄。
    • 如果進(jìn)程本身是高可用的,并且在失敗時(shí)將重新啟動(dòng)(可能使用像YARN、Mesos或AWS設(shè)施這樣的集群管理框架,或者作為流處理框架的一部分)。在這種情況下,Kafka不需要檢測故障并重新分配分區(qū),因?yàn)橄M(fèi)進(jìn)程將在另一臺機(jī)器上重新啟動(dòng)。
  • 要使用這種模式,不需要使用subscribe訂閱主題,只需調(diào)用assign(Collection),其中包含要使用的分區(qū)的完整列表。
4.2.4.2 代碼
  • 如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

    package com.liu.susu.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    
    /**
     * @Description 指定消費(fèi)某個(gè)分區(qū)
     * @Author susu
     */
    public class ConsumerExample4 {
    
        public static void consumerTest(){
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "43.143.190.116:9092");
            props.setProperty("group.id", "test");
            props.setProperty("enable.auto.commit", "false");//false 手動(dòng)提交
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
            String topicName = "susu-topic";
            TopicPartition partition0 = new TopicPartition(topicName, 0);
            TopicPartition partition1 = new TopicPartition(topicName, 1);
            TopicPartition partition2 = new TopicPartition(topicName, 2);
    
            consumer.assign(Arrays.asList(partition2)); //只有partition2消費(fèi)
    //        consumer.assign(Arrays.asList(partition0, partition1));  //只有partition0, partition1消費(fèi)
    
    
            try {
                while(true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                    // 每個(gè)partition單獨(dú)處理
                    for (TopicPartition partition : records.partitions()) {
    
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
    //                        System.out.println(record.offset() + ": " + record.value());
                            System.out.printf("topic = %s,partition = %d, offset = %d, key = %s, value = %s%n",
                                    record.topic(),record.partition(),record.offset(), record.key(), record.value());
                        }
    
                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        // 循環(huán)一個(gè)partition,提交一次
                        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(lastOffset + 1);
                        consumer.commitSync(Collections.singletonMap(partition, offsetAndMetadata));
                    }
                }
            } finally {
                consumer.close();
            }
    
        }
    
        public static void main(String[] args) {
            consumerTest();
        }
    
    }
    
    
4.2.4.3 效果
  • 如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式

4.2.5 消費(fèi)者多線程處理

4.2.5.1 消費(fèi)者線程不安全
  • Kafka消費(fèi)者不是線程安全的。所有網(wǎng)絡(luò)I/O都發(fā)生在發(fā)出調(diào)用的應(yīng)用程序線程中。確保多線程訪問正確同步是用戶的責(zé)任。非同步訪問將導(dǎo)致ConcurrentModificationException。
  • 該規(guī)則的唯一例外是wakeup(),它可以安全地從外部線程中斷活動(dòng)操作。在這種情況下,阻塞操作的線程將拋出WakeupException。這可以用于從另一個(gè)線程關(guān)閉消費(fèi)者。
    • 然后在一個(gè)單獨(dú)的線程中,可以通過設(shè)置closed標(biāo)志并喚醒消費(fèi)者來關(guān)閉消費(fèi)者。
      closed.set(true);
      consumer.wakeup ();
      
4.2.5.2 兩種方式實(shí)現(xiàn)
4.2.5.2.1 每個(gè)線程一個(gè)消費(fèi)者
  • 一個(gè)簡單的選擇是為每個(gè)線程提供自己的消費(fèi)者實(shí)例。以下是這種方法的優(yōu)點(diǎn)和缺點(diǎn):
    • 利:這是最容易實(shí)現(xiàn)的
    • 優(yōu)點(diǎn):它通常是最快的,因?yàn)椴恍枰€程間的協(xié)調(diào)
    • 優(yōu)點(diǎn):它使得基于每個(gè)分區(qū)的有序處理非常容易實(shí)現(xiàn)(每個(gè)線程只按照接收消息的順序處理消息)。
    • 缺點(diǎn):更多的消費(fèi)者意味著更多的TCP連接到集群(每個(gè)線程一個(gè))。一般來說,Kafka處理連接非常有效,所以這通常是一個(gè)小成本。
    • 缺點(diǎn):多個(gè)消費(fèi)者意味著更多的請求被發(fā)送到服務(wù)器,稍微少一些數(shù)據(jù)批處理,這可能會導(dǎo)致I/O吞吐量下降。
    • 缺點(diǎn):所有進(jìn)程的線程總數(shù)將受到分區(qū)總數(shù)的限制。
4.2.5.2.1 將消費(fèi)和處理分離
  • 這種方法是讓一個(gè)或多個(gè)消費(fèi)者線程完成所有數(shù)據(jù)消費(fèi),并將ConsumerRecords實(shí)例交給阻塞隊(duì)列,該隊(duì)列由實(shí)際處理記錄處理的處理器線程池使用。這個(gè)選項(xiàng)同樣也有利弊:
    • 優(yōu)點(diǎn):這個(gè)選項(xiàng)允許獨(dú)立地?cái)U(kuò)展消費(fèi)者和處理器的數(shù)量。這使得單個(gè)消費(fèi)者可以為多個(gè)處理器線程提供服務(wù),從而避免了對分區(qū)的任何限制。
    • 缺點(diǎn):保證跨處理器的順序需要特別注意,因?yàn)榫€程將獨(dú)立執(zhí)行,由于線程執(zhí)行時(shí)間的運(yùn)氣,較早的數(shù)據(jù)塊實(shí)際上可能在較晚的數(shù)據(jù)塊之后處理。對于沒有訂購要求的處理,這不是問題。
    • 缺點(diǎn):手動(dòng)提交位置變得更加困難,因?yàn)樗枰芯€程協(xié)調(diào)以確保對該分區(qū)的處理完成。
      這種方法有許多可能的變體。例如,每個(gè)處理器線程可以有自己的隊(duì)列,消費(fèi)者線程可以使用TopicPartition散列到這些隊(duì)列中,以確保有序消費(fèi)并簡化提交。
4.2.5.3 典型的模式(每個(gè)線程一個(gè)消費(fèi)者)
  • 代碼如下:
    kafka 02——三個(gè)重要的kafka客戶端,# 分布式架構(gòu),# Kafka,kafka,分布式文章來源地址http://www.zghlxwxcb.cn/news/detail-649703.html

    package com.liu.susu.consumer.thread;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.WakeupException;
    
    import java.time.Duration;
    import java.util.*;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    /**
     * @Description
     * @Author susu
     */
    public class KafkaConsumerRunner implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer consumer;
        public KafkaConsumerRunner(KafkaConsumer consumer) {
            this.consumer = consumer;
        }
    
        @Override
        public void run() {
            try {
                consumer.subscribe(Arrays.asList("susu-topic"));//訂閱
                while (!closed.get()) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                    // Handle new records
                    for (TopicPartition partition : records.partitions()) {
    
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            System.out.printf("Thread = %s,topic = %s,partition = %d, offset = %d, key = %s, value = %s%n",
                                    Thread.currentThread().getName(),
                                    record.topic(),record.partition(),record.offset(), record.key(), record.value());
                        }
    
                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        // 循環(huán)一個(gè)partition,提交一次
                        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(lastOffset + 1);
                        consumer.commitSync(Collections.singletonMap(partition, offsetAndMetadata));
                    }
                }
            } catch (WakeupException e) {
                // Ignore exception if closing
                if (!closed.get()) throw e;
            } finally {
                consumer.close();
            }
        }
    
        // Shutdown hook which can be called from a separate thread
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    
    
        /**
         * 構(gòu)建 consumer
         * @return consumer
         */
        public static KafkaConsumer<String, String> getKafkaConsumer(){
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "43.143.190.116:9092");
            props.setProperty("group.id", "test");
            props.setProperty("enable.auto.commit", "false");//false 手動(dòng)提交
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
            return consumer;
        }
    
        public static void main(String[] args) {
            KafkaConsumer<String, String> consumer = getKafkaConsumer();
    
            KafkaConsumerRunner runner = new KafkaConsumerRunner(consumer);
    
            Thread thread = new Thread(runner);
            thread.start();
    
    //        runner.shutdown();
        }
    
    }
    
    
4.2.5.4 將消費(fèi)和處理分離(線程池處理)

4.2.6

4.3

到了這里,關(guān)于kafka 02——三個(gè)重要的kafka客戶端的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【實(shí)踐篇】Redis最強(qiáng)Java客戶端(三)之Redisson 7種分布式鎖使用指南

    【實(shí)踐篇】Redis最強(qiáng)Java客戶端(三)之Redisson 7種分布式鎖使用指南

    前兩章我們了解了《【實(shí)踐篇】Redis最強(qiáng)Java客戶端(一)之Redisson入門介紹》和《【實(shí)踐篇】Redis最強(qiáng)Java客戶端(二)之Redisson基礎(chǔ)概念》本章第三章主要介紹Redisson的七種分布式鎖,分別是簡單鎖、公平鎖、可重入鎖、紅鎖、讀寫鎖、信號量和閉鎖。下面是每種鎖的基本概念、使用

    2024年02月09日
    瀏覽(25)
  • 【分布式技術(shù)專題】「OSS中間件系列」Minio的文件服務(wù)的存儲模型及整合Java客戶端訪問的實(shí)戰(zhàn)指南

    【分布式技術(shù)專題】「OSS中間件系列」Minio的文件服務(wù)的存儲模型及整合Java客戶端訪問的實(shí)戰(zhàn)指南

    Minio的元數(shù)據(jù) 數(shù)據(jù)存儲 MinIO對象存儲系統(tǒng)沒有元數(shù)據(jù)數(shù)據(jù)庫,所有的操作都是對象級別的粒度的,這種做法的優(yōu)勢是: 個(gè)別對象的失效,不會溢出為更大級別的系統(tǒng)失效。 便于實(shí)現(xiàn)\\\"強(qiáng)一致性\\\"這個(gè)特性。此特性對于機(jī)器學(xué)習(xí)與大數(shù)據(jù)處理非常重要。 數(shù)據(jù)管理 元數(shù)據(jù)與數(shù)據(jù)一起

    2024年02月11日
    瀏覽(21)
  • 基于SpringBoot+Vue+uniapp畢業(yè)論文管理系統(tǒng)(實(shí)現(xiàn)三個(gè)端,小程序客戶端、PC前臺客戶端、PC管理端)

    基于SpringBoot+Vue+uniapp畢業(yè)論文管理系統(tǒng)(實(shí)現(xiàn)三個(gè)端,小程序客戶端、PC前臺客戶端、PC管理端)

    文末獲取源碼 開發(fā)語言:Java 使用框架:spring boot 前端技術(shù):JavaScript、Vue?、css3 開發(fā)工具:IDEA/MyEclipse/Eclipse、Visual Studio Code 數(shù)據(jù)庫:MySQL 5.7/8.0 數(shù)據(jù)庫管理工具:phpstudy/Navicat JDK版本:Java jdk8 Maven:apache-maven 3.8.1-bin 小程序框架:uniapp 小程序開發(fā)軟件:HBuilder X 小程序運(yùn)行軟

    2024年02月04日
    瀏覽(26)
  • Kafka-客戶端使用

    Kafka-客戶端使用

    Kafka提供了兩套客戶端API,HighLevel API和LowLevel API。 HighLevel API 封裝了kafka的運(yùn)行細(xì)節(jié),使用起來比較簡單,是企業(yè)開發(fā)過程中最常用的客戶端API。 LowLevel API則需要客戶端自己管理Kafka的運(yùn)行細(xì)節(jié),Partition,Offset這些數(shù)據(jù)都由客戶端自行管理。這層API功能更靈活,但是使用起來

    2024年02月22日
    瀏覽(19)
  • kafka客戶端應(yīng)用參數(shù)詳解

    kafka客戶端應(yīng)用參數(shù)詳解

    Kafka提供了非常簡單的客戶端API。只需要引入一個(gè)Maven依賴即可: 1、消息發(fā)送者主流程? 然后可以使用Kafka提供的Producer類,快速發(fā)送消息。 ? 整體來說,構(gòu)建Producer分為三個(gè)步驟: 設(shè)置Producer核心屬性 ?:Producer可選的屬性都可以由ProducerConfig類管理。比如ProducerConfig.BOOTST

    2024年02月07日
    瀏覽(26)
  • kafka客戶端工具(Kafka Tool)的安裝

    kafka客戶端工具(Kafka Tool)的安裝

    官方下載 根據(jù)不同的系統(tǒng)下載對應(yīng)的版本,點(diǎn)擊下載后雙擊,如何一直下一步,安裝 kafka環(huán)境搭建請參考:CentOS 搭建Kafka集群 (1)連接kafka (2)簡單使用 ?

    2024年04月23日
    瀏覽(35)
  • kafka之java客戶端實(shí)戰(zhàn)

    kafka之java客戶端實(shí)戰(zhàn)

    ????????Kafka提供了兩套客戶端API, HighLevel API和LowLevel API 。 HighLevel API封裝了kafka的運(yùn)行細(xì)節(jié),使用起來比較簡單,是企業(yè)開發(fā)過程中最常用的客戶端API。 而LowLevel API則需要客戶端自己管理Kafka的運(yùn)行細(xì)節(jié),Partition,Offset這些數(shù)據(jù)都由客戶端自行管理。這層API功能更靈活,

    2024年01月17日
    瀏覽(22)
  • 自定義kafka客戶端消費(fèi)topic

    使用自定義的KafkaConsumer給spring進(jìn)行管理,之后在注入topic的set方法中,開單線程主動(dòng)訂閱和讀取該topic的消息。 后端服務(wù)不需要啟動(dòng)時(shí)就開始監(jiān)聽消費(fèi),而是根據(jù)啟動(dòng)的模塊或者用戶自定義監(jiān)聽需要監(jiān)聽或者停止的topic 使用的spring集成2.1.8.RELEASE的版本,在@KafkaListener注解中沒

    2024年02月02日
    瀏覽(19)
  • python-kafka客戶端封裝

    本文對python的kafka包做簡單封裝,方便kafka初學(xué)者使用。包安裝: kafka_helper.py kafka_test.py Kafka入門,這一篇就夠了(安裝,topic,生產(chǎn)者,消費(fèi)者)

    2024年02月09日
    瀏覽(20)
  • kafka:java集成 kafka(springboot集成、客戶端集成)

    kafka:java集成 kafka(springboot集成、客戶端集成)

    摘要 對于java的kafka集成,一般選用springboot集成kafka,但可能由于對接方kafka老舊、kafka不安全等問題導(dǎo)致kafak版本與spring版本不兼容,這個(gè)時(shí)候就得自己根據(jù)kafka客戶端api集成了。 一、springboot集成kafka 具體官方文檔地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    瀏覽(94)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包