目錄
1. Zookeeper集群操作
1.1 客戶端操作zk集群
1.2 模擬集群異常操作
1.3 curate客戶端連接zookeeper集群
2. Zookeeper實戰(zhàn)案例
2.1 創(chuàng)建項目引入依賴
2.2 獲取zk客戶端對象
2.3 常用API
2.4 客戶端向服務(wù)端寫入數(shù)據(jù)流程
2.5 服務(wù)器動態(tài)上下線、客戶端動態(tài)監(jiān)聽
2.6 測試
3.Zookeeper分布式鎖
3.1 什么是分布式鎖
3.2 Zookeeper分布式鎖分析
3.3 分布式鎖實現(xiàn)
1. Zookeeper集群操作
1.1 客戶端操作zk集群
1) 第一步啟動集群,啟動后查看Zookeeper進程。
jps命令 作用是顯示當(dāng)前所有java 進程的pid 的命令,QuorumPeerMain是zookeeper集群的啟動入口類
2) 客戶端連接
-
連接集群所有客戶端
[root@localhost zookeeper-1]# ./bin/zkCli.sh -server 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183 ? ? ? ? ? ? ? ? ?
連接集群單個客戶端
# 連接2181 [root@localhost zookeeper-1]# ./bin/zkCli.sh -server 192.168.58.200:2181 ? # 連接2182 [root@localhost zookeeper-1]# ./bin/zkCli.sh -server 192.168.58.200:2182 ? # 在2181中創(chuàng)建節(jié)點 [zk: 192.168.58.200:2181(CONNECTED) 0] create /test2 ? # 在2182中查詢,發(fā)現(xiàn)數(shù)據(jù)已同步 [zk: 192.168.58.200:2182(CONNECTED) 0] ls / [test1, test2, zookeeper]
以上兩種方式的區(qū)別在于:
-
如果只連接單個客戶端,如果當(dāng)前連接的服務(wù)器掛掉,當(dāng)前客戶端連接也會掛掉,連接失敗。
-
如果是連接所有客戶端的形式,則允許集群中半數(shù)以下的服務(wù)掛掉!當(dāng)半數(shù)以上服務(wù)掛掉才會停止服務(wù),可用性更高一點!
3)集群節(jié)點信息查看
集群中的節(jié)點信息被存放在每一個節(jié)點/zookeeper/config/目錄下
1.2 模擬集群異常操作
Leader選舉
-
Serverid : 服務(wù)器ID
-
三臺服務(wù) 編號分別是 1 2 3
-
編號越大在選擇算法中權(quán)重越大
-
-
Zxid: 數(shù)據(jù)ID
-
服務(wù)器中存放的最大的數(shù)據(jù)ID, 值越大數(shù)據(jù)越新
-
-
在Leader選舉的過程中 如果某臺Zookeeper獲得了超過半數(shù)的選票,就可以當(dāng)選Leader
(1)首先我們先測試如果是從服務(wù)器掛掉,會怎么樣
把3號服務(wù)器停掉,觀察1號和2號,發(fā)現(xiàn)狀態(tài)并沒有變化
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh stop ? /usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
由此得出結(jié)論,3個節(jié)點的集群,從服務(wù)器掛掉,集群正常
(2)我們再把1號服務(wù)器(從服務(wù)器)也停掉,查看2號(主服務(wù)器)的狀態(tài),發(fā)現(xiàn)已經(jīng)停止運行了。
/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh stop ? /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
由此得出結(jié)論,3個節(jié)點的集群,2個從服務(wù)器都掛掉,主服務(wù)器也無法運行。因為可運行的機器沒有超過集群總數(shù)量的半數(shù)。
(3)我們再次把1號服務(wù)器啟動起來,發(fā)現(xiàn)2號服務(wù)器又開始正常工作了。而且依然是領(lǐng)導(dǎo)者。
(4)我們把3號服務(wù)器也啟動起來,把2號服務(wù)器停掉,停掉后觀察1號和3號的狀態(tài)。
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh stop ? /usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status /usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status
發(fā)現(xiàn)新的leader產(chǎn)生了~
由此我們得出結(jié)論,當(dāng)集群中的主服務(wù)器掛了,集群中的其他服務(wù)器會自動進行選舉狀態(tài),然后產(chǎn)生新得leader 。
(5)我們再次測試,當(dāng)我們把2號服務(wù)器重新啟動起來啟動后,會發(fā)生什么?2號服務(wù)器會再次成為新的領(lǐng)導(dǎo)嗎?我們看結(jié)果
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start ? /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status /usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status
我們會發(fā)現(xiàn),2號服務(wù)器啟動后依然是跟隨者(從服務(wù)器),3號服務(wù)器依然是領(lǐng)導(dǎo)者(主服務(wù)器),沒有撼動3號服務(wù)器的領(lǐng)導(dǎo)地位。
由此我們得出結(jié)論,當(dāng)領(lǐng)導(dǎo)者產(chǎn)生后,再次有新服務(wù)器加入集群,不會影響到現(xiàn)任領(lǐng)導(dǎo)者。
1.3 curate客戶端連接zookeeper集群
public class CuratorCluster { ? ? ?//zookeeper連接 ? ?private final static String CLUSTER_CONNECT = "192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183"; ? ? ?//session超時時間 ? ?private static final int sessionTimeoutMs = 60 * 1000; ? ? ?//連接超時時間 ? ?private static final int connectionTimeoutMs = 5000; ? ? ?private static CuratorFramework client; ? ? ?public static String getClusterConnect() { ? ? ? ?return CLUSTER_CONNECT; ? } ? ? ?@Before ? ?public void init(){ ? ? ? ? ?// 重試策略 ? ? ? ?RetryPolicy retryPolicy =new ExponentialBackoffRetry(3000,10); ? ? ? ? ?// zookeeper連接 ? ? ? ?client = CuratorFrameworkFactory.builder() ? ? ? ? ? ? ? .connectString(getClusterConnect()) ? ? ? ? ? ? ? .sessionTimeoutMs(60*1000) ? ? ? ? ? ? ? .connectionTimeoutMs(15*1000) ? ? ? ? ? ? ? .retryPolicy(retryPolicy) ? ? ? ? ? ? ? .namespace("mashibing") ?//當(dāng)前程序創(chuàng)建目錄的根目錄 ? ? ? ? ? ? ? .build(); ? ? ? ? ?// 添加監(jiān)聽器 ? ? ? ?client.getConnectionStateListenable().addListener(new ConnectionStateListener() { ? ? ? ? ? ?@Override ? ? ? ? ? ?public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { ? ? ? ? ? ? ? ?System.out.println("連接成功!"); ? ? ? ? ? } ? ? ? }); ? ? ? ? ?client.start(); ? } ? ? ?//創(chuàng)建節(jié)點 ? ?public void createIfNeed(String path) throws Exception { ? ? ? ?Stat stat = client.checkExists().forPath(path); ? ? ? ?if(stat == null){ ? ? ? ? ? ?String s = client.create().forPath(path); ? ? ? ? ? ?System.out.println("創(chuàng)建節(jié)點: " + s); ? ? ? } ? } ? ? ? ?//從集群中獲取數(shù)據(jù) ? ?@Test ? ?public void testCluster() throws Exception { ? ? ? ? ?createIfNeed("/test"); ? ? ? ? ?//每隔一段時間 獲取一次數(shù)據(jù) ? ? ? ?while(true){ ? ? ? ? ? ?byte[] data = client.getData().forPath("/test"); ? ? ? ? ? ?System.out.println(new String(data)); ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(5); ? ? ? } ? } }
在集群中的任意服務(wù)器節(jié)點,為test設(shè)置數(shù)據(jù)
[zk: 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183(CONNECTED) 2] set /mashibing/test 12345
2. Zookeeper實戰(zhàn)案例
2.1 創(chuàng)建項目引入依賴
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
2.2 獲取zk客戶端對象
public class ZkClientTest { private String connectString = "192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183"; private int sessionTimeout = 2000; private ZkClient zkClient; /** * 獲取zk客戶端連接 */ @Before public void Before(){ /** * 參數(shù)1:服務(wù)器的IP和端口 * 參數(shù)2:會話的超時時間 * 參數(shù)3:會話的連接時間 * 參數(shù)4:序列化方式 */ zkClient = new ZkClient(connectString, sessionTimeout, 1000 * 15, new SerializableSerializer()); } @After public void after(){ zkClient.close(); } }
2.3 常用API
-
創(chuàng)建節(jié)點
/** * 創(chuàng)建節(jié)點 */ @Test public void testCreateNode(){ //創(chuàng)建方式 返回創(chuàng)建節(jié)點名稱 String nodeName = zkClient.create("/node1", "lisi", CreateMode.PERSISTENT); System.out.println("路徑名稱為:" + nodeName); zkClient.create("/node2","wangwu",CreateMode.PERSISTENT_SEQUENTIAL); zkClient.create("/node3","hehe",CreateMode.EPHEMERAL); zkClient.create("/node4","haha",CreateMode.EPHEMERAL_SEQUENTIAL); while(true){} }
-
刪除節(jié)點
/** * 刪除節(jié)點 */ @Test public void testDeleteNode(){ // 刪除沒有子節(jié)點的節(jié)點 // boolean b1 = zkClient.delete("/node2"); // System.out.println("刪除成功: " + b1); // 遞歸刪除節(jié)點信息 boolean b2 = zkClient.deleteRecursive("/node2"); System.out.println("刪除成功: " + b2); }
-
查看節(jié)點的子節(jié)點
/** * 查詢節(jié)點的子節(jié)點 */ @Test public void testFindNodes(){ //返回指定路徑的節(jié)點信息 List<String> ch = zkClient.getChildren("/"); for (String c1 : ch) { System.out.println(c1); } }
-
查看當(dāng)前節(jié)點的數(shù)據(jù)
-
注意:如果出現(xiàn):org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 61616161 異常的原因是: 在shell中的數(shù)據(jù)序列化方式 和 java代碼中使用的序列化方式不一致導(dǎo)致 因此要解決這個問題只需要保證序列化一致即可 都使用相同端操作即可
-
/** * 獲取節(jié)點數(shù)據(jù) */ @Test public void testFindNodeData(){ String nodeName = zkClient.create("/node3", "taotao", CreateMode.PERSISTENT); Object data = zkClient.readData("/node3"); System.out.println(data); }
-
查看當(dāng)前節(jié)點的數(shù)據(jù)并獲取狀態(tài)信息
/** * 獲取數(shù)據(jù)以及當(dāng)前節(jié)點狀態(tài)信息 */ @Test public void testFindNodeDataAndStat(){ Stat stat = new Stat(); Object data = zkClient.readData("/node20000000004", stat); System.out.println(data); System.out.println(stat); }
-
修改節(jié)點數(shù)據(jù)
/** * 修改節(jié)點 */ @Test public void testUpdateNodeData(){ zkClient.writeData("/node3","123456"); }
-
監(jiān)聽節(jié)點數(shù)據(jù)的變化
/** * 監(jiān)聽節(jié)點數(shù)據(jù) */ @Test public void testNodeChange(){ zkClient.subscribeDataChanges("/node3", new IZkDataListener() { // 當(dāng)節(jié)點的值在修改時,會自動調(diào)用這個方法 @Override public void handleDataChange(String nodeName, Object result) throws Exception { System.out.println("節(jié)點名稱: " + nodeName); System.out.println("節(jié)點數(shù)據(jù): " + result); } // 當(dāng)節(jié)點被刪除時,會調(diào)用該方法 @Override public void handleDataDeleted(String nodeName) throws Exception { System.out.println("節(jié)點名稱: " + nodeName); } }); while(true){} }
-
監(jiān)聽節(jié)點目錄的變化
/** * 監(jiān)聽節(jié)點目錄的變化 */ @Test public void testNodesChange(){ zkClient.subscribeChildChanges("/node3", new IZkChildListener() { @Override public void handleChildChange(String nodeName, List<String> list) throws Exception { System.out.println("父節(jié)點名稱: " + nodeName); System.out.println("發(fā)生變更后,所有子節(jié)點名稱: "); for (String name : list) { System.out.println(name); } } }); while(true){} }
-
判斷某一個節(jié)點是否存在
//判斷節(jié)點是否存在 @Test public void exist(){ boolean exists = zkClient.exists("/node3"); System.out.println(exists == true ? "節(jié)點存在" : "節(jié)點不存在"); }
2.4 客戶端向服務(wù)端寫入數(shù)據(jù)流程
-
寫流程之寫入請求,直接發(fā)送給Leader
-
寫流程之寫入請求,發(fā)送給follower節(jié)點
2.5 服務(wù)器動態(tài)上下線、客戶端動態(tài)監(jiān)聽
某分布式系統(tǒng)中,主節(jié)點可以有多臺,可以動態(tài)上下線,任意一臺客戶端都能實時感知到主節(jié)點服務(wù)器的上下線。
1)根節(jié)點下,創(chuàng)建servers節(jié)點
[zk: 192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183(CONNECTED) 0] create /servers "servers" Created /servers
2)服務(wù)端代碼
完成服務(wù)端向zookeeper注冊、動態(tài)上下線的代碼。
/** * 服務(wù)端 */ public class DistributeServer { private ZooKeeper client; // 連接信息 private String connectString = "192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183"; // 超時時間 private int sessionTimeOut = 30000; public static void main(String[] args) throws Exception { DistributeServer server = new DistributeServer(); //1.獲取zk連接 server.getConnect(); //2.將服務(wù)器注冊到zk集群,args參數(shù)通過啟動 main方法時傳入即可 server.register(args[0]); //3.啟動業(yè)務(wù)邏輯(線程睡眠) Thread.sleep(Long.MAX_VALUE); } /** * 注冊操作 * @param hostName 將服務(wù)器注冊到zk集群時,所需的服務(wù)名稱 */ private void register(String hostName) throws Exception { /** * ZooDefs.Ids.OPEN_ACL_UNSAFE: 此權(quán)限表示允許所有人訪問該節(jié)點(服務(wù)器) * CreateMode.EPHEMERAL_SEQUENTIAL: 由于服務(wù)器是動態(tài)上下線的,上線后存在,下線后不存在,所以是臨時節(jié)點 * 而服務(wù)器一般都是有序號的,所以是臨時、有序的節(jié)點. */ String node = client.create("/servers/" + hostName, hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("已成功創(chuàng)建" + node + "節(jié)點"); System.out.println(hostName + " 已經(jīng)上線"); } /** * 獲取連接 */ private void getConnect() throws IOException { client = new ZooKeeper(connectString, sessionTimeOut, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); } }
2)客戶端代碼
服務(wù)端代碼寫好之后,再來完成客戶端動態(tài)監(jiān)聽zk服務(wù)端各個節(jié)點的代碼。
/** * 客戶端 */ public class DistributeClient { private ZooKeeper zk; // 連接信息 private String connectString = "192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183"; // 超時時間 private int sessionTimeOut = 30000; public static void main(String[] args) throws Exception { DistributeClient client = new DistributeClient(); //1.獲取zk連接 client.getConnection(); //2.監(jiān)聽 /servers下面所有的子節(jié)點變化 client.getServerList(); //3.業(yè)務(wù)邏輯 Thread.sleep(Long.MAX_VALUE); } /** * 獲取連接 */ private void getConnection() throws Exception { zk = new ZooKeeper(connectString, sessionTimeOut, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { //監(jiān)聽服務(wù)器地址的上下線 try { getServerList(); } catch (Exception e) { e.printStackTrace(); } } }); } /** * 監(jiān)聽 /servers路徑下的所有子節(jié)點變化,true表示啟動監(jiān)聽器 */ private void getServerList() throws Exception { List<String> zkChildren = zk.getChildren("/servers", true); List<String> servers = new ArrayList<>(); zkChildren.forEach(node -> { //拼接服務(wù)完整信息 try { byte[] data = zk.getData("/servers/" + node, false, null); servers.add(new String(data)); } catch (Exception e) { e.printStackTrace(); } }); System.out.println(servers); System.out.println("======================="); } }
2.6 測試
1)Zookeeper命令行,完成測試
[zk: localhost:2181(CONNECTED) 1] create -e -s /servers/zk01 "192.168.58.200:2181" Created /servers/zk010000000000
[zk: localhost:2181(CONNECTED) 2] create -e -s /servers/zk02 "192.168.58.200:2182" Created /servers/zk020000000001
[zk: localhost:2181(CONNECTED) 3] create -e -s /servers/zk03 "192.168.58.200:2183" Created /servers/zk030000000002
上面的執(zhí)行結(jié)果可以看到,在servers下面依次創(chuàng)建子結(jié)點,客戶端代碼都可以成功監(jiān)聽到。
下面我們刪除節(jié)點,看看客戶端能不能做到動態(tài)監(jiān)聽功能(也即刪除的節(jié)點不會再被監(jiān)聽到)。
[zk: localhost:2181(CONNECTED) 5] delete /servers/zk010000000000 [zk: localhost:2181(CONNECTED) 7] delete /servers/zk020000000001 [zk: localhost:2181(CONNECTED) 8] delete /servers/zk030000000002
使用Java代碼來測試
-
先啟動客戶端代碼
-
再啟動服務(wù)端代碼
只是在服務(wù)端代碼中,我們的 register 方法中傳參用到了 args ,所以啟動之前要傳入這個參數(shù)。
傳入 192.168.58.200 之后,可以看到服務(wù)端代碼已經(jīng)能夠?qū)崿F(xiàn)動態(tài)上線了。
這里我們的 192.168.58.200 動態(tài)上線之后,可以看到客戶端也正常的監(jiān)聽到它了。
轉(zhuǎn)到zk命令行中,也可以看到這臺服務(wù)器的節(jié)點信息。
由于我們之前的服務(wù)端代碼還啟動著,此時我們再傳入新的參數(shù) 192.168.58.200:2182,那么之前的2181服務(wù)器肯定會被擠掉(這里模擬就是main方法,同一個類肯定只能同時啟動一次main方法了),那么我們看看客戶端能不能動態(tài)監(jiān)聽到2181下線、2182上線。
服務(wù)端自然可以正常實現(xiàn)2182這臺服務(wù)器的動態(tài)上線。在客戶端代碼中可以看到List集合中已經(jīng)沒有2181了(即2181已經(jīng)下線了),而2182正常上線
3.Zookeeper分布式鎖
3.1 什么是分布式鎖
傳統(tǒng)單體應(yīng)用單機部署的情況下,可以使用并發(fā)處理相關(guān)的功能進行互斥控制,但是原單體單機部署的系統(tǒng)被演化成分布式集群系統(tǒng)后,由于分布式系統(tǒng)多線程、多進程并且分布在不同機器上,這將使原單機部署情況下的并發(fā)控制鎖策略失效。提出分布式鎖的概念,是為了解決跨機器的互斥機制來控制共享資源的訪問。
3.2 Zookeeper分布式鎖分析
客戶端(對zookeeper集群而言)向zookeeper集群進行上線注冊,并在一個永久節(jié)點下創(chuàng)建有序的臨時子節(jié)點后,根據(jù)編號順序,最小順序的子節(jié)點獲取到鎖,其他子節(jié)點由小到大監(jiān)聽前一個節(jié)點。
當(dāng)拿到鎖的節(jié)點處理完事務(wù)后,釋放鎖,后一個節(jié)點監(jiān)聽到前一個節(jié)點釋放鎖后,立刻申請獲得鎖,以此類推
3.3 分布式鎖實現(xiàn)
1)創(chuàng)建 Distributedlock類, 獲取與zookeeper的連接
-
構(gòu)造方法中獲取連接
-
添加 CountDownLatch
CountDownLatch是具有synchronized機制的一個工具,目的是讓一個或者多個線程等待,直到其他線程的一系列操作完成。
CountDownLatch初始化的時候,需要提供一個整形數(shù)字,數(shù)字代表著線程需要調(diào)用countDown()方法的次數(shù),當(dāng)計數(shù)為0時,線程才會繼續(xù)執(zhí)行await()方法后的其他內(nèi)容。
/** * 分布式鎖 */ public class DistributedLock { private ZooKeeper client; // 連接信息 private String connectString = "192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183"; // 超時時間 private int sessionTimeOut = 30000; private CountDownLatch countDownLatch = new CountDownLatch(1); //1. 在構(gòu)造方法中獲取連接 public DistributedLock() throws Exception { client = new ZooKeeper(connectString, sessionTimeOut, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); //等待Zookeeper連接成功,連接完成繼續(xù)往下走 countDownLatch.await(); //2. 判斷節(jié)點是否存在 } //3.對ZK加鎖 public void zkLock(){ //創(chuàng)建 臨時帶序號節(jié)點 //判斷 創(chuàng)建的節(jié)點是否是最小序號節(jié)點,如果是 就獲取到鎖;如果不是就監(jiān)聽前一個節(jié)點 } //4.解鎖 public void unZkLock(){ //刪除節(jié)點 } }
2)對zk加鎖
/** * 分布式鎖 */ public class DistributedLock { private ZooKeeper client; // 連接信息 private String connectString = "192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183"; // 超時時間 private int sessionTimeOut = 30000; // 等待zk連接成功 private CountDownLatch countDownLatch = new CountDownLatch(1); // 等待節(jié)點變化 private CountDownLatch waitLatch = new CountDownLatch(1); //當(dāng)前節(jié)點 private String currentNode; //前一個節(jié)點路徑 private String waitPath; //1. 在構(gòu)造方法中獲取連接 public DistributedLock() throws Exception { client = new ZooKeeper(connectString, sessionTimeOut, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { //countDownLatch 連上ZK,可以釋放 if(watchedEvent.getState() == Event.KeeperState.SyncConnected){ countDownLatch.countDown(); } //waitLatch 需要釋放 (節(jié)點被刪除并且刪除的是前一個節(jié)點) if(watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){ waitLatch.countDown(); } } }); //等待Zookeeper連接成功,連接完成繼續(xù)往下走 countDownLatch.await(); //2. 判斷節(jié)點是否存在 Stat stat = client.exists("/locks", false); if(stat == null){ //創(chuàng)建一下根節(jié)點 client.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } //3.對ZK加鎖 public void zkLock(){ //創(chuàng)建 臨時帶序號節(jié)點 try { currentNode = client.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //判斷 創(chuàng)建的節(jié)點是否是最小序號節(jié)點,如果是 就獲取到鎖;如果不是就監(jiān)聽前一個節(jié)點 List<String> children = client.getChildren("/locks", false); //如果創(chuàng)建的節(jié)點只有一個值,就直接獲取到鎖,如果不是,監(jiān)聽它前一個節(jié)點 if(children.size() == 1){ return; }else{ //先排序 Collections.sort(children); //獲取節(jié)點名稱 String nodeName = currentNode.substring("/locks/".length()); //通過名稱獲取該節(jié)點在集合的位置 int index = children.indexOf(nodeName); //判斷 if(index == -1){ System.out.println("數(shù)據(jù)異常"); }else if(index == 0){ //就一個節(jié)點,可以獲取鎖 return; }else{ //需要監(jiān)聽前一個節(jié)點變化 waitPath = "/locks/" + children.get(index-1); client.getData(waitPath,true,null); //等待監(jiān)聽執(zhí)行 waitLatch.await(); return; } } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
3)zk刪除鎖文章來源:http://www.zghlxwxcb.cn/news/detail-828621.html
//4.解鎖 public void unZkLock() throws KeeperException, InterruptedException { //刪除節(jié)點 client.delete(currentNode,-1); }
4)測試文章來源地址http://www.zghlxwxcb.cn/news/detail-828621.html
public class DistributedLockTest { ? ? ?public static void main(String[] args) throws Exception { ? ? ? ? ?final DistributedLock lock1 = new DistributedLock(); ? ? ? ?final DistributedLock lock2 = new DistributedLock(); ? ? ? ? ?new Thread(new Runnable() { ? ? ? ? ? ?@Override ? ? ? ? ? ?public void run() { ? ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ?lock1.zkLock(); ? ? ? ? ? ? ? ? ? ?System.out.println("線程1 啟動 獲取到鎖"); ? ? ? ? ? ? ? ? ? ? ?Thread.sleep(5 * 1000); ? ? ? ? ? ? ? ? ? ?lock1.unZkLock(); ? ? ? ? ? ? ? ? ? ?System.out.println("線程1 釋放鎖"); ? ? ? ? ? ? ? } catch (InterruptedException | KeeperException e) { ? ? ? ? ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? ? ? } ? ? ? ? ? } ? ? ? }).start(); ? ? ? ? ?new Thread(new Runnable() { ? ? ? ? ? ?@Override ? ? ? ? ? ?public void run() { ? ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ?lock2.zkLock(); ? ? ? ? ? ? ? ? ? ?System.out.println("線程2 啟動 獲取到鎖"); ? ? ? ? ? ? ? ? ? ? ?Thread.sleep(5 * 1000); ? ? ? ? ? ? ? ? ? ?lock2.unZkLock(); ? ? ? ? ? ? ? ? ? ?System.out.println("線程2 釋放鎖"); ? ? ? ? ? ? ? } catch (InterruptedException | KeeperException e) { ? ? ? ? ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? ? ? } ? ? ? ? ? } ? ? ? }).start(); ? } }
到了這里,關(guān)于第四節(jié) zookeeper集群與分布式鎖的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!