背景
優(yōu)惠券秒殺有兩個(gè)業(yè)務(wù)涉及線程并發(fā)問題,第一個(gè)是庫存超賣,第二個(gè)是一人一單,這就必須采取鎖的方案了。下面根據(jù)優(yōu)惠券秒殺功能一步一步進(jìn)行展開,利用悲觀鎖、同步鎖、分布式鎖等方案循序漸進(jìn)解決各種問題。
1. 實(shí)現(xiàn)秒殺下單
下單核心思路:當(dāng)我們點(diǎn)擊搶購(gòu)時(shí),會(huì)觸發(fā)右側(cè)的請(qǐng)求,我們只需要編寫對(duì)應(yīng)的controller即可
秒殺下單應(yīng)該思考的內(nèi)容:
下單時(shí)需要判斷兩點(diǎn):
- 秒殺是否開始或結(jié)束,如果尚未開始或已經(jīng)結(jié)束則無法下單
- 庫存是否充足,不足則無法下單
下單核心邏輯分析:
當(dāng)用戶開始進(jìn)行下單,我們應(yīng)當(dāng)去查詢優(yōu)惠卷信息,查詢到優(yōu)惠卷信息,判斷是否滿足秒殺條件
比如時(shí)間是否充足,如果時(shí)間充足,則進(jìn)一步判斷庫存是否足夠,如果兩者都滿足,則扣減庫存,創(chuàng)建訂單,然后返回訂單id,如果有一個(gè)條件不滿足則直接結(jié)束。
VoucherOrderServiceImpl
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查詢優(yōu)惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判斷秒殺是否開始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未開始
return Result.fail("秒殺尚未開始!");
}
// 3.判斷秒殺是否已經(jīng)結(jié)束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未開始
return Result.fail("秒殺已經(jīng)結(jié)束!");
}
// 4.判斷庫存是否充足
if (voucher.getStock() < 1) {
// 庫存不足
return Result.fail("庫存不足!");
}
//5,扣減庫存
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update();
if (!success) {
//扣減庫存
return Result.fail("庫存不足!");
}
//6.創(chuàng)建訂單
VoucherOrder voucherOrder = new VoucherOrder();
// 6.1.訂單id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 6.2.用戶id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
// 6.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId);
}
2. 庫存超賣問題分析
有關(guān)超賣問題分析:在我們?cè)写a中是這么寫的
if (voucher.getStock() < 1) {
// 庫存不足
return Result.fail("庫存不足!");
}
//5,扣減庫存
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update();
if (!success) {
//扣減庫存
return Result.fail("庫存不足!");
}
假設(shè)線程1過來查詢庫存,判斷出來庫存大于1,正準(zhǔn)備去扣減庫存,但是還沒有來得及去扣減,此時(shí)線程2過來,線程2也去查詢庫存,發(fā)現(xiàn)這個(gè)數(shù)量一定也大于1,那么這兩個(gè)線程都會(huì)去扣減庫存,最終多個(gè)線程相當(dāng)于一起去扣減庫存,此時(shí)就會(huì)出現(xiàn)庫存的超賣問題。
超賣問題是典型的多線程安全問題,針對(duì)這一問題的常見解決方案就是加鎖:而對(duì)于加鎖,我們通常有兩種解決方案:見下圖:
悲觀鎖:
悲觀鎖可以實(shí)現(xiàn)對(duì)于數(shù)據(jù)的串行化執(zhí)行,比如syn,和lock都是悲觀鎖的代表,同時(shí),悲觀鎖中又可以再細(xì)分為公平鎖,非公平鎖,可重入鎖,等等
樂觀鎖:
樂觀鎖:會(huì)有一個(gè)版本號(hào),每次操作數(shù)據(jù)會(huì)對(duì)版本號(hào)+1,再提交回?cái)?shù)據(jù)時(shí),會(huì)去校驗(yàn)是否比之前的版本大1 ,如果大1 ,則進(jìn)行操作成功,這套機(jī)制的核心邏輯在于,如果在操作過程中,版本號(hào)只比原來大1 ,那么就意味著操作過程中沒有人對(duì)他進(jìn)行過修改,他的操作就是安全的,如果不大1,則數(shù)據(jù)被修改過,當(dāng)然樂觀鎖還有一些變種的處理方式比如cas
樂觀鎖的典型代表:就是cas,利用cas進(jìn)行無鎖化機(jī)制加鎖,var5 是操作前讀取的內(nèi)存值,while中的var1+var2 是預(yù)估值,如果預(yù)估值 == 內(nèi)存值,則代表中間沒有被人修改過,此時(shí)就將新值去替換 內(nèi)存值
其中do while 是為了在操作失敗時(shí),再次進(jìn)行自旋操作,即把之前的邏輯再操作一次。
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
本文的使用方式:
本文中的使用方式是沒有像cas一樣帶自旋的操作,也沒有對(duì)version的版本號(hào)+1 ,他的操作邏輯是在操作時(shí),對(duì)版本號(hào)進(jìn)行+1 操作,然后要求version 如果是1 的情況下,才能操作,那么第一個(gè)線程在操作后,數(shù)據(jù)庫中的version變成了2,但是他自己滿足version=1 ,所以沒有問題,此時(shí)線程2執(zhí)行,線程2 最后也需要加上條件version =1 ,但是現(xiàn)在由于線程1已經(jīng)操作過了,所以線程2,操作時(shí)就不滿足version=1 的條件了,所以線程2無法執(zhí)行成功
2.1 樂觀鎖解決超賣問題
修改代碼方案一、
VoucherOrderServiceImpl 在扣減庫存時(shí),改為:
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1") //set stock = stock -1
.eq("voucher_id", voucherId).eq("stock",voucher.getStock()).update(); //where id = ? and stock = ?
以上邏輯的核心含義是:只要我扣減庫存時(shí)的庫存和之前我查詢到的庫存是一樣的,就意味著沒有人在中間修改過庫存,那么此時(shí)就是安全的,但是以上這種方式通過測(cè)試發(fā)現(xiàn)會(huì)有很多失敗的情況,失敗的原因在于:在使用樂觀鎖過程中假設(shè)100個(gè)線程同時(shí)都拿到了100的庫存,然后大家一起去進(jìn)行扣減,但是100個(gè)人中只有1個(gè)人能扣減成功,其他的人在處理時(shí),他們?cè)诳蹨p時(shí),庫存已經(jīng)被修改過了,所以此時(shí)其他線程都會(huì)失敗
修改代碼方案二、
之前的方式要修改前后都保持一致,但是這樣我們分析過,成功的概率太低,所以我們的樂觀鎖需要變一下,改成stock大于0 即可
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update().gt("stock",0); //where id = ? and stock > 0
這種方案也利用到了MySQL觸發(fā)的行級(jí)鎖,在更新的時(shí)候,where條件利用到索引或者主鍵會(huì)觸發(fā)行鎖,不走索引或者主鍵則觸發(fā)表鎖。當(dāng)某一個(gè)線程執(zhí)行update操作(例如id主鍵為1)并未commit的情況下,其它線程執(zhí)行update操作id主鍵為1的記錄時(shí)需要等待上一個(gè)操作提交完成。即行級(jí)鎖結(jié)合庫存大于零的條件有效避免了庫存超賣。
3. 優(yōu)惠券秒殺-一人一單
需求:修改秒殺業(yè)務(wù),要求同一個(gè)優(yōu)惠券,一個(gè)用戶只能下一單
現(xiàn)在的問題在于:
優(yōu)惠卷是為了引流,但是目前的情況是,一個(gè)人可以無限制的搶這個(gè)優(yōu)惠卷,所以我們應(yīng)當(dāng)增加一層邏輯,讓一個(gè)用戶只能下一個(gè)單,而不是讓一個(gè)用戶下多個(gè)單
具體操作邏輯如下:比如時(shí)間是否充足,如果時(shí)間充足,則進(jìn)一步判斷庫存是否足夠,然后再根據(jù)優(yōu)惠卷id和用戶id查詢是否已經(jīng)下過這個(gè)訂單,如果下過這個(gè)訂單,則不再下單,否則進(jìn)行下單
VoucherOrderServiceImpl
初步代碼:增加一人一單邏輯
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查詢優(yōu)惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判斷秒殺是否開始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未開始
return Result.fail("秒殺尚未開始!");
}
// 3.判斷秒殺是否已經(jīng)結(jié)束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未開始
return Result.fail("秒殺已經(jīng)結(jié)束!");
}
// 4.判斷庫存是否充足
if (voucher.getStock() < 1) {
// 庫存不足
return Result.fail("庫存不足!");
}
// 5.一人一單邏輯
// 5.1.用戶id
Long userId = UserHolder.getUser().getId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判斷是否存在
if (count > 0) {
// 用戶已經(jīng)購(gòu)買過了
return Result.fail("用戶已經(jīng)購(gòu)買過一次!");
}
//6,扣減庫存
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update();
if (!success) {
//扣減庫存
return Result.fail("庫存不足!");
}
//7.創(chuàng)建訂單
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.訂單id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId);
}
**存在問題:**現(xiàn)在的問題還是和之前一樣,并發(fā)過來,查詢數(shù)據(jù)庫,都不存在訂單,所以我們還是需要加鎖,但是樂觀鎖比較適合更新數(shù)據(jù),而現(xiàn)在是插入數(shù)據(jù),所以我們需要使用悲觀鎖操作
**注意:**在這里提到了非常多的問題,我們需要慢慢的來思考,首先我們的初始方案是封裝了一個(gè)createVoucherOrder方法,同時(shí)為了確保他線程安全,在方法上添加了一把synchronized 鎖
@Transactional
public synchronized Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
// 5.1.查詢訂單
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判斷是否存在
if (count > 0) {
// 用戶已經(jīng)購(gòu)買過了
return Result.fail("用戶已經(jīng)購(gòu)買過一次!");
}
// 6.扣減庫存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣減失敗
return Result.fail("庫存不足!");
}
// 7.創(chuàng)建訂單
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.訂單id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用戶id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 7.返回訂單id
return Result.ok(orderId);
}
,但是這樣添加鎖,鎖的粒度太粗了,在使用鎖過程中,控制鎖粒度 是一個(gè)非常重要的事情,因?yàn)槿绻i的粒度太大,會(huì)導(dǎo)致每個(gè)線程進(jìn)來都會(huì)鎖住,所以我們需要去控制鎖的粒度,以下這段代碼需要修改為:
intern() 這個(gè)方法是從常量池中拿到數(shù)據(jù),如果我們直接使用userId.toString() 他拿到的對(duì)象實(shí)際上是不同的對(duì)象,new出來的對(duì)象,我們使用鎖必須保證鎖必須是同一把,所以我們需要使用intern()方法
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
synchronized(userId.toString().intern()){
// 5.1.查詢訂單
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判斷是否存在
if (count > 0) {
// 用戶已經(jīng)購(gòu)買過了
return Result.fail("用戶已經(jīng)購(gòu)買過一次!");
}
// 6.扣減庫存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣減失敗
return Result.fail("庫存不足!");
}
// 7.創(chuàng)建訂單
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.訂單id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用戶id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 7.返回訂單id
return Result.ok(orderId);
}
}
但是以上代碼還是存在問題,問題的原因在于當(dāng)前方法被spring的事務(wù)控制,事務(wù)需要等待方法執(zhí)行結(jié)束后才可提交,如果在方法內(nèi)部加鎖,可能會(huì)導(dǎo)致當(dāng)前方法事務(wù)還沒有提交,但是鎖已經(jīng)釋放也會(huì)導(dǎo)致問題,所以我們選擇將當(dāng)前方法整體包裹起來,確保事務(wù)不會(huì)出現(xiàn)問題:如下:
在seckillVoucher 方法中,添加以下邏輯,這樣就能保證事務(wù)的特性,同時(shí)也控制了鎖的粒度
但是以上做法依然有問題,因?yàn)檎{(diào)用的方法,其實(shí)是this.的方式調(diào)用的,事務(wù)想要生效,還得利用代理來生效,所以這個(gè)地方,我們需要獲得原始的事務(wù)對(duì)象, 來操作事務(wù)
3.1 集群環(huán)境下的并發(fā)問題
通過加鎖可以解決在單機(jī)情況下的一人一單安全問題,但是在集群模式下就不行了。
1、我們將服務(wù)啟動(dòng)兩份,端口分別為8081和8082:
2、然后修改nginx的conf目錄下的nginx.conf文件,配置反向代理和負(fù)載均衡:
具體操作(略)
有關(guān)鎖失效原因分析
由于現(xiàn)在我們部署了多個(gè)tomcat,每個(gè)tomcat都有一個(gè)屬于自己的jvm,那么假設(shè)在服務(wù)器A的tomcat內(nèi)部,有兩個(gè)線程,這兩個(gè)線程由于使用的是同一份代碼,那么他們的鎖對(duì)象是同一個(gè),是可以實(shí)現(xiàn)互斥的,但是如果現(xiàn)在是服務(wù)器B的tomcat內(nèi)部,又有兩個(gè)線程,但是他們的鎖對(duì)象寫的雖然和服務(wù)器A一樣,但是鎖對(duì)象卻不是同一個(gè),所以線程3和線程4可以實(shí)現(xiàn)互斥,但是卻無法和線程1和線程2實(shí)現(xiàn)互斥,這就是 集群環(huán)境下,syn鎖失效的原因,在這種情況下,我們就需要使用分布式鎖來解決這個(gè)問題。
4、分布式鎖
4.1 基本原理和實(shí)現(xiàn)方式對(duì)比
分布式鎖:滿足分布式系統(tǒng)或集群模式下多進(jìn)程可見并且互斥的鎖。
分布式鎖的核心思想就是讓大家都使用同一把鎖,只要大家使用的是同一把鎖,那么我們就能鎖住線程,不讓線程進(jìn)行,讓程序串行執(zhí)行,這就是分布式鎖的核心思路
那么分布式鎖他應(yīng)該滿足一些什么樣的條件呢?
可見性:多個(gè)線程都能看到相同的結(jié)果,注意:這個(gè)地方說的可見性并不是并發(fā)編程中指的內(nèi)存可見性,只是說多個(gè)進(jìn)程之間都能感知到變化的意思
互斥:互斥是分布式鎖的最基本的條件,使得程序串行執(zhí)行
高可用:程序不易崩潰,時(shí)時(shí)刻刻都保證較高的可用性
高性能:由于加鎖本身就讓性能降低,所有對(duì)于分布式鎖本身需要他就較高的加鎖性能和釋放鎖性能
安全性:安全也是程序中必不可少的一環(huán)
常見的分布式鎖有三種
Mysql:mysql本身就帶有鎖機(jī)制,但是由于mysql性能本身一般,所以采用分布式鎖的情況下,其實(shí)使用mysql作為分布式鎖比較少見
Redis:redis作為分布式鎖是非常常見的一種使用方式,現(xiàn)在企業(yè)級(jí)開發(fā)中基本都使用redis或者zookeeper作為分布式鎖,利用setnx這個(gè)方法,如果插入key成功,則表示獲得到了鎖,如果有人插入成功,其他人插入失敗則表示無法獲得到鎖,利用這套邏輯來實(shí)現(xiàn)分布式鎖
Zookeeper:zookeeper也是企業(yè)級(jí)開發(fā)中較好的一個(gè)實(shí)現(xiàn)分布式鎖的方案,
4.2 Redis分布式鎖的實(shí)現(xiàn)核心思路
實(shí)現(xiàn)分布式鎖時(shí)需要實(shí)現(xiàn)的兩個(gè)基本方法:
-
獲取鎖:
- 互斥:確保只能有一個(gè)線程獲取鎖
- 非阻塞:嘗試一次,成功返回true,失敗返回false
-
釋放鎖:
- 手動(dòng)釋放
- 超時(shí)釋放:獲取鎖時(shí)添加一個(gè)超時(shí)時(shí)間
核心思路:
我們利用redis 的setNx 方法,當(dāng)有多個(gè)線程進(jìn)入時(shí),我們就利用該方法,第一個(gè)線程進(jìn)入時(shí),redis 中就有這個(gè)key 了,返回了1,如果結(jié)果是1,則表示他搶到了鎖,那么他去執(zhí)行業(yè)務(wù),然后再刪除鎖,退出鎖邏輯,沒有搶到鎖的哥們,等待一定時(shí)間后重試即可
4.3 實(shí)現(xiàn)分布式鎖版本一
- 加鎖邏輯
鎖的基本接口
SimpleRedisLock(類)
利用setnx方法進(jìn)行加鎖,同時(shí)增加過期時(shí)間,防止死鎖,此方法可以保證加鎖和增加過期時(shí)間具有原子性
private static final String KEY_PREFIX="lock:"
@Override
public boolean tryLock(long timeoutSec) {
// 獲取線程標(biāo)示
String threadId = Thread.currentThread().getId()
// 獲取鎖, name是“l(fā)ock:userId”,其值由構(gòu)造方法引入
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
- 釋放鎖邏輯
SimpleRedisLock
釋放鎖,防止刪除別人的鎖
public void unlock() {
//通過del刪除鎖
stringRedisTemplate.delete(KEY_PREFIX + name);
}
- 修改業(yè)務(wù)代碼
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查詢優(yōu)惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判斷秒殺是否開始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未開始
return Result.fail("秒殺尚未開始!");
}
// 3.判斷秒殺是否已經(jīng)結(jié)束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未開始
return Result.fail("秒殺已經(jīng)結(jié)束!");
}
// 4.判斷庫存是否充足
if (voucher.getStock() < 1) {
// 庫存不足
return Result.fail("庫存不足!");
}
Long userId = UserHolder.getUser().getId();
//創(chuàng)建鎖對(duì)象(新增代碼)
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
//獲取鎖對(duì)象
boolean isLock = lock.tryLock(1200);
//加鎖失敗
if (!isLock) {
return Result.fail("不允許重復(fù)下單");
}
try {
//獲取代理對(duì)象(事務(wù))
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
//釋放鎖
lock.unlock();
}
}
4.4 Redis分布式鎖誤刪情況說明
邏輯說明:
我們這里加的鎖一直是針對(duì)同一用戶的,同一個(gè)用戶點(diǎn)擊多次搶購(gòu),就會(huì)有多個(gè)線程進(jìn)入程序。
持有鎖的線程在鎖的內(nèi)部出現(xiàn)了阻塞,導(dǎo)致他的鎖自動(dòng)釋放,這時(shí)其他線程,線程2來嘗試獲得鎖,就拿到了這把鎖,然后線程2在持有鎖執(zhí)行過程中,線程1反應(yīng)過來,繼續(xù)執(zhí)行,而線程1執(zhí)行過程中,走到了刪除鎖邏輯,此時(shí)就會(huì)把本應(yīng)該屬于線程2的鎖進(jìn)行刪除,這就是誤刪別人鎖的情況說明
解決方案:解決方案就是在每個(gè)線程釋放鎖的時(shí)候,去判斷一下當(dāng)前這把鎖是否屬于自己,如果屬于自己,則不進(jìn)行鎖的刪除,假設(shè)還是上邊的情況,線程1卡頓,鎖自動(dòng)釋放,線程2進(jìn)入到鎖的內(nèi)部執(zhí)行邏輯,此時(shí)線程1反應(yīng)過來,然后刪除鎖,但是線程1,一看當(dāng)前這把鎖不是屬于自己,于是不進(jìn)行刪除鎖邏輯,當(dāng)線程2走到刪除鎖邏輯時(shí),如果沒有卡過自動(dòng)釋放鎖的時(shí)間點(diǎn),則判斷當(dāng)前這把鎖是屬于自己的,于是刪除這把鎖。
4.5 解決Redis分布式鎖誤刪問題
需求:修改之前的分布式鎖實(shí)現(xiàn),滿足:在獲取鎖時(shí)存入線程標(biāo)示(可以用UUID表示)
在釋放鎖時(shí)先獲取鎖中的線程標(biāo)示,判斷是否與當(dāng)前線程標(biāo)示一致
- 如果一致則釋放鎖
- 如果不一致則不釋放鎖
核心邏輯:在存入鎖時(shí),放入自己線程的標(biāo)識(shí),在刪除鎖時(shí),判斷當(dāng)前這把鎖的標(biāo)識(shí)是不是自己存入的,如果是,則進(jìn)行刪除,如果不是,則不進(jìn)行刪除。
具體代碼如下:加鎖
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {
// 獲取線程標(biāo)示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 獲取鎖
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
釋放鎖
public void unlock() {
// 獲取線程標(biāo)示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 獲取鎖中的標(biāo)示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判斷標(biāo)示是否一致
if(threadId.equals(id)) {
// 釋放鎖
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
有關(guān)代碼實(shí)操說明:
在我們修改完此處代碼后,我們重啟工程,然后啟動(dòng)兩個(gè)線程,第一個(gè)線程持有鎖后,手動(dòng)釋放鎖,第二個(gè)線程 此時(shí)進(jìn)入到鎖內(nèi)部,再放行第一個(gè)線程,此時(shí)第一個(gè)線程由于鎖的value值并非是自己,所以不能釋放鎖,也就無法刪除別人的鎖,此時(shí)第二個(gè)線程能夠正確釋放鎖,通過這個(gè)案例初步說明我們解決了鎖誤刪的問題。
4.6 分布式鎖的原子性問題
更為極端的誤刪邏輯說明:
線程1現(xiàn)在持有鎖之后,在執(zhí)行業(yè)務(wù)邏輯過程中,他正準(zhǔn)備刪除鎖,而且已經(jīng)走到了條件判斷的過程中,比如他已經(jīng)拿到了當(dāng)前這把鎖確實(shí)是屬于他自己的,正準(zhǔn)備刪除鎖,但是此時(shí)他的鎖到期了,那么此時(shí)線程2進(jìn)來,但是線程1他會(huì)接著往后執(zhí)行,當(dāng)他卡頓結(jié)束后,他直接就會(huì)執(zhí)行刪除鎖那行代碼,相當(dāng)于條件判斷并沒有起到作用,這就是刪鎖時(shí)的原子性問題,之所以有這個(gè)問題,是因?yàn)榫€程1的拿鎖,比鎖,刪鎖,實(shí)際上并不是原子性的,我們要防止剛才的情況發(fā)生,
4.7 Lua腳本解決多條命令原子性問題
Redis提供了Lua腳本功能,在一個(gè)腳本中編寫多條Redis命令,確保多條命令執(zhí)行時(shí)的原子性。Lua是一種編程語言,它的基本語法大家可以參考網(wǎng)站:https://www.runoob.com/lua/lua-tutorial.html,這里重點(diǎn)介紹Redis提供的調(diào)用函數(shù),我們可以使用lua去操作redis,又能保證他的原子性,這樣就可以實(shí)現(xiàn)拿鎖比鎖刪鎖是一個(gè)原子性動(dòng)作了,作為Java程序員這一塊并并不需要過于精通,只需要知道他有什么作用即可。
這里重點(diǎn)介紹Redis提供的調(diào)用函數(shù),語法如下:
redis.call('命令名稱', 'key', '其它參數(shù)', ...)
例如,我們要執(zhí)行set name jack,則腳本是這樣:
# 執(zhí)行 set name jack
redis.call('set', 'name', 'jack')
例如,我們要先執(zhí)行set name Rose,再執(zhí)行g(shù)et name,則腳本如下:
# 先執(zhí)行 set name jack
redis.call('set', 'name', 'Rose')
# 再執(zhí)行 get name
local name = redis.call('get', 'name')
# 返回
return name
寫好腳本以后,需要用Redis命令來調(diào)用腳本,調(diào)用腳本的常見命令如下:
例如,我們要執(zhí)行 redis.call(‘set’, ‘name’, ‘jack’) 這個(gè)腳本,語法如下:
如果腳本中的key、value不想寫死,可以作為參數(shù)傳遞。key類型參數(shù)會(huì)放入KEYS數(shù)組,其它參數(shù)會(huì)放入ARGV數(shù)組,在腳本中可以從KEYS和ARGV數(shù)組獲取這些參數(shù):
接下來我們來回一下我們釋放鎖的邏輯:
釋放鎖的業(yè)務(wù)流程是這樣的
? 1、獲取鎖中的線程標(biāo)示
? 2、判斷是否與指定的標(biāo)示(當(dāng)前線程標(biāo)示)一致
? 3、如果一致則釋放鎖(刪除)
? 4、如果不一致則什么都不做
如果用Lua腳本來表示則是這樣的:
最終我們操作redis的拿鎖比鎖刪鎖的lua腳本就會(huì)變成這樣
-- 這里的 KEYS[1] 就是鎖的key,這里的ARGV[1] 就是當(dāng)前線程標(biāo)示
-- 獲取鎖中的標(biāo)示,判斷是否與當(dāng)前線程標(biāo)示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,則刪除鎖
return redis.call('DEL', KEYS[1])
end
-- 不一致,則直接返回
return 0
4.8 利用Java代碼調(diào)用Lua腳本改造分布式鎖
lua腳本本身并不需要大家花費(fèi)太多時(shí)間去研究,只需要知道如何調(diào)用,大致是什么意思即可,所以在筆記中并不會(huì)詳細(xì)的去解釋這些lua表達(dá)式的含義。
我們的RedisTemplate中,可以利用execute方法去執(zhí)行l(wèi)ua腳本,參數(shù)對(duì)應(yīng)關(guān)系就如下圖股
Java代碼
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
public void unlock() {
// 調(diào)用lua腳本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
經(jīng)過以上代碼改造后,我們就能夠?qū)崿F(xiàn) 拿鎖比鎖刪鎖的原子性動(dòng)作了~
小總結(jié):
基于Redis的分布式鎖實(shí)現(xiàn)思路:
- 利用set nx ex獲取鎖,并設(shè)置過期時(shí)間,保存線程標(biāo)示
- 釋放鎖時(shí)先判斷線程標(biāo)示是否與自己一致,一致則刪除鎖
- 特性:
- 利用set nx滿足互斥性
- 利用set ex保證故障時(shí)鎖依然能釋放,避免死鎖,提高安全性
- 利用Redis集群保證高可用和高并發(fā)特性
- 特性:
結(jié)尾總結(jié):一路走來,利用添加過期時(shí)間,防止死鎖問題的發(fā)生,但是有了過期時(shí)間之后,可能出現(xiàn)誤刪別人鎖的問題,這個(gè)問題我們開始是利用刪之前 通過拿鎖,比鎖,刪鎖這個(gè)邏輯來解決的,也就是刪之前判斷一下當(dāng)前這把鎖是否是屬于自己的,但是現(xiàn)在還有原子性問題,也就是我們沒法保證拿鎖比鎖刪鎖是一個(gè)原子性的動(dòng)作,最后通過lua表達(dá)式來解決這個(gè)問題文章來源:http://www.zghlxwxcb.cn/news/detail-438155.html
但是目前還剩下一個(gè)問題鎖不住,什么是鎖不住呢,你想一想,如果當(dāng)過期時(shí)間到了之后,我們可以給他續(xù)期一下,比如續(xù)個(gè)30s,就好像是網(wǎng)吧上網(wǎng), 網(wǎng)費(fèi)到了之后,然后說,來,網(wǎng)管,再給我來10塊的,是不是后邊的問題都不會(huì)發(fā)生了,那么續(xù)期問題怎么解決呢,可以依賴于redission框架,redission框架知識(shí)在下一篇博客展開描述。文章來源地址http://www.zghlxwxcb.cn/news/detail-438155.html
到了這里,關(guān)于Redis分布式鎖原理之實(shí)現(xiàn)秒殺搶優(yōu)惠卷業(yè)務(wù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!