Table of Contents
1. Distributed locks based on ZooKeeper
1.1 How a distributed lock is implemented on ZooKeeper
1.1.1 Properties of a distributed lock
1.1.1.1 Characteristics
1.1.1.2 Essence
1.1.2 How ZooKeeper implements a distributed lock
1.1.2.1 ZooKeeper ephemeral sequential nodes
1.1.2.2 How ZooKeeper meets the basic requirements of a distributed lock
1.1.2.3 The Watcher mechanism
1.1.2.4 Summary
1.2 Lock workflow
1.2.1 Flow chart
1.2.2 Step-by-step description
1.3 Implementations
1.3.1 Hand-written implementation of the Lock interface
1.3.1.1 The lock class ZookeeperDistributedLock
1.3.1.2 Simulated order handling: OrderServiceHandle
1.3.1.3 Order-number generator: OrderCodeGenerator
1.3.1.4 Test class: TestZookeeperDistributedLock
1.3.1.5 Test results
1.3.2 Using the Apache Curator framework
1.3.2.1 Maven dependencies
1.3.2.2 Code
1.3.2.2.1 The lock class CuratorDistributeLock
1.3.2.2.2 Test class TestCuratorDistributedLock
1.3.2.3 Results
1. Distributed locks based on ZooKeeper
1.1 How a distributed lock is implemented on ZooKeeper
1.1.1 Properties of a distributed lock
1.1.1.1 Characteristics
- Only one client may hold the lock at a time;
- The holder may re-enter the lock (reentrancy);
- Only the holder may release the lock;
- Acquiring and releasing the lock must both be atomic;
- It must not produce deadlocks;
- It should perform as well as reasonably possible.
1.1.1.2 Essence
Mutual exclusion: tasks pass through the critical section one at a time.
1.1.2 How ZooKeeper implements a distributed lock
1.1.2.1 ZooKeeper ephemeral sequential nodes
ZooKeeper offers a node type called the ephemeral sequential node, with two key properties:
- Ephemerality: when the session that created the node ends, the node is deleted automatically.
- Ordering: when multiple clients create sequential nodes under the same parent, each sequence number is assigned to exactly one client.
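As a small illustration (plain Java, no ZooKeeper connection needed; the node names are hypothetical): the server appends a zero-padded ten-digit counter to a sequential node's name, so simply sorting the child names reproduces the order in which they were created:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SequentialNodeOrder {
    // ZooKeeper appends a zero-padded 10-digit counter to a sequential
    // node's name, so lexicographic order equals creation order.
    public static List<String> acquisitionOrder(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        // Child names as a client might read them back (unordered)
        List<String> children = Arrays.asList(
                "Node-0000000012", "Node-0000000003", "Node-0000000007");
        System.out.println(acquisitionOrder(children));
        // → [Node-0000000003, Node-0000000007, Node-0000000012]
    }
}
```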
1.1.2.2 How ZooKeeper meets the basic requirements of a distributed lock
- Ordering lets the client holding the smallest sequence number take the lock, satisfying "only one holder at a time". Because exactly one client holds the lock, reentrancy can be implemented with a simple flag. Releasing the lock means deleting the node the client registered; since every node is owned by the client that created it, only that client can delete it, satisfying "only the holder may release".
- ZooKeeper assigns sequence numbers atomically, and a number never changes once assigned; letting the smallest number win makes acquisition atomic.
- The registered nodes are ephemeral and live only as long as the session, so a crashed holder cannot cause a deadlock.
- ZooKeeper can handle thousands of node registrations and supports clustering, which covers the performance needs of most workloads.
1.1.2.3 Watcher機(jī)制
Zookeeper 允許客戶端向服務(wù)端的某個(gè) Znode 注冊(cè)一個(gè) Watcher 監(jiān)聽(tīng),當(dāng)服務(wù)端的一些指定事
件觸發(fā)了這個(gè) Watcher,服務(wù)端會(huì)向指定客戶端發(fā)送一個(gè)事件通知來(lái)實(shí)現(xiàn)分布式的通知功能,然
后客戶端根據(jù) Watcher 通知狀態(tài)和事件類(lèi)型做出業(yè)務(wù)上的改變。
在實(shí)現(xiàn)分布式鎖的時(shí)候,主要利用這個(gè)機(jī)制,實(shí)現(xiàn)釋放鎖的時(shí)候,通知等待鎖的線程競(jìng)爭(zhēng)鎖。
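The wait/notify shape this describes can be sketched without a ZooKeeper server: the waiting thread parks on a CountDownLatch, and the delete-event callback (handleDataDeleted in the real ZkClient-based implementation) counts it down. A minimal, self-contained sketch:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class WatcherLatchSketch {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch nodeDeleted = new CountDownLatch(1);

        // Stand-in for the current lock holder: when it finishes, it deletes
        // its node, which in ZooKeeper would fire the delete watcher.
        Thread holder = new Thread(() -> {
            // ... holder's business logic would run here ...
            nodeDeleted.countDown(); // simulates the handleDataDeleted callback
        });
        holder.start();

        // The waiting client blocks here until the "watcher" fires
        boolean notified = nodeDeleted.await(5, TimeUnit.SECONDS);
        System.out.println("notified: " + notified + ", re-contending for the lock");
    }
}
```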
1.1.2.3 總結(jié)
綜上可知,Zookeeper其實(shí)是基于臨時(shí)順序節(jié)點(diǎn)特性實(shí)現(xiàn)的分布式鎖。當(dāng)然,還結(jié)合了他的Watcher機(jī)制,實(shí)現(xiàn)釋放鎖的時(shí)候,通知等待鎖的線程去競(jìng)爭(zhēng)鎖。
1.2 Lock workflow
1.2.1 Flow chart
1.2.2 Step-by-step description
1. The client checks whether the /lock directory exists; if not, it creates /lock as a persistent node.
2. The client creates an ephemeral sequential node with the prefix /lock/Node- under /lock/ and receives its sequence number.
3. The client lists all ephemeral sequential children of /lock/.
4. The client checks whether any child has a smaller sequence number than its own. If not, it has acquired the lock; if so, it places a watch on the node immediately ahead of it.
5. The thread holding the lock runs its business logic, then deletes its ephemeral node, releasing the lock.
6. When a waiting thread is notified that the node it watches has been deleted, it repeats steps 3-5, entering the next acquire/release cycle.
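Steps 3 and 4 reduce to pure list logic and can be tested without a server (a sketch; the node names are hypothetical): sort the children, take the lock if your node comes first, otherwise watch the node immediately before yours:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PredecessorPicker {
    /**
     * Returns null if ownNode holds the lock (smallest sequence number);
     * otherwise returns the name of the immediately preceding node to watch.
     */
    public static String predecessorToWatch(List<String> children, String ownNode) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted);
        int idx = sorted.indexOf(ownNode);
        return idx <= 0 ? null : sorted.get(idx - 1);
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
                "Node-0000000005", "Node-0000000002", "Node-0000000009");
        // The smallest node holds the lock, so it has nothing to watch
        System.out.println(predecessorToWatch(children, "Node-0000000002")); // → null
        // A later node watches the one just ahead of it
        System.out.println(predecessorToWatch(children, "Node-0000000009")); // → Node-0000000005
    }
}
```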
1.3 Implementations
1.3.1 Hand-written implementation of the Lock interface
1.3.1.1 The lock class ZookeeperDistributedLock
```java
package com.ningzhaosheng.distributelock.zookeeper;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @author ningzhaosheng
 * @date 2024/4/17 18:13:38
 * @description Distributed lock implemented on top of ZooKeeper
 */
public class ZookeeperDistributedLock implements Lock {
    private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedLock.class);
    // ZooKeeper address
    private String ZOOKEEPER_IP_PORT = "192.168.152.130:2181";
    // ZooKeeper lock directory
    private String LOCK_PATH = "/LOCK";
    // ZooKeeper client
    private ZkClient client = null;
    private CountDownLatch cdl;
    // Node immediately preceding the current request's node
    private String beforePath;
    // Node created by the current request
    private String currentPath;

    /**
     * Initializes the client and creates the lock directory if needed.
     *
     * @param ZOOKEEPER_IP_PORT ZooKeeper address
     * @param LOCK_PATH lock directory
     */
    public ZookeeperDistributedLock(String ZOOKEEPER_IP_PORT, String LOCK_PATH) {
        this.ZOOKEEPER_IP_PORT = ZOOKEEPER_IP_PORT;
        this.LOCK_PATH = LOCK_PATH;
        client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer());
        // Create the lock directory (a persistent node) if it does not exist yet
        if (!this.client.exists(LOCK_PATH)) {
            this.client.createPersistent(LOCK_PATH);
        }
    }

    @Override
    public void lock() {
        if (!tryLock()) {
            // Block until the immediately preceding node is deleted, then retry
            waitForLock();
            lock();
        } else {
            logger.info(Thread.currentThread().getName() + " acquired the distributed lock!");
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        // Not implemented in this demo
    }

    @Override
    public boolean tryLock() {
        // currentPath == null means this is the first attempt: create our node
        if (currentPath == null || currentPath.length() <= 0) {
            // Create an ephemeral sequential node, e.g. /LOCK/0000000400
            currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
            System.out.println("---------------------------->" + currentPath);
        }
        // List all children; sequential node names are zero-padded counters,
        // so lexicographic order equals creation order
        List<String> children = this.client.getChildren(LOCK_PATH);
        Collections.sort(children);
        // If our node /LOCK/Node-n is the smallest child, we hold the lock
        if (currentPath.equals(LOCK_PATH + '/' + children.get(0))) {
            return true;
        } else {
            // Otherwise remember the node immediately before ours so we can watch it.
            // Strip the "<LOCK_PATH>/" prefix before searching the child names.
            int pos = Collections.binarySearch(children, currentPath.substring(LOCK_PATH.length() + 1));
            beforePath = LOCK_PATH + '/' + children.get(pos - 1);
        }
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // Not implemented in this demo
        return false;
    }

    // Wait for the lock by watching the immediately preceding node
    private void waitForLock() {
        IZkDataListener listener = new IZkDataListener() {
            public void handleDataDeleted(String dataPath) throws Exception {
                logger.info(Thread.currentThread().getName() + ": DataDeleted event received!---------------------------");
                if (cdl != null) {
                    cdl.countDown();
                }
            }

            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
        };
        // Subscribe to delete events on beforePath, the node just ahead of ours
        this.client.subscribeDataChanges(beforePath, listener);
        if (this.client.exists(beforePath)) {
            cdl = new CountDownLatch(1);
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.client.unsubscribeDataChanges(beforePath, listener);
    }

    @Override
    public void unlock() {
        // Delete our ephemeral node, releasing the lock
        client.delete(currentPath);
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}
```
1.3.1.2 Simulated order handling: OrderServiceHandle
```java
package com.ningzhaosheng.distributelock.zookeeper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;

/**
 * @author ningzhaosheng
 * @date 2024/4/17 21:45:46
 * @description Simulated order handling
 */
public class OrderServiceHandle implements Runnable {
    private static OrderCodeGenerator ong = new OrderCodeGenerator();
    private Logger logger = LoggerFactory.getLogger(OrderServiceHandle.class);
    // Countdown latch initialized to the number of threads
    private CountDownLatch cdl = null;
    private Lock lock = null;

    public OrderServiceHandle(CountDownLatch cdl, Lock lock) {
        this.cdl = cdl;
        this.lock = lock;
    }

    // Create an order
    public void createOrder() {
        String orderCode = null;
        // Acquire the lock
        lock.lock();
        try {
            // Generate the order number inside the critical section
            orderCode = ong.getOrderCode();
        } catch (Exception e) {
            logger.error("failed to generate order code", e);
        } finally {
            // Release the lock once the critical section is done
            lock.unlock();
        }
        // ... business code ...
        logger.info("insert into DB with id:=======================>" + orderCode);
    }

    @Override
    public void run() {
        try {
            // Wait until all worker threads have been initialized
            cdl.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Create the order
        createOrder();
    }
}
```
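Because OrderServiceHandle depends only on the java.util.concurrent.locks.Lock interface, the same critical-section logic can be smoke-tested in a single JVM with a ReentrantLock before wiring in ZooKeeper. A self-contained sketch (the handler and generator are inlined and simplified; the class and method names here are made up for the example):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LocalLockSmokeTest {
    private static int seq = 0; // shared counter, protected only by the lock

    // Runs num threads that each take the lock and bump the counter once;
    // returns how many distinct values were produced.
    public static int runOnce(int num) throws InterruptedException {
        Lock lock = new ReentrantLock();
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(num);
        Set<Integer> codes = ConcurrentHashMap.newKeySet();

        for (int i = 0; i < num; i++) {
            new Thread(() -> {
                try {
                    start.await(); // all workers start together
                    lock.lock();
                    try {
                        codes.add(++seq); // critical section: next "order code"
                    } finally {
                        lock.unlock();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }
        start.countDown();
        done.await();
        return codes.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("distinct codes: " + runOnce(10)); // → distinct codes: 10
    }
}
```

Swapping in the ZooKeeper lock later changes nothing in the handler, which is the point of programming against the Lock interface.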
1.3.1.3 Order-number generator: OrderCodeGenerator
```java
package com.ningzhaosheng.distributelock.zookeeper;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @author ningzhaosheng
 * @date 2024/4/17 21:44:06
 * @description Order-number generator
 */
public class OrderCodeGenerator {
    // Auto-incrementing sequence (deliberately unsynchronized: the
    // distributed lock is what protects it)
    private static int i = 0;

    // Generates order numbers as "year-month-day-hour-minute-second" + sequence
    public String getOrderCode() {
        Date now = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
        return sdf.format(now) + ++i;
    }
}
```
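A quick local check of the generator's output format (a sketch with the generator inlined; note that the bare ++i is exactly why this code path needs a lock once multiple threads or processes call it):

```java
import java.text.SimpleDateFormat;
import java.util.Date;

public class OrderCodeCheck {
    private static int i = 0;

    // Same scheme as OrderCodeGenerator: 14-digit timestamp + sequence
    public static String getOrderCode() {
        return new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()) + ++i;
    }

    public static void main(String[] args) {
        String first = getOrderCode();
        String second = getOrderCode();
        // The timestamp prefix may be identical, but the sequence suffix differs
        System.out.println(first + " / " + second);
    }
}
```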
1.3.1.4 Test class: TestZookeeperDistributedLock
```java
package com.ningzhaosheng.distributelock.zookeeper;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;

/**
 * @author ningzhaosheng
 * @date 2024/4/17 21:48:28
 * @description Test class for the ZooKeeper distributed lock
 */
public class TestZookeeperDistributedLock {
    public static void main(String[] args) {
        // ZooKeeper address
        String ZOOKEEPER_IP_PORT = "192.168.31.9:2181";
        // ZooKeeper lock directory
        String LOCK_PATH = "/LOCK";
        // Number of concurrent threads
        int NUM = 10;
        CountDownLatch cdl = new CountDownLatch(NUM);
        for (int i = 1; i <= NUM; i++) {
            // Instantiate one lock and one worker thread per iteration
            Lock lock = new ZookeeperDistributedLock(ZOOKEEPER_IP_PORT, LOCK_PATH);
            new Thread(new OrderServiceHandle(cdl, lock)).start();
            // Each created thread decrements the latch; the workers all start
            // once it reaches zero
            cdl.countDown();
        }
    }
}
```
1.3.1.5 Test results
The execution output shows that, with multiple threads contending, the distributed lock is acquired and released correctly.
1.3.2 Using the Apache Curator framework
1.3.2.1 Maven dependencies
```xml
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.0</version>
</dependency>
```
1.3.2.2 Code
The business side still uses the OrderServiceHandle class from the previous section to exercise the lock, so only the lock implementation and the test class are shown here; refer to the earlier section for OrderServiceHandle.
1.3.2.2.1 The lock class CuratorDistributeLock
```java
package com.ningzhaosheng.distributelock.zookeeper.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @author ningzhaosheng
 * @date 2024/4/17 22:03:45
 * @description Implements the Lock interface. (Strictly unnecessary: InterProcessMutex
 * could be used directly; the wrapper just mirrors the JVM Lock API.)
 */
public class CuratorDistributeLock implements Lock {
    private CuratorFramework client;
    private InterProcessMutex mutex;

    public CuratorDistributeLock(String connString, String lockPath) {
        this(connString, lockPath, new ExponentialBackoffRetry(3000, 5));
    }

    public CuratorDistributeLock(String connString, String lockPath, ExponentialBackoffRetry retryPolicy) {
        try {
            client = CuratorFrameworkFactory.builder()
                    .connectString(connString)
                    .retryPolicy(retryPolicy)
                    .build();
            client.start();
            mutex = new InterProcessMutex(client, lockPath);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void lock() {
        try {
            // Acquire the lock (blocks until available)
            mutex.acquire();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        // Not implemented in this demo
    }

    @Override
    public boolean tryLock() {
        // Not implemented in this demo
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        try {
            // Delegate to InterProcessMutex's timed acquire
            return mutex.acquire(time, unit);
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    public void unlock() {
        try {
            // Release the lock
            mutex.release();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}
```
1.3.2.2.2 Test class TestCuratorDistributedLock
```java
package com.ningzhaosheng.distributelock.zookeeper.curator;

import com.ningzhaosheng.distributelock.zookeeper.OrderServiceHandle;

import java.util.concurrent.CountDownLatch;

/**
 * @author ningzhaosheng
 * @date 2024/4/17 21:54:33
 * @description Test class for the Apache Curator distributed lock
 */
public class TestCuratorDistributedLock {
    private static final String ZK_ADDRESS = "192.168.31.9:2181";
    private static final String LOCK_PATH = "/distributed_lock";

    public static void main(String[] args) {
        int NUM = 10;
        CountDownLatch cdl = new CountDownLatch(NUM);
        for (int i = 1; i <= NUM; i++) {
            // Create one CuratorDistributeLock per worker thread.
            // (InterProcessMutex could be used directly; the wrapper only
            // mirrors the JVM Lock API.)
            CuratorDistributeLock curatorDistributeLock = new CuratorDistributeLock(ZK_ADDRESS, LOCK_PATH);
            new Thread(new OrderServiceHandle(cdl, curatorDistributeLock)).start();
            // Each created thread decrements the latch
            cdl.countDown();
        }
    }
}
```
1.3.2.3 Results
The execution output shows that the Curator-based ZooKeeper lock also grants the lock in ephemeral-sequential-node order: at any moment the smallest sequential node holds the lock, while each waiting thread uses the Watcher mechanism to register a callback on the node ahead of it, re-evaluates which node is smallest when that node is deleted (i.e. when the holder releases the lock), and the cycle repeats.
That's all for this post. If it helped you, likes, follows, and bookmarks are welcome, and feel free to leave questions in the comments!