目錄
zookeeper知識點復(fù)習
相關(guān)概念
java客戶端操作
實現(xiàn)思路分析?
基本實現(xiàn)
初始化鏈接
代碼落地?
優(yōu)化:性能優(yōu)化
?實現(xiàn)阻塞鎖
監(jiān)聽實現(xiàn)阻塞鎖
優(yōu)化:可重入鎖
zk分布式鎖小結(jié)?
zookeeper知識點復(fù)習
Zookeeper(業(yè)界簡稱zk)是一種提供配置管理、分布式協(xié)同以及命名的中心化服務(wù),這些提供的
功能都是分布式系統(tǒng)中非常底層且必不可少的基本功能,但是如果自己實現(xiàn)這些功能而且要達到高吞吐、低延遲同時還要保持一致性和可用性,實際上非常困難。因此zookeeper提供了這些功能,開發(fā)者在zookeeper之上構(gòu)建自己的各種分布式系統(tǒng)。
相關(guān)概念
Zookeeper提供一個多層級的節(jié)點命名空間(節(jié)點稱為znode),每個節(jié)點都用一個以斜杠(/)分隔的路徑表示,而且每個節(jié)點都有父節(jié)點(根節(jié)點除外),非常類似于文件系統(tǒng)。并且每個節(jié)點都是唯一的。
znode節(jié)點有四種類型:
- PERSISTENT:永久節(jié)點??蛻舳伺czookeeper斷開連接后,該節(jié)點依舊存在
- EPHEMERAL:臨時節(jié)點??蛻舳伺czookeeper斷開連接后,該節(jié)點被刪除
- PERSISTENT_SEQUENTIAL:永久節(jié)點、序列化。客戶端與zookeeper斷開連接后,該節(jié)點依舊存在,只是Zookeeper給該節(jié)點名稱進行順序編號
- EPHEMERAL_SEQUENTIAL:臨時節(jié)點、序列化??蛻舳伺czookeeper斷開連接后,該節(jié)點被刪除,只是Zookeeper給該節(jié)點名稱進行順序編號
創(chuàng)建這四種節(jié)點:
?事件監(jiān)聽:在讀取數(shù)據(jù)時,我們可以同時對節(jié)點設(shè)置事件監(jiān)聽,當節(jié)點數(shù)據(jù)或結(jié)構(gòu)變化時,zookeeper會通知客戶端。當前zookeeper有如下四種事件:
1. 節(jié)點創(chuàng)建
2. 節(jié)點刪除
3. 節(jié)點數(shù)據(jù)修改
4. 子節(jié)點變更
java客戶端操作
1. 引入依賴
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
2. 常用api及其方法
初始化zookeeper客戶端類,負責建立與zkServer的會話?
new ZooKeeper(connectString, 30000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("獲取鏈接成功??!");
}
});
創(chuàng)建一個節(jié)點,1-節(jié)點路徑 2-節(jié)點內(nèi)容 3-訪問控制控制 4-節(jié)點類型?
String fullPath = zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
判斷一個節(jié)點是否存在
Stat stat = zooKeeper.exists(rootPath, false);
if (stat != null) {...}
查詢一個節(jié)點的內(nèi)容?
Stat stat = new Stat();
byte[] data = zooKeeper.getData(path, false, stat);
更新一個節(jié)點?
zooKeeper.setData(rootPath, new byte[]{}, stat.getVersion() + 1);
刪除一個節(jié)點?
zooKeeper.delete(path, stat.getVersion());
查詢一個節(jié)點的子節(jié)點列表?
List<String> children = zooKeeper.getChildren(rootPath, false);
關(guān)閉鏈接?
if (zooKeeper != null) {
zooKeeper.close();
}
實現(xiàn)思路分析?
?分布式鎖的步驟:
1. 獲取鎖:create一個節(jié)點
2. 刪除鎖:delete一個節(jié)點
3. 重試:沒有獲取到鎖的請求重試
參照redis分布式鎖的特點:
????????1. 互斥 排他
????????2. 防死鎖:
????????1. 可自動釋放鎖(臨時節(jié)點) :獲得鎖之后客戶端所在機器宕機了,客戶端沒有主動刪除子節(jié)點;如果創(chuàng)建的是永久的節(jié)點,那么這個鎖永遠不會釋放,導(dǎo)致死鎖;由于創(chuàng)建的是臨時節(jié)點,客戶端宕機后,過了一定時間zookeeper沒有收到客戶端的心跳包判斷會話失效,將臨時節(jié)點刪除從而釋放鎖。
????????2. 可重入鎖:借助于ThreadLocal
3. 防誤刪:宕機自動釋放臨時節(jié)點,不需要設(shè)置過期時間,也就不存在誤刪問題。
4. 加鎖/解鎖要具備原子性
5. 單點問題:使用Zookeeper可以有效的解決單點問題,ZK一般是集群部署的。
6. 集群問題:zookeeper集群是強一致性的,只要集群中有半數(shù)以上的機器存活,就可以對外提供服務(wù)。
基本實現(xiàn)
實現(xiàn)思路:
1. 多個請求同時添加一個相同的臨時節(jié)點,只有一個可以添加成功。添加成功的獲取到鎖
2. 執(zhí)行業(yè)務(wù)邏輯
3. 完成業(yè)務(wù)流程后,刪除節(jié)點釋放鎖。
初始化鏈接
由于zookeeper獲取鏈接是一個耗時過程,這里可以在項目啟動時,初始化鏈接,并且只初始化一次。借助于spring特性,代碼實現(xiàn)如下:
@Component
public class zkClient {
private static final String connectString = "192.168.107.135";
private static final String ROOT_PATH = "/distributed";
private ZooKeeper zooKeeper;
@PostConstruct
public void init() throws IOException {
this.zooKeeper = new ZooKeeper(connectString, 30000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("zookeeper 獲取鏈接成功");
}
});
//創(chuàng)建分布式鎖根節(jié)點
try {
if (this.zooKeeper.exists(ROOT_PATH, false) == null) {
this.zooKeeper.create(ROOT_PATH, null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@PreDestroy
public void destroy() {
if (zooKeeper != null) {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 初始化分布式對象方法
*/
public ZkDistributedLock getZkDistributedLock(String lockname){
return new ZkDistributedLock(zooKeeper,lockname);
}
}
代碼落地?
public class ZkDistributedLock {
public static final String ROOT_PATH = "/distribute";
private String path;
private ZooKeeper zooKeeper;
public ZkDistributedLock(ZooKeeper zooKeeper, String lockname) {
this.zooKeeper = zooKeeper;
this.path = ROOT_PATH + "/" + lockname;
}
public void lock() {
try {
zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(200);
lock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void unlock(){
try {
this.zooKeeper.delete(path,0);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
改造StockService的checkAndLock方法:
@Autowired
private zkClient client;
public void checkAndLock() {
// 加鎖,獲取鎖失敗重試
ZkDistributedLock lock = this.client.getZkDistributedLock("lock");
lock.lock();
// 先查詢庫存是否充足
Stock stock = this.stockMapper.selectById(1L);
// 再減庫存
if (stock != null && stock.getCount() > 0) {
stock.setCount(stock.getCount() - 1);
this.stockMapper.updateById(stock);
}
lock.unlock();
}
Jmeter壓力測試:
?性能一般,mysql數(shù)據(jù)庫的庫存余量為0(注意:所有測試之前都要先修改庫存量為5000)
基本實現(xiàn)存在的問題:
????????1. 性能一般(比mysql略好)
????????2. 不可重入
接下來首先來提高性能
優(yōu)化:性能優(yōu)化
基本實現(xiàn)中由于無限自旋影響性能:
試想:每個請求要想正常的執(zhí)行完成,最終都是要創(chuàng)建節(jié)點,如果能夠避免爭搶必然可以提高性能。這里借助于zk的臨時序列化節(jié)點,實現(xiàn)分布式鎖:?
?實現(xiàn)阻塞鎖
代碼實現(xiàn):
public class ZkDistributedLock {
public static final String ROOT_PATH = "/distribute";
private String path;
private ZooKeeper zooKeeper;
public ZkDistributedLock(ZooKeeper zooKeeper, String lockname) {
this.zooKeeper = zooKeeper;
try {
this.path = zooKeeper.create(ROOT_PATH + "/" + lockname + "_",
null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void lock() {
String preNode = getpreNode(path);
//如果該節(jié)點沒有前一個節(jié)點,說明該節(jié)點是最小的節(jié)點
if (StringUtils.isEmpty(preNode)) {
return;
}
//重新檢查是否獲取到鎖
try {
Thread.sleep(20);
lock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 獲取指定節(jié)點的前節(jié)點
*
* @param path
* @return
*/
private String getpreNode(String path) {
//獲取當前節(jié)點的序列化序號
Long curSerial = Long.valueOf(StringUtil.substringAfter(path, '_'));
//獲取根路徑下的所有序列化子節(jié)點
try {
List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);
//判空處理
if (CollectionUtils.isEmpty(nodes)) {
return null;
}
//獲取前一個節(jié)點
Long flag = 0L;
String preNode = null;
for (String node : nodes) {
//獲取每個節(jié)點的序列化號
Long serial = Long.valueOf(StringUtil.substringAfter(path, '_'));
if (serial < curSerial && serial > flag) {
flag = serial;
preNode = node;
}
}
return preNode;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
public void unlock() {
try {
this.zooKeeper.delete(path, 0);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
主要修改了構(gòu)造方法和lock方法:
?并添加了getPreNode獲取前置節(jié)點的方法。
測試結(jié)果如下:
?性能反而更弱了。
原因:雖然不用反復(fù)爭搶創(chuàng)建節(jié)點了,但是會自選判斷自己是最小的節(jié)點,這個判斷邏輯反而更復(fù)雜更 耗時。
解決方案:監(jiān)聽實現(xiàn)阻塞鎖
監(jiān)聽實現(xiàn)阻塞鎖
對于這個算法有個極大的優(yōu)化點:假如當前有1000個節(jié)點在等待鎖,如果獲得鎖的客戶端釋放鎖時,這1000個客戶端都會被喚醒,這種情況稱為“羊群效應(yīng)”;在這種羊群效應(yīng)中,zookeeper需要通知1000個 客戶端,這會阻塞其他的操作,最好的情況應(yīng)該只喚醒新的最小節(jié)點對應(yīng)的客戶端。應(yīng)該怎么做呢?在 設(shè)置事件監(jiān)聽時,每個客戶端應(yīng)該對剛好在它之前的子節(jié)點設(shè)置事件監(jiān)聽,例如子節(jié)點列表 為/lock/lock-0000000000、/lock/lock-0000000001、/lock/lock-0000000002,序號為1的客戶端監(jiān)聽 序號為0的子節(jié)點刪除消息,序號為2的監(jiān)聽序號為1的子節(jié)點刪除消息。
所以調(diào)整后的分布式鎖算法流程如下:
- 客戶端連接zookeeper,并在/lock下創(chuàng)建臨時的且有序的子節(jié)點,第一個客戶端對應(yīng)的子節(jié)點 為/lock/lock-0000000000,第二個為/lock/lock-0000000001,以此類推;
- 客戶端獲取/lock下的子節(jié)點列表,判斷自己創(chuàng)建的子節(jié)點是否為當前子節(jié)點列表中序號最小的子 節(jié)點,如果是則認為獲得鎖,否則監(jiān)聽剛好在自己之前一位的子節(jié)點刪除消息,獲得子節(jié)點變更通 知后重復(fù)此步驟直至獲得鎖;
- 執(zhí)行業(yè)務(wù)代碼;
- 完成業(yè)務(wù)流程后,刪除對應(yīng)的子節(jié)點釋放鎖。
改造ZkDistributedLock的lock方法:
public void lock() {
String preNode = getpreNode(path);
//如果該節(jié)點沒有前一個節(jié)點,說明該節(jié)點是最小的節(jié)點
if (StringUtils.isEmpty(preNode)) {
return;
} else {
CountDownLatch countDownLatch = new CountDownLatch(1);
try {
if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, watchedEvent -> {
countDownLatch.countDown();
}) == null) {
return;
}
countDownLatch.await();
return;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock();
}
}
壓力測試效果如下:
?由此可見性能提高不少僅次于redis的分布式鎖
優(yōu)化:可重入鎖
引入ThreadLocal線程局部變量保證zk分布式鎖的可重入性。
在對應(yīng)的線程的存儲數(shù)據(jù)
public class ZkDistributedLock {
public static final String ROOT_PATH = "/distribute";
private String path;
private ZooKeeper zooKeeper;
private static final ThreadLocal<Integer> THREAD_LOCAL = new ThreadLocal<>();
public ZkDistributedLock(ZooKeeper zooKeeper, String lockname) {
this.zooKeeper = zooKeeper;
try {
this.path = zooKeeper.create(ROOT_PATH + "/" + lockname + "_",
null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void lock() {
Integer flag = THREAD_LOCAL.get();
if (flag != null && flag > 0) {
THREAD_LOCAL.set(flag + 1);
return;
}
String preNode = getpreNode(path);
//如果該節(jié)點沒有前一個節(jié)點,說明該節(jié)點是最小的節(jié)點
if (StringUtils.isEmpty(preNode)) {
return;
} else {
CountDownLatch countDownLatch = new CountDownLatch(1);
try {
if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, watchedEvent -> {
countDownLatch.countDown();
}) == null) {
return;
}
countDownLatch.await();
THREAD_LOCAL.set(1);
return;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock();
}
}
/**
* 獲取指定節(jié)點的前節(jié)點
*
* @param path
* @return
*/
private String getpreNode(String path) {
//獲取當前節(jié)點的序列化序號
Long curSerial = Long.valueOf(StringUtil.substringAfter(path, '_'));
//獲取根路徑下的所有序列化子節(jié)點
try {
List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);
//判空處理
if (CollectionUtils.isEmpty(nodes)) {
return null;
}
//獲取前一個節(jié)點
Long flag = 0L;
String preNode = null;
for (String node : nodes) {
//獲取每個節(jié)點的序列化號
Long serial = Long.valueOf(StringUtil.substringAfter(path, '_'));
if (serial < curSerial && serial > flag) {
flag = serial;
preNode = node;
}
}
return preNode;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
public void unlock() {
try {
THREAD_LOCAL.set(THREAD_LOCAL.get() - 1);
if (THREAD_LOCAL.get() == 0) {
this.zooKeeper.delete(path, 0);
THREAD_LOCAL.remove();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
文章來源:http://www.zghlxwxcb.cn/news/detail-433256.html
zk分布式鎖小結(jié)?
參照redis分布式鎖的特點:
1. 互斥 排他:zk節(jié)點的不可重復(fù)性,以及序列化節(jié)點的有序性
2. 防死鎖:
????????1. 可自動釋放鎖:臨時節(jié)點
????????2. 可重入鎖:借助于ThreadLocal
3. 防誤刪:臨時節(jié)點
4. 加鎖/解鎖要具備原子性
5. 單點問題:使用Zookeeper可以有效的解決單點問題,ZK一般是集群部署的。
6. 集群問題:zookeeper集群是強一致性的,只要集群中有半數(shù)以上的機器存活,就可以對外提供服務(wù)。
7. 公平鎖:有序性節(jié)點文章來源地址http://www.zghlxwxcb.cn/news/detail-433256.html
到了這里,關(guān)于基于zookeeper實現(xiàn)分布式鎖的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!