道生一,一生二,二生三,三生萬物
這張盡量結合上一章進行使用:上一章
這章主要是講如何通過redis
實現(xiàn)分布式鎖的
redis實現(xiàn)
這里我用redis
去實現(xiàn):
技術:golang
,redis
,數(shù)據(jù)結構
這里是有一個大體的實現(xiàn)思路:主要是使用redis
中這些語法
redis
命令說明:
setnx
命令:set if not exists
,當且僅當key
不存在時,將key
的值設為value
。若給定的key
已經(jīng)存在,則 SETNX不做任何動作。
- 返回1,說明該進程獲得鎖,將密鑰的值設為值
- 返回0,說明其他進程已經(jīng)獲得了鎖,進程不能進入臨界區(qū)命令格式:設置鎖。
get
命令:獲取鍵的值
- 如果存在,則返回
- 如果不存在,則返回
nil
命令格式:獲取鎖getset
命令:該方法是原子的,對鍵設置newvalue
這個值,并且返回鍵原來的舊值。
- 命令格式:設置鎖并設置鍵新值
del
命令:刪除redis
中指定的key
- 命令格式:
del lock.key
看了很多博客,這里總結一些比較常用的一些方法:
方案1:
原理:基于set命令的分布式鎖
使用:set命令
存在問題:可能產(chǎn)生死鎖
- 原因:假設線程獲取了鎖之后,在執(zhí)行任務的過程中掛掉,來不及顯示地執(zhí)行del命令釋放鎖,那么競爭該鎖的線程都會執(zhí)行不了,產(chǎn)生死鎖的情況。
- 解決辦法:設置鎖超時時間
- 原理:可以使用expire命令設置鎖超時時間
- 使用:setnx 的 key 必須設置一個超時時間,以保證即使沒有被顯式釋放,這把鎖也要在一定時間后自動釋放。
- 存在問題:可能產(chǎn)生死鎖
- 問題原因:setnx 和 expire 不是原子性的操作:
假設某個線程執(zhí)行 setnx 命令,成功獲得了鎖,但是還沒來得及執(zhí)行expire 命令,服務器就掛掉了,這樣一來,這把鎖就沒有設置過期時間了,變成了死鎖,別的線程再也沒有辦法獲得鎖了 - 使用:setnx 的 key 必須設置一個超時時間,以保證即使沒有被顯式釋放,這把鎖也要在一定時間后自動釋放
- 解決辦法:redis 的 set 命令支持在獲取鎖的同時設置 key 的過期時間
- 存在問題:鎖過期提前自動釋放,線程A刪除了線程B的鎖
- 問題原因:鎖過期提前自動釋放
- 假如線程A成功得到了鎖,并且設置的超時時間是 30 秒。如果某些原因?qū)е戮€程 A 執(zhí)行的很慢,過了 30 秒都沒執(zhí)行完,這時候鎖過期自動釋放,線程 B 得到了鎖。
- 隨后,線程A執(zhí)行完任務,接著執(zhí)行del指令來釋放鎖。但這時候線程 B 還沒執(zhí)行完,線程A實際上刪除的是線程B加的鎖。
- 使用:在加鎖的時候把當前的線程 ID 當做value,并在刪除之前驗證 key 對應的 value 是不是自己線程的 ID
- 解決辦法:可以在 del 釋放鎖之前做一個判斷,驗證當前的鎖是不是自己加的鎖
- 存在問題:get操作、判斷和釋放鎖是兩個獨立操作,非原子操作
- 問題原因:判斷和釋放鎖是兩個獨立操作
- 解決辦法:對于非原子性的問題,我們可以使用Lua腳本來確保操作的原子性
- 問題原因:鎖過期提前自動釋放
- 問題原因:setnx 和 expire 不是原子性的操作:
諾是想要更好的體驗可以通過我的飛書觀看:飛升思維導圖
方式2:
這里的一些出現(xiàn)的方法是java
中的。諾是需要可以改成自己的所屬語言,這張圖較為清晰我也就不做多余的說名,詳情可以看我的飛書:飛書思維導圖
具體的實現(xiàn)操作:
const (
//解鎖,使用lua變成原子性
unLockScript = "if redis.call('get',KEYS[1])==ARGV[1]" +
"then redis.call('del',KEYS[1]) " +
"return 1 " +
"else " +
"return 0 " +
"end"
//續(xù)期(看門狗)
watchLogScript = "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end"
)
type DispersedLock struct {
key string //鎖
value string //鎖的值,隨機值(可以用userId+requestId)
expire int //鎖過期時間,單位毫秒
lockClient redis.Cmdable //啟用鎖的客戶端,redis目前
unLockScript string //lua 腳本
watchLogScript string //看門狗 lua
unlockChan chan struct{} //通知通道
}
func (d DispersedLock) getScript(ctx context.Context, script string) string {
result, _ := d.lockClient.ScriptLoad(ctx, script).Result()
return result
}
var scriptMap sync.Map
func NewLockRedis(ctx context.Context, cmdable redis.Cmdable, key string, expire int, value string) *DispersedLock {
lock := &DispersedLock{
key: key,
value: value,
expire: expire,
}
lock.lockClient = cmdable
lockScrip, _ := scriptMap.LoadOrStore("dispersed_lock", lock.getScript(ctx, unLockScript))
lockWatch, _ := scriptMap.LoadOrStore("watch_log", lock.getScript(ctx, watchLogScript))
lock.unLockScript = lockScrip.(string)
lock.watchLogScript = lockWatch.(string)
lock.unlockChan = make(chan struct{}, 0)
return lock
}
func (d DispersedLock) Lock(ctx context.Context) bool {
ok, _ := d.lockClient.SetNX(ctx, d.key, d.value, time.Duration(d.expire)*time.Millisecond).Result()
if ok {
go d.watchDog(ctx)
}
return ok
}
func (d DispersedLock) watchDog(ctx context.Context) {
//創(chuàng)建一個定時器,每到工作時間的2/3就出發(fā)一次
duration := time.Duration(d.expire*1e3*2/3) * time.Millisecond
ticker := time.NewTicker(duration)
//打包成原子
for {
select {
case <-ticker.C:
//腳本參數(shù)
args := []interface{}{
d.value,
d.expire,
}
result, err := d.lockClient.Eval(ctx, d.watchLogScript, []string{d.key}, args...).Result()
if err != nil {
logS.LogM.ErrorF(ctx, "watchDog error %s", err)
return
}
res, ok := result.(int64)
if !ok {
return
}
if res == 0 {
return
}
case <-d.unlockChan:
return
}
}
}
func (d DispersedLock) unlock(ctx context.Context) bool {
//腳本參數(shù)
args := []interface{}{
d.value,
}
result, _ := d.lockClient.Eval(ctx, d.unLockScript, []string{d.key}, args...).Result()
close(d.unlockChan)
if result.(int64) > 0 {
return true
} else {
return false
}
}
const lockMaxLoopNum = 1000
// LoopLock 輪詢等待
func (d DispersedLock) LoopLock(ctx context.Context, sleepTime int) bool {
cancel, cannel := context.WithCancel(context.Background())
ticker := time.NewTicker(time.Duration(sleepTime) * time.Millisecond)
count := 0
status := 0
loop:
for {
select {
case <-cancel.Done():
break loop
default:
}
if d.Lock(ctx) {
ticker.Stop()
cannel()
break
} else {
<-ticker.C
}
count++
//判斷是否大于最大獲取次數(shù),達到最大直接退出循環(huán)
if count >= lockMaxLoopNum {
status = 1
break
}
}
cannel()
if status != 0 {
return false
}
return true
}
這些就是通過redis去實現(xiàn)一個分布式鎖的具體步驟,很多實現(xiàn),估計很多其他語言的朋友們可能會有些蒙圈。但是沒有關系。go 關鍵字
你就當他是一個線程就可以了,select 關鍵字
,你可以理解成隊列+if
的判斷
推薦使用包
golang
:redsync
import "github.com/go-redsync/redsync/v4"
這個包基本上滿足了市面上分布式鎖的所有需求,包括續(xù)租:(但是這里的續(xù)租需要一定的條件才能觸發(fā),這個條件要達到redis實例的最大值時才能觸發(fā))。所以為了,方便使用,建議可以自己續(xù)寫一個續(xù)租的方法。文章來源:http://www.zghlxwxcb.cn/news/detail-814446.html
這里獻上我的:文章來源地址http://www.zghlxwxcb.cn/news/detail-814446.html
// NewLock 實例化一個分布式鎖,用來實現(xiàn)冪等,降低重試成本
func NewLock(mutexName string) *redsync.Mutex {
pool := goredis.NewPool(configuration.RedisClient)
rs := redsync.New(pool)
newString := uuid.NewString()
lockName := "Lock:" + newString + ":" + mutexName
mutex := rs.NewMutex(lockName)
return mutex
}
// LockRelet 周期性續(xù)租,過去無可挽回,未來可以改變
// num定義時間:單位毫秒
// size定義續(xù)租的次數(shù)
func LockRelet(num int, size int, mutex *redsync.Mutex) chan bool {
done := make(chan bool)
if size <= 0 {
return nil
}
go func() {
ticker := time.NewTicker(time.Duration(num) * time.Millisecond)
defer ticker.Stop()
for size > 0 {
size--
select {
case <-ticker.C:
extend, err := mutex.Extend()
if err != nil {
logS.LogM.Panicf("Failed to extend lock:", err)
} else if !extend {
logS.LogM.Panicf("Failed to extend lock: not successes")
}
case <-done:
return
}
}
}()
return done
}
到了這里,關于分布式鎖實現(xiàn)(mysql,以及redis)以及分布式的概念(續(xù))redsync包使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!