一、簡介
1.1 Kafka的背景與演變
Kafka是一款由Apache開發(fā)的分布式流處理平臺,它最初是由LinkedIn公司在2010年開發(fā)的。從最初的消息隊(duì)列到如今的分布式流處理平臺Kafka經(jīng)歷了一個逐步演化的過程。
Kafka最開始的設(shè)計(jì)目的是解決LinkedIn內(nèi)部存在的海量數(shù)據(jù)傳輸問題,在其不斷的發(fā)展中Kafka逐漸發(fā)展成為一種可持久化、分布式、身臨其境的發(fā)布/訂閱消息系統(tǒng)。
1.2 Kafka的組成結(jié)構(gòu)
Kafka的核心模塊包括生產(chǎn)者、消費(fèi)者和代理三部分:
-
生產(chǎn)者可以發(fā)送消息至Kafka集群,以供后續(xù)的消費(fèi)者進(jìn)行消費(fèi)。
-
消費(fèi)者可以從Kafka集群中讀取數(shù)據(jù)并對其進(jìn)行響應(yīng)的操作。消費(fèi)者可以根據(jù)需要自由地決定何時啟動信號,以及在何時對消息進(jìn)行響應(yīng)。
-
代理是Kafka集群的關(guān)鍵組件之一,它主要負(fù)責(zé)消息的存儲和轉(zhuǎn)發(fā),并通過分布式機(jī)制保障Kafka集群的故障恢復(fù)能力和高可用性.
1.3 Kafka的優(yōu)勢和適用場景
Kafka基于高度可擴(kuò)展的架構(gòu)設(shè)計(jì),具有如下特性:
-
支持任意數(shù)量的生產(chǎn)者和消費(fèi)者,可以針對不同領(lǐng)域的數(shù)據(jù)模型、處理技術(shù)等進(jìn)行選擇和組合.
-
支持消息持久化存儲,在節(jié)點(diǎn)宕機(jī)或網(wǎng)絡(luò)故障時可以進(jìn)行可靠的數(shù)據(jù)恢復(fù)。
-
基于分布式設(shè)計(jì)原則,解決了海量數(shù)據(jù)傳輸和存儲成本問題。
-
適用于大規(guī)模的數(shù)據(jù)處理與實(shí)時數(shù)據(jù)流處理,如日志收集、在線分析、廣告引擎以及電商中的實(shí)時推薦等應(yīng)用場景。
下面是基于Kafka的Java代碼 供參考:
//創(chuàng)建kafka生產(chǎn)者
Properties properties = new Properties();
//服務(wù)地址,配置Kafka集群的服務(wù)器地址及端口
properties.put("bootstrap.servers", "localhost:9092");
//key序列化器,需要將發(fā)送給Kafka集群的key從對象轉(zhuǎn)換為字間接歷
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//value序列化器,需要將發(fā)送給Kafka集群的value從對象轉(zhuǎn)換為字節(jié)流
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//創(chuàng)建生產(chǎn)者對象
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
//定義消息主題
String $topicName = "test-topic";
//定義要發(fā)送的消息內(nèi)容
String $value = "Kafka sends the message.";
//創(chuàng)建消息對象
ProducerRecord<String, String> $record = new ProducerRecord<String, String>($topicName, $value);
//發(fā)送消息
producer.send($record);
//關(guān)閉生產(chǎn)者實(shí)例
producer.close();
//創(chuàng)建kafka消費(fèi)者
//與生產(chǎn)者相同,需要配置消費(fèi)者訂閱主題,反序列化器等參數(shù)
//創(chuàng)建消費(fèi)者對象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//訂閱消息主題
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
//定期拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
//消費(fèi)者處理已拉取的消息
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
以上代碼涵蓋了Kafka的生產(chǎn)者和消費(fèi)者在Java中的基本使用
二、Kafka架構(gòu)設(shè)計(jì)
2.1 Kafka Broker
2.1.1 Broker角色與特性
Kafka Broker是Kafka集群中的一臺或多臺服務(wù)器負(fù)責(zé)管理消息的存儲、傳輸和復(fù)制。每個Broker都有一個唯一的ID并且可以分配一個或多個Partition。
Kafka Broker有以下特性:
- 高吞吐量:Kafka Broker可以同時處理上千個Producer和Consumer的請求,并支持?jǐn)?shù)十萬級別的消息吞吐量。
- 高可用性:Kafka Broker可以通過將數(shù)據(jù)復(fù)制到多個節(jié)點(diǎn)來提高容錯性和可用性,保證系統(tǒng)故障時數(shù)據(jù)不會丟失。
- 可擴(kuò)展性:Kafka Broker可以通過添加更多的節(jié)點(diǎn)來擴(kuò)展集群規(guī)模,并且支持在線節(jié)點(diǎn)擴(kuò)容和縮容。
2.1.2 Broker之間的數(shù)據(jù)同步機(jī)制
Kafka Broker之間的數(shù)據(jù)同步采用分布式副本機(jī)制常見的同步方式有兩種:
-
Leader-Follower同步:一個Partition的某一個Broker被選為Leader,所有寫入該P(yáng)artition的消息都要先發(fā)送到該Leader,由Leader進(jìn)行消息確認(rèn)和數(shù)據(jù)復(fù)制,其他Follower節(jié)點(diǎn)從Leader中拉取數(shù)據(jù)并且只能讀取不能寫入
-
ISR同步:In-Sync Replicas(ISR)是指與Leader節(jié)點(diǎn)保持?jǐn)?shù)據(jù)同步的Follower節(jié)點(diǎn)。Leader接收到消息后會廣播到所有的Follower節(jié)點(diǎn),只有Follower節(jié)點(diǎn)成功接收并復(fù)制了Leader最新的一條消息后才能被認(rèn)為是ISR中的一員,則說明它們和Leader保持了一致的數(shù)據(jù)狀態(tài)。如果某個Follower在指定時間內(nèi)沒有復(fù)制Leader的最新消息,則會被剔除出ISR
2.2 Kafka消息存儲模型
2.2.1 分區(qū)Partition和偏移量Offset
Kafka消息通過Partition進(jìn)行管理一個Topic可以被分為多個Partition,每個Partition中的消息都是有序的。Producer在發(fā)送消息時需要指定消息所屬的Partition
每個Partition中的每條消息都有一個唯一的偏移量Offset,用于標(biāo)識Partition中消息的唯一位置Offset從0開始遞增。Consumer在消費(fèi)消息時需要指定消費(fèi)的Partition和起始的偏移量Offset。
2.2.2 日志Log和索引Index
Kafka使用日志文件來存儲消息,每個Partition都有一個對應(yīng)的日志文件(Log)。Kafka將消息追加到日志文件的尾部,并且不支持刪除或更新已經(jīng)追加到日志文件中的消息。
為了更快速的找到消息Kafka維護(hù)了一個基于內(nèi)存的索引文件(Index),每個索引文件對應(yīng)一個日志文件。索引文件中記錄著每個消息的偏移量Offset以及該Offset對應(yīng)的物理位置,當(dāng)Consumer需要讀取某個Offset對應(yīng)的消息時,Kafka可以快速的定位該消息在日志文件中的物理位置。
2.3 Kafka消息傳輸協(xié)議
2.3.1 生產(chǎn)者Producer
生產(chǎn)者Producer負(fù)責(zé)生產(chǎn)消息并將消息發(fā)送到Kafka Broker,消息被發(fā)送到指定的Topic和Partition。Producer通過給定的Partition策略選擇一個Partition來發(fā)送消息。Partition策略可以根據(jù)環(huán)形、隨機(jī)、哈希等方式進(jìn)行選擇。
2.3.2 消費(fèi)者Consumer
消費(fèi)者Consumer從指定的Partition中消費(fèi)消息,并且維護(hù)每個Partition的偏移量Offset。Consumer可以通過指定起始Offset來讀取歷史消息,也可以從當(dāng)前最新的Offset開始讀取新消息。消費(fèi)者之間可以進(jìn)行負(fù)載均衡,以實(shí)現(xiàn)高吞吐量和更好的可靠性。
2.3.3 中間件Middleware
Kafka提供了一些中間件來簡化消息的生產(chǎn)和消費(fèi),如連接器、轉(zhuǎn)換器和攔截器等。其中連接器可以將其他系統(tǒng)的數(shù)據(jù)轉(zhuǎn)換為Kafka消息,轉(zhuǎn)換器可以對消息進(jìn)行格式轉(zhuǎn)換和修改,攔截器可以對消息進(jìn)行過濾、監(jiān)控等處理操作,以適應(yīng)各種場景下的需求。
三、Kafka設(shè)計(jì)原則
Kafka是一個高性能、可擴(kuò)展且分布式的消息隊(duì)列系統(tǒng)其設(shè)計(jì)遵循了以下原則:
3.1 單一職責(zé)原則
Kafka采用了分布式的架構(gòu)設(shè)計(jì),通過將數(shù)據(jù)進(jìn)行分片存儲實(shí)現(xiàn)了高性能和可擴(kuò)展性。同時,它還將消息的生產(chǎn)、消費(fèi)和存儲分離開來,每個組件都只關(guān)注自己的職責(zé)因此符合單一職責(zé)原則。
3.2 開閉原則
Kafka的設(shè)計(jì)具有良好的擴(kuò)展性可以通過增加Broker節(jié)點(diǎn)和更改Topic的分區(qū)數(shù)量等方式來滿足不同的業(yè)務(wù)需求。這得益于Kafka采用了面向接口編程的設(shè)計(jì)模式,同時對修改關(guān)閉對擴(kuò)展開放,因此符合開閉原則。
3.3 迪米特法則
Kafka的各個模塊之間的依賴關(guān)系設(shè)計(jì)得非常簡潔明了沒有不必要的耦合。例如,Producer只需要知道 Broker 的地址而不需要了解 Broker 具體的實(shí)現(xiàn)細(xì)節(jié)。這種松散的耦合關(guān)系降低了各模塊之間的依賴程度,符合迪米特法則。
3.4 接口隔離原則
Kafka中的接口設(shè)計(jì)非常清晰明了,每個接口都只包含必要的方法,不存在臃腫的接口。例如Producer接口中只有一個send方法,而不包含其他和消息生產(chǎn)無關(guān)的方法。這樣做的好處是接口的修改不會影響到其他無關(guān)的模塊符合接口隔離原則
3.5 依賴倒置原則
Kafka采用了依賴注入的設(shè)計(jì)模式,即底層模塊不依賴于高層模塊而是高層模塊依賴于底層模塊的抽象。比如Consumer使用的是ConsumerConnector接口而不是具體的實(shí)現(xiàn)類,這種依賴倒置的設(shè)計(jì)能夠提高系統(tǒng)的靈活性和可維護(hù)性。
代碼示例:
public interface ConsumerConnector {
/**
* 創(chuàng)建指定數(shù)量的MessageStreams
* @param topicCountMap 表示需要消費(fèi)的每個topic的流的數(shù)量
* @return 每個topic對應(yīng)的MessageStreams
*/
Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(
Map<String, Integer> topicCountMap);
}
public class Consumer {
private final ConsumerConnector consumerConnector;
public Consumer(ConsumerConnector consumerConnector) {
this.consumerConnector = consumerConnector;
}
public void consume(Map<String, Integer> topicCountMap) {
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap =
consumerConnector.createMessageStreams(topicCountMap);
//TODO 消費(fèi)消息的邏輯
}
}
上面的代碼中,Consumer依賴于ConsumerConnector接口,通過在構(gòu)造函數(shù)中注入具體的實(shí)現(xiàn)類實(shí)例化。ConsumerConnector
定義了創(chuàng)建需要消費(fèi)的每個topic的流的基本方法createMessageStreams
,而Consumer
則調(diào)用了該方法進(jìn)行消息的消費(fèi)處理。這種設(shè)計(jì)遵循了依賴倒置原則,使得系統(tǒng)具有更高的可擴(kuò)展性和可維護(hù)性。
4.1 Kafka集群規(guī)劃與配置
在進(jìn)行Kafka集群規(guī)劃時,需要考慮以下幾個方面:
4.1.1 節(jié)點(diǎn)規(guī)劃
節(jié)點(diǎn)規(guī)劃包括機(jī)器硬件規(guī)格和集群的節(jié)點(diǎn)數(shù)。Kafka在寫入速度和消息可靠性之間做了平衡,因此需要足夠多的節(jié)點(diǎn)以支持吞吐量和數(shù)據(jù)可靠性的要求。
4.1.2 副本系數(shù)設(shè)置
Kafka采用副本機(jī)制來提供數(shù)據(jù)冗余和故障轉(zhuǎn)移。為了保證數(shù)據(jù)可靠性,需要考慮設(shè)置適當(dāng)?shù)母北鞠禂?shù)。通常情況下,可設(shè)置為3或者以上。
4.1.3 日志保留時間和尺寸設(shè)置
Kafka的消息存儲是基于日志文件的,需要考慮設(shè)置適當(dāng)?shù)娜罩颈A魰r間和尺寸限制,避免占用過多磁盤空間和影響讀寫性能。
4.2 Kafka消息生產(chǎn)者的最佳實(shí)踐
4.2.1 數(shù)據(jù)分區(qū)和應(yīng)答機(jī)制的設(shè)置
在數(shù)據(jù)寫入時,需要設(shè)置合適的數(shù)據(jù)分區(qū)機(jī)制以便在后續(xù)的消費(fèi)中實(shí)現(xiàn)負(fù)載均衡和高吞吐量。此外,應(yīng)該設(shè)置合適的應(yīng)答機(jī)制,保證數(shù)據(jù)可靠性。
4.2.2 消息壓縮機(jī)制的設(shè)置
為了節(jié)省帶寬和提高網(wǎng)絡(luò)傳輸效率,可以考慮開啟消息壓縮機(jī)制,減少消息的傳輸量。
4.3 Kafka消息消費(fèi)者的最佳實(shí)踐
4.3.1 消費(fèi)進(jìn)度的管理和控制
在進(jìn)行消息消費(fèi)時,需要管理和控制消費(fèi)進(jìn)度,保證數(shù)據(jù)的完整性和可靠性。一般會采用Kafka內(nèi)置的消費(fèi)者群組和偏移量管理機(jī)制來實(shí)現(xiàn)。
4.3.2 批量處理和事務(wù)機(jī)制的使用
為了提高消息處理效率和避免數(shù)據(jù)不一致的問題,可以采用批量處理和事務(wù)機(jī)制來進(jìn)行消息消費(fèi)和處理。
4.4 Kafka安全機(jī)制的最佳實(shí)踐
4.4.1 認(rèn)證和授權(quán)機(jī)制的設(shè)置
Kafka可以通過設(shè)置認(rèn)證和授權(quán)機(jī)制來保證集群中的安全性和數(shù)據(jù)可信賴性。常見的方式包括Kerberos、SSL/TLS、SASL等。
4.4.2 數(shù)據(jù)加密和傳輸加密機(jī)制的設(shè)置
為了保證消息傳輸?shù)陌踩裕梢圆捎脭?shù)據(jù)加密和傳輸加密機(jī)制。Kafka支持配置SSL/TLS協(xié)議來進(jìn)行數(shù)據(jù)加密和傳輸加密。
五、小結(jié)回顧:
Kafka作為一個分布式的消息系統(tǒng),具有高吞吐量、數(shù)據(jù)冗余和可靠性等特點(diǎn)。在進(jìn)行Kafka的使用和部署時,需要考慮集群規(guī)劃與配置、消息生產(chǎn)者的最佳實(shí)踐、消息消費(fèi)者的最佳實(shí)踐以及安全機(jī)制的最佳實(shí)踐等方面。通過采用合適的技術(shù)手段和方案,可以在高并發(fā)、大數(shù)據(jù)量應(yīng)用場景中提供高效、可靠的處理服務(wù)。
四、Kafka最佳實(shí)踐
4.1 Kafka集群規(guī)劃與配置
4.1.1 節(jié)點(diǎn)規(guī)劃
Kafka集群應(yīng)該至少由3個節(jié)點(diǎn)組成,以確保高可用性。節(jié)點(diǎn)數(shù)量也應(yīng)該根據(jù)實(shí)際需求進(jìn)行擴(kuò)展。
4.1.2 副本系數(shù)設(shè)置
應(yīng)根據(jù)數(shù)據(jù)的重要性和備份策略來設(shè)置副本系數(shù),通常建議將副本系數(shù)設(shè)置為至少2或3。
4.1.3 日志保留時間和尺寸設(shè)置
Kafka通過日志保留時間和日志尺寸限制來控制磁盤空間的使用。可以根據(jù)業(yè)務(wù)需求、日志備份和恢復(fù)策略來配置日志保留時間和尺寸。
4.2 Kafka消息生產(chǎn)者的最佳實(shí)踐
4.2.1 數(shù)據(jù)分區(qū)和應(yīng)答機(jī)制的設(shè)置
應(yīng)根據(jù)數(shù)據(jù)的重要性和可靠性選擇合適的數(shù)據(jù)分區(qū)方案和應(yīng)答機(jī)制。在數(shù)據(jù)分區(qū)時,應(yīng)按照業(yè)務(wù)和數(shù)據(jù)的特性進(jìn)行劃分,以充分利用集群中的多個節(jié)點(diǎn)。
4.2.2 消息壓縮機(jī)制的設(shè)置
對于一些數(shù)據(jù)量較大的場景可以開啟消息壓縮機(jī)制以減小傳輸數(shù)據(jù)的大小和網(wǎng)絡(luò)帶寬的使用。
4.3 Kafka消息消費(fèi)者的最佳實(shí)踐
4.3.1 消費(fèi)進(jìn)度的管理和控制
在消費(fèi)消息時應(yīng)注意消息消費(fèi)的進(jìn)度管理和控制包括設(shè)置消費(fèi)者組、消費(fèi)消息的位置和偏移量等。如果消費(fèi)者組中的某個消費(fèi)者離線,Kafka將自動將其分區(qū)重新分配給其他消費(fèi)者,以保證數(shù)據(jù)的完整性和不間斷性。
4.3.2 批量處理和事務(wù)機(jī)制的使用
對于一些批量處理場景可以使用批量處理方式進(jìn)行消息消費(fèi),以提高消費(fèi)效率和降低連接和網(wǎng)絡(luò)開銷。同時,可以使用Kafka的事務(wù)機(jī)制對消息進(jìn)行原子操作和批量提交。
4.4 Kafka安全機(jī)制的最佳實(shí)踐
4.4.1 認(rèn)證和授權(quán)機(jī)制的設(shè)置
在Kafka集群中應(yīng)該啟用認(rèn)證和授權(quán)機(jī)制以確保Kafka集群的安全性和數(shù)據(jù)隱私性??梢赃x擇使用Kerberos或SSL等方式進(jìn)行身份認(rèn)證和數(shù)據(jù)傳輸加密。文章來源:http://www.zghlxwxcb.cn/news/detail-468327.html
4.4.2 數(shù)據(jù)加密和傳輸加密機(jī)制的設(shè)置
對于一些數(shù)據(jù)敏感的場景可以使用數(shù)據(jù)加密和傳輸加密機(jī)制來保護(hù)數(shù)據(jù)的隱私和安全性。Kafka支持基于SSL和TLS的通信加密機(jī)制,也支持對消息進(jìn)行AES和RSA等算法的加密處理。文章來源地址http://www.zghlxwxcb.cn/news/detail-468327.html
// 舉例:Kafka消息生產(chǎn)者的數(shù)據(jù)分區(qū)設(shè)置
public class KafkaProducerDemo {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
}
producer.close();
}
}
到了這里,關(guān)于深入理解Kafka:架構(gòu)、設(shè)計(jì)原則及最佳實(shí)踐的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!