一、前言
在數(shù)據(jù)庫系統(tǒng)中有個概念叫事務,事務的作用是為了保證數(shù)據(jù)的一致性,意思是要么數(shù)據(jù)成功,要么數(shù)據(jù)失敗,不存在數(shù)據(jù)操作了一半的情況,這就是數(shù)據(jù)的一致性。在很多系統(tǒng)或者組件中,很多場景都需要保證數(shù)據(jù)的一致性,有的是高度的一致性。特別是在交易系統(tǒng)等這樣場景。有些組件的數(shù)據(jù)不一定需要高度保證數(shù)據(jù)的一致性,比如日志系統(tǒng)。本節(jié)從從kafka如何保證數(shù)據(jù)一致性看通常數(shù)據(jù)一致性設計。
二、kafka那些環(huán)節(jié)存在數(shù)據(jù)不一致性
- 數(shù)據(jù)復制:在Kafka中,數(shù)據(jù)從主節(jié)點(leader)復制到從節(jié)點(follower)的過程中,由于網(wǎng)絡延遲、節(jié)點故障或其他原因,可能導致從節(jié)點未能及時獲取或處理主節(jié)點的數(shù)據(jù)變更,從而產生數(shù)據(jù)不一致。
- 消息提交:Kafka中的消息提交涉及多個階段,包括生產者發(fā)送消息、消息被寫入日志、消息被復制到從節(jié)點等。如果在這個過程中發(fā)生錯誤或異常,可能導致消息丟失或重復,進而引發(fā)數(shù)據(jù)不一致。
- 消費者處理:消費者在處理消息時,如果因為某些原因(如網(wǎng)絡中斷、消費者進程崩潰等)未能成功處理消息,而消息又被重新投遞給其他消費者處理,也可能導致數(shù)據(jù)不一致。
- 分區(qū)重新分配:在Kafka中,如果分區(qū)的leader節(jié)點發(fā)生故障,Kafka會觸發(fā)分區(qū)重新分配,將leader切換到其他節(jié)點。在這個過程中,如果切換不及時或切換過程中發(fā)生錯誤,可能導致數(shù)據(jù)不一致。
三、kafka如何保證數(shù)據(jù)一致性(內容摘自半畝方塘立身)
我們知道Kafka架構如下,主要由 Producer、Broker、Consumer 三部分組成。一條消息從生產到消費完成這個過程,可以劃分三個階段,生產階段、存儲階段、消費階段。
? 生產階段: 在這個階段,從消息在 Producer 創(chuàng)建出來,經過網(wǎng)絡傳輸發(fā)送到 Broker 端。
? 存儲階段: 在這個階段,消息在 Broker 端存儲,如果是集群,消息會在這個階段被復制到其他的副本上。
? 消費階段: 在這個階段,Consumer 從 Broker 上拉取消息,經過網(wǎng)絡傳輸發(fā)送到Consumer上。
那么如何保證消息不丟我們可以從這三部分來分析。
消息傳遞語義
在深度剖析消息丟失場景之前,我們先來聊聊「消息傳遞語義」到底是個什么玩意?
所謂的消息傳遞語義是 Kafka 提供的 Producer 和 Consumer 之間的消息傳遞過程中消息傳遞的保證性。主要分為三種。
作者:半畝方塘立身
鏈接:https://zhuanlan.zhihu.com/p/682321210
來源:知乎
著作權歸作者所有。商業(yè)轉載請聯(lián)系作者獲得授權,非商業(yè)轉載請注明出處。
?
1. 首先當 Producer 向 Broker 發(fā)送數(shù)據(jù)后,會進行 commit
,如果 commit
成功,由于 Replica
副本機制的存在,則意味著消息不會丟失,但是 Producer
發(fā)送數(shù)據(jù)給 Broker
后,遇到網(wǎng)絡問題而造成通信中斷,那么 Producer
就無法準確判斷該消息是否已經被提交(commit
),這就可能造成 at least once
語義。
2. 在 Kafka 0.11.0.0
之前, 如果 Producer
沒有收到消息 commit
的響應結果,它只能重新發(fā)送消息,確保消息已經被正確的傳輸?shù)?Broker,重新發(fā)送的時候會將消息再次寫入日志中;而在 0.11.0.0
版本之后, Producer
支持冪等傳遞選項,保證重新發(fā)送不會導致消息在日志出現(xiàn)重復。為了實現(xiàn)這個, Broker
為 Producer
分配了一個ID,并通過每條消息的序列號進行去重。也支持了類似事務語義來保證將消息發(fā)送到多個 Topic 分區(qū)中,保證所有消息要么都寫入成功,要么都失敗,這個主要用在 Topic 之間的 exactly once
語義。 其中啟用冪等傳遞的方法配置:enable.idempotence = true
。 啟用事務支持的方法配置:設置屬性 transcational.id = "指定值"
。
3. 從 Consumer
角度來剖析, 我們知道 Offset
是由 Consumer
自己來維護的, 如果 Consumer
收到消息后更新 Offset
, 這時 Consumer
異常 crash
掉, 那么新的 Consumer
接管后再次重啟消費,就會造成 at most once
語義(消息會丟,但不重復)。
4. 如果 Consumer
消費消息完成后, 再更新 Offset
,如果這時 Consumer crash
掉,那么新的 Consumer
接管后重新用這個 Offset
拉取消息, 這時就會造成 at least once
語義(消息不丟,但被多次重復處理)。
總結: 默認 Kafka 提供「at least once」語義的消息傳遞,允許用戶通過在處理消息之前保存 Offset的方式提供 「at mostonce」 語義。如果我們可以自己實現(xiàn)消費冪等,理想情況下這個系統(tǒng)的消息傳遞就是嚴格的「exactly once」, 也就是保證不丟失、且只會被精確的處理一次,但是這樣是很難做到的。
接下來我們從生產階段、存儲階段、消費階段這幾方面看下kafka如何保證消息不丟失。
生產階段
通過深入解析Kafka消息發(fā)送過程我們知道Kafka生產者異步發(fā)送消息并返回一個Future,代表發(fā)送結果。首先需要我們獲取返回結果判斷是否發(fā)送成功。
// 異步發(fā)送消息,并設置回調函數(shù)
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("消息發(fā)送失敗: " + exception.getMessage());
} else {
System.out.println("消息發(fā)送成功到主題: " + metadata.topic() + ", 分區(qū): " + metadata.partition() + ", 偏移量: " + metadata.offset());
}
}
});
?
?
消息隊列通過最常用的請求確認機制,來保證消息的可靠傳遞:當你的代碼調用發(fā)消息方法時,消息隊列的客戶端會把消息發(fā)送到 Broker,Broker 收到消息后,會給客戶端返回一個確認響應,表明消息已經收到了。客戶端收到響應后,完成了一次正常消息的發(fā)送。
Producer(生產者)保證消息不丟失的方法:
1. 發(fā)送確認機制:Producer可以使用Kafka的acks參數(shù)來配置發(fā)送確認機制。通過設置合適的acks參數(shù)值,Producer可以在消息發(fā)送后等待Broker的確認。確認機制提供了不同級別的可靠性保證,包括:
? acks=0:Producer在發(fā)送消息后不會等待Broker的確認,這可能導致消息丟失風險。
? acks=1:Producer在發(fā)送消息后等待Broker的確認,確保至少將消息寫入到Leader副本中。
? acks=all或acks=-1:Producer在發(fā)送消息后等待Broker的確認,確保將消息寫入到所有ISR(In-Sync Replicas)副本中。這提供了最高的可靠性保證。
2. 消息重試機制:Producer可以實現(xiàn)消息的重試機制來應對發(fā)送失敗或異常情況。如果發(fā)送失敗,Producer可以重新發(fā)送消息,直到成功或達到最大重試次數(shù)。重試機制可以保證消息不會因為臨時的網(wǎng)絡問題或Broker故障而丟失。
?
?
Broker存儲階段
正常情況下,只要 Broker 在正常運行,就不會出現(xiàn)丟失消息的問題,但是如果 Broker 出現(xiàn)了故障,比如進程死掉了或者服務器宕機了,還是可能會丟失消息的。
在kafka高性能設計原理中我們了解到kafka為了提高性能用到了 Page Cache
技術.在讀寫磁盤日志文件時,其實操作的都是內存,然后由操作系統(tǒng)決定什么時候將 Page Cache 里的數(shù)據(jù)真正刷入磁盤。如果內存中數(shù)據(jù)還未刷入磁盤服務宕機了,這個時候還是會丟消息的。
為了最大程度地降低數(shù)據(jù)丟失的可能性,我們可以考慮以下方法:
-
持久化配置優(yōu)化:可以通過調整 Kafka 的持久化配置參數(shù)來控制數(shù)據(jù)刷盤的頻率,從而減少數(shù)據(jù)丟失的可能性。例如,可以降低
flush.messages
和flush.ms
參數(shù)的值,以更頻繁地刷寫數(shù)據(jù)到磁盤。 - 副本因子增加:在 Kafka 中,可以為每個分區(qū)設置多個副本,以提高數(shù)據(jù)的可靠性。當某個 broker 發(fā)生故障時,其他副本仍然可用,可以避免數(shù)據(jù)丟失。
-
使用acks=all:在生產者配置中,設置
acks=all
可以確保消息在所有ISR(In-Sync Replicas)中都得到確認后才被視為發(fā)送成功。這樣可以確保消息被復制到多個副本中,降低數(shù)據(jù)丟失的風險。 - 備份數(shù)據(jù):定期備份 Kafka 的數(shù)據(jù),以便在發(fā)生災難性故障時可以進行數(shù)據(jù)恢復。
消費階段
消費階段采用和生產階段類似的確認機制來保證消息的可靠傳遞,客戶端從 Broker 拉取消息后,執(zhí)行用戶的消費業(yè)務邏輯,成功后,才會給 Broker 發(fā)送消費確認響應。如果 Broker 沒有收到消費確認響應,下次拉消息的時候還會返回同一條消息,確保消息不會在網(wǎng)絡傳輸過程中丟失,也不會因為客戶端在執(zhí)行消費邏輯中出錯導致丟失。
- 自動提交位移:Consumer可以選擇啟用自動提交位移的功能。當Consumer成功處理一批消息后,它會自動提交當前位移,標記為已消費。這樣即使Consumer發(fā)生故障,它可以使用已提交的位移來恢復并繼續(xù)消費之前未處理的消息。
- 手動提交位移:Consumer還可以選擇手動提交位移的方式。在消費一批消息后,Consumer可以顯式地提交位移,以確保處理的消息被正確記錄。這樣可以避免重復消費和位移丟失的問題。
作者:半畝方塘立身
鏈接:https://zhuanlan.zhihu.com/p/682321210
來源:知乎
著作權歸作者所有。商業(yè)轉載請聯(lián)系作者獲得授權,非商業(yè)轉載請注明出處。
// 創(chuàng)建消費者實例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 訂閱主題
consumer.subscribe(Collections.singletonList(topic));
try {
while (true) {
// 消費消息
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// 處理消息邏輯
System.out.println("消費消息:Topic = " + record.topic() +
", Partition = " + record.partition() +
", Offset = " + record.offset() +
", Key = " + record.key() +
", Value = " + record.value());
// 手動提交位移
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata offsetMetadata = new OffsetAndMetadata(record.offset() + 1);
consumer.commitSync(Collections.singletonMap(topicPartition, offsetMetadata));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
?四、數(shù)據(jù)一致系統(tǒng)設計特點
從kafka如何保證數(shù)據(jù)一致性看通常數(shù)據(jù)一致性設計,一般保證數(shù)據(jù)一致性,需要通過成功后commit的操作,消費過程中記錄小標。成功與失敗的環(huán)節(jié)都記上標志。
Kafka作為一個分布式發(fā)布-訂閱消息系統(tǒng),其數(shù)據(jù)一致性的系統(tǒng)設計特點主要包括以下幾個方面:文章來源:http://www.zghlxwxcb.cn/news/detail-826839.html
- 分區(qū)與副本機制:Kafka將數(shù)據(jù)分成多個分區(qū)(Partition),每個分區(qū)在集群中有多個副本(Replica)。這些副本分布在不同的Broker上,以實現(xiàn)數(shù)據(jù)的冗余備份和高可用性。當某個Broker發(fā)生故障時,其他Broker上的副本可以接管服務,保證數(shù)據(jù)的持續(xù)可用。
- ISR(In-Sync Replicas)機制:ISR是Kafka中用于維護數(shù)據(jù)一致性的重要機制。它包含所有與Leader保持同步的副本。當ISR中的副本數(shù)量不足時,Kafka會暫停寫入操作,以防止數(shù)據(jù)不一致。只有當ISR中的副本數(shù)量恢復到一定數(shù)量時,才會恢復寫入操作。
- 消息提交確認:生產者發(fā)送消息到Kafka時,需要等待消息被寫入ISR中的副本并得到確認,以確保消息被成功存儲。同時,消費者在處理消息時也需要定期提交偏移量(Offset),以便在發(fā)生故障時能夠從正確的位置繼續(xù)消費。
- 原子性操作:Kafka保證消息在分區(qū)內的順序性和原子性。這意味著在同一分區(qū)內的消息會按照發(fā)送的順序被消費,且不會被其他消息插入打斷。這有助于保證數(shù)據(jù)的一致性和正確性。
- 容錯處理:當Kafka集群中的節(jié)點發(fā)生故障時,Kafka會自動進行故障轉移和恢復操作。這包括從ISR中選擇新的Leader、重新同步數(shù)據(jù)等,以確保數(shù)據(jù)的持續(xù)可用和一致性。
總之,Kafka通過分區(qū)與副本機制、ISR機制、消息提交確認、原子性操作和容錯處理等手段,確保了其數(shù)據(jù)一致性的系統(tǒng)設計特點。這些設計使得Kafka能夠在分布式環(huán)境中實現(xiàn)高吞吐量、持久化存儲、可擴展性和高可靠性等特性,從而滿足各種復雜場景下的數(shù)據(jù)一致性需求。文章來源地址http://www.zghlxwxcb.cn/news/detail-826839.html
到了這里,關于從kafka如何保證數(shù)據(jù)一致性看通常數(shù)據(jù)一致性設計的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!