1 消費者入門概述
1.1 基礎(chǔ)概念
1.1.1 消費者群組
????????Kafka 里消費者從屬于消費者群組,一個群組里的消費者訂閱的都是同一個主題,每個消費者接收主題一部分分區(qū)的消息。
????????如上圖,主題 T 有 4 個分區(qū),群組中只有一個消費者,則該消費者將收到主題 T1 全部 4 個分區(qū)的消息。
????????如上圖,在群組中增加一個消費者 2 ,那么每個消費者將分別從兩個分區(qū)接收消息,上圖中就表現(xiàn)為消費者 1 接收分區(qū) 1 和分區(qū) 3 的消息,消費者 2 接收分區(qū) 2 和分區(qū) 4 的消息。
????????如上圖,在群組中有 4 個消費者,那么每個消費者將分別從 1 個分區(qū)接收消息。
????????但是,當我們增加更多的消費者,超過了主題的分區(qū)數(shù)量,就會有一部分的消費者被閑置,不會接收到任何消息。 往消費者群組里增加消費者是進行橫向伸縮能力的主要方式。所以我們有必要為主題設(shè)定合適規(guī)模的分區(qū),在負載均衡的時候可以加入更多的消費者。但是要記住,一個群組里消費者數(shù)量超過了主題的分區(qū)數(shù)量,多出來的消費者是沒有用處的。 如果是多個應用程序,需要從同一個主題中讀取數(shù)據(jù),只要保證每個應用程序有自己的消費者群組就行了。
????????一般來說,建議分區(qū)數(shù)和消費者數(shù)量保持一致是最好的,當消費組的消費能力不足時,是可以通過增加分區(qū)數(shù)量來提高并行度,但是盡量避免這樣情況發(fā)生,因為,增加一個topic的分區(qū)數(shù)量這個時候,kafka會進行分區(qū)再均衡,在這個期間topic是不可用的,而且一個topic可能有多個消費者組在消費他的數(shù)據(jù),增加分區(qū)數(shù)量會影響到每一個消費者組的,所以再創(chuàng)建topic的時候一定要考慮好分區(qū)數(shù)。
????????具體實現(xiàn)如圖,先建立一個 2 分區(qū)的主題:
1.1.2 其他核心概念
1、訂閱
????????創(chuàng)建消費者后,使用 subscribe() 方法訂閱主題,這個方法接受一個主題列表為參數(shù),也可以接受一個正則表達式為參數(shù);正則表達式同樣也匹配多個主題。如果新創(chuàng)建了新主題,并且主題名字和正則表達式匹配,那么會立即觸發(fā)一次再均衡,消費者就可以讀取新添加的主題。比如,要訂閱所有和 test 相關(guān)的主題,可以 subscribe(“tets.*”)。
2、輪詢
????????為了不斷的獲取消息,我們要在循環(huán)中不斷的進行輪詢,也就是不停調(diào)用 poll 方法。
????????poll 方法的參數(shù)為超時時間,控制 poll 方法的阻塞時間,它會讓消費者在指定的毫秒數(shù)內(nèi)一直等待 broker 返回數(shù)據(jù)。 poll 方法將會返回一個記錄(消息)列表,每一條記錄都包含了記錄所屬的主題信息,記錄所在分區(qū)信息,記錄在分區(qū)里的偏移量,以及記錄的鍵值對。
????????poll 方法不僅僅只是獲取數(shù)據(jù),在新消費者第一次調(diào)用時,它會負責查找群組,加入群組,接受分配的分區(qū)。如果發(fā)生了再均衡,整個過程也是在輪詢期間進行的。
3、提交偏移量
????????當我們調(diào)用 poll 方法的時候, broker 返回的是生產(chǎn)者寫入 Kafka 但是還沒有被消費者讀取過的記錄,消費者可以使用 Kafka 來追蹤消息在分區(qū)里的位置,我們稱之為偏移量 。消費者更新自己讀取到哪個消息的操作,我們稱之為“提交”。
????????消費者是如何提交偏移量的呢?消費者會往一個叫做 _consumer_offset 的特殊主題發(fā)送一個消息, 里面會包括每個分區(qū)的偏移量。
4、多線程安全:
????????KafkaConsumer 的實現(xiàn) 不是 線程安全的,所以我們在多線程的環(huán)境下, 使用 KafkaConsumer 的實例要小心,應該每個消費數(shù)據(jù)的線程擁有自己的 KafkaConsumer 實例。
5、群組協(xié)調(diào):
????????消費者要加入群組時,會向 群組協(xié)調(diào)器 發(fā)送一個 JoinGroup 請求,第一個加入群主的消費者成為群主,群主會獲得群組的成員列表,并負責給每一個消費者分配分區(qū)。分配完畢后,群主把分配情況發(fā)送給 群組協(xié)調(diào)器 ,協(xié)調(diào)器再把這些信息發(fā)送給所有的消費者,每個消費者只能看到自己的分配信息, 只有群主知道群組里所有消費者的分配信息。群組協(xié)調(diào)的工作會在消費者發(fā)生變化( 新加入或者掉線 ) ,主題中分區(qū)發(fā)生了變化(增加)時發(fā)生。
6、分區(qū)再均衡
????????當消費者群組里的消費者發(fā)生變化,或者主題里的分區(qū)發(fā)生了變化,都會導致再均衡現(xiàn)象的發(fā)生。從前面的知識中,我們知道, Kafka 中,存在著消費者對分區(qū)所有權(quán)的關(guān)系,這樣無論是消費者變化,比如增加了消費者,新消費者會讀取原本由其他消費者讀取的分區(qū),消費者減少,原本由它負責的分區(qū)要由其他消費者來讀取,增加了分區(qū),哪個消費者來讀取這個新增的分區(qū),這些行為,都會導致分區(qū)所有權(quán)的變化,這種變化就被稱為再均衡 。再均衡對 Kafka 很重要,這是消費者群組帶來高可用性和伸縮性的關(guān)鍵所在。 不過一般情況下,盡量減少再均衡,因為再均衡期間,消費者是無法讀 取消息的,會造成整個群組一小段時間的不可用 。
????????消費者通過向稱為群組協(xié)調(diào)器的 broker (不同的群組有不同的協(xié)調(diào)器)發(fā)送心跳來維持它和群組的從屬關(guān)系以及對分區(qū)的所有權(quán)關(guān)系。如果消費者長時間不發(fā)送心跳,群組協(xié)調(diào)器認為它已經(jīng)死亡,就會觸發(fā)一次再均衡。 在 0.10.1 及以后的版本中,心跳由單獨的線程負責,相關(guān)的控制參數(shù)為 max.poll.interval.ms 。
7、消費安全問題:
????????一般情況下,我們調(diào)用 poll 方法的時候, broker 返回的是生產(chǎn)者寫入 Kafka 同時 kafka 的消費者提交偏移量,這樣可以確保消費者消息消費不丟失也 不重復,所以一般情況下 Kafka 提供的原生的消費者是安全的,但是事情會這么完美嗎?答案顯然不是的!
1.2 消費者重要參數(shù)
1.3 消費者配置
????????消費者有很多屬性可以設(shè)置,大部分都有合理的默認值,無需調(diào)整。有些參數(shù)可能對內(nèi)存使用,性能和可靠性方面有較大影響??梢詤⒖既缦麓a:
public static void main(String[] args) {
//TODO 消費者三個屬性必須指定(broker地址清單、key和value的反序列化器)
Properties properties = new Properties();
properties.put("bootstrap.servers","127.0.0.1:9092");
properties.put("key.deserializer", StringDeserializer.class);
properties.put("value.deserializer", StringDeserializer.class);
//TODO 群組并非完全必須
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
//TODO 更多消費者配置(重要的)
properties.put("auto.offset.reset","latest"); //消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下,如何處理
properties.put("enable.auto.commit",true); // 表明消費者是否自動提交偏移 默認值true
properties.put("max.poll.records",500); // 控制每次poll方法返回的的記錄數(shù)量 默認值500
//分區(qū)分配給消費者的策略。系統(tǒng)提供兩種策略。默認為Range
properties.put("partition.assignment.strategy",Collections.singletonList(RangeAssignor.class));
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
try {
//TODO 消費者訂閱主題(可以多個)
consumer.subscribe(Collections.singletonList(BusiConst.HELLO_TOPIC));
while(true){
//TODO 拉取(新版本)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for(ConsumerRecord<String, String> record:records){
System.out.println(String.format("topic:%s,分區(qū):%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),
record.offset(),record.key(),record.value()));
//do my work
//打包任務投入線程池
// ex
}
}
} finally {
consumer.close();
}
}
- auto.offset.reset
????????消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下,如何處理。默認值是 latest ,從最新的記錄開始讀取,另一個值是 earliest ,表示消費者從起始位置讀取分區(qū)的記錄。
注意:如果是消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況(因消費者長時間失效,包含的偏移量記錄已經(jīng)過時并被刪除)下,默認值是 latest 的話,消費者將從最新的記錄開始讀取數(shù)據(jù)(在消費者啟動之后生成的記錄),可以先啟動生產(chǎn)者,再啟動消費者,觀察到這種情況。
- enable .auto.commit
????????默認值 true ,表明消費者是否自動提交偏移。為了盡量避免重復數(shù)據(jù)和數(shù)據(jù)丟失,可以改為 false ,自行控制何時提交。
- partition.assignment.strategy
????????分區(qū)分配給消費者的策略。系統(tǒng)提供兩種策略。默認為 Range 。允許自定義策略。
- Range
????????把主題的連續(xù)分區(qū)分配給消費者。(如果分區(qū)數(shù)量無法被消費者整除、第一個消費者會分到更多分區(qū))
- RoundRobin
????????把主題的分區(qū)循環(huán)分配給消費者。
1.4 自定義策略
????????extends 類 AbstractPartitionAssignor ,然后在消費者端增加參數(shù):properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 類 .class.getName()); 即可。
- max.poll.records,控制每次 poll 方法返回的的記錄數(shù)量。
- fetch.min.bytes,每次 fetch 請求時, server 應該返回的最小字節(jié)數(shù)。如果沒有足夠的數(shù)據(jù)返回,請求會等待,直到足夠的數(shù)據(jù)才會返回。缺省為 1 個字節(jié)。多消費者下,可以設(shè)大這個值,以降低 broker 的工作負載。
- fetch.wait.max.ms,如果沒有足夠的數(shù)據(jù)能夠滿足 fetch.min.bytes ,則此項配置是指在應答 fetch 請求之前, server 會阻塞的最大時間。缺省為 500 個毫秒。和上面的fetch.min.bytes 結(jié)合起來,要么滿足數(shù)據(jù)的大小,要么滿足時間,就看哪個條件先滿足。
- max.partition.fetch.bytes,指定了服務器從每個分區(qū)里返回給消費者的最大字節(jié)數(shù),默認 1MB 。假設(shè)一個主題有 20 個分區(qū)和 5 個消費者,那么每個消費者至少要有 4MB 的可用內(nèi)存來接收記錄,而且一旦有消費者崩潰,這個內(nèi)存還需更大。注意,這個參數(shù)要比服務器的 message.max.bytes 更大,否則消費者可能無法讀取消息。
- session.timeout.ms,如果 consumer 在這段時間內(nèi)沒有發(fā)送心跳信息,則它會被認為掛掉了。默認 3 秒。
- client.id,當向 server 發(fā)出請求時,這個字符串會發(fā)送給 server 。目的是能夠追蹤請求源頭,以此來允許 ip/port 許可列表之外的一些應用可以發(fā)送信息。這項應用可以設(shè)置任意字符串,因為沒有任何功能性的目的,除了記錄和跟蹤。
- receive.buffer.bytes 和 send.buffer.bytes,指定 TCP socket 接受和發(fā)送數(shù)據(jù)包的緩存區(qū)大小。如果它們被設(shè)置為 -1 ,則使用操作系統(tǒng)的默認值。如果生產(chǎn)者或消費者處在不同的數(shù)據(jù)中心,那么可以適當增大這些值,因為跨數(shù)據(jù)中心的網(wǎng)絡一般都有比較高的延遲和比較低的帶寬。
2 kafka消費者工作原理
2.1 kafka消費者工作流程
2.2 消費者組初始化流程
- 確定協(xié)調(diào)器coordinator:每當我們創(chuàng)建一個消費者組的時候,kafka會分配一個broker作為該消費組的一個coordinator,coordinator節(jié)點的選擇:groupid的hash值 % __consumer_offsets的分區(qū)數(shù)量,這個是系統(tǒng)給的;
- 注冊消費者,并選出leader consumer,當有了coordinate,消費者將會開始往該coordinate上進行注冊,第一個注冊的消費者將成為消費組的leader,后續(xù)的作為follower;
- 選出leader后,leader將會從coordinate獲取分區(qū)信息,并會根據(jù)分區(qū)策略給每個consumer分配分區(qū)形成一個消費策略,并將消費策略匯報給coordinate;
- coordinate將每一個consumer對應的分區(qū)下發(fā)給每一個consumer,對所有的follower而言,只知道自己的分區(qū),不知道別人的,但是leader知道所有人的分區(qū);
- 當發(fā)生分區(qū)再均衡的時候,leader將會重復分配過程;
2.3 消費者組詳細消費流程
2.4 消費者使用示例
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumer {
public static void main(String[] args) {
// 1. 創(chuàng)建消費者配置對象
Properties properties = new Properties();
// 2. 給消費者配置對象添加參數(shù)(不同于生產(chǎn)者,消費者有 4個必要的配置參數(shù))
// broker的ip地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// 配置 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//配置消費者組(組名必須)
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
// 3. 創(chuàng)建消費者對象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// 注冊消費主題
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
consumer.subscribe(topics);
// 4.調(diào)用方法消費數(shù)據(jù)
// 如果kafka集群沒有新數(shù)據(jù)會造成空轉(zhuǎn)
// 填寫參數(shù)為時間,如果沒有拉取數(shù)據(jù),線程睡眠一會
while (true) {
// 設(shè)置1s中消費的一批數(shù)據(jù)
// Duration.ofSeconds(1)不會導致空轉(zhuǎn),拉取不到的時候睡眠1s
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
// 打印消費數(shù)據(jù)
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.topic() + "-" + consumerRecord.partition() + "-" + consumerRecord.offset());
}
}
//5.關(guān)閉資源
// consumer.close();不使用的原因是,已關(guān)閉進程,就不會再消費數(shù)據(jù)了,進程停止就以為著JVM為斷電了,不再工作
}
}
2.5 提交偏移量導致的問題
????????當我們調(diào)用 poll 方法的時候, broker 返回的是生產(chǎn)者寫入 Kafka 但是還沒有被消費者讀取過的記錄,消費者可以使用此記錄來追蹤消息在分區(qū)里的位置,我們稱之為偏移量 。消費者更新自己讀取到哪個消息的操作,我們稱之為“提交”。
????????消費者是如何提交偏移量的呢?消費者會往一個叫做 _consumer_offset 的特殊主題發(fā)送一個消息, 里面會包括每個分區(qū)的偏移量。發(fā)生了再均衡之后,消費者可能會被分配新的分區(qū),為了能夠繼續(xù)工作,消費者者需要讀取每個分區(qū)最后一次提交的偏移量,然后從指定的位置,繼續(xù)讀取消息做處理。
????????1 )如果提交的偏移量小于消費者實際處理的最后一個消息的偏移量,處于兩個偏移量之間的消息會被重復處理。
????????2 )如果提交的偏移量大于客戶端處理的最后一個消息的偏移量 , 那么處于兩個偏移量之間的消息將會丟失。
????????所以, 處理偏移量的方式對客戶端會有很大的影響 。KafkaConsumer API 提供了很多種方式來提交偏移量 。
2.5.1 自動提交
????????最簡單的提交方式是讓消費者自動提交偏移量。 如果 enable.auto.comnit 被設(shè)為 true ,消費者會自動把從 poll() 方法接收到的 最大 偏移量提交上去。 提交時間間隔由 auto.commit.interval.ms 控制,默認值是 5s 。自動提交是在輪詢里進行的,消費者每次在進行輪詢時會檢査是否該提交偏移量了,如果是, 那么就會提交從上一次輪詢返回的偏移量。 不過, 在使用這種簡便的方式之前 , 需要知道它將會帶來怎樣的結(jié)果。 假設(shè)我們?nèi)匀皇褂媚J的 5s 提交時間間隔 , 在最近一次提交之后的 3s 發(fā)生了再均衡,再均衡之后 , 消費者從最后一次提交的偏移量位置開始讀取消息。 這個時候偏移量已經(jīng)落后了 3s ,所以在這 3s 內(nèi)到達的消息會被重復處理??梢酝ㄟ^修改提交時間間隔來更頻繁地提交偏移量 , 減小可能出現(xiàn)重復消息的時間窗, 不過這種情況是無法完全避免的 。 在使用自動提交時, 每次調(diào)用輪詢方法都會把上一次調(diào)用返回的最大偏移量提交上去 , 它并不知道具體哪些消息已經(jīng)被處理了 , 所以在再次調(diào)用之前最好確保所有當前調(diào)用返回的消息都已經(jīng)處理完畢(enable.auto.comnit 被設(shè)為 true 時,在調(diào)用 close() 方法之前也會進行自動提交 ) 。一般情況下不會有什么問題, 不過在處理異常或提前退出輪詢時要格外小心。
????????自動提交雖然方便 , 但是很明顯是一種基于時間提交的方式 , 不過并沒有為我們留有余地來避免重復處理消息。
2.5.2 手動提交(同步)
????????我們通過控制偏移量提交時間來消除丟失消息的可能性,并在發(fā)生再均衡時減少重復消息的數(shù)量。消費者 API 提供了另一種提交偏移量的方式,開發(fā) 者可以在必要的時候提交當前偏移量,而不是基于時間間隔。 把 auto.commit. offset 設(shè)為 false,自行決定何時提交偏移量。使用 commitsync()提交偏移量最簡單也最可靠。這個方法會提交由 poll()方法返回的最 新偏移量,提交成功后馬上返回,如果提交失敗就拋出異常。
注意: commitsync() 將會提交由 poll() 返回的最新偏移量 , 所以在處理完所有記錄后要確保調(diào)用了 commitsync() ,否則還是會有丟失消息的風險。如果發(fā)生了再均衡, 從最近批消息到發(fā)生再均衡之間的所有消息都將被重復處理。 只要沒有發(fā)生不可恢復的錯誤,commitSync ()方法會阻塞,會一直嘗試直至提交成功,如果失敗,也只能記錄異常日志。
????????具體代碼參考:
public static void main(String[] args) {
/*消息消費者*/
Properties properties = KafkaConst.consumerConfig("CommitSync",
StringDeserializer.class,
StringDeserializer.class);
//TODO 取消自動提交
/*取消自動提交*/
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer<String,String> consumer
= new KafkaConsumer<String, String>(properties);
try {
consumer.subscribe(Collections.singletonList(
BusiConst.CONSUMER_COMMIT_TOPIC));
while(true){
ConsumerRecords<String, String> records
= consumer.poll(Duration.ofMillis(500));
for(ConsumerRecord<String, String> record:records){ //100個 100~ 200
System.out.println(String.format(
"主題:%s,分區(qū):%d,偏移量:%d,key:%s,value:%s",
record.topic(),record.partition(),record.offset(),
record.key(),record.value()));
//do our work
}
//開始事務
//讀業(yè)務寫數(shù)據(jù)庫-
//偏移量寫入數(shù)據(jù)庫
//TODO 同步提交(這個方法會阻塞)
consumer.commitSync(); //offset =200 max
consumer.commitSync(); //offset =200 max
}
} finally {
consumer.close();
}
}
2.5.3 異步提交
????????手動提交時,在 broker 對提交請求作出回應之前,應用程序會一直阻塞。這時我們可以使用異步提交 API ,我們只管發(fā)送提交請求,無需等待 broker 的響應。
????????在成功提交或碰到無法恢復的錯誤之前 , commitsync() 會一直重試 , 但是 commitAsync 不會。它之所以不進行重試 , 是因為在它收到服務器響應的時候 , 可能有一個更大的偏移量已經(jīng)提交成功。
????????假設(shè)我們發(fā)出一個請求用于提交偏移量 2000,, 這個時候發(fā)生了短暫的通信問題 , 服務器收不到請求 , 自然也不會作出任何響應。與此同時 , 我們處理了另外一批消息, 并成功提交了偏移量 3000 。如果 commitAsync() 重新嘗試提交偏移量 2000, 它有可能在偏移量 3000 之后提交成功。這個時候如果發(fā)生再均衡 , 就會出現(xiàn)重復消息。
????????commitAsync() 也支持回調(diào) , 在 broker 作出響應時會執(zhí)行回調(diào)?;卣{(diào)經(jīng)常被用于記錄提交錯誤或生成度量指標。 回調(diào)具體代碼參考:
public static void main(String[] args) {
/*消息消費者*/
Properties properties = KafkaConst.consumerConfig(
"CommitAsync",
StringDeserializer.class,
StringDeserializer.class);
//TODO 取消自動提交
/*取消自動提交*/
properties.put("enable.auto.commit",false);
KafkaConsumer<String,String> consumer
= new KafkaConsumer<String, String>(properties);
try {
consumer.subscribe(Collections.singletonList(
BusiConst.CONSUMER_COMMIT_TOPIC));
while(true){
ConsumerRecords<String, String> records
= consumer.poll(Duration.ofMillis(500));
for(ConsumerRecord<String, String> record:records){
System.out.println(String.format(
"主題:%s,分區(qū):%d,偏移量:%d,key:%s,value:%s",
record.topic(),record.partition(),record.offset(),
record.key(),record.value()));
//do our work
}
//TODO 異步提交偏移量
consumer.commitAsync();
/*允許執(zhí)行回調(diào)*/
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(
Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
if(exception!=null){
System.out.print("Commmit failed for offsets ");
System.out.println(offsets);
exception.printStackTrace();
}
}
});
}
} finally {
consumer.close();
}
}
2.5.4 同步異步組合
????????因為同步提交一定會成功、異步可能會失敗,所以一般的場景是同步和異步一起來做。 一般情況下, 針對偶爾出現(xiàn)的提交失敗 , 不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的 , 那么后續(xù)的提交總會有成功的。但如果這是發(fā)生在關(guān)閉消費者或再均衡前的最后一次提交, 就要確保能夠提交成功。 因此, 在消費者關(guān)閉前一般會組合使用 commitAsync() 和 commitsync() 。具體使用,參見代碼如下:
public static void main(String[] args) {
/*消息消費者*/
Properties properties = KafkaConst.consumerConfig("SyncAndAsync",
StringDeserializer.class,
StringDeserializer.class);
//TODO 取消自動提交
/*取消自動提交*/
properties.put("enable.auto.commit",false);
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
try {
consumer.subscribe(Collections.singletonList(
BusiConst.CONSUMER_COMMIT_TOPIC));
while(true){
ConsumerRecords<String, String> records
= consumer.poll(Duration.ofMillis(500));
for(ConsumerRecord<String, String> record:records){
System.out.println(String.format(
"主題:%s,分區(qū):%d,偏移量:%d,key:%s,value:%s",
record.topic(),record.partition(),record.offset(),
record.key(),record.value()));
//do our work
}
//TODO 異步提交
consumer.commitAsync();
}
} catch (CommitFailedException e) {
System.out.println("Commit failed:");
e.printStackTrace();
} finally {
try {
//TODO 為了萬不一失,需要同步提交下
consumer.commitSync();
} finally {
consumer.close();
}
}
}
2.5.5 特定提交
????????在我們前面的提交中,提交偏移量的頻率與處理消息批次的頻率是一樣的。但如果想要更頻繁地提交該怎么辦 ?
????????如果 poll() 方法返回一大批數(shù)據(jù) , 為了避免因再均衡引起的重復處理整批消息 , 想要在批次中間提交偏移量該怎么辦 ? 這種情況無法通過調(diào)用 commitSync()或 commitAsync() 來實現(xiàn),因為它們只會提交最后一個偏移量 , 而此時該批次里的消息還沒有處理完。 消費者 API 允許在調(diào)用 commitsync() 和 commitAsync() 方法時傳進去希望提交的分區(qū)和偏移量的 map 。假設(shè)我們處理了半個批次的消息 , 最后一個來自 主題“customers ”,分區(qū) 3 的消息的偏移量是 5000 ,你可以調(diào)用 commitsync() 方法來提交它。不過,因為消費者可能不只讀取一個分區(qū) , 因為我們需要跟蹤所有分區(qū)的偏移量, 所以在這個層面上控制偏移量的提交會讓代碼變復雜。
????????具體使用,參見代碼如下:
public static void main(String[] args) {
/*消息消費者*/
Properties properties = KafkaConst.consumerConfig(
"CommitSpecial",
StringDeserializer.class,
StringDeserializer.class);
//TODO 必須做
/*取消自動提交*/
properties.put("enable.auto.commit",false);
KafkaConsumer<String,String> consumer
= new KafkaConsumer<String, String>(properties);
Map<TopicPartition, OffsetAndMetadata> currOffsets
= new HashMap<TopicPartition, OffsetAndMetadata>();
int count = 0;
try {
consumer.subscribe(Collections.singletonList(
BusiConst.CONSUMER_COMMIT_TOPIC));
while(true){
ConsumerRecords<String, String> records
= consumer.poll(Duration.ofMillis(500));
for(ConsumerRecord<String, String> record:records){
System.out.println(String.format(
"主題:%s,分區(qū):%d,偏移量:%d,key:%s,value:%s",
record.topic(),record.partition(),record.offset(),
record.key(),record.value()));
currOffsets.put(new TopicPartition(record.topic(),record.partition()),
new OffsetAndMetadata(record.offset()+1,"no meta"));
if(count%11==0){
//TODO 這里特定提交(異步方式,加入偏移量),每11條提交一次
consumer.commitAsync(currOffsets,null);
}
count++;
}
}
} finally {
//TODO 在關(guān)閉前最好同步提交一次偏移量
consumer.commitSync();
consumer.close();
}
}
2.6 分區(qū)再均衡
2.6.1 再均衡監(jiān)聽器
????????在提交偏移量一節(jié)中提到過 , 消費者在退出和進行分區(qū)再均衡之前 , 會做一些清理工作比如,提交偏移量、關(guān)閉文件句柄、數(shù)據(jù)庫連接等。 在為消費者分配新分區(qū)或移除舊分區(qū)時, 可以通過消費者 API 執(zhí)行一些應用程序代碼,在調(diào)用 subscribe() 方法時傳進去一個 ConsumerRebalancelistener實例就可以了。
ConsumerRebalancelistener 有兩個需要實現(xiàn)的方法。
- public void onPartitionsRevoked( Collection< TopicPartition> partitions) 方法會在再均衡開始之前和消費者停止讀取消息之后被調(diào)用。如果在這里提交偏移量,下一個接管分區(qū)的消費者就知道該從哪里開始讀取了;
- public void onPartitionsAssigned( Collection< TopicPartition> partitions) 方法會在重新分配分區(qū)之后和消費者開始讀取消息之前被調(diào)用。
2.6.2 從特定偏移量開始記錄
????????到目前為止 , 我們知道了如何使用 poll() 方法從各個分區(qū)的最新偏移量處開始處理消息。 不過, 有時候我們也需要從特定的偏移量處開始讀取消息。 如果想從分區(qū)的起始位置開始讀取消息, 或者直接跳到分區(qū)的末尾開始讀取消息 , 可以使 seekToBeginning(Collection tp) 和seekToEnd( Collectiontp) 這兩個方法。 不過,Kafka 也為我們提供了用于查找特定偏移量的 API 。它有很多用途 , 比如向后回退幾個消息或者向前跳過幾個消息 ( 對時間比較敏感的應用程序在處理滯后的情況下希望能夠向前跳過若干個消息) 。在使用 Kafka 以外的系統(tǒng)來存儲偏移量時 , 它將給我們帶來更大的驚喜 -- 讓消息的業(yè)務處理和偏移量的提交變得一致。
????????試想一下這樣的場景: 應用程序從 Kafka 讀取事件 ( 可能是網(wǎng)站的用戶點擊事件流 ), 對它們進行處理 ( 可能是使用自動程序清理點擊操作并添加會話信息 ), 然后把結(jié)果保存到數(shù)據(jù)庫。假設(shè)我們真的不想丟失任何數(shù)據(jù), 也不想在數(shù)據(jù)庫里多次保存相同的結(jié)果。 我們可能會,毎處理一條記錄就提交一次偏移量。盡管如此, 在記錄被保存到數(shù)據(jù)庫之后以及偏移量被提交之前 , 應用程序仍然有可能發(fā)生崩潰 , 導致重復處理數(shù)據(jù), 數(shù)據(jù)庫里就會出現(xiàn)重復記錄。 如果保存記錄和偏移量可以在一個原子操作里完成 , 就可以避免出現(xiàn)上述情況。記錄和偏移量要么都被成功提交 , 要么都不提交。如果記錄是保存在數(shù)據(jù)庫里而偏移量是提交到Kafka上 , 那么就無法實現(xiàn)原子操作不過 , 如果在同一個事務里把記錄和偏移量都寫到數(shù)據(jù)庫里會怎樣呢 ? 那么我們就會知道記錄和偏移量要么都成功提交, 要么都沒有 , 然后重新處理記錄。 現(xiàn)在的問題是: 如果偏移量是保存在數(shù)據(jù)庫里而不是 Kafka 里 , 那么消費者在得到新分區(qū)時怎么知道該從哪里開始讀取 ? 這個時候可以使用 seek() 方法。 在消費者啟動或分配到新分區(qū)時, 可以使用 seck() 方法查找保存在數(shù)據(jù)庫里的偏移量。我們可以使用使用 Consumer Rebalancelistener 和 seek() 方法確保我們是從數(shù)據(jù)庫里保存的偏移量所指定的位置開始處理消息的。
2.7 獨立消費者
????????到目前為止 , 我們討論了消費者群組 , 分區(qū)被自動分配給群組里的消費者 , 在群組里新增或移除消費者時自動觸發(fā)再均衡。不過有時候可能只需要一個消費者從一個主題的所有分區(qū)或者某個特定的分區(qū)讀取數(shù)據(jù)。這個時候就不需要消費者群組和再均衡了, 只需要把主題或者分區(qū)分配給消費者 , 然后開始讀取消息并提交偏移量。 如果是這樣的話, 就不需要訂閱主題 , 取而代之的是為自己分配分區(qū)。一個消費者可以訂閱主題 ( 并加入消費者群組 ), 或者為自己分配分區(qū) , 但不能同時做這兩件事情。 獨立消費者相當于自己來分配分區(qū),但是這樣做的好處是自己控制,但是就沒有動態(tài)特性的支持了,包括加入消費者(分區(qū)再均衡之類的),新增分區(qū),這些都需要代碼中去解決,所以一般情況下不推薦使用。
2.8 優(yōu)雅退出
????????如果確定要退出循環(huán) , 需要通過另一個線程調(diào)用 consumer. wakeup() 方法。如果循環(huán)運行在主線程里 , 可以在 ShutdownHook 里調(diào)用該方法。要記住 , consumer. wakeup()是消費者唯一一個可以從其他線程里安全調(diào)用的方法。調(diào)用 consumer. wakeup() 可以退出 poll(), 并拋出 WakeupException 異常。我們不需要處理 Wakeup Exception, 因為它只是用于跳出循環(huán)的一種方式。不過 , 在退出線程之前調(diào)用 consumer.close() 是很有必要的 , 它會提交任何還沒有提交的東西, 并向群組協(xié)調(diào)器發(fā)送消息 , 告知自己要離開群組 , 接下來就會觸發(fā)再均衡 , 而不需要等待會話超時。
參考鏈接
Kafka基本原理詳解-CSDN博客
這是最詳細的Kafka應用教程了 - 掘金
Kafka : Kafka入門教程和JAVA客戶端使用-CSDN博客
簡易教程 | Kafka從搭建到使用 - 知乎
kafka簡介-CSDN博客
Kafka 架構(gòu)及基本原理簡析
kafka是什么
再過半小時,你就能明白kafka的工作原理了(推薦閱讀)
Kafka 設(shè)計與原理詳解
Kafka【入門】就這一篇! - 知乎
kafka簡介_kafka_唏噗-華為云開發(fā)者聯(lián)盟
kafka詳解
Kafka 設(shè)計與原理詳解_kafka的設(shè)計初衷不包括-CSDN博客
kafka學習知識點總結(jié)(三)
Kafka知識總結(jié)之Broker原理總結(jié)_kafka broker-CSDN博客
深度解析kafka broker網(wǎng)絡模型運行原理_kafka broker原理-CSDN博客
Kafka源碼分析及圖解原理之Broker端?
kafka——消費者原理解析
深入分析kafka的消費者配置原理_kafkaconsumer<string, string> consumer;-CSDN博客
一探究竟,詳解Kafka生產(chǎn)者和消費者的工作原理! - 簡書文章來源:http://www.zghlxwxcb.cn/news/detail-835381.html
kafka消費原理_kafka消費邏輯-CSDN博客文章來源地址http://www.zghlxwxcb.cn/news/detail-835381.html
到了這里,關(guān)于【云原生進階之PaaS中間件】第三章Kafka-4.4-消費者工作流程的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!