国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka-Java四:Spring配置Kafka消費(fèi)者提交Offset的策略

這篇具有很好參考價(jià)值的文章主要介紹了Kafka-Java四:Spring配置Kafka消費(fèi)者提交Offset的策略。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

一、Kafka消費(fèi)者提交Offset的策略

Kafka消費(fèi)者提交Offset的策略有

  1. 自動(dòng)提交Offset:
    1. 消費(fèi)者將消息拉取下來(lái)以后未被消費(fèi)者消費(fèi)前,直接自動(dòng)提交offset。
    2. 自動(dòng)提交可能丟失數(shù)據(jù),比如消息在被消費(fèi)者消費(fèi)前已經(jīng)提交了offset,有可能消息拉取下來(lái)以后,消費(fèi)者掛了
  2. 手動(dòng)提交Offset
    1. 消費(fèi)者在消費(fèi)消息時(shí)/后,再提交offset,在消費(fèi)者中實(shí)現(xiàn)
    2. 手動(dòng)提交Offset分為:手動(dòng)同步提交(commitSync)、手動(dòng)異步提交(commitAsync)
  3. 什么是Offset
    1. 參考文章:Linux:【Kafka三】組件介紹

二、自動(dòng)提交策略

? ? ? ? Kafka消費(fèi)者默認(rèn)是自動(dòng)提交Offset的策略

? ? ? ? 可設(shè)置自動(dòng)提交的時(shí)間間隔

package com.demo.lxb.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @Description: kafka消費(fèi)者消費(fèi)消息,自動(dòng)提交offset
 * @Author: lvxiaobu
 * @Date: 2023-10-24 16:26
 **/
public class MyConsumerAutoSubmitOffset {

    private  final static String CONSUMER_GROUP_NAME = "GROUP1";
    private  final static String TOPIC_NAME = "topic0921";

    public static void main(String[] args) {
        Properties props = new Properties();

        // 一、設(shè)置參數(shù)
        // 配置kafka地址
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "192.168.151.28:9092"); // 單機(jī)配置
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置
        // 配置消息 鍵值的序列化規(guī)則
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        // 配置消費(fèi)者組
        props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);

        // 設(shè)置消費(fèi)者offset的提交方式
        // 自動(dòng)提交:默認(rèn)配置
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        // 自動(dòng)提交offset的時(shí)間間隔
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

        // 二、創(chuàng)建消費(fèi)者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
        // 三、消費(fèi)者訂閱主題
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        // 四、拉取消息,開(kāi)始消費(fèi)
        while (true){
            // 從kafka集群中拉取消息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            // 消費(fèi)消息,當(dāng)前是自動(dòng)提交模式,在消息上一行消息被拉取下來(lái)以后,offset就自動(dòng)被提交了,下面的代碼如果出錯(cuò),或者此時(shí)
            // 消費(fèi)者掛掉了,那么消費(fèi)其實(shí)是沒(méi)有進(jìn)行消費(fèi)的(也就是業(yè)務(wù)邏輯處理)
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("接收到的消息: 分區(qū): " + record.partition() + ", offset: " + record.offset()
                + ", key值: " + record.key() + " , value值: "+record.value());
            }
        }
    }
}

上述代碼中的如下代碼是自動(dòng)提交策略的相關(guān)設(shè)置?

        // 設(shè)置消費(fèi)者offset的提交方式
        // 自動(dòng)提交:默認(rèn)配置
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        // 自動(dòng)提交offset的時(shí)間間隔
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

三、手動(dòng)提交策略

3.1、手動(dòng)同步提交策略

????????手動(dòng)同步提交,會(huì)在提交offset處阻塞。當(dāng)消費(fèi)者接收到 kafka集群返回的消費(fèi)者提交offset成功的ack后,才開(kāi)始執(zhí)行消費(fèi)者中后續(xù)的代碼。

? ? ? ? 因?yàn)槭褂卯惒教峤蝗菀讈G失消息,固一般使用同步提交,在同步提交后不要再做其他邏輯處理。

package com.demo.lxb.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @Description: kafka消費(fèi)者消費(fèi)消息,手動(dòng)同步提交offset
 * @Author: lvxiaobu
 * @Date: 2023-10-24 16:26
 **/
public class MyConsumerMauSubmitOffset {

    private  final static String CONSUMER_GROUP_NAME = "GROUP1";
    private  final static String TOPIC_NAME = "topic0921";

    public static void main(String[] args) {
        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); 
 
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);

        // 關(guān)鍵代碼:關(guān)閉自動(dòng)提交
        // 手動(dòng)提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        // 自動(dòng)提交offset的時(shí)間間隔:此時(shí)不再需要設(shè)置該值
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
        
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
       
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("接收到的消息: 分區(qū): " + record.partition() + ", offset: " + record.offset()
                + ", key值: " + record.key() + " , value值: "+record.value());
            }

            // 關(guān)鍵代碼:commitSync():同步提交方法
            // 同步方式提交,此時(shí)會(huì)產(chǎn)生阻塞,當(dāng)kafka集群返回了提交成功的ack以后,才會(huì)消除阻塞,進(jìn)行后續(xù)的代碼邏輯。
            // 一般使用同步提交,在同步提交后不再做其他邏輯處理
            consumer.commitSync();

            // do anything
        }
    }
}

3.2、手動(dòng)異步提交策略

????????異步提交,不會(huì)在提交offset代碼處阻塞,即消費(fèi)者提交了offset后,不需要等待kafka集群返回的ack即可繼續(xù)執(zhí)行后續(xù)代碼。但是在提交offset時(shí)需要提供一個(gè)回調(diào)方法,供kafka集群回調(diào),來(lái)告訴消費(fèi)者提交offset的結(jié)果。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-715700.html

package com.demo.lxb.kafka;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/**
 * @Description: kafka消費(fèi)者消費(fèi)消息,手動(dòng)異步提交offset
 * @Author: lvxiaobu
 * @Date: 2023-10-24 16:26
 **/
public class MyConsumerMauSubmitOffset2 {

    private  final static String CONSUMER_GROUP_NAME = "GROUP1";
    private  final static String TOPIC_NAME = "topic0921";

    public static void main(String[] args) {
        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); 
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);

        // 關(guān)鍵代碼:關(guān)閉自動(dòng)提交
        // 手動(dòng)提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        // 自動(dòng)提交offset的時(shí)間間隔:此時(shí)不再需要設(shè)置該值
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
       
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
         
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("接收到的消息: 分區(qū): " + record.partition() + ", offset: " + record.offset()
                + ", key值: " + record.key() + " , value值: "+record.value());
            }
            // 關(guān)鍵代碼:commitAsync() 異步提交
            // new OffsetCommitCallback是kafka集群會(huì)回調(diào)的方法,告訴消費(fèi)者提交offset的結(jié)果
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                    if(e != null){
                        // 可將提交失敗的消息記錄到日志
                        System.out.println("記錄提交offset失敗的消息到日志");
                        System.out.println("消費(fèi)者提交offset拋出異常:" + Arrays.toString(e.getStackTrace()));
                        System.out.println("消費(fèi)者提交offset異常的消息信息:" + JSONObject.toJSONString(map));
                    }
                }
            });

            // 后續(xù)邏輯處理,不需要等到kafka集群返回了提交成功的ack以后才開(kāi)始處理。
            //do anything

        }
    }
}

到了這里,關(guān)于Kafka-Java四:Spring配置Kafka消費(fèi)者提交Offset的策略的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 關(guān)于kafka消費(fèi)者超時(shí)配置

    在Kafka中,消費(fèi)者超時(shí)配置是指消費(fèi)者在等待服務(wù)器響應(yīng)時(shí)的超時(shí)時(shí)間。如果消費(fèi)者在超時(shí)時(shí)間內(nèi)未收到服務(wù)器的響應(yīng),它將重新發(fā)起請(qǐng)求或執(zhí)行其他邏輯。 以下是關(guān)于Kafka消費(fèi)者超時(shí)配置的一些常見(jiàn)選項(xiàng): session.timeout.ms :該配置定義了消費(fèi)者與Kafka集群之間的會(huì)話(huà)超時(shí)時(shí)間

    2024年02月16日
    瀏覽(23)
  • 淺析Spring-kafka源碼——消費(fèi)者模型的實(shí)現(xiàn)

    淺析Spring-kafka源碼——消費(fèi)者模型的實(shí)現(xiàn)

    SpringBoot項(xiàng)目中的消費(fèi)端實(shí)現(xiàn)而言,Spring-kafka沒(méi)有用原生的ConsumerConnector,,而是借助原生client的拉取消息功能做了自己的消費(fèi)模型的實(shí)現(xiàn),提供了@KafkaListener注解這種方式實(shí)現(xiàn)消費(fèi)。 開(kāi)發(fā)中在使用Spring-kafka時(shí),一般也都是通過(guò)使用@KafkaListener注解的方法來(lái)實(shí)現(xiàn)消息監(jiān)聽(tīng)和消費(fèi)。

    2024年02月09日
    瀏覽(27)
  • 基于spring mockito 編寫(xiě)kafka消費(fèi)者的單元測(cè)試

    注: message.json為消息體內(nèi)容的json文件 重點(diǎn)關(guān)注@MockBean和Mockito的使用 readData為從文件中讀取json對(duì)象的方法

    2024年02月04日
    瀏覽(45)
  • 分布式 - 消息隊(duì)列Kafka:Kafka 消費(fèi)者消息消費(fèi)與參數(shù)配置

    分布式 - 消息隊(duì)列Kafka:Kafka 消費(fèi)者消息消費(fèi)與參數(shù)配置

    01. 創(chuàng)建消費(fèi)者 在讀取消息之前,需要先創(chuàng)建一個(gè)KafkaConsumer對(duì)象。創(chuàng)建KafkaConsumer對(duì)象與創(chuàng)建KafkaProducer對(duì)象非常相似——把想要傳給消費(fèi)者的屬性放在Properties對(duì)象里。 為簡(jiǎn)單起見(jiàn),這里只提供4個(gè)必要的屬性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    瀏覽(27)
  • Kafka消費(fèi)者常用超時(shí)時(shí)間配置

    https://blog.csdn.net/BHSZZY/article/details/126757295 //心跳超時(shí)時(shí)間(session超時(shí)時(shí)間)增加成25秒(之前項(xiàng)目設(shè)置了15秒) spring.kafka.properties.session.timeout.ms = 25000 //每次拉取的消息減少為20(之前是默認(rèn)值500) spring.kafka.consumer.max-poll-records=20 //消息消費(fèi)超時(shí)時(shí)間增加為10分鐘 spring.kafka.p

    2024年02月03日
    瀏覽(26)
  • Kafka系列——詳解創(chuàng)建Kafka消費(fèi)者及相關(guān)配置

    參考自kafka系列文章——消費(fèi)者創(chuàng)建與配置 在讀取消息之前,需要先創(chuàng)建一個(gè) KafkaConsumer 對(duì)象。 創(chuàng)建 KafkaConsumer 對(duì)象與創(chuàng)建 KafkaProducer 對(duì)象非常相似——把想要傳給消費(fèi)者的屬性放在 Properties 對(duì)象里,后面深入討論所有屬性。這里我們只需要使用 3 個(gè)必要的屬性: bootstrap.

    2024年02月09日
    瀏覽(21)
  • Spring Boot中使用Kafka時(shí)遇到“構(gòu)建Kafka消費(fèi)者失敗“的問(wèn)題

    在使用Spring Boot開(kāi)發(fā)應(yīng)用程序時(shí),集成Apache Kafka作為消息隊(duì)列是一種常見(jiàn)的做法。然而,有時(shí)候在配置和使用Kafka時(shí)可能會(huì)遇到一些問(wèn)題。本文將探討在Spring Boot應(yīng)用程序中使用Kafka時(shí)可能遇到的\\\"構(gòu)建Kafka消費(fèi)者失敗\\\"錯(cuò)誤,并提供解決方案。 錯(cuò)誤描述: 當(dāng)嘗試構(gòu)建Kafka消費(fèi)者時(shí)

    2024年01月17日
    瀏覽(23)
  • kafka生產(chǎn)者和消費(fèi)者配置介紹

    每個(gè)kafka broker中配置文件 server.properties 默認(rèn)必須配置的屬性如下: **bootstrap.servers** - 指定生產(chǎn)者客戶(hù)端連接kafka集群所需的broker地址列表,格式為host1:port1,host2:port2,可以設(shè)置一個(gè)或多個(gè)。這里并非需要所有的broker地址,因?yàn)樯a(chǎn)者會(huì)從給定的broker里尋找其它的broker。 **key

    2024年02月12日
    瀏覽(23)
  • 在Windows上搭建Kafka環(huán)境的步驟,包括安裝Java、下載Kafka、配置Zookeeper和Kafka、啟動(dòng)Zookeeper和Kafka、創(chuàng)建主題和生產(chǎn)者/消費(fèi)者等

    1. 安裝Java Kafka需要Java環(huán)境支持??梢詮腛racle官網(wǎng)下載JDK,或者使用OpenJDK。 2. 下載Kafka 可以從Kafka官網(wǎng)下載Kafka二進(jìn)制壓縮包。解壓后可以看到bin、config、libs等目錄。 3. 配置Zookeeper Kafka依賴(lài)Zookeeper實(shí)現(xiàn)分布式協(xié)作??梢允褂肒afka自帶的Zookeeper,也可以獨(dú)立安裝Zookeeper。 如果使

    2024年02月11日
    瀏覽(22)
  • 【項(xiàng)目實(shí)戰(zhàn)】Java 開(kāi)發(fā) Kafka 消費(fèi)者

    【項(xiàng)目實(shí)戰(zhàn)】Java 開(kāi)發(fā) Kafka 消費(fèi)者

    ?? 博主介紹 : 博主從事應(yīng)用安全和大數(shù)據(jù)領(lǐng)域,有8年研發(fā)經(jīng)驗(yàn),5年面試官經(jīng)驗(yàn),Java技術(shù)專(zhuān)家,WEB架構(gòu)師,阿里云專(zhuān)家博主,華為云云享專(zhuān)家,51CTO TOP紅人 Java知識(shí)圖譜點(diǎn)擊鏈接: 體系化學(xué)習(xí)Java(Java面試專(zhuān)題) ???? 感興趣的同學(xué)可以收藏關(guān)注下 , 不然下次找不到喲

    2024年02月16日
    瀏覽(31)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包