国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Redis分布式可重入鎖實(shí)現(xiàn)方案

這篇具有很好參考價(jià)值的文章主要介紹了Redis分布式可重入鎖實(shí)現(xiàn)方案。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

前言

在單進(jìn)程環(huán)境下,要保證一個(gè)代碼塊的同步執(zhí)行,直接用synchronized 關(guān)鍵字或ReetrantLock 即可。在分布式環(huán)境下,要保證多個(gè)節(jié)點(diǎn)的線程對(duì)代碼塊的同步訪問,就必須要用到分布式鎖方案。
分布式鎖實(shí)現(xiàn)方案有很多,有基于關(guān)系型數(shù)據(jù)庫行鎖實(shí)現(xiàn)的;有基于ZooKeeper臨時(shí)順序節(jié)點(diǎn)實(shí)現(xiàn)的;還有基于 Redis setnx 命令實(shí)現(xiàn)的。本文介紹一下基于 Redis 實(shí)現(xiàn)的分布式鎖方案。

理解分布式鎖

實(shí)現(xiàn)分布式鎖有幾個(gè)要求

  • 互斥性:任意時(shí)刻,最多只會(huì)有一個(gè)客戶端線程可以獲得鎖
  • 可重入:同一客戶端的同一線程,獲得鎖后能夠再次獲得鎖
  • 避免死鎖:客戶端獲得鎖后即使宕機(jī),后續(xù)客戶端也可以獲得鎖
  • 避免誤解鎖:客戶端A加的鎖只能由A自己釋放
  • 釋放鎖通知:持有鎖的客戶端釋放鎖后,最好可以通知其它客戶端繼續(xù)搶鎖
  • 高性能和高可用

Redis 服務(wù)端命令是單線程串行執(zhí)行的,天生就是原子的,并且支持執(zhí)行自定義的 lua 腳本,功能上更加強(qiáng)大。
關(guān)于互斥性,我們可以用 setnx 命令實(shí)現(xiàn),Redis 可以保證只會(huì)有一個(gè)客戶端 set 成功。但是由于我們要實(shí)現(xiàn)的是一個(gè)分布式的可重入鎖,數(shù)據(jù)結(jié)構(gòu)得用 hash,用客戶端ID+線程ID作為 field,value 記作鎖的重入次數(shù)即可。
關(guān)于死鎖,代碼里建議把鎖的釋放寫在 finally 里面確保一定執(zhí)行,針對(duì)客戶端搶到鎖后宕機(jī)的場(chǎng)景,可以給 redis key 設(shè)置一個(gè)超時(shí)時(shí)間來解決。
關(guān)于誤解鎖,客戶端在釋放鎖時(shí),必須判斷 field 是否當(dāng)前客戶端ID以及線程ID一致,不一致就不執(zhí)行刪除,這里需要用到 lua 腳本判斷。
關(guān)于釋放鎖通知,可以利用 Redis 發(fā)布訂閱模式,給每個(gè)鎖創(chuàng)建一個(gè)頻道,釋放鎖的客戶端負(fù)責(zé)往頻道里發(fā)送消息通知等待搶鎖的客戶端。
最后關(guān)于高性能和高可用,因?yàn)?Redis 是基于內(nèi)存的,天生就是高性能的。但是 Redis 服務(wù)本身一旦出現(xiàn)問題,分布式鎖也就不可用了,此時(shí)可以多部署幾臺(tái)獨(dú)立的示例,使用 RedLock 算法來解決高可用的問題。

設(shè)計(jì)實(shí)現(xiàn)

首先我們定義一個(gè) RedisLock 鎖對(duì)象的抽象接口,只有嘗試加鎖和釋放鎖方法

public interface RedisLock {

    boolean tryLock();

    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit);

    void unlock();
}

然后提供一個(gè)默認(rèn)實(shí)現(xiàn) DefaultRedisLock

public class DefaultRedisLock implements RedisLock {

    // 客戶端ID UUID
    private final String clientId;
    private final StringRedisTemplate redisTemplate;
    // 鎖頻道訂閱器 接收釋放鎖通知
    private final LockSubscriber lockSubscriber;
    // 加鎖的key
    private final String lockKey;
}

關(guān)于tryLock() ,首先執(zhí)行l(wèi)ua腳本嘗試獲取鎖,如果加鎖失敗則返回其它客戶端持有鎖的過期時(shí)間,客戶端訂閱鎖對(duì)應(yīng)的頻道,然后sleep,直到收到鎖釋放的通知再繼續(xù)搶鎖。最終不管有沒有搶到鎖,都會(huì)在 finally 取消頻道訂閱。

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {
    final long timeout = System.currentTimeMillis() + unit.toMillis(waitTime);
    final long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    if (ttl == null) {
        return true;
    }
    if (System.currentTimeMillis() >= timeout) {
        return false;
    }
    final Semaphore semaphore = lockSubscriber.subscribe(getChannel(lockKey), threadId);
    try {
        while (true) {
            if (System.currentTimeMillis() >= timeout) {
                return false;
            }
            ttl = tryAcquire(leaseTime, unit, threadId);
            if (ttl == null) {
                return true;
            }
            if (System.currentTimeMillis() >= timeout) {
                return false;
            }
            semaphore.tryAcquire(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        lockSubscriber.unsubscribe(getChannel(lockKey), threadId);
    }
    return false;
}

tryAcquire() 就是執(zhí)行l(wèi)ua腳本來加鎖,解釋一下這段腳本的邏輯:首先判斷 lockKey 是否存在,不存在則直接設(shè)置 lockKey并且設(shè)置過期時(shí)間,返回空,表示加鎖成功。存在則判斷 field 是否和當(dāng)前客戶端ID+線程ID一致,一致則代表鎖重入,遞增一下value即可,不一致代表加鎖失敗,返回鎖的過期時(shí)間

private Long tryAcquire(long leaseTime, TimeUnit timeUnit, long threadId) {
    return redisTemplate.execute(RedisScript.of(
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; end;" +
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; end;" +
                            "return redis.call('pttl', KEYS[1]);", Long.class), Collections.singletonList(lockKey),
            String.valueOf(timeUnit.toMillis(leaseTime)), getLockName(threadId));
}

lockName是由客戶端ID和線程ID組成的:

private String getLockName(long threadId) {
    return clientId + ":" + threadId;
}

如果加鎖失敗,客戶端會(huì)嘗試訂閱對(duì)應(yīng)的頻道,名稱規(guī)則是:

private String getChannel(String lockKey) {
    return "__lock_channel__:" + lockKey;
}

訂閱方法是LockSubscriber#subscribe ,同一個(gè)頻道無需訂閱多個(gè)監(jiān)聽器,所以用一個(gè) Map 記錄。訂閱成功以后,會(huì)返回當(dāng)前線程對(duì)應(yīng)的一個(gè) Semaphore 對(duì)象,默認(rèn)許可數(shù)是0,當(dāng)前線程會(huì)調(diào)用Semaphore#tryAcquire 等待許可數(shù),監(jiān)聽器在收到鎖釋放消息后會(huì)給 Semaphore 對(duì)象增加許可數(shù),喚醒線程繼續(xù)搶鎖。

@Component
public class LockSubscriber {

    @Autowired
    private RedisMessageListenerContainer messageListenerContainer;
    private final Map<String, Map<Long, Semaphore>> channelSemaphores = new HashMap<>();
    private final Map<String, MessageListener> listeners = new HashMap<>();
    private final StringRedisSerializer serializer = new StringRedisSerializer();

    public synchronized Semaphore subscribe(String channelName, long threadId) {
        MessageListener old = listeners.put(channelName, new MessageListener() {
            @Override
            public void onMessage(Message message, byte[] pattern) {
                String channel = serializer.deserialize(message.getChannel());
                String ignore = serializer.deserialize(message.getBody());
                Map<Long, Semaphore> semaphoreMap = channelSemaphores.get(channel);
                if (semaphoreMap != null && !semaphoreMap.isEmpty()) {
                    semaphoreMap.values().stream().findFirst().ifPresent(Semaphore::release);
                }
            }
        });
        if (old == null) {
            messageListenerContainer.addMessageListener(listeners.get(channelName), new ChannelTopic(channelName));
        }
        Semaphore semaphore = new Semaphore(0);
        Map<Long, Semaphore> semaphoreMap = channelSemaphores.getOrDefault(channelName, new HashMap<>());
        semaphoreMap.put(threadId, semaphore);
        channelSemaphores.put(channelName, semaphoreMap);
        return semaphore;
    }

    public synchronized void unsubscribe(String channelName, long threadId) {
        Map<Long, Semaphore> semaphoreMap = channelSemaphores.get(channelName);
        if (semaphoreMap != null) {
            semaphoreMap.remove(threadId);
            if (semaphoreMap.isEmpty()) {
                MessageListener listener = listeners.remove(channelName);
                if (listener != null) {
                    messageListenerContainer.removeMessageListener(listener);
                }
            }
        }
    }
}

對(duì)于 unlock,就只是一段 lua 腳本,這里解釋一下:判斷當(dāng)前客戶端ID+線程ID 這個(gè) field 是否存在,存在說明是自己加的鎖,可以釋放。不存在說明不是自己加的鎖,無需做任何處理。因?yàn)槭强芍厝腈i,每次 unlock 都只是遞減一下 value,只有當(dāng) value 等于0時(shí)才是真正的釋放鎖。釋放鎖的時(shí)候會(huì) del lockKey,再 publish 發(fā)送鎖釋放通知,讓其他客戶端可以繼續(xù)搶鎖。

@Override
public void unlock() {
    long threadId = Thread.currentThread().getId();
    redisTemplate.execute(RedisScript.of(
                    "if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then " +
                            "return nil;end;" +
                            "local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1); " +
                            "if (counter > 0) then " +
                            "return 0; " +
                            "else " +
                            "redis.call('del', KEYS[1]); " +
                            "redis.call('publish', KEYS[2], 1); " +
                            "return 1; " +
                            "end; " +
                            "return nil;", Long.class), Arrays.asList(lockKey, getChannel(lockKey)),
            getLockName(threadId));
}

最后,我們需要一個(gè) RedisLockFactory 來創(chuàng)建鎖對(duì)象,它同時(shí)會(huì)生成客戶端ID

@Component
public class RedisLockFactory {

    private static final String CLIENT_ID = UUID.randomUUID().toString();
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private LockSubscriber lockSubscriber;

    public RedisLock getLock(String lockKey) {
        return new DefaultRedisLock(CLIENT_ID, redisTemplate, lockSubscriber, lockKey);
    }
}

至此,一個(gè)基于 Redis 實(shí)現(xiàn)的分布式可重入鎖就完成了。

尾巴

目前這個(gè)版本的分布式鎖,保證了互斥性、可重入、避免死鎖和誤解鎖、實(shí)現(xiàn)了釋放鎖通知,但是并沒有高可用的保證。如果 Redis 是單實(shí)例部署,就會(huì)存在單點(diǎn)問題,Redis 一旦故障,整個(gè)分布式鎖將不可用。如果 Redis 是主從集群模式部署,雖然有主從自動(dòng)切換,但是 Master 和 Slave 之間的數(shù)據(jù)同步是存在延遲的,分布式鎖可能會(huì)出現(xiàn)問題。比如:客戶端A加鎖成功,lockKey 寫入了 Master,此時(shí) Master 宕機(jī),其它 Slave 升級(jí)成了 Master,但是還沒有同步到 lockKey,客戶端B來加鎖也會(huì)成功,這就沒有保證互斥性。針對(duì)這個(gè)問題,可以參考 RedLock 算法,部署多個(gè)單獨(dú)的 Redis 示例,只要一半以上的Redis節(jié)點(diǎn)加鎖成功就算成功,來盡可能的保證服務(wù)高可用。文章來源地址http://www.zghlxwxcb.cn/news/detail-826911.html

到了這里,關(guān)于Redis分布式可重入鎖實(shí)現(xiàn)方案的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Redisson 分布式鎖可重入的原理

    Redisson 分布式鎖可重入的原理

    目錄 1. 使用 Redis 實(shí)現(xiàn)分布式鎖存在的問題 2. Redisson 的分布式鎖解決不可重入問題的原理 不可重入:同一個(gè)線程無法兩次 / 多次獲取鎖 舉例 method1 執(zhí)行需要獲取鎖 method2 執(zhí)行也需要(同一把)鎖 如果 method1 中調(diào)用了 method2,就會(huì)出現(xiàn)死鎖的情況 method1 執(zhí)行的過程是同一個(gè)線

    2024年01月25日
    瀏覽(20)
  • 【面試 分布式鎖詳細(xì)解析】續(xù)命 自旋鎖 看門狗 重入鎖,加鎖 續(xù)命 解鎖 核心源碼,lua腳本解析,具體代碼和lua腳本如何實(shí)現(xiàn)

    自己實(shí)現(xiàn)鎖續(xù)命 在 controller 里開一個(gè) 線程 (可以為 守護(hù)線程) 每10秒,判斷一個(gè) 這個(gè) UUID是否存在,如果 存在,重置為 30秒。 如果不存在,守護(hù)線程 也結(jié)束。 基本的key value 基本的使用 setIfAbsent存在不設(shè)置 16384 Redis 集群沒有使用一致性hash, 而是引入了哈希槽的概念。 R

    2023年04月09日
    瀏覽(27)
  • curator實(shí)現(xiàn)的zookeeper可重入鎖

    Curator是一個(gè)Apache開源的ZooKeeper客戶端庫,它提供了許多高級(jí)特性和工具類,用于簡化在分布式環(huán)境中使用ZooKeeper的開發(fā)。其中之一就是可重入鎖。 Curator提供了 InterProcessMutex 類來實(shí)現(xiàn)可重入鎖。以下是使用Curator實(shí)現(xiàn)ZooKeeper可重入鎖的示例: import org.apache.curator.framework.Curato

    2024年02月15日
    瀏覽(19)
  • Java入門-可重入鎖

    Java入門-可重入鎖

    什么是可重入鎖? 當(dāng)線程獲取某個(gè)鎖后,還可以繼續(xù)獲取它,可以遞歸調(diào)用,而不會(huì)發(fā)生死鎖; 可重入鎖案例 程序可重入加鎖 A.class,沒有發(fā)生死鎖。 sychronized鎖 運(yùn)行結(jié)果 ReentrantLock 運(yùn)行結(jié)果 如何保證可重入 當(dāng)一個(gè)線程訪問同步塊并獲取鎖時(shí),會(huì)在對(duì)象頭和棧幀中的鎖記錄里

    2024年02月22日
    瀏覽(26)
  • java~理解可重入鎖

    在Java中,可重入鎖(Reentrant Lock)是一種同步機(jī)制,允許線程在持有鎖的情況下再次獲取該鎖,而不會(huì)被自己所持有的鎖所阻塞。也就是說,一個(gè)線程可以多次獲得同一個(gè)鎖,而不會(huì)出現(xiàn)死鎖的情況。 可重入鎖在多線程編程中非常有用,它允許線程在訪問共享資源時(shí)多次獲取

    2024年02月09日
    瀏覽(25)
  • 并發(fā)編程之可重入鎖ReentrantLock

    并發(fā)編程之可重入鎖ReentrantLock

    大家都知道在并發(fā)編程中一般會(huì)用到多線程技術(shù),多線程技術(shù)可以大大增加系統(tǒng)QPS/TPS。但是在一些特殊的業(yè)務(wù)場(chǎng)景下我們需要限制線程的并發(fā)數(shù)目,比如秒殺系統(tǒng)、多種商品金額疊加運(yùn)算等等都是需要限制線程數(shù)量。特別是在分布式微服務(wù)架構(gòu),多線程同步問題尤為明顯。一

    2023年04月25日
    瀏覽(43)
  • 【Java | 多線程】可重入鎖的概念以及示例

    可重入鎖(又名遞歸鎖)是一種特殊類型的鎖,它允許 同一個(gè)線程在獲取鎖后再次進(jìn)入該鎖保護(hù)的代碼塊或方法,而不需要重新獲取鎖 。 說白了,可重入鎖的特點(diǎn)就是同一個(gè)線程可以多次獲取同一個(gè)鎖,而不會(huì)因?yàn)橹耙呀?jīng)獲取過鎖而阻塞。 可重入鎖的一個(gè)優(yōu)點(diǎn)是可以一定

    2024年04月24日
    瀏覽(20)
  • 解讀分布式鎖(redis實(shí)現(xiàn)方案)

    解讀分布式鎖(redis實(shí)現(xiàn)方案)

    分布式鎖是一種用于分布式系統(tǒng)中的并發(fā)控制機(jī)制,它用于確保在多個(gè)節(jié)點(diǎn)或多個(gè)進(jìn)程之間的并發(fā)操作中,某些關(guān)鍵資源或代碼塊只能被一個(gè)節(jié)點(diǎn)或進(jìn)程同時(shí)訪問。分布式鎖的目的是避免多個(gè)節(jié)點(diǎn)同時(shí)修改共享資源而導(dǎo)致的數(shù)據(jù)不一致或沖突的問題。通俗的來說,分布式鎖的

    2024年02月15日
    瀏覽(38)
  • 【Java】三種方案實(shí)現(xiàn) Redis 分布式鎖

    【Java】三種方案實(shí)現(xiàn) Redis 分布式鎖

    setnx、Redisson、RedLock 都可以實(shí)現(xiàn)分布式鎖,從易到難得排序?yàn)椋簊etnx Redisson RedLock。一般情況下,直接使用 Redisson 就可以啦,有很多邏輯框架的作者都已經(jīng)考慮到了。 1.1、簡單實(shí)現(xiàn) 下面的鎖實(shí)現(xiàn)可以用在測(cè)試或者簡單場(chǎng)景,但是它存在以下問題,使其不適合用在正式環(huán)境。

    2024年02月05日
    瀏覽(25)
  • Zookeeper 和 Redis 哪種更好? 為什么使用分布式鎖? 1. 利用 Redis 提供的 第二種,基于 ZK 實(shí)現(xiàn)分布式鎖的落地方案 對(duì)于 redis 的分布式鎖而言,它有以下缺點(diǎn):

    關(guān)于這個(gè)問題,我們 可以從 3 個(gè)方面來說: 為什么使用分布式鎖? 使用分布式鎖的目的,是為了保證同一時(shí)間只有一個(gè) JVM 進(jìn)程可以對(duì)共享資源進(jìn)行操作。 根據(jù)鎖的用途可以細(xì)分為以下兩類: 允許多個(gè)客戶端操作共享資源,我們稱為共享鎖 這種鎖的一般是對(duì)共享資源具有

    2024年01月16日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包