国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

分布式 - 消息隊(duì)列Kafka:Kafka 消費(fèi)者消費(fèi)位移的提交方式

這篇具有很好參考價(jià)值的文章主要介紹了分布式 - 消息隊(duì)列Kafka:Kafka 消費(fèi)者消費(fèi)位移的提交方式。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

1. 自動(dòng)提交消費(fèi)位移

最簡(jiǎn)單的提交方式是讓消費(fèi)者自動(dòng)提交偏移量,自動(dòng)提交 offset 的相關(guān)參數(shù):

  • enable.auto.commit:是否開(kāi)啟自動(dòng)提交 offset 功能,默認(rèn)為 true;
  • auto.commit.interval.ms:自動(dòng)提交 offset 的時(shí)間間隔,默認(rèn)為5秒;

如果 enable.auto.commit 被設(shè)置為true,那么每過(guò)5秒,消費(fèi)者就會(huì)自動(dòng)提交 poll() 返回的最大偏移量,即將拉取到的每個(gè)分區(qū)中最大的消息位移進(jìn)行提交。提交時(shí)間間隔通過(guò) auto.commit.interval.ms 來(lái)設(shè)定,默認(rèn)是5秒。與消費(fèi)者中的其他處理過(guò)程一樣,自動(dòng)提交也是在輪詢(xún)循環(huán)中進(jìn)行的。消費(fèi)者會(huì)在每次輪詢(xún)時(shí)檢查是否該提交偏移量了,如果是,就會(huì)提交最后一次輪詢(xún)返回的偏移量。

① 啟動(dòng)消費(fèi)者消費(fèi)程序,并設(shè)置為自動(dòng)提交消費(fèi)者位移的方式:

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-ni");

        // 顯式配置消費(fèi)者自動(dòng)提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);

        // 顯式配置消費(fèi)者自動(dòng)提交位移的事件間隔
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,4);

        // 創(chuàng)建消費(fèi)者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // 訂閱主題
        consumer.subscribe(Arrays.asList("ni"));

        // 消費(fèi)數(shù)據(jù)
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.printf("主題 = %s, 分區(qū) = %d, 位移 = %d, " + "消息鍵 = %s, 消息值 = %s\n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}

② 啟動(dòng)生產(chǎn)者程序發(fā)送3條消息,消息的內(nèi)容都為 hello,kafka

③ 查看消費(fèi)者消費(fèi)的消息記錄:

主題 = ni, 分區(qū) = 0, 位移 = 0, 消息鍵 = null, 消息值 = hello,kafka
主題 = ni, 分區(qū) = 0, 位移 = 1, 消息鍵 = null, 消息值 = hello,kafka
主題 = ni, 分區(qū) = 0, 位移 = 2, 消息鍵 = null, 消息值 = hello,kafka

可以看到,消費(fèi)者消費(fèi)分區(qū)的最新消息的位移為 offset= 2,即消費(fèi)者的消息位移為 offset =2;

④ 查看消費(fèi)者提交的位移:

[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

[group-ni,ni,0]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1692168114999, expireTimestamp=None)

可以看到,消費(fèi)者的消息位移為 offset =2,但是消費(fèi)者的提交位移為 offset =3;

2. 自動(dòng)提交消費(fèi)位移存在的問(wèn)題?

假設(shè)剛剛提交完一次消費(fèi)位移,然后拉取一批消息進(jìn)行消費(fèi),在下一次自動(dòng)提交消費(fèi)位移之前,消費(fèi)者崩潰了,那么又得從上一次位移提交的地方重新開(kāi)始消費(fèi),這樣便發(fā)生了重復(fù)消費(fèi)的現(xiàn)象(對(duì)于再均衡的情況同樣適用,再均衡完成之后,接管分區(qū)的消費(fèi)者將從最后一次提交的偏移量的位置開(kāi)始讀取消息)??梢酝ㄟ^(guò)修改提交時(shí)間間隔來(lái)更頻繁地提交偏移量,縮小可能導(dǎo)致重復(fù)消息的時(shí)間窗口,但無(wú)法完全避免。

在使用自動(dòng)提交時(shí),到了該提交偏移量的時(shí)候,輪詢(xún)方法將提交上一次輪詢(xún)返回的偏移量,但它并不知道具體哪些消息已經(jīng)被處理過(guò)了。所以,在再次調(diào)用poll()之前,要確保上一次poll()返回的所有消息都已經(jīng)處理完畢(調(diào)用close()方法也會(huì)自動(dòng)提交偏移量)。通常情況下這不會(huì)有什么問(wèn)題,但在處理異?;蛱崆巴顺鲚喸?xún)循環(huán)時(shí)需要特別小心。

雖然自動(dòng)提交很方便,但是沒(méi)有為避免開(kāi)發(fā)者重復(fù)處理消息留有余地。

3. 手動(dòng)提交消費(fèi)位移

在Kafka中還提供了手動(dòng)位移提交的方式,這樣可以使得開(kāi)發(fā)人員對(duì)消費(fèi)位移的管理控制更加靈活。很多時(shí)候并不是說(shuō)拉取到消息就算消費(fèi)完成,而是需要將消息寫(xiě)入數(shù)據(jù)庫(kù)、寫(xiě)入本地緩存,或者是更加復(fù)雜的業(yè)務(wù)處理。在這些場(chǎng)景下,所有的業(yè)務(wù)處理完成才能認(rèn)為消息被成功消費(fèi),手動(dòng)的提交方式可以讓開(kāi)發(fā)人員根據(jù)程序的邏輯在合適的地方進(jìn)行位移提交。

開(kāi)啟手動(dòng)提交功能的前提是消費(fèi)者客戶(hù)端參數(shù) enable.auto.commit 配置為 false,讓?xiě)?yīng)用程序自己決定何時(shí)提交偏移量。手動(dòng)提交可以細(xì)分為同步提交和異步提交,對(duì)應(yīng)于 KafkaConsumer 中的 commitSync() 和 commitAsync() 兩種類(lèi)型的方法。

① 同步提交位移是指消費(fèi)者在提交位移時(shí)會(huì)阻塞,直到提交完成并收到確認(rèn)。它會(huì)提交 poll() 返回的最新偏移量,提交成功后馬上返回,如果由于某些原因提交失敗就拋出異常。 commitAsync() 方法有四個(gè)不同的重載方法,具體定義如下:

public void commitSync()
public void commitSync(Duration timeout)
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) 
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) 

② 異步提交位移在執(zhí)行的時(shí)候消費(fèi)者線程不會(huì)被阻塞,可能在提交消費(fèi)位移的結(jié)果還未返回之前就開(kāi)始了新一次的拉取操作。異步提交可以使消費(fèi)者的性能得到一定的增強(qiáng)。commitAsync方法有三個(gè)不同的重載方法,具體定義如下:

public void commitAsync() 
public void commitAsync(OffsetCommitCallback callback) 
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) 

1. 同步提交消費(fèi)位移

在消費(fèi)消息的循環(huán)中,處理完當(dāng)前批次的消息后,在輪詢(xún)更多的消息之前,調(diào)用 commitSync() 方法提交當(dāng)前批次最新的偏移量,這會(huì)阻塞當(dāng)前線程,直到位移提交完成并收到確認(rèn)。 只要沒(méi)有發(fā)生不可恢復(fù)的錯(cuò)誤,commitSync() 方法就會(huì)一直嘗試直至提交成功。如果提交失敗,就把異常記錄到錯(cuò)誤日志里。

public void commitSync()
@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 顯式配置消費(fèi)者手動(dòng)提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 創(chuàng)建消費(fèi)者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 訂閱主題
        consumer.subscribe(Arrays.asList("topic-01"));
        // 消費(fèi)數(shù)據(jù)
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                // 業(yè)務(wù)處理拉取的消息
            }
            try{
                // 消費(fèi)者手動(dòng)提交消費(fèi)位移:同步提交方式
                consumer.commitSync();
            }catch (CommitFailedException exception){
                log.error("commit failed....");
            }
        }
    }
}

還可以將消費(fèi)者程序修改為批量處理+批量提交的方式:

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 顯式配置消費(fèi)者手動(dòng)提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 創(chuàng)建消費(fèi)者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 訂閱主題
        consumer.subscribe(Arrays.asList("topic-01"));
        // 消費(fèi)數(shù)據(jù)
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            int minSize = 200;
            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
            for (ConsumerRecord<String, String> record : consumerRecords) {
                buffer.add(record);
            }
            try{
                // 消費(fèi)者手動(dòng)提交消費(fèi)位移:同步提交方式
                if(buffer.size()>minSize){
                    // 批量處理消息
                    // ...
                }
                // 手動(dòng)提交位移:同步方式
                consumer.commitSync();
            }catch (CommitFailedException exception){
                log.error("commit failed....");
            }
        }
    }
}

上面的示例中將拉取到的消息存入緩存 buffer,等到積累到足夠多的時(shí)候,也就是大于等于200個(gè)的時(shí)候,再做相應(yīng)的批量處理,之后再做批量提交。

commitSync() 方法會(huì)根據(jù) poll() 方法拉取的最新位移來(lái)進(jìn)行提交,只要沒(méi)有發(fā)生不可恢復(fù)的錯(cuò)誤,它就會(huì)阻塞消費(fèi)者線程直至位移提交完成。對(duì)于不可恢復(fù)的錯(cuò)誤,比如 CommitFailedException、WakeupException、InterruptException、AuthenticationException、AuthorizationException 等,我們可以將其捕獲并做針對(duì)性的處理。

需要注意的是,同步提交位移時(shí)需要確保在處理完消息后再進(jìn)行提交,因?yàn)?commitSync() 將會(huì)提交 poll() 返回的最新偏移量,如果你在處理完所有記錄之前就調(diào)用了 commitSync(),那么一旦應(yīng)用程序發(fā)生崩潰,就會(huì)有丟失消息的風(fēng)險(xiǎn)(消息已被提交但未被處理)。如果應(yīng)用程序在處理記錄時(shí)發(fā)生崩潰,但 commitSync() 還沒(méi)有被調(diào)用,那么從最近批次的開(kāi)始位置到發(fā)生再均衡時(shí)的所有消息都將被再次處理——這或許比丟失消息更好,或許更壞。

2. 異步提交消費(fèi)位移

同步提交有一個(gè)缺點(diǎn),在broker對(duì)請(qǐng)求做出回應(yīng)之前,應(yīng)用程序會(huì)一直阻塞,這樣會(huì)限制應(yīng)用程序的吞吐量??梢酝ㄟ^(guò)降低提交頻率來(lái)提升吞吐量,但如果發(fā)生了再均衡,則會(huì)增加潛在的消息重復(fù)。這個(gè)時(shí)候可以使用異步提交API。只管發(fā)送請(qǐng)求,無(wú)須等待broker做出響應(yīng)。

public void commitAsync() 
@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 顯式配置消費(fèi)者手動(dòng)提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 創(chuàng)建消費(fèi)者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 訂閱主題
        consumer.subscribe(Arrays.asList("topic-01"));
        // 消費(fèi)數(shù)據(jù)
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                // 業(yè)務(wù)邏輯處理
            }
            // 異步提交消費(fèi)位移
            consumer.commitAsync();
        }
    }
}

在提交成功或碰到無(wú)法恢復(fù)的錯(cuò)誤之前,commitSync() 會(huì)一直重試,但commitAsync()不會(huì),這是commitAsync() 的一個(gè)缺點(diǎn)。之所以不進(jìn)行重試,是因?yàn)?commitAsync() 在收到服務(wù)器端的響應(yīng)時(shí),可能已經(jīng)有一個(gè)更大的位移提交成功。假設(shè)我們發(fā)出一個(gè)提交位移2000的請(qǐng)求,這個(gè)時(shí)候出現(xiàn)了短暫的通信問(wèn)題,服務(wù)器收不到請(qǐng)求,自然也不會(huì)做出響應(yīng)。與此同時(shí),我們處理了另外一批消息,并成功提交了位移3000。如果此時(shí) commitAsync() 重新嘗試提交位移2000,則有可能在位移3000之后提交成功。這個(gè)時(shí)候如果發(fā)生再均衡,就會(huì)導(dǎo)致消息重復(fù)。

之所以提到這個(gè)問(wèn)題并強(qiáng)調(diào)提交順序的重要性,是因?yàn)?commitAsync() 也支持回調(diào),回調(diào)會(huì)在broker返回響應(yīng)時(shí)執(zhí)行?;卣{(diào)經(jīng)常被用于記錄位移提交錯(cuò)誤或生成指標(biāo),如果要用它來(lái)重試提交位移,那么一定要注意提交順序。

public void commitAsync(OffsetCommitCallback callback)
@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 顯式配置消費(fèi)者手動(dòng)提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 創(chuàng)建消費(fèi)者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 訂閱主題
        consumer.subscribe(Arrays.asList("topic-01"));
        // 消費(fèi)數(shù)據(jù)
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                // 業(yè)務(wù)邏輯處理
            }
            // 異步提交消費(fèi)位移
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap, Exception exception) {
                    if(exception!=null){
                        log.info("fail to commit offsets:{}",offsetAndMetadataMap,exception);
                    }
                }
            });
        }
    }
}

異步提交中如何實(shí)現(xiàn)重試:我們可以設(shè)置一個(gè)遞增的序號(hào)來(lái)維護(hù)異步提交的順序,每次位移提交之后就增加序號(hào)相對(duì)應(yīng)的值。在遇到位移提交失敗需要重試的時(shí)候,可以檢查所提交的位移和序號(hào)的值的大小,如果前者小于后者,則說(shuō)明有更大的位移已經(jīng)提交了,不需要再進(jìn)行本次重試;如果兩者相同,則說(shuō)明可以進(jìn)行重試提交。

3. 同步和異步組合提交消費(fèi)位移

一般情況下,偶爾提交失敗但不進(jìn)行重試不會(huì)有太大問(wèn)題,因?yàn)槿绻峤皇∈怯捎谂R時(shí)問(wèn)題導(dǎo)致的,后續(xù)的提交總會(huì)成功。如果消費(fèi)者異常退出,那么這個(gè)重復(fù)消費(fèi)的問(wèn)題就很難避免,因?yàn)檫@種情況下無(wú)法及時(shí)提交消費(fèi)位移;但如果這是發(fā)生在消費(fèi)者被關(guān)閉或再均衡前的最后一次提交,則要確保提交是成功的,可以在退出或再均衡執(zhí)行之前使用同步提交的方式做最后的把關(guān)。

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 顯式配置消費(fèi)者手動(dòng)提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll( Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
					// 業(yè)務(wù)邏輯處理
                }
                // 異步提交位移
                consumer.commitAsync();
            }
        } catch (Exception e) {
            log.error("Unexpected error", e);
        } finally {
            try {
                // 同步提交位移
                consumer.commitSync();
            }finally{
                consumer.close();
            }
        }
    }
}

4. 提交特定的消費(fèi)位移

對(duì)于采用 commitSync() 的無(wú)參方法而言,它提交消費(fèi)位移的頻率和拉取批次消息、處理批次消息的頻率是一樣的。但如果想要更頻繁地提交位移該怎么辦?如果 poll() 返回了一大批數(shù)據(jù),那么為了避免可能因再均衡引起的消息重復(fù),想要在批次處理過(guò)程中提交位移該怎么辦?這個(gè)時(shí)候不能只是調(diào)用 commitSync() 或commitAsync(),因?yàn)樗鼈冎粫?huì)提交消息批次里的最后一個(gè)位移。

幸運(yùn)的是,消費(fèi)者API允許在調(diào)用 commitSync() 和 commitAsync() 時(shí)傳給它們想要提交的分區(qū)和位移:

public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

分布式 - 消息隊(duì)列Kafka:Kafka 消費(fèi)者消費(fèi)位移的提交方式,【分布式-消息隊(duì)列Kafka】,分布式,kafka,linq

如圖:消費(fèi)者的提交位移=當(dāng)前一次poll拉取的分區(qū)消息的最大位移offset + 1,這個(gè)提交位移就是下次

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 顯式配置消費(fèi)者手動(dòng)提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        ConcurrentHashMap<TopicPartition,OffsetAndMetadata> offsets = new ConcurrentHashMap<>();
        int count = 0;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // 消息所屬的主題和分區(qū)
                TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                // 消費(fèi)者提交的消費(fèi)位移=當(dāng)前消費(fèi)消息的位移+1
                OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);
                offsets.put(topicPartition, offsetAndMetadata);
                if(count % 1000 == 0){
                    consumer.commitAsync(offsets,null);
                }
                count++;
            }
        }
    }
}

5. 按分區(qū)提交消費(fèi)位移

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 顯式配置消費(fèi)者手動(dòng)提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // 獲取拉取的消息包含的所有分區(qū)列表
            Set<TopicPartition> partitions = consumerRecords.partitions();
            for (TopicPartition partition : partitions) {
                // 獲取當(dāng)前分區(qū)要消費(fèi)的消息
                List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);
                // 獲取當(dāng)前分區(qū)消息的最大位移
                long lastConsumerOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // 當(dāng)前分區(qū)的消費(fèi)位移提交 = 當(dāng)前分區(qū)消息的最大位移 + 1
                Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = Collections.singletonMap(partition, new OffsetAndMetadata(lastConsumerOffset + 1));
                consumer.commitSync(topicPartitionOffsetAndMetadataMap);
            }
        }
    }
}

4. 消費(fèi)者查找不到消費(fèi)位移時(shí)怎么辦?

當(dāng)一個(gè)新的消費(fèi)組建立的時(shí)候,它根本沒(méi)有可以查找的消費(fèi)位移。或者消費(fèi)組內(nèi)的一個(gè)新消費(fèi)者訂閱了一個(gè)新的主題,它也沒(méi)有可以查找的消費(fèi)位移。當(dāng)__consumer_offsets 主題中有關(guān)這個(gè)消費(fèi)組的位移信息過(guò)期而被刪除后,它也沒(méi)有可以查找的消費(fèi)位移。當(dāng) Kafka 中沒(méi)有初始位移或服務(wù)器上不再存在當(dāng)前位移時(shí),該怎么辦?

此時(shí)會(huì)根據(jù)消費(fèi)者客戶(hù)端參數(shù) auto.offset.reset 的配置來(lái)決定從何處開(kāi)始進(jìn)行消費(fèi),auto.offset.reset 參數(shù)的取值如下:

  • latest(默認(rèn)值):表示從分區(qū)末尾開(kāi)始消費(fèi)消息。
  • earliest: 表示消費(fèi)者會(huì)從起始處,也就是0開(kāi)始消費(fèi)。
  • none:查到不到消費(fèi)位移的時(shí)候,既不從最新的消息位置處開(kāi)始消費(fèi),也不從最早的消息位置處開(kāi)始消費(fèi),此時(shí)會(huì)報(bào)出NoOffsetForPartitionException異常。如果能夠找到消費(fèi)位移,那么配置為“none”不會(huì)出現(xiàn)任何異常。

如果配置的不是“l(fā)atest”、“earliest”和“none”,則會(huì)報(bào)出ConfigException異常。

auto.offset.reset 參數(shù)用于指定消費(fèi)者在啟動(dòng)時(shí),如果找不到消費(fèi)位移應(yīng)該從哪里開(kāi)始消費(fèi)消息。 如果能夠找到消費(fèi)位移,那么消費(fèi)者會(huì)從該位移處開(kāi)始消費(fèi)消息,那么 auto.offset.reset 參數(shù)并不會(huì)奏效,只有在找不到消費(fèi)位移時(shí)才會(huì)生效。如果發(fā)生位移越界,即消費(fèi)位移超出了消息隊(duì)列中消息的數(shù)量或位置范圍,那么 auto.offset.reset 參數(shù)也會(huì)生效。

5. 如何從特定分區(qū)位移處讀取消息?

如果消費(fèi)者能夠找到消費(fèi)位移,使用 poll() 可以從各個(gè)分區(qū)的最新位移處讀取消息, 而且提供的 auto.offset.reset 參數(shù)也可以在找不到消費(fèi)位移或位移越界的情況下粗粒度地從開(kāi)頭或末尾開(kāi)始消費(fèi)。但是有些時(shí)候,我們需要一種更細(xì)粒度的掌控,可以讓我們從特定的位移處開(kāi)始拉取消息,而 KafkaConsumer 中的 seek() 方法正好提供了這個(gè)功能,讓我們得以追前消費(fèi)或回溯消費(fèi)。

public void seek(TopicPartition partition, long offset)
public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)

① seek() 方法中的參數(shù) partition 表示分區(qū),而 offset 參數(shù)用來(lái)指定從分區(qū)的哪個(gè)位置開(kāi)始消費(fèi)。seek() 方法只能重置消費(fèi)者分配到的分區(qū)的消費(fèi)位置,而分區(qū)的分配是在 poll() 方法的調(diào)用過(guò)程中實(shí)現(xiàn)的。也就是說(shuō),在執(zhí)行 seek() 方法之前需要先執(zhí)行一次poll()方法,等到分配到分區(qū)之后才可以重置消費(fèi)位置:

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 顯式配置消費(fèi)者手動(dòng)提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        // 執(zhí)行一次poll() 方法完成分區(qū)分配的邏輯
        //  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        Set<TopicPartition> topicPartitions = consumer.assignment();
        for (TopicPartition topicPartition : topicPartitions) {
            consumer.seek(topicPartition,10);
        }

        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
            // ...
        }
    }
}

② 如果 poll() 方法中的參數(shù)為0,此方法立刻返回,那么 poll() 方法內(nèi)部進(jìn)行分區(qū)分配的邏輯就會(huì)來(lái)不及實(shí)施,也就是說(shuō),消費(fèi)者此時(shí)并未分配到任何分區(qū),那么 topicPartitions 便是一個(gè)空列表。那么這里的 timeout 參數(shù)設(shè)置為多少合適呢?太短會(huì)使分配分區(qū)的動(dòng)作失敗,太長(zhǎng)又有可能造成一些不必要的等待。我們可以通過(guò) KafkaConsumer的 assignment()方法來(lái)判定是否分配到了相應(yīng)的分區(qū):

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 顯式配置消費(fèi)者手動(dòng)提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        Set<TopicPartition> topicPartitions = consumer.assignment();
        // 此時(shí)說(shuō)明還未完成分區(qū)分配
        while (topicPartitions.size()==0){
            consumer.poll(Duration.ofMillis(100));
            topicPartitions = consumer.assignment();
        }
        for (TopicPartition topicPartition : topicPartitions) {
            // 重置每個(gè)分區(qū)的消費(fèi)位置為10
            consumer.seek(topicPartition,10);
        }

        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
            // 消費(fèi)消息
        }
    }
}

③ 如果對(duì)未分配到的分區(qū)執(zhí)行seek() 方法,那么會(huì)報(bào)出 IllegalStateException 的異常。類(lèi)似在調(diào)用subscribe() 方法之后直接調(diào)用seek() 方法:

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 顯式配置消費(fèi)者手動(dòng)提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));


        // 未完成分區(qū)分配,直接調(diào)用seek方法,重置分區(qū)1的消費(fèi)位置為10
        consumer.seek(new TopicPartition("topic-01",1),10);

        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
            // 消費(fèi)消息
        }
    }
}

報(bào)錯(cuò):

Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition topic-01-1

④ 如果消費(fèi)組內(nèi)的消費(fèi)者在啟動(dòng)的時(shí)候能夠找到消費(fèi)位移,那么消費(fèi)者就會(huì)從該位移處開(kāi)始消費(fèi)消息。除非發(fā)生位移越界,即消費(fèi)位移超出了消息隊(duì)列中消息的數(shù)量或位置范圍,否則 auto.offset.reset 參數(shù)并不會(huì)奏效,此時(shí)如果想指定從開(kāi)頭或末尾開(kāi)始消費(fèi),就需要seek() 方法的幫助了,指定從分區(qū)末尾開(kāi)始消費(fèi):

endOffsets() 方法用來(lái)獲取指定分區(qū)的末尾的消息位置, endOffsets 的具體方法定義如下:

public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout)

其中 partitions 參數(shù)表示分區(qū)集合,而 timeout 參數(shù)用來(lái)設(shè)置等待獲取的超時(shí)時(shí)間。如果沒(méi)有指定 timeout 參數(shù)的值,那么 endOffsets() 方法的等待時(shí)間由客戶(hù)端參數(shù) request.timeout.ms 來(lái)設(shè)置,默認(rèn)值為 30000。與 endOffsets 對(duì)應(yīng)的是 beginningOffset() 方法,一個(gè)分區(qū)的起始位置起初是0,但并不代表每時(shí)每刻都為0,因?yàn)槿罩厩謇淼膭?dòng)作會(huì)清理舊的數(shù)據(jù),所以分區(qū)的起始位置會(huì)自然而然地增加,beginningOffsets() 方法的具體定義如下:

public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) 
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout)

beginningOffsets() 方法中的參數(shù)內(nèi)容和含義都與 endOffsets() 方法中的一樣,配合這兩個(gè)方法我們就可以從分區(qū)的開(kāi)頭或末尾開(kāi)始消費(fèi)。其實(shí)KafkaConsumer中直接提供了seekToBeginning() 方法和seekToEnd() 方法來(lái)實(shí)現(xiàn)這兩個(gè)功能,這兩個(gè)方法的具體定義如下:

public void seekToBeginning(Collection<TopicPartition> partitions)
public void seekToEnd(Collection<TopicPartition> partitions)

⑤ 有時(shí)候我們并不知道特定的消費(fèi)位置,卻知道一個(gè)相關(guān)的時(shí)間點(diǎn),比如我們想要消費(fèi)昨天8點(diǎn)之后的消息,這個(gè)需求更符合正常的思維邏輯。此時(shí)我們無(wú)法直接使用seek() 方法來(lái)追溯到相應(yīng)的位置。KafkaConsumer同樣考慮到了這種情況,它提供了一個(gè)offsetsForTimes() 方法,通過(guò)timestamp來(lái)查詢(xún)與此對(duì)應(yīng)的分區(qū)位置:

public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout)

offsetsForTimes() 方法的參數(shù) timestampsToSearch 是一個(gè)Map類(lèi)型,key為待查詢(xún)的分區(qū),而 value 為待查詢(xún)的時(shí)間戳,該方法會(huì)返回時(shí)間戳大于等于待查詢(xún)時(shí)間的第一條消息對(duì)應(yīng)的位置和時(shí)間戳,對(duì)應(yīng)于 OffsetAndTimestamp 中的 offset 和 timestamp字段。下面的示例演示了 offsetsForTimes() 和 seek() 之間的使用方法,首先通過(guò) offsetsForTimes() 方法獲取一天之前的消息位置,然后使用 seek() 方法追溯到相應(yīng)位置開(kāi)始消費(fèi):

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 顯式配置消費(fèi)者手動(dòng)提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        Map<TopicPartition,Long> timestampToSearch = new HashMap<>();
        Set<TopicPartition> topicPartitionSet = consumer.assignment();
        // 查詢(xún)的分區(qū)以及查詢(xún)的時(shí)間戳
        for (TopicPartition topicPartition : topicPartitionSet) {
            timestampToSearch.put(topicPartition,System.currentTimeMillis()-1*24*3600*1000);
        }

        // 獲取時(shí)間戳大于等于待查詢(xún)時(shí)間的第一條消息對(duì)應(yīng)的位置和時(shí)間戳
        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(timestampToSearch);
        for (TopicPartition topicPartition : topicPartitionSet) {
            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
            // seek 方法重置消費(fèi)的位移
            if(offsetAndTimestamp != null){
                consumer.seek(topicPartition,offsetAndTimestamp.offset());
            }
        }
    }
}

⑥ 位移越界也會(huì)觸發(fā) auto.offset.reset 參數(shù)的執(zhí)行,位移越界是指知道消費(fèi)位置卻無(wú)法在實(shí)際的分區(qū)中查找到,比如原本拉取位置為101(fetch offset 101),但已經(jīng)越界了(out of range),所以此時(shí)會(huì)根據(jù) auto.offset.reset 參數(shù)的默認(rèn)值來(lái)將拉取位置重置(resetting offset)為100,我們也能知道此時(shí)分區(qū)中最大的消息 offset 為99。

6. 如何優(yōu)雅地退出輪詢(xún)循環(huán)消費(fèi)?

如何優(yōu)雅地退出輪詢(xún)循環(huán),如果你確定馬上要關(guān)閉消費(fèi)者(即使消費(fèi)者還在等待一個(gè)poll()返回),那么可以在另一個(gè)線程中調(diào)用consumer.wakeup()。如果輪詢(xún)循環(huán)運(yùn)行在主線程中,那么可以在ShutdownHook里調(diào)用這個(gè)方法。需要注意的是,consumer.wakeup() 是消費(fèi)者唯一一個(gè)可以在其他線程中安全調(diào)用的方法。調(diào)用 consumer.wakeup() 會(huì)導(dǎo)致poll()拋出WakeupException,如果調(diào)用 consumer.wakeup() 時(shí)線程沒(méi)有在輪詢(xún),那么異常將在下一次調(diào)用 poll() 時(shí)拋出。不一定要處理WakeupException,但在退出線程之前必須調(diào)用consumer.close() 。消費(fèi)者在被關(guān)閉時(shí)會(huì)提交還沒(méi)有提交的偏移量,并向消費(fèi)者協(xié)調(diào)器發(fā)送消息,告知自己正在離開(kāi)群組。協(xié)調(diào)器會(huì)立即觸發(fā)再均衡,被關(guān)閉的消費(fèi)者所擁有的分區(qū)將被重新分配給群組里其他的消費(fèi)者,不需要等待會(huì)話(huà)超時(shí)。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-657359.html

到了這里,關(guān)于分布式 - 消息隊(duì)列Kafka:Kafka 消費(fèi)者消費(fèi)位移的提交方式的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 分布式 - 消息隊(duì)列Kafka:Kafka消費(fèi)者的分區(qū)分配策略

    分布式 - 消息隊(duì)列Kafka:Kafka消費(fèi)者的分區(qū)分配策略

    Kafka 消費(fèi)者負(fù)載均衡策略? Kafka 消費(fèi)者分區(qū)分配策略? 1. 環(huán)境準(zhǔn)備 創(chuàng)建主題 test 有5個(gè)分區(qū),準(zhǔn)備 3 個(gè)消費(fèi)者并進(jìn)行消費(fèi),觀察消費(fèi)分配情況。然后再停止其中一個(gè)消費(fèi)者,再次觀察消費(fèi)分配情況。 ① 創(chuàng)建主題 test,該主題有5個(gè)分區(qū),2個(gè)副本: ② 創(chuàng)建3個(gè)消費(fèi)者CustomConsu

    2024年02月13日
    瀏覽(31)
  • 分布式 - 消息隊(duì)列Kafka:Kafka消費(fèi)者分區(qū)再均衡(Rebalance)

    分布式 - 消息隊(duì)列Kafka:Kafka消費(fèi)者分區(qū)再均衡(Rebalance)

    01. Kafka 消費(fèi)者分區(qū)再均衡是什么? 消費(fèi)者群組里的消費(fèi)者共享主題分區(qū)的所有權(quán)。當(dāng)一個(gè)新消費(fèi)者加入群組時(shí),它將開(kāi)始讀取一部分原本由其他消費(fèi)者讀取的消息。當(dāng)一個(gè)消費(fèi)者被關(guān)閉或發(fā)生崩潰時(shí),它將離開(kāi)群組,原本由它讀取的分區(qū)將由群組里的其他消費(fèi)者讀取。 分區(qū)

    2024年02月12日
    瀏覽(31)
  • 【分布式技術(shù)】消息隊(duì)列Kafka

    【分布式技術(shù)】消息隊(duì)列Kafka

    目錄 一、Kafka概述 二、消息隊(duì)列Kafka的好處 三、消息隊(duì)列Kafka的兩種模式 四、Kafka 1、Kafka 定義 2、Kafka 簡(jiǎn)介 3、Kafka 的特性 五、Kafka的系統(tǒng)架構(gòu) 六、實(shí)操部署Kafka集群 ?步驟一:在每一個(gè)zookeeper節(jié)點(diǎn)上完成kafka部署 ?編輯 步驟二:傳給其他節(jié)點(diǎn) 步驟三:?jiǎn)?dòng)3個(gè)節(jié)點(diǎn) kafka管理

    2024年01月23日
    瀏覽(27)
  • kafka 分布式的情況下,如何保證消息的順序消費(fèi)?

    kafka 分布式的情況下,如何保證消息的順序消費(fèi)?

    目錄 一、什么是分布式 二、kafka介紹 三、消息的順序消費(fèi) 四、如何保證消息的順序消費(fèi) ? 分布式是指將計(jì)算任務(wù)分散到多個(gè)計(jì)算節(jié)點(diǎn)上進(jìn)行并行處理的一種計(jì)算模型。在分布式系統(tǒng)中,多臺(tái)計(jì)算機(jī)通過(guò)網(wǎng)絡(luò)互聯(lián),共同協(xié)作完成任務(wù)。每個(gè)計(jì)算節(jié)點(diǎn)都可以獨(dú)立運(yùn)行,并且可以

    2024年02月10日
    瀏覽(21)
  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的方式

    不管是把Kafka作為消息隊(duì)列、消息總線還是數(shù)據(jù)存儲(chǔ)平臺(tái),總是需要一個(gè)可以往Kafka寫(xiě)入數(shù)據(jù)的生產(chǎn)者、一個(gè)可以從Kafka讀取數(shù)據(jù)的消費(fèi)者,或者一個(gè)兼具兩種角色的應(yīng)用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應(yīng)用程序,它們負(fù)責(zé)將消息發(fā)送到 Kafka 集群中的一個(gè)或多

    2024年02月13日
    瀏覽(29)
  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的分區(qū)策略

    01. Kafka 分區(qū)的作用 分區(qū)的作用就是提供負(fù)載均衡的能力,或者說(shuō)對(duì)數(shù)據(jù)進(jìn)行分區(qū)的主要原因,就是為了實(shí)現(xiàn)系統(tǒng)的高伸縮性。不同的分區(qū)能夠被放置到不同節(jié)點(diǎn)的機(jī)器上,而數(shù)據(jù)的讀寫(xiě)操作也都是針對(duì)分區(qū)這個(gè)粒度而進(jìn)行的,這樣每個(gè)節(jié)點(diǎn)的機(jī)器都能獨(dú)立地執(zhí)行各自分區(qū)的

    2024年02月13日
    瀏覽(32)
  • 分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    分布式 - 消息隊(duì)列Kafka:Kafka生產(chǎn)者發(fā)送消息的3種方式

    不管是把Kafka作為消息隊(duì)列、消息總線還是數(shù)據(jù)存儲(chǔ)平臺(tái),總是需要一個(gè)可以往Kafka寫(xiě)入數(shù)據(jù)的生產(chǎn)者、一個(gè)可以從Kafka讀取數(shù)據(jù)的消費(fèi)者,或者一個(gè)兼具兩種角色的應(yīng)用程序。 Kafka 生產(chǎn)者是指使用 Apache Kafka 消息系統(tǒng)的應(yīng)用程序,它們負(fù)責(zé)將消息發(fā)送到 Kafka 集群中的一個(gè)或多

    2024年02月13日
    瀏覽(28)
  • zookeeper+kafka分布式消息隊(duì)列集群的部署

    zookeeper+kafka分布式消息隊(duì)列集群的部署

    目錄 一、zookeeper 1.Zookeeper 定義 2.Zookeeper 工作機(jī)制 3.Zookeeper 特點(diǎn) 4.Zookeeper 數(shù)據(jù)結(jié)構(gòu) 5.Zookeeper 應(yīng)用場(chǎng)景 (1)統(tǒng)一命名服務(wù) (2)統(tǒng)一配置管理 (3)統(tǒng)一集群管理 (4)服務(wù)器動(dòng)態(tài)上下線 6.Zookeeper 選舉機(jī)制 (1)第一次啟動(dòng)選舉機(jī)制 (2)非第一次啟動(dòng)選舉機(jī)制 7.部署zookeepe

    2024年02月14日
    瀏覽(25)
  • 分布式應(yīng)用之zookeeper集群+消息隊(duì)列Kafka

    分布式應(yīng)用之zookeeper集群+消息隊(duì)列Kafka

    ? ? ? ?ZooKeeper是一個(gè)分布式的,開(kāi)放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù),是Google的Chubby一個(gè)開(kāi)源的實(shí)現(xiàn),是Hadoop和Hbase的重要組件。它是一個(gè)為分布式應(yīng)用提供一致性服務(wù)的軟件,提供的功能包括:配置維護(hù)、域名服務(wù)、分布式同步、組服務(wù)等。為分布式框架提供協(xié)調(diào)服務(wù)的

    2024年02月06日
    瀏覽(139)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包