前言
在單進(jìn)程環(huán)境下,要保證一個(gè)代碼塊的同步執(zhí)行,直接用synchronized
關(guān)鍵字或ReetrantLock
即可。在分布式環(huán)境下,要保證多個(gè)節(jié)點(diǎn)的線程對(duì)代碼塊的同步訪問,就必須要用到分布式鎖方案。
分布式鎖實(shí)現(xiàn)方案有很多,有基于關(guān)系型數(shù)據(jù)庫行鎖實(shí)現(xiàn)的;有基于ZooKeeper臨時(shí)順序節(jié)點(diǎn)實(shí)現(xiàn)的;還有基于 Redis setnx 命令實(shí)現(xiàn)的。本文介紹一下基于 Redis 實(shí)現(xiàn)的分布式鎖方案。
理解分布式鎖
實(shí)現(xiàn)分布式鎖有幾個(gè)要求
- 互斥性:任意時(shí)刻,最多只會(huì)有一個(gè)客戶端線程可以獲得鎖
- 可重入:同一客戶端的同一線程,獲得鎖后能夠再次獲得鎖
- 避免死鎖:客戶端獲得鎖后即使宕機(jī),后續(xù)客戶端也可以獲得鎖
- 避免誤解鎖:客戶端A加的鎖只能由A自己釋放
- 釋放鎖通知:持有鎖的客戶端釋放鎖后,最好可以通知其它客戶端繼續(xù)搶鎖
- 高性能和高可用
Redis 服務(wù)端命令是單線程串行執(zhí)行的,天生就是原子的,并且支持執(zhí)行自定義的 lua 腳本,功能上更加強(qiáng)大。
關(guān)于互斥性,我們可以用 setnx 命令實(shí)現(xiàn),Redis 可以保證只會(huì)有一個(gè)客戶端 set 成功。但是由于我們要實(shí)現(xiàn)的是一個(gè)分布式的可重入鎖,數(shù)據(jù)結(jié)構(gòu)得用 hash,用客戶端ID+線程ID作為 field,value 記作鎖的重入次數(shù)即可。
關(guān)于死鎖,代碼里建議把鎖的釋放寫在 finally 里面確保一定執(zhí)行,針對(duì)客戶端搶到鎖后宕機(jī)的場(chǎng)景,可以給 redis key 設(shè)置一個(gè)超時(shí)時(shí)間來解決。
關(guān)于誤解鎖,客戶端在釋放鎖時(shí),必須判斷 field 是否當(dāng)前客戶端ID以及線程ID一致,不一致就不執(zhí)行刪除,這里需要用到 lua 腳本判斷。
關(guān)于釋放鎖通知,可以利用 Redis 發(fā)布訂閱模式,給每個(gè)鎖創(chuàng)建一個(gè)頻道,釋放鎖的客戶端負(fù)責(zé)往頻道里發(fā)送消息通知等待搶鎖的客戶端。
最后關(guān)于高性能和高可用,因?yàn)?Redis 是基于內(nèi)存的,天生就是高性能的。但是 Redis 服務(wù)本身一旦出現(xiàn)問題,分布式鎖也就不可用了,此時(shí)可以多部署幾臺(tái)獨(dú)立的示例,使用 RedLock 算法來解決高可用的問題。
設(shè)計(jì)實(shí)現(xiàn)
首先我們定義一個(gè) RedisLock 鎖對(duì)象的抽象接口,只有嘗試加鎖和釋放鎖方法
public interface RedisLock {
boolean tryLock();
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit);
void unlock();
}
然后提供一個(gè)默認(rèn)實(shí)現(xiàn) DefaultRedisLock
public class DefaultRedisLock implements RedisLock {
// 客戶端ID UUID
private final String clientId;
private final StringRedisTemplate redisTemplate;
// 鎖頻道訂閱器 接收釋放鎖通知
private final LockSubscriber lockSubscriber;
// 加鎖的key
private final String lockKey;
}
關(guān)于tryLock()
,首先執(zhí)行l(wèi)ua腳本嘗試獲取鎖,如果加鎖失敗則返回其它客戶端持有鎖的過期時(shí)間,客戶端訂閱鎖對(duì)應(yīng)的頻道,然后sleep,直到收到鎖釋放的通知再繼續(xù)搶鎖。最終不管有沒有搶到鎖,都會(huì)在 finally 取消頻道訂閱。
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {
final long timeout = System.currentTimeMillis() + unit.toMillis(waitTime);
final long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
return true;
}
if (System.currentTimeMillis() >= timeout) {
return false;
}
final Semaphore semaphore = lockSubscriber.subscribe(getChannel(lockKey), threadId);
try {
while (true) {
if (System.currentTimeMillis() >= timeout) {
return false;
}
ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
return true;
}
if (System.currentTimeMillis() >= timeout) {
return false;
}
semaphore.tryAcquire(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lockSubscriber.unsubscribe(getChannel(lockKey), threadId);
}
return false;
}
tryAcquire()
就是執(zhí)行l(wèi)ua腳本來加鎖,解釋一下這段腳本的邏輯:首先判斷 lockKey 是否存在,不存在則直接設(shè)置 lockKey并且設(shè)置過期時(shí)間,返回空,表示加鎖成功。存在則判斷 field 是否和當(dāng)前客戶端ID+線程ID一致,一致則代表鎖重入,遞增一下value即可,不一致代表加鎖失敗,返回鎖的過期時(shí)間
private Long tryAcquire(long leaseTime, TimeUnit timeUnit, long threadId) {
return redisTemplate.execute(RedisScript.of(
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; end;" +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; end;" +
"return redis.call('pttl', KEYS[1]);", Long.class), Collections.singletonList(lockKey),
String.valueOf(timeUnit.toMillis(leaseTime)), getLockName(threadId));
}
lockName是由客戶端ID和線程ID組成的:
private String getLockName(long threadId) {
return clientId + ":" + threadId;
}
如果加鎖失敗,客戶端會(huì)嘗試訂閱對(duì)應(yīng)的頻道,名稱規(guī)則是:
private String getChannel(String lockKey) {
return "__lock_channel__:" + lockKey;
}
訂閱方法是LockSubscriber#subscribe
,同一個(gè)頻道無需訂閱多個(gè)監(jiān)聽器,所以用一個(gè) Map 記錄。訂閱成功以后,會(huì)返回當(dāng)前線程對(duì)應(yīng)的一個(gè) Semaphore 對(duì)象,默認(rèn)許可數(shù)是0,當(dāng)前線程會(huì)調(diào)用Semaphore#tryAcquire
等待許可數(shù),監(jiān)聽器在收到鎖釋放消息后會(huì)給 Semaphore 對(duì)象增加許可數(shù),喚醒線程繼續(xù)搶鎖。
@Component
public class LockSubscriber {
@Autowired
private RedisMessageListenerContainer messageListenerContainer;
private final Map<String, Map<Long, Semaphore>> channelSemaphores = new HashMap<>();
private final Map<String, MessageListener> listeners = new HashMap<>();
private final StringRedisSerializer serializer = new StringRedisSerializer();
public synchronized Semaphore subscribe(String channelName, long threadId) {
MessageListener old = listeners.put(channelName, new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = serializer.deserialize(message.getChannel());
String ignore = serializer.deserialize(message.getBody());
Map<Long, Semaphore> semaphoreMap = channelSemaphores.get(channel);
if (semaphoreMap != null && !semaphoreMap.isEmpty()) {
semaphoreMap.values().stream().findFirst().ifPresent(Semaphore::release);
}
}
});
if (old == null) {
messageListenerContainer.addMessageListener(listeners.get(channelName), new ChannelTopic(channelName));
}
Semaphore semaphore = new Semaphore(0);
Map<Long, Semaphore> semaphoreMap = channelSemaphores.getOrDefault(channelName, new HashMap<>());
semaphoreMap.put(threadId, semaphore);
channelSemaphores.put(channelName, semaphoreMap);
return semaphore;
}
public synchronized void unsubscribe(String channelName, long threadId) {
Map<Long, Semaphore> semaphoreMap = channelSemaphores.get(channelName);
if (semaphoreMap != null) {
semaphoreMap.remove(threadId);
if (semaphoreMap.isEmpty()) {
MessageListener listener = listeners.remove(channelName);
if (listener != null) {
messageListenerContainer.removeMessageListener(listener);
}
}
}
}
}
對(duì)于 unlock,就只是一段 lua 腳本,這里解釋一下:判斷當(dāng)前客戶端ID+線程ID 這個(gè) field 是否存在,存在說明是自己加的鎖,可以釋放。不存在說明不是自己加的鎖,無需做任何處理。因?yàn)槭强芍厝腈i,每次 unlock 都只是遞減一下 value,只有當(dāng) value 等于0時(shí)才是真正的釋放鎖。釋放鎖的時(shí)候會(huì) del lockKey,再 publish 發(fā)送鎖釋放通知,讓其他客戶端可以繼續(xù)搶鎖。
@Override
public void unlock() {
long threadId = Thread.currentThread().getId();
redisTemplate.execute(RedisScript.of(
"if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then " +
"return nil;end;" +
"local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1); " +
"if (counter > 0) then " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], 1); " +
"return 1; " +
"end; " +
"return nil;", Long.class), Arrays.asList(lockKey, getChannel(lockKey)),
getLockName(threadId));
}
最后,我們需要一個(gè) RedisLockFactory 來創(chuàng)建鎖對(duì)象,它同時(shí)會(huì)生成客戶端ID
@Component
public class RedisLockFactory {
private static final String CLIENT_ID = UUID.randomUUID().toString();
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private LockSubscriber lockSubscriber;
public RedisLock getLock(String lockKey) {
return new DefaultRedisLock(CLIENT_ID, redisTemplate, lockSubscriber, lockKey);
}
}
至此,一個(gè)基于 Redis 實(shí)現(xiàn)的分布式可重入鎖就完成了。文章來源:http://www.zghlxwxcb.cn/news/detail-826911.html
尾巴
目前這個(gè)版本的分布式鎖,保證了互斥性、可重入、避免死鎖和誤解鎖、實(shí)現(xiàn)了釋放鎖通知,但是并沒有高可用的保證。如果 Redis 是單實(shí)例部署,就會(huì)存在單點(diǎn)問題,Redis 一旦故障,整個(gè)分布式鎖將不可用。如果 Redis 是主從集群模式部署,雖然有主從自動(dòng)切換,但是 Master 和 Slave 之間的數(shù)據(jù)同步是存在延遲的,分布式鎖可能會(huì)出現(xiàn)問題。比如:客戶端A加鎖成功,lockKey 寫入了 Master,此時(shí) Master 宕機(jī),其它 Slave 升級(jí)成了 Master,但是還沒有同步到 lockKey,客戶端B來加鎖也會(huì)成功,這就沒有保證互斥性。針對(duì)這個(gè)問題,可以參考 RedLock 算法,部署多個(gè)單獨(dú)的 Redis 示例,只要一半以上的Redis節(jié)點(diǎn)加鎖成功就算成功,來盡可能的保證服務(wù)高可用。文章來源地址http://www.zghlxwxcb.cn/news/detail-826911.html
到了這里,關(guān)于Redis分布式可重入鎖實(shí)現(xiàn)方案的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!