1 Zookeeper分布式案例
1.1 Zookeeper分布式鎖原理
核心思想:當(dāng)客戶端要獲取鎖,則創(chuàng)建節(jié)點(diǎn),使用完鎖,則刪除該節(jié)點(diǎn)。
當(dāng)我們假設(shè)根節(jié)點(diǎn)/ 下有/locks節(jié)點(diǎn)時
1)客戶端獲取鎖時,在locks節(jié)點(diǎn)下創(chuàng)建臨時順序節(jié)點(diǎn)。
2)然后獲取lock下面的所有子節(jié)點(diǎn),客戶端獲取到所有的子節(jié)點(diǎn)之后,如果發(fā)現(xiàn)自己創(chuàng)建的子節(jié)點(diǎn)序號最小,那么就認(rèn)為該客戶端獲取到了鎖。(即需要小的優(yōu)先)使用完鎖后,將刪除該結(jié)點(diǎn)。
3)如果發(fā)現(xiàn)自己創(chuàng)建的節(jié)點(diǎn)并非locks所有子節(jié)點(diǎn)中最小的,說明自己還沒獲取到鎖,此時客戶端需要找到比自己小的那個節(jié)點(diǎn),同時對其注冊事件監(jiān)聽器,監(jiān)聽刪除事件。
4)如果發(fā)現(xiàn)比自己小的那個節(jié)點(diǎn)被刪除,則客戶端的Watcher會收到相應(yīng)通知,此時再次判斷自己創(chuàng)建的節(jié)點(diǎn)是否是locks子節(jié)點(diǎn)中序號最小的,如果是則獲取到了鎖,如果不是則重復(fù)以上步驟繼續(xù)獲取到比自己小的一個節(jié)點(diǎn)并注冊監(jiān)聽。
1.2 分布式鎖案例分析
- 客戶端獲取到鎖時創(chuàng)建臨時順序節(jié)點(diǎn) create -e -s /locks/seq-
- 接收到請求后,在/locks節(jié)點(diǎn)下創(chuàng)建一個臨時順序節(jié)點(diǎn)
- 判斷自己是不是當(dāng)前節(jié)點(diǎn)下最小的節(jié)點(diǎn),如果是,獲取到鎖;如果不是,對前一個節(jié)點(diǎn)進(jìn)行監(jiān)聽
- 獲取到鎖,處理完業(yè)務(wù)以后,delete節(jié)點(diǎn)釋放鎖,然后下面的節(jié)點(diǎn)將會收到通知,重復(fù)上述判斷
1.2.1 原生Zookeeper實(shí)現(xiàn)代碼實(shí)現(xiàn)
package com.clear.case2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* 分布式鎖案例
*/
public class DistributedLock {
private final String connectString = "kk01:2181,kk02:2181,kk01:2181";
private final int sessionTimeout = 2000;
private final ZooKeeper zk;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
private String waitPath;
private String currentMode;
public DistributedLock() throws IOException, InterruptedException, KeeperException {
// 獲取連接
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// countDownLatch 如果連接上zk,可以釋放
if (event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
// waitLatch 可以釋放
if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
waitLatch.countDown();
}
System.out.println("");
}
});
// 等待zk正常連接后再執(zhí)行后續(xù)程序
countDownLatch.await();
// 判斷根節(jié)點(diǎn)/locks是否存在
Stat stat = zk.exists("/locks", false);
if (stat == null) {
// 創(chuàng)建根節(jié)點(diǎn)(根節(jié)點(diǎn)為持久節(jié)點(diǎn))
zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 對zk加鎖
public void zkLock() {
// 創(chuàng)建對應(yīng)的臨時帶序號的節(jié)點(diǎn)
try {
currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 判斷創(chuàng)建的節(jié)點(diǎn)是否是根目錄下最小節(jié)點(diǎn),如果是獲得鎖;如果不是,監(jiān)聽它序號前一個節(jié)點(diǎn)
List<String> children = zk.getChildren("/locks", false);
// 如果children只有一個值,那就直接獲取鎖,如果有多個節(jié)點(diǎn),那就需要判斷誰最小
if (children.size() == 1) {
return;
} else {
Collections.sort(children);
// 獲取節(jié)點(diǎn)名稱 seq-0000...
String thisNode = currentMode.substring("/locks/".length());
// 通過 seq-0000... 獲取其在集合children 中的位置
int index = children.indexOf(thisNode);
// 判斷
if (index == -1) {
System.out.println("數(shù)據(jù)異常");
} else if (index == 0) {
// 就一個節(jié)點(diǎn),獲取到了鎖
return;
} else {
// 監(jiān)聽它前一個節(jié)點(diǎn)
waitPath = "/locks/" + children.get(index - 1);
zk.getData(waitPath, true, null);
// 等待監(jiān)聽
waitLatch.await();
return;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 對zk解鎖
public void unZkLock() {
// 刪除節(jié)點(diǎn)
try {
zk.delete(currentMode, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
1.2.2 測試代碼
package com.clear.case2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* 分布式鎖案例
*/
public class DistributedLockTest {
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
final DistributedLock lock1 = new DistributedLock();
final DistributedLock lock2 = new DistributedLock();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.zkLock();
System.out.println("線程1 啟動,獲取到鎖");
Thread.sleep(5000);
lock1.unZkLock();
System.out.println("線程1 釋放鎖");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.zkLock();
System.out.println("線程2 啟動,獲取到鎖");
Thread.sleep(5000);
lock2.unZkLock();
System.out.println("線程2 釋放鎖");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
啟動后,結(jié)果如下
線程1 啟動,獲取到鎖
線程1 釋放鎖
線程2 啟動,獲取到鎖
線程2 釋放鎖
兩個線程不會同時得到鎖,此致,分布式鎖案例完成
1.2.3 Curator 框架實(shí)現(xiàn)分布式案例
1)原生的 Java API 開發(fā)存在的問題
- 會話連接是異步的,需要自己去處理,比如使用 CountDownLatch
- Watch 需要重復(fù)注冊,不然就不能生效
- 開發(fā)的復(fù)雜性還是比較高的
- 不支持多節(jié)點(diǎn)刪除和創(chuàng)建。需要自己去遞歸
2)Curator是一個專門解決分布式鎖的框架,解決了原生Java API 開發(fā)分布式遇到的問題
Cutator官方文檔 https://curator.apache.org/index.html
1、導(dǎo)入依賴
<!-- curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.0.0</version>
</dependency>
2、代碼實(shí)現(xiàn)文章來源:http://www.zghlxwxcb.cn/news/detail-698626.html
package com.clear.case3;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.log4j.Logger;
public class CuratorLockTest {
private final static Logger logger = Logger.getLogger(CuratorLockTest.class);
public static void main(String[] args) {
// 創(chuàng)建分布式鎖1
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
// 創(chuàng)建分布式鎖2
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.acquire();
System.out.println("線程1 獲取到鎖");
lock1.acquire();
System.out.println("線程1 再次獲取到鎖");
Thread.sleep(5000);
lock1.release();
System.out.println("線程1 釋放鎖");
lock1.release();
System.out.println("線程1 再次釋放鎖");
} catch (Exception exception) {
exception.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.acquire();
System.out.println("線程2 獲取到鎖");
lock2.acquire();
System.out.println("線程2 再次獲取到鎖");
Thread.sleep(5000);
lock2.release();
System.out.println("線程2 釋放鎖");
lock2.release();
System.out.println("線程2 再次釋放鎖");
} catch (Exception exception) {
exception.printStackTrace();
}
}
}).start();
}
private static CuratorFramework getCuratorFramework() {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("kk01:2181,kk02:2181,kk03:2181")
.connectionTimeoutMs(2000)
.sessionTimeoutMs(2000)
.retryPolicy(new ExponentialBackoffRetry(3000, 3))
.build();
// 啟動客戶端
client.start();
logger.info("zookeeper啟動成功");
return client;
}
}
結(jié)果如下文章來源地址http://www.zghlxwxcb.cn/news/detail-698626.html
線程2 獲取到鎖
線程2 再次獲取到鎖
線程2 釋放鎖
線程2 再次釋放鎖
線程1 獲取到鎖
線程1 再次獲取到鎖
到了這里,關(guān)于02-zookeeper分布式鎖案例的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!