1.概述
1.1 消息隊(duì)列
目 前 企 業(yè) 中 比 較 常 見(jiàn) 的 消 息 隊(duì) 列 產(chǎn) 品 主 要 有
- Kafka(在大數(shù)據(jù)場(chǎng)景主要采用 Kafka 作為消息隊(duì)列。)
- ActiveMQ
- RabbitMQ
- RocketMQ
1.1.1 傳統(tǒng)消息隊(duì)列的應(yīng)用場(chǎng)景
傳統(tǒng)的消息隊(duì)列的主要應(yīng)用場(chǎng)景包括:緩存/消峰、解耦和異步通信。
- 緩沖/消峰:
有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過(guò)系統(tǒng)的速度,解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的情況。 - 解耦:
允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過(guò)程,只要確保它們遵守同樣的接口約束。 - 異步通信:
允許用戶把一個(gè)消息放入隊(duì)列,但并不立即處理它,然后在需要的時(shí)候再去處理它們。
1.1.2 消息隊(duì)列的兩種模式
- 點(diǎn)對(duì)點(diǎn)模式
- 發(fā)布/訂閱模式
一、點(diǎn)對(duì)點(diǎn)模式
消費(fèi)者主動(dòng)拉取數(shù)據(jù),消息收到后清除消息
二、發(fā)布/訂閱模式
- 可以有多個(gè)topic主題(瀏覽、點(diǎn)贊、收藏、評(píng)論等)
- 消費(fèi)者消費(fèi)數(shù)據(jù)之后,不刪除數(shù)據(jù)
- 每個(gè)消費(fèi)者相互獨(dú)立,都可以消費(fèi)到數(shù)據(jù)
1.2 kafka基礎(chǔ)結(jié)構(gòu)
- Producer:消息生產(chǎn)者,就是向 Kafka broker 發(fā)消息的客戶端。
- Consumer:消息消費(fèi)者,向 Kafka broker 取消息的客戶端。
- Consumer Group(CG):消費(fèi)者組,由多個(gè) consumer 組成。
消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi);消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者。 - Broker:一臺(tái) Kafka 服務(wù)器就是一個(gè) broker。一個(gè)集群由多個(gè) broker 組成。一個(gè)broker 可以容納多個(gè) topic。
- Topic:可以理解為一個(gè)隊(duì)列,生產(chǎn)者和消費(fèi)者面向的都是一個(gè) topic。
- Partition:為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的 topic 可以分布到多個(gè) broker(即服務(wù)器)上,一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列。
- Replica:副本。一個(gè) topic 的每個(gè)分區(qū)都有若干個(gè)副本,一個(gè) Leader 和若干個(gè)Follower。
- Leader:每個(gè)分區(qū)多個(gè)副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì)象都是 Leader。
- Follower:每個(gè)分區(qū)多個(gè)副本中的“從”,實(shí)時(shí)從 Leader 中同步數(shù)據(jù),保持和Leader 數(shù)據(jù)的同步。Leader 發(fā)生故障時(shí),某個(gè) Follower 會(huì)成為新的 Leader。
2.kafka的快速入門(mén)
2.1 集群部署
2.1.1 安裝java
一、上傳并解壓安裝包
1.將資料中的jdk-8u361-linux-x64.tar.gz上傳到/usr/local/java目錄下
mkdir /usr/local/java/
tar -zxvf jdk-8u361-linux-x64.tar.gz
二、配置環(huán)境變量
export JAVA_HOME=/usr/local/java/jdk1.8.0_361
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
三、使環(huán)境變量生效
source /etc/profile
四、檢查java版本
java -version
2.1.2 部署zookeeper集群
一、上傳并解壓安裝包
1.將資料中的apache-zookeeper-3.7.1-bin.tar.gz上傳到/usr/local目錄下
cd /usr/local
2.解壓安裝包到本地文件夾
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz
3.修改文件名稱
mv apache-zookeeper-3.7.1-bin/ zookeeper
二、修改配置
1.進(jìn)入zookeeper目錄下的conf目錄,將目錄中的zoo_sample.cfg改成zoo.cfg
cd zookeeper/conf/
mv zoo_sample.cfg zoo.cfg
2.在zookeeper目錄下新建一個(gè)zkData文件夾, 作為數(shù)據(jù)文件存儲(chǔ)目錄
cd /usr/local/zookeeper/
mkdir zkData
3.回到zoo.cfg, 將dataDir的路徑換成我們剛剛新建的zkData的路徑
dataDir=/usr/local/zookeeper/zkData
三、啟動(dòng)和關(guān)閉
進(jìn)入bin目錄下,啟動(dòng)服務(wù)端
./zkServer.sh start
進(jìn)入bin目錄下,關(guān)閉服務(wù)端
./zkServer.sh stop
四、集群部署_生成myid
1.在/zkData下新建myid文件, 并寫(xiě)入對(duì)應(yīng)的server.num中的num數(shù)字
echo 1 > myid
2.依次給其他兩個(gè)服務(wù)器上的zookeeper設(shè)置num,分別是2和3
五、集群部署_配置集群中的各個(gè)節(jié)點(diǎn)
1.回到zoo.cfg, 將其他幾個(gè)節(jié)點(diǎn)配置進(jìn)來(lái)
server.1=192.168.202.128:2888:3888
server.2=192.168.202.130:2888:3888
server.3=192.168.202.131:2888:3888
六、分別啟動(dòng)三臺(tái)zookeeper服務(wù)
進(jìn)入 /bin 路徑下,執(zhí)行如下命令,啟動(dòng) zookeeper 服務(wù):
./zkServer.sh start
七、查看zk集群狀態(tài)
可以看到哪個(gè)zk是leader,哪些是follow
./bin/zkServer.sh status ./conf/zoo.cfg
八、驗(yàn)證是否搭建成功
在命令行中輸入:./zkCli.sh -server 127.0.0.1:2181,即可連接到本機(jī) ZooKeeper 服務(wù)器。其他自動(dòng)實(shí)現(xiàn)同步,客戶端只需要和一臺(tái)保持連接即可。出現(xiàn)如下語(yǔ)句表示鏈接成功:
注意:關(guān)閉其他linux的防火墻
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: 127.0.0.1:2181(CONNECTED) 0]
2.1.3 部署kafka集群
一、集群規(guī)劃
我們?cè)谌齻€(gè)服務(wù)器上分別部署zk和kafka
linux_1 | linux_2 | linux_3 |
---|---|---|
zk | zk | zk |
kafka | kafka | kafka |
二、上傳并解壓安裝包
1.將資料中的kafka_2.12-3.0.0.tgz上傳到/usr/local目錄下
cd /usr/local
2.解壓安裝包到本地文件夾
tar -zxvf kafka_2.12-3.0.0.tgz
3.修改文件名稱
mv kafka_2.12-3.0.0/ kafka
三、修改kafka配置文件
1.進(jìn)入到kafka 目錄, 修改配置文件
cd config/
vim server.properties
2.修改以下內(nèi)容:
#broker 的全局唯一編號(hào),不能重復(fù),只能是數(shù)字。
broker.id=0
#kafka 運(yùn)行日志(數(shù)據(jù))存放的路徑,路徑不需要提前創(chuàng)建,kafka 自動(dòng)幫你創(chuàng)建,可以
配置多個(gè)磁盤(pán)路徑,路徑與路徑之間可以用","分隔
log.dirs=/usr/local/kafka/datas
# topic 在當(dāng)前broker上的分片個(gè)數(shù),與broker保持一致
num.partitions=3
#配置連接 Zookeeper 集群地址(在 zk 根目錄下創(chuàng)建/kafka,方便管理)
zookeeper.connect=192.168.202.128:2181,192.168.202.130:2181,192.168.202.131:2181
3.依次修改其他節(jié)點(diǎn)的配置, 需要保證broker.id唯一
分別修改另兩個(gè)節(jié)點(diǎn)的broker.id=1、broker.id=2
四、啟動(dòng)集群
1.啟動(dòng)Zookeeper 集群,然后啟動(dòng) Kafka。
2.依次在這三個(gè)節(jié)點(diǎn)上啟動(dòng) Kafka。(在kafka目錄下執(zhí)行)
bin/kafka-server-start.sh -daemon config/server.properties
五、關(guān)閉集群
bin/kafka-server-stop.sh
注意:停止 Kafka 集群時(shí),一定要等 Kafka 所有節(jié)點(diǎn)進(jìn)程全部停止后再停止 Zookeeper集群。因?yàn)?Zookeeper 集群當(dāng)中記錄著 Kafka 集群相關(guān)信息,Zookeeper 集群一旦先停止,Kafka 集群就沒(méi)有辦法再獲取停止進(jìn)程的信息,只能手動(dòng)殺死 Kafka 進(jìn)程了。
Kafka啟動(dòng)報(bào)錯(cuò):The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
處理方式:
- log.dirs配置的路徑要正確
- 刪除這里面的meta.properties文件
參考文章:Linux 搭建Kafka集群,最新教程,細(xì)到極致
2.2 Kafka命令行操作
2.2.1 主題命令行操作
操作主題命令行參數(shù)
bin/kafka-topics.sh
參數(shù) | 描述 |
---|---|
--bootstrap-server <String: server toconnect to> | 連接的 Kafka Broker 主機(jī)名稱和端口號(hào)。 |
--topic <String: topic> | 操作的 topic 名稱。 |
--create | 創(chuàng)建主題。 |
--delete | 刪除主題。 |
--alter | 修改主題。 |
--list | 查看所有主題。 |
--describe | 查看主題詳細(xì)描述。 |
--partitions <Integer: # of partitions> | 設(shè)置分區(qū)數(shù)。 |
--replication-factor<Integer: replication factor> | 設(shè)置分區(qū)副本。 |
--config <String: name=value> | 更新系統(tǒng)默認(rèn)的配置。 |
一、查看當(dāng)前服務(wù)器中所有topic
# 連接多個(gè)節(jié)點(diǎn)(集群)
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092,192.168.202.130:9092,192.168.202.131:9092 --list
二、創(chuàng)建first topic
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --create --partitions 1 --replication-factor 3 --topic first
- --topic first:代表創(chuàng)建名稱為first的topic
- --partitions 1:代表這個(gè)topic只有一個(gè)分區(qū)
- --replication-factor 3:代表這個(gè)topic有三個(gè)副本
三、查看first主題的詳情
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --describe --topic first
- Replicas: 2,0,1 代表副本存儲(chǔ)在哪些節(jié)點(diǎn)中, first這個(gè)topic我們?cè)O(shè)置了3個(gè)副本,此處的2,0,1對(duì)應(yīng)的就是這三個(gè)副本存儲(chǔ)的位置(broker.id)
- Isr: 2,0,1 Isr代表同步副本, 此處2,0,1都屬于同步副本(主副本也屬于同步副本)
- Leader: 2 代表主副本是borker.id=2的節(jié)點(diǎn)
四、修改分區(qū)數(shù)
注意:分區(qū)數(shù)只能增加,不能減少
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --alter --topic first --partitions 3
無(wú)法通過(guò)命令行的方式修改副本
五、刪除topic
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --delete --topic first
2.2.2 生產(chǎn)者命令行操作
操作生產(chǎn)者命令參數(shù)
bin/kafka-console-producer.sh
參數(shù) | 描述 |
---|---|
-`-bootstrap-server <String: server toconnect to> | 連接的 Kafka Broker 主機(jī)名稱和端口號(hào)。 |
--topic <String: topic> | 操作的 topic 名稱。 |
一、發(fā)送消息
bin/kafka-console-producer.sh --bootstrap-server 192.168.202.128:9092 --topic first
>hello world
>atguigu atguigu
2.2.3 消費(fèi)者命令行操作
操作消費(fèi)者命令參數(shù)
bin/kafka-console-consumer.sh
參數(shù) | 描述 |
---|---|
-`-bootstrap-server <String: server toconnect to> | 連接的 Kafka Broker 主機(jī)名稱和端口號(hào)。 |
--topic <String: topic> | 操作的 topic 名稱。 |
--from-beginning | 從頭開(kāi)始消費(fèi)。 |
--group <String: consumer group id> | 指定消費(fèi)者組名稱。 |
一、消費(fèi)first主題的數(shù)據(jù)
bin/kafka-console-consumer.sh --bootstrap-server 192.168.202.128:9092 --topic first
二、把主題中所有的數(shù)據(jù)都讀取出來(lái)(包括歷史數(shù)據(jù))。
bin/kafka-console-consumer.sh --bootstrap-server 192.168.202.128:9092 --from-beginning --topic first
3 Kafka 生產(chǎn)者
3.1 生產(chǎn)者消息發(fā)送流程
3.1.1 發(fā)送原理
- KafkaProducer將消息封裝成ProducerRecord, 然后通過(guò)攔截鏈, 根據(jù)指定序列化方式進(jìn)行序列化, 其次在分區(qū)器中根據(jù)設(shè)置的分區(qū)策略進(jìn)行數(shù)據(jù)分區(qū),封裝成TopicPartition, TopicPartition中就包含了目標(biāo)partition的信息
- 其后分區(qū)器將消息寫(xiě)入RecordAccumulator進(jìn)行緩沖, RecordAccumulator是一個(gè)雙端隊(duì)列, RecordAccumulator中維護(hù)了一個(gè)ConcurrentMap<TopicPartition, Deque<ProducerBatch>> 類型的集合, 其中的Key是TopicPartition,它用來(lái)標(biāo)識(shí)目標(biāo)partition(消息的最終存儲(chǔ)位置), Value是Deque<ProducerBatch> 隊(duì)列,用來(lái)緩沖發(fā)往目標(biāo) partition 的消息。
- 當(dāng)Deque達(dá)到一定閾值后,就會(huì)喚醒sender線程將消息發(fā)送到kafka集群
3.1.2 生產(chǎn)者重要參數(shù)列表
參數(shù)名稱 | 描述 |
---|---|
bootstrap.servers | 生產(chǎn)者連接集群所需的broker地址清單. 例如hadoop102:9092,hadoop103:9092, 可以設(shè)置1個(gè)或者多個(gè), 中間用逗號(hào)隔開(kāi)。注意這里并非需要所有的 broker 地址,因?yàn)樯a(chǎn)者從給定的 broker里查找到其他 broker 信息 |
key.serializer和value.serializer | 指定發(fā)送消息的 key 和 value 的序列化類型。一定要寫(xiě)全類名。 |
buffer.memory | RecordAccumulator 緩沖區(qū)總大小,默認(rèn) 32m。
|
batch.size | 緩沖區(qū)一批數(shù)據(jù)最大值,默認(rèn)16k。 適當(dāng)增加該值,可以提高吞吐量,但是如果該值設(shè)置太大,會(huì)導(dǎo)致數(shù)據(jù)傳輸延遲增加。 |
linger.ms | 如果數(shù)據(jù)遲遲未達(dá)到 batch.size,sender 等待 linger.time之后就會(huì)發(fā)送數(shù)據(jù)。單位 ms,默認(rèn)值是 0ms, 表示沒(méi)有延遲。生產(chǎn)環(huán)境建議該值大小為 5-100ms 之間。 |
acks | 0:生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),不需要等數(shù)據(jù)落盤(pán)應(yīng)答。1:生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),Leader 收到數(shù)據(jù)后應(yīng)答。-1(all):生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),Leader+和 isr 隊(duì)列里面的所有節(jié)點(diǎn)收齊數(shù)據(jù)后應(yīng)答。默認(rèn)值是-1,-1 和all 是等價(jià)的。
|
max.in.flight.requests.per.connection | 允許最多沒(méi)有返回 ack 的次數(shù),默認(rèn)為 5, 開(kāi)啟冪等性要保證該值是 1-5 的數(shù)字。 |
retries | 當(dāng)消息發(fā)送出現(xiàn)錯(cuò)誤的時(shí)候,系統(tǒng)會(huì)重發(fā)消息。retries表示重試次數(shù)。默認(rèn)是 int 最大值,2147483647。 如果設(shè)置了重試,還想保證消息的有序性,需要設(shè)置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否則在重試此失敗消息的時(shí)候,其他的消息可能發(fā)送成功了。 |
retry.backoff.ms | 兩次重試之間的時(shí)間間隔,默認(rèn)是 100ms。 |
enable.idempotence | 是否開(kāi)啟冪等性,默認(rèn) true, 開(kāi)啟冪等性。 |
compression.type | 生產(chǎn)者發(fā)送的所有數(shù)據(jù)的壓縮方式。默認(rèn)是 none,也就是不壓縮。支持壓縮類型:none、gzip、snappy、lz4 和 zstd。 |
3.2 異步發(fā)送API
3.2.1 普通異步發(fā)送
一、在broker2中開(kāi)啟kafka消費(fèi)者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.202.130:9092 --topic first
二、生產(chǎn)者生產(chǎn)消息
public class CustomProducer {
public static void main(String[] args) {
// 1.創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
Properties properties = new Properties();
// 2.給kafka配置對(duì)象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.202.128:9092");
// 3.key,value 序列化(必須)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 4.創(chuàng)建kafka生產(chǎn)者對(duì)象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 5.調(diào)用send方法, 發(fā)送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
}
// 6.關(guān)閉資源
kafkaProducer.close();
}
}
觀察broker2中控制臺(tái)消息
3.2.2 帶回調(diào)函數(shù)的異步發(fā)送
回調(diào)函數(shù)會(huì)在 producer 收到 ack 時(shí)調(diào)用,為異步調(diào)用,該方法有兩個(gè)參數(shù),分別是元數(shù)據(jù)信息(RecordMetadata)和異常信息(Exception),如果 Exception 為 null,說(shuō)明消息發(fā)送成功,如果 Exception 不為 null,說(shuō)明消息發(fā)送失敗。
callback只有在收到ack時(shí)調(diào)用, 它可以幫我們返回:主題、分區(qū)等消息, 例如我們想看看消息最終存儲(chǔ)到哪個(gè)topic的哪個(gè)分區(qū)中
public class CustomProducerCallback {
public static void main(String[] args) throws InterruptedException {
// 1.創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
Properties properties = new Properties();
// 2.給kafka配置對(duì)象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.202.128:9092");
// 3.key,value序列化(必須):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 4.創(chuàng)建kafka生產(chǎn)者對(duì)象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 5.調(diào)用send方法,發(fā)送消息
for (int i = 0; i < 5; i++) {
// 添加回調(diào)
kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (exception == null) {
// 沒(méi)有異常, 輸出消息到控制臺(tái)
System.out.println("主題:" + recordMetadata.topic() + "->" + "分區(qū):" + recordMetadata.partition());
} else {
// 出現(xiàn)異常打印
exception.printStackTrace();
}
}
});
// 延遲一會(huì)會(huì)看到數(shù)據(jù)發(fā)往不同分區(qū)
Thread.sleep(2);
}
// 6.關(guān)閉資源
kafkaProducer.close();
}
}
觀察控制臺(tái)
3.3 同步發(fā)送API
只需在異步發(fā)送的基礎(chǔ)上,再調(diào)用一下 get()方法即可。
public class CustomProducerSync {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
Properties properties = new Properties();
// 2.給kafka配置對(duì)象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.202.128:9092");
// 3.key,value 序列化(必須)
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 4.創(chuàng)建kafka生產(chǎn)者對(duì)象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 5.調(diào)用send方法, 發(fā)送消息
for (int i = 0; i < 10; i++) {
// 異步發(fā)送
kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
// 同步發(fā)送
kafkaProducer.send(new ProducerRecord<>("first", "kafka" + i)).get();
}
// 6.關(guān)閉資源
kafkaProducer.close();
}
}
3.4 生產(chǎn)者分區(qū)
3.4.1 分區(qū)好處
- 便于合理使用存儲(chǔ)資源
- 提供并行速度
3.4.2 生產(chǎn)者發(fā)送消息的分區(qū)策略
默認(rèn)的分區(qū)器DefaultPartitioner
在在 IDEA 中 ctrl +n,全局查找 DefaultPartitioner。
案例一:將數(shù)據(jù)發(fā)往指定 partition
觀察kafka控制臺(tái)中是否接收到消息。
在 IDEA 控制臺(tái)觀察回調(diào)信息。
案例二:沒(méi)有指明 partition 值但有 key 的情況下
將 key 的 hash 值與 topic 的 partition 數(shù)進(jìn)行取余得到 partition 值。
1.key="a"時(shí),在控制臺(tái)查看結(jié)果。
主題:first->分區(qū):1
主題:first->分區(qū):1
主題:first->分區(qū):1
主題:first->分區(qū):1
主題:first->分區(qū):1
2.key="b"時(shí),在控制臺(tái)查看結(jié)果。
主題:first->分區(qū):2
主題:first->分區(qū):2
主題:first->分區(qū):2
主題:first->分區(qū):2
主題:first->分區(qū):2
3.key="f"時(shí),在控制臺(tái)查看結(jié)果。
主題:first->分區(qū):0
主題:first->分區(qū):0
主題:first->分區(qū):0
主題:first->分區(qū):0
主題:first->分區(qū):0
3.4.3 自定義分區(qū)器
如果研發(fā)人員可以根據(jù)企業(yè)需求,自己重新實(shí)現(xiàn)分區(qū)器。
例如我們實(shí)現(xiàn)一個(gè)分區(qū)器實(shí)現(xiàn),發(fā)送過(guò)來(lái)的數(shù)據(jù)中如果包含 atguigu,就發(fā)往 0 號(hào)分區(qū),
不包含 atguigu,就發(fā)往 1 號(hào)分區(qū)。
一、定義自定義分區(qū)器
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 獲取消息
String msgValue = value.toString();
// 創(chuàng)建 partition
int partition;
// 判斷消息是否包含atguigu
if (msgValue.contains("atguigu")) {
partition = 0;
} else {
partition = 1;
}
// 返回分區(qū)號(hào)
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
二、使用自定義分區(qū)器
// 添加自定義分區(qū)器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");
3.5 生產(chǎn)經(jīng)驗(yàn)—生產(chǎn)者如何提高吞吐量
Deque達(dá)到一定閾值后,就會(huì)喚醒sender線程將消息發(fā)送到kafka集群, 這個(gè)閾值受兩個(gè)參數(shù)影響
- batch.size:批次大小, 默認(rèn)16K
當(dāng)Deque中的積壓的消息達(dá)到16K后, 就會(huì)喚醒sender線程將消息打包發(fā)送到同一個(gè)分區(qū) - linger.ms:等待時(shí)間,默認(rèn)為0
如果Deque中的消息一直沒(méi)有達(dá)到16K, 此時(shí)會(huì)根據(jù)linger.ms設(shè)置的時(shí)間,比如設(shè)置了1秒, 那么到了這個(gè)時(shí)間(上一個(gè)批次的消息發(fā)送完成后開(kāi)始計(jì)時(shí)),即使數(shù)據(jù)沒(méi)有達(dá)到16K, 也會(huì)喚醒sender線程發(fā)送消息
一、linger.ms=0產(chǎn)生的問(wèn)題
因?yàn)閘inger.ms默認(rèn)為0, 所以來(lái)一個(gè)消息就會(huì)喚醒sender來(lái)發(fā)送消息, 這樣的效率并不高(會(huì)頻繁開(kāi)啟線程發(fā)送消息), 為了提高拉取速度的能力, 我們希望一次能發(fā)送很多消息
所以在生產(chǎn)環(huán)境中, 我們一般會(huì)修改linger.ms的值, 改為5~100ms, 而batch.size使用默認(rèn)值即可
注意點(diǎn):不能將batch.size和linger.ms設(shè)置的很大, 這樣每批次消息的發(fā)送時(shí)間間隔就會(huì)很大(延遲過(guò)大)
// batch.size:批次大小,默認(rèn) 16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms:等待時(shí)間,默認(rèn) 0
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator:緩沖區(qū)大小,默認(rèn) 32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// compression.type:壓縮,默認(rèn) none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
3.6 生產(chǎn)經(jīng)驗(yàn)—數(shù)據(jù)可靠性
為了保證producer發(fā)送的數(shù)據(jù), 能可靠的發(fā)送到指定的partition, kafka為producer提供了消息確認(rèn)機(jī)制(ack)
3.6.1 消息確認(rèn)機(jī)制(ack)
- ack = 0
- ack = 1
- ack = -1或者all
一、ack=0
生產(chǎn)者發(fā)送消息后, 不需要等待任何來(lái)自服務(wù)器的響應(yīng)
- 優(yōu)點(diǎn):
- 1.不需要等待服務(wù)器的響應(yīng), 所以可以以網(wǎng)絡(luò)能夠支持的最大速度發(fā)送消息, 從而達(dá)到很高的吞吐量
- 2.producer不管發(fā)送成不成功,只發(fā)送一次就不再發(fā)送了, 至少保證消息不會(huì)被重復(fù)消費(fèi)
- 缺點(diǎn):
- 如果當(dāng)中出現(xiàn)問(wèn)題,導(dǎo)致服務(wù)器沒(méi)有收到消息, 沒(méi)有落盤(pán)到partition,生產(chǎn)者無(wú)從得知,會(huì)造成消息丟失
二、ack=1
生產(chǎn)者發(fā)送消息后, 等待分區(qū)的leader收到數(shù)據(jù)后應(yīng)答
- leader節(jié)點(diǎn)收到了消息, 生產(chǎn)者就會(huì)收到服務(wù)器的成功響應(yīng).(代表消息發(fā)送成功)
- leader節(jié)點(diǎn)沒(méi)有收到消息, 生產(chǎn)者就會(huì)收到服務(wù)器的錯(cuò)誤響應(yīng),為了避免數(shù)據(jù)丟失, 生產(chǎn)者會(huì)重發(fā)消息
存在的問(wèn)題:如果leader落盤(pán)成功了, 向producer也收到了成功響應(yīng), 但是還沒(méi)來(lái)得及將消息同步副本(follower), 此時(shí)leader掛了, 此時(shí)服務(wù)器會(huì)從follower中推選新的leader, 新的leader并沒(méi)有同步消息, 而producer也不會(huì)再發(fā)了, 此時(shí)消息就丟失了
三、ack=-1或者all
生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù), leader和ISR隊(duì)列里面的所有節(jié)點(diǎn)都落盤(pán)成功后, 進(jìn)行應(yīng)答
producer只有收到分區(qū)中所有副本的成功寫(xiě)入通知來(lái)認(rèn)為推送消息成功了
3.6.2 ISR機(jī)制
思考:如果leader在同步數(shù)據(jù)時(shí), 有一個(gè)follow掛了, 遲遲不能與leader進(jìn)行同步, 這個(gè)問(wèn)題怎么解決?
一、ISR的概念
leader維護(hù)了一個(gè)動(dòng)態(tài)的in-sync replica set(ISR), 意指跟leader保持同步的follower + leader集合, 例如我們之前看到的(leader:0, isr:0,1,2)
二、ISR的剔除
如果Follower長(zhǎng)時(shí)間未向Leader發(fā)送通信請(qǐng)求或同步數(shù)據(jù),則該Follower將被踢出ISR。該時(shí)間閾值由replica.lag.time.max.ms
參數(shù)設(shè)定,默認(rèn)30s。例如2超時(shí)后,會(huì)顯示(leader:0, isr:0,1)。
這樣就不用等長(zhǎng)期聯(lián)系不上或者已經(jīng)故障的節(jié)點(diǎn)
3.6.3 數(shù)據(jù)完全可靠條件
如果分區(qū)的副本設(shè)置為1(即只有l(wèi)eader沒(méi)有follower), 或者ISR中應(yīng)答的最小副本數(shù)量(min.insync.replicas 默認(rèn)為1)設(shè)置為1, 這種情況下就和ack=1時(shí)效果是一樣的, 存在數(shù)據(jù)丟失問(wèn)題(leader:0, isr:0)
數(shù)據(jù)完全可靠條件
- 分區(qū)副本大于等于2(除了leader以外, 存在至少一個(gè)follow副本)
- ACK級(jí)別設(shè)置為-1(保證ISR中所有節(jié)點(diǎn)都存入消息)
- min.insync.replicas >=2
ISR里應(yīng)答的最小副本數(shù)量大于等于2(ISR中的數(shù)量至少有兩個(gè), 否則broker不處理這條消息, 并直接給生產(chǎn)者報(bào)錯(cuò))
min.insync.replicas = n,代表的語(yǔ)義是,如果生產(chǎn)者acks=all,而在發(fā)送消息時(shí),Broker的ISR數(shù)量沒(méi)有達(dá)到n,Broker不能處理這條消息,需要直接給生產(chǎn)者報(bào)錯(cuò)。
3.6.4 可靠性總結(jié)
- acks=0,生產(chǎn)者發(fā)送過(guò)來(lái)數(shù)據(jù)就不管了,可靠性差,效率高;
- acks=1,生產(chǎn)者發(fā)送過(guò)來(lái)數(shù)據(jù)Leader應(yīng)答,可靠性中等,效率中等;
- acks=-1,生產(chǎn)者發(fā)送過(guò)來(lái)數(shù)據(jù)Leader和ISR隊(duì)列里面所有Follwer應(yīng)答,可靠性高,效率低;
生產(chǎn)環(huán)境的使用場(chǎng)景:
- acks=0很少使用;
- acks=1,一般用于傳輸普通日志,允許丟個(gè)別數(shù)據(jù)
- acks=-1或者all,一般用于傳輸和錢(qián)相關(guān)的數(shù)據(jù),對(duì)可靠性要求比較高的場(chǎng)景。
3.6.5 生產(chǎn)者重復(fù)發(fā)送消息
消息重復(fù)存在幾個(gè)場(chǎng)景
- 生產(chǎn)端:服務(wù)器響應(yīng)失敗后, 基本的解決措施就是重發(fā)消息
- 消費(fèi)端: poll 一批數(shù)據(jù),處理完畢還沒(méi)提交 offset ,機(jī)子宕機(jī)重啟了,又會(huì) poll 上批數(shù)據(jù),再度消費(fèi)就造成了消息重復(fù)。
一、生產(chǎn)端重復(fù)發(fā)送消息的一種情況
Leader收到數(shù)據(jù)后, 將數(shù)據(jù)落盤(pán), 并將數(shù)據(jù)同步到follower, 此時(shí)在給Producer應(yīng)答時(shí)Leader宕機(jī)了, 此時(shí)Producer就會(huì)收到服務(wù)器傳來(lái)的響應(yīng)失敗, 重新發(fā)送消息, 服務(wù)器會(huì)重新挑選一個(gè)follower成為leader, 而這個(gè)新的leader其實(shí)已經(jīng)落盤(pán)了消息
3.7 生產(chǎn)經(jīng)驗(yàn)—數(shù)據(jù)去重
3.7.1 數(shù)據(jù)傳遞語(yǔ)義
-
至少一次(At Least Once)
-
什么是至少一次
: 生產(chǎn)者發(fā)送到kafka集群, 至少kafka集群能收到一次數(shù)據(jù) -
如何保證至少一次
: ACK級(jí)別設(shè)置為-1或者all + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)量大于等于2 -
至少一次會(huì)產(chǎn)生的問(wèn)題
: kafka集群重復(fù)收到數(shù)據(jù)的問(wèn)題, 即可以保證數(shù)據(jù)不丟失,但是不能保證數(shù)據(jù)不重復(fù)
-
什么是至少一次
-
最多一次(At Most Once)
-
什么是最多一次
: 生產(chǎn)者發(fā)送到kafka集群, 不論成功與否, 只會(huì)發(fā)送一次 -
如何保證最多一次
: ACK級(jí)別設(shè)置為0 -
最多一次會(huì)產(chǎn)生的問(wèn)題
: 無(wú)法保證數(shù)據(jù)是否落盤(pán), 即可以保證數(shù)據(jù)不重復(fù),但是不能保證數(shù)據(jù)不丟失
-
什么是最多一次
-
精確一次(Exactly Once)
-
什么是精確一次
: 數(shù)據(jù)既不會(huì)丟失, 也不會(huì)重復(fù)發(fā)送 -
如何保證精確一次
: Kafka 0.11版本以后,引入了一項(xiàng)重大特性:冪等性
和事務(wù)
, 通過(guò)這兩點(diǎn)來(lái)保證嚴(yán)格一次
-
什么是精確一次
3.7.2 冪等性
冪等性就是指Producer不論向Broker發(fā)送多少次重復(fù)數(shù)據(jù),Broker端都只會(huì)持久化一條,保證了不重復(fù)。
一、冪等性原理
精確一次(Exactly Once) = 冪等性 + 至少一次(ack=-1或者all + 分區(qū)副本數(shù)>=2 + ISR最小副本數(shù)量>=2)
二、重復(fù)數(shù)據(jù)的判斷標(biāo)準(zhǔn)
一個(gè)消息會(huì)被封裝成TopicPartition, TopicPartition中記錄了以下幾個(gè)信息:PID、Partition、SeqNumber; 重復(fù)數(shù)據(jù)的判斷標(biāo)準(zhǔn):具有PID, Partition, SeqNumber相同主鍵的消息提交時(shí),Broker只會(huì)持久化一條。
- PID是每個(gè)Producer在初始化時(shí)分配的一個(gè)唯一ID, 對(duì)于一個(gè)PID來(lái)說(shuō), Sequence Number是從0開(kāi)始自增
- Partition 表示分區(qū)的標(biāo)識(shí)
- Sequence Number是Producer在發(fā)送消息時(shí), 會(huì)給每一條消息標(biāo)識(shí)Sequence Number, 同一條消息被重復(fù)發(fā)送時(shí), Sequence Number是不會(huì)遞增
三、冪等性的條件
- 只能保證Producer在單個(gè)會(huì)話內(nèi)不丟不重, 如果producer出現(xiàn)意外掛掉了再重啟是無(wú)法保證冪等性, 因?yàn)镻ID已經(jīng)改變了(單會(huì)話)
- 冪等性無(wú)法跨域多個(gè)topic-partition, 只能保證單個(gè)partition內(nèi)的冪等性(單分區(qū))
所以冪等性只能保證的是在單分區(qū)單會(huì)話內(nèi)不重復(fù)。
四、如何使用冪等性
enable.idempotence被設(shè)置成true后, Producer自動(dòng)升級(jí)成冪等性Producer,其他所有的代碼邏輯都不需要改變。(enable.idempotence默認(rèn)為true, 不需要手動(dòng)開(kāi)啟)
properties.put(“enable.idempotence”, ture)
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。
3.7.3 生產(chǎn)者事務(wù)
開(kāi)啟事務(wù), 必須開(kāi)啟冪等性(即enable.idempotence設(shè)置為true), 不需要保證精確一次(冪等性 + 至少一次)
一、生產(chǎn)者事務(wù)的概念
Kafka事務(wù)是2017年Kafka 0.11.0.0引入的新特性。類似于數(shù)據(jù)庫(kù)的事務(wù)。Kafka事務(wù)指的是消費(fèi)者提交以及生產(chǎn)者生產(chǎn)消息offset的操作可以在一個(gè)原子操作中,要么都成功,要么都失敗。
二、事務(wù)操作的API
// 1 初始化事務(wù)
void initTransactions();
// 2 開(kāi)啟事務(wù)
void beginTransaction() throws ProducerFencedException;
// 3 在事務(wù)內(nèi)提交已經(jīng)消費(fèi)的偏移量(主要用于消費(fèi)者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;
// 4 提交事務(wù)
void commitTransaction() throws ProducerFencedException;
// 5 放棄事務(wù)(類似于回滾事務(wù)的操作)
void abortTransaction() throws ProducerFencedException;
三、事務(wù)編程流程
- 1.設(shè)置事務(wù)id
- 2.初始化事務(wù)
- 3.開(kāi)啟事務(wù)
- 4.運(yùn)行結(jié)束 > 提交事務(wù)
- 5.運(yùn)行失敗 > 回滾事務(wù)
3.8 生產(chǎn)經(jīng)驗(yàn)—數(shù)據(jù)有序
kafka只能保證單分區(qū)數(shù)據(jù)有序, 多分區(qū)時(shí), 分區(qū)與分區(qū)間無(wú)序
3.9 生產(chǎn)經(jīng)驗(yàn)—數(shù)據(jù)亂序
- kafka在1.x版本之前保證數(shù)據(jù)單分區(qū)有序,條件如下:
max.in.flight.requests.per.connection=1
(不需要考慮是否開(kāi)啟冪等性)。 - kafka在1.x及以后版本保證數(shù)據(jù)單分區(qū)有序,條件如下:
- 未開(kāi)啟冪等性
max.in.flight.requests.per.connection
需要設(shè)置為1。 - 開(kāi)啟冪等性
max.in.flight.requests.per.connection
需要設(shè)置小于等于5。
原因說(shuō)明:因?yàn)樵趉afka1.x以后,啟用冪等后,kafka服務(wù)端會(huì)緩存producer發(fā)來(lái)的最近5個(gè)request的元數(shù)據(jù),故無(wú)論如何,都可以保證最近5個(gè)request的數(shù)據(jù)都是有序的。(原理是使用了冪等性的Sequence Number, 連續(xù)5個(gè)消息會(huì)自動(dòng)根據(jù)Sequence Number排序)
- 未開(kāi)啟冪等性
4.Kafka Broker
4.1 Kafka Broker工作流程
4.1.1 Zookeeper存儲(chǔ)的 Kafka 信息
一、啟動(dòng)zookeeper客戶端
bin/zkCli.sh
二、通過(guò) ls 命令可以查看 kafka 相關(guān)信息。
ls /kafka
4.1.2 Kafka Broker總體工作流程
一、controller的概念
在Kafka集群中,某個(gè)Broker將被選舉出來(lái)?yè)?dān)任一種特殊的角色,其用于管理和協(xié)調(diào)Kafka集群,即管理集群中的所有分區(qū)的狀態(tài)并執(zhí)行相應(yīng)的管理操作。每個(gè)Kafka集群任意時(shí)刻都只能有一個(gè)Controller。當(dāng)集群?jiǎn)?dòng)時(shí),所有Broker都參與Controller的競(jìng)選,最終有一個(gè)勝出,一旦Controller在某個(gè)時(shí)刻崩潰,集群中的其他的Broker會(huì)收到通知,然后開(kāi)啟新一輪的Controller選舉,新選舉出來(lái)的Controller將承擔(dān)起之前Controller的所有工作。
二、controller的作用
- 維護(hù)每臺(tái)Broker上的分區(qū)副本信息
- 維護(hù)每個(gè)分區(qū)的Leader副本信息
三、controller為每個(gè)分區(qū)選舉leader
選舉規(guī)則:在isr中存活為前提, 按照AR中排在前面的優(yōu)先, 例如AR[1, 0, 2], ISR[1, 2], 那么leader會(huì)按照1,2的順序輪巡
對(duì)于topicA的partition0這個(gè)分區(qū),它選舉出broker1作為leader, 而broker0、broker2作為follower, controller會(huì)把這個(gè)信息告訴zookeeper(將節(jié)點(diǎn)信息上傳到zookeeper),這是為了防止controller掛了后, 新的controller不知道主副本信息
4.1.2 Kafka Broker上線與下線
模擬 Kafka 上下線,Zookeeper 中數(shù)據(jù)變化
一、查看/kafka/brokers/ids 路徑上的節(jié)點(diǎn)。
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
[0, 1, 2]
二、查看/kafka/controller 路徑上的數(shù)據(jù)。
[zk: localhost:2181(CONNECTED) 15] get /kafka/controller
{"version":1,"brokerid":0,"timestamp":"1637292471777"}
三、查看/kafka/brokers/topics/first/partitions/0/state 路徑上的數(shù)據(jù)。
顯示first這個(gè)topic中id為0的partitions的情況
[zk:localhost:2181(CONNECTED)16] get/kafka/brokers/topics/first/partitions/0/state
{"controller_epoch":24,"leader":0,"version":1,"leader_epoch":18,"isr":[0,1,2]}
4.1.3 Broker 重要參數(shù)
參數(shù)名稱 | 描述 |
---|---|
replica.lag.time.max.ms | ISR中,如果Follower長(zhǎng)時(shí)間未向Leader發(fā)送通信請(qǐng)求或同步數(shù)據(jù),則該Follower將被踢出ISR。該時(shí)間閾值,默認(rèn)30s。 |
auto.leader.rebalance.enable | 默認(rèn)是 true。 自動(dòng)Leader Partition 平衡。 |
leader.imbalance.per.broker.percentage | 默認(rèn)是10%。每個(gè)broker允許的不平衡的leader的比率。如果每個(gè)broker超過(guò)了這個(gè)值,控制器會(huì)觸發(fā)leader的平衡。 |
leader.imbalance.check.interval.seconds | 默認(rèn)值 300 秒。檢查 leader 負(fù)載是否平衡的間隔時(shí)間。 |
log.segment.bytes | Kafka 中 log 日志是分成一塊塊存儲(chǔ)的,此配置是指 log 日志劃分 成塊的大小,默認(rèn)值 1G |
log.index.interval.bytes | 默認(rèn) 4kb,kafka 里面每當(dāng)寫(xiě)入了 4kb 大小的日志(.log),然后就往 index 文件里面記錄一個(gè)索引。 |
log.retention.hours | Kafka 中數(shù)據(jù)保存的時(shí)間,默認(rèn) 7 天。 |
log.retention.minutes | Kafka 中數(shù)據(jù)保存的時(shí)間,分鐘級(jí)別,默認(rèn)關(guān)閉。 |
log.retention.ms | Kafka 中數(shù)據(jù)保存的時(shí)間,毫秒級(jí)別,默認(rèn)關(guān)閉。 |
log.retention.check.interval.ms | 檢查數(shù)據(jù)是否保存超時(shí)的間隔,默認(rèn)是 5 分鐘。 |
log.retention.bytes | 默認(rèn)等于-1,表示無(wú)窮大。超過(guò)設(shè)置的所有日志總大小,刪除最早的 segment。 |
log.cleanup.policy | 默認(rèn)是 delete,表示所有數(shù)據(jù)啟用刪除策略;如果設(shè)置值為 compact,表示所有數(shù)據(jù)啟用壓縮策略。 |
num.io.threads | 默認(rèn)是 8。負(fù)責(zé)寫(xiě)磁盤(pán)的線程數(shù)。整個(gè)參數(shù)值要占總核數(shù)的 50%。 |
num.replica.fetchers | 副本拉取線程數(shù),這個(gè)參數(shù)占總核數(shù)的 50%的 1/3 |
num.network.threads | 默認(rèn)是 3。數(shù)據(jù)傳輸線程數(shù),這個(gè)參數(shù)占總核數(shù)的50%的 2/3 |
log.flush.interval.messages | 強(qiáng)制頁(yè)緩存刷寫(xiě)到磁盤(pán)的條數(shù),默認(rèn)是 long 的最大值,9223372036854775807。一般不建議修改,交給系統(tǒng)自己管理。 |
log.flush.interval.ms | 每隔多久,刷數(shù)據(jù)到磁盤(pán),默認(rèn)是 null。一般不建議修改,交給系統(tǒng)自己管理。 |
4.2 生產(chǎn)經(jīng)驗(yàn)—節(jié)點(diǎn)服役和退役
4.2.1 服役新節(jié)點(diǎn)(todo)
4.2.2 退役舊節(jié)點(diǎn)(todo)
4.3 Kafka副本
4.3.1 副本基本信息
一、kafka副本的作用
- kafka副本的作用:提高數(shù)據(jù)的可靠性
- kafka默認(rèn)副本1個(gè), 生產(chǎn)環(huán)境一般配置2個(gè), 保證數(shù)據(jù)可靠性; 太多副本會(huì)增加磁盤(pán)存儲(chǔ)空間, 增加網(wǎng)絡(luò)上傳數(shù)據(jù)傳輸, 降低效率
- kafka中副本分為:Leader和Follower, kafka生產(chǎn)者只會(huì)把數(shù)據(jù)發(fā)往Leader, 然后follower自己找leader進(jìn)行數(shù)據(jù)同步
- kafka分區(qū)中所有的副本統(tǒng)稱為AR(Assigned Repllicas), AR = ISR + OSR
- ISR: 表示和 Leader 保持同步的 Follower 集合。
如果 Follower 長(zhǎng)時(shí)間未向 Leader 發(fā)送通信請(qǐng)求或同步數(shù)據(jù),則該 Follower 將被踢出 ISR。該時(shí)間閾值由 replica.lag.time.max.ms參數(shù)設(shè)定,默認(rèn) 30s。Leader 發(fā)生故障之后,就會(huì)從 ISR 中選舉新的 Leader。 - OSR: 表示 Follower 與 Leader 副本同步時(shí),延遲過(guò)多的副本。
- ISR: 表示和 Leader 保持同步的 Follower 集合。
4.3.2 Leader選舉流程
一、Controller Leader
Kafka 集群中有一個(gè)broker的Controller會(huì)被選舉為Controller Leader,負(fù)責(zé)管理集群broker 的上下線,所有topic的分區(qū)副本分配和Leader選舉等工作。(Controller的信息同步工作是依賴于Zookeeper的)
1.創(chuàng)建一個(gè)新的topic, 設(shè)置4個(gè)分區(qū), 4個(gè)副本
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --create --topic atguigu1 --partitions 4 --replication-factor 4
Created topic atguigu1.
2.查看這4個(gè)分區(qū)的leander分布情況
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --describe --topic atguigu1
Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: atguigu1Partition: 0 Leader: 3 Replicas: 3,0,2,1 Isr: 3,0,2,1
Topic: atguigu1Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
Topic: atguigu1Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,1,2
Topic: atguigu1Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0,3
3.停止其中一個(gè)kafka的進(jìn)程, 查看Leader分區(qū)的狀況
其中我們停到了Broker.id=3的節(jié)點(diǎn), 我們發(fā)現(xiàn)分區(qū)0的leader原本是節(jié)點(diǎn)3. 但是因?yàn)楣?jié)點(diǎn)3掛了, 所以ISR中重新找到節(jié)點(diǎn)0作為leader
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,2,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,2
Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0
4.停止Broker.id=2的節(jié)點(diǎn), 并查看 Leader 分區(qū)情況
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0
5.重新啟動(dòng)關(guān)停的kafka節(jié)點(diǎn), 查看leader分區(qū)的狀況
我們發(fā)現(xiàn)leader已經(jīng)選舉成功后, 并不會(huì)因?yàn)楣?jié)點(diǎn)的重啟再去選舉一次
Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3,2
Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3,2
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3,2
Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3,2
4.3.3 Leader和Follower故障處理細(xì)節(jié)(todo)
4.3.4 分區(qū)副本分配
如果kafka服務(wù)器只有4個(gè)節(jié)點(diǎn), 那么設(shè)置kafka的分區(qū)數(shù) > 服務(wù)器臺(tái)數(shù), 在kafka底層是如何分配存儲(chǔ)副本的呢?
1.創(chuàng)建16個(gè)分區(qū), 3個(gè)副本
創(chuàng)建一個(gè)新的 topic,名稱為 second。
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --create --partitions 16 --replication-factor 3 --topic second
2.查看分區(qū)和副本情況
Topic: second4 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
Topic: second4 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
Topic: second4 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
Topic: second4 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: second4 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: second4 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
Topic: second4 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
4.3.5 手動(dòng)調(diào)整分區(qū)副本存儲(chǔ)
在生產(chǎn)環(huán)境中,每臺(tái)服務(wù)器的配置和性能不一致,但是Kafka只會(huì)根據(jù)自己的代碼規(guī)則創(chuàng)建對(duì)應(yīng)的分區(qū)副本,就會(huì)導(dǎo)致個(gè)別服務(wù)器存儲(chǔ)壓力較大。所有需要手動(dòng)調(diào)整分區(qū)副本的存儲(chǔ)。
需求:創(chuàng)建一個(gè)新的topic,4個(gè)分區(qū),兩個(gè)副本,名稱為three。將該topic的所有副本都存儲(chǔ)到broker0和broker1兩臺(tái)服務(wù)器上。
1.創(chuàng)建一個(gè)新的 topic,名稱為 three。
bin/kafka-topics.sh --bootstrap-server 192.168.202.128:9092 --create --partitions 4 --replication-factor 2 --topic three
2.創(chuàng)建副本存儲(chǔ)計(jì)劃
所有副本都指定存儲(chǔ)在 broker0、broker1 中
vim increase-replication-factor.json
輸入如下內(nèi)容:
{
"version":1,
"partitions":[
{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}
]
}
3.執(zhí)行副本存儲(chǔ)計(jì)劃文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-676707.html
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.202.128:9092 --reassignment-json-file increase-replication-factor.json --execute
4.3.6 生產(chǎn)經(jīng)驗(yàn)—Leader Partition 負(fù)載平衡(todo)
正常情況下,Kafka本身會(huì)自動(dòng)把Leader Partition均勻分散在各個(gè)機(jī)器上,來(lái)保證每臺(tái)機(jī)器的讀寫(xiě)吞吐量都是均勻的。但是如果某些broker宕機(jī),會(huì)導(dǎo)致Leader Partition過(guò)于集中在其他少部分幾臺(tái)broker上,這會(huì)導(dǎo)致少數(shù)幾臺(tái)broker的讀寫(xiě)請(qǐng)求壓力過(guò)高,其他宕機(jī)的broker重啟之后都是follower partition,讀寫(xiě)請(qǐng)求很低,造成集群負(fù)載不均衡。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-676707.html
4.3.7 生產(chǎn)經(jīng)驗(yàn)—增加副本因子(todo)
4.4 文件存儲(chǔ)
4.4.1 文件存儲(chǔ)機(jī)制(todo)
4.4.2 文件清洗策略(todo)
4.5 高效讀寫(xiě)數(shù)據(jù)
到了這里,關(guān)于Kafka(生產(chǎn)者)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!