国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka入門,手動提交offset,同步提交,異步提交,指定 Offset 消費(fèi)(二十三)

這篇具有很好參考價值的文章主要介紹了Kafka入門,手動提交offset,同步提交,異步提交,指定 Offset 消費(fèi)(二十三)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

手動提交offset

kafka異步消費(fèi),kafka,linq,分布式
雖然offset十分遍歷,但是由于其是基于時間提交的,開發(fā)人員難以把握offset提交的實(shí)際。因此Kafka還提供了手動提交offset的API
手動提交offset的方法有兩種:分別commitSync(同步提交)和commitAsync(異步提交)。兩者的相同點(diǎn)是,都會將本次提交的一批數(shù)據(jù)最高的偏移量提交:不同點(diǎn)是,同步提交阻塞當(dāng)前線程,一致到提交成功,并且會自動失敗重試(由不可控因素導(dǎo)致,也會出現(xiàn)提交失?。┒惒教峤粍t沒有重試機(jī)制,故有可能提交失敗。
commitSync(同步提交):必須等待offset提交完畢,再去消費(fèi)下一批數(shù)據(jù)。
commitAsync(異步提交):發(fā)送完提交offset請求后,就開始消費(fèi)下一批數(shù)據(jù)了

同步提交

是否自動提交offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
同步提交offset kafkaConsumer.commitSync();

由于同步提交offset有失敗重試機(jī)制,故更加可靠,但是由于一致等待提交結(jié)果,提交的效率比較低。以下為同步提交offset的示例

package com.longer.handsync;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerByHandSync {
    public static void main(String[] args) {
        //創(chuàng)建消費(fèi)者的配置對象
        Properties properties=new Properties();
        //2、給消費(fèi)者配置對象添加參數(shù)
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消費(fèi)者組(組名任意起名)必須
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //修改分區(qū)策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
//        properties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG,"false");
        //是否自動提交offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        //創(chuàng)建消費(fèi)者對象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注冊要消費(fèi)的主題
        ArrayList<String> topics=new ArrayList<>();
        topics.add("two");
        kafkaConsumer.subscribe(topics);
        while (true){
            //設(shè)置1s中消費(fèi)一批數(shù)據(jù)
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消費(fèi)到的數(shù)據(jù)
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
            //同步提交offset
             kafkaConsumer.commitSync();
        }
    }
}

異步提交

雖然同步提交offset更可靠一些,但是由于其會阻塞當(dāng)前線程,直到提交成功。因此吞吐量會收到很大的影響,因此更多情況下會選擇異步offset的方式
kafkaConsumer.commitAsync();

package com.longer.handasync;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * 同步提交
 */
public class CustomConsumerByHandAsync {
    public static void main(String[] args) {
        //創(chuàng)建消費(fèi)者的配置對象
        Properties properties=new Properties();
        //2、給消費(fèi)者配置對象添加參數(shù)
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消費(fèi)者組(組名任意起名)必須
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //修改分區(qū)策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
//        properties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG,"false");
        //是否自動提交offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        //創(chuàng)建消費(fèi)者對象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注冊要消費(fèi)的主題
        ArrayList<String> topics=new ArrayList<>();
        topics.add("two");
        kafkaConsumer.subscribe(topics);
        while (true){
            //設(shè)置1s中消費(fèi)一批數(shù)據(jù)
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消費(fèi)到的數(shù)據(jù)
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
            //同步提交offset
            kafkaConsumer.commitAsync();
        }
    }
}

指定 Offset 消費(fèi)

auto.offset.reset = earliest | latest | none 默認(rèn)是latest
當(dāng)Kafka中沒有初始偏移量(消費(fèi)者組第一次消費(fèi))或服務(wù)器上不再存在當(dāng)前偏移量時(例如該數(shù)據(jù)已被刪除),該怎么辦?
1)earliest:自動將偏移量重置為最早的偏移量,–from-beginning
2) latest(默認(rèn)值):自動將偏移量重置為最新偏移量
3)如果未找到消費(fèi)者組的先前偏移量,則向消費(fèi)者拋出異常。
kafka異步消費(fèi),kafka,linq,分布式
主要代碼文章來源地址http://www.zghlxwxcb.cn/news/detail-665281.html

		Set<TopicPartition> assigment=new HashSet<>();

        while (assigment.size()==0){
            kafkaConsumer.poll(Duration.ofSeconds(1));
            //獲取消費(fèi)者分區(qū)分配信息(有了分區(qū)分配信息才能開始消費(fèi))
            assigment= kafkaConsumer.assignment();
        }
        //遍歷所有分區(qū),并指定從100得位置開始消費(fèi)
        for (TopicPartition tp : assigment) {
            kafkaConsumer.seek(tp,100);
        }
package com.longer.seek;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

public class CustomConsumerSeek {
    public static void main(String[] args) {
        //創(chuàng)建消費(fèi)者的配置對象
        Properties properties=new Properties();
        //2、給消費(fèi)者配置對象添加參數(shù)
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消費(fèi)者組(組名任意起名)必須
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //修改分區(qū)策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
//        properties.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG,"false");

        //創(chuàng)建消費(fèi)者對象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注冊要消費(fèi)的主題
        ArrayList<String> topics=new ArrayList<>();
        topics.add("two");
        kafkaConsumer.subscribe(topics);

        Set<TopicPartition> assigment=new HashSet<>();

        while (assigment.size()==0){
            kafkaConsumer.poll(Duration.ofSeconds(1));
            //獲取消費(fèi)者分區(qū)分配信息(有了分區(qū)分配信息才能開始消費(fèi))
            assigment= kafkaConsumer.assignment();
        }
        //遍歷所有分區(qū),并指定從100得位置開始消費(fèi)
        for (TopicPartition tp : assigment) {
            kafkaConsumer.seek(tp,100);
        }


        while (true){
            //設(shè)置1s中消費(fèi)一批數(shù)據(jù)
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消費(fèi)到的數(shù)據(jù)
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
        }
    }
}

到了這里,關(guān)于Kafka入門,手動提交offset,同步提交,異步提交,指定 Offset 消費(fèi)(二十三)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Kafka3.0.0版本——消費(fèi)者(自動提交 offset)

    Kafka3.0.0版本——消費(fèi)者(自動提交 offset)

    官網(wǎng)文檔 參數(shù)解釋 參數(shù) 描述 enable.auto.commi 默認(rèn)值為 true,消費(fèi)者會自動周期性地向服務(wù)器提交偏移量。 auto.commit.interval.ms 如果設(shè)置了 enable.auto.commit 的值為 true, 則該值定義了消費(fèi)者偏移量向 Kafka 提交的頻率,默認(rèn) 5s。 圖解分析 消費(fèi)者自動提交 offset代碼 消費(fèi)者自動提交

    2024年02月09日
    瀏覽(28)
  • Kafka-Java四:Spring配置Kafka消費(fèi)者提交Offset的策略

    Kafka消費(fèi)者提交Offset的策略有 自動提交Offset: 消費(fèi)者將消息拉取下來以后未被消費(fèi)者消費(fèi)前,直接自動提交offset。 自動提交可能丟失數(shù)據(jù),比如消息在被消費(fèi)者消費(fèi)前已經(jīng)提交了offset,有可能消息拉取下來以后,消費(fèi)者掛了 手動提交Offset 消費(fèi)者在消費(fèi)消息時/后,再提交o

    2024年02月08日
    瀏覽(23)
  • 大數(shù)據(jù)篇Kafka消息隊(duì)列指定Topic打印Key、Value、Offset和Partition

    說到Apache Kafka消息傳遞系統(tǒng)時,以下是一些關(guān)鍵概念的解釋: Key(鍵):Kafka消息由Key和Value組成。Key是一個可選的字段,它通常用于消息的路由和分區(qū)策略。Key的目的是確保具有相同Key的消息被寫入同一個分區(qū)。當(dāng)消費(fèi)者接收到消息時,可以使用Key來進(jìn)行消息處理和路由操

    2024年02月16日
    瀏覽(27)
  • Kafka入門,offset的默認(rèn)維護(hù)位置(二十一)

    Kafka入門,offset的默認(rèn)維護(hù)位置(二十一)

    0.9版本之前:consumer默認(rèn)將offset保持在zookeeper中 從0.9版本開始,consumer默認(rèn)將offset保存在kafka一個內(nèi)置的topic中,該topic為__consumer_offsets __consumer_offsets 主題里面采用key和value方式存儲數(shù)據(jù),key是group.id+topic+分區(qū)號,value就是當(dāng)前offset的值。每隔一段時間,kafka內(nèi)部會對這個topic進(jìn)

    2024年02月15日
    瀏覽(15)
  • kafka查詢offset&生產(chǎn)者offset計算&消費(fèi)offset計算

    kafka查詢offset&生產(chǎn)者offset計算&消費(fèi)offset計算

    1、簡介 ? kafka的介紹:略…(有興趣的同學(xué)可自行Google,這與本文無關(guān) ^ _ ^) 2、需求背景 ? 對kafka做監(jiān)控,需要獲取到kafka接收到消息的offset和被消費(fèi)者消費(fèi)掉消息的offset,編寫接口將數(shù)值交給prometheus,直接觀察判斷kafka的消費(fèi)性能如何。(如何自定義prometheus的監(jiān)控指標(biāo)后續(xù)

    2023年04月25日
    瀏覽(24)
  • kafka—offset偏移量

    offset定義 :消費(fèi)者再消費(fèi)的過程中通過offset來記錄消費(fèi)數(shù)據(jù)的具體位置 offset存放的位置 :從0.9版本開始,consumer默認(rèn)將offset保存在Kafka一個內(nèi)置的topic(系統(tǒng)主題)中,名為__consumer_offsets,即offset維護(hù)在系統(tǒng)主題中 說明:__consumer_offsets 主題里面采用 key 和 value 的方式存儲數(shù)

    2024年02月05日
    瀏覽(23)
  • Kafka之offset位移

    offset :在 Apache Kafka 中,offset 是一個用來唯一標(biāo)識消息在分區(qū)中位置的數(shù)字。每個分區(qū)中的消息都會被分配一個唯一的 offset 值,用來表示該消息在該分區(qū)中的位置。消費(fèi)者可以通過記錄自己消費(fèi)的最后一個 offset 值來跟蹤自己消費(fèi)消息的進(jìn)度,確保不會漏掉消息或者重復(fù)消

    2024年04月10日
    瀏覽(24)
  • Kafka中offset的相關(guān)操作

    offset用于記錄消息消費(fèi)的進(jìn)度,主要有以下幾種, Current offset,用于記錄消費(fèi)者已經(jīng)接收到(不一定有完成消費(fèi))的消息序號,保證同一個消息不會被重復(fù)消費(fèi),可以我們通過kafka-consumer-groups.sh查詢,這也是我們測試或者實(shí)際環(huán)境需要調(diào)整的offset Committed offset,用于記錄消費(fèi)者已

    2024年02月12日
    瀏覽(25)
  • Kafka【應(yīng)用 01】Offset Explorer Kafka 的終極 UI 工具安裝+簡單上手+關(guān)鍵特性測試(一篇學(xué)會使用 Offset Explorer)

    Kafka【應(yīng)用 01】Offset Explorer Kafka 的終極 UI 工具安裝+簡單上手+關(guān)鍵特性測試(一篇學(xué)會使用 Offset Explorer)

    官方自稱 Offset Explorer 是 Kafka 的終極 UI 工具 ?? 我們看一下 官網(wǎng) 的介紹: Offset Explorer (formerly Kafka Tool) is a GUI application for managing and using Apache Kafka ? clusters. It provides an intuitive UI that allows one to quickly view objects within a Kafka cluster as well as the messages stored in the topics of the cluster.

    2024年02月12日
    瀏覽(24)
  • Offset Explorer中添加Kafka連接

    Offset Explorer中添加Kafka連接

    Kafka連接 1. Host中填I(lǐng)P,Port中填端口號,點(diǎn)擊test測試鏈接,點(diǎn)擊Add添加鏈接; ?2. 若第一種方法添加鏈接不成功,點(diǎn)擊Advanced,在bootstrap server中填入ip:port即可鏈接成功。 ?

    2024年02月16日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包