1. 文章引言
最近在學(xué)習(xí)kafka相關(guān)的知識(shí),特將學(xué)習(xí)成功記錄成文章,以供大家共同學(xué)習(xí)。
首先要注意的是,Kafka
中的Topic
和ActiveMQ
中的Topic
是不一樣的。
在Kafka
中,Topic
是一個(gè)存儲(chǔ)消息的邏輯概念,可以認(rèn)為是一個(gè)消息集合。每條消息發(fā)送到Kafka
集群的消息都有一個(gè)類(lèi)別。
物理上來(lái)說(shuō),不同的Topic
的消息是分開(kāi)存儲(chǔ)的,每個(gè)Topic
可以有多個(gè)生產(chǎn)者向它發(fā)送消息,也可以有多個(gè)消費(fèi)者去消費(fèi)其中的消息。
每個(gè)Topic
可以劃分多個(gè)分區(qū)**(每個(gè)Topic
至少有一個(gè)分區(qū))**,同一Topic
下的不同分區(qū)包含的消息是不同的。
每個(gè)消息在被添加到分區(qū)時(shí),都會(huì)被分配一個(gè)offset
,它是消息在此分區(qū)中的唯一編號(hào),Kafka
通過(guò)offset
保證消息在分區(qū)內(nèi)的順序。
offset
的順序不跨分區(qū),即Kafka
只保證在同一個(gè)分區(qū)內(nèi)的消息是有序的。
消息是每次追加到對(duì)應(yīng)的Partition
的后面:
2. Topic & Partition的存儲(chǔ)
Topic
是一個(gè)邏輯上的概念,具體的存儲(chǔ)還是基于Partition
來(lái)的。
創(chuàng)建一個(gè)test2 Topic
(注意這里的 partitions 參數(shù)為 3):
可以進(jìn)入/tmp/kafka-logs
目錄下進(jìn)行查看(當(dāng)前機(jī)器IP 192.168.220.135
),如下圖所示:
在另外一臺(tái)136
機(jī)器上:
可以發(fā)現(xiàn):
-
在
135
機(jī)器上有test2-0
和test2-2
-
在
136
機(jī)器上有test2-1
。
接下來(lái),再結(jié)合Kafka
的消息分發(fā)策略來(lái)看。
3. Kafka的消息分發(fā)
Kafka
中最基本的數(shù)據(jù)單元就是消息,而一條消息其實(shí)是由Key + Value
組成:
-
Key
是可選項(xiàng),可傳空值 -
Value
也可以傳空值
這也是與ActiveMQ
不同的一個(gè)地方。
在發(fā)送一條消息時(shí),我們可以指定這 Key
,那 Producer
會(huì)根據(jù)Key
和partition
機(jī)制來(lái)判斷當(dāng)前這條消息應(yīng)該發(fā)送并存儲(chǔ)到哪個(gè)partition
中(這個(gè)就跟分片機(jī)制類(lèi)似)。
我們可以根據(jù)需要進(jìn)行擴(kuò)展Producer
的partition
機(jī)制 (默認(rèn)算法是hash
取%
)。
如下擴(kuò)展自己的partition
代碼所示:
/**
* 消息發(fā)送后會(huì)調(diào)用自定義的策略
*
* @author super先生
* @date 2023/2/10 14:20
*/
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//獲取當(dāng)前 topic 有多少個(gè)分區(qū)(分區(qū)列表)
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int partitionNum = 0;
if (key == null) { //之前介紹過(guò) Key 是可以傳空值的
partitionNum = new Random().nextInt(partitions.size()); //隨機(jī)
} else {
//取 %
partitionNum = Math.abs((key.hashCode()) % partitions.size());
}
System.out.println("key:" + key + ",value:" + value + ",partitionNum:" + partitionNum);
//發(fā)送到指定分區(qū)
return partitionNum;
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
生產(chǎn)者和消費(fèi)者代碼可參考我的博文:實(shí)現(xiàn)kafka的生產(chǎn)者(Producer)和消費(fèi)者(Consumer)的代碼
如下創(chuàng)建kafka
生產(chǎn)者(Producer
)的代碼:
/**
* @author super先生
* @date 2023/2/10 14:40
*/
public class KafkaProducerDemo extends Thread {
/**
* 消息發(fā)送者
*/
private final KafkaProducer<Integer, String> producer;
/**
* topic
*/
private final String topic;
private final Boolean isAsync;
public KafkaProducerDemo(String topic, Boolean isAsync) {
this.isAsync = isAsync;
//構(gòu)建相關(guān)屬性
//@see ProducerConfig
Properties properties = new Properties();
//Kafka 地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.220.135:9092,192.168.220.136:9092");
//kafka 客戶(hù)端 Demo
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
//The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent.
/**發(fā)送端消息確認(rèn)模式:
* 0:消息發(fā)送給broker后,不需要確認(rèn)(性能較高,但是會(huì)出現(xiàn)數(shù)據(jù)丟失,而且風(fēng)險(xiǎn)最大,因?yàn)楫?dāng) server 宕機(jī)時(shí),數(shù)據(jù)將會(huì)丟失)
* 1:只需要獲得集群中的 leader節(jié)點(diǎn)的確認(rèn)即可返回
* -1/all:需要 ISR 中的所有的 Replica進(jìn)行確認(rèn)(集群中的所有節(jié)點(diǎn)確認(rèn)),最安全的,也有可能出現(xiàn)數(shù)據(jù)丟失(因?yàn)?ISR 可能會(huì)縮小到僅包含一個(gè) Replica)
*/
properties.put(ProducerConfig.ACKS_CONFIG, "-1");
/**【調(diào)優(yōu)】
* batch.size 參數(shù)(默認(rèn) 16kb)
* public static final String BATCH_SIZE_CONFIG = "batch.size";
*
* producer對(duì)于同一個(gè) 分區(qū) 來(lái)說(shuō),會(huì)按照 batch.size 的大小進(jìn)行統(tǒng)一收集進(jìn)行批量發(fā)送,相當(dāng)于消息并不會(huì)立即發(fā)送,而是會(huì)收集整理大小至 16kb.若將該值設(shè)為0,則不會(huì)進(jìn)行批處理
*/
/**【調(diào)優(yōu)】
* linger.ms 參數(shù)
* public static final String LINGER_MS_CONFIG = "linger.ms";
* 一個(gè)毫秒值。Kafka 默認(rèn)會(huì)把兩次請(qǐng)求的時(shí)間間隔之內(nèi)的消息進(jìn)行搜集。相當(dāng)于會(huì)有一個(gè) delay 操作。比如定義的是1000(1s),消息一秒鐘發(fā)送5條,那么這 5條消息不會(huì)立馬發(fā)送,而是會(huì)有一個(gè) delay操作進(jìn)行聚合,
* delay以后再次批量發(fā)送到 broker。默認(rèn)是 0,就是不延遲(同 TCP Nagle算法),那么 batch.size 也就不生效了
*/
//linger.ms 參數(shù)和batch.size 參數(shù)只要滿(mǎn)足其中一個(gè)都會(huì)發(fā)送
/**【調(diào)優(yōu)】
* max.request.size 參數(shù)(默認(rèn)是1M) 設(shè)置請(qǐng)求最大字節(jié)數(shù)
* public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
* 如果設(shè)置的過(guò)大,發(fā)送的性能會(huì)受到影響,同時(shí)寫(xiě)入接收的性能也會(huì)受到影響。
*/
//設(shè)置 key的序列化,key 是 Integer類(lèi)型,使用 IntegerSerializer
//org.apache.kafka.common.serialization
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
//設(shè)置 value 的序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//指定分區(qū)策略
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"superJson.kafka.partition.MyPartitioner");
//構(gòu)建 kafka Producer,這里 key 是 Integer 類(lèi)型,Value 是 String 類(lèi)型
producer = new KafkaProducer<Integer, String>(properties);
this.topic = topic;
}
public static void main(String[] args) {
new KafkaProducerDemo("test2",true).start();
}
@Override
public void run() {
int num = 0;
while (num < 100) {
String message = "message--->" + num;
System.out.println("start to send message 【 " + message + " 】");
if (isAsync) { //如果是異步發(fā)送
producer.send(new ProducerRecord<Integer, String>(topic, message), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata!=null){
System.out.println("async-offset:"+metadata.offset()+"-> partition"+metadata.partition());
}
}
});
} else { //同步發(fā)送
try {
RecordMetadata metadata = producer.send(new ProducerRecord<Integer, String>(topic, message)).get();
System.out.println("sync-offset:"+metadata.offset()+"-> partition"+metadata.partition());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
num++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
實(shí)現(xiàn)kafka
的消費(fèi)者(Consumer
)中在接收消息的時(shí)候輸出分區(qū),如下代碼所示:
/**
* @author super先生
* @date 2023/2/10 15:10
*/
public class KafkaConsumerDemo extends Thread {
private final KafkaConsumer<Integer, String> kafkaConsumer;
public KafkaConsumerDemo(String topic) {
//構(gòu)建相關(guān)屬性
//@see ConsumerConfig
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.220.135:9092,192.168.220.136:9092");
//消費(fèi)組
/**
* consumer group是kafka提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)者機(jī)制。既然是
一個(gè)組,那么組內(nèi)必然可以有多個(gè)消費(fèi)者或消費(fèi)者實(shí)例((consumer instance),
它們共享一個(gè)公共的ID,即group ID。組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來(lái)消費(fèi)訂
閱主題(subscribed topics)的所有分區(qū)(partition)。當(dāng)然,每個(gè)分區(qū)只能由同一
個(gè)消費(fèi)組內(nèi)的一個(gè)consumer來(lái)消費(fèi).后面會(huì)進(jìn)一步介紹。
*/
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo");
/** auto.offset.reset 參數(shù) 從什么時(shí)候開(kāi)始消費(fèi)
* public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
*
* 這個(gè)參數(shù)是針對(duì)新的groupid中的消費(fèi)者而言的,當(dāng)有新groupid的消費(fèi)者來(lái)消費(fèi)指定的topic時(shí),對(duì)于該參數(shù)的配置,會(huì)有不同的語(yǔ)義
* auto.offset.reset=latest情況下,新的消費(fèi)者將會(huì)從其他消費(fèi)者最后消費(fèi)的offset處開(kāi)始消費(fèi)topic下的消息
* auto.offset.reset= earliest情況下,新的消費(fèi)者會(huì)從該topic最早的消息開(kāi)始消費(fèi)
auto.offset.reset=none情況下,新的消費(fèi)組加入以后,由于之前不存在 offset,則會(huì)直接拋出異常。說(shuō)白了,新的消費(fèi)組不要設(shè)置這個(gè)值
*/
//enable.auto.commit
//消費(fèi)者消費(fèi)消息以后自動(dòng)提交,只有當(dāng)消息提交以后,該消息才不會(huì)被再次接收到(如果沒(méi)有 commit,消息可以重復(fù)消費(fèi),也沒(méi)有 offset),還可以配合auto.commit.interval.ms控制自動(dòng)提交的頻率。
//當(dāng)然,我們也可以通過(guò)consumer.commitSync()的方式實(shí)現(xiàn)手動(dòng)提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
/**max.poll.records
*此參數(shù)設(shè)置限制每次調(diào)用poll返回的消息數(shù),這樣可以更容易的預(yù)測(cè)每次poll間隔
要處理的最大值。通過(guò)調(diào)整此值,可以減少poll間隔
*/
//間隔時(shí)間
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//反序列化 key
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
//反序列化 value
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//構(gòu)建 KafkaConsumer
kafkaConsumer = new KafkaConsumer<>(properties);
//設(shè)置 topic
kafkaConsumer.subscribe(Collections.singletonList(topic));
}
/**
* 接收消息
*/
@Override
public void run() {
while (true) {
//拉取消息
ConsumerRecords<Integer, String> consumerRecord = kafkaConsumer.poll(100000000);
for (ConsumerRecord<Integer, String> record : consumerRecord) {
//record.partition() 獲取當(dāng)前分區(qū)
System.out.println(record.partition()+"】】 message receive 【" + record.value() + "】");
}
}
}
public static void main(String[] args) {
new KafkaConsumerDemo("test2").start();
}
}
首先啟動(dòng)Consumer
,再啟動(dòng)Producer
:
可以看到是能夠?qū)Φ纳系摹?/p>
默認(rèn)情況下,Kafka
采用的是hash 取 % 的分區(qū)算法
。
如果Key
為null
,則會(huì)隨機(jī)分配一個(gè)分區(qū)。
這個(gè)隨機(jī)是在這個(gè)參數(shù)metadata.max.age.ms
的時(shí)間范圍內(nèi)隨機(jī)選擇一個(gè)。
對(duì)于這個(gè)時(shí)間段內(nèi),如果Key
為 null,則只會(huì)發(fā)送到唯一的分區(qū)。這個(gè)值默認(rèn)情況下是 10
分鐘更新一次 (因?yàn)?partition
狀態(tài)可能會(huì)發(fā)生變化)。
4. 關(guān)于 Metadata
Metadata
包含 Topic
和 Partition
和 broker
的映射關(guān)系,每一個(gè) Topic
的每一個(gè) partition
,需要知道對(duì)應(yīng)的 broker
列表是什么,Leader
是誰(shuí),Follower
是誰(shuí)。
這些信息都是存儲(chǔ)在Metadata
這個(gè)類(lèi)中,如下圖所示:
5. 消費(fèi)端如何消費(fèi)指定分區(qū)
Consumer
可以指定具體消費(fèi)的分區(qū),如下圖所示:
再重新啟動(dòng)Consumer
和Producer
:
可以看到Consumer
只消費(fèi)了分區(qū)為1
的消息。
以上是單個(gè)Consumer
消費(fèi)(指定)分區(qū)的情況。
一般每個(gè)Topic
都會(huì)有多個(gè)partition
(主要是用于數(shù)據(jù)分片,減少消息的容量,從而提升 I/O 性能)。
當(dāng)然也可以使用多個(gè)Consumer
從而提高消費(fèi)能力,有一個(gè)消費(fèi)組的概念(具體可參看:一文了解kafka消息隊(duì)列)
如果Consumer1、Consumer2 和 Consumer3
都屬于group.id 為1
的消費(fèi)組,那么消費(fèi)情況如下:
-
Consumer1
就會(huì)消費(fèi)p0
-
Consumer2
就會(huì)消費(fèi)p1
-
Consumer3
就會(huì)消費(fèi)p2
不使用指定分區(qū)的方式創(chuàng)建三個(gè)Consumer
:
而且,它們都是同一個(gè)消費(fèi)組:
同時(shí)啟動(dòng)三個(gè)Consumer
和Producer
,而且它們都是同一個(gè)消費(fèi)組:
可以看到三個(gè)Consumer
分別消費(fèi)三個(gè)Partition
,很均勻。
對(duì)同一個(gè)Group
來(lái)說(shuō),其中的Consumer
可以消費(fèi)指定分區(qū)也可以消費(fèi)自動(dòng)分配的分區(qū)(這里是 Consumer
數(shù)量和partition
數(shù)量一致,均勻分配)。
如果下述情況如何處理:
- 如果
Consumer
數(shù)量大于partition
數(shù)量呢? - 如果
Consumer
數(shù)量小于partition
數(shù)量呢?
這兩種情況,讀者可自行測(cè)試。
但要注意如下情況:
-
如果
Consumer
數(shù)量比partition
數(shù)量多,會(huì)有的Consumer
閑置無(wú)法消費(fèi),這樣是一個(gè)浪費(fèi)。 -
如果
Consumer
數(shù)量小于partition
數(shù)量會(huì)有一個(gè)Consumer
消費(fèi)多個(gè)partition
。
Kafka
在partition
上是不允許并發(fā)的。Consuemr
數(shù)量建議最好是partition
的整數(shù)倍。
還有一點(diǎn),如果Consumer
從多個(gè)partiton
上讀取數(shù)據(jù),是不保證順序性的。Kafka
只保證一個(gè)partition
的順序性,跨partition
是不保證順序性的。增減 Consumer、broker、partition 會(huì)導(dǎo)致 Rebalance。
6. Kafka 分區(qū)分配策略
在Kafka
中,同一個(gè)Group
中的消費(fèi)者對(duì)于一個(gè)Topic
中的多個(gè)partition
存在一定的分區(qū)分配策略——分區(qū)分配策略有如下兩種:
-
Range(默認(rèn))
策略 -
RoundRobin(輪詢(xún))
策略
通過(guò)partition.assignment.strategy
這個(gè)參數(shù)來(lái)設(shè)置。
6.1 Range strategy(范圍分區(qū))
Range
策略是對(duì)每個(gè)主題而言的,首先對(duì)同一個(gè)主題里面的分區(qū)按照序號(hào)進(jìn)行排序,并對(duì)消費(fèi)者按照字母順序進(jìn)行排序。
假設(shè)我們有10
個(gè)分區(qū),3
個(gè)消費(fèi)者,排完序的分區(qū)將會(huì)是0,1,2,3,4,5,6,7,8,9
;消費(fèi)者線(xiàn)程排完序?qū)?huì)是C1-0, C2-0, C3-0
。
然后partitions
的個(gè)數(shù)除于消費(fèi)者線(xiàn)程的總數(shù)來(lái)決定每個(gè)消費(fèi)者線(xiàn)程消費(fèi)幾個(gè)分區(qū)。如果除不盡,那么前面幾個(gè)消費(fèi)者線(xiàn)程將會(huì)多消費(fèi)一個(gè)分區(qū)。
假如在 Topic1
中有10
個(gè)分區(qū),3
個(gè)消費(fèi)者線(xiàn)程,10/3 = 3
,而且除不盡,那么消費(fèi)者線(xiàn)程C1-0
將會(huì)多消費(fèi)一個(gè)分區(qū),所以最后分區(qū)分配的結(jié)果是這樣的:
-
C1-0
將消費(fèi)0,1,2,3
分區(qū) -
C2-0
將消費(fèi)4,5,6
分區(qū) -
C3-0
將消費(fèi)7,8,9
分區(qū)
假如在Topic1
中有11
個(gè)分區(qū),那么最后分區(qū)分配的結(jié)果看起來(lái)是這樣的:
-
C1-0
將消費(fèi)0,1,2,3
分區(qū) -
C2-0
將消費(fèi)4, 5, 6, 7
分區(qū) -
C3-0
將消費(fèi)8,9,10
分區(qū)
假如有兩個(gè)Topic:Topic1 和 Topic2
,都有10
個(gè)分區(qū)。那么,最后分區(qū)分配的結(jié)果看起來(lái)是這樣的:
-
C1-0
將消費(fèi)Topic1
的0,1,2,3
分區(qū)和Topic1
的0,1,2,3
分區(qū) -
C2-0
將消費(fèi)Topic1
的4,5,6
分區(qū)和Topic2
的4,5,6
分區(qū) -
C3-0
將消費(fèi)Topic1
的7,8,9
分區(qū)和Topic2
的7,8,9
分區(qū)
其實(shí)這樣就會(huì)有一個(gè)問(wèn)題,C1-0
就會(huì)多消費(fèi)兩個(gè)分區(qū),這就是一個(gè)很明顯的弊端。
6.2 RoundRobin strategy(輪詢(xún)分區(qū))
輪詢(xún)分區(qū)策略是把所有partition
和所有Consumer
線(xiàn)程都列出來(lái),然后按照hashcode
進(jìn)行排序。
最后通過(guò)輪詢(xún)算法分配partition
給消費(fèi)線(xiàn)程。如果所有Consumer
實(shí)例的訂閱是相同的,那么partition
會(huì)均勻分布。
假如按照hashCode
排序完的Topic / partitions
組依次為:
T1一5
T1一3
T1-0
T1-8
T1-2
T1-1
T1-4
T1-7
T1-6
T1-9
消費(fèi)者線(xiàn)程排序?yàn)椋?/p>
C1-0
C1-1
C2-0
C2-1
最后的分區(qū)分配的結(jié)果為:
-
C1-0
將消費(fèi)T1-5, T1-2, T1-6
分區(qū) -
C1-1
將消費(fèi)T1-3, T1-1, T1-9
分區(qū) -
C2-0
將消費(fèi)T1-0, T1-4
分區(qū) -
C2-1
將消費(fèi)T1-8, T1-7
分區(qū)
使用輪詢(xún)分區(qū)策略必須滿(mǎn)足兩個(gè)條件:
-
每個(gè)主題的消費(fèi)者實(shí)例具有相同數(shù)量的流
-
每個(gè)消費(fèi)者訂閱的主題必須是相同的
1. 什么時(shí)候會(huì)觸發(fā)這個(gè)策略呢?
當(dāng)出現(xiàn)以下幾種情況時(shí),Kafka
會(huì)進(jìn)行一次分區(qū)分配操作,也就是Kafka Consumer
的Rebalance
-
同一個(gè)
Consumer group
內(nèi)新增了消費(fèi)者 -
消費(fèi)者離開(kāi)當(dāng)前所屬的
Consumer group
,比如主動(dòng)停機(jī)或者宕機(jī) -
Topic
新增了分區(qū)(也就是分區(qū)數(shù)量發(fā)生了變化)
Kafka Consuemr
的Rebalance
機(jī)制規(guī)定了一個(gè)Consumer group
下的所有Consumer
如何達(dá)成一致來(lái)分配訂閱Topic
的每個(gè)分區(qū)。
而具體如何執(zhí)行分區(qū)策略,就是前面提到過(guò)的兩種內(nèi)置的分區(qū)策略。而Kafka
對(duì)于分配策略這塊,提供了可插拔的實(shí)現(xiàn)方式,也就是說(shuō),除了這兩種之外,我們還可以創(chuàng)建自己的分配機(jī)制。
2. 誰(shuí)來(lái)執(zhí)行Rebalance
以及管理Consumer
的group
呢?
Consumer group
如何確定自己的coordinator
是誰(shuí)呢,消費(fèi)者向Kafka
集群中的任意一個(gè)broker
發(fā)送一個(gè) GroupCoord inatorRequest
請(qǐng)求,服務(wù)端會(huì)返回一個(gè)負(fù)載最小的broker
節(jié)點(diǎn)的id
,并將該broker
設(shè)置為 coordinator
。
3. JoinGroup
的過(guò)程
在Rebalance
之前,需要保證coordinator
是已經(jīng)確定好了的,整個(gè)Rebalance
的過(guò)程分為兩個(gè)步驟,Join和Syncjoin
:表示加入到Consumer group
中,在這一步中,所有的成員都會(huì)向coordinator
發(fā)送 joinGroup
的請(qǐng)求。
一旦所有成員都發(fā)了joinGroup
請(qǐng)求,那么coordinator
會(huì)選擇一個(gè)Consumer
擔(dān)任leader
角色,并把組成員信息和訂閱信息發(fā)送給消費(fèi)者:
-
protocol-metadata
:序列化后的消費(fèi)者的訂閱信息 -
leader id
:消費(fèi)組中的消費(fèi)者,coordinator
會(huì)選擇一個(gè)作為leader
,對(duì)應(yīng)的就是member id
-
member metadata
:對(duì)應(yīng)消費(fèi)者的訂閱信息 -
members
:consumer group
中全部的消費(fèi)者的訂閱信息 -
generation_id
:年代信息,類(lèi)似于ZooKeepe
中的epoch
,對(duì)于每一輪Rebalance
,generation_id
都會(huì)遞增。主要用來(lái)保護(hù)consumer group
,隔離無(wú)效的offset
提交。也就是上一輪的consumer
成員無(wú)法提交offset
到新的Consumer group
中。
4. Synchronizing Group State
階段
完成分區(qū)分配之后,就進(jìn)入了Synchronizing Group Stat
階段,主要邏輯是向GroupCoordinator
發(fā)送SyncGroupRequest
請(qǐng)求,并且處理SyncGroupResponse
響應(yīng),簡(jiǎn)單來(lái)說(shuō),就是leader
將消費(fèi)者對(duì)應(yīng)的 partition
分配方案同步給Consumer group
中的所有Consumer
:
每個(gè)消費(fèi)者都會(huì)向coordinator
發(fā)送syncgroup
請(qǐng)求,不過(guò)只有leader
節(jié)點(diǎn)會(huì)發(fā)送分配方案,其他消費(fèi)者只是打打醬油而已。
當(dāng)leader
把方案發(fā)給coordinator
以后,coordinator
會(huì)把結(jié)果設(shè)置到SyncGroupResponse
中。這樣所有成員都知道自己應(yīng)該消費(fèi)哪個(gè)分區(qū)。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-823649.html
Consumer group
的分區(qū)分配方案是在客戶(hù)端執(zhí)行的!Kafka
將這個(gè)權(quán)利下放給客戶(hù)端主要是因?yàn)檫@樣做可以有更好的靈活性。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-823649.html
7.參考文獻(xiàn)
- https://dongguabai.blog.csdn.net/article/details/86536894
到了這里,關(guān)于全網(wǎng)最詳細(xì)地理解Kafka中的Topic和Partition以及關(guān)于kafka的消息分發(fā)、服務(wù)端如何消費(fèi)指定分區(qū)、kafka的分區(qū)分配策略(range策略和RoundRobin策略)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!