目錄
一、Kafka概述
1、簡介
2、消息隊列
(1)消息隊列應(yīng)用場景
(2)消息隊列的兩種模式
?3、Kafka的基礎(chǔ)架構(gòu)
二、Kafka的安裝與常見命令
1、Kafka的安裝
2、Kafka的命令行操作
(1)kafka-topics.sh
(2)kafka-console-producer.sh和kafka-console-consumer.sh
?三、Kafka的生產(chǎn)者
1、發(fā)送原理
2、異步發(fā)送
3、同步發(fā)送
4、生產(chǎn)者分區(qū)(Partitioner分區(qū)器)
?(1)默認(rèn)分區(qū)器DefaultPartitioner
?(2)自定義分區(qū)器
?5、生產(chǎn)經(jīng)驗
(1)常見參數(shù)的經(jīng)驗配置
?(2)如何保證數(shù)據(jù)傳輸?shù)耐耆煽?/p>
(3)數(shù)據(jù)去重
?(4)數(shù)據(jù)有序
(5)數(shù)據(jù)亂序問題
Apache Kafka? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?---kafka官方文檔
一、Kafka概述
1、簡介
引例:某東購物網(wǎng)站,它會將所有用戶什么時間,使用什么設(shè)備訪問了什么內(nèi)容都進(jìn)行記錄并保存在后臺的日志文件log中。程序員可以使用Flume工具實時的從日志文件中采集信息,然后將這些信息存儲在Hadoop集群中進(jìn)行后續(xù)的分析統(tǒng)計工作。在平時網(wǎng)站的訪問量不大的情況下,Hadoop的上傳速度足夠支撐Flume的采集(一般Hadoop的上傳速度不大)。但是當(dāng)出現(xiàn)雙11等活動時,網(wǎng)站的訪問量急劇增加,F(xiàn)lume的采集速度也會隨之增加,此時Hadoop的上傳速度不足以支撐上傳海量的由Flume采集過來的數(shù)據(jù)。此時Kafka就誕生了,Kafka可以幫助我們先進(jìn)行數(shù)據(jù)緩存,Kafka有極強(qiáng)的數(shù)據(jù)處理能力,可以先將Flume采集的數(shù)據(jù)存放到Kafka集群中,然后讓Hadoop慢慢的從Kafka集群中上傳數(shù)據(jù)。(Flume采集過來的數(shù)據(jù)如果不及時取走很快就沒了)
?Kafka傳統(tǒng)定義:Kafka是一個分布式的基于發(fā)布/訂閱模式的消息隊列(Message Queue),主要應(yīng)用于大數(shù)據(jù)實時處理領(lǐng)域。發(fā)布/訂閱是指:消息的發(fā)布者Flume和Log不會將消息直接發(fā)送給特定的訂閱者Hadoop(這只有發(fā)布過程),而是先將消息發(fā)布給Kafka集群,Kafka將消息分成不同的類別(進(jìn)行分類整理),然后訂閱者Hadoop只接收感興趣的消息即可。有時發(fā)布者也稱生產(chǎn)者,訂閱者也稱消費(fèi)者。Kafka的本質(zhì)就是解決發(fā)布者與訂閱者處理數(shù)據(jù)不一致的問題。
?Kafka最新定義:Kafka是一個開源的分布式事件流平臺(Event Streaming Platform),被數(shù)千家公司用于高性能數(shù)據(jù)管道,流分析,數(shù)據(jù)集成和關(guān)鍵任務(wù)應(yīng)用。
2、消息隊列
(1)消息隊列應(yīng)用場景
傳統(tǒng)消息隊列的應(yīng)用場景:緩存/消峰、解耦和異步通信。緩存/消峰就是上面的例子。
解耦:允許你獨(dú)立的擴(kuò)展或修改數(shù)據(jù)源和目的地的處理過程,只要保證它們遵循同樣的接口約束,就和java中多態(tài)的作用一樣。實際開發(fā)環(huán)境中數(shù)據(jù)的數(shù)據(jù)源分布很廣mysql,sqlserver,Oracle,F(xiàn)lume...,如果沒有MQ,那么我們的程序想要獲取不同來源的數(shù)據(jù)就需要與不同數(shù)據(jù)源建立連接,通信,然后獲取數(shù)據(jù),關(guān)閉連接,這很麻煩。有了MQ可以將各個不同來源的數(shù)據(jù)全部存儲到MQ中,我們的程序直接從MQ里獲取即可,相當(dāng)于完成了解耦,可以隨意切換數(shù)據(jù)源。
?異步通信:允許用戶把一個消息放入隊列,但并不立刻處理它,而是在需要的時候在去處理它。
?上述例子告訴我們,登錄網(wǎng)站注冊信息,在寫入到數(shù)據(jù)庫以后,核心的任務(wù)已經(jīng)完成了,至于發(fā)不發(fā)短信不重要,同步方式的原則是必須把所有的事情一步一步做完,不能少了某個環(huán)節(jié)。異步方式的原則是把核心的任務(wù)做然后立刻響應(yīng)用戶,讓用戶放心,后續(xù)無關(guān)緊要的事情放到消息隊列中,讓其它人員慢慢完成即可。相當(dāng)于步驟124就是主線程,3以及發(fā)短信讓子線程慢慢去做。
(2)消息隊列的兩種模式
點對點模式:消費(fèi)者主動拉取數(shù)據(jù),消息收到后刪除數(shù)據(jù)(單主題)
?發(fā)布/訂閱模式:可以有多個topic主題(瀏覽,點贊,收藏,評論等),消費(fèi)者消費(fèi)數(shù)據(jù)之后,不刪除數(shù)據(jù)。每個消費(fèi)者相互獨(dú)立,都可以消費(fèi)到數(shù)據(jù)。
?3、Kafka的基礎(chǔ)架構(gòu)
?詳細(xì)解釋:Producer:消息的生產(chǎn)者,就是向Kafka cluster發(fā)消息的客戶端比如Flume,Producer可以對接各種數(shù)據(jù)源。
Consumer:就是消費(fèi)者,向Kafka cluster取消息的客戶端,比如Hadoop(分布式的)。
Consumer Group(CG):消息者組,由多個consumer組成,Kafka會將來源數(shù)據(jù)分區(qū)存儲在其集群中,因為數(shù)據(jù)量太大一臺服務(wù)器無法處理,顯然消費(fèi)者想要處理這么龐大的數(shù)據(jù)也不是單單一臺服務(wù)器可以處理的,比如Hadoop集群也是分布式的,spark,flink都是分布式的。
注意:A、消費(fèi)者組內(nèi)每個消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個分區(qū)只能由一個組內(nèi)消費(fèi)者消費(fèi),組內(nèi)消費(fèi)者并行消費(fèi);B、消費(fèi)者組間互不影響;C、所有的消費(fèi)者都屬于某個消費(fèi)者組,即消費(fèi)者組是邏輯上的一個訂閱者。
Broker:表示一臺Kafka服務(wù)器,一個Kafka cluster由多個broker組成,一個broker可以容納多個topic。
Topic:就是一個隊列用于存儲具體的數(shù)據(jù),不同的topic表示不同類型,不同的類型表示不同的數(shù)據(jù)源,mysql、oracle、sqlserver等等。
Partition:一個很大的數(shù)據(jù)源一臺服務(wù)器存儲不下,可以存儲在多臺服務(wù)器上,一個topic可以分為多個partition,每個partition是一個有序的隊列。
Replica:副本,類似于Hadoop的副本機(jī)制,但還不完全相同,Hadoop的副本之間沒有主備關(guān)系,使用的時候隨便用即可。而在Kafka中一個topic的每個分區(qū)都有若干副本,其中有一個是leader和若干follower。
Leader:每個分區(qū)多個副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對象都是leader。
Follower:每個分區(qū)多個副本的“從”,實時從leader中同步數(shù)據(jù),保存與leader數(shù)據(jù)一致。leader發(fā)生故障時,利用zookeeper的leader選舉機(jī)制,某個follower會成為新的leader。zk的工作是記錄Kafka集群有那些服務(wù)器在線,記錄各個分區(qū)誰是leader,以及完成leader選舉機(jī)制。
注意:在Kafka2.8.0以前Kafka的啟動必須依賴zk,在2.8.0以后的版本中可以不依賴zk,它自己就能完成原來zk的工作,這就是Kafka的kraft模式。
二、Kafka的安裝與常見命令
1、Kafka的安裝
集群規(guī)劃:192.168.11.141(node11) :?zk:kafka
? ? ? ? ? ? ? ? ? 192.168.11.142(node22) : zk:kafka
? ? ? ? ? ? ? ? ? 192.168.11.143(node33) : zk:kafka
注意kafka與zookeeper一樣,不需要將每臺服務(wù)器都配置,比如當(dāng)前有100臺服務(wù)器,只需要在11臺服務(wù)器上配置zk和kafka即可。(zk服務(wù)器的臺數(shù)與kafka服務(wù)器的臺數(shù)不用必須相同,可以隨意靈活的配置)
下載Kafka():Apache Kafka
?在node11上執(zhí)行:
解壓:tar -zxvf kafka_2.12-3.0.0.tgz -C /export/server
ln -s /export/server/kafka_2.12-3.0.0 /export/server/kafka
cd /export/server/kafka
ll
?cd config
vim server.properties
? ? ? ? broker.id=0
? ? ? ? log.dirs=/export/server/kafka/datas? ? ? ? :? 設(shè)置log日志存儲位置
? ? ? ? # 設(shè)置連接的zk集群是什么,/kafka表示設(shè)置一個命名空間,后續(xù)所有的需要記錄在zk中的數(shù)據(jù)全部放在了/kafka/*
? ? ? ? zookeeper.connect=node11:2181,node22:2181,node33:2181/kafka
scp -r kafka_2.12-3.0.0 node22:`pwd`/
scp -r kafka_2.12-3.0.0 node33:`pwd`/
在node22、33上執(zhí)行
ln -s /export/server/kafka_2.12-3.0.0 /export/server/kafka
vim /export/server/kafka/config/server.properties
? ? ? ? 修改broker.id=1/2
在node11,22,33上配置path環(huán)境變量:
vim /etc/profile
? ? ? ? export KAFKA_HOME=/export/server/kafka
? ? ? ? export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
chown -R hadoop:hadoop /export
su hadoop
zkServer.sh start? ? ? ? ? ? ? ? ? ?: 確保zookeeper集群已經(jīng)啟動
啟動kafka每臺服務(wù)器都需要執(zhí)行:kafka-server-start.sh -daemon /export/server/kafka/config/server.properties(kafka默認(rèn)對外提供的客戶端端口為9092)
?停止kafka每臺服務(wù)器都需要執(zhí)行(kafka的停止需要等一會兒):kafka-server.stop.sh
注意:服務(wù)器不用時必須先關(guān)kafka,然后等kafka的jps進(jìn)程沒了以后在關(guān)zk。如果先關(guān)了zk會導(dǎo)致kafka無法關(guān)閉,只能通過kill殺死。 (zk會存儲kafka的數(shù)據(jù),kafka在關(guān)閉前會向zk詢問一些事情,如果這時zk關(guān)了,將導(dǎo)致kafka無法關(guān)閉)
至此kafka就配置好并成功啟動了。但是上述配置是我們手動一個一個服務(wù)器的配置,如果服務(wù)器多了,那就太麻煩了,因此我們可以自行編寫shell腳本完成:scp分發(fā),jps命令查看,zk啟動,kafka啟動。最好把這些腳本放到同一個文件夾下面,并將該文件夾放入到PATH中。
創(chuàng)建一個/export/server/bin目錄用于存儲所有的shell腳本,然后配置PATH環(huán)境變量:
?vim /etc/profile? ? ? ? ? ? ? ? ?填入export PATH=$PATH:/export/server/bin
source?/etc/profile
chmod 755 shell腳本? ? ? ? 編寫完shell腳本后需要設(shè)置權(quán)限,使所有用戶都可以執(zhí)行。
scp分發(fā)shell:scpall 分發(fā)文件/目錄(可以使用該命令將所有腳本也分發(fā)到其他服務(wù)器上,使所有服務(wù)器都可以使用)
#!/bin/bash
#1. 判斷參數(shù)個數(shù)
# $#統(tǒng)計傳入?yún)?shù)個數(shù),-lt表示小于
if [ $# -lt 1 ]
then
echo Not Enough Arguement!
exit;
fi
# 至少輸入了一個參數(shù)
#2. 遍歷集群所有機(jī)器
for host in node11 node22 node33
do
echo ==================== $host ====================
#3. 遍歷所有目錄,挨個發(fā)送 $@表示傳入的參數(shù)列表kafka_2.12-3.0.0
for file in $@ # file = xxx目錄,xxx文件,......
do
#4. 判斷文件是否存在
if [ -e $file ]
then
#5. 獲取父目錄 cd -P:表示不走軟連接,有沒有-P都一樣
pdir=$(cd -P $(dirname $file); pwd) # kafka_2.12-3.0.0
#6. 獲取當(dāng)前文件的名稱
fname=$(basename $file) # bin
ssh $host "mkdir -p $pdir"
# rsync -av遠(yuǎn)程同步命令 -v, --verbose 詳細(xì)模式輸出
# -a, --archive 歸檔模式,表示以遞歸方式傳輸文件,并保持所有文件屬性
rsync -av $pdir/$fname $host:$pdir
else
echo $file does not exists!
fi
done
done
zk啟動shell:zk.sh
#!/bin/bash
case $1 in
"start"){
for i in node11 node22 node33
do
echo ---------------zookeeper $i start ---------------
ssh $i "/export/server/zookeeper/bin/zkServer.sh start"
done
}
;;
"stop"){
for i in node11 node22 node33
do
echo ---------------zookeeper $i stop ---------------
ssh $i "/export/server/zookeeper/bin/zkServer.sh stop"
done
}
;;
"status"){
for i in node11 node22 node33
do
echo ---------------zookeeper $i status ---------------
ssh $i "/export/server/zookeeper/bin/zkServer.sh status"
done
}
;;
"restart"){
for i in node11 node22 node33
do
echo ---------------zookeeper $i restart ---------------
ssh $i "/export/server/zookeeper/bin/zkServer.sh restart"
done
}
;;
esac
jps查看shell:jpsall
#!/bin/bash
for i in node11 node22 node33
do
echo ---------------$i jps information ---------------
ssh $i "/export/server/jdk/bin/jps"
done
建議對于執(zhí)行的程序?qū)懡^對路徑,這樣可以避免麻煩,如果我們這里寫ssh $i jps,可能會出現(xiàn):bash: jps: command not found的錯誤jps找不到,明明各個服務(wù)器jdk安裝了,PATH也設(shè)置了,直接運(yùn)行也可以運(yùn)行,使用ssh就不行了:
?這里需要注意:環(huán)境變量的配置有兩個地方,一個是/etc/profile文件,一個是~/.bashrc文件。如果直接執(zhí)行jps會自動從/etc/profile中讀取PATH,能正常執(zhí)行。而以ssh方式直接執(zhí)行jps實際讀取的是~/.bashrc這個文件里面的環(huán)境變量,~/.bashrc文件中并沒有進(jìn)行任何配置因此jps找不到。~/.bashrc文件設(shè)置的環(huán)境變量是針對某一個特定的用戶,使用ssh遠(yuǎn)程連接執(zhí)行jps隱含了有一個特定的用戶使用ssh遠(yuǎn)程連接執(zhí)行jps,此時讀取的是~/.bashrc文件中的環(huán)境變量。
?解決辦法就是在~/.bashrc文件中重新配置環(huán)境變量。把/etc/profile里面的環(huán)境變量追加到~/.bashrc文件中:
[root@node11 bin]# cat /etc/profile >> ~/.bashrc
[root@node22 bin]# cat /etc/profile >> ~/.bashrc
[root@node33 bin]# cat /etc/profile >> ~/.bashrc
?注意:?~/.bashrc文件是在~目錄下的,即每個用戶在自己的home目錄下都有這個文件。上述配置只配置了root home目錄下的.bashrc文件,切換到hadoop用戶下依舊無法使用。需要再次配置/home/hadoop/.bashrc文件。
?建議執(zhí)行的程序直接寫絕對路徑?。。。?!
kafka啟動shell:kf.sh
#!/bin/bash
case $1 in
"start"){
for i in node11 node22 node33
do
echo -------------------- $i start kafka --------------------
ssh $i "/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties"
done
}
;;
"stop"){
for i in node11 node22 node33
do
echo -------------------- $i stop kafka --------------------
ssh $i "/export/server/kafka/bin/kafka-server-stop.sh"
done
}
;;
esac
2、Kafka的命令行操作
Kafka有三種命令行操作,分別是針對Producer的kafka-console-producer.sh,針對Consumer的kafka-console-consumer.sh,針對Topic的kafka-topics.sh
(1)kafka-topics.sh
kafka-topics.sh? ? : 直接無法執(zhí)行,它會顯示該腳本的所有參數(shù)(幫助界面)
?kafka-topics.sh用于對Topic進(jìn)行操作,首先需要連接kafka集群(與zk一樣只需要連接一個kafka broker服務(wù)器即可,所做的操作都是共享的,底層就是一個Kafka消息隊列。實際生產(chǎn)環(huán)境中為了防止連接的Kafka broker掛掉通常連接兩個,使用逗號隔開。目前我們學(xué)習(xí)只連一個即可),然后選擇對那個topic進(jìn)行操作,操作一般是增刪改查。
(后續(xù)所有操作hadoop102就是node11,hadoop103是node22,hadoop104是node33等等)
?注意分區(qū)數(shù)只能增加,不能減少。
?注意命令行的方式無法修改副本數(shù)。后續(xù)可以通過其他方式進(jìn)行修改。
(2)kafka-console-producer.sh和kafka-console-consumer.sh
kafka-console-producer.sh
?kafka-console-consumer.sh?
?默認(rèn)情況下消費(fèi)者只能獲取連接以后生產(chǎn)者發(fā)來的數(shù)據(jù),歷史的數(shù)據(jù)無法獲?。ㄔ隽揩@取)。此時需要添加--from-beginning從頭開始消費(fèi)(全量獲?。?/p>
?三、Kafka的生產(chǎn)者
1、發(fā)送原理
?在生產(chǎn)者向kafka集群發(fā)送數(shù)據(jù)時,涉及到兩個線程--main線程和sender線程,main線程主要用于不斷的從生產(chǎn)者哪里獲取消息,并將消息發(fā)送給RecordAccumulator緩存。sender線程負(fù)責(zé)不斷的從RecordAccumulator中拉去滿足條件的消息(達(dá)到batch.size或linger.ms)發(fā)送到kafka集群。batch.size表示只有數(shù)據(jù)累計到batche.size之后sender才會去拉取并發(fā)送,默認(rèn)16K。linger.ms表示如果數(shù)據(jù)遲遲未達(dá)到batch.size,sender等待linger.ms設(shè)置的時間到了以后也會發(fā)送數(shù)據(jù),單位ms,默認(rèn)值為0ms,表示不等待來一個數(shù)據(jù)發(fā)一個數(shù)據(jù),此時batch.size設(shè)多少也沒用。在實際開發(fā)過程中這兩個參數(shù)需要靈活調(diào)整。
當(dāng)Kafka集群收到sender發(fā)來的消息時,它有3中應(yīng)答方式分別是acks = 0,acks = 1, acks = -1/all。0表示不響應(yīng)RecordAccumulator將數(shù)據(jù)發(fā)送以后就可以不管了,直接將數(shù)據(jù)清除,不用管數(shù)據(jù)是否發(fā)送成功。1表示生產(chǎn)者發(fā)來數(shù)據(jù),leader收到數(shù)據(jù)后應(yīng)答。-1表示生產(chǎn)者發(fā)來數(shù)據(jù),leader和follower都收到數(shù)據(jù)后應(yīng)答。
涉及參數(shù):
send(ProducerRecord):ProducerRecord里面記錄了發(fā)送的配置信息和發(fā)送的具體消息,比如發(fā)給kafka集群的那個topic,發(fā)給那個partition等等。
key和value:指定發(fā)送消息的類型。例如上述發(fā)的hello,只有value。key為空“”,都是String類型
key.serializer和value.serializer:指定發(fā)送消息的key和value的序列化類型,一般不使用jvm提供的序列化類型而是使用kafka自己提供的,如果消息的類型是String,那序列化類型就是StringSerializer。一定要寫全類名(見后面代碼)?
buffer.memory:RecordAccumulator緩存隊列總大小,默認(rèn)32M
batch.size:默認(rèn)16K。適當(dāng)增加該值,可以提高吞吐量,但不能設(shè)置太大,會導(dǎo)致數(shù)據(jù)傳輸延遲增加。
linger.ms:默認(rèn)0ms,一般建議設(shè)置為5-100ms之間
acks:默認(rèn)值-1
max.in.flight.requests.per.connection:允許最多沒有返回ack的次數(shù),默認(rèn)為5
enable.idempotence:是否開啟冪等性(保證數(shù)據(jù)傳輸不重復(fù)),默認(rèn)為true,開啟冪等后max.in.flight.requests.per.connection必須設(shè)置為1-5之間的數(shù)字
retries:重試次數(shù),默認(rèn)int最大值為2147483647。如果設(shè)置了重試,還要保證消息的有序性,需要設(shè)置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1(保證最大重試次數(shù)為1,即當(dāng)前消息重試時,其他消息的傳輸先等待),否則在重試當(dāng)前失敗消息時,其他消息可能發(fā)送成功了。
retry.backoff.ms:兩次重試之間的時間間隔,默認(rèn)100ms
compression.type:是否壓縮數(shù)據(jù),默認(rèn)為none,支持gzip、snappy、lz4、zstd。?
2、異步發(fā)送
顯然上述發(fā)送過程是異步發(fā)送的,main線程和sender線程是并行執(zhí)行的。main線程只負(fù)責(zé)不停的向RecordAccumulator緩存隊列中存儲消息,先把核心任務(wù)做了,至于后續(xù)將RecordAccumulator緩存隊列中的消息再發(fā)送到kafka集群中,由sender線程以它自己的速度和邏輯慢慢執(zhí)行即可。
接下來我們使用代碼演示一下異步發(fā)送過程。首先下載kafka-client-3.0.0.jar依賴包:
下載地址:Central Repository: org/apache/kafka/kafka-clients/3.0.0
然后將其整合到項目中。創(chuàng)建包:com.zxy.kafka.producer,創(chuàng)建CustomProducer類:?
普通異步發(fā)送:??
?注意:連接kafka集群只能通過:主機(jī)名:端口的方式,即使將主機(jī)名換成ip地址也不行。請確保自己的計算機(jī)配置了各linux主機(jī)的主機(jī)名映射。(kafka的配置文件必須要求只能通過主機(jī)名:端口方式連接)
如果出現(xiàn)hosts映射文件無法修改的情況(或修改后點擊保持顯示另存為界面),這是因為權(quán)限不足導(dǎo)致了,我們可以修改其權(quán)限:
我們在linux上開啟一個消費(fèi)者,然后運(yùn)行上述代碼,發(fā)現(xiàn)消費(fèi)者確實收到數(shù)據(jù)了。
?帶回調(diào)函數(shù)的異步發(fā)送:
回調(diào)函數(shù)會在prodecer收到ack時自動調(diào)用,為異步調(diào)用。該方法有兩個參數(shù):RecordMetadata元數(shù)據(jù)信息(主題、分區(qū)等信息)和Exception異常信息,如果Exception為null表示發(fā)送成功,如果Exception不為null表示發(fā)送失敗,消息發(fā)送失敗后會自動重試,不需要我們在回調(diào)函數(shù)中收到重試。?
?目前first設(shè)置了3個分區(qū)(0,1,2),為什么這里會將數(shù)據(jù)發(fā)給分區(qū)1呢?后續(xù)講解分區(qū)時會解釋。
3、同步發(fā)送
默認(rèn)情況下kafka就是異步發(fā)送方式,因為kafka的基本架構(gòu)就是異步方式的。當(dāng)然它也支持同步發(fā)送,即Kafka Producer生產(chǎn)者發(fā)來一批數(shù)據(jù)后,必須等到kafka集群收到數(shù)據(jù)且發(fā)來ack成功的消息后,Kafka Producer生產(chǎn)者才會再次發(fā)送下一批數(shù)據(jù)。
同步方式只需要在異步方式基礎(chǔ)上調(diào)用一下get方法即可:
為什么同步方式各個數(shù)據(jù)的分區(qū)不一樣,而異步方式卻都一樣呢??后續(xù)講解分區(qū)時會解釋。
4、生產(chǎn)者分區(qū)(Partitioner分區(qū)器)
分區(qū)可以合理的使用存儲資源,每個Partition在一個Broker上存儲,可以把海量的數(shù)據(jù)按照分區(qū)切割成一塊一塊數(shù)據(jù)(每塊1G)存儲在多臺Broker上。合理控制分區(qū)任務(wù),可以實現(xiàn)負(fù)載均衡。此外分區(qū)可以提高并行度,無論是生產(chǎn)者還是消費(fèi)者都可以以分區(qū)為單位對數(shù)據(jù)進(jìn)行發(fā)送/消費(fèi)。
?(1)默認(rèn)分區(qū)器DefaultPartitioner
Partitioner分區(qū)器有默認(rèn)分區(qū)器和自定義分區(qū)器兩種,上述我們寫代碼時并沒有設(shè)置分區(qū)器,系統(tǒng)使用了默認(rèn)分區(qū)器DefaultPartitioner,在IDEA中 Ctrl + n,全局查找DefaultPartitioner
注意這里設(shè)置的分區(qū)器是控制RecordAccumulator中的分區(qū),不同分區(qū)器的設(shè)置是在properties配置文件對象中配置,不寫就是使用默認(rèn)分區(qū)器。
?在ProducerRecord中我們可以設(shè)置存儲的分區(qū):
?上述同步異步發(fā)送演示代碼中,使用的就是最后一種方式,沒有指定分區(qū),也沒有key值,此時會使用黏性分區(qū)規(guī)則。但是同步和異步方式是有細(xì)微的差別的,默認(rèn)情況下linger.ms為0ms,對于異步方式,由于send發(fā)送數(shù)據(jù)的過程是在內(nèi)存中執(zhí)行的,執(zhí)行速度是極快的,上述循環(huán)發(fā)5次數(shù)據(jù)實質(zhì)上0.000xx納秒的時間就發(fā)過去了(系統(tǒng)認(rèn)為沒有超過0ms),因此執(zhí)行過程中沒有超時,是batch.size參數(shù)在生效,又由于傳輸?shù)臄?shù)據(jù)很小沒有使當(dāng)前batch存滿,因此所有數(shù)據(jù)是作為一個整體傳輸?shù)?,都存儲在一個分區(qū)上,隨機(jī)取的分區(qū)為1(sender線程只執(zhí)行了1次)。對于同步方式,必須按順序一個一個發(fā)送直到ack確認(rèn)后再發(fā)送下一個數(shù)據(jù),每send一個數(shù)據(jù)就會執(zhí)行一次sender線程,因此每發(fā)一個數(shù)據(jù)就重新選擇一個分區(qū),我們可以觀察到每次選擇的分區(qū)與上一次的分區(qū)不同。
總之:每執(zhí)行一次sender線程相當(dāng)于達(dá)到batch.size或linger.ms超時,就會重新隨機(jī)選擇一個分區(qū)。
測試:
?生產(chǎn)環(huán)境中使用key的情況還是很多的,比如當(dāng)前mysql中有很多表,要求將mysql中的student表放到kafka集群的某個分區(qū)上,這里只需要將student表的表名作為key傳輸即可。
?在這里每次發(fā)送一條數(shù)據(jù)停止2ms,此時就是linger.ms = 0ms在生效。每發(fā)一個數(shù)據(jù)都要重新隨機(jī)選擇新分區(qū),達(dá)到了數(shù)據(jù)打散的效果。但是發(fā)現(xiàn)重新選擇的分區(qū)存在與上一次分區(qū)相同的情況,這是為什么呢?經(jīng)過測試發(fā)現(xiàn)當(dāng)線程等待的時間比較小時,重新選擇的分區(qū)存在與上一次分區(qū)相同的情況;當(dāng)線程等待時間比較大時,重新選擇的分區(qū)與上一次分區(qū)不同(線程等待時間越長,數(shù)據(jù)打散程度越高)。原因不明
?(2)自定義分區(qū)器
在實際生產(chǎn)環(huán)境中默認(rèn)分區(qū)器是無法滿足所有的業(yè)務(wù)需求的,因此需要進(jìn)行自定義分區(qū)器。
特殊需求:自定義一個分區(qū)器實現(xiàn):如果發(fā)送來的數(shù)據(jù)包含”atguigu“就發(fā)往0號分區(qū),如果不包含”atguigui“就發(fā)往1號分區(qū)。
首先自己定義一個分區(qū)器MyPartitioner類,實現(xiàn)Partitioner接口:
?5、生產(chǎn)經(jīng)驗
(1)常見參數(shù)的經(jīng)驗配置
linger.ms:一般修改為5-100ms
batch.size:一般使用默認(rèn)值,或者設(shè)置為32k
compression.type :一般使用snappy壓縮方式
RecordAccumulator:一般使用默認(rèn),或設(shè)置為64M
?(2)如何保證數(shù)據(jù)傳輸?shù)耐耆煽?/h6>
acks應(yīng)答的級別有三種分別是0,1,-1。
0就是生產(chǎn)者發(fā)來數(shù)據(jù)之后不需要應(yīng)答,不用管數(shù)據(jù)發(fā)成功沒,直接不停的發(fā)就行了,這種方式效率肯定是最高的,但其可靠性是最差的,生產(chǎn)環(huán)境中一般不使用。
?1表示生產(chǎn)者發(fā)來的數(shù)據(jù),leader收到后就給生產(chǎn)者應(yīng)答。比0的可靠性高。
?-1表示生產(chǎn)者發(fā)來的數(shù)據(jù),leader收到并且所有follower也同步完后才給生產(chǎn)者應(yīng)答。可靠性最高,效率最低。
?思考:leader收到數(shù)據(jù),所有follower都開始同步數(shù)據(jù),-1級別要求:必須所有follower同步完成才應(yīng)答。但有一個follower因為某種故障,遲遲不能與leader進(jìn)行同步,此時會導(dǎo)致無法向生產(chǎn)者應(yīng)答,某個請求被卡死,影響系統(tǒng)性能。顯然在實際的生產(chǎn)環(huán)境中不可能因為某個follower故障而影響到整個集群的運(yùn)行。在kafka集群中每個Leader都維護(hù)了一個動態(tài)的ISR來解決上述問題。
ISR是一個存儲了leader? +? ?與leader保持正常同步的所有follower的集合(leader :0, isr:0,1,2)。如果某個follower長時間未向leader發(fā)送通信請求或同步數(shù)據(jù),則系統(tǒng)會認(rèn)為該follower死掉,并將其從isr中踢出。超時閾值由replica.lag.time.max.ms設(shè)置,默認(rèn)為30s。
有了這個機(jī)制,ack應(yīng)答的時候就不會死等長期聯(lián)系不上或者已經(jīng)故障的節(jié)點,它只會等待isr集合中記錄的沒問題的follower完成同步。
思考:添加完isr機(jī)制后就能保證完整的數(shù)據(jù)可靠傳輸呢?不能,因為當(dāng)分區(qū)副本數(shù)設(shè)為1,或ISR里應(yīng)答的最小副本數(shù)量(min.insync.replicas默認(rèn)為1)設(shè)置為1,此時就和ack=1的情況一樣了,仍然有丟失數(shù)據(jù)的風(fēng)險(leader :0,isr:0)
完整的數(shù)據(jù)可靠傳輸 = ACK級別設(shè)為-1? +?分區(qū)副本數(shù)>=2? ?+? ?ISR里應(yīng)答的最小副本數(shù)量>=2
?總結(jié):
?思考: ACK級別設(shè)為-1,能得到可靠性,那它有沒有別的問題呢(除了效率低之外)?可能會出現(xiàn)數(shù)據(jù)重復(fù):
?思考:數(shù)據(jù)重復(fù)問題如何解決呢?
(3)數(shù)據(jù)去重
進(jìn)行數(shù)據(jù)去重之前先了解以下幾個概念(kafka三種消息傳輸語義):
A:至少一次(At Least Once,生產(chǎn)者將數(shù)據(jù)發(fā)送給kafka集群后,kafka集群至少保證能正常收到1份數(shù)據(jù))=?完整的數(shù)據(jù)可靠傳輸 = ACK級別設(shè)為-1? +?分區(qū)副本數(shù)>=2? ?+? ?ISR里應(yīng)答的最小副本數(shù)量>=2
B、最多一次(At Most Once)= ACK級別設(shè)為0(至少一次可能會發(fā)生數(shù)據(jù)重復(fù)問題,最多一次一份數(shù)據(jù)最多發(fā)一次,可以保證數(shù)據(jù)不重復(fù))
?思考:與錢相關(guān)的業(yè)務(wù)能使用At Least Once或At Most Once嗎?顯然不能,如果使用At Least Once可能會發(fā)生銀行多給我轉(zhuǎn)了一次錢。如果使用At Most Once可能會發(fā)生自己卡里的錢莫名奇妙沒了。
C、精確一次(Exactly Once,對于一些非常重要的信息,比如與錢相關(guān)的數(shù)據(jù),要求數(shù)據(jù)既不能重復(fù)也不能丟失)= 冪等性 +?至少一次
什么是冪等性:指Producer無論向broker發(fā)送多少次重復(fù)數(shù)據(jù),broker端都只會持久化一條,保證了數(shù)據(jù)不重復(fù)。
思考:kafka集群如何知道當(dāng)前傳過來的數(shù)據(jù)是不是重復(fù)數(shù)據(jù)呢?
重復(fù)數(shù)據(jù)的判斷標(biāo)準(zhǔn):具有<PID,Partition,SeqNumber>相同主鍵的消息提交時,broker只會持久化一條。其中Kafka每次重新啟動都會分配一個新的PID(生產(chǎn)者id號);Partition表示分區(qū)編號;SeqNumber是單調(diào)遞增的。我們可以看出冪等性只能保證在單分區(qū)單會話內(nèi)不重復(fù)。
?思考:冪等性并不能跨越多個分區(qū)運(yùn)作,而事務(wù)可以彌補(bǔ)這個缺陷。事務(wù)可以保證對多個分區(qū)寫入操作的原子性。對多個分區(qū)的寫入操作要么全部成功要么全部失敗。Producer在使用事務(wù)功能前,必須先手動設(shè)置一個唯一的tansactional.id,有了事務(wù)id即使客戶端掛掉了,重啟后也能繼續(xù)處理未完成的事務(wù)。
注意:一旦使用了事務(wù)功能,那么每個broker上的Transaction Coordinator事務(wù)協(xié)調(diào)器就啟動了,事務(wù)協(xié)調(diào)器主要負(fù)責(zé)處理當(dāng)前事務(wù),那每個broker都有自己的Transaction Coordinator,此時需要選舉一個老大出來負(fù)責(zé)當(dāng)前的事務(wù)處理。選舉規(guī)則如上圖所示:假設(shè)transactional.id我們寫了“a”,它的hashcode值為97,97%50=47,即當(dāng)前要處理的事務(wù)屬于47號分區(qū):__transaction_state-Partition47,__transaction_state-Partition47-Leader所在的broker節(jié)點上的Transaction Coordinator就被選舉成為處理當(dāng)前事務(wù)的老大。接下來就可以處理事務(wù)了(了解)。
由于事務(wù)底層依賴冪等性,因此開啟事務(wù),必須開啟冪等性(enable.idempotence?= true默認(rèn)開啟)。
?(4)數(shù)據(jù)有序
數(shù)據(jù)有序是指:生產(chǎn)者發(fā)來數(shù)據(jù)A,B,C,D,E消費(fèi)者在讀取數(shù)據(jù)時還是A,B,C,D,E,保證與生產(chǎn)者發(fā)送數(shù)據(jù)順序一致。在實際生產(chǎn)環(huán)境中數(shù)據(jù)都是多分區(qū)存儲的。假設(shè)當(dāng)前有3個分區(qū)(分區(qū)0,1,2)。數(shù)據(jù)在發(fā)送過程中可能A,B存儲在分區(qū)0內(nèi),C,D存儲在分區(qū)1內(nèi),E存儲在分區(qū)2內(nèi)。消費(fèi)者在消費(fèi)數(shù)據(jù)時是以異步方式執(zhí)行的,很可能第二個消費(fèi)者先執(zhí)行完讀到C,D,然后第三個消費(fèi)者執(zhí)行完讀到E,最后第一個消費(fèi)者執(zhí)行完讀到A,B,最終讀到C,D,E,A,B。在某些應(yīng)用場景下(發(fā)送一篇文章)必須要求讀的數(shù)據(jù)與發(fā)送時的數(shù)據(jù)順序保持一致,這該這么解決呢?
?方案一:將Kafka cluster直接設(shè)置為單分區(qū)(有條件限制)
方案二:多分區(qū)情況下,先將所有的數(shù)據(jù)拉取到消費(fèi)者那里,然后統(tǒng)一進(jìn)行一次排序操作,最后由消費(fèi)者讀取排好序的數(shù)據(jù)。
(5)數(shù)據(jù)亂序問題
數(shù)據(jù)亂序問題是指:生產(chǎn)者向Kafka集群發(fā)送消息時,kafka收到的消息順序與生產(chǎn)者發(fā)送數(shù)據(jù)順序不一致。例如當(dāng)前kafka cluster有兩個broker。在發(fā)送數(shù)據(jù)時會有兩個請求隊列,每個請求隊列最多緩存5個請求。假設(shè)在broker0請求隊列中request1請求發(fā)送數(shù)據(jù)A,kafka cluster收到了并返回了ack信息,request1完成。然后request2請求發(fā)送數(shù)據(jù)B,也發(fā)送成功。request3請求發(fā)送數(shù)據(jù)C,發(fā)送失敗,系統(tǒng)自動進(jìn)行重試。此時broker0請求隊列中只有1個請求,因此還可以再發(fā)請求,request4請求發(fā)送數(shù)據(jù)D,在request3重試期間request4請求發(fā)送成功,然后request3才重試成功,此時broker0得到的數(shù)據(jù)是A,B,D,C,導(dǎo)致數(shù)據(jù)亂序(單分區(qū)亂序)。
?上述數(shù)據(jù)有序問題中解決方案一:將Kafka cluster直接設(shè)置為單分區(qū)是有條件限制的,如果出現(xiàn)了數(shù)據(jù)亂序問題,單分區(qū)內(nèi)的數(shù)據(jù)本身也亂序了,即使設(shè)置為單分區(qū),也無法保證消費(fèi)者獲得的數(shù)據(jù)有序。
解決方法:
1)在kafka1.x版本之前保證數(shù)據(jù)單分區(qū)內(nèi)有序,需要進(jìn)行如下配置
max.in.flight.requests.per.connection=1:最多每個broker緩存1個請求(不需要考慮是否開啟冪等性,在kafka1.x版本之前沒有冪等性功能)
2)在kafka1.x版本之后保證數(shù)據(jù)單分區(qū)內(nèi)有序,需要考慮是否開啟冪等性,配置如下:
? ? ? ? A、未開啟冪等性:max.in.flight.requests.per.connection=1
? ? ? ? B、開啟冪等性:max.in.flight.requests.per.connection只需要設(shè)置<=5,就能保證單分區(qū)內(nèi)有序。
????????主要原因是:啟動冪等后,kafka會自動緩存producer發(fā)來的最近(最多5個)5個request的元數(shù)據(jù)(PID,分區(qū),自增序列號),并根據(jù)自增序列號的順序依次在kafka cluster中落盤。假設(shè)request1發(fā)送數(shù)據(jù)A成功,它的序列號為1,成功落盤。request2發(fā)送數(shù)據(jù)B成功,序列號為2,成功羅盤。request3發(fā)送數(shù)據(jù)C失敗,序列號為3系統(tǒng)自動對request3進(jìn)行重試,此時request4發(fā)來數(shù)據(jù)D成功,但是它的序列號為4,系統(tǒng)識別到序列號為3的請求還沒落盤呢,request4只能等待。當(dāng)request3重試成功,數(shù)據(jù)落盤以后,request4發(fā)送的數(shù)據(jù)才落盤。相當(dāng)于有自動排序的功能。
?補(bǔ)充:
(1).var快捷輸入
(2)Ctrl + n快捷鍵文章來源:http://www.zghlxwxcb.cn/news/detail-822217.html
查看各種類的源碼文章來源地址http://www.zghlxwxcb.cn/news/detail-822217.html
到了這里,關(guān)于Kafka 基礎(chǔ)知識-01的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!