架構(gòu)原理
一、高吞吐機制:Batch打包、緩沖區(qū)、acks
1. Kafka Producer怎么把消息發(fā)送給Broker集群的?
需要指定把消息發(fā)送到哪個topic去
首先需要選擇一個topic的分區(qū),默認是輪詢來負載均衡,但是如果指定了一個分區(qū)key,那么根據(jù)這個key的hash值來分發(fā)到指定的分區(qū),這樣可以讓相同的key分發(fā)到同一個分區(qū)里去,還可以自定義partitioner來實現(xiàn)分區(qū)策略
producer.send(msg); // 用類似這樣的方式去發(fā)送消息,就會把消息給你均勻的分布到各個分區(qū)上去
producer.send(key, msg); // 訂單id,或者是用戶id,他會根據(jù)這個key的hash值去分發(fā)到某個分區(qū)上去,他可以保證相同的key會路由分發(fā)到同一個分區(qū)上去
知道要發(fā)送到哪個分區(qū)之后,還得找到這個分區(qū)的leader副本所在的機器,然后跟那個機器上的Broker通過Socket建立連接來進行通信,發(fā)送Kafka自定義協(xié)議格式的請求過去,把消息就帶過去了
如果找到了partition的leader所在的broker之后,就可以通過socket跟那臺broker建立連接,接著發(fā)送消息過去
Producer(生產(chǎn)者客戶端),起碼要知道兩個元數(shù)據(jù),每個topic有幾個分區(qū),每個分區(qū)的leader是在哪臺broker上,會自己從broker上拉取kafka集群的元數(shù)據(jù),緩存在自己client本地客戶端上
kafka使用者的層面來考慮一下,我如果要把數(shù)據(jù)寫入kafka集群,應(yīng)該如何來做,怎么把數(shù)據(jù)寫入kafka集群,以及他背后的一些原理還有使用過程中需要設(shè)置的一些參數(shù),到底應(yīng)該怎么來弄
2. 用一張圖告訴你Producer發(fā)送消息的內(nèi)部實現(xiàn)原理
每次發(fā)送消息都必須先把數(shù)據(jù)封裝成一個ProducerRecord對象,里面包含了要發(fā)送的topic,具體在哪個分區(qū),分區(qū)key,消息內(nèi)容,timestamp時間戳,然后這個對象交給序列化器,變成自定義協(xié)議格式的數(shù)據(jù)
接著把數(shù)據(jù)交給partitioner分區(qū)器,對這個數(shù)據(jù)選擇合適的分區(qū),默認就輪詢所有分區(qū),或者根據(jù)key來hash路由到某個分區(qū),這個topic的分區(qū)信息,都是在客戶端會有緩存的,當然會提前跟broker去獲取
接著這個數(shù)據(jù)會被發(fā)送到producer內(nèi)部的一塊緩沖區(qū)里
然后producer內(nèi)部有一個Sender線程,會從緩沖區(qū)里提取消息封裝成一個一個的batch,然后每個batch發(fā)送給分區(qū)的leader副本所在的broker
3. 基于Java API寫一個Kafka Producer發(fā)送消息的代碼示例
package com.zhss.demo.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class ProducerDemo {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
// 這里可以配置幾臺broker即可,他會自動從broker去拉取元數(shù)據(jù)進行緩存
props.put("bootstrap.servers", "hadoop03:9092,hadoop04:9092,hadoop05:9092");
// 這個就是負責(zé)把發(fā)送的key從字符串序列化為字節(jié)數(shù)組
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 這個就是負責(zé)把你發(fā)送的實際的message從字符串序列化為字節(jié)數(shù)組
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 323840);
props.put("linger.ms", 10);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);
// 創(chuàng)建一個Producer實例:線程資源,跟各個broker建立socket連接資源
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(
"test-topic", "test-key", "test-value");
// 這是異步發(fā)送的模式
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception == null) {
// 消息發(fā)送成功
System.out.println("消息發(fā)送成功");
} else {
// 消息發(fā)送失敗,需要重新發(fā)送
}
}
});
Thread.sleep(10 * 1000);
// 這是同步發(fā)送的模式
// producer.send(record).get();
// 你要一直等待人家后續(xù)一系列的步驟都做完,發(fā)送消息之后
// 有了消息的回應(yīng)返回給你,你這個方法才會退出來
producer.close();
}
}
4. 發(fā)送消息給Broker時遇到的各種異常該如何處理?
之前我們看到不管是異步還是同步,都可能讓你處理異常,常見的異常如下:
LeaderNotAvailableException:這個就是如果某臺機器掛了,此時leader副本不可用,會導(dǎo)致你寫入失敗,要等待其他follower副本切換為leader副本之后,才能繼續(xù)寫入,此時可以重試發(fā)送即可
如果說你平時重啟kafka的broker進程,肯定會導(dǎo)致leader切換,一定會導(dǎo)致你寫入報錯,是LeaderNotAvailableException
NotControllerException:這個也是同理,如果說Controller所在Broker掛了,那么此時會有問題,需要等待Controller重新選舉,此時也是一樣就是重試即可
NetworkException:網(wǎng)絡(luò)異常,重試即可
我們之前配置了一個參數(shù),retries,他會自動重試的,但是如果重試幾次之后還是不行,就會提供Exception給我們來處理了
5. 發(fā)送消息的緩沖區(qū)應(yīng)該如何優(yōu)化來提升發(fā)送的吞吐量?
buffer.memory:設(shè)置發(fā)送消息的緩沖區(qū),默認值是33554432,就是32MB
如果發(fā)送消息出去的速度小于寫入消息進去的速度,就會導(dǎo)致緩沖區(qū)寫滿,此時生產(chǎn)消息就會阻塞住,所以說這里就應(yīng)該多做一些壓測,盡可能保證說這塊緩沖區(qū)不會被寫滿導(dǎo)致生產(chǎn)行為被阻塞住
compression.type,默認是none,不壓縮,但是也可以使用lz4壓縮,效率還是不錯的,壓縮之后可以減小數(shù)據(jù)量,提升吞吐量,但是會加大producer端的cpu開銷
6. 消息批量發(fā)送的核心參數(shù)batch.size是如何優(yōu)化吞吐量?
batch.size,設(shè)置meigebatch的大小,如果batch太小,會導(dǎo)致頻繁網(wǎng)絡(luò)請求,吞吐量下降;如果batch太大,會導(dǎo)致一條消息需要等待很久才能被發(fā)送出去,而且會讓內(nèi)存緩沖區(qū)有很大壓力,過多數(shù)據(jù)緩沖在內(nèi)存里
默認值是:16384,就是16kb,也就是一個batch滿了16kb就發(fā)送出去,一般在實際生產(chǎn)環(huán)境,這個batch的值可以增大一些來提升吞吐量,可以自己壓測一下
還有一個參數(shù),linger.ms,這個值默認是0,意思就是消息必須立即被發(fā)送,但是這是不對的,一般設(shè)置一個100毫秒之類的,這樣的話就是說,這個消息被發(fā)送出去后進入一個batch,如果100毫秒內(nèi),這個batch滿了16kb,自然就會發(fā)送出去
但是如果100毫秒內(nèi),batch沒滿,那么也必須把消息發(fā)送出去了,不能讓消息的發(fā)送延遲時間太長,也避免給內(nèi)存造成過大的一個壓力
7. 如何根據(jù)業(yè)務(wù)場景對消息大小以及請求超時進行合理的設(shè)置?
max.request.size:這個參數(shù)用來控制發(fā)送出去的消息的大小,默認是1048576字節(jié),也就1mb,這個一般太小了,很多消息可能都會超過1mb的大小,所以需要自己優(yōu)化調(diào)整,把他設(shè)置更大一些
你發(fā)送出去的一條大數(shù)據(jù),超大的JSON串,超過1MB,就不讓你發(fā)了
request.timeout.ms:這個就是說發(fā)送一個請求出去之后,他有一個超時的時間限制,默認是30秒,如果30秒都收不到響應(yīng),那么就會認為異常,會拋出一個TimeoutException來讓我們進行處理
8. 基于Kafka內(nèi)核架構(gòu)原理深入分析acks參數(shù)到底是干嘛的
acks參數(shù),其實是控制發(fā)送出去的消息的持久化機制的
如果acks=0,那么producer根本不管寫入broker的消息到底成功沒有,發(fā)送一條消息出去,立馬就可以發(fā)送下一條消息,這是吞吐量最高的方式,但是可能消息都丟失了,你也不知道的,但是說實話,你如果真是那種實時數(shù)據(jù)流分析的業(yè)務(wù)和場景,就是僅僅分析一些數(shù)據(jù)報表,丟幾條數(shù)據(jù)影響不大的
會讓你的發(fā)送吞吐量會提升很多,你發(fā)送弄一個batch出,不需要等待人家leader寫成功,直接就可以發(fā)送下一個batch了,吞吐量很大的,哪怕是偶爾丟一點點數(shù)據(jù),實時報表,折線圖,餅圖
acks=all,或者acks=-1:這個leader寫入成功以后,必須等待其他ISR中的副本都寫入成功,才可以返回響應(yīng)說這條消息寫入成功了,此時你會收到一個回調(diào)通知
min.insync.replicas = 2,ISR里必須有2個副本,一個leader和一個follower,最最起碼的一個,不能只有一個leader存活,連一個follower都沒有了
acks = -1,每次寫成功一定是leader和follower都成功才可以算做成功,leader掛了,follower上是一定有這條數(shù)據(jù),不會丟失
retries = Integer.MAX_VALUE,無限重試,如果上述兩個條件不滿足,寫入一直失敗,就會無限次重試,保證說數(shù)據(jù)必須成功的發(fā)送給兩個副本,如果做不到,就不停的重試,除非是面向金融級的場景,面向企業(yè)大客戶,或者是廣告計費,跟錢的計算相關(guān)的場景下,才會通過嚴格配置保證數(shù)據(jù)絕對不丟失
acks=1:只要leader寫入成功,就認為消息成功了,默認給這個其實就比較合適的,還是可能會導(dǎo)致數(shù)據(jù)丟失的,如果剛寫入leader,leader就掛了,此時數(shù)據(jù)必然丟了,其他的follower沒收到數(shù)據(jù)副本,變成leader
9. 針對瞬間異常的消息重試參數(shù)有哪些需要考慮的點
有的時候一些leader切換之類的問題,需要進行重試,設(shè)置retries即可,而且還可以跟消息不丟失結(jié)合起來,但是消息重試會導(dǎo)致重復(fù)發(fā)送的問題,比如說網(wǎng)絡(luò)抖動一下導(dǎo)致他以為沒成功,就重試了,其實人家都成功了
所以消息重試導(dǎo)致的消費重復(fù),需要你在下游consumer做冪等性處理,但是kafka已經(jīng)支持了一次且僅一次的消息語義
另外一個,消息重試是可能導(dǎo)致消息的亂序的,因為可能排在你后面的消息都發(fā)送出去了,你現(xiàn)在收到回調(diào)失敗了才在重試,此時消息就會亂序,所以可以使用“max.in.flight.requests.per.connection”參數(shù)設(shè)置為1,這樣可以保證producer同一時間只能發(fā)送一條消息
兩次重試的間隔默認是100毫秒,用“retry.backoff.ms”來進行設(shè)置
一般來說,某臺broker重啟導(dǎo)致的leader切換,是最常見的異常,所以盡可能把重試次數(shù)和間隔,設(shè)置的可以cover住新leader切換過來
10. Kafka Producer高階用法(一):自定義分區(qū)
public class HotDataPartitioner implements Partitioner {
private Random random;
@Override
public void configure(Map<String, ?> configs) {
random = new Random();
}
@Override
public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String key = (String)keyObj;
List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
int partitionCount = partitionInfoList.size();
int hotDataPartition = partitionCount - 1;
return !key.contains(“hot_data”) ? random.nextInt(partitionCount - 1) : hotDataPartition;
}
}
props.put(“partitioner.class”, “com.zhss.HotDataPartitioner”);
測試發(fā)送
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test-topic
11. Kafka Producer高階用法(二):自定義序列化
12. Kafka Producer高階用法(三):自定義攔截器
二、Kafka Consumer選舉與Rebalance實現(xiàn)原理
1. 一張圖畫清Kafka基于Consumer Group的消費者組的模型
每個consumer都要屬于一個consumer.group,就是一個消費組,topic的一個分區(qū)只會分配給一個消費組下的一個consumer來處理,每個consumer可能會分配多個分區(qū),也有可能某個consumer沒有分配到任何分區(qū)
分區(qū)內(nèi)的數(shù)據(jù)是保證順序性的
group.id = “membership-consumer-group”
如果你希望實現(xiàn)一個廣播的效果,你的每臺機器都要消費到所有的數(shù)據(jù),每臺機器啟動的時候,group.id可以是一個隨機生成的UUID也可以,你只要讓不同的機器的KafkaConsumer的group.id是不一樣的
如果consumer group中某個消費者掛了,此時會自動把分配給他的分區(qū)交給其他的消費者,如果他又重啟了,那么又會把一些分區(qū)重新交還給他,這個就是所謂的消費者rebalance的過程
2. 消費者offset的記錄方式以及基于內(nèi)部topic的提交模式
每個consumer內(nèi)存里數(shù)據(jù)結(jié)構(gòu)保存對每個topic的每個分區(qū)的消費offset,定期會提交offset,老版本是寫入zk,但是那樣高并發(fā)請求zk是不合理的架構(gòu)設(shè)計,zk是做分布式系統(tǒng)的協(xié)調(diào)的,輕量級的元數(shù)據(jù)存儲,不能負責(zé)高并發(fā)讀寫,作為數(shù)據(jù)存儲
所以后來就是提交offset發(fā)送給內(nèi)部topic:__consumer_offsets,提交過去的時候,key是group.id+topic+分區(qū)號,value就是當前offset的值,每隔一段時間,kafka內(nèi)部會對這個topic進行compact
也就是每個group.id+topic+分區(qū)號就保留最新的那條數(shù)據(jù)即可
而且因為這個__consumer_offsets可能會接收高并發(fā)的請求,所以默認分區(qū)50個,這樣如果你的kafka部署了一個大的集群,比如有50臺機器,就可以用50臺機器來抗offset提交的請求壓力,就好很多
3. 基于Java API寫一個Kafka Consumer消費消息的代碼示例
String topicName = “test-topic”;
String groupId = “test-group”;
Properties props = new Properties();
props.put(“bootstrap.servers”, “l(fā)ocalhost:9092”);
props.put(“group.id”, “groupId”);
props.put(“enable.auto.commit”, “true”);
props.put(“auto.commit.ineterval.ms”, “1000”);
// 每次重啟都是從最早的offset開始讀取,不是接著上一次
props.put(“auto.offset.reset”, “earliest”);
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”, “org.apache.kafka.common.serialization.SttringDeserializer”);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicName));
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(1000); // 超時時間
for(ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + “, ” + record.key() + “, ” + record.value());
}
}
} catch(Exception e) {
}
4. Kafka感知消費者故障是通過哪三個參數(shù)來實現(xiàn)的?
heartbeat.interval.ms:consumer心跳時間,必須得保持心跳才能知道consumer是否故障了,然后如果故障之后,就會通過心跳下發(fā)rebalance的指令給其他的consumer通知他們進行rebalance的操作
session.timeout.ms:kafka多長時間感知不到一個consumer就認為他故障了,默認是10秒
max.poll.interval.ms:如果在兩次poll操作之間,超過了這個時間,那么就會認為這個consume處理能力太弱了,會被踢出消費組,分區(qū)分配給別人去消費,一遍來說結(jié)合你自己的業(yè)務(wù)處理的性能來設(shè)置就可以了
5. 對消息進行消費時有哪幾個參數(shù)需要注意以及設(shè)置呢?
fetch.max.bytes:獲取一條消息最大的字節(jié)數(shù),一般建議設(shè)置大一些
max.poll.records:一次poll返回消息的最大條數(shù),默認是500條
connection.max.idle.ms:consumer跟broker的socket連接如果空閑超過了一定的時間,此時就會自動回收連接,但是下次消費就要重新建立socket連接,這個建議設(shè)置為-1,不要去回收
6. 消費者offset相關(guān)的參數(shù)設(shè)置會對運行產(chǎn)生什么樣的影響?
auto.offset.reset:這個參數(shù)的意思是,如果下次重啟,發(fā)現(xiàn)要消費的offset不在分區(qū)的范圍內(nèi),就會重頭開始消費;但是如果正常情況下會接著上次的offset繼續(xù)消費的
enable.auto.commit:這個就是開啟自動提交位移
7. Group Coordinator是什么以及主要負責(zé)什么?
每個consumer group都會選擇一個broker作為自己的coordinator,他是負責(zé)監(jiān)控這個消費組里的各個消費者的心跳,以及判斷是否宕機,然后開啟rebalance的,那么這個如何選擇呢?
就是根據(jù)group.id來進行選擇,他有內(nèi)部的一個選擇機制,會給你挑選一個對應(yīng)的Broker,總會把你的各個消費組均勻分配給各個Broker作為coordinator來進行管理的
他負責(zé)的事情只要就是rebalance,說白了你的consumer group中的每個consumer剛剛啟動就會跟選舉出來的這個consumer group對應(yīng)的coordinator所在的broker進行通信,然后由coordinator分配分區(qū)給你的這個consumer來進行消費
coordinator會盡可能均勻的分配分區(qū)給各個consumer來消費
8. 為消費者選擇Coordinator的算法是如何實現(xiàn)的?
首先對groupId進行hash,接著對__consumer_offsets的分區(qū)數(shù)量取模,默認是50,可以通過offsets.topic.num.partitions來設(shè)置,找到你的這個consumer group的offset要提交到__consumer_offsets的哪個分區(qū)
比如說:groupId,“membership-consumer-group” -> hash值(數(shù)字)-> 對50取模 -> 就知道這個consumer group下的所有的消費者提交offset的時候是往哪個分區(qū)去提交offset,大家可以找到__consumer_offsets的一個分區(qū)
__consumer_offset的分區(qū)的副本數(shù)量默認來說1,只有一個leader
然后對這個分區(qū)找到對應(yīng)的leader所在的broker,這個broker就是這個consumer group的coordinator了,接著就會維護一個Socket連接跟這個Broker進行通信
9. Coordinator和Consume Leader如何協(xié)作制定分區(qū)方案?
每個consumer都發(fā)送JoinGroup請求到Coordinator,然后Coordinator從一個consumer group中選擇一個consumer作為leader,把consumer group情況發(fā)送給這個leader,接著這個leader會負責(zé)制定分區(qū)方案,通過SyncGroup發(fā)給Coordinator
接著Coordinator就把分區(qū)方案下發(fā)給各個consumer,他們會從指定的分區(qū)的leader broker開始進行socket連接以及消費消息
10. rebalance的三種策略分別有哪些優(yōu)劣勢?
這里有三種rebalance的策略:range、round-robin、sticky
0~8
order-topic-0
order-topic-1
order-topic-2
range策略就是按照partiton的序號范圍,比如partitioin02給一個consumer,partition35給一個consumer,partition6~8給一個consumer,默認就是這個策略;
round-robin策略,就是輪詢分配,比如partiton0、3、6給一個consumer,partition1、4、7給一個consumer,partition2、5、8給一個consumer
但是上述的問題就在于說,可能在rebalance的時候會導(dǎo)致分區(qū)被頻繁的重新分配,比如說掛了一個consumer,然后就會導(dǎo)致partition04分配給第一個consumer,partition58分配給第二個consumer
這樣的話,原本是第二個consumer消費的partition3~4就給了第一個consumer,實際上來說未必就很好
最新的一個sticky策略,就是說盡可能保證在rebalance的時候,讓原本屬于這個consumer的分區(qū)還是屬于他們,然后把多余的分區(qū)再均勻分配過去,這樣盡可能維持原來的分區(qū)分配的策略
consumer1:0~2 + 6~7
consumer2:3~5 + 8
11. Consumer內(nèi)部單線程處理一切事務(wù)的核心設(shè)計思想
其實就是在一個while循環(huán)里不停的去調(diào)用poll()方法,其實是我們自己的一個線程,就是我們自己的這個線程就是唯一的KafkaConsumer的工作線程,新版本的kafka api,簡化,減少了線程數(shù)量
Consumer自己內(nèi)部就一個后臺線程,定時發(fā)送心跳給broker;但是其實負責(zé)進行拉取消息、緩存消息、在內(nèi)存里更新offset、每隔一段時間提交offset、執(zhí)行rebalance這些任務(wù)的就一個線程,其實就是我們調(diào)用Consumer.poll()方法的那個線程
就一個線程調(diào)用進去,會負責(zé)把所有的事情都干了
為什么叫做poll呢?因為就是你可以監(jiān)聽N多個Topic的消息,此時會跟集群里很多Kafka Broker維護一個Socket連接,然后每一次線程調(diào)用poll(),就會監(jiān)聽多個socket是否有消息傳遞過來
可能一個consumer會消費很多個partition,每個partition其實都是leader可能在不同的broker上,那么如果consumer要拉取多個partition的數(shù)據(jù),就需要跟多個broker進行通信,維護socket
每個socket就會跟一個broker進行通信
每個Consumer內(nèi)部會維護多個Socket,負責(zé)跟多個Broker進行通信,我們就一個工作線程每次調(diào)用poll()的時候,他其實會監(jiān)聽多個socket跟broker的通信,是否有新的數(shù)據(jù)可以去拉取
12. 消費過程中的各種offset之間的關(guān)系是什么?
上一次提交offset,當前offset(還未提交),高水位offset,LEO
內(nèi)存里記錄這么幾個東西:上一次提交offset,當前消費到的offset,你不斷的在消費消息,不停的在拉取新的消息,不停的更新當前消費的offset,HW offset,你拉取的時候,是只能看到HW他前面的數(shù)據(jù)
LEO,leader partition已經(jīng)更新到了一個offset了,但是HW在前面,你只能拉取到HW的數(shù)據(jù),HW后面的數(shù)據(jù),意味著不是所有的follower都寫入進去了,所以不能去讀取的
13. 自動提交offset的語義以及導(dǎo)致消息丟失和重復(fù)消費的問題
默認是自動提交
auto.commit.inetrval.ms:5000,默認是5秒提交一次
如果你提交了消費到的offset之后,人家kafka broker就可以感知到了,比如你消費到了offset = 56987,下次你的consumer再次重啟的時候,就會自動從kafka broker感知到說自己上一次消費到的offset = 56987
這次重啟之后,就繼續(xù)從offset = 56987這個位置繼續(xù)往后去消費就可以了
他的語義是一旦消息給你poll到了之后,這些消息就認為處理完了,后續(xù)就可以提交了,所以這里有兩種問題:
第一,消息丟失,如果你剛poll到消息,然后還沒來得及處理,結(jié)果人家已經(jīng)提交你的offset了,此時你如果consumer宕機,再次重啟,數(shù)據(jù)丟失,因為上一次消費的那批數(shù)據(jù)其實你沒處理,結(jié)果人家認為你處理了
poll到了一批數(shù)據(jù),offset = 65510~65532,人家剛好就是到了時間提交了offset,offset = 65532這個地方已經(jīng)提交給了kafka broker,接著你準備對這批數(shù)據(jù)進行消費,但是不巧的是,你剛要消費就直接宕機了
其實你消費到的數(shù)據(jù)是沒處理的,但是消費offset已經(jīng)提交給kafka了,下次你重啟的時候,offset = 65533這個位置開始消費的,之前的一批數(shù)據(jù)就丟失了
第二,重復(fù)消費,如果你poll到消息,都處理完畢了,此時還沒來得及提交offset,你的consumer就宕機了,再次重啟會重新消費到這一批消息,再次處理一遍,那么就是有消息重復(fù)消費的問題
poll到了一批數(shù)據(jù),offset = 65510~65532,你很快的處理完了,都寫入數(shù)據(jù)庫了,結(jié)果還沒來得及提交offset就宕機了,上一次提交的offset = 65509,重啟,他會再次讓你消費offset = 65510~65532,一樣的數(shù)據(jù)再次重復(fù)消費了一遍,寫入數(shù)據(jù)庫
重啟kafka consumer,修改了他的代碼
14. 如何實現(xiàn)Consumer Group的狀態(tài)機流轉(zhuǎn)機制?
剛開始Consumer Group狀態(tài)是:Empty
接著如果部分consumer發(fā)送了JoinGroup請求,會進入:PreparingRebalance的狀態(tài),等待一段時間其他成員加入,這個時間現(xiàn)在默認就是max.poll.interval.ms來指定的,所以這個時間間隔一般可以稍微大一點
接著如果所有成員都加入組了,就會進入AwaitingSync狀態(tài),這個時候就不能允許任何一個consumer提交offset了,因為馬上要rebalance了,進行重新分配了,這個時候就會選擇一個leader consumer,由他來制定分區(qū)方案
然后leader consumer制定好了分區(qū)方案,SyncGroup請求發(fā)送給coordinator,他再下發(fā)方案給所有的consumer成員,此時進入stable狀態(tài),都可以正?;趐oll來消費了
所以如果說在stable狀態(tài)下,有consumer進入組或者離開崩潰了,那么都會重新進入PreparingRebalance狀態(tài),重新看看當前組里有誰,如果剩下的組員都在,那么就進入AwaitingSync狀態(tài)
leader consumer重新制定方案,然后再下發(fā)
15. 最新設(shè)計的rebalance分代機制可以有什么作用?
大家設(shè)想一個場景,在rebalance的時候,可能你本來消費了partition3的數(shù)據(jù),結(jié)果有些數(shù)據(jù)消費了還沒提交offset,結(jié)果此時rebalance,把partition3分配給了另外一個cnosumer了,此時你如果提交partition3的數(shù)據(jù)的offset,能行嗎?
必然不行,所以每次rebalance會觸發(fā)一次consumer group generation,分代,每次分代會加1,然后你提交上一個分代的offset是不行的,那個partiton可能已經(jīng)不屬于你了,大家全部按照新的partiton分配方案重新消費數(shù)據(jù)
consumer group generation = 1
consumer group generation = 2
16. Consumer端的自定義反序列化器是什么?
17. 自行指定每個Consumer要消費哪些分區(qū)有用嗎?
List partitions = consumer.partitionsFor(“order-topic”);
new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
consumer.assign(partitions); //指定每個consumer要消費哪些分區(qū),你就不是依靠consumer的自動的分區(qū)分配方案來做了
18. 老版本的high-level consumer的實現(xiàn)原理是什么?
producer和consumer api原理,都是新版本的kafka api
老版本的kafka consumer api分成兩種,high-level和low-level,都是基于zk實現(xiàn)的,只不過前者有consumer group的概念,后者沒有
high-level的api,比如說consumer啟動就是在zk里寫一個臨時節(jié)點,但是如果自己宕機了,那么zk臨時節(jié)點就沒了,別人就會發(fā)現(xiàn),然后就會開啟rebalance
然后在消費的時候,可以指定多個線程取消費一個topic,比如說你和這個consumer分配到了5個分區(qū),那么你可以指定最多5個線程,每個線程消費一個分區(qū)的數(shù)據(jù),但是新版本的就一個線程負責(zé)消費所有分區(qū)
在提交offset,就是向zk寫入對某個分區(qū)現(xiàn)在消費到了哪個offset了,默認60秒才提交一次
新版本的api就不基于zk來實現(xiàn)了呢,zk主要是做輕量級的分布式協(xié)調(diào),元數(shù)據(jù)存儲,并不適合高并發(fā)大量連接的場景,cnosumer可能有成百上千個,成千上萬個,zk來做的,連接的壓力,高并發(fā)的讀寫
broker內(nèi)部基于zk來進行協(xié)調(diào)
19. 老版本的low-level consumer的實現(xiàn)原理是什么?
老版本的low-level消費者,是可以自己控制offset的,實現(xiàn)很底層的一些控制,但是需要自己去提交offset,還要自己找到某個分區(qū)對應(yīng)的leader broker,跟他進行連接獲取消息,如果leader變化了,也得自己處理,非常的麻煩
比如說storm-kafka這個插件,在storm消費kafka數(shù)據(jù)的時候,就是使用的low-level api,自己獲取offset,提交寫入zk中自己指定的znode中,但是在未來基本上老版本的會越來越少使用
三、Kafka的時間輪延時調(diào)度機制與架構(gòu)原理總結(jié)
1. Producer的緩沖區(qū)內(nèi)部數(shù)據(jù)結(jié)構(gòu)是什么樣子的?
producer會創(chuàng)建一個accumulator緩沖區(qū),他里面是一個HashMap數(shù)據(jù)結(jié)構(gòu),每個分區(qū)都會對應(yīng)一個batch隊列,因為你打包成出來的batch,那必須是這個batch都是發(fā)往同一個分區(qū)的,這樣才能發(fā)送一個batch到這個分區(qū)的leader broker
{
“order-topic-0” -> [batch1, batch2],
“order-topic-1” -> [batch3]
}
batch.size
每個batch包含三個東西,一個是compressor,這是負責(zé)追加寫入batch的組件;第二個是batch緩沖區(qū),就是寫入數(shù)據(jù)的地方;第三個是thunks,就是每個消息都有一個回調(diào)Callback匿名內(nèi)部類的對象,對應(yīng)batch里每個消息的回調(diào)函數(shù)
每次寫入一條數(shù)據(jù)都對應(yīng)一個Callback回調(diào)函數(shù)的
2. 消息緩沖區(qū)滿的時候是阻塞住還是拋出異常?
max.block.ms,其實就是說如果寫緩沖區(qū)滿了,此時是阻塞住一段時間,然后什么時候拋異常,默認是60000,也就是60秒
3. 負責(zé)IO請求的Sender線程是如何基于緩沖區(qū)發(fā)送數(shù)據(jù)的?
Sender線程會不停的輪詢緩沖區(qū)內(nèi)的HashMap,看batch是否滿了,或者是看linger.ms時間是不是到了,然后就得發(fā)送數(shù)據(jù)去,發(fā)送的時候會根據(jù)各個batch的目標leader broker來進行分組
因為可能不同的batch是對應(yīng)不同的分區(qū),但是不同的分區(qū)的Leader是在一個broker上的,<Node, List>,接著會進一步封裝為<Node, Request>,每個broker一次就是一個請求,但是這里可能包含很多個batch,接著就是將分組好的batch發(fā)送給leader broker,并且處理response,來反過來調(diào)用每個batch的callback函數(shù)
發(fā)送出去的Request會被放入InFlighRequests里面去保存,Map<NodeId, Deque>,這里就代表了發(fā)送出去的請求,但是還沒接收到響應(yīng)的
4. 同時可以接受有幾個發(fā)送到Broker的請求沒收到響應(yīng)?
Map<NodeId, Deque> => 給這個broker發(fā)送了哪些請求過去了?
max.in.flight.requests.per.connection:5
這個參數(shù)默認值是5,默認情況下,每個Broker最多只能有5個請求是發(fā)送出去但是還沒接收到響應(yīng)的,所以這種情況下是有可能導(dǎo)致順序錯亂的,大家一定要搞清楚這一點,先發(fā)送的請求可能后續(xù)要重發(fā)
5. Kafka自定義的基于TCP的二進制協(xié)議深入探秘一番(一)
kafka自定義了一組二進制的協(xié)議,現(xiàn)在一共是包含了43種協(xié)議類型,每種協(xié)議都有對應(yīng)的請求和響應(yīng),Request和Response,其實說白了,如果大家現(xiàn)在看咱們的那個自研分布式存儲系統(tǒng)的課,里面用到了gRPC
你大概可以認為就是定義了43種接口,每個接口就是一種協(xié)議,然后每個接口都有自己對應(yīng)的Request和Response,就這個意思
每個協(xié)議的Request都有相同的請求頭(RequestHeader),也有不同的請求體(RequestBody),請求頭包含了:api_key、api_version、correlation_id、client_id,這里的api_key就類似于“PRODUCE”、“FETCH”,你可以認為是接口的名字吧
“PRODUCE”就是發(fā)送消息的接口,“FETCH”就是拉取消息的接口,就這個意思
api_version,就是這個API的版本號
correlation_id,就是類似客戶端生成的一次請求的唯一標志位,唯一標識一次請求
client_id,就是客戶端的id
每個協(xié)議的Response也有相同的響應(yīng)頭,就是一個correlation_id,就是對某個請求的響應(yīng)
6. Kafka自定義的基于TCP的二進制協(xié)議深入探秘一番(二)
比如說發(fā)送消息,就是ProduceRequest和ProduceResponse,代表“PRODUCE”這個接口的請求和響應(yīng),api_key=0,其實就是“PRODUCE”接口的代表
他的RequestBody,包含了:transactional_id,acks,timeout,topic_data(topic,data(partition,record_set)),acks就是客戶端自己指定的acks參數(shù),這個會指示leader和follower副本的寫入方式,timeout就是超時時間,默認就是30秒,request.timeout.ms
然后就是要寫入哪個topic,哪個分區(qū),以及對應(yīng)數(shù)據(jù)集合,里面是多個batch
ProduceResponse,ResponseBody,包含了responses(topic,partition_responses(partition,error_code,base_offset,log_append_time,log_start_offset)),throttle_time_ms,簡單來說就是當前響應(yīng)是對哪個topic寫入的響應(yīng)
包含了每個topic的各個分區(qū)的響應(yīng),每個partition的寫入響應(yīng),包括error_code錯誤碼,base_offset是消息集合的起始offset,log_append_time是寫入broker端的時間,log_start_offset是分區(qū)的起始offset
其實各種接口大體上來說就是如此,所以現(xiàn)在大家就知道了,協(xié)議就是一種規(guī)定,你發(fā)送過來的請求是什么格式的,他可能有請求頭還有請求體,分別包含哪些字段,按什么格式放數(shù)據(jù),響應(yīng)也是一樣的
然后大家就可以按一樣的協(xié)議來發(fā)送請求和接收響應(yīng)
7. 盤點一下在Broker內(nèi)部有哪些不同場景下會有延時任務(wù)?
比如說acks=-1,那么必須等待leader和follower都寫完才能返回響應(yīng),而且有一個超時時間,默認是30秒,也就是request.timeout.ms,那么在寫入一條數(shù)據(jù)到leader磁盤之后,就必須有一個延時任務(wù),到期時間是30秒
延時任務(wù)會被放到DelayedOperationPurgatory,延時操作管理器中
這個延時任務(wù)如果因為所有follower都寫入副本到本地磁盤了,那么就會被自動觸發(fā)蘇醒,那么就可以返回響應(yīng)結(jié)果給客戶端了,否則的話,這個延時任務(wù)自己指定了最多是30秒到期,如果到了超時時間都沒等到,那么就直接超時返回異常了
還有一種是延時拉取任務(wù),也就是說follower往leader拉取消息的時候,如果發(fā)現(xiàn)是空的,那么此時會創(chuàng)建一個延時拉取任務(wù),然后延時時間到了之后,就會再次讀取一次消息,如果過程中l(wèi)eader寫入了消息那么也會自動執(zhí)行這個拉取任務(wù)
8. Kafka的時間輪延時調(diào)度機制(一):O(1)時間復(fù)雜度
Kafka內(nèi)部有很多延時任務(wù),沒有基于JDK Timer來實現(xiàn),那個插入和刪除任務(wù)的時間復(fù)雜度是O(nlogn),而是基于了自己寫的時間輪來實現(xiàn)的,時間復(fù)雜度是O(1),其實Netty、ZooKeeper、Quartz很多中間件都會實現(xiàn)時間輪
延時任務(wù)是很多很多的,大量的發(fā)送消息以及拉取消息,都會涉及到延時任務(wù),任務(wù)數(shù)量很多,如果基于傳統(tǒng)的JDK Timer把大量的延時任務(wù)頻繁的插入和刪除,時間復(fù)雜度是O(nlogn)性能比較低的
時間輪的機制,延時任務(wù)插入和刪除,O(1)
簡單來說,一個時間輪(TimerWheel)就是一個數(shù)組實現(xiàn)的存放定時任務(wù)的環(huán)形隊列,數(shù)組每個元素都是一個定時任務(wù)列表(TimerTaskList),這個TimerTaskList是一個環(huán)形雙向鏈表,鏈表里的每個元素都是定時任務(wù)(TimerTask)
時間輪是有很多個時間格的,一個時間格就是時間輪的時間跨度tickMs,wheelSize就是時間格的數(shù)量,時間輪的總時間跨度就是tickMs * wheelSize(interval),然后還有一個表盤指針(currentTime),就是時間輪當前所處的時間
currentTime指向的時間格就是到期,需要執(zhí)行里面的定時任務(wù)
比如說tickMs = 1ms,wheelSize = 20,那么時間輪跨度(inetrval)就是20ms,剛開始currentTime = 0,這個時候如果有一個延時2ms之后執(zhí)行的任務(wù)插入進來,就會基于數(shù)組的index直接定位到時間輪底層數(shù)組的第三個元素
因為tickMs = 1ms,所以第一個元素代表的是0ms,第二個元素代表的是1ms的地方,第三個元素代表的就是2ms的地方,直接基于數(shù)組來定位就是O(1)是吧,然后到數(shù)組之后把這個任務(wù)插入其中的雙向鏈表,這個時間復(fù)雜度也是O(1)
所以這個插入定時任務(wù)的時間復(fù)雜度就是O(1)
然后currentTime會隨著時間不斷的推移,1ms之后會指向第二個時間格,2ms之后會指向第三個時間格,這個時候就會執(zhí)行第三個時間格里剛才插入進來要在2ms之后執(zhí)行的那個任務(wù)了
這個時候如果插入進來一個8ms之后要執(zhí)行的任務(wù),那么就會放到第11個時間格上去,相比于currentTime剛好是8ms之后,對吧,就是個意思,然后如果是插入一個19ms之后執(zhí)行的呢?那就會放在第二個時間格
每個插入進來的任務(wù),他都會依據(jù)當前的currentTime來放,最后正好要讓currentTime轉(zhuǎn)動這么多時間之后,正好可以執(zhí)行那個時間格里的任務(wù)
9. Kafka的時間輪延時調(diào)度機制(二):多層級時間輪
接著上一講的內(nèi)容,那如果這個時候來一個350毫秒之后執(zhí)行的定時任務(wù)呢?已經(jīng)超出當前這個時間輪的范圍了,那么就放到上層時間輪,上層時間輪的tickMs就是下層時間輪的interval,也就是20ms
wheelSize是固定的,都是20,那么上層時間輪的inetrval周期就是400ms,如果再上一層的時間輪他的tickMs是400ms,那么interval周期就是8000ms,也就是8s,再上一層時間輪的tickMs是8s,interval就是160s,也就是好幾分鐘了,以此類推即可
反正有很多層級的時間輪,一個時間輪不夠,就往上開辟一個新的時間輪出來,每個時間輪的tickMs是下級時間輪的interval,而且currentTime就跟時鐘的指針一樣是不停的轉(zhuǎn)動的,你只要根據(jù)定時周期把他放入對應(yīng)的輪子即可
每個輪子插入的時候根據(jù)currentTime,放到對應(yīng)時間之后的時間格即可
比如定時350ms后執(zhí)行的任務(wù),就可以放到interval位400ms的時間輪內(nèi),currentTime自然會轉(zhuǎn)動到那個時間格來執(zhí)行他
10. Kafka的時間輪延時調(diào)度機制(三):時間輪層級的下滑
接著上一講的內(nèi)容,那如果這個時候來一個350毫秒之后執(zhí)行的定時任務(wù)呢?已經(jīng)超出當前這個時間輪的范圍了,那么就放到上層時間輪,上層時間輪的tickMs就是下層時間輪的interval,也就是20ms
wheelSize是固定的,都是20,那么上層時間輪的inetrval周期就是400ms,如果再上一層的時間輪他的tickMs是400ms,那么interval周期就是8000ms,也就是8s,再上一層時間輪的tickMs是8s,interval就是160s,也就是好幾分鐘了,以此類推即可
反正有很多層級的時間輪,一個時間輪不夠,就往上開辟一個新的時間輪出來,每個時間輪的tickMs是下級時間輪的interval,而且currentTime就跟時鐘的指針一樣是不停的轉(zhuǎn)動的,你只要根據(jù)定時周期把他放入對應(yīng)的輪子即可
每個輪子插入的時候根據(jù)currentTime,放到對應(yīng)時間之后的時間格即可
比如定時350ms后執(zhí)行的任務(wù),就可以放到interval位400ms的時間輪內(nèi),currentTime自然會轉(zhuǎn)動到那個時間格來執(zhí)行他
11. Kafka的時間輪延時調(diào)度機制(四):基于DelayQueue推動
基于數(shù)組和雙向鏈表來O(1)時間度可以插入任務(wù)
但是推進時間輪怎么做呢?搞一個線程不停的空循環(huán)判斷是否進入下一個時間格嗎?那樣很浪費CPU資源,所以采取的是DelayQueue
每個時間輪里的TimerTaskList作為這個時間格的任務(wù)列表,都會插入DelayQueue中,設(shè)置一個延時出隊時間,DelayQueue會自動把過期時間最短的排在隊頭,然后專門有一個線程來從DelayQueue里獲取到期任務(wù)列表文章來源:http://www.zghlxwxcb.cn/news/detail-698776.html
某個時間格對應(yīng)的TimerTaskList到期之后,就會被線程獲取到,這種方式就可以實現(xiàn)時間輪推進的效果,推進時間輪基于DelayQueue,時間復(fù)雜度也是O(1),因為只要從隊頭獲取即可文章來源地址http://www.zghlxwxcb.cn/news/detail-698776.html
到了這里,關(guān)于Kafka核心原理第二彈——更新中的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!