目錄
一、zookeeper
1.Zookeeper 定義
2.Zookeeper 工作機(jī)制
3.Zookeeper 特點(diǎn)
4.Zookeeper 數(shù)據(jù)結(jié)構(gòu)
5.Zookeeper 應(yīng)用場景
(1)統(tǒng)一命名服務(wù)
(2)統(tǒng)一配置管理
(3)統(tǒng)一集群管理
(4)服務(wù)器動(dòng)態(tài)上下線
6.Zookeeper 選舉機(jī)制
(1)第一次啟動(dòng)選舉機(jī)制
(2)非第一次啟動(dòng)選舉機(jī)制
7.部署zookeeper群集
二、消息隊(duì)列概述
1.為什么需要消息隊(duì)列(MQ)
2.使用消息隊(duì)列的好處
(1)解耦
(2)可恢復(fù)性
(3)緩沖
(4)靈活性和峰值處理能力
(5)異步通信
3.消息隊(duì)列的兩種模式
(1)點(diǎn)對點(diǎn)模式(一對一,消費(fèi)者主動(dòng)拉取數(shù)據(jù),消息收到后消息清除)
(2)發(fā)布/訂閱模式(一對多,又叫觀察者模式,消費(fèi)者消費(fèi)數(shù)據(jù)之后不會清除消息)
三、kafka概述
1.Kafka 定義
2.Kafka 簡介
3.Kafka 的特性
(1)高吞吐量、低延遲
(2)可擴(kuò)展性
(3)持久性、可靠性
(4)容錯(cuò)性
(5)高并發(fā)
4.Kafka 系統(tǒng)架構(gòu)
(1)Broker
(2)Topic
(3)Partition
(4)Replica
(5)Leader
(6)Follower????????
(7)Producer????????
(8)Consumer
(9)Consumer Group(CG)
(10)offset 偏移量
(11)Zookeeper
5.kafka的部署
6.kafka命令的使用
(1)創(chuàng)建topic
(2) 列出所有topic
(3)查看topic信息
(4) 發(fā)布消息
(5)消費(fèi)消息?
(6)修改指定topic分區(qū)數(shù)
(7)刪除指定topic
一、zookeeper
1.Zookeeper 定義
????????Zookeeper是一個(gè)開源的分布式的,為分布式框架提供協(xié)調(diào)服務(wù)的Apache項(xiàng)目。
2.Zookeeper 工作機(jī)制
????????Zookeeper從設(shè)計(jì)模式角度來理解:是一個(gè)基于觀察者模式設(shè)計(jì)的分布式服務(wù)管理框架,它負(fù)責(zé)存儲和管理大家都關(guān)心的數(shù)據(jù),然后接受觀察者的注冊,一旦這些數(shù)據(jù)的狀態(tài)發(fā)生變化,Zookeeper就將負(fù)責(zé)通知已經(jīng)在Zookeeper上注冊的那些觀察者做出相應(yīng)的反應(yīng)。也就是說 Zookeeper = 文件系統(tǒng) + 通知機(jī)制。
3.Zookeeper 特點(diǎn)
(1)一個(gè)領(lǐng)導(dǎo)者(Leader),多個(gè)跟隨者(Follower)組成的集群。
(2)Zookeeper集群中只要有半數(shù)以上節(jié)點(diǎn)存活,Zookeeper集群就能正常服務(wù)。所以Zookeeper適合安裝奇數(shù)(>=3)臺服務(wù)器。
(3)全局?jǐn)?shù)據(jù)一致:每個(gè)Server保存一份相同的數(shù)據(jù)副本,Client無論連接到哪個(gè)Server,數(shù)據(jù)都是一致的。
(4)更新請求順序執(zhí)行,來自同一個(gè)Client的更新請求按其發(fā)送順序依次執(zhí)行,即先進(jìn)先出
(5)數(shù)據(jù)更新原子性,一次數(shù)據(jù)更新要么成功,要么失敗。
(6)實(shí)時(shí)性,在一定時(shí)間范圍內(nèi),Client能讀到最新數(shù)據(jù)。
4.Zookeeper 數(shù)據(jù)結(jié)構(gòu)
????????ZooKeeper數(shù)據(jù)模型的結(jié)構(gòu)與Linux文件系統(tǒng)很類似,整體上可以看作是一棵樹,每個(gè)節(jié)點(diǎn)稱做一個(gè)ZNode。每一個(gè)ZNode默認(rèn)能夠存儲1MB的數(shù)據(jù),每個(gè)ZNode都可以通過其路徑唯一標(biāo)識.
5.Zookeeper 應(yīng)用場景
????????提供的服務(wù)包括:統(tǒng)一命名服務(wù)、統(tǒng)一配置管理、統(tǒng)一集群管理、服務(wù)器節(jié)點(diǎn)動(dòng)態(tài)上下線、軟負(fù)載均衡等。
(1)統(tǒng)一命名服務(wù)
????????在分布式環(huán)境下,經(jīng)常需要對應(yīng)用/服務(wù)進(jìn)行統(tǒng)一命名,便于識別。例如:IP不容易記住,而域名容易記住。
(2)統(tǒng)一配置管理
????????分布式環(huán)境下,配置文件同步非常常見。一般要求一個(gè)集群中,所有節(jié)點(diǎn)的配置信息是一致的,比如Kafka集群。對配置文件修改后,希望能夠快速同步到各個(gè)節(jié)點(diǎn)上。
????????配置管理可交由ZooKeeper實(shí)現(xiàn)??蓪⑴渲眯畔懭隯ooKeeper上的一個(gè)Znode。各個(gè)客戶端服務(wù)器監(jiān)聽這個(gè)Znode。一旦 Znode中的數(shù)據(jù)被修改,ZooKeeper將通知各個(gè)客戶端服務(wù)器。
(3)統(tǒng)一集群管理
????????分布式環(huán)境中,實(shí)時(shí)掌握每個(gè)節(jié)點(diǎn)的狀態(tài)是必要的。可根據(jù)節(jié)點(diǎn)實(shí)時(shí)狀態(tài)做出一些調(diào)整。
????????ZooKeeper可以實(shí)現(xiàn)實(shí)時(shí)監(jiān)控節(jié)點(diǎn)狀態(tài)變化??蓪⒐?jié)點(diǎn)信息寫入ZooKeeper上的一個(gè)ZNode。監(jiān)聽這個(gè)ZNode可獲取它的實(shí)時(shí)狀態(tài)變化。
(4)服務(wù)器動(dòng)態(tài)上下線
????????客戶端能實(shí)時(shí)洞察到服務(wù)器上下線的變化。
(5)軟負(fù)載均衡
????????在Zookeeper中記錄每臺服務(wù)器的訪問數(shù),讓訪問數(shù)最少的服務(wù)器去處理最新的客戶端請求。(類似于調(diào)度算法中的最小連接)
6.Zookeeper 選舉機(jī)制
SID:服務(wù)器ID。用來唯一標(biāo)識一臺ZooKeeper集群中的機(jī)器,每臺機(jī)器不能重復(fù),和myid一致。
ZXID:事務(wù)ID。ZXID是一個(gè)事務(wù)ID,用來標(biāo)識一次服務(wù)器狀態(tài)的變更。在某一時(shí)刻,集群中的每臺機(jī)器的ZXID值不一定完全一致,這和ZooKeeper服務(wù)器對于客戶端“更新請求”的處理邏輯速度有關(guān)。
Epoch:每個(gè)Leader任期的代號。沒有Leader時(shí)同一輪投票過程中的邏輯時(shí)鐘值是相同的。每投完一次票這個(gè)數(shù)據(jù)就會增加
(1)第一次啟動(dòng)選舉機(jī)制
? ? ? ? 服務(wù)器1啟動(dòng),發(fā)起一次選舉。服務(wù)器1投自己一票。此時(shí)服務(wù)器1票數(shù)一票,不夠半數(shù)以上(3票),選舉無法完成,服務(wù)器1狀態(tài)保持為LOOKING;
? ? ? ? 服務(wù)器2啟動(dòng),再發(fā)起一次選舉。服務(wù)器1和2分別投自己一票并交換選票信息:此時(shí)服務(wù)器1發(fā)現(xiàn)服務(wù)器2的myid比自己目前投票推舉的(服務(wù)器1)大,更改選票為推舉服務(wù)器2。此時(shí)服務(wù)器1票數(shù)0票,服務(wù)器2票數(shù)2票,沒有半數(shù)以上結(jié)果,選舉無法完成,服務(wù)器1,2狀態(tài)保持LOOKING。
? ? ? ? 服務(wù)器3啟動(dòng),發(fā)起一次選舉。此時(shí)服務(wù)器1和2都會更改選票為服務(wù)器3。此次投票結(jié)果:服務(wù)器1為0票,服務(wù)器2為0票,服務(wù)器3為3票。此時(shí)服務(wù)器3的票數(shù)已經(jīng)超過半數(shù),服務(wù)器3當(dāng)選Leader。服務(wù)器1,2更改狀態(tài)為FOLLOWING,服務(wù)器3更改狀態(tài)為LEADING;
? ? ? ? 服務(wù)器4啟動(dòng),發(fā)起一次選舉。此時(shí)服務(wù)器1,2,3已經(jīng)不是LOOKING狀態(tài),不會更改選票信息。交換選票信息結(jié)果:服務(wù)器3為3票,服務(wù)器4為1票。此時(shí)服務(wù)器4服從多數(shù),更改選票信息為服務(wù)器3,并更改狀態(tài)為FOLLOWING;
????????服務(wù)器5啟動(dòng),同4一樣做FOLLOWING。
(2)非第一次啟動(dòng)選舉機(jī)制
? ? ? ? 當(dāng)ZooKeeper 集群中的一臺服務(wù)器出現(xiàn)以下兩種情況之一時(shí),就會開始進(jìn)入Leader選舉:
- 服務(wù)器初始化啟動(dòng)。
- 服務(wù)器運(yùn)行期間無法和Leader保持連接。
????????而當(dāng)一臺機(jī)器進(jìn)入Leader選舉流程時(shí),當(dāng)前集群也可能會處于以下兩種狀態(tài):
- 集群中本來就已經(jīng)存在一個(gè)Leader。
????????對于已經(jīng)存在Leader的情況,機(jī)器試圖去選舉Leader時(shí),會被告知當(dāng)前服務(wù)器的Leader信息,對于該機(jī)器來說,僅僅需要和 Leader機(jī)器建立連接,并進(jìn)行狀態(tài)同步即可。
- 集群中確實(shí)不存在Leader。
????????假設(shè)ZooKeeper由5臺服務(wù)器組成,SID分別為1、2、3、4、5,ZXID分別為8、8、8、7、7,并且此時(shí)SID為3的服務(wù)器是Leader。某一時(shí)刻,3和5服務(wù)器出現(xiàn)故障,因此開始進(jìn)行Leader選舉。
選舉Leader規(guī)則
1.Epoch大的直接勝出
2.Epoch相同,事務(wù)id大的勝出
3.事務(wù)id相同,服務(wù)器id大的勝出
7.部署zookeeper群集
準(zhǔn)備 3 臺服務(wù)器做 Zookeeper 集群
192.168.116.20
192.168.116.30
192.168.116.40
三臺安裝jdk環(huán)境
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
下載安裝包,官方下載地址:Index of /dist/zookeeper
將下載好的安裝包放在/opt目錄,解壓到/usr/local/zookeeper/
cd /opt
tar zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper/
修改配置文件
cd /usr/local/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
tickTime=2000 #通信心跳時(shí)間,Zookeeper服務(wù)器與客戶端心跳時(shí)間,單位毫秒
initLimit=10 #Leader和Follower初始連接時(shí)能容忍的最多心跳數(shù)(tickTime的數(shù)量),這里表示為10*2s
syncLimit=5 #Leader和Follower之間同步通信的超時(shí)時(shí)間,這里表示如果超過5*2s,Leader認(rèn)為Follwer死掉,并從服務(wù)器列表中刪除Follwer
dataDir=/usr/local/zookeeper/data #修改,指定保存Zookeeper中的數(shù)據(jù)的目錄,目錄需要單獨(dú)創(chuàng)建
dataLogDir=/usr/local/zookeeper/logs #添加,指定存放日志的目錄,目錄需要單獨(dú)創(chuàng)建
clientPort=2181 #客戶端連接端口
#添加集群信息
server.1=192.168.116.10:3188:3288
server.2=192.168.116.20:3188:3288
server.3=192.168.116.30:3188:3288
集群信息字段解釋
server.A=B:C:D
????????A是一個(gè)數(shù)字,表示這個(gè)是第幾號服務(wù)器。集群模式下需要在zoo.cfg中dataDir指定的目錄下創(chuàng)建一個(gè)文件myid,這個(gè)文件里面有一個(gè)數(shù)據(jù)就是A的值,Zookeeper啟動(dòng)時(shí)讀取此文件,拿到里面的數(shù)據(jù)與zoo.cfg里面的配置信息比較從而判斷到底是哪個(gè)server。
????????B是這個(gè)服務(wù)器的地址。
????????C是這個(gè)服務(wù)器Follower與集群中的Leader服務(wù)器交換信息的端口。
????????D是萬一集群中的Leader服務(wù)器掛了,需要一個(gè)端口來重新進(jìn)行選舉,選出一個(gè)新的Leader,而這個(gè)端口就是用來執(zhí)行選舉時(shí)服務(wù)器相互通信的端口。
將配置好的文件傳到另外兩臺機(jī)器
scp /usr/local/zookeeper/conf/zoo.cfg 192.168.116.20:/usr/local/zookeeper/conf/
scp /usr/local/zookeeper/conf/zoo.cfg 192.168.116.30:/usr/local/zookeeper/conf/
在每個(gè)節(jié)點(diǎn)上創(chuàng)建數(shù)據(jù)目錄和日志目錄
mkdir /usr/local/zookeeper/data
mkdir /usr/local/zookeeper/logs
在每個(gè)節(jié)點(diǎn)的dataDir指定的目錄下創(chuàng)建一個(gè) myid 的文件?
echo 1 > /usr/local/zookeeper/data/myid
echo 2 > /usr/local/zookeeper/data/myid
echo 3 > /usr/local/zookeeper/data/myid
開啟zookeeper
二、消息隊(duì)列概述
1.為什么需要消息隊(duì)列(MQ)
????????主要原因是由于在高并發(fā)環(huán)境下,同步請求來不及處理,請求往往會發(fā)生阻塞。比如大量的請求并發(fā)訪問數(shù)據(jù)庫,導(dǎo)致行鎖表鎖,最后請求線程會堆積過多,從而觸發(fā) too many connection 錯(cuò)誤,引發(fā)雪崩效應(yīng)。
????????我們使用消息隊(duì)列,通過異步處理請求,從而緩解系統(tǒng)的壓力。消息隊(duì)列常應(yīng)用于異步處理,流量削峰,應(yīng)用解耦,消息通訊等場景。
當(dāng)前比較常見的 MQ 中間件有 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等。
2.使用消息隊(duì)列的好處
(1)解耦
????????允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
(2)可恢復(fù)性
????????系統(tǒng)的一部分組件失效時(shí),不會影響到整個(gè)系統(tǒng)。消息隊(duì)列降低了進(jìn)程間的耦合度,所以即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。
(3)緩沖
????????有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度,解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的情況。
(4)靈活性和峰值處理能力
????????在訪問量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見。如果為以能處理這類峰值訪問為標(biāo)準(zhǔn)來投入資源隨時(shí)待命無疑是巨大的浪費(fèi)。
????????使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力(例如雙十一),而不會因?yàn)橥话l(fā)的超負(fù)荷的請求而完全崩潰。
(5)異步通信
????????很多時(shí)候,用戶不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制,允許用戶把一個(gè)消息放入隊(duì)列,但并不立即處理它。想向隊(duì)列中放入多少消息就放多少,然后在需要的時(shí)候再去處理它們。
3.消息隊(duì)列的兩種模式
(1)點(diǎn)對點(diǎn)模式(一對一,消費(fèi)者主動(dòng)拉取數(shù)據(jù),消息收到后消息清除)
????????消息生產(chǎn)者生產(chǎn)消息發(fā)送到消息隊(duì)列中,然后消息消費(fèi)者從消息隊(duì)列中取出并且消費(fèi)消息。消息被消費(fèi)以后,消息隊(duì)列中不再有存儲,所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息。消息隊(duì)列支持存在多個(gè)消費(fèi)者,但是對一個(gè)消息而言,只會有一個(gè)消費(fèi)者可以消費(fèi)。
(2)發(fā)布/訂閱模式(一對多,又叫觀察者模式,消費(fèi)者消費(fèi)數(shù)據(jù)之后不會清除消息)
????????消息生產(chǎn)者(發(fā)布)將消息發(fā)布到 topic 中,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消息。和點(diǎn)對點(diǎn)方式不同,發(fā)布到 topic 的消息會被所有訂閱者消費(fèi)。
????????發(fā)布/訂閱模式是定義對象間一種一對多的依賴關(guān)系,使得每當(dāng)一個(gè)對象(目標(biāo)對象)的狀態(tài)發(fā)生改變,則所有依賴于它的對象(觀察者對象)都會得到通知并自動(dòng)更新。
三、kafka概述
1.Kafka 定義
????????Kafka 是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列(MQ,Message Queue),主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域。
2.Kafka 簡介
????????Kafka 是最初由 Linkedin 公司開發(fā),是一個(gè)分布式、支持分區(qū)的(partition)、多副本的(replica),基于 Zookeeper 協(xié)調(diào)的分布式消息中間件系統(tǒng),它的最大的特性就是可以實(shí)時(shí)的處理大量數(shù)據(jù)以滿足各種需求場景,比如基于 hadoop 的批處理系統(tǒng)、低延遲的實(shí)時(shí)系統(tǒng)\Spark/Flink 流式處理引擎,nginx 訪問日志,消息服務(wù)等等,用 scala 語言編寫,Linkedin 于 2010 年貢獻(xiàn)給了 Apache 基金會并成為頂級開源項(xiàng)目。
3.Kafka 的特性
(1)高吞吐量、低延遲
????????Kafka 每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒。每個(gè) topic 可以分多個(gè) Partition,Consumer Group 對 Partition 進(jìn)行消費(fèi)操作,提高負(fù)載均衡能力和消費(fèi)能力。
(2)可擴(kuò)展性
????????kafka 集群支持熱擴(kuò)展。
(3)持久性、可靠性
????????消息被持久化到本地磁盤,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失。
(4)容錯(cuò)性
????????允許集群中節(jié)點(diǎn)失?。ǘ喔北厩闆r下,若副本數(shù)量為 n,則允許 n-1 個(gè)節(jié)點(diǎn)失敗)。
(5)高并發(fā)
????????支持?jǐn)?shù)千個(gè)客戶端同時(shí)讀寫。
4.Kafka 系統(tǒng)架構(gòu)
(1)Broker
????????一臺 kafka 服務(wù)器就是一個(gè) broker。一個(gè)集群由多個(gè) broker 組成。一個(gè) broker 可以容納多個(gè) topic。
(2)Topic
????????可以理解為一個(gè)隊(duì)列,生產(chǎn)者和消費(fèi)者面向的都是一個(gè) topic。類似于數(shù)據(jù)庫的表名或者 ES 的 index 。物理上不同 topic 的消息分開存儲。
(3)Partition
????????為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的 topic 可以分布到多個(gè) broker(即服務(wù)器)上,一個(gè) topic 可以分割為一個(gè)或多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列。Kafka 只保證 partition 內(nèi)的記錄是有序的,而不保證 topic 中不同 partition 的順序。
????????每個(gè) topic 至少有一個(gè) partition,當(dāng)生產(chǎn)者產(chǎn)生數(shù)據(jù)的時(shí)候,會根據(jù)分配策略選擇分區(qū),然后將消息追加到指定的分區(qū)的隊(duì)列末尾。
Partation 數(shù)據(jù)路由規(guī)則
- 指定了 patition,則直接使用;
- 未指定 patition 但指定 key(相當(dāng)于消息中某個(gè)屬性),通過對 key 的 value 進(jìn)行 hash 取模,選出一個(gè) patition;
- patition 和 key 都未指定,使用輪詢選出一個(gè) patition。
????????每條消息都會有一個(gè)自增的編號,用于標(biāo)識消息的偏移量,標(biāo)識順序從 0 開始。
????????每個(gè) partition 中的數(shù)據(jù)使用多個(gè) segment 文件存儲。
????????如果 topic 有多個(gè) partition,消費(fèi)數(shù)據(jù)時(shí)就不能保證數(shù)據(jù)的順序。嚴(yán)格保證消息的消費(fèi)順序的場景下(例如商品秒殺、 搶紅包),需要將 partition 數(shù)目設(shè)為 1。
- broker 存儲 topic 的數(shù)據(jù)。如果某 topic 有 N 個(gè) partition,集群有 N 個(gè) broker,那么每個(gè) broker 存儲該 topic 的一個(gè) partition。
- 如果某 topic 有 N 個(gè) partition,集群有 (N+M) 個(gè) broker,那么其中有 N 個(gè) broker 存儲 topic 的一個(gè) partition, 剩下的 M 個(gè) broker 不存儲該 topic 的 partition 數(shù)據(jù)。
- 如果某 topic 有 N 個(gè) partition,集群中 broker 數(shù)目少于 N 個(gè),那么一個(gè) broker 存儲該 topic 的一個(gè)或多個(gè) partition。在實(shí)際生產(chǎn)環(huán)境中,盡量避免這種情況的發(fā)生,這種情況容易導(dǎo)致 Kafka 集群數(shù)據(jù)不均衡。
分區(qū)的原因
- 方便在集群中擴(kuò)展,每個(gè)Partition可以通過調(diào)整以適應(yīng)它所在的機(jī)器,而一個(gè)topic又可以有多個(gè)Partition組成,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)了;
- 可以提高并發(fā),因?yàn)榭梢砸訮artition為單位讀寫了。
(4)Replica
????????副本,為保證集群中的某個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),該節(jié)點(diǎn)上的 partition 數(shù)據(jù)不丟失,且 kafka 仍然能夠繼續(xù)工作,kafka 提供了副本機(jī)制,一個(gè) topic 的每個(gè)分區(qū)都有若干個(gè)副本,一個(gè) leader 和若干個(gè) follower。
(5)Leader
????????每個(gè) partition 有多個(gè)副本,其中有且僅有一個(gè)作為 Leader,Leader 是當(dāng)前負(fù)責(zé)數(shù)據(jù)的讀寫的 partition。
(6)Follower????????
????????Follower 跟隨 Leader,所有寫請求都通過 Leader 路由,數(shù)據(jù)變更會廣播給所有 Follower,F(xiàn)ollower 與 Leader 保持?jǐn)?shù)據(jù)同步。Follower 只負(fù)責(zé)備份,不負(fù)責(zé)數(shù)據(jù)的讀寫。
????????如果 Leader 故障,則從 Follower 中選舉出一個(gè)新的 Leader。
????????當(dāng) Follower 掛掉、卡住或者同步太慢,Leader 會把這個(gè) Follower 從 ISR(Leader 維護(hù)的一個(gè)和 Leader 保持同步的 Follower 集合) 列表中刪除,重新創(chuàng)建一個(gè) Follower。
(7)Producer????????
????????生產(chǎn)者即數(shù)據(jù)的發(fā)布者,該角色將消息 push 發(fā)布到 Kafka 的 topic 中。broker 接收到生產(chǎn)者發(fā)送的消息后,broker 將該消息追加到當(dāng)前用于追加數(shù)據(jù)的 segment 文件中。
? ? ? ? 生產(chǎn)者發(fā)送的消息,存儲到一個(gè) partition 中,生產(chǎn)者也可以指定數(shù)據(jù)存儲的 partition。
(8)Consumer
????????消費(fèi)者可以從 broker 中 pull 拉取數(shù)據(jù)。消費(fèi)者可以消費(fèi)多個(gè) topic 中的數(shù)據(jù)。
(9)Consumer Group(CG)
????????消費(fèi)者組,由多個(gè) consumer 組成。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者??蔀槊總€(gè)消費(fèi)者指定組名,若不指定組名則屬于默認(rèn)的組。
????????將多個(gè)消費(fèi)者集中到一起去處理某一個(gè) Topic 的數(shù)據(jù),可以更快的提高數(shù)據(jù)的消費(fèi)能力。
????????消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi),防止數(shù)據(jù)被重復(fù)讀取。消費(fèi)者組之間互不影響。
(10)offset 偏移量
????????可以唯一的標(biāo)識一條消息。偏移量決定讀取數(shù)據(jù)的位置,不會有線程安全的問題,消費(fèi)者通過偏移量來決定下次讀取的消息(即消費(fèi)位置)。
????????消息被消費(fèi)之后,并不被馬上刪除,這樣多個(gè)業(yè)務(wù)就可以重復(fù)使用 Kafka 的消息。
????????某一個(gè)業(yè)務(wù)也可以通過修改偏移量達(dá)到重新讀取消息的目的,偏移量由用戶控制。????????
????????消息最終還是會被刪除的,默認(rèn)生命周期為 1 周(7*24小時(shí))。
(11)Zookeeper
????????Kafka 通過 Zookeeper 來存儲集群的 meta 信息。
????????由于 consumer 在消費(fèi)過程中可能會出現(xiàn)斷電宕機(jī)等故障,consumer 恢復(fù)后,需要從故障前的位置的繼續(xù)消費(fèi),所以 consumer 需要實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè) offset,以便故障恢復(fù)后繼續(xù)消費(fèi)。
????????Kafka 0.9 版本之前,consumer 默認(rèn)將 offset 保存在 Zookeeper 中;從 0.9 版本開始,consumer 默認(rèn)將 offset 保存在 Kafka 一個(gè)內(nèi)置的 topic 中,該 topic 為 __consumer_offsets。
????????也就是說,zookeeper的作用就是,生產(chǎn)者push數(shù)據(jù)到kafka集群,就必須要找到kafka集群的節(jié)點(diǎn)在哪里,這些都是通過zookeeper去尋找的。消費(fèi)者消費(fèi)哪一條數(shù)據(jù),也需要zookeeper的支持,從zookeeper獲得offset,offset記錄上一次消費(fèi)的數(shù)據(jù)消費(fèi)到哪里,這樣就可以接著下一條數(shù)據(jù)進(jìn)行消費(fèi)。
5.kafka的部署
下載安裝包,官方下載地址:Apache Kafka
解壓到/usr/local/kafka
tar xf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka/
修改服務(wù)配置文件server.properties
vim server.properties
broker.id=0 #21行,broker的全局唯一編號,每個(gè)broker不能重復(fù),因此要在其他機(jī)器上配置 id=1和2
listeners=PLAINTEXT://192.168.116.10:9092 ●31行,指定監(jiān)聽的IP和端口,如果修改每個(gè)broker的IP需區(qū)分開來,也可保持默認(rèn)配置不用修改
num.network.threads=3 #42行,broker 處理網(wǎng)絡(luò)請求的線程數(shù)量,一般情況下不需要去修改
num.io.threads=8 #45行,用來處理磁盤IO的線程數(shù)量,數(shù)值應(yīng)該大于硬盤數(shù)
socket.send.buffer.bytes=102400 #48行,發(fā)送套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400 #51行,接收套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600 #54行,請求套接字的緩沖區(qū)大小
log.dirs=/usr/local/kafka/logs #60行,kafka運(yùn)行日志存放的路徑,也是數(shù)據(jù)存放的路徑
num.partitions=1 #65行,topic在當(dāng)前broker上的默認(rèn)分區(qū)個(gè)數(shù),會被topic創(chuàng)建時(shí)的指定參數(shù)覆蓋
num.recovery.threads.per.data.dir=1 #69行,用來恢復(fù)和清理data下數(shù)據(jù)的線程數(shù)量
log.retention.hours=168 #103行,segment文件(數(shù)據(jù)文件)保留的最長時(shí)間,單位為小時(shí),默認(rèn)為7天,超時(shí)將被刪除
log.segment.bytes=1073741824 #110行,一個(gè)segment文件最大的大小,默認(rèn)為 1G,超出將新建一個(gè)新的segment文件
zookeeper.connect=192.168.116.10:2181,192.168.116.20:2181,192.168.116.30:2181 #123行,配置連接Zookeeper集群地址
添加環(huán)境變量
//修改環(huán)境變量
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
添加kafka到系統(tǒng)服務(wù)
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)
echo "---------- Kafka 啟動(dòng) ------------"
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo "---------- Kafka 停止 ------------"
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
$0 start
;;
status)
echo "---------- Kafka 狀態(tài) ------------"
count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
if [ "$count" -eq 0 ];then
echo "kafka is not running"
else
echo "kafka is running"
fi
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
#設(shè)置開機(jī)自啟
chmod +x /etc/init.d/kafka
chkconfig --add kafka
#分別啟動(dòng) Kafka
service kafka start
6.kafka命令的使用
(1)創(chuàng)建topic
使用kafka-topics.sh
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? --create:執(zhí)行創(chuàng)建,topic端口為2181
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??--zookeeper:定義 zookeeper 集群服務(wù)器地址,如果有多個(gè) IP 地址使用逗號分割,一般使用一個(gè) IP 即可
????????????????????????????????--replication-factor:定義分區(qū)副本數(shù),1 代表單副本,建議為 2?
????????????????????????????????--partitions:定義分區(qū)數(shù)?
????????????????????????????????--topic:定義 topic 名稱
(2) 列出所有topic
????????????????????????????????--list:列出當(dāng)前所有topic
(3)查看topic信息
????????????????????????????????--describe:查看topic的信息
????????????????????????????????--topic:指定topic名,不指定顯示所有
(4) 發(fā)布消息
使用?kafka-console-producer.sh
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? --broker-list:指定代理節(jié)點(diǎn),發(fā)布端口為9092
(5)消費(fèi)消息?
使用?kafka-console-consumer.sh
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??--bootstrap-server:指定消費(fèi)服務(wù)器地址,消費(fèi)端口也是9092
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??--from-beginning:會把主題中以往所有的數(shù)據(jù)都讀取出來
但是如果實(shí)時(shí)發(fā)布和讀?。窗l(fā)布一條消費(fèi)一條),那么順序就能保證;還有前文提到,嚴(yán)格保證消息的消費(fèi)順序的場景下(例如商品秒殺、 搶紅包),需要將 partitions 數(shù)目設(shè)為 1。
(6)修改指定topic分區(qū)數(shù)
使用kafka-topics.sh
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?--alter:執(zhí)行修改
注意:像上一點(diǎn)提到的,修改 partitions 為1,不能使用此方法,只能刪除此topic然后重建,是因?yàn)樾薷囊汛嬖趖opic時(shí),partitions 的數(shù)目只允許增。例:
(7)刪除指定topic
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? --delete:執(zhí)行刪除文章來源:http://www.zghlxwxcb.cn/news/detail-629259.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-629259.html
到了這里,關(guān)于zookeeper+kafka分布式消息隊(duì)列集群的部署的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!