-
方法入?yún)ventType指定觸發(fā)事件的類型:data變更還是childNodes變更等。
-
將KeeperState、EventType、Path new 到WatchedEvent實例中
-
根據(jù)Path路徑從watchTable中找到所有對應(yīng)的Watcher,為空continue,不為空時再iterator.remove掉,這樣Watcher觸發(fā)一次就失效了
-
從Map中找到的所有Watcher執(zhí)行一遍其process方法,服務(wù)端Watcher的實際類型是ServerCnxn,其process方法(NIOServerCnxn#process)內(nèi)容:
- 創(chuàng)建ReplyHeader,設(shè)置zxid=-1表示當(dāng)前是一個通知
- 調(diào)用WatchedEvent#getWrapper方法將WatchedEvent包裝成WatcherEvent(類似BeanUtil#copyProperties),便于網(wǎng)絡(luò)傳輸
- 調(diào)用sendResponse方法向客戶端發(fā)送通知
ZK借助當(dāng)前客戶端TCP連接的ServerCnxn對象實現(xiàn)對客戶端的WatchedEvent傳遞,Watcher的回調(diào)和業(yè)務(wù)處理都在客戶端執(zhí)行。
客戶端回調(diào)Watcher
ClientCnxn.SendThread#readResponse()方法中接收并處理服務(wù)端響應(yīng),以xid=-1(表示通知事件)為例:
- 調(diào)用WatcherEvent#deserialize方法對接收到的字節(jié)流進行反序列化,轉(zhuǎn)換成WatcherEvent對象
- 處理chrootPath:如果客戶端設(shè)置了chrootPath屬性,需要修改反序列化得到的Event#path
- WatcherEvent 再轉(zhuǎn)成 WatchedEvent
- 交給ClientCnxn中的EventThread處理,放入隊列,等待下一輪詢周期中進行Watchcer回調(diào)
客戶端回調(diào)Watcher
-
處理邏輯位于ClientCnxn.SendThread#readResponse(ByteBuffer incomingBuffer),轉(zhuǎn)成WatchedEvent最終會交給ClientCnxn.EventThread#queueEvent 處理。
-
queueEvent方法通過ZKWatchManager#materialize方法(入?yún)?KeeperState、EventType、String path)從ZKWatchManager中取出所有相關(guān)Watcher。
-
在對于EventType=NodeDataChanged或NodeCreated的處理中,調(diào)用了addTo方法,先通過Map#remove API從 注冊的dataWatches和existWatches中移除指定path的Set并將其返回,放入結(jié)果result中進行返回。這里的操作說明客戶端的Wathcer機制同樣是一次性的。
-
queueEvent會將取出的Watcher集合放入EventThread的LinkedBlockingQueue waitingEvents字段中,其run方法中對LinkBlockingQueue進行while true式的不斷take傳遞給ClientCnxn.EventThread#processEvent處理:
-
判斷出類型是WatcherSetEventPair,取出Set字段for循環(huán)式調(diào)用其process方法,這個process方法就是用戶處理業(yè)務(wù)邏輯的回調(diào)方法了綜上分析,ZK的Watcher具有以下特性:
-
一次性:這樣可以減輕服務(wù)端的通知壓力。如果一個Watcher一直有效,對于頻繁更新的節(jié)點,服務(wù)端會不斷向客戶端發(fā)送事件通知,對于服務(wù)端的網(wǎng)絡(luò)和性能都是一個挑戰(zhàn)。
-
客戶端串行執(zhí)行:客戶端取出Set時逐個(串行)同步回調(diào),這樣可以保證順序。但要注意Wathcer處理邏輯的異常捕獲
-
Wathcer注冊輕量化:客戶端注冊Watcher不會將Watcher實例傳遞到Server端,僅僅在客戶端請求中使用boolean類型屬性標記是否Watch,服務(wù)端也僅僅只保存了當(dāng)前連接的ServerCnxn對象。
-
Wathcer通知輕量化:WatchedEvent是ZK整個Watcher通知機制的最小通知單元,該對象只有3個字段:KeeperStat、EventType、String path,只會告訴客戶端發(fā)生了事件,而事件的具體內(nèi)容需要客戶端發(fā)起查詢請求。
-
ACL:保障數(shù)據(jù)安全
ZooKeeper內(nèi)部存儲的分布式系統(tǒng)的狀態(tài)信息需要保障數(shù)據(jù)安全,這需要借助ACL權(quán)限控制機制。
在Unix\Linux文件系統(tǒng)中廣泛使用的權(quán)限控制方式是UGO(User\Group\Others)權(quán)限控制機制,這是一種粗粒度的文件系統(tǒng)權(quán)限控制模式。ACL訪問控制列表可以實現(xiàn)更細粒度的權(quán)限控制,Linux 2.6內(nèi)核已開始支持這一特性。
ZK的ACL機制通常使用 ?schema:id:permission
來標識一個有效的ACL信息:
-
權(quán)限模式 Schema:有四種權(quán)限模式:
- IP(IPAuthenticationProvider):如 ip:192.168.1.10,或按網(wǎng)段配置 ip:192.168.1.1/24 表示 192.168.0.* 這個IP段
- Digest(DigestAuthenticationProvider):最常用的權(quán)限控制模式,以 username:password 的形式,ZK內(nèi)部通過DigestAuthenticationProvider.generateDigest static 方法進行編碼
- World:即 world:anyone 特殊的Digest模式,節(jié)點對所有用戶開放
- Super:超級用戶可以對ZK上的任意數(shù)據(jù)節(jié)點進行任何操作
-
授權(quán)對象 ID:上述每種Schema對應(yīng)的ID分別是 192.168.1.10、192.168.1.1/24、username:BASE64(SHA-1(username:password))、anyone
-
權(quán)限Permission:通過權(quán)限檢查后可以被允許執(zhí)行的操作:CREATE、DELETE、READ、WRITE、ADMIN-數(shù)據(jù)節(jié)點的管理員權(quán)限,允許授權(quán)對象對該數(shù)據(jù)節(jié)點進行ACL相關(guān)的設(shè)置操作
自定義權(quán)限控制 Pluggable ZooKeeper Authentication:需要用戶實現(xiàn) AuthenticationProvider 接口,通過配置ZK啟動參數(shù) -Dzookeeper.authProvider.1=com.zkbook.CustomAuthenticationProvider
或 通過配置文件 zoo.cfg 添加 authProvider.1=com.zklearn.CustomAuthenticationProvider
。對于權(quán)限控制器的注冊,ZK采用了延遲加載的策略,只有在第一次處理包含權(quán)限控制的客戶端請求時,才會進行權(quán)限控制器的初始化。ZK會將所有的權(quán)限控制器注冊到ProviderRegistry中,邏輯位于 ProviderRegistry#initialize 方法中,對 zookeeper.authProvider. 這個屬性進行了解析
使用zkCli進行ACL操作
ZooKeeper ZkCli 官方文檔
# 創(chuàng)建節(jié)點時指定ACL
# -e 臨時節(jié)點 -s sequential節(jié)點,digest固定開頭,crwd表示權(quán)限,支持 create read write delete admin
> create -e /zklearn/c4 data_content digest:userJ:passJ:crwd
Created /zklearn/c4
# 查看節(jié)點權(quán)限
> getAcl /zklearn/c3
'world,'anyone
: cdrwa
# 修改節(jié)點權(quán)限
# 已設(shè)置acl的path再setAcl就不行了,需要開啟super權(quán)限
> setAcl path acl
序列化與協(xié)議
Jute序列化反序列化
OutputArchive和InputArchive分別是Jute底層的序列化器和反序列化器的接口定義,最新的實現(xiàn)有 BinaryXXputArchive、CsvXXputArchive、XmlXXputArchive。無論哪種實現(xiàn),都是基于OutputStream和InputStream進行操作。
通信協(xié)議
ZK基于TCP/IP協(xié)議實現(xiàn)了自己的通信協(xié)議,進行客戶端與服務(wù)端、服務(wù)端與服務(wù)端的網(wǎng)絡(luò)通信。
ZK請求的數(shù)據(jù)包
以 獲取節(jié)點數(shù)據(jù)請求 GetDataRequest 為例
請求頭 RequestHeader
/*
?zk中的許多類是jute proto文件定義的,通過JavaGenerator生成的源碼
?這些proto為了正反序列化以便網(wǎng)絡(luò)傳輸,需要實現(xiàn)Jute的Record接口
?同時這些類的注解里會有這樣一行 File generated by hadoop record compiler. Do not edit.
*/
public class RequestHeader implements Record {
?private int xid;// 記錄客戶端請求發(fā)起的先后序號,確保單個客戶端請求的響應(yīng)順序
?private int type;// 請求的操作類型,定義在ZooDefs.OpCode中:創(chuàng)建節(jié)點 OpCode.create-1;刪除節(jié)點 OpCode.delete-2;獲取節(jié)點數(shù)據(jù) OpCode.getDate-4;
請求體 Request
// ConnectRequest 會話創(chuàng)建
public class ConnectRequest implements Record {
?private int protocolVersion;//協(xié)議版本號
?private long lastZxidSeen;//最近一次收到的服務(wù)器ZXID
?private int timeOut;// 會話超時時間
?private long sessionId;// 會話標識
?private byte[] passwd;// 會話密碼
// GetDataRequest 獲取節(jié)點數(shù)據(jù)
public class GetDataRequest implements Record {
?private String path;//
?private boolean watch;//是否注冊 Watcher
// SetDataRequest 更新節(jié)點數(shù)據(jù)
public class SetDataRequest implements Record {
?private String path;// 數(shù)據(jù)節(jié)點的節(jié)點路徑
?private byte[] data;//數(shù)據(jù)內(nèi)容
?private int version;//節(jié)點數(shù)據(jù)的期望版本號
請求體的抓包分析
使用WireShark嗅探GetDataRequest產(chǎn)生的TCP包(十六進制字節(jié)數(shù)組)
十六進制位 | 協(xié)議部分 | 數(shù)值或字符串 |
---|---|---|
00,00,00,1d | 0-3位:len 整個數(shù)據(jù)包長度 | 長度29 |
00,00,00,01 | 4-7位:xid 客戶端請求的發(fā)起序號 | 1 |
00,00,00,04 | 8-11位:type 客戶端請求類型 | 4 OpCode.getData |
00,00,00,10 | 12-15位:len 節(jié)點路徑的長度 | 16 節(jié)點路徑長度轉(zhuǎn)換成十六進制是16位 |
2f,24,37,5f, 32,5f,34,2f, 67,65,74,5f, 64,61,74,61 | 16-31位:path 節(jié)點路徑 | Hex編碼 |
01 | 32位:是否注冊Watcher | 1-是 |
響應(yīng)
GetDataResponse響應(yīng)完整協(xié)議定義
???????
響應(yīng)頭 ReplyHeader
public class ReplyHeader implements Record {
?private int xid; // 請求時傳過來的xid會在響應(yīng)中原樣返回
?private long zxid; // zxid 代表ZK服務(wù)器上當(dāng)前最新事務(wù)ID
?private int err; // 錯誤碼:Code.OK-0,NONODE-101,NOAUTH-102,定義在KeeperException.Code中
響應(yīng)體Response
//會話創(chuàng)建
public class ConnectResponse implements Record {
?private int protocolVersion;
?private int timeOut;
?private long sessionId;
?private byte[] passwd;
// 獲取節(jié)點數(shù)據(jù)
public class GetDataResponse implements Record {
?private byte[] data;
?private org.apache.zookeeper.data.Stat stat;
// 更新節(jié)點數(shù)據(jù)
public class SetDataResponse implements Record {
?private org.apache.zookeeper.data.Stat stat;
GetDataResponse 協(xié)議定義
十六進制位 | 協(xié)議解釋 | 當(dāng)前值 |
---|---|---|
00,00,00,63 | 0-3位:len 整個響應(yīng)的數(shù)據(jù)包長度 | 99 |
00,00,00,05 | 4-7位:xid 客戶端請求序號 | 5 本次請求所屬會話創(chuàng)建后的第5次請求 |
00,00,00,00, 00,00,00,04 | 8-15位: zxid 當(dāng)前服務(wù)器處理過的最大ZXID | 4 |
00,00,00,00 | 16-19位:err 錯誤碼 | 0-Codes.OK |
00,00,00,0b | 20-23位:len 節(jié)點數(shù)據(jù)內(nèi)容的長度 | 11 后面11位是數(shù)據(jù)內(nèi)容的字節(jié)數(shù)組 |
xxx | 24-34位:data 節(jié)點數(shù)據(jù)內(nèi)容 | Hex編碼 |
00,00,00,00, 00,00,00,04 | 35-42位:czxid 創(chuàng)建該節(jié)點時的ZXID | 4 |
00,00,00,00, 00,00,00,04 | 43-50位:mzxid 最后一次訪問該數(shù)據(jù)節(jié)點時的ZXID | 4 |
00,00,01,43,67,bd,0e,08 | 51-58位:ctime 數(shù)據(jù)節(jié)點的創(chuàng)建時間 | unix_timestamp 1389014879752 |
00,00,01,43,67,bd,0e,08 | 59-66位:mtime 數(shù)據(jù)節(jié)點最后一次變更的時間 | |
00,00,00,00 | 67-70位:version 數(shù)據(jù)節(jié)點內(nèi)容的版本號 | 0 |
00,00,00,00 | 71-74位:cversion 數(shù)據(jù)節(jié)點的子版本號 | 0 |
00,00,00,00 | 75-78位:aversion 數(shù)據(jù)節(jié)點的ACL變更版本號 | 0 |
00,00,00,00,00,00,00,00 | 79-86位:ephemeralOwner 如果是臨時節(jié)點,則記錄創(chuàng)建該節(jié)點的sessionID,否則置0 | 0 (該節(jié)點是永久節(jié)點) |
00,00,00,0b | 87-90位:dataLength 數(shù)據(jù)節(jié)點的數(shù)據(jù)內(nèi)容長度 | 11 |
00,00,00,00 | 91-94位:numChildren 數(shù)據(jù)節(jié)點的子節(jié)點個數(shù) | 0 |
00,00,00,00,00,00,00,04 | 95-102位:pzxid 最后一次對子節(jié)點列表變更的ZXID | 4 |
ZK客戶端
ZK客戶端的組成:ZooKeeper實例-客戶端入口,HostProvider - 客戶端地址列表管理器,ClientCnxn-客戶端核心線程,內(nèi)部包含SendThread和EventThread兩個線程。前者是一個IO線程,負責(zé)ZooKeeper客戶端和服務(wù)器端間的網(wǎng)絡(luò)IO通信,后者是一個事件線程,負責(zé)對服務(wù)端事件進行處理。
ZK會話的創(chuàng)建過程
初始化階段
- 初始化ZK對象,通過調(diào)用ZooKeeper的構(gòu)造方法實例化,在此過程中會創(chuàng)建客戶端Watcher管理器 ClientWatcherManager
- 設(shè)置會話默認Watcher:如果在構(gòu)造方法中傳入了一個Watcher對象,客戶端會將這個對象作為默認Watcher保存在ClientWatcherManager中
- 構(gòu)造ZooKeeper服務(wù)器地址列表管理器 HostProvider:對于構(gòu)造函數(shù)傳入的服務(wù)器地址,客戶端會將其存放在服務(wù)器地址列表管理器HostProvider中
- 創(chuàng)建并初始化客戶端網(wǎng)絡(luò)連接器 ClientCnxn:ClientCnxn連接器的底層IO處理器是ClientCnxnSocket。另外還會初始化客戶端兩個核心隊列 outgoingQueue 和 pendingQueue 分別作為客戶端的請求發(fā)送隊列和服務(wù)端響應(yīng)的等待隊列。
- 初始化SendThread和EventThread:前者管理客戶端與服務(wù)端之間的所有網(wǎng)絡(luò)IO,后者用于客戶端的事件處理
會話創(chuàng)建階段
- 啟動SendThread和EventThread
- 獲取一個服務(wù)器地址:開始創(chuàng)建TCP連接前,SendThread從HostProvider中隨機選擇一個地址,調(diào)用ClientCnxnSocket 創(chuàng)建與ZK服務(wù)器之間的TCP連接
- 創(chuàng)建TCP長連接
- 構(gòu)造ConnectRequest請求:SendThread根據(jù)當(dāng)前客戶端的實際設(shè)置,構(gòu)造出一個ConnectRequest請求,代表了客戶端視圖與服務(wù)端創(chuàng)建一個會話。同時ZK客戶端會將請求包裝成IO層的Packet對象放入請求發(fā)送隊列outgoingQueue中
- 發(fā)送請求:ClientCnxnSocket從outgoingQueue中取出一個待發(fā)送的Pocket對象序列化成ByteBuffer發(fā)送到服務(wù)端
響應(yīng)處理階段
- 接收并處理服務(wù)端響應(yīng):ClientCnxnSocket接收到服務(wù)端的響應(yīng)后,會首先判斷當(dāng)前客戶端狀態(tài)是否是已初始化,才進行反序列化,得到ConnectResponse對象,從中獲取ZK服務(wù)端分配的sessionID
- 連接成功:通知SendThread進一步對客戶端進行會話參數(shù)的設(shè)置:readTimeout\connectTimeout,更新客戶端狀態(tài)。通知HostProvider當(dāng)前成功連接的服務(wù)器地址
- 生成事件 SyncConnected - None:為了讓上層應(yīng)用感知到會話的成功創(chuàng)建,SendThread會生成該事件傳遞給EventThread,通知會話創(chuàng)建成功
- 查詢Watcher:EventThread線程收到事件后,會從ClientWatchManager中獲取對應(yīng)Watcher,針對SyncConnected-None事件找到默認的Wathcer,放入EventThread的waitingEvents隊列中
- 處理事件:EventThread不斷從waitingEvents隊列中取出待處理的Watcher對象,調(diào)用process方法觸發(fā)Watcher
connectString解析
connectString 形如 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,ZK客戶端允許將服務(wù)器所有地址配置在字符上,ZK客戶端在連接服務(wù)器的過程中是如何從服務(wù)器列表中選擇機器的?是順序?還是隨機?
org.apache.zookeeper.client.ConnectStringParser 中的構(gòu)造方法對connectString進行的處理有:解析chrootPath + 保存服務(wù)器地址列表到 ArrayList serverAddresses
chroot 客戶端命名空間
ZK3.2.0 之后的版本中添加了該特性,connectString 可 設(shè)置為 192.168.0.1:2181,192.168.0.2:2181/apps/domainName,將解析出chroot=/apps/domainName,這樣客戶端的所有操作都會限制在這個命名空間下
ZooKeeper.java
private static HostProvider createDefaultHostProvider(String connectString) {
??return new StaticHostProvider(new ConnectStringParser(connectString).getServerAddresses());
}
解析的結(jié)果會返回 地址列表管理器 StaticHostProvider 的構(gòu)造方法中
HostProvider
HostProvider 提供了客戶端連接所需的host,每一個實現(xiàn)該接口的類需要確保下述幾點:
- next() 方法必須有效的InetSocketAddress,這樣迭代器能一直運行下去。必須返回解析過的InetSocketAddress實例
- size() 方法不能返回0
public interface HostProvider {
? ?//當(dāng)前服務(wù)器地址列表的個數(shù),不能返回0
? ?int size();
? ?// 獲取下一個將要連接的InetSocketAddress,spinDelay 表示所有地址都嘗試過后的等待時間
? ?InetSocketAddress next(long spinDelay);
? ?//連接成功后的回調(diào)方法
? ?void onConnected();
? ?//更新服務(wù)器列表,返回是否需要改變連接用于負載均衡
? ?boolean updateServerList(Collection<InetSocketAddress> serverAddresses, InetSocketAddress currentHost);
}
解析服務(wù)器地址:StaticHostProvider會解析服務(wù)器地址放入serverAddress 集合中,同時使用Collections#shuffle方法將服務(wù)器地址列表進行隨機打散。
獲取可用的服務(wù)器地址:StaticHostProvider#next() 方法中將隨機排序后的服務(wù)器地址列表拼成一個環(huán)形循環(huán)隊列,該過程是一次性的。
HostProvider的實現(xiàn):自動從配置文件中讀取服務(wù)器地址列表、動態(tài)變更的地址列表管理器(定時從配置管理中心上解析ZK服務(wù)器地址)、實現(xiàn)服務(wù)調(diào)用時同機房優(yōu)先的策略
ClientCnxn 網(wǎng)絡(luò)IO
ClientCnxn維護客戶端與服務(wù)器之間的網(wǎng)絡(luò)連接并進行通信
Packet是ClientCnxn的內(nèi)部類,定義:
static class Packet {
? ? ? ?RequestHeader requestHeader;
? ? ? ?ReplyHeader replyHeader;
? ? ? ?Record request;
? ? ? ?Record response;
? ? ? ?ByteBuffer bb;
? ? ? ?String clientPath;
? ? ? ?//server視角下的path,chroot不同
? ? ? ?String serverPath;
? ? ? ?boolean finished;
? ? ? ?AsyncCallback cb;
? ? ? ?Object ctx;
? ? ? ?WatchRegistration watchRegistration;
? ? ? ?public boolean readOnly;
? ? ? ?WatchDeregistration watchDeregistration;
? ?//并不是Packet中的所有字段都進行網(wǎng)絡(luò)傳輸,在createBB方法中定義了用于網(wǎng)絡(luò)傳輸?shù)腂yteBuffer bb字段的生成邏輯
? ?//里面只用到了RequestHeader requestHeader,Record request,boolean readOnly 3個字段
? ?public void createBB() {}
}
ClientCnxn的兩個核心隊列(都是Packet隊列):
- outgoingQueue:客戶端的請求發(fā)送隊列,存儲要發(fā)送到服務(wù)端的Packet集合
- pendingQueue:服務(wù)端響應(yīng)的等待隊列,存儲已經(jīng)從客戶端發(fā)送到服務(wù)端但需要等待服務(wù)端響應(yīng)的Packer集合
ClientCnxnSocket
ZK3.4之后ClientCnxnSocket從ClientCnxn中提取了出來,便于對底層Socket進行擴展(如使用Netty實現(xiàn))
通過系統(tǒng)變量配合ClientCnxnSocket實現(xiàn)類的全類名:-Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNIO
ClientCnxnSocketNIO是ClientCnxnSocket的Java NIO原生實現(xiàn)
會話Session
【分布式】Zookeeper會話 - leesf - 博客園
會話狀態(tài)有:CONNECTING CONNECTED RECONNECTING RECONNECTED CLOSE
Session是ZK中的會話實體,代表一個客戶端會話,包含以下4個基本屬性:
- sessionID 唯一標識一個會話,每次客戶端創(chuàng)建新會話時,ZK會為其分配一個全局唯一的sessionID
- timeout 會話超時時間,客戶端構(gòu)造ZK實例時會傳入sessionTImeout指定會話的超時時間,客戶端向服務(wù)器發(fā)送這個超時時間后,服務(wù)器會根據(jù)自己的超時限定確定會話的超時時間
- tickTime 下次會話超時時間點,這個參數(shù)用于會話管理的分桶策略執(zhí)行。TickTIme是一個13位的long型(unix_timestamp)
- isClosing 服務(wù)端檢測到一個會話失效后會標記其isClosing=true,這樣就不再處理來自該會話的新請求了
sessionID的生成原理
代碼位于 SessionTrackerImpl#initializeNextSession
//最終返回的sessionID:高8位是傳入的id,剩下的56位最后16位被置零了,前面的40位是最高位截掉的timestamp(去掉數(shù)字1)
public static long initializeNextSessionId(long id) {
?long nextSid;
?// nanoTime/10^6 就是 currentTimeMillis 13位long型,long型占空間8B,共64位
?//如 1657349408123 對應(yīng) 44 位的二進制是 00011000000111110001101110010000010101111011
?//左移24位后再右移8位后的結(jié)果:00000000(-8位)1000000111110001101110010000010101111011(16位-)0000000000000000
?//注意這個右移8位是無符號右移,防止unixtimes第5位是1帶來的負數(shù)問題
?nextSid = (System.nanoTime() / 1000000 << 24) >>> 8;
?//添加機器標識 sid 正好補在前面騰出的8位中
?nextSid = nextSid | (id << 56);
?if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
? ?++nextSid; ?// this is an unlikely edge case, but check it just in case
}
?return nextSid;
}
左移24位可以將高位的1去掉(unixTimestamp轉(zhuǎn)二進制的44位數(shù)字開頭總是0001),防止負數(shù)(負數(shù)右移8位后最高位的1不變),sid不能明確得出
SessionTracker
ZK服務(wù)端的會話管理器,負責(zé)會話的創(chuàng)建、管理和清理,使用3個數(shù)據(jù)結(jié)構(gòu)管理Session:
- sessionsById:ConcurrentHashMap<Long, SessionImpl>類型,根據(jù)sessionID管理Session實體
- sessionsWithTimeout:ConcurrentMap<Long, Integer> 根據(jù)sessionID管理會話的超時時間,定期被持久化到快照文件中
- sessionSets:ExpiryQueue sessionExpiryQueue 服務(wù)于會話管理和超時檢查,分桶策略會用到
Session管理 - 分桶策略
ZK的會話管理主要由SessionTracker負責(zé),其采用了分桶策略:將理論上可以在同一時間點超時的會話放在同一區(qū)塊中,便于進行會話的隔離處理和同一區(qū)塊的統(tǒng)一管理。
對于一個會話的超時時間理論上就是客戶端設(shè)置的超時時間之后,即圖中的 ExpirationTime = CurrentTime + sessionTimeout(客戶端進行設(shè)置),這樣到達這個ExpirationTime檢查各會話是否真的需要置超時狀態(tài)
但是ZK服務(wù)端檢查各區(qū)塊的會話是否超時是有周期的,如每隔 ExpirationInterval 進行檢查,這樣實際的 ExpirationTime 是在原數(shù)值之后的最近一個周期上進行檢查,這樣
ExpirationTime_Adjust = ((CurrentTime + sessionTimeout) / ExpirationInterval + 1) * ExpirationInterval (單位均是ms)
如對于當(dāng)前時間為4,,10 超時,檢查周期為3,在15的時候才是第一個可能的超時時間。這樣 ExpirationTime_Adjust 總是 ExpirationInterval 的整數(shù)倍。這樣SessionTracker中的會話超時檢查線程就可以在 ExpirationInterval 的整數(shù)倍的時間點上對會話進行批量清理(未及時移走的會話都是要被清理掉的,沒有客戶端觸發(fā)會話激活)
會話激活
Leader服務(wù)器收到客戶端的心跳消息PING后:
- 檢查改會話是否是isClose
- 如果會話尚未關(guān)閉,則激活會話,計算出會話的下一次超時時間點 ExpirationTime_NEW
- 根據(jù)會話的舊超時時間點 ExpirationTime_Old 定位到會話所在的區(qū)塊
- 遷移會話,將會話放入 ExpirationTime_NEW 對應(yīng)的新區(qū)塊中
觸發(fā)會話激活的兩種場景:
- 只要客戶端向服務(wù)器發(fā)送請求(不論讀/寫)就會觸發(fā)一次會話激活
- 客戶端在sessionTimeout / 3 的時間間隔內(nèi)沒有向服務(wù)器發(fā)出任何請求,就會主動發(fā)起一次PING請求觸發(fā)會話激活
會話清理的步驟
- 先將該會話的isClosing置為true,這樣在會話清理期間再收到客戶端的新請求就返回 Session_Expire,再標記會話狀態(tài)為已關(guān)閉 - CLOSE
- 發(fā)起會話關(guān)閉 請求給 PrepRequestProcessor處理器進行處理
- 根據(jù)sessionID從內(nèi)存數(shù)據(jù)庫中找到對應(yīng)的臨時節(jié)點列表
- 將這些臨時節(jié)點轉(zhuǎn)換成 節(jié)點刪除 請求,放入事務(wù)變更隊列 outstandingChanges 中
- FinalRequestProcessor觸發(fā)內(nèi)存數(shù)據(jù)庫,刪除該會話對應(yīng)的所有臨時節(jié)點
- 節(jié)點刪除后從SessionTracker中移除session(從sessionById sessionWithTimeout sessionExpiryQueue中移除對應(yīng)session的信息)
- 從NIOServerCnxnFactory中找到會話對應(yīng)的NIOServerCnxn進行關(guān)閉
重連機制
客戶端與服務(wù)端網(wǎng)絡(luò)連接斷開時,ZK客戶端會進行反復(fù)的重連
客戶端經(jīng)常看到的兩種連接異常是:CONNECTION_LOSS 連接斷開,SESSION_EXPIRE 會話過期;服務(wù)端可能看到的連接異常是SESSION_MOVED 會話轉(zhuǎn)移
- CONNECTION_LOSS:客戶端在發(fā)現(xiàn)連接斷開時會逐個嘗試連接 connectString 解析出的服務(wù)器地址,同時此時收到連接事件 None-Disconnected,同時拋出異常 KeeperException$ConnectionLossException,應(yīng)用層應(yīng)捕獲住此異常并等待重連成功(收到None-SyncConnected事件)后進行重試
- SESSION_EXPIRE:通常發(fā)生在CONNECTION_LOSS,客戶端重連成功后會話在服務(wù)端已過期被清理。應(yīng)用層此時需要重新創(chuàng)建一個ZooKeeper實例進行初始化
- SESSION_MOVED:ZooKeeper在3.2.0版本后明確提出的概念,客戶端 C 向服務(wù)端 S1發(fā)出的請求R1因網(wǎng)絡(luò)抖動導(dǎo)致重連到S2,并重試請求R11,但后面R1成功到達S1,導(dǎo)致S1 S2 都執(zhí)行了相同的請求。針對這一罕見場景,ZooKeeper提出的處理方案: 在處理客戶端請求時檢查此會話Owner是不是當(dāng)前服務(wù)器,不是的話會拋出 SessionMovedException 異常,但C1因為已斷開與S1的連接,看不到S1上的這個異常。在多個客戶端使用相同的sessionId/pass連接不同服務(wù)端時才會看到這種異常
ZK服務(wù)端
ZK服務(wù)端架構(gòu)
zookeeper學(xué)習(xí)筆記Sky_的博客-CSDN博客
單機版ZK服務(wù)器的啟動流程
預(yù)啟動
- 不論是單機還是集群模式,zkServer.cmd和zkServer.sh兩個腳本中都配置了使用QuorumPeerMain 作為啟動入口類
ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
- 解析配置文件 zoo.cfg
- 在QuorumPeerMain#initializeAndRun方法中創(chuàng)建并啟動了文件清理器 DatadirCleanupManager,包括對事物日志和快照數(shù)據(jù)文件的定時清理
- 根據(jù)zoo.cfg配置文件的解析判斷當(dāng)前是單機還是集群模式啟動,單機模式使用ZooKeeperServerMain啟動
- 創(chuàng)建ZooKeeperServer實例并進行初始化,包括連接器、內(nèi)存數(shù)據(jù)庫和請求處理器等組件的初始化
初始化
-
創(chuàng)建服務(wù)器統(tǒng)計器ServerStats,包含下述基本運行時信息:
- packetsSent: 從服務(wù)啟動或重置以來,服務(wù)端向客戶端發(fā)送的響應(yīng)包次數(shù)
- packetsReceived: … 服務(wù)端接收到的來自客戶端的請求包次數(shù)
- maxLatency/minLatency/totalLatency: 服務(wù)端請求處理的最大延時、最小延時、總延時
- count: 服務(wù)端處理的客戶端請求總次數(shù)
-
創(chuàng)建ZK數(shù)據(jù)管理器FileTxnSnapLog:FileTxnSnapLog是ZK上層服務(wù)器和底層數(shù)據(jù)存儲之間的對接層,提供了一些列操作數(shù)據(jù)文件的接口,包括事務(wù)日志文件(TxnLog接口)和快照數(shù)據(jù)文件(SnapShot接口)。ZK根據(jù)zoo.cfg文件中解析出的快照數(shù)據(jù)目錄dataDir和事務(wù)日志目錄dataLogDir來創(chuàng)建FileTxnSnapLog。
-
設(shè)置服務(wù)端 tickTime 和 會話超時時間 限制
-
創(chuàng)建并初始化 ServerCnxnFactory , 通過屬性 zookeeper.serverCnxnFactory 指定zookeeper使用 Java原生NIO還是Netty框架作為ZooKeeper服務(wù)端網(wǎng)絡(luò)連接工廠
-
啟動ServerCnxnFactory主線程(執(zhí)行主邏輯所在的run方法)此時ZK的NIO服務(wù)器已經(jīng)對外開放了端口,客戶端可以訪問到2181端口,但此時zk服務(wù)器還無法正常處理客戶端請求
-
恢復(fù)本地數(shù)據(jù):ZK啟動時都會從本地快照文件和事務(wù)日志文件中進行數(shù)據(jù)恢復(fù)
-
創(chuàng)建并啟動會話管理器SessionTracker,同時會設(shè)置 expirationInterval 計算 nextExpirationTime、sessionID ,初始化本地數(shù)據(jù)結(jié)構(gòu) sessionsWithTimeout(保存每個會話的超時時間)。之后ZK就會開始會話管理器的會話超時檢查
-
初始化ZK的請求處理鏈,ZK服務(wù)端對于請求的初始方式是典型的責(zé)任鏈模式,單機版服務(wù)器的處理鏈主要包括:PrepRequestProcessor -> SyncRequestProcessor ->FinalRequestProcessor
-
注冊JMX服務(wù):ZK會將服務(wù)器運行時的一些狀態(tài)信息以JMX的方式暴露出來
-
注冊ZK服務(wù)器實例:此時ZK服務(wù)器初始化完畢,注冊到ServerCnxnFactory之后就可以對外提供服務(wù)了,至此單機版的ZK服務(wù)器啟動完畢
集群版ZK服務(wù)器的啟動過程
zk源碼閱讀26:集群版服務(wù)器啟動概述 - 簡書
預(yù)啟動過程與單機版一致
初始化
- 創(chuàng)建并初始化 ServerCnxnFactory
- 創(chuàng)建ZooKeeper數(shù)據(jù)管理器 FileTxnSnapLog
- 創(chuàng)建QuorumPeer 實例:Quorum是集群模式下特有的對象,是ZooKeeper服務(wù)器實例ZooKeeperServer的托管者。從集群層面看QuorumPeer代表了ZooKeeper集群中的一臺機器。在運行期間,Quorum會不斷檢測當(dāng)前服務(wù)器實例的運行狀態(tài),同時根據(jù)情況發(fā)起Leader選舉
- 創(chuàng)建內(nèi)存數(shù)據(jù)庫 ZKDatabase,管理ZooKeeper的所有會話記錄以及DataTree 和事務(wù)日志的存儲
- 初始化 QuorumPeer,將一些核心組件注冊到QuorumPeer,包括 FileTxnSnapLog、ServerCnxnFactory、ZKDatabase,同時配置一些參數(shù),包括服務(wù)器地址列表、Leader選舉算法和會話超時時間限制等
- 恢復(fù)本地數(shù)據(jù)
- 啟動 ServerCnxnFactory 主線程
Leader選舉
- Leader選舉初始化階段:Leader選舉是集群版啟動流程與單機版最大的不同,ZK會根據(jù)SID(服務(wù)器分配的ID)、lastLoggedZxid(最新的ZXID)和當(dāng)前的服務(wù)器epoch(currentEpoch)生成一個初始化的投票,初始化過程中每個服務(wù)器會為自己投票。 ZooKeeper會根據(jù)zoo.cfg中的配置(electionAlg),創(chuàng)建響應(yīng)的Leader選舉算法實現(xiàn),3.4.0之前支持 LeaderElection\AuthFastLeaderElection\FastLeaderElection 三種算法實現(xiàn),3.4.0之后只支持FastLeaderElection。 在初始化階段,ZooKeeper會首先創(chuàng)建Leader選舉所需的網(wǎng)絡(luò)IO層 QuorumCnxManager,同時啟動對Leader選舉端口的監(jiān)聽,等待集群中的其他服務(wù)器創(chuàng)建連接
- 注冊JMX服務(wù)
- 檢測當(dāng)前服務(wù)器狀態(tài):QuorumPeer不斷檢測當(dāng)前服務(wù)器的狀態(tài)做出相應(yīng)的處理,正常情況下,ZK服務(wù)器的狀態(tài)在LOOKING、LEADING和FOLLOWING/OBSERVING之間進行切換,。啟動階段QuorumPeer的狀態(tài)是LOOKING,因此開始進行Leader選舉
- Leader選舉:投票選舉產(chǎn)生Leader服務(wù)器,其他機器成為Follower或是Observer; Leader選舉算法的原則:集群中的數(shù)據(jù)越新(根據(jù)每個服務(wù)器處理過的最大ZXID來確定數(shù)據(jù)是否比較新)越有可能成為Leader,ZXID相同時SID越大越有可能成為Leader。
Leader和Follower服務(wù)器啟動期交互過程
- 完成Leader選舉后,每個服務(wù)器根據(jù)自己的角色創(chuàng)建相應(yīng)服務(wù)器實例,并開始進入各自角色主流程
- Leader服務(wù)器啟動Follower接收器LearnerCnxAcceptor,負責(zé)接收所有非Leader服務(wù)器的連接請求
- Learner服務(wù)器根據(jù)投票選舉結(jié)果找到當(dāng)前集群中的Leader服務(wù)器,與其建立連接
- Leader接收來自其他機器的連接創(chuàng)建請求后,創(chuàng)建一個LearnerHandler實例。每個LearnerHandler實例都對應(yīng)了一個Leader與Learner的服務(wù)器之間的連接,負責(zé)消息、數(shù)據(jù)同步
- Learner向Leader發(fā)起注冊:將含有當(dāng)前服務(wù)器SID和服務(wù)器處理的最新ZXID信息的LearnerInfo發(fā)送給Leader服務(wù)器
- Leader接收到注冊信息后解析出SID和ZXID,根據(jù)ZXID解析出Learner對應(yīng)的epoch_of_learner_parse,與自己的epoch_of_leader_self進行比較,如果epoch_of_learner_parse>epoch_of_leader_self,則更新 epoch_of_leader_self=epoch_of_learner_parse+1。LearnerHandler會進行等待,直到過半的Learner向Leader注冊完畢,同時更新 epoch_of_leader 之后,Leader就可以確定當(dāng)前集群的epoch
- Leader將最終的epoch以LEADERINFO消息的形式發(fā)送給Learner,同時等待Learner的響應(yīng)
- Follower從LEADERINFO消息中解析出epoch和ZXID向Leader返回ACKEPOCH響應(yīng)
- Leader收到反饋響應(yīng)ACKEPOCH后與Follower進行數(shù)據(jù)同步
- 如果過半的Learner完成了數(shù)據(jù)同步,就啟動Leader和Learner服務(wù)器實例
Leader和Follower啟動
接上面步驟10,啟動步驟如下:
- 創(chuàng)建并啟動會話管理器
- 初始化ZooKeeper的請求處理鏈:根據(jù)服務(wù)器角色的不同生成不同的請求處理鏈
- 注冊JMX服務(wù)
至此,集群版的ZK服務(wù)器啟動完畢
Leader選舉過程
Leader選舉是ZooKeeper中最重要的技術(shù)之一,也是保證分布式數(shù)據(jù)一致性的關(guān)鍵
服務(wù)器啟動時期的Leader選舉
以3臺機器組成的集群為例:Server1首先啟動,此時無法完成Leader選舉
-
Server2啟動后,與Server1進行Leader選舉,由于是初始化階段,都會投票給自己,于是Server1投票內(nèi)容 (myid, ZXID) 為 (1,0),Server2投票 (2,0),各自將這個投票發(fā)送給集群中的其他所有機器
-
每個服務(wù)器接收來自其他各服務(wù)器的投票,并判斷投票的有效性:檢查是否是本輪投票,是否來自LOOKING狀態(tài)的服務(wù)器
-
收到其他服務(wù)器的投票后與自己的投票進行PK,PK規(guī)則有:
- 優(yōu)先檢查ZXID,ZXID較大的服務(wù)器優(yōu)先作為Leader
- ZXID相同時比較myid,myid較大的作為Leader
此時Server1收到Server2的投票(2,0),ZXID相同,但myid較小,會更新自己的投票為 (2,0) 并發(fā)出。Server2發(fā)現(xiàn)自己的myid較大,無需更新投票信息,只是再次向集群中所有機器發(fā)出上一次投票信息
- 統(tǒng)計投票:每次投票后服務(wù)器會統(tǒng)計所有投票,判斷是否有過半(> n/2 + 1)的機器接收到相同的投票信息來決定Leader服務(wù)器 此時3臺服務(wù)器已有 2臺(Server1 Server2)達成一致,超過半數(shù),將選舉出Leader - Server2
- 改變服務(wù)器狀態(tài):確定了Leader后服務(wù)器需要更新自己的狀態(tài),F(xiàn)ollower變更為FOLLOWING,Leader會變更為 LEADING 狀態(tài)
服務(wù)器運行期間的Leader選舉
Leader服務(wù)器宕機后進入新一輪的Leader選舉
- 變更狀態(tài):Leader宕機后剩下的非Observer服務(wù)器都會將自己的狀態(tài)變更為LOOKING,開始進入Leader選舉流程
- 每個Server發(fā)出一個投票:生成投票信息(myid, ZXID)在第一輪投票中,每個服務(wù)器都會投自己,后續(xù)的判斷過程與服務(wù)器啟動時期的Leader選舉相同
Leader選舉算法 - FastLeaderElection
ZooKeeper提供了3種Leader選舉算法:LeaderElection、UDP版本的FastLeaderElection、TCP版本的FastLeaderElection。
術(shù)語解釋:
SID - 服務(wù)器ID,唯一標識ZooKeeper集群中的機器的數(shù)字,與myid一致
ZXID - 事務(wù)ID,用于唯一標識一次服務(wù)器狀態(tài)的變更,某一時刻,集群中的每臺服務(wù)器的ZXID不一定完全一致
Vote - 投票
Quorum - 過半機器數(shù),quorum = n/2 + 1
ZooKeeper集群中服務(wù)器出現(xiàn)下述兩種情況之一就會進入Leader選舉:集群初始化啟動階段;Leader宕機/斷網(wǎng)
而一臺機器進入Leader選舉流程時,當(dāng)前集群也可能會處于兩種狀態(tài):
- 集群中本來就存在Leader,此時試圖發(fā)起選舉會被告知當(dāng)前服務(wù)器的Leader信息,直接與Leader建立連接并同步狀態(tài)
- 集群中不存在Leader:所有機器進入LOOKING狀態(tài)進行投票選舉Leader
【選舉案例】集群有5臺機器,SID分別為 1 2 3 4 5,ZXID分別為 9 9 9 8 8,在某一時刻SID為 1 2 的機器宕機退出,集群此時開始進行Leader選舉
第一次投票時,由于還無法檢測到集群中其他機器的狀態(tài)信息,每臺機器都將投自己,于是SID為 3 4 5的機器分別投票(SID,ZXID) (3,9) (4,8) (5,8)
每臺機器發(fā)出自己的投票后也會收到來自集群中其他機器的投票,每臺機器都會對比收到的投票,決定是否替換。假設(shè)機器自己的投票是 (self_sid, self_zxid) 接收到的投票是 (vote_sid, vote_zxid),對比的規(guī)則是:
- 如果 vote_zxid > self_zxid 則認可當(dāng)前投票,并再次將更新后的投票發(fā)送出去
- 如果 vote_zxid < self_zxid 則不作變更
- 如果 vote_zxid = self_zxid && vote_sid > self_sid,就認可當(dāng)前接收到的投票,并改為 (vote_sid, vote_zxid) 投遞出去
- 如果 vote_zxid = self_zxid && vote_sid < self_sid,則不作變更
SID為 3 4 5的機器對投票進行對比,會統(tǒng)一更新為投票 (3,9) ,此時quorum = 3 >= (5/2 + 1) 超過半數(shù),選舉服務(wù)器3作為Leader
ZXID越大的機器,數(shù)據(jù)也就越新,這樣可以保證數(shù)據(jù)的恢復(fù)(更少的數(shù)據(jù)丟失),所以適合作為Leader服務(wù)器
Leader選舉的實現(xiàn)細節(jié)
在QuorumPeer.ServerState 類中定義了4種服務(wù)器狀態(tài)
public enum ServerState {
?LOOKING, // 尋找Leader狀態(tài),當(dāng)前集群中沒有Leader,需要進入Leader選舉流程
?FOLLOWING, // 當(dāng)前服務(wù)器的角色是Follower
?LEADING, ?// 當(dāng)前服務(wù)器角色是Leader
?OBSERVING // 當(dāng)前服務(wù)器角色是 Observer
}
org.apache.zookeeper.server.quorum.Vote 數(shù)據(jù)結(jié)構(gòu)的定義
public class Vote {
?private final int version;
?private final long id; // 選舉的Leader的SID
?private final long zxid;
//邏輯時鐘,用于判斷多個投票是否在同一輪選舉周期中。該值在服務(wù)端是一個自增序列,每次進入新一輪投票后,都會對該值+1
?private final long electionEpoch;// 被推舉的Leader的epoch
?private final long peerEpoch;//當(dāng)前服務(wù)器的狀態(tài)
QuorumCnxManager 網(wǎng)絡(luò)IO
每個服務(wù)器啟動時會啟動一個QuorumCnxManager,負責(zé)各服務(wù)器的底層Leader選舉過程中的網(wǎng)絡(luò)通信。
QuorumCnxManager內(nèi)部維護了一系列按SID分組的消息隊列:
recvQueue:消息接收隊列,存放從其他服務(wù)器接收到的消息
queueSendMap:消息發(fā)送隊列,保存待發(fā)送的消息。此Map的key是SID,分別為集群中的每臺機器分配了一個單獨隊列,從而保證各臺機器之間的消息發(fā)送互不影響
senderWorkerMap:發(fā)送器集合,同樣按SID分組,每個SenderWorker消息發(fā)送器對應(yīng)一臺遠程ZooKeeper服務(wù)器
lastMessageSent:最近發(fā)送過的消息,為每個SID記錄最近發(fā)送過的消息
選舉時集群中的機器是如何建立連接的:
為了能夠進行互相投票,ZooKeeper集群中的機器需要兩兩建立網(wǎng)絡(luò)連接。
QuorumCnxManager啟動時會創(chuàng)建一個ServerSocket監(jiān)聽Leader選舉的通信端口(默認3888),接收其他服務(wù)器的TCP連接請求并交給receiveConnection函數(shù)來處理。為了避免兩臺機器之間重復(fù)創(chuàng)建TCP連接,ZooKeeper設(shè)計一種建立TCP連接的規(guī)則:只允許SID大的服務(wù)器主動和其他服務(wù)器建立連接,否則斷開連接。如果服務(wù)器收到TCP連接請求發(fā)現(xiàn)比自己的SID值小,會斷開這個連接并主動與發(fā)起連接的遠程服務(wù)器建立連接。
建立連接后就會根據(jù)外部服務(wù)器的SID創(chuàng)建對應(yīng)的消息發(fā)送器 SendWorker 和 消息接收器RecvWorker 并啟動
FastLeaderElection選舉算法的核心
ZooKeeper對于選票的管理
- sendqueue:選票發(fā)送隊列,保存待發(fā)送的選票
- recvqueue:選票接收隊列,保存接收到的外部選票
- FastLeaderElection.Messenger.WorkerReceiver:選票接收器,不斷從QuorumCnxManager中取出其他服務(wù)器發(fā)出的選舉消息,并轉(zhuǎn)成Vote,保存到recvqueueu。如果接收到的外部投票選舉輪次小于當(dāng)前服務(wù)器(validVoter方法返回false),直接忽略改選票同時發(fā)出自己的投票。如果當(dāng)前的服務(wù)器并不是LOOKING狀態(tài)(if (self.getPeerState() == QuorumPeer.ServerState.LOOKING)),就將Leader信息以投票的形式發(fā)出。 選票接收器接收到的消息如果來自O(shè)bserver就會忽略該消息,并將自己當(dāng)前的投票發(fā)送出去
- WorkerSender 選票發(fā)送器,會不斷從sendqueue隊列中獲取待發(fā)送的選票,并將其傳遞到底層QuorumCnxManager中
FastLeaderElection#lookForLeader方法中揭示了選舉算法的流程,該方法在服務(wù)器狀態(tài)變成LOOKING時觸發(fā)
選舉算法流程
-
自增選舉輪次 logicalclock ++ FastLeaderElection中的 AtomicLong logicalclock 字段標記當(dāng)前Leader的選舉輪次,ZooKeeper在開始新一輪投票時,會首先對logicalclock進行自增操作
-
初始化選票 初始化選票Vote的屬性:將自己推薦為Leader(id=服務(wù)器自身SID,zxid=當(dāng)前服務(wù)器最新ZXID,electionEpoch=當(dāng)前服務(wù)器的選舉輪次,peerEpoch=被推舉的服務(wù)器的選舉輪次,state=LOOKING)
-
將初始化好的選票放入sendqueue中,由WorkerSender負責(zé)發(fā)出
-
服務(wù)器不斷從 recvqueue 接收外部投票,如果服務(wù)器發(fā)現(xiàn)無法獲取到任何投票會檢查與其他服務(wù)器的連接,修復(fù)連接后重新發(fā)出
-
處理外部投票,根據(jù)選舉輪次判斷進行不同的處理:
- 外部投票選舉輪次 > 內(nèi)部輪次:立即更新自己的選舉輪次logicalclock,清空所有已收到的投票,使用初始化的投票進行PK以確定是否變更內(nèi)部投票,最終將內(nèi)部投票發(fā)送出去
- 外部投票選舉輪次 < 內(nèi)部輪次:忽略外部投票,返回步驟4
- 兩邊一致,絕大多數(shù)場景,選舉輪次一致時開始進行選票PK
-
選票PK:收到其他服務(wù)器有效的外部投票后,進行選票PK,執(zhí)行FastLeaderElection.totalOrderPredicate方法,選票PK的目的是確定當(dāng)前服務(wù)器是否需要變更投票,主要從 logicalclock、ZXID、SID三個維度判斷,符合下述任意一個條件就進行投票變更:
- 外部投票推舉的Leader服務(wù)器的 logicalclock > 內(nèi)部投票的,需要進行內(nèi)部投票變更
- logicalclock一致的,對比兩者的ZXID,外部投票ZXID > 內(nèi)部的,進行內(nèi)部投票變更
- 兩者的ZXID一致就對比SID,外部的大就進行投票變更
-
變更投票:如果需要變更投票就使用外部投票的選票信息覆蓋內(nèi)部投票,變更完成后再將這個變更后的內(nèi)部投票發(fā)出去
-
選票歸檔:無論是否進行了投票變更,外部投票都會存入recvset中進行歸檔,recvset中按照服務(wù)器對應(yīng)的SID來區(qū)分{(1,vote1),(2,vote2),…}
-
統(tǒng)計投票:統(tǒng)計集群中是否已經(jīng)有過半的機器認可了當(dāng)前的內(nèi)部投票,否則返回步驟4
-
更新服務(wù)器狀態(tài):如果此時已經(jīng)確定可以終止投票,就更新服務(wù)器狀態(tài):根據(jù)過半機器認可的投票對應(yīng)的服務(wù)器是否是自己確定是否成為Leader,并將狀態(tài)切換為LEADING/FOLLOWING/OBSERVING
上述10個步驟就是FastLeaderElection的選舉流程,步驟4~9會經(jīng)過幾輪循環(huán),直到Leader選舉產(chǎn)生。在步驟9如果已經(jīng)有過半服務(wù)器認可了當(dāng)前選票,此時ZooKeeper并不會立即進入步驟10,而是等待一段時間(默認200ms)來確定是否有新的更優(yōu)的投票。
服務(wù)器角色介紹
Leader
工作內(nèi)容:事務(wù)請求的唯一調(diào)度和處理者,保證集群事務(wù)處理的順序性;集群內(nèi)部各服務(wù)器的調(diào)度者;
ZooKeeper使用責(zé)任鏈模式來處理客戶端請求
-
PrepRequestProcessor是Leader服務(wù)器的請求預(yù)處理器,在ZK中,將創(chuàng)建刪除節(jié)點/更新數(shù)據(jù)/創(chuàng)建會話等會改變服務(wù)器狀態(tài)的請求稱為事務(wù)請求,對于事務(wù)請求,預(yù)處理器會進行一系列預(yù)處理,如創(chuàng)建請求事務(wù)頭、事務(wù)體、會話檢查、ACL檢查和版本檢查
-
ProposalRequestProcessor Leader的事務(wù)投票處理器,也是Leader服務(wù)器事務(wù)處理流程的發(fā)起者。
- 對于非事務(wù)請求:直接將請求流轉(zhuǎn)到CommitProcessor,不作其他處理
- 對于事務(wù)請求:除了交給CommitProcessor,還會根據(jù)對應(yīng)請求類型創(chuàng)建對應(yīng)的Proposal,并發(fā)送給所有Follower服務(wù)器發(fā)起一次集群內(nèi)的事務(wù)投票。ProposalRequestProcessor還會將事務(wù)請求交給SyncRequestProcessor進行事務(wù)日志的記錄
-
SyncRequestProcessor 事務(wù)日志處理器,將事務(wù)請求記錄到事務(wù)日志文件中,觸發(fā)ZooKeeper進行數(shù)據(jù)快照
-
AckRequestProcessor 是Leader特有的處理器,負責(zé)在SyncRequestProcessor處理器完成事務(wù)日志記錄后向Proposal的投票收集器發(fā)送ACK反饋,通知投票收集器當(dāng)前服務(wù)器已完成對該Proposal的事務(wù)日志記錄
-
CommitProcessor 事務(wù)提交處理器
-
ToBeCommitProcessor 該處理類中有一個toBeApplied隊列(ConcurrentLinkedQueue toBeApplied)存儲被CommitProcessor處理過的可被提交的Proposal,等待FinalRequestProcessor處理完提交的請求后從隊列中移除
-
FinalRequestProcessor 進行客戶端請求返回前的收尾工作:創(chuàng)建客戶端請求的響應(yīng)、將事務(wù)應(yīng)用到內(nèi)存數(shù)據(jù)庫
LearnerHandler:Leader服務(wù)器會與每一個Follower/Observer服務(wù)器建立一個TCP長鏈接,同時為每個Follower/Observer服務(wù)器創(chuàng)建LearnerHandler。LearnerHandler是ZK集群中的Learner服務(wù)器的管理器,負責(zé)Follower/Observer服務(wù)器和Leader服務(wù)器之間的網(wǎng)絡(luò)通信:數(shù)據(jù)同步、請求轉(zhuǎn)發(fā)、Proposal提議的投票。
Follower
Follower的職責(zé):處理客戶端非事務(wù)請求,轉(zhuǎn)發(fā)事務(wù)請求給Leader服務(wù)器;參與事務(wù)請求Proposal的投票;參與Leader選舉投票;
Follower不需要負責(zé)事務(wù)請求的投票處理(所以不需要ProposalRequestProcessor),所以其請求處理鏈簡單一些
- FollowerRequestProcessor 識別出當(dāng)前請求是否是事務(wù)請求,如果是事務(wù)請求,F(xiàn)ollower就會將請求轉(zhuǎn)發(fā)給Leader服務(wù)器,Leader服務(wù)器收到請求后提交給請求處理器鏈,按正常事務(wù)請求進行處理
- SendAckRequestProcessor Follower服務(wù)器上另一個和Leader服務(wù)器有差異的請求處理器,與Leader服務(wù)器上的AckRequestProcessor類似,SendAckRequestProcessor同樣承擔(dān)了事務(wù)日志記錄反饋的角色,在完成事務(wù)日志記錄后,會向Leader服務(wù)器發(fā)送ACK消息表明自身完成了事務(wù)日志的記錄工作。兩者的一個區(qū)別是:AckRequestProcessor在Leader服務(wù)器上,因此ACK反饋是一個本地操作,而SendAckRequestProcessor在Follower上,需要通過ACK消息的形式向Leader服務(wù)器進行反饋。
Observer
觀察ZooKeeper集群的最新狀態(tài)并將這些狀態(tài)變更同步過來,Observer服務(wù)器在工作原理上與Follower基本一致,對于非事務(wù)請求可以進行獨立的處理,對于事務(wù)請求同樣需要轉(zhuǎn)發(fā)到Leader服。與Follower的一大區(qū)別是:Observer不參與任何形式的投票,包括Leader選舉和事務(wù)請求Proposal的投票。
集群內(nèi)消息通信
ZK集群各服務(wù)器間消息類型分為4類:數(shù)據(jù)同步型、服務(wù)器初始化型、請求處理型、會話管理型
數(shù)據(jù)同步消息
Learner與Leader進行數(shù)據(jù)同步使用的消息,分為4種(消息類型定義在Leader.java中,使用常量數(shù)字標記):
- DIFF, 13 Leader發(fā)送給Learner,通知Learner進行DIFF方式的數(shù)據(jù)同步
- TRUNC, 14 Leader --> Learner 觸發(fā)Learner服務(wù)器進行內(nèi)存數(shù)據(jù)庫的回滾操作
- SNAP, 15 Leader --> Learner 通知Learner,Leader即將與其進行全量數(shù)據(jù)同步
- UPTODATE, 12 Leader --> Learner 通知Learner完成了數(shù)據(jù)同步,可以對外提供服務(wù)
服務(wù)器初始化型消息
整個集群或某些機器初始化時,Leader與Learner之間相互通信所使用的消息類型:
- OBSERVERINFO,16: Observer在啟動時發(fā)送消息給Leader,用于向Leader注冊O(shè)bserver身份,消息中包含當(dāng)前Observer服務(wù)器的SID和已經(jīng)處理的最新ZXID
- FOLLOWERINFO,11:Follower啟動時發(fā)送包含SID和已處理的最新ZXID的注冊消息到Leader
- LEADERINFO,17:上述兩種情形下,Leader服務(wù)器會返回包含最新EPOCH值的LeaderInfo返回給Observer或Follower
- ACKEPOCH,18:Learner在收到LEADERINFO消息時會將自己的最新ZXID和EPOCH以ACKEPOCH消息的形式發(fā)送給Leader
- NEWLEADER,10:足夠多的Follower連接上Leader服務(wù)器,或是Leader服務(wù)器完成數(shù)據(jù)同步后,Leader向Learner發(fā)送的階段性標識信息,包含當(dāng)前最新ZXID
請求處理型
請求處理過程中Leader和Learner之間互相通信所使用的消息:
- REQUEST,1:Learner收到事務(wù)請求時需要將請求轉(zhuǎn)發(fā)給Leader,該請求使用REQUEST消息的形式進行轉(zhuǎn)發(fā)
- PROPOSAL,2:在處理事務(wù)請求時,Leader服務(wù)器會將事務(wù)請求以PROPOSAL消息的形式創(chuàng)建投票發(fā)送給集群中的所有的Follower進行事務(wù)日志的記錄
- ACK,3:Follower完成事務(wù)日志的記錄后會以ACK消息的形式反饋給Leader
- COMMIT,4:Leader通知集群中的所有Follower,可以進行事務(wù)請求的提交了,Leader在收到過半Follower發(fā)來的ACK消息后,進入事務(wù)請求的最終提交流程——生成COMMIT消息,告知所有Follower進行事務(wù)請求的提交,這是一個2PC的過程
- INFORM,8:Leader發(fā)起事務(wù)投票并通知提交事務(wù),只需要PROPOSAL和COMMIT消息給Follower就可以了,而Observer不參與事務(wù)投票,無法接收COMMIT消息,但需要知道事務(wù)提交的內(nèi)容,所以ZK設(shè)計了INFORM消息發(fā)給Observer,消息中會攜帶事務(wù)請求的內(nèi)容
- SYNC,7:Leader通知Learner服務(wù)器已完成Sync操作
會話管理型
ZK服務(wù)器在進行會話管理過程中,與Learner服務(wù)器之間通信所使用的消息:
- PING,5:ZK客戶端隨機選擇一個服務(wù)器進行連接,所以Leader服務(wù)器無法直接收到所有客戶端的心跳檢測,所以需要委托Learner維護所有客戶端的心跳檢測。Leader定時向Learner發(fā)送PING消息就是要求Learner將一段時間內(nèi)保持心跳檢測的客戶端列表同樣以PING消息的形式返回給Leader,這樣Leader就能獲取到全部客戶端的活躍狀態(tài)并進行會話激活了。
- REVALIDATE,6:客戶端發(fā)生重連后(可能切換了服務(wù)器)新連接的服務(wù)器需要向Leader發(fā)送REVALIDATE消息以確定客戶端會話是否已經(jīng)超時。
客戶端請求的處理
會話創(chuàng)建請求
ZK服務(wù)端對于會話創(chuàng)建的處理,可以分為請求接收、會話創(chuàng)建、預(yù)處理、事務(wù)處理、事務(wù)應(yīng)用和會話響應(yīng)。
zookeeper源碼分析(3)— 一次會話的創(chuàng)建過程 - 簡書— 一次會話的創(chuàng)建過程 - 簡書")
請求接收
- IO層接收來自客戶端的請求,NIOServerCnxn實例維護每一個客戶端連接,負責(zé)客戶端與服務(wù)端通信,并將請求內(nèi)容從底層網(wǎng)絡(luò)IO中讀取出來
- 判斷是否是客戶端“會話創(chuàng)建”請求:檢查當(dāng)前請求對應(yīng)的NIOServerCnxn實體是否已經(jīng)初始化,未初始化時第一個請求必定是會話創(chuàng)建請求
- 反序列化ConnectRequest請求,確定是會話創(chuàng)建請求后就可以反序列化得到一個ConnectRequest請求實體
- 判斷是否是ReadOnly客戶端,如果ZK服務(wù)器是以ReadOnly模式啟動,所有來自非ReadOnly型客戶端的請求將無法處理。所以服務(wù)端需要從ConnectRequest中檢查是否是ReadOnly客戶端,以此來決定是否接受此“會話創(chuàng)建”請求
- 檢查客戶端ZXID:出現(xiàn)客戶端ZXID比服務(wù)端還大這種反常情形時,服務(wù)端不接受此會話創(chuàng)建請求
- 協(xié)商sessionTimeout:客戶端有自己設(shè)置的sessionTimeout值,傳到服務(wù)端后,服務(wù)端要根據(jù)自身配置進行檢查限定,通常的規(guī)則是 2 * ticketTime ~ 20 * tickerTime 之間
- 判斷是否需要重新創(chuàng)建會話:解析客戶端傳入的sessionID進行判斷
會話創(chuàng)建
- 為客戶端生成sessionID:每個ZK服務(wù)器啟動時都會初始化一個會話管理器SessionTracker,同時初始化一個基準sessionID,這個基準sessionID的生成需要保證后續(xù)客戶端在此基礎(chǔ)上不斷+1能夠全局唯一。sessionID生成算法見客戶端介紹:會話Session > sesssionID的生成原理。
- 注冊會話:將會話信息保存到SessionTracker的本地字段中:ConcurrentHashMap<Long, SessionImpl> sessionsById、ConcurrentMap<Long, Integer> sessionsWithTimeout
- 會話激活:服務(wù)端根據(jù)配置的ticketTime和會話超時時間比對計算下一次會話超時時間(使用了分桶策略)sessionsWithTimeout
- 生成會話密碼:隨機數(shù),生成代碼見 ZooKeeperServer#generatePasswd
預(yù)處理
- PrepRequestProcessor處理請求(責(zé)任鏈模式)
- 創(chuàng)建請求事務(wù)頭:對于事務(wù)請求,ZK會為其創(chuàng)建請求事務(wù)頭,后續(xù)請求處理器都是基于該請求頭標識當(dāng)前請求是否是事務(wù)請求,請求事務(wù)頭包含:clientId(唯一標識請求所屬客戶端)cxid(客戶端操作序列號)zxid(事務(wù)請求對應(yīng)的zxid)time(服務(wù)端開始處理事務(wù)請求的時間)type(事務(wù)請求的類型:ZooDefs.OpCode.create、delete、setData和createSession等)
- 創(chuàng)建請求事務(wù)體CreateSessionTxn
- 注冊與激活會話:額外處理非Leader轉(zhuǎn)發(fā)的會話創(chuàng)建請求
事務(wù)處理
-
ProposalRequestProcessor處理請求:PrepRequestProcessor將請求交給下一級處理器,提案Proposal是ZK中對因事務(wù)請求展開的投票流程中的事務(wù)操作的包裝,該處理器就是處理提案的,處理流程有:
-
Sync流程:SyncRequestProcessor處理器記錄事務(wù)日志。完成事務(wù)日志記錄后,每個Follower都會向Leader發(fā)送ACK消息,表明自身完成了事務(wù)日志的記錄,以便Leader服務(wù)器統(tǒng)計每個事務(wù)請求的投票情況
-
Proposal流程:ZK的實現(xiàn)中,每個事務(wù)請求都需要集群中過半機器投票認可才能真正應(yīng)用到ZK的內(nèi)存數(shù)據(jù)庫中,這個投票與統(tǒng)計的過程就叫 Proposal流程:
- 發(fā)起投票:對于事務(wù)請求,Leader服務(wù)器會發(fā)起一輪事務(wù)投票,發(fā)起事務(wù)投票之前會檢查服務(wù)端ZXID是否可用,如果不可用會拋出XidRolloverException
- 生成提議Proposal:如果服務(wù)端ZXID可用,就可以開始事務(wù)投票了,ZK會將之前創(chuàng)建的請求頭和事務(wù)體,以及ZXID和請求本身序列化到Proposal對象中
- 廣播提議:Leader服務(wù)器會以ZXID作為key,將提議放入投票箱ConcurrentMap<Long, Proposal> outstandingProposals中,同時將該提議廣播給所有Follower服務(wù)器
- 收集投票:Follower服務(wù)器接收到Leader發(fā)來的提議后,會進入Sync流程進行事務(wù)日志的記錄,執(zhí)行完后發(fā)送ACK消息給Leader,Leader根據(jù)ACK消息統(tǒng)計Proposal的投票情況。當(dāng)過半機器通過時,就進入Proposal的Commit階段
- Commit Proposal前將請求放入 toBeApplied 隊列中
- 廣播COMMIT消息:Leader會向Observer廣播包含Proposal內(nèi)容的INFORM消息,而對于Follower服務(wù)器則需只發(fā)送ZXID(上文有介紹)
-
Commit流程:
- 將請求交給CommitProcessor.java處理器,放入 LinkedBlockingQueue queuedRequests 中,獨立線程會取出處理
- 標記topPending:如果是事務(wù)請求(write類型),就會將topPending標記為當(dāng)前請求,用于確保事務(wù)請求的順序性,便于CommitProcessor檢測當(dāng)前集群中是否正在進行事務(wù)請求的投票
- 等待Proposal投票:Commit流程處理時,Leader根據(jù)當(dāng)前事務(wù)請求生成Proposal廣播給所有Follower,此時Commit流程需要等待
- 投票通過,提議獲得過半機器認可,ZK會將請求放入committedRequests隊列中,同時喚醒Commit流程
- 提交請求:將請求放入toProcess隊列中,交給FinalRequestProcessor處理
-
事務(wù)應(yīng)用
自我介紹一下,小編13年上海交大畢業(yè),曾經(jīng)在小公司待過,也去過華為、OPPO等大廠,18年進入阿里一直到現(xiàn)在。
深知大多數(shù)大數(shù)據(jù)工程師,想要提升技能,往往是自己摸索成長或者是報班學(xué)習(xí),但對于培訓(xùn)機構(gòu)動則幾千的學(xué)費,著實壓力不小。自己不成體系的自學(xué)效果低效又漫長,而且極易碰到天花板技術(shù)停滯不前!
因此收集整理了一份《2024年大數(shù)據(jù)全套學(xué)習(xí)資料》,初衷也很簡單,就是希望能夠幫助到想自學(xué)提升又不知道該從何學(xué)起的朋友。
既有適合小白學(xué)習(xí)的零基礎(chǔ)資料,也有適合3年以上經(jīng)驗的小伙伴深入學(xué)習(xí)提升的進階課程,基本涵蓋了95%以上大數(shù)據(jù)開發(fā)知識點,真正體系化!
由于文件比較大,這里只是將部分目錄大綱截圖出來,每個節(jié)點里面都包含大廠面經(jīng)、學(xué)習(xí)筆記、源碼講義、實戰(zhàn)項目、講解視頻,并且后續(xù)會持續(xù)更新文章來源:http://www.zghlxwxcb.cn/news/detail-855559.html
如果你覺得這些內(nèi)容對你有幫助,可以添加VX:vip204888 (備注大數(shù)據(jù)獲?。?/strong>
t> queuedRequests 中,獨立線程會取出處理
+ 標記topPending:如果是事務(wù)請求(write類型),就會將topPending標記為當(dāng)前請求,用于確保事務(wù)請求的順序性,便于CommitProcessor檢測當(dāng)前集群中是否正在進行事務(wù)請求的投票
+ 等待Proposal投票:Commit流程處理時,Leader根據(jù)當(dāng)前事務(wù)請求生成Proposal廣播給所有Follower,此時Commit流程需要等待
+ 投票通過,提議獲得過半機器認可,ZK會將請求放入committedRequests隊列中,同時喚醒Commit流程
+ 提交請求:將請求放入toProcess隊列中,交給FinalRequestProcessor處理
事務(wù)應(yīng)用
自我介紹一下,小編13年上海交大畢業(yè),曾經(jīng)在小公司待過,也去過華為、OPPO等大廠,18年進入阿里一直到現(xiàn)在。
深知大多數(shù)大數(shù)據(jù)工程師,想要提升技能,往往是自己摸索成長或者是報班學(xué)習(xí),但對于培訓(xùn)機構(gòu)動則幾千的學(xué)費,著實壓力不小。自己不成體系的自學(xué)效果低效又漫長,而且極易碰到天花板技術(shù)停滯不前!
因此收集整理了一份《2024年大數(shù)據(jù)全套學(xué)習(xí)資料》,初衷也很簡單,就是希望能夠幫助到想自學(xué)提升又不知道該從何學(xué)起的朋友。
[外鏈圖片轉(zhuǎn)存中…(img-WS3sPhEW-1712866405876)]
[外鏈圖片轉(zhuǎn)存中…(img-jfkJMYrj-1712866405877)]
[外鏈圖片轉(zhuǎn)存中…(img-mdjdQu4x-1712866405877)]
[外鏈圖片轉(zhuǎn)存中…(img-pLReZrFw-1712866405878)]
[外鏈圖片轉(zhuǎn)存中…(img-VqzvFvPS-1712866405878)]
既有適合小白學(xué)習(xí)的零基礎(chǔ)資料,也有適合3年以上經(jīng)驗的小伙伴深入學(xué)習(xí)提升的進階課程,基本涵蓋了95%以上大數(shù)據(jù)開發(fā)知識點,真正體系化!
由于文件比較大,這里只是將部分目錄大綱截圖出來,每個節(jié)點里面都包含大廠面經(jīng)、學(xué)習(xí)筆記、源碼講義、實戰(zhàn)項目、講解視頻,并且后續(xù)會持續(xù)更新
如果你覺得這些內(nèi)容對你有幫助,可以添加VX:vip204888 (備注大數(shù)據(jù)獲?。?/strong>
[外鏈圖片轉(zhuǎn)存中…(img-4S1Fq4Kd-1712866405878)]文章來源地址http://www.zghlxwxcb.cn/news/detail-855559.html
到了這里,關(guān)于ZooKeeper技術(shù)細節(jié)_zookeeper and 服務(wù)器 and 客戶端 and 數(shù)據(jù)節(jié)點的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!