1. 下載安裝
https://www.cnblogs.com/zhangzhonghui/articles/12444070.html
kafka配置詳解
若kafka運(yùn)行在內(nèi)網(wǎng)服務(wù)器允許外網(wǎng)訪問,例如內(nèi)網(wǎng)ip: 172.10.22.134,外網(wǎng)ip: 9.70.168.130
進(jìn)行如下配置:
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://9.70.168.130:9092
外網(wǎng)訪問時使用9.70.168.130:9092
訪問即可
2. 命令行命令
cd kafka安裝目錄
cd ~/kafka
- 后臺啟動
# zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# kafka
bin/kafka-server-start.sh -daemon config/server.properties
- 停止
bin/kafka-server-stop.sh
- topic
#創(chuàng)建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic your_topic
# 查看topic
bin/kafka-topics.sh --zookeeper localhost:2181 --list
# 查看特定topic
bin/kafka-topics.sh --zookeeper localhost:2181 --topic your_topic --describe
- producer
# 發(fā)送數(shù)據(jù)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic your_topic
- 查看 consumer-groups
# 新版
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
# 舊版
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
# 新版特定consumer-groups
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9292 --group YOUR_GROUP_ID --describe
# 舊版特定consumer-groups
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --group YOUR_GROUP_ID --describe
- 開啟consumer消費(fèi)某個topic(從頭消費(fèi))
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC --from-beginning
3. 概述
3.1 定義
Kafka是一個分布式的基于發(fā)布/訂閱模式的消息隊列,主要應(yīng)用于大數(shù)據(jù)實(shí)時處理領(lǐng)域。Kafka對消息保存時根據(jù)Topic進(jìn)行歸類,發(fā)送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實(shí)例組成,每個實(shí)例(server)成為broker。
3.2 基本架構(gòu)
1)Producer :消息生產(chǎn)者,就是向kafka broker發(fā)消息的客戶端。
2)Consumer :消息消費(fèi)者,向kafka broker取消息的客戶端
3)Topic :可以理解為一個隊列。
4) Consumer Group (CG):消費(fèi)者組,由多個consumer組成。消費(fèi)者組內(nèi)每個消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個分區(qū)只能由一個消費(fèi)者消費(fèi);消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個消費(fèi)者組,即消費(fèi)者組是邏輯上的一個訂閱者。
5)Broker :一臺kafka服務(wù)器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。
6)Partition:為了實(shí)現(xiàn)擴(kuò)展性,一個非常大的topic可以分布到多個broker(即服務(wù)器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序?qū)⑾l(fā)給consumer,不保證一個topic的整體(多個partition間)的順序。
7)Replica:副本,為保證集群中的某個節(jié)點(diǎn)發(fā)生故障時,該節(jié)點(diǎn)上的partition數(shù)據(jù)不丟失,且kafka仍然能夠繼續(xù)工作,kafka提供了副本機(jī)制,一個topic的每個分區(qū)都有若干個副本,一個leader和若干個follower。
8)leader:每個分區(qū)多個副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對象都是leader。
9)follower:每個分區(qū)多個副本中的“從”,實(shí)時從leader中同步數(shù)據(jù),保持和leader數(shù)據(jù)的同步。leader發(fā)生故障時,某個follower會成為新的follower。
10)Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。當(dāng)然the first offset就是00000000000.kafka
4. 架構(gòu)深入
4.1 生產(chǎn)者
4.1.1 分區(qū)
1)分區(qū)的原因
- 方便在集群中擴(kuò)展,每個Partition可以通過調(diào)整以適應(yīng)它所在的機(jī)器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應(yīng)任意大小的數(shù)據(jù)了;
- 可以提高并發(fā),因為可以以Partition為單位讀寫了。
2)分區(qū)的原則
我們需要將producer發(fā)送的數(shù)據(jù)封裝成一個ProducerRecord對象。
- 指明 partition 的情況下,直接將指明的值直接作為 partiton 值;
- 沒有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition 數(shù)進(jìn)行取余得到 partition 值;
- 既沒有 partition 值又沒有 key 值的情況下,第一次調(diào)用時隨機(jī)生成一個整數(shù)(后面每次調(diào)用在這個整數(shù)上自增),將這個值與 topic 可用的 partition 總數(shù)取余得到 partition 值,也就是常說的 round-robin 算法。
4.1.2 數(shù)據(jù)可靠性保證
為保證producer發(fā)送的數(shù)據(jù),能可靠的發(fā)送到指定的topic,topic的每個partition收到producer發(fā)送的數(shù)據(jù)后,都需要向producer發(fā)送ack(acknowledgement確認(rèn)收到),如果producer收到ack,就會進(jìn)行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)。
1)副本數(shù)據(jù)同步策略
方案 | 優(yōu)點(diǎn) | 缺點(diǎn) |
---|---|---|
半數(shù)以上完成同步,就發(fā)送ack | 延遲低 | 選舉新的leader時,為了容忍n臺節(jié)點(diǎn)的故障,需要2n+1個副本 |
全部完成同步,才發(fā)送ack | 選舉新的leader時,容忍n臺節(jié)點(diǎn)的故障,只需要n+1個副本 | 延遲高 |
Kafka選擇了第二種方案,原因如下:
- 同樣為了容忍n臺節(jié)點(diǎn)的故障,第一種方案需要2n+1個副本,而第二種方案只需要n+1個副本,而Kafka的每個分區(qū)都有大量的數(shù)據(jù),第一種方案會造成大量數(shù)據(jù)的冗余。
- 雖然第二種方案的網(wǎng)絡(luò)延遲會比較高,但網(wǎng)絡(luò)延遲對Kafka的影響較小。
2)ISR
采用第二種方案之后,設(shè)想以下情景:leader收到數(shù)據(jù),所有follower都開始同步數(shù)據(jù),但有一個follower,因為某種故障,遲遲不能與leader進(jìn)行同步,那leader就要一直等下去,直到它完成同步,才能發(fā)送ack。這個問題怎么解決呢?
Leader維護(hù)了一個動態(tài)的in-sync replica set (ISR),即leader保持同步的follower集合。當(dāng)ISR中的follower完成數(shù)據(jù)的同步之后,leader就會給follower發(fā)送ack。如果follower長時間未向leader同步數(shù)據(jù),則該follower將被踢出ISR,該時間閾值由replica.lag.time.max.ms
參數(shù)設(shè)定。Leader發(fā)生故障之后,就會從ISR中選舉新的leader。之前舊版本還有保留 replica.lag.max.messages
,但后續(xù)被刪除了, 因為假如某個時刻數(shù)據(jù)量很大,follower還沒反應(yīng)過來就被踢了,但follower本身是健康的,這顯然是不合理的。
3)ack應(yīng)答機(jī)制
對于某些不太重要的數(shù)據(jù),對數(shù)據(jù)的可靠性要求不是很高,能夠容忍數(shù)據(jù)的少量丟失,所以沒必要等ISR中的follower全部接收成功。所以Kafka為用戶提供了三種可靠性級別,用戶根據(jù)對可靠性和延遲的要求進(jìn)行權(quán)衡,選擇以下的配置。
acks:
-
0:producer不等待broker的ack,這一操作提供了一個最低的延遲,broker一接收到還沒有寫入磁盤就已經(jīng)返回,當(dāng)broker故障時有可能丟失數(shù)據(jù)
-
1:producer等待broker的ack,partition的leader落盤成功后返回ack,如果在follower同步成功之前l(fā)eader故障,那么將會丟失數(shù)據(jù)
-
-1(all):producer等待broker的ack,partition的leader和follower全部落盤成功后才返回ack。但是如果在follower同步完成后,broker發(fā)送ack之前,leader發(fā)生故障,那么會造成數(shù)據(jù)重復(fù)
4)故障處理細(xì)節(jié)
(1)follower故障
follower發(fā)生故障后會被臨時踢出ISR,待該follower恢復(fù)后,follower會讀取本地磁盤記錄的上次的HW,并將log文件高于HW的部分截取掉,從HW開始向leader進(jìn)行同步。等該follower的LEO大于等于該P(yáng)artition的HW,即follower追上leader之后,就可以重新加入ISR了。
(2)leader故障
leader發(fā)生故障之后,會從ISR中選出一個新的leader,之后,為保證多個副本之間的數(shù)據(jù)一致性,其余的follower會先將各自的log文件高于HW的部分截掉,然后從新的leader同步數(shù)據(jù)。
注意:這只能保證副本之間的數(shù)據(jù)一致性,并不能保證數(shù)據(jù)不丟失或者不重復(fù)。數(shù)據(jù)的不丟失或不重復(fù)由ack來決定
4.1.3 Exactly Once語義
對于某些比較重要的消息,我們需要保證exactly once語義,即保證每條消息被發(fā)送且僅被發(fā)送一次。
在0.11版本之后,Kafka引入了冪等性機(jī)制(idempotent),配合acks = -1時的at least once語義,實(shí)現(xiàn)了producer到broker的exactly once語義(消費(fèi)者的exactly once后續(xù)再說)。
使用時,只需將enable.idempotence屬性設(shè)置為true,kafka自動將acks屬性設(shè)為-1
在Kafka中冪等性指相同的多條數(shù)據(jù)只會保存一條,那如何確認(rèn)是相同的數(shù)據(jù)?kafka中對于每條數(shù)據(jù)都有個id,id由producerId+SequenceNumber組成,當(dāng)數(shù)據(jù)到達(dá)kafka后,id將會被暫時緩存起來,若此時producer沒有收到ack會重發(fā)數(shù)據(jù),發(fā)現(xiàn)數(shù)據(jù)跟緩存中的數(shù)據(jù)id是一致的則不持久化
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put("acks", "all"); // 當(dāng) enable.idempotence 為 true,這里默認(rèn)為 all
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
producer.send(new ProducerRecord(topic, "test");
4.1.4 發(fā)送消息流程
Kafka的Producer發(fā)送消息采用的是異步發(fā)送的方式。在消息發(fā)送的過程中,涉及到了兩個線程——main線程和Sender線程,以及一個線程共享變量——RecordAccumulator。main線程將消息發(fā)送給RecordAccumulator,Sender線程不斷從RecordAccumulator中拉取消息發(fā)送到Kafka broker。
相關(guān)參數(shù):
- batch.size:只有數(shù)據(jù)積累到batch.size之后,sender才會發(fā)送數(shù)據(jù)。
- linger.ms:如果數(shù)據(jù)遲遲未達(dá)到batch.size,sender等待linger.time之后就會發(fā)送數(shù)據(jù)。
- buffer.memory:RecordAccumulator緩沖區(qū)大小
package com.atguigu.kafka;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop102:9092");//kafka集群,broker-list
props.put("acks", "all");
props.put("retries", 1);//重試次數(shù)
props.put("batch.size", 16384);//批次大小
props.put("linger.ms", 1);//等待時間
props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區(qū)大小
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
// 同步方式 -- get()
// producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i))).get();
// 異步:不帶回調(diào)函數(shù)
// producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)));
// 異步:帶回調(diào)函數(shù)
producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)), new Callback() {
//回調(diào)函數(shù),該方法會在Producer收到ack時調(diào)用,為異步調(diào)用
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("success->" + metadata.offset());
} else {
exception.printStackTrace();
}
}
});
}
producer.close();
}
}
4.2 broker
4.2.1 日志結(jié)構(gòu)
Kafka中消息是以topic進(jìn)行分類的,生產(chǎn)者生產(chǎn)消息,消費(fèi)者消費(fèi)消息,都是面向topic的。
topic是邏輯上的概念,而partition是物理上的概念,每個partition對應(yīng)于一個log文件,該log文件中存儲的就是producer生產(chǎn)的數(shù)據(jù)。Producer生產(chǎn)的數(shù)據(jù)會被不斷追加到該log文件末端,且每條數(shù)據(jù)都有自己的offset。消費(fèi)者組中的每個消費(fèi)者,都會實(shí)時記錄自己消費(fèi)到了哪個offset,以便出錯恢復(fù)時,從上次的位置繼續(xù)消費(fèi)。
由于生產(chǎn)者生產(chǎn)的消息會不斷追加到log文件末尾,為防止log文件過大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka采取了分片和索引機(jī)制,將每個partition分為多個segment。每個segment對應(yīng)兩個文件——“.index”文件和“.log”文件。這些文件位于一個文件夾下,該文件夾的命名規(guī)則為:topic名稱+分區(qū)序號。例如,first這個topic有三個分區(qū),則其對應(yīng)的文件夾為first-0,first-1,first-2,每個文件夾下有如下文件:
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
“.index”文件存儲大量的索引信息,“.log”文件存儲大量的數(shù)據(jù),索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址,index和log文件以當(dāng)前segment的第一條消息的offset命名
4.2.2 存儲策略
無論消息是否被消費(fèi),kafka都會保留所有消息,當(dāng)然可對消息進(jìn)行壓縮或者刪除。
有兩種策略可以刪除舊數(shù)據(jù):
1)基于時間:log.retention.hours=168
2)基于大?。簂og.retention.bytes=1073741824
需要注意的是,因為Kafka讀取特定消息的時間復(fù)雜度為O(1),即與文件大小無關(guān),所以這里刪除過期文件與提高 Kafka 性能無關(guān)。
4.2.3 Controller & ZooKeeper
Kafka集群中有一個broker會被選舉為Controller(先到先得),負(fù)責(zé)管理集群broker的上下線,所有topic的分區(qū)副本分配和leader選舉等工作,Controller的管理工作都是依賴于Zookeeper的。
以下為partition的leader選舉過程:
(1) 到zookeeper中的brokers/ids/
注冊節(jié)點(diǎn)信息 [0,1,2]
(2) 從ISR中獲取選取leader 0,到/brokers/topics/first/partitions/0/state
更新 topic的partition信息
(3) leader 0發(fā)生故障,通知 /brokers/ids/
節(jié)點(diǎn),更新為 [1,2]
(4) Controller監(jiān)聽到節(jié)點(diǎn)發(fā)生變化,則重新獲取ISD,選舉新leader,再更新leader及ISR
4.2.4 高效讀寫數(shù)據(jù)
1)順序?qū)懘疟P
Kafka的producer生產(chǎn)數(shù)據(jù),要寫入到log文件中,寫的過程是一直追加到文件末端,為順序?qū)?。官網(wǎng)有數(shù)據(jù)表明,同樣的磁盤,順序?qū)懩艿降?00M/s,而隨機(jī)寫只有100k/s。這與磁盤的機(jī)械機(jī)構(gòu)有關(guān),順序?qū)懼钥?,是因為其省去了大量磁頭尋址的時間。
2) Page Cache
為了優(yōu)化讀寫性能,Kafka 利用了操作系統(tǒng)本身的 Page Cache。數(shù)據(jù)通過mmap內(nèi)存映射的方式直接寫入page cache,定時刷新臟頁到磁盤。消費(fèi)者拉取消息時,如果數(shù)據(jù)在page cache中,甚至能不需要去讀磁盤io。讀操作可直接在 Page Cache 內(nèi)進(jìn)行。如果消費(fèi)和生產(chǎn)速度相當(dāng),甚至不需要通過物理磁盤(直接通過 Page Cache)交換數(shù)據(jù)。
3)零拷貝技術(shù)
傳統(tǒng)路徑:File -> Page Cache -> Application Cache -> Socket Cache -> NIC
Kafka零拷貝過程:File -> Page Cache -> NIC
為啥可以進(jìn)行零拷貝?因為kafka寫進(jìn)去.log文件的數(shù)據(jù)就是經(jīng)過序列化的,讀取出來的數(shù)據(jù)就不要經(jīng)過用戶空間的處理了,減少了不必要的拷貝次數(shù)和用戶態(tài)和內(nèi)核態(tài)的切換,大大減少讀取的時延
參考:kafka的零拷貝
4) 分區(qū)分段+稀疏索引
Kafka 的 message 是按 topic分 類存儲的,topic 中的數(shù)據(jù)又是按照一個一個的 partition 即分區(qū)存儲到不同 broker 節(jié)點(diǎn)。每個 partition 對應(yīng)了操作系統(tǒng)上的一個文件夾,partition 實(shí)際上又是按照segment分段存儲的。通過這種分區(qū)分段的設(shè)計,Kafka 的 message 消息實(shí)際上是分布式存儲在一個一個小的 segment 中的,每次文件操作也是直接操作的 segment。為了進(jìn)一步的查詢優(yōu)化,Kafka 又默認(rèn)為分段后的數(shù)據(jù)文件建立了索引文件,就是文件系統(tǒng)上的.index文件。這種分區(qū)分段+索引的設(shè)計,不僅提升了數(shù)據(jù)讀取的效率,同時也提高了數(shù)據(jù)操作的并行度。
5) 批量讀寫
生產(chǎn)者可以借助累加器,批量發(fā)送消息,消費(fèi)者也可以批量拉取消費(fèi)
4.3 消費(fèi)者
4.3.1 消費(fèi)方式
push(推)模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因為消息發(fā)送速率是由broker決定的。它的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而pull模式則可以根據(jù)consumer的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。
所以consumer采用pull(拉)模式從broker中讀取數(shù)據(jù)。
pull模式不足之處是,如果kafka沒有數(shù)據(jù),消費(fèi)者可能會陷入循環(huán)中,一直返回空數(shù)據(jù)。針對這一點(diǎn),Kafka的消費(fèi)者在消費(fèi)數(shù)據(jù)時會傳入一個時長參數(shù)timeout,如果當(dāng)前沒有數(shù)據(jù)可供消費(fèi),consumer會等待一段時間之后再返回,這段時長即為timeout。
4.3.2 分區(qū)分配策略
一個consumer group中有多個consumer,一個 topic有多個partition,所以必然會涉及到partition的分配問題,即確定那個partition由哪個consumer來消費(fèi)。
Kafka有三種消費(fèi)者分區(qū)分配策略:
1.RoundRobin
2.Range
3.Sticky
kafka在0.11版本引入了Sticky分區(qū)分配策略,它的兩個主要目的是:
- 分區(qū)的分配要盡可能的均勻,分配給消費(fèi)者者的主題分區(qū)數(shù)最多相差一個;
- 分區(qū)的分配盡可能的與上次分配的保持相同。
當(dāng)兩者發(fā)生沖突時,第一個目標(biāo)優(yōu)先于第二個目標(biāo)。
參考:kafka的消費(fèi)者分區(qū)分配策略
4.3.3 offset的維護(hù)
由于consumer在消費(fèi)過程中可能會出現(xiàn)斷電宕機(jī)等故障,consumer恢復(fù)后,需要從故障前的位置的繼續(xù)消費(fèi),所以consumer需要實(shí)時記錄自己消費(fèi)到了哪個offset,以便故障恢復(fù)后繼續(xù)消費(fèi)。
Kafka 0.9版本之前,consumer默認(rèn)將offset保存在Zookeeper中,從0.9版本開始,consumer默認(rèn)將offset保存在Kafka一個內(nèi)置的topic中,該topic為__consumer_offsets。
4.3.4 Eactly Once語義
需要保證消費(fèi)數(shù)據(jù)和提交offset是原子性的
4.4 攔截器
Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用于實(shí)現(xiàn)clients端的定制化控制邏輯。
對于producer而言,interceptor使得用戶在消息發(fā)送前以及producer回調(diào)邏輯前有機(jī)會對消息做一些定制化需求,比如修改消息等。同時,producer允許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。Intercetpor的實(shí)現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor
,其定義的方法包括:
(1)configure(configs)
獲取配置信息和初始化數(shù)據(jù)時調(diào)用。
(2)onSend(ProducerRecord):
該方法封裝進(jìn)KafkaProducer.send方法中,即它運(yùn)行在用戶主線程中。Producer確保在消息被序列化以及計算分區(qū)前調(diào)用該方法。用戶可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的topic和分區(qū),否則會影響目標(biāo)分區(qū)的計算。
(3)onAcknowledgement(RecordMetadata, Exception):
該方法會在消息從RecordAccumulator成功發(fā)送到Kafka Broker之后,或者在發(fā)送過程中失敗時調(diào)用。并且通常都是在producer回調(diào)邏輯觸發(fā)之前。onAcknowledgement運(yùn)行在producer的IO線程中,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的消息發(fā)送效率。
(4)close:
關(guān)閉interceptor,主要用于執(zhí)行一些資源清理工作
如前所述,interceptor可能被運(yùn)行在多個線程中,因此在具體實(shí)現(xiàn)時用戶需要自行確保線程安全。另外倘若指定了多個interceptor,則producer將按照指定順序調(diào)用它們,并僅僅是捕獲每個interceptor可能拋出的異常記錄到錯誤日志中而非在向上傳遞。這在使用過程中要特別留意。
案例:
在消息發(fā)送前將時間戳信息加到消息value的最前部
package com.atguigu.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class TimeInterceptor implements ProducerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 創(chuàng)建一個新的record,把時間戳寫入消息體的最前部
return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
System.currentTimeMillis() + "," + record.value().toString());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
主程序
package com.atguigu.kafka.interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
public class InterceptorProducer {
public static void main(String[] args) throws Exception {
// 1 設(shè)置配置信息
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop102:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2 構(gòu)建攔截鏈
List<String> interceptors = new ArrayList<>();
interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
String topic = "first";
Producer<String, String> producer = new KafkaProducer<>(props);
// 3 發(fā)送消息
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
producer.send(record);
}
// 4 一定要關(guān)閉producer,這樣才會調(diào)用interceptor的close方法
producer.close();
}
}
5. 面試題
-
Kafka中的ISR、AR又代表什么?
ISR:與leader保持同步的follower集合
AR:分區(qū)的所有副本 -
Kafka中的HW、LEO等分別代表什么?
LEO:沒個副本的最后條消息的offset
HW:一個分區(qū)中所有副本最小的offset -
Kafka中是怎么體現(xiàn)消息順序性的?
每個分區(qū)內(nèi),每條消息都有一個offset,故只能保證分區(qū)內(nèi)有序。 -
Kafka中的分區(qū)器、序列化器、攔截器是否了解?它們之間的處理順序是什么?
攔截器 -> 序列化器 -> 分區(qū)器
但攔截器中主要有兩個方法,onSend() 和 onAcknowledgement(),onSend()是在消息被序列化以及計算分區(qū)前調(diào)用該方法,onAcknowledgement()成功發(fā)送到Kafka Broker之后,或者在發(fā)送過程中失敗時調(diào)用 -
Kafka生產(chǎn)者客戶端的整體結(jié)構(gòu)是什么樣子的?使用了幾個線程來處理?分別是什么?
兩個線程:主線程和sender線程 -
“消費(fèi)組中的消費(fèi)者個數(shù)如果超過topic的分區(qū),那么就會有消費(fèi)者消費(fèi)不到數(shù)據(jù)”這句話是否正確?
正確 -
消費(fèi)者提交消費(fèi)位移時提交的是當(dāng)前消費(fèi)到的最新消息的offset還是offset+1?
offset+1 -
有哪些情形會造成重復(fù)消費(fèi)?
生產(chǎn)者:ack=-1時,leader和follower同步完成后,broker發(fā)送ack之前,leader發(fā)生故障,那么會造成數(shù)據(jù)重復(fù)。
消費(fèi)者:先消費(fèi)后提交offset -
那些情景會造成消息漏消費(fèi)?
生產(chǎn)者:ack=0,broker還沒完成寫入 或 ack=-1,leader落盤返回ack,但follower還沒完成同步leader就掛了
消費(fèi)者:先提交offset,后消費(fèi),有可能造成數(shù)據(jù)的重復(fù) -
當(dāng)你使用kafka-topics.sh創(chuàng)建(刪除)了一個topic之后,Kafka背后會執(zhí)行什么邏輯?
1)會在zookeeper中的/brokers/topics節(jié)點(diǎn)下創(chuàng)建一個新的topic節(jié)點(diǎn),如:/brokers/topics/first
2)觸發(fā)Controller的監(jiān)聽程序
3)kafka Controller 負(fù)責(zé)topic的創(chuàng)建工作,并更新metadata cache -
topic的分區(qū)數(shù)可不可以增加?如果可以怎么增加?如果不可以,那又是為什么?
可以增加
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 3 -
topic的分區(qū)數(shù)可不可以減少?如果可以怎么減少?如果不可以,那又是為什么?
不可以減少,被刪除的分區(qū)數(shù)據(jù)難以處理(該分區(qū)的數(shù)據(jù)該如何被其他分區(qū)消費(fèi)?如何分配?從哪里開始消費(fèi)?等一系列難以處理的問題,所以kafka不支持)。 -
Kafka有內(nèi)部的topic嗎?如果有是什么?有什么所用?
__consumer_offsets,保存消費(fèi)者offset -
Kafka分區(qū)分配的概念?
一個topic多個分區(qū),一個消費(fèi)者組多個消費(fèi)者,故需要將分區(qū)分配個消費(fèi)者(roundrobin、range、Sticky) -
簡述Kafka的日志目錄結(jié)構(gòu)?
每個分區(qū)對應(yīng)一個文件夾,文件夾的命名為topic-0,topic-1,內(nèi)部為.log和.index文件 -
聊一聊Kafka Controller的作用?
負(fù)責(zé)管理集群broker的上下線,所有topic的分區(qū)副本分配和leader選舉等工作。 -
Kafka中有那些地方需要選舉?這些地方的選舉策略又有哪些?
partition leader(ISR),controller(先到先得) -
失效副本是指什么?有那些應(yīng)對措施?
不能及時與leader同步,暫時踢出ISR,恢復(fù)后讀取磁盤中持久化的HW,從HW開始消費(fèi),等其追上leader之后再重新加入 -
Kafka的那些設(shè)計讓它有如此高的性能?
分區(qū),順序?qū)懘疟P,0-copy -
Kafka如何保證Exactly Once語義
生產(chǎn)者:ack=-1并且開啟冪等性,enable.idempotence屬性設(shè)置為true(該選項會自動將ack設(shè)置為-1)
消費(fèi)者:保證消費(fèi)和提交offset是原子性的 -
如何提升吞吐量?
-
提升生產(chǎn)吞吐量
(1)buffer.memory:發(fā)送消息的緩沖區(qū)大小,默認(rèn)值是 32m,可以增加到 64m。
(2)batch.size:默認(rèn)是 16k。如果 batch 設(shè)置太小,會導(dǎo)致頻繁網(wǎng)絡(luò)請求,吞吐量下降;
如果 batch 太大,會導(dǎo)致一條消息需要等待很久才能被發(fā)送出去,增加網(wǎng)絡(luò)延時。
(3)linger.ms,這個值默認(rèn)是 0,意思就是消息必須立即被發(fā)送。一般設(shè)置一個 5-100
毫秒。如果 linger.ms 設(shè)置的太小,會導(dǎo)致頻繁網(wǎng)絡(luò)請求,吞吐量下降;如果 linger.ms 太長,
會導(dǎo)致一條消息需要等待很久才能被發(fā)送出去,增加網(wǎng)絡(luò)延時。
(4)compression.type:默認(rèn)是 none,不壓縮,但是也可以使用 lz4 壓縮,效率還是不
錯的,壓縮之后可以減小數(shù)據(jù)量,提升吞吐量,但是會加大 producer 端的 CPU 開銷。 -
增加分區(qū)
-
消費(fèi)者提高吞吐量
(1)增大 fetch.max.bytes 大?。J(rèn)是 50m) – 批量消費(fèi)每次最多消費(fèi)多少條消息
(2)增大 max.partition.fetch.bytes(默認(rèn)是 1m) – 一次fetch請求中,從一個partition中取得的records最大大小
(3)增大 max.poll.records 大小(默認(rèn)是 500 條) – 一次fetch請求中從一個broker中取得records的最大大小文章來源:http://www.zghlxwxcb.cn/news/detail-500415.html -
增加下游消費(fèi)者處理能力文章來源地址http://www.zghlxwxcb.cn/news/detail-500415.html
到了這里,關(guān)于kafka原理&架構(gòu)深入的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!