問題背景
研究分布式鎖,基于ZK實(shí)現(xiàn),需要整合到SpringBoot使用
前言
- 參考自SpringBoot集成Curator實(shí)現(xiàn)Zookeeper基本操作,Zookeeper入門
- 本篇的代碼筆者有自己運(yùn)行過,需要注意組件的版本號(hào)是否兼容,否則會(huì)有比較多的坑
實(shí)現(xiàn)
搭建Zookeeper容器
采用Docker compose快速搭建ZK容器,很快,幾分鐘就好了,而且是集群方式搭建。詳情見筆者的Docker搭建zookeeper
引入依賴
需要注意的點(diǎn):
Curator 2.x.x-
兼容兩個(gè)zk 3.4.x
和zk 3.5.x
,Curator 3.x.x
-兼容兼容zk 3.5
,根據(jù)搭建的zk的版本使用對(duì)應(yīng)的curator依賴。引入的zk依賴,如果項(xiàng)目中有使用logback
日志 ,需要排除zk中的log4j12
依賴,詳情見下面筆者給出的依賴:
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
ZK客戶端的配置類
配置ZK的參數(shù),使用
@ConfigurationProperties
可以令配置熱更新,比如搭配Apollo、Nacos,如果使用@Valid
則無法熱更新,必須重啟項(xiàng)目才能生效
@Component
@ConfigurationProperties(prefix = "curator")
@Data
public class ZKClientProps {
private String connectString;
private int retryCount;
private int elapsedTimeMs;
private int sessionTimeoutMs;
private int connectionTimeoutMs;
}
對(duì)應(yīng)yml如下:
#curator配置
curator:
connectString: 192.168.163.128:2181,192.168.163.128:2182,192.168.163.128:2183 # zookeeper 地址
retryCount: 1 # 重試次數(shù)
elapsedTimeMs: 2000 # 重試間隔時(shí)間
sessionTimeoutMs: 60000 # session超時(shí)時(shí)間
connectionTimeoutMs: 10000 # 連接超時(shí)時(shí)間
ZK客戶端的工廠類
定制ZK客戶端:
@Component
public class ZKClientFactory {
@Resource
private ZKClientProps zkClientProps;
public CuratorFramework createSimple() {
//重試策略:第一次重試等待1S,第二次重試等待2S,第三次重試等待4s
//第一個(gè)參數(shù):等待時(shí)間的基礎(chǔ)單位,單位為毫秒
//第二個(gè)參數(shù):最大重試次數(shù)
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(zkClientProps.getElapsedTimeMs(), zkClientProps.getRetryCount());
//獲取CuratorFramework示例的最簡(jiǎn)單方式
//第一個(gè)參數(shù):zk的連接地址
//第二個(gè)參數(shù):重試策略
return CuratorFrameworkFactory.newClient(zkClientProps.getConnectString(), retry);
}
public static CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy,
int connectionTimeoutMs, int sessionTimeoutMs) {
return CuratorFrameworkFactory.builder().connectString(connectionString)
.retryPolicy(retryPolicy).connectionTimeoutMs(connectionTimeoutMs).sessionTimeoutMs(sessionTimeoutMs).build();
}
}
注入bean
創(chuàng)建ZK的客戶端,詳情如下:
@Component
@Slf4j
public class ZKClient {
@Resource
private ZKClientFactory zkClientFactory;
public static final ZKClient INSTANCE = new ZKClient();
private ZKClient() {
}
public CuratorFramework getClient() {
return zkClientFactory.createSimple();
}
public boolean isNodeExist(String path) {
CuratorFramework client = getClient();
try {
client.start();
Stat stat = client.checkExists().forPath(path);
return stat != null;
} catch (Exception e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
return false;
}
public void createNode(String path, byte[] bytes) {
CuratorFramework client = getClient();
try {
// 必須start,否則報(bào)錯(cuò)
client.start();
client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, bytes);
} catch (Exception e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
}
public void deleteNode(String path) {
CuratorFramework client = getClient();
try {
client.start();
client.delete().forPath(path);
} catch (Exception e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
}
public List<String> getChildren(String path) {
List<String> result = new LinkedList<>();
CuratorFramework client = getClient();
try {
client.start();
result = client.getChildren().forPath(path);
} catch (Exception e) {
log.error("ZKClient getChildren error.");
}
return result;
}
}
構(gòu)建測(cè)試類
測(cè)試基類,設(shè)置激活環(huán)境
@Slf4j
@ActiveProfiles("test")
@RunWith(SpringRunner.class)
@SpringBootTest(classes = GmallZookeeperApplication.class)
@ContextConfiguration
public class BaseTest {
}
創(chuàng)建節(jié)點(diǎn)、刪除節(jié)點(diǎn)、獲取節(jié)點(diǎn)信息、分布式鎖的方法如下:
@ActiveProfiles("company")
是激活筆者一個(gè)application-company.yml
文件
application.yml如下:
server:
port: 8022
spring:
profiles:
active: home
application-compay.yml如下:文章來源:http://www.zghlxwxcb.cn/news/detail-650480.html
#curator配置
curator:
connectString: 192.168.163.128:2181,192.168.163.128:2182,192.168.163.128:2183 # zookeeper 地址
retryCount: 1 # 重試次數(shù)
elapsedTimeMs: 2000 # 重試間隔時(shí)間
sessionTimeoutMs: 60000 # session超時(shí)時(shí)間
connectionTimeoutMs: 10000 # 連接超時(shí)時(shí)間
創(chuàng)建節(jié)點(diǎn)、刪除節(jié)點(diǎn)、獲取節(jié)點(diǎn)信息、分布式鎖的方法如下:文章來源地址http://www.zghlxwxcb.cn/news/detail-650480.html
@Slf4j
@ActiveProfiles("company")
public class ZKClientTest extends BaseTest{
@Resource
private ZKClient zkClient;
public static final int THREAD_NUM = 10;
@Test
public void distributedLock() throws InterruptedException, BrokenBarrierException {
String lockPath = "/test/distributed2/lock";
CuratorFramework client = zkClient.getClient();
client.start();
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
// 阻塞主線程,等待全部子線程執(zhí)行完
CyclicBarrier cyclicBarrier = new CyclicBarrier(THREAD_NUM);
for (int i = 0; i < THREAD_NUM; i++) {
new Thread(() -> {
log.info("{}->嘗試競(jìng)爭(zhēng)鎖", Thread.currentThread().getName());
try {
lock.acquire(); // 阻塞競(jìng)爭(zhēng)鎖
log.info("{}->成功獲得鎖", Thread.currentThread().getName());
Thread.sleep(2000);
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release(); //釋放鎖
} catch (Exception e) {
e.printStackTrace();
}
}
}, "Thread-" + i).start();
}
// 目的是為了等子線程搶完鎖再結(jié)束子線程,否則無法看到日志效果
cyclicBarrier.await();
log.info("全部子線程已執(zhí)行完畢");
}
@Test
public void createNode() {
// 創(chuàng)建一個(gè)ZNode節(jié)點(diǎn)
String data = "hello";
byte[] payload = data.getBytes(StandardCharsets.UTF_8);
String zkPath = "/test/CRUD/node-1";
zkClient.createNode(zkPath, payload);
log.info("createNode succeeded!");
}
@Test
public void getChildren() {
String zkPath = "/test/CRUD";
List<String> children = zkClient.getChildren(zkPath);
printList(children);
}
@Test
public void deleteNode() {
String parentPath = "/test";
log.info("======================Before delete===================");
List<String> before = zkClient.getChildren(parentPath);
printList(before);
String zkPath = "/test/CRUD/node-1";
zkClient.deleteNode(zkPath);
log.info("delete node secceeded!");
log.info("======================After delete===================");
List<String> after = zkClient.getChildren(parentPath);
printList(after);
}
private void printList(List<String> data) {
if (!CollectionUtils.isEmpty(data)) {
for (String datum : data) {
log.info("datum:{}", data);
}
}
}
}
到了這里,關(guān)于SpringBoot基于Zookeeper實(shí)現(xiàn)分布式鎖的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!