1.分布式鎖概述
1.1 什么是分布式鎖
1)要介紹分布式鎖,首先要提到與分布式鎖相對應(yīng)的是線程鎖。
-
線程鎖:主要用來給方法、代碼塊加鎖。當(dāng)某個方法或代碼使用鎖,在同一時刻僅有一個線程執(zhí)行該方法或該代碼段。
線程鎖只在同一JVM中有效果,因為線程鎖的實現(xiàn)在根本上是依靠線程之間共享內(nèi)存實現(xiàn)的,比如synchronized是共享對象頭,顯示鎖Lock是共享某個變量(state)。
-
分布式鎖:分布式鎖,即分布式系統(tǒng)中的鎖。在單體應(yīng)用中我們通過鎖解決的是控制共享資源訪問的問題,而分布式鎖,就是解決了分布式系統(tǒng)中控制共享資源訪問的問題。與單體應(yīng)用不同的是,分布式系統(tǒng)中競爭共享資源的最小粒度從線程升級成了進(jìn)程。
分布式鎖是在分布式或者集群環(huán)境下, 多進(jìn)程可見,并且互斥的鎖。
2)分布式鎖介紹
-
傳統(tǒng)單體應(yīng)用單機部署的情況下,可以使用并發(fā)處理相關(guān)的功能進(jìn)行互斥控制,但是原單體單機部署的系統(tǒng)被演化成分布式集群系統(tǒng)后,由于分布式系統(tǒng)多線程、多進(jìn)程并且分布在不同機器上,這將使原單機部署情況下的并發(fā)控制鎖策略失效。提出分布式鎖的概念,是為了解決跨機器的互斥機制來控制共享資源的訪問。
-
分布式場景下解決并發(fā)問題,需要應(yīng)用分布式鎖技術(shù)。如上圖所示,分布式鎖的目的是保證在分布式部署的應(yīng)用集群中,多個服務(wù)在請求同一個方法或者同一個業(yè)務(wù)操作的情況下,對應(yīng)業(yè)務(wù)邏輯只能被一臺機器上的一個線程執(zhí)行,避免出現(xiàn)并發(fā)問題。
1.2 分布式鎖的設(shè)計原則
Redis官網(wǎng)上對使用分布式鎖提出至少需要滿足如下三個要求:
-
互斥(屬于安全性): 在任何給定時刻,只有一個客戶端可以持有鎖。
-
無死鎖(屬于有效性): 即如果一個線程已經(jīng)持有了鎖,那么它可以多次獲取該鎖而不會發(fā)生死鎖。
-
容錯性(屬于有效性): 如果一個線程獲取了鎖,那么即使崩潰或者失去連接,鎖也必須被釋放。
除此之外,分布式鎖的設(shè)計中還可以需要考慮:
-
加鎖解鎖的同源性:A加的鎖,不能被B解鎖。
-
獲取鎖非阻塞: 如果獲取不到鎖,不能無限期等待(在某個服務(wù)來獲取鎖時,假設(shè)該鎖已經(jīng)被另一個服務(wù)獲取,我們要能直接返回失敗,不能一直等待。)。
-
鎖失效機制:假設(shè)某個應(yīng)用獲取到鎖之后,一直沒有來釋放鎖,可能服務(wù)本身已經(jīng)掛掉了,不能一直不釋放,導(dǎo)致其他服務(wù)一直獲取不到鎖。
-
高性能: 加鎖解鎖是高性能的,加鎖時間一般是幾毫秒。(我們這個分布式鎖,可能會有很多的服務(wù)器來獲取,所以加鎖解鎖一定是需要高能的)。
-
高可用: 為了避免單點故障,鎖需要有一定的容錯方式。例如鎖服務(wù)本身就是一個集群的形式。
1.3 分布式鎖的實現(xiàn)方式
分布式鎖的使用流程: 加鎖 -----》 執(zhí)行業(yè)務(wù)邏輯 ----》釋放鎖
-
基于數(shù)據(jù)庫實現(xiàn)分布式鎖
-
基于 redis 實現(xiàn)分布式鎖
-
基于 zookeeper實現(xiàn)分布式鎖
2.基于mysql實現(xiàn)分布式鎖
基于Mysql實現(xiàn)分布式鎖,適用于對性能要求不高,并且不希望因為要使用分布式鎖而引入新組件。
可以利用唯一鍵索引不能重復(fù)插入的特點實現(xiàn)。
2.1 基于唯一索引實現(xiàn)
2.1.1 實現(xiàn)思路
-
創(chuàng)建鎖表,內(nèi)部存在字段表示資源名及資源描述,同一資源名使用數(shù)據(jù)庫唯一性限制。
-
多個進(jìn)程同時往數(shù)據(jù)庫鎖表中寫入對某個資源的占有記錄,當(dāng)某個進(jìn)程成功寫入時則表示其獲取鎖成功
-
其他進(jìn)程由于資源字段唯一性限制插入失敗陷入自旋并且失敗重試。
-
當(dāng)執(zhí)行完業(yè)務(wù)后持有該鎖的進(jìn)程則刪除該表內(nèi)的記錄,此時回到步驟一。
2.1.2 創(chuàng)建數(shù)據(jù)庫以及表
-
在mysql下創(chuàng)建數(shù)據(jù)庫,名為: distribute_lock (這里使用navicat創(chuàng)建)
-
多個進(jìn)程同時往表中插入記錄(鎖資源為1,描述為測試鎖),插入成功則執(zhí)行流程,執(zhí)行完流程后刪除其在數(shù)據(jù)庫表中的記錄。
create table `database_lock`( `id` BIGINT NOT NULL AUTO_INCREMENT, `resource` INT NOT NULL COMMENT '鎖資源', `description` varchar(1024) NOT NULL DEFAULT "" COMMENT '描述', PRIMARY KEY (`id`), UNIQUE KEY `resource` (`resource`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='數(shù)據(jù)庫分布式鎖表';
2.1.3 創(chuàng)建maven工程
-
創(chuàng)建maven工程,distribute-lock,引入依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" ? ? ? ? xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" ? ? ? ? xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> ? ?<modelVersion>4.0.0</modelVersion> ? ? ?<groupId>com.mashibing</groupId> ? ?<artifactId>zk-client1</artifactId> ? ?<version>1.0-SNAPSHOT</version> ? ? ?<dependencies> ? ? ? ?<dependency> ? ? ? ? ? ?<groupId>junit</groupId> ? ? ? ? ? ?<artifactId>junit</artifactId> ? ? ? ? ? ?<version>4.10</version> ? ? ? ? ? ?<scope>test</scope> ? ? ? ?</dependency> ? ? ? ? ?<!--curator--> ? ? ? ?<dependency> ? ? ? ? ? ?<groupId>org.apache.curator</groupId> ? ? ? ? ? ?<artifactId>curator-framework</artifactId> ? ? ? ? ? ?<version>4.0.0</version> ? ? ? ?</dependency> ? ? ? ? ?<dependency> ? ? ? ? ? ?<groupId>org.apache.curator</groupId> ? ? ? ? ? ?<artifactId>curator-recipes</artifactId> ? ? ? ? ? ?<version>4.0.0</version> ? ? ? ?</dependency> ? ? ? ? ?<!--日志--> ? ? ? ?<dependency> ? ? ? ? ? ?<groupId>org.slf4j</groupId> ? ? ? ? ? ?<artifactId>slf4j-api</artifactId> ? ? ? ? ? ?<version>1.7.21</version> ? ? ? ?</dependency> ? ? ? ? ?<dependency> ? ? ? ? ? ?<groupId>org.slf4j</groupId> ? ? ? ? ? ?<artifactId>slf4j-log4j12</artifactId> ? ? ? ? ? ?<version>1.7.21</version> ? ? ? ?</dependency> ? ? ? ? ?<!-- zookeeper --> ? ? ? ?<dependency> ? ? ? ? ? ?<groupId>com.101tec</groupId> ? ? ? ? ? ?<artifactId>zkclient</artifactId> ? ? ? ? ? ?<version>0.10</version> ? ? ? ?</dependency> ? ? ? ? ?<!-- lombok --> ? ? ? ?<dependency> ? ? ? ? ? ?<groupId>org.projectlombok</groupId> ? ? ? ? ? ?<artifactId>lombok</artifactId> ? ? ? ? ? ?<version>1.18.6</version> ? ? ? ?</dependency> ? ? ? ? ?<!-- jdbc --> ? ? ? ?<dependency> ? ? ? ? ? ?<groupId>mysql</groupId> ? ? ? ? ? ?<artifactId>mysql-connector-java</artifactId> ? ? ? ? ? ?<version>5.1.48</version> ? ? ? ?</dependency> ? ? ?</dependencies> ? ? ?<build> ? ? ? ?<plugins> ? ? ? ? ? ?<plugin> ? ? ? ? ? ? ? ?<groupId>org.apache.maven.plugins</groupId> ? ? ? ? ? ? ? ?<artifactId>maven-compiler-plugin</artifactId> ? ? ? ? ? ? ? ?<version>3.1</version> ? ? ? ? ? ? ? ?<configuration> ? ? ? ? ? ? ? ? ? ?<source>1.8</source> ? ? ? ? ? ? ? ? ? ?<target>1.8</target> ? ? ? ? ? ? ? ?</configuration> ? ? ? ? ? ?</plugin> ? ? ? ?</plugins> ? ?</build> </project>
-
添加數(shù)據(jù)庫配置文件 db.properties
driver=com.mysql.cj.jdbc.Driver url=jdbc:mysql://localhost:3306/distribute_lock?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=Asia/Shanghai user=root password=123456
-
創(chuàng)建包結(jié)構(gòu)
基礎(chǔ)包結(jié)構(gòu): com.mashibing.lock , 在lock包下創(chuàng)建 mysql 包
com.mashibing.lock.mysql
-
導(dǎo)入工具類
-
PropertiesReader
@Slf4j public class PropertiesReader { ? ? ?// Properties緩存文件 ? ?private static final Map<String, Properties> propertiesCache = new HashMap<String, Properties>(); ? ? ?public static Properties getProperties(String propertiesName) throws IOException { ? ? ? ?if (propertiesCache.containsKey(propertiesName)) { ? ? ? ? ? ?return propertiesCache.get(propertiesName); ? ? ? } ? ? ? ?loadProperties(propertiesName); ? ? ? ?return propertiesCache.get(propertiesName); ? } ? ? ?private synchronized static void loadProperties(String propertiesName) throws IOException { ? ? ? ?FileReader fileReader = null; ? ? ? ? ?try { ? ? ? ? ? ?// 創(chuàng)建Properties集合類 ? ? ? ? ? ?Properties pro = new Properties(); ? ? ? ? ? ?// 獲取src路徑下的文件--->ClassLoader類加載器 ? ? ? ? ? ?ClassLoader classLoader = PropertiesReader.class.getClassLoader(); ? ? ? ? ? ?URL resource = classLoader.getResource(propertiesName); ? ? ? ? ? ?// 獲取配置路徑 ? ? ? ? ? ?String path = resource.getPath(); ? ? ? ? ? ?// 讀取文件 ? ? ? ? ? ?fileReader = new FileReader(path); ? ? ? ? ? ?// 加載文件 ? ? ? ? ? ?pro.load(fileReader); ? ? ? ? ? ?// 初始化 ? ? ? ? ? ?propertiesCache.put(propertiesName, pro); ? ? ? } catch (IOException e) { ? ? ? ? ? ?log.error("讀取Properties文件失敗,Properties名為:" + propertiesName); ? ? ? ? ? ?throw e; ? ? ? } finally { ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?if (fileReader != null) { ? ? ? ? ? ? ? ? ? ?fileReader.close(); ? ? ? ? ? ? ? } ? ? ? ? ? } catch (IOException e) { ? ? ? ? ? ? ? ?log.error("fileReader關(guān)閉失??!", e); ? ? ? ? ? } ? ? ? } ? } }
-
JDBCUtils
@Slf4j public class JDBCUtils { ? ? ?private static String url; ? ?private static String user; ? ?private static String password; ? ? ?static { ? ? ? ?//讀取文件,獲取值 ? ? ? ?try { ? ? ? ? ? ?Properties properties = PropertiesReader.getProperties("db.properties"); ? ? ? ? ? ?url = properties.getProperty("url"); ? ? ? ? ? ?user = properties.getProperty("user"); ? ? ? ? ? ?password = properties.getProperty("password"); ? ? ? ? ? ?String driver = properties.getProperty("driver"); ? ? ? ? ? ?//4.注冊驅(qū)動 ? ? ? ? ? ?Class.forName(driver); ? ? ? } catch (IOException | ClassNotFoundException e) { ? ? ? ? ? ?log.error("初始化jdbc連接失敗!", e); ? ? ? } ? } ? ? ?/** ? ? * 獲取連接 ? ? * ? ? * @return 連接對象 ? ? */ ? ?public static Connection getConnection() throws SQLException { ? ? ? ?return DriverManager.getConnection(url, user, password); ? } ? ? ?/** ? ? * 釋放資源 ? ? * ? ? * @param rs ? ? * @param st ? ? * @param conn ? ? */ ? ?public static void close(ResultSet rs, Statement st, Connection conn) { ? ? ? ?if (rs != null) { ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?rs.close(); ? ? ? ? ? } catch (SQLException e) { ? ? ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? } ? ? ? } ? ? ? ?if (st != null) { ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?st.close(); ? ? ? ? ? } catch (SQLException e) { ? ? ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? } ? ? ? } ? ? ? ?if (conn != null) { ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?conn.close(); ? ? ? ? ? } catch (SQLException e) { ? ? ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? } ? ? ? } ? } }
2.1.4 數(shù)據(jù)庫操作類
-
com.mashibing.lock.mysql.service
/** * MySQL 鎖操作類(加鎖+釋放鎖) */ @Slf4j public class MySQLDistributedLockService { ? ? ?private static Connection connection; ? ?private static Statement statement; ? ?private static ResultSet resultSet; ? ? ?static{ ? ? ? ?try { ? ? ? ? ? ?connection = JDBCUtils.getConnection(); ? ? ? ? ? ?statement = connection.createStatement(); ? ? ? ? ? ?resultSet = null; ? ? ? } catch (SQLException e) { ? ? ? ? ? ?log.error("數(shù)據(jù)庫連接失?。?); ? ? ? } ? } ? ? ?/** ? ? * 鎖表 - 獲取鎖 ? ? * @param resource ? ? 資源 ? ? * @param description ? 鎖描述 ? ? * @return 是否操作成功 ? ? */ ? ?public static boolean tryLock(int resource,String description){ ? ? ? ? ?String sql = "insert into database_lock (resource,description) values (" + resource + ", '" + description + "');"; ? ? ? ? ?//獲取數(shù)據(jù)庫連接 ? ? ? ?try { ? ? ? ? ? ?int stat = statement.executeUpdate(sql); ? ? ? ? ? ?return stat == 1; ? ? ? } catch (SQLException e) { ? ? ? ? ? ?return false; ? ? ? } ? } ? ? ?/** ? ? * 鎖表-釋放鎖 ? ? * @return ? ? */ ? ?public static boolean releaseLock(int resource) throws SQLException { ? ? ? ?String sql = "delete from database_lock where resource = " + resource; ? ? ? ?//獲取數(shù)據(jù)庫連接 ? ? ? ?int stat = statement.executeUpdate(sql); ? ? ? ?return stat == 1; ? } ? ? ?/** ? ? * 關(guān)閉連接 ? ? */ ? ?public static void close(){ ? ? ? ?log.info("當(dāng)前線程: " + ManagementFactory.getRuntimeMXBean().getName().split("@")[0] + ? ? ? ? ? ? ? ?",關(guān)閉了數(shù)據(jù)庫連接!"); ? ? ? ?JDBCUtils.close(resultSet,statement,connection); ? } }
2.1.5 創(chuàng)建LockTable
/** * mysql分布式鎖 * ? ? 執(zhí)行流程: 多進(jìn)程搶占數(shù)據(jù)庫某個資源,然后執(zhí)行業(yè)務(wù),執(zhí)行完釋放資源 * ? ? 鎖機制: 單一進(jìn)程獲取鎖時,則其他進(jìn)程提交失敗 */ @Slf4j public class LockTable extends Thread { ? ? ?@Override ? ?public void run() { ? ? ? ?super.run(); ? ? ? ? ?//獲取Java虛擬機的進(jìn)程ID ? ? ? ?String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; ? ? ? ?try{ ? ? ? ? ? ?while(true){ ? ? ? ? ? ? ? ?log.info("當(dāng)前進(jìn)程PID:" + pid + ",嘗試獲取鎖資源!"); ? ? ? ? ? ? ? ?if(MySQLDistributedLockService.tryLock(1,"測試鎖")){ ? ? ? ? ? ? ? ? ? ?log.info("當(dāng)前進(jìn)程PID:" + pid + ",獲取鎖資源成功!"); ? ? ? ? ? ? ? ? ? ? ?//sleep模擬業(yè)務(wù)處理過程 ? ? ? ? ? ? ? ? ? ?log.info("開始處理業(yè)務(wù)!"); ? ? ? ? ? ? ? ? ? ?Thread.sleep(10*1000); ? ? ? ? ? ? ? ? ? ?log.info("業(yè)務(wù)處理完成!"); ? ? ? ? ? ? ? ? ? ? ?MySQLDistributedLockService.releaseLock(1); ? ? ? ? ? ? ? ? ? ?log.info("當(dāng)前進(jìn)程PID: " + pid + ",釋放了鎖資源!"); ? ? ? ? ? ? ? ? ? ?break; ? ? ? ? ? ? ? }else{ ? ? ? ? ? ? ? ? ? ?log.info("當(dāng)前進(jìn)程PID: " + pid + ",獲取鎖資源失??!"); ? ? ? ? ? ? ? ? ? ?Thread.sleep(2000); ? ? ? ? ? ? ? } ? ? ? ? ? } ? ? ? }catch (Exception e){ ? ? ? ? ? ?log.error("搶占鎖發(fā)生錯誤!",e); ? ? ? }finally { ? ? ? ? ? ?MySQLDistributedLockService.close(); ? ? ? } ? } ? ? ?// 程序入口 ? ?public static void main(String[] args) { ? ? ? ? ?new LockTable().start(); ? } }
2.1.6 分布式鎖測試
-
運行時開啟并行執(zhí)行選項,每次運行三個或三個以上進(jìn)程. Allow parallel run 運行并行執(zhí)行
-
三個進(jìn)程的執(zhí)行情況
注意事項:
-
該鎖為非阻塞的
-
當(dāng)某進(jìn)程持有鎖并且掛死時候會造成資源一直不釋放的情況,造成死鎖,因此需要維護(hù)一個定時清理任務(wù)去清理持有過久的鎖
-
要注意數(shù)據(jù)庫的單點問題,最好設(shè)置備庫,進(jìn)一步提高可靠性
-
該鎖為非可重入鎖,如果要設(shè)置成可重入鎖需要添加數(shù)據(jù)庫字段記錄持有該鎖的設(shè)備信息以及加鎖次數(shù)
2.2 基于樂觀鎖
2.2.1 需求分析
需求: 數(shù)據(jù)庫中設(shè)定某商品基本信息(名為外科口罩,數(shù)量為10),多進(jìn)程對該商品進(jìn)行搶購,當(dāng)商品數(shù)量為0時結(jié)束搶購。
-
創(chuàng)建表
# 創(chuàng)建表 create table `database_lock_2`( `id` BIGINT NOT NULL AUTO_INCREMENT, `good_name` VARCHAR(256) NOT NULL DEFAULT "" COMMENT '商品名稱', `good_count` INT NOT NULL COMMENT '商品數(shù)量', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='數(shù)據(jù)庫分布式鎖表2'; # 插入原始數(shù)據(jù) insert into database_lock_2 (good_name,good_count) values ('醫(yī)用口罩',10);
2.2.2 實現(xiàn)思路
-
每次執(zhí)行業(yè)務(wù)前首先進(jìn)行數(shù)據(jù)庫查詢,查詢當(dāng)前的需要修改的資源值(或版本號)。
-
進(jìn)行資源的修改操作,并且修改前進(jìn)行資源(或版本號)的比對操作,比較此時數(shù)據(jù)庫中的值是否和上一步查詢結(jié)果相同。
-
查詢結(jié)果相同則修改對應(yīng)資源值,不同則回到第一步。
2.2.3 代碼實現(xiàn)
-
在 MySQLDistributedLockService 中,添加對樂觀鎖的操作
/** * 樂觀鎖-獲取資源 * @param id 資源ID * @return result */ public static ResultSet getGoodCount(int id) throws SQLException { String sql = "select * from database_lock_2 where id = " + id; //查詢數(shù)據(jù) resultSet = statement.executeQuery(sql); return resultSet; } /** * 樂觀鎖-修改資源 * @param id 資源ID * @param goodCount 資源 * @return 修改狀態(tài) */ public static boolean setGoodCount(int id, int goodCount) throws SQLException { String sql = "update database_lock_2 set good_count = good_count - 1 where id =" + id +" and good_count = " + goodCount; int stat = statement.executeUpdate(sql); return stat == 1; } /** * 樂觀鎖-開啟事務(wù)自動提交 */ public static void AutoCommit(){ try { connection.setAutoCommit(true); } catch (SQLException e) { log.error("開啟自動提交!",e); } }
-
創(chuàng)建OptimisticLock,模擬并發(fā)操作分布式鎖
/** * mysql分布式鎖-樂觀鎖 * 執(zhí)行流程: 多進(jìn)程搶購?fù)簧唐?,每次搶購成功商品?shù)量-1,商品數(shù)據(jù)量為0時退出 * 鎖機制: 單一進(jìn)程獲取鎖時,則其他進(jìn)程提交失敗 */ @Slf4j public class OptimisticLock extends Thread{ @Override public void run() { super.run(); String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; ResultSet resultSet = null; String goodName = null; int goodCount = 0; try { while(true){ log.info("當(dāng)前線程:" + pid + ",開始搶購商品!"); //獲取當(dāng)前商品信息 resultSet = MySQLDistributedLockService.getGoodCount(1); while (resultSet.next()){ goodName = resultSet.getString("good_name"); goodCount = resultSet.getInt("good_count"); } log.info("獲取庫存成功,當(dāng)前商品名為:" + goodName + ",當(dāng)前庫存剩余量為:" + goodCount); //模擬執(zhí)行業(yè)務(wù)操作 Thread.sleep(2*3000); if(0 == goodCount){ log.info("搶購失敗,當(dāng)前庫存為0! "); break; } //修改庫存信息,庫存量-1 if(MySQLDistributedLockService.setGoodCount(1,goodCount)){ log.info("當(dāng)前線程:" + pid + " 搶購商品:" + goodName + "成功,剩余庫存為:" + (goodCount -1)); //模擬延遲,防止鎖每次被同一進(jìn)程獲取 Thread.sleep(2 * 1000); }else{ log.error("搶購商品:" + goodName +"失敗,商品數(shù)量已被修改"); } } }catch (Exception e){ log.error("搶購商品發(fā)生錯誤!",e); }finally { if(resultSet != null){ try { resultSet.close(); } catch (SQLException e) { e.printStackTrace(); log.error("關(guān)閉Result失敗!" , e); } } MySQLDistributedLockService.close(); } } public static void main(String[] args) { new OptimisticLock().start(); } }
2.3.4 代碼測試
開啟三個進(jìn)程,查看執(zhí)行情況
-
9 7 4 1
-
8 5 2
-
6 3
注意事項:
-
該鎖為非阻塞的
-
該鎖對于業(yè)務(wù)具有侵入式,如果設(shè)置版本號校驗則增加的額外的字段,增加了數(shù)據(jù)庫冗余
-
當(dāng)并發(fā)量過高時會有大量請求訪問數(shù)據(jù)庫的某行記錄,對數(shù)據(jù)庫造成很大的寫壓力
-
因此樂觀鎖適用于并發(fā)量不高,并且寫操作不頻繁的場景
2.3 基于悲觀鎖
2.3.1 實現(xiàn)思路
-
關(guān)閉jdbc連接自動commit屬性
-
每次執(zhí)行業(yè)務(wù)前先使用查詢語句后接for update表示鎖定該行數(shù)據(jù)(注意查詢條件如果未命中主鍵或索引,此時將會從行鎖變?yōu)楸礞i)
-
執(zhí)行業(yè)務(wù)流程修改表資源
-
執(zhí)行commit操作
2.3.2 代碼實現(xiàn)
-
在 MySQLDistributedLockService 中,添加對悲觀鎖的操作
/** * 悲觀鎖-獲取資源 * @param id 資源ID * @return result */ public static ResultSet getGoodCount2(int id) throws SQLException { String sql = "select * from database_lock_2 where id = " + id + "for update"; //查詢數(shù)據(jù) resultSet = statement.executeQuery(sql); return resultSet; } /** * 悲觀鎖-修改資源 * @param id 資源ID * @return 修改狀態(tài) */ public static boolean setGoodCount2(int id) throws SQLException { String sql = "update database_lock_2 set good_count = good_count - 1 where id =" + id; int stat = statement.executeUpdate(sql); return stat == 1; } /** * 悲觀鎖-關(guān)閉事務(wù)自動提交 */ public static void closeAutoCommit(){ try { connection.setAutoCommit(false); } catch (SQLException e) { log.error("關(guān)閉自動提交失??!",e); } } /** * 悲觀鎖-提交事務(wù) */ public static void commit(String pid,String goodName,int goodCount) throws SQLException { connection.commit(); log.info("當(dāng)前線程:" + pid + "搶購商品: " + goodName + "成功,剩余庫存為:" + (goodCount-1)); } /** * 悲觀鎖-回滾 */ public static void rollBack() throws SQLException { connection.rollback(); }
-
創(chuàng)建PessimisticLock,模擬并發(fā)操作分布式鎖
/** * mysql 分布式鎖-悲觀鎖 * 執(zhí)行流程:多個進(jìn)程搶占同一個商品,執(zhí)行業(yè)務(wù)完畢則通過connection.commit() 釋放鎖 * 鎖機制:單一進(jìn)程獲取鎖時,則其他進(jìn)程將阻塞等待 */ @Slf4j public class PessimisticLock extends Thread { @Override public void run() { super.run(); ResultSet resultSet = null; String goodName = null; int goodCount = 0; String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; //關(guān)閉自動提交 MySQLDistributedLockService.closeAutoCommit(); try{ while(true){ log.info("當(dāng)前線程:" + pid + ""); //獲取庫存 resultSet = MySQLDistributedLockService.getGoodCount2(1); while (resultSet.next()) { goodName = resultSet.getString("good_name"); goodCount = resultSet.getInt("good_count"); } log.info("獲取庫存成功,當(dāng)前商品名稱為:" + goodName + ",當(dāng)前庫存剩余量為:" + goodCount); // 模擬執(zhí)行業(yè)務(wù)事件 Thread.sleep(2 * 1000); if (0 == goodCount) { log.info("搶購失敗,當(dāng)前庫存為0!"); break; } // 搶購商品 if (MySQLDistributedLockService.setGoodCount2(1)) { // 模擬延時,防止鎖每次被同一進(jìn)程獲取 MySQLDistributedLockService.commit(pid, goodName, goodCount); Thread.sleep(2 * 1000); } else { log.error("搶購商品:" + goodName + "失敗!"); } } }catch (Exception e){ //搶購失敗 log.error("搶購商品發(fā)生錯誤!",e); try { MySQLDistributedLockService.rollBack(); } catch (SQLException ex) { log.error("回滾失?。?",e); } }finally { if(resultSet != null){ try { resultSet.close(); } catch (SQLException e) { log.error("Result關(guān)閉失?。?,e); } } MySQLDistributedLockService.close(); } } public static void main(String[] args) { new PessimisticLock().start(); } }
2.3.3 代碼測試
開啟三個進(jìn)程,查看執(zhí)行情況
-
9 6 3 0
-
8 5 2
-
7 4 1文章來源地址http://www.zghlxwxcb.cn/news/detail-826573.html
注意事項:
-
該鎖為阻塞鎖
-
每次請求存在額外加鎖的開銷
-
在并發(fā)量很高的情況下會造成系統(tǒng)中存在大量阻塞的請求,影響系統(tǒng)的可用性
-
因此悲觀鎖適用于并發(fā)量不高,讀操作不頻繁的寫場景
總結(jié):
-
在實際使用中,由于受到性能以及穩(wěn)定性約束,對于關(guān)系型數(shù)據(jù)庫實現(xiàn)的分布式鎖一般很少被用到。但是對于一些并發(fā)量不高、系統(tǒng)僅提供給內(nèi)部人員使用的單一業(yè)務(wù)場景可以考慮使用關(guān)系型數(shù)據(jù)庫分布式鎖,因為其復(fù)雜度較低,可靠性也能夠得到保證。
3.基于Zookeeper分布式鎖
3.1 Zookeeper分布式鎖應(yīng)用場景
-
全部的訂單服務(wù)在調(diào)用 createId 接口前都往 ZooKeeper 的注冊中心的指定目錄寫入注冊信息(如 /lock/server 01)和綁定值改變事件
-
全部的訂單服務(wù)判斷自己往注冊中心指定目錄寫入的注冊信息是否是全部注冊信息中的第一條?如果是,調(diào)用 createId 接口(不是第一條就等著)。調(diào)用結(jié)束后,去注冊中心移除自己的信息
-
ZooKeeper 注冊中心信息改變后,通知所有的綁定了值改變事件的訂單服務(wù)執(zhí)行第 2 條
3.2 Zookeeper分布式鎖分析
客戶端(對zookeeper集群而言)向zookeeper集群進(jìn)行了上線注冊并在一個永久節(jié)點下創(chuàng)建有序的臨時子節(jié)點后,根據(jù)編號順序,最小順序的子節(jié)點獲取到鎖,其他子節(jié)點由小到大監(jiān)聽前一個節(jié)點。
當(dāng)拿到鎖的節(jié)點處理完事務(wù)后,釋放鎖,后一個節(jié)點監(jiān)聽到前一個節(jié)點釋放鎖后,立刻申請獲得鎖,以此類推
過程解析
-
第一部分:客戶端在zookeeper集群創(chuàng)建帶序號的、臨時的節(jié)點
-
第二部分:判斷節(jié)點是否是最小的節(jié)點,如果是,獲取到鎖,如果不是,監(jiān)聽前一個節(jié)點
3.3 分布式鎖實現(xiàn)
1)創(chuàng)建 Distributedlock類, 獲取與zookeeper的連接
-
構(gòu)造方法中獲取連接
-
添加 CountDownLatch (閉鎖)
CountDownLatch是具有synchronized機制的一個工具,目的是讓一個或者多個線程等待,直到其他線程的一系列操作完成。
CountDownLatch初始化的時候,需要提供一個整形數(shù)字,數(shù)字代表著線程需要調(diào)用countDown()方法的次數(shù),當(dāng)計數(shù)為0時,線程才會繼續(xù)執(zhí)行await()方法后的其他內(nèi)容。
/** * 分布式鎖 */ public class DistributedLock { private ZooKeeper client; // 連接信息 private String connectString = "192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183"; // 超時時間 private int sessionTimeOut = 30000; private CountDownLatch countDownLatch = new CountDownLatch(1); //1. 在構(gòu)造方法中獲取連接 public DistributedLock() throws Exception { client = new ZooKeeper(connectString, sessionTimeOut, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); //等待Zookeeper連接成功,連接完成繼續(xù)往下走 countDownLatch.await(); //2. 判斷節(jié)點是否存在 Stat stat = client.exists("/locks", false); if(stat == null){ //創(chuàng)建一下根節(jié)點 client.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } //3.對ZK加鎖 public void zkLock(){ //創(chuàng)建 臨時帶序號節(jié)點 //判斷 創(chuàng)建的節(jié)點是否是最小序號節(jié)點,如果是 就獲取到鎖;如果不是就監(jiān)聽前一個節(jié)點 } //4.解鎖 public void unZkLock(){ //刪除節(jié)點 } }
2)對zk加鎖
/** * 分布式鎖 */ public class DistributedLock { private ZooKeeper client; // 連接信息 private String connectString = "192.168.58.200:2181,192.168.58.200:2182,192.168.58.200:2183"; // 超時時間 private int sessionTimeOut = 30000; // 等待zk連接成功 private CountDownLatch countDownLatch = new CountDownLatch(1); // 等待節(jié)點變化 private CountDownLatch waitLatch = new CountDownLatch(1); //當(dāng)前節(jié)點 private String currentNode; //前一個節(jié)點路徑 private String waitPath; //1. 在構(gòu)造方法中獲取連接 public DistributedLock() throws Exception { client = new ZooKeeper(connectString, sessionTimeOut, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { //countDownLatch 連上ZK,可以釋放 if(watchedEvent.getState() == Event.KeeperState.SyncConnected){ countDownLatch.countDown(); } //waitLatch 需要釋放 (節(jié)點被刪除并且刪除的是前一個節(jié)點) if(watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){ waitLatch.countDown(); } } }); //等待Zookeeper連接成功,連接完成繼續(xù)往下走 countDownLatch.await(); //2. 判斷節(jié)點是否存在 Stat stat = client.exists("/locks", false); if(stat == null){ //創(chuàng)建一下根節(jié)點 client.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } //3.對ZK加鎖 public void zkLock(){ //創(chuàng)建 臨時帶序號節(jié)點 try { currentNode = client.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //判斷 創(chuàng)建的節(jié)點是否是最小序號節(jié)點,如果是 就獲取到鎖;如果不是就監(jiān)聽前一個節(jié)點 List<String> children = client.getChildren("/locks", false); //如果創(chuàng)建的節(jié)點只有一個值,就直接獲取到鎖,如果不是,監(jiān)聽它前一個節(jié)點 if(children.size() == 1){ return; }else{ //先排序 Collections.sort(children); //獲取節(jié)點名稱 String nodeName = currentNode.substring("/locks/".length()); //通過名稱獲取該節(jié)點在集合的位置 int index = children.indexOf(nodeName); //判斷 if(index == -1){ System.out.println("數(shù)據(jù)異常"); }else if(index == 0){ //就一個節(jié)點,可以獲取鎖 return; }else{ //需要監(jiān)聽前一個節(jié)點變化 waitPath = "/locks/" + children.get(index-1); client.getData(waitPath,true,null); //等待監(jiān)聽執(zhí)行 waitLatch.await(); return; } } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
3)zk刪除鎖
//4.解鎖 public void unZkLock() throws KeeperException, InterruptedException { //刪除節(jié)點 client.delete(currentNode,-1); }
4)測試
public class DistributedLockTest { public static void main(String[] args) throws Exception { final DistributedLock lock1 = new DistributedLock(); final DistributedLock lock2 = new DistributedLock(); new Thread(new Runnable() { @Override public void run() { try { lock1.zkLock(); System.out.println("線程1 啟動 獲取到鎖"); Thread.sleep(5 * 1000); lock1.unZkLock(); System.out.println("線程1 釋放鎖"); } catch (InterruptedException | KeeperException e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { lock2.zkLock(); System.out.println("線程2 啟動 獲取到鎖"); Thread.sleep(5 * 1000); lock2.unZkLock(); System.out.println("線程2 釋放鎖"); } catch (InterruptedException | KeeperException e) { e.printStackTrace(); } } }).start(); } }
3.4 Curator框架實現(xiàn)分布式鎖案例
3.4.1 InterProcessMutex介紹
Apache Curator 內(nèi)置了分布式鎖的實現(xiàn): InterProcessMutex
。
-
InterProcessMutex有兩個構(gòu)造方法
public InterProcessMutex(CuratorFramework client, String path) { this(client, path, new StandardLockInternalsDriver()); } public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) { this(client, path, LOCK_NAME, 1, driver); }
-
參數(shù)說明如下
參數(shù) | 說明 |
---|---|
client | curator中zk客戶端對象 |
path | 搶鎖路徑,同一個鎖path需一致 |
driver | 可自定義lock驅(qū)動實現(xiàn)分布式鎖 |
-
主要方法如下
//獲取鎖,若失敗則阻塞等待直到成功,支持重入 public void acquire() throws Exception //超時獲取鎖,超時失敗 public boolean acquire(long time, TimeUnit unit) throws Exception //釋放鎖 public void release() throws Exception
-
注意點,調(diào)用acquire()方法后需相應(yīng)調(diào)用release()來釋放鎖
3.4.2 實現(xiàn)思路
3.4.3 分布式鎖測試
-
9 6 3 0
-
8 5 2
文章來源:http://www.zghlxwxcb.cn/news/detail-826573.html
-
7 4 1
到了這里,關(guān)于第五節(jié) zookeeper集群與分布式鎖_2的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!