一、簡介
簡介
Apache Kafka 是由 Apache 軟件基金會開發(fā)的一個開源流處理平臺,用于處理實時的大規(guī)模數(shù)據(jù)流。Kafka 的目標(biāo)是為了處理活躍的流式數(shù)據(jù),包括傳感器數(shù)據(jù),網(wǎng)站日志,應(yīng)用程序內(nèi)部的消息,等等。它可以處理成千上萬的消息,并讓你迅速地處理和存儲這些消息。在 Kafka 中,生產(chǎn)者負(fù)責(zé)將消息發(fā)送到 Kafka 集群中的 Broker,消費者則從 Broker 訂閱并接收消息。
架構(gòu)
Kafka 的架構(gòu)由 Producer,Broker 和 Consumer 三部分組成,同時具備高并發(fā)、高吞吐量和分布式等特點。Producer 可以將消息發(fā)送到 Broker,Consumer 可以從 Broker 訂閱和接收消息,而 Broker 則可以存儲多個 Topic。一個 Topic 可以有多個 Partition,Partition 中的消息可以通過 Offset 進(jìn)行管理,Kafka 中的消息以 Append-only 形式進(jìn)行存儲。
二、Kafka 安裝和配置
JDK
- 下載 JDK,例如:jdk-8u291-linux-x64.tar.gz。
- 解壓 JDK 到任意目錄,例如 /usr/lib/jvm/jdk1.8.0_291。
- 配置環(huán)境變量,例如:
$ export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_291
$ export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
$ export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
安裝 Kafka
- 下載 Kafka,例如:kafka_2.12-2.8.0.tgz。
- 解壓 Kafka 到任意目錄,例如 /opt/kafka。
- 修改配置文件,根據(jù)需要修改 server.properties 文件。
配置文件詳解
Kafka 的配置文件位于 config/server.properties。下面是一些常用的配置項及其含義:
- broker.id,Broker 的唯一標(biāo)識符。
- advertised.listeners,監(jiān)聽該 Broker 的客戶端連接地址和端口。
- log.dirs,消息存儲文件目錄。
- zookeeper.connect,使用的 ZooKeeper 地址和端口。
- num.network.threads,用于處理網(wǎng)絡(luò)請求的線程數(shù)。
- num.io.threads,用于處理磁盤 IO 的線程數(shù)。
- socket.receive.buffer.bytes 和 socket.send.buffer.bytes,用于控制 TCP 緩沖區(qū)大小。
- group.initial.rebalance.delay.ms,當(dāng) Consumer Group 內(nèi)有 Consumer 加入或離開時,延遲多久再開始重新 balabce。
- auto.offset.reset,Consumer Group 在消費新的 Topic 或 Partition 時的 offset 已經(jīng)不存在時,如何設(shè)置 offset,默認(rèn)是 latest。
三、Kafka 的基本操作
啟動和關(guān)閉
//啟動Kafka
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
//關(guān)閉Kafka
$KAFKA_HOME/bin/kafka-server-stop.sh
Topic 創(chuàng)建和刪除
import kafka.admin.AdminUtils;
import kafka.utils.ZkUtils;
//創(chuàng)建Topic
String topicName = "test";
int numPartitions = 3;
int replicationFactor = 2;
Properties topicConfig = new Properties();
AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig);
//刪除Topic
AdminUtils.deleteTopic(zkUtils, topicName);
Partitions 和 Replication 配置
可以在創(chuàng)建Topic時指定Partitions數(shù)和Replication Factor,如果需要修改可以通過以下命令修改:
//修改Partitions數(shù)
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 4
//修改Replication Factor
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --replication-factor 3
Producer 和 Consumer 使用方法
Producer
import org.apache.kafka.clients.producer.*;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), Integer.toString(i)));
producer.close();
Consumer
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
四、Kafka 高級應(yīng)用
消息的可靠性保證
在 Kafka 中消息的可靠性保證是通過兩種機制來實現(xiàn)的:支持副本機制和 ISR (In-Sync Replicas)列表。
-
支持副本機制
副本機制是指一個主題(Topic)下的分區(qū)(Partition)可以有多個副本,每個副本都存儲了完整的消息,其中一個副本被指定為 leader 副本,其他副本為 follower 副本。當(dāng) producer 發(fā)送消息到某個分區(qū)時,只需要發(fā)送給 leader 副本,leader 副本再將消息分發(fā)給其他 follower 副本,這樣就保證了消息的可靠性。即使某個 follower 副本出現(xiàn)了故障,也不會影響消息的消費,因為其他副本依然存放著完整的消息。 -
ISR (In-Sync Replicas)列表
ISR 列表是指當(dāng)前與 leader 副本保持同步的所有 follower 副本構(gòu)成的列表。當(dāng)某個 follower 副本落后于 leader 副本時,會從 ISR 列表中移除,直到追上 leader 副本后再加入到 ISR 列表中。這個機制保證了 Kafka 集群的高可用性,同時也保證了消息的可靠性。 -
At least once 語義
Kafka 默認(rèn)保證的是 At least once 語義,即 “至少處理一次”,這種語義可以通過消息的重復(fù)消費來保證,但是會帶來處理效率的損失。如果希望保證消息僅被處理一次,可以選擇使用冪等性(Idempotence)或事務(wù)機制。
Kafka Stream
Kafka Stream 是 Kafka 生態(tài)系統(tǒng)中基于流處理模型的一個庫。它充分利用了 Kafka 的優(yōu)點,比如高吞吐、擴展性好、可靠性高等,支持實時的數(shù)據(jù)流處理和批量處理,并且操作符也非常豐富。
-
Stream 流處理模型
Stream 流處理模型是一種將輸入數(shù)據(jù)流轉(zhuǎn)換為輸出數(shù)據(jù)流的模型,可以完成實時的數(shù)據(jù)處理。在 Kafka Stream 中,數(shù)據(jù)流由一個一個記錄(Record)組成,每個記錄由一個鍵(Key)和一個值(Value)構(gòu)成。通過對 Stream 流處理模型的熟練掌握,可以快速開發(fā)出高效、高可靠性的流處理程序。 -
操作符詳解
操作符是 Kafka Stream 中最核心的概念,是用于轉(zhuǎn)換數(shù)據(jù)流的最基本單元。Kafka Stream 提供了豐富的操作符,包括過濾器、映射器、聚合器、分組器等,開發(fā)者可以根據(jù)需要靈活選擇。其中,映射器和聚合器是最常用的操作符,它們可以完成對數(shù)據(jù)流的各種處理和轉(zhuǎn)換。
Kafka Connect
Kafka Connect 是 Kafka 生態(tài)系統(tǒng)中用于將數(shù)據(jù)集成到和從 Kafka 中的工具。它通過 Connector 來實現(xiàn)數(shù)據(jù)的傳輸,Kafka Connect 可以集成各種數(shù)據(jù)源和數(shù)據(jù)目的地,如文件、數(shù)據(jù)庫、消息隊列等。使用 Kafka Connect 可以快速的完成數(shù)據(jù)的導(dǎo)入和導(dǎo)出,并且可以實現(xiàn)數(shù)據(jù)的有效管理和監(jiān)控。
-
Connector 快速入門教程
Kafka Connect 的使用非常簡單,只需要編寫一個 Connector 配置文件,然后啟動 Kafka Connect 進(jìn)程即可。在 Connector 的配置文件中,需要指定數(shù)據(jù)源和數(shù)據(jù)目的地的配置信息,并定義如何從數(shù)據(jù)源中讀取數(shù)據(jù),以及如何將數(shù)據(jù)發(fā)送到數(shù)據(jù)目的地中。 -
實現(xiàn)自定義 Connect
如果 Kafka Connect 自帶的 Connector 不能滿足需求,開發(fā)者還可以自定義 Connector 來實現(xiàn)數(shù)據(jù)的導(dǎo)入和導(dǎo)出。開發(fā)者可以參考 Kafka Connect 源碼中已經(jīng)實現(xiàn)的 Connector 來進(jìn)行開發(fā),并根據(jù)需要完善自己的 Connector 功能。通過自定義 Connector,開發(fā)者可以靈活定制符合自己業(yè)務(wù)需求的數(shù)據(jù)接入方案。
五、Kafka 集群管理
集群環(huán)境的部署
為了部署 Kafka 集群,可以按如下步驟進(jìn)行:
- 確保集群所有節(jié)點的操作系統(tǒng)都是一致的,建議使用 CentOS 7。
- 下載并配置 JDK,Kafka 依賴于 Java 運行環(huán)境。
- 下載 Kafka 安裝包,解壓到指定目錄。
- 修改 Kafka 配置文件
server.properties
,需要注意的配置項包括以下幾個:-
broker.id
:表示當(dāng)前節(jié)點的 ID,必須在所有節(jié)點中唯一。 -
listeners
:用于設(shè)置 Kafka 綁定的地址和端口,其中端口號需要在每個節(jié)點上都是唯一的。建議使用 IP 地址而非主機名作為監(jiān)聽地址。 -
log.dirs
:表示消息日志保存的路徑,建議為每個節(jié)點分別設(shè)置,避免多個節(jié)點共用一個目錄導(dǎo)致數(shù)據(jù)混亂。 -
zookeeper.connect
:表示 ZooKeeper 的連接地址,ZooKeeper 是 Kafka 集群的重要組件。
-
操作和維護(hù)集群
Kafka 集群的運維主要包括以下幾個方面:
監(jiān)控和告警
Kafka 集群應(yīng)該具備完善的監(jiān)控和告警機制,能夠及時檢測和處理集群中的異常情況,防止集群的宕機或數(shù)據(jù)丟失等問題。通常使用開源監(jiān)控系統(tǒng),如 Prometheus、Grafana。
消息備份和恢復(fù)
為了防止消息丟失,Kafka 集群需要配置合適的備份策略,保證消息能夠在系統(tǒng)故障或數(shù)據(jù)中心故障時依然可用。具體可以采用多副本備份策略或異地多活等方式來備份數(shù)據(jù),也可以使用相關(guān)的數(shù)據(jù)備份工具。
熱點問題處理
如果集群出現(xiàn)消費熱點問題,需要及時排查,可以使用 Kafka 自帶的 Consumer Lag 工具或第三方工具進(jìn)行分析,找出出現(xiàn)熱點的原因并制定相應(yīng)的解決方案。
集群擴容和縮容
當(dāng) Kafka 集群無法滿足業(yè)務(wù)需求或需要優(yōu)化性能時,我們可能需要對集群進(jìn)行擴容或縮容操作。
擴容操作
擴容可通過增加節(jié)點數(shù)量和調(diào)整多個配置項來進(jìn)行:
- 增加節(jié)點數(shù)量:新增節(jié)點需要與集群中的其它節(jié)點具有相同的環(huán)境配置,包括操作系統(tǒng)和 Java 版本等。新增節(jié)點后需要更新
server.properties
文件,并重啟 Kafka 進(jìn)程才能讓新節(jié)點生效。同時需要重新分配分區(qū)并執(zhí)行數(shù)據(jù)遷移。 - 調(diào)整多個配置項:可以通過調(diào)整消息生產(chǎn)和消費的吞吐量、擴容 Broker 的資源、增加副本數(shù)等一系列操作來提升 Kafka 集群的性能。
縮容操作
縮容可通過減少節(jié)點數(shù)量和刪除多個配置項來進(jìn)行:
- 減少節(jié)點數(shù)量:需要首先確認(rèn)是否有冗余的節(jié)點存在,如果存在冗余節(jié)點可以將其停機或從集群中移除。同時需要更新
server.properties
文件,并重啟 Kafka 進(jìn)程才能讓縮容生效。需要注意的是,在進(jìn)行節(jié)點縮容時需要重新分配分區(qū)和執(zhí)行數(shù)據(jù)遷移。 - 刪除多個配置項:可以通過調(diào)整消息保留時間、削弱單個 Broker 的吞吐量等一系列操作來縮小 Kafka 集群的規(guī)模。
在進(jìn)行擴容和縮容操作前,需要通過合適的監(jiān)控工具了解當(dāng)前集群的狀態(tài)和性能表現(xiàn),根據(jù)實際需求進(jìn)行配置和調(diào)整。同時使用備份策略,確保數(shù)據(jù)的完整性和可用性。
六、應(yīng)用案例
日志收集
Kafka 作為一個分布式的消息隊列,其在日志收集方面能夠做到高效、可靠且低延遲的處理。以下是一個簡單的 Java 代碼示例,用于將系統(tǒng)日志發(fā)送到 Kafka 集群中:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaLogProducer {
private final KafkaProducer<String, String> producer;
private final String topic;
public KafkaLogProducer(String brokers, String topic) {
Properties prop = new Properties();
// 配置 Kafka 集群地址
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
// 配置 key 和 value 的序列化器
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
this.producer = new KafkaProducer(prop);
this.topic = topic;
}
public void sendLog(String message) {
producer.send(new ProducerRecord<>(topic, message));
}
public void close() {
producer.close();
}
}
數(shù)據(jù)同步
Kafka 除了可以作為日志收集的工具之外,還可以用于數(shù)據(jù)同步。使用 Kafka 可以將數(shù)據(jù)從一個系統(tǒng)復(fù)制到另一個系統(tǒng),并且可以實現(xiàn)異步和批量處理。以下是一個簡單的 Java 代碼示例,用于把數(shù)據(jù)從源數(shù)據(jù)庫同步到目標(biāo)數(shù)據(jù)庫:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import java.sql.*;
import java.util.Properties;
public class KafkaDataSync {
private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> producer;
private final String sourceTopic;
private final String targetTopic;
public KafkaDataSync(String brokers, String sourceTopic, String targetTopic) {
Properties prop = new Properties();
// 配置 Kafka 集群地址
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
// 配置 key 和 value 的序列化器
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
this.producer = new KafkaProducer(prop);
// 配置消費者組
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList(sourceTopic));
this.sourceTopic = sourceTopic;
this.targetTopic = targetTopic;
}
public void start() throws SQLException {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String message = record.value();
// 將數(shù)據(jù)解析并同步到目標(biāo)數(shù)據(jù)庫
syncData(message);
}
}
}
public void close() {
consumer.close();
producer.close();
}
private void syncData(String message) {
// 數(shù)據(jù)同步邏輯代碼
// ...
// 將同步后的數(shù)據(jù)發(fā)送到目標(biāo) Kafka Topic 中
producer.send(new ProducerRecord<>(targetTopic, message));
}
}
實時處理
Kafka 作為一個分布式流處理平臺,具有強大的實時處理能力??梢灾С侄喾N實時計算框架和處理引擎,例如 Apache Storm、Apache Flink 和 Apache Spark 等。以下是一個簡單的 Kafka 流處理代碼示例,用于統(tǒng)計指定時間范圍內(nèi)的日志數(shù)量:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;
public class KafkaStreamProcessor {
public static void main(String[] args) {
Properties props = new Properties();
// 配置 Kafka 集群地址
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 配置 key 和 value 的序列化器和反序列化器
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> messages = builder.stream("logs");
// 統(tǒng)計指定時間范圍內(nèi)的日志數(shù)量
KTable<Windowed<String>, Long> logsCount = messages
.mapValues(log -> 1)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
logsCount.toStream().foreach((key, value) -> System.out.println(key.toString() + " -> " + value));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
七、優(yōu)化調(diào)優(yōu)
性能指標(biāo)優(yōu)化
Kafka 集群的性能受多種因素影響,為了提高 Kafka 集群的性能,需要關(guān)注以下幾個重要的性能指標(biāo):
- 消息吞吐量(Message throughput):指 Kafka 集群每秒能夠處理的消息數(shù)量,這取決于硬件配置、網(wǎng)絡(luò)和磁盤速度、消息大小和復(fù)雜度等因素。
- 延遲(Latency):指消息從生產(chǎn)者發(fā)送到被消費者接收到的時間間隔,這主要取決于網(wǎng)絡(luò)延遲和磁盤 I/O 性能。
- 磁盤使用率(Disk utilization):指 Kafka 集群磁盤空間使用情況,如果磁盤使用率過高,可能會導(dǎo)致性能下降甚至堆積。
- 網(wǎng)絡(luò)帶寬(Network bandwidth):指 Kafka 集群節(jié)點之間的網(wǎng)絡(luò)傳輸速度,如果帶寬不足,可能會限制消息吞吐量和延遲。
參數(shù)配置優(yōu)化
Kafka 集群的性能受多個參數(shù)的影響,為了優(yōu)化 Kafka 集群的性能,需要考慮以下幾個關(guān)鍵參數(shù):文章來源:http://www.zghlxwxcb.cn/news/detail-612049.html
- 分區(qū)數(shù)量(number of partitions):分區(qū)數(shù)對于 Kafka 集群的性能至關(guān)重要,它決定了消息并行處理的能力。在平衡并行處理和分布式存儲之間做出權(quán)衡是至關(guān)重要的。
- 復(fù)制因子(replication factor):Kafka 提供了副本機制來保證數(shù)據(jù)的可靠性,增加副本機制可以提高容錯能力,但也會增加網(wǎng)絡(luò)負(fù)載和磁盤使用率。副本因子的選擇應(yīng)該根據(jù)數(shù)據(jù)的關(guān)鍵程度和集群的需求進(jìn)行調(diào)整。
- 批量大?。╞atch size):批量發(fā)送和接收消息是優(yōu)化 Kafka 吞吐量的一個重要方法。較大的批量大小可以減少網(wǎng)絡(luò)傳輸和 I/O 操作的數(shù)量,從而提高吞吐量。同時,較大的批量大小也會使得消息的延遲增大,需要做好權(quán)衡。
- 最大連接數(shù)(maximum connections):Kafka 服務(wù)器使用一次處理一個連接的方式,因此連接上限對于 Kafka 集群性能而言非常重要。過多的連接可能會導(dǎo)致服務(wù)器資源不足,從而造成性能的下降。
架構(gòu)設(shè)計優(yōu)化
為了進(jìn)一步提高 Kafka 集群的性能和可靠性,需要對集群的系統(tǒng)架構(gòu)進(jìn)行優(yōu)化。以下是一些常用的系統(tǒng)架構(gòu)優(yōu)化方法:文章來源地址http://www.zghlxwxcb.cn/news/detail-612049.html
- 添加緩存層(Add a caching layer):使用緩存將頻繁訪問的數(shù)據(jù)存儲到內(nèi)存中,可以減少 I/O 負(fù)載,加速數(shù)據(jù)訪問。
- 數(shù)據(jù)壓縮(Use data compression):在 Kafka 集群中使用消息壓縮算法,可以大幅減少網(wǎng)絡(luò)傳輸和磁盤寫入。
- 垂直擴展和水平擴展(Vertical and horizontal scaling):通過增加節(jié)點或者增加機器來擴展 Kafka 集群的規(guī)模,從而提高其性能和容錯能力。
- 異地多活(Geo-replication):將多個 Kafka 集群分布在不同地理位置,通過異地多活技術(shù)實現(xiàn)數(shù)據(jù)冗余,提高數(shù)據(jù)的可用性。
到了這里,關(guān)于Apache Kafka 入門教程的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!