欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Redis分布式可重入鎖實(shí)現(xiàn)方案

 更新時(shí)間:2024年02月19日 09:40:03   作者:程序員小潘  
在單進(jìn)程環(huán)境下,要保證一個(gè)代碼塊的同步執(zhí)行,直接用synchronized 關(guān)鍵字或ReetrantLock 即可,在分布式環(huán)境下,要保證多個(gè)節(jié)點(diǎn)的線程對(duì)代碼塊的同步訪問,就必須要用到分布式鎖方案,本文介紹一下基于 Redis實(shí)現(xiàn)的分布式鎖方案,感興趣的朋友一起看看吧

前言

在單進(jìn)程環(huán)境下,要保證一個(gè)代碼塊的同步執(zhí)行,直接用synchronized 關(guān)鍵字或ReetrantLock 即可。在分布式環(huán)境下,要保證多個(gè)節(jié)點(diǎn)的線程對(duì)代碼塊的同步訪問,就必須要用到分布式鎖方案。
分布式鎖實(shí)現(xiàn)方案有很多,有基于關(guān)系型數(shù)據(jù)庫行鎖實(shí)現(xiàn)的;有基于ZooKeeper臨時(shí)順序節(jié)點(diǎn)實(shí)現(xiàn)的;還有基于 Redis setnx 命令實(shí)現(xiàn)的。本文介紹一下基于 Redis 實(shí)現(xiàn)的分布式鎖方案。

理解分布式鎖

實(shí)現(xiàn)分布式鎖有幾個(gè)要求

  • 互斥性:任意時(shí)刻,最多只會(huì)有一個(gè)客戶端線程可以獲得鎖
  • 可重入:同一客戶端的同一線程,獲得鎖后能夠再次獲得鎖
  • 避免死鎖:客戶端獲得鎖后即使宕機(jī),后續(xù)客戶端也可以獲得鎖
  • 避免誤解鎖:客戶端A加的鎖只能由A自己釋放
  • 釋放鎖通知:持有鎖的客戶端釋放鎖后,最好可以通知其它客戶端繼續(xù)搶鎖
  • 高性能和高可用

Redis 服務(wù)端命令是單線程串行執(zhí)行的,天生就是原子的,并且支持執(zhí)行自定義的 lua 腳本,功能上更加強(qiáng)大。
關(guān)于互斥性,我們可以用 setnx 命令實(shí)現(xiàn),Redis 可以保證只會(huì)有一個(gè)客戶端 set 成功。但是由于我們要實(shí)現(xiàn)的是一個(gè)分布式的可重入鎖,數(shù)據(jù)結(jié)構(gòu)得用 hash,用客戶端ID+線程ID作為 field,value 記作鎖的重入次數(shù)即可。
關(guān)于死鎖,代碼里建議把鎖的釋放寫在 finally 里面確保一定執(zhí)行,針對(duì)客戶端搶到鎖后宕機(jī)的場(chǎng)景,可以給 redis key 設(shè)置一個(gè)超時(shí)時(shí)間來解決。
關(guān)于誤解鎖,客戶端在釋放鎖時(shí),必須判斷 field 是否當(dāng)前客戶端ID以及線程ID一致,不一致就不執(zhí)行刪除,這里需要用到 lua 腳本判斷。
關(guān)于釋放鎖通知,可以利用 Redis 發(fā)布訂閱模式,給每個(gè)鎖創(chuàng)建一個(gè)頻道,釋放鎖的客戶端負(fù)責(zé)往頻道里發(fā)送消息通知等待搶鎖的客戶端。
最后關(guān)于高性能和高可用,因?yàn)?Redis 是基于內(nèi)存的,天生就是高性能的。但是 Redis 服務(wù)本身一旦出現(xiàn)問題,分布式鎖也就不可用了,此時(shí)可以多部署幾臺(tái)獨(dú)立的示例,使用 RedLock 算法來解決高可用的問題。

設(shè)計(jì)實(shí)現(xiàn)

首先我們定義一個(gè) RedisLock 鎖對(duì)象的抽象接口,只有嘗試加鎖和釋放鎖方法

public interface RedisLock {
    boolean tryLock();
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit);
    void unlock();
}

然后提供一個(gè)默認(rèn)實(shí)現(xiàn) DefaultRedisLock

public class DefaultRedisLock implements RedisLock {
    // 客戶端ID UUID
    private final String clientId;
    private final StringRedisTemplate redisTemplate;
    // 鎖頻道訂閱器 接收釋放鎖通知
    private final LockSubscriber lockSubscriber;
    // 加鎖的key
    private final String lockKey;
}

關(guān)于tryLock() ,首先執(zhí)行l(wèi)ua腳本嘗試獲取鎖,如果加鎖失敗則返回其它客戶端持有鎖的過期時(shí)間,客戶端訂閱鎖對(duì)應(yīng)的頻道,然后sleep,直到收到鎖釋放的通知再繼續(xù)搶鎖。最終不管有沒有搶到鎖,都會(huì)在 finally 取消頻道訂閱。

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {
    final long timeout = System.currentTimeMillis() + unit.toMillis(waitTime);
    final long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    if (ttl == null) {
        return true;
    }
    if (System.currentTimeMillis() >= timeout) {
        return false;
    }
    final Semaphore semaphore = lockSubscriber.subscribe(getChannel(lockKey), threadId);
    try {
        while (true) {
            if (System.currentTimeMillis() >= timeout) {
                return false;
            }
            ttl = tryAcquire(leaseTime, unit, threadId);
            if (ttl == null) {
                return true;
            }
            if (System.currentTimeMillis() >= timeout) {
                return false;
            }
            semaphore.tryAcquire(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        lockSubscriber.unsubscribe(getChannel(lockKey), threadId);
    }
    return false;
}

tryAcquire() 就是執(zhí)行l(wèi)ua腳本來加鎖,解釋一下這段腳本的邏輯:首先判斷 lockKey 是否存在,不存在則直接設(shè)置 lockKey并且設(shè)置過期時(shí)間,返回空,表示加鎖成功。存在則判斷 field 是否和當(dāng)前客戶端ID+線程ID一致,一致則代表鎖重入,遞增一下value即可,不一致代表加鎖失敗,返回鎖的過期時(shí)間

private Long tryAcquire(long leaseTime, TimeUnit timeUnit, long threadId) {
    return redisTemplate.execute(RedisScript.of(
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; end;" +
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; end;" +
                            "return redis.call('pttl', KEYS[1]);", Long.class), Collections.singletonList(lockKey),
            String.valueOf(timeUnit.toMillis(leaseTime)), getLockName(threadId));
}

lockName是由客戶端ID和線程ID組成的:

private String getLockName(long threadId) {
    return clientId + ":" + threadId;
}

如果加鎖失敗,客戶端會(huì)嘗試訂閱對(duì)應(yīng)的頻道,名稱規(guī)則是:

private String getChannel(String lockKey) {
    return "__lock_channel__:" + lockKey;
}

訂閱方法是LockSubscriber#subscribe ,同一個(gè)頻道無需訂閱多個(gè)監(jiān)聽器,所以用一個(gè) Map 記錄。訂閱成功以后,會(huì)返回當(dāng)前線程對(duì)應(yīng)的一個(gè) Semaphore 對(duì)象,默認(rèn)許可數(shù)是0,當(dāng)前線程會(huì)調(diào)用Semaphore#tryAcquire 等待許可數(shù),監(jiān)聽器在收到鎖釋放消息后會(huì)給 Semaphore 對(duì)象增加許可數(shù),喚醒線程繼續(xù)搶鎖。

@Component
public class LockSubscriber {
    @Autowired
    private RedisMessageListenerContainer messageListenerContainer;
    private final Map<String, Map<Long, Semaphore>> channelSemaphores = new HashMap<>();
    private final Map<String, MessageListener> listeners = new HashMap<>();
    private final StringRedisSerializer serializer = new StringRedisSerializer();
    public synchronized Semaphore subscribe(String channelName, long threadId) {
        MessageListener old = listeners.put(channelName, new MessageListener() {
            @Override
            public void onMessage(Message message, byte[] pattern) {
                String channel = serializer.deserialize(message.getChannel());
                String ignore = serializer.deserialize(message.getBody());
                Map<Long, Semaphore> semaphoreMap = channelSemaphores.get(channel);
                if (semaphoreMap != null && !semaphoreMap.isEmpty()) {
                    semaphoreMap.values().stream().findFirst().ifPresent(Semaphore::release);
                }
            }
        });
        if (old == null) {
            messageListenerContainer.addMessageListener(listeners.get(channelName), new ChannelTopic(channelName));
        }
        Semaphore semaphore = new Semaphore(0);
        Map<Long, Semaphore> semaphoreMap = channelSemaphores.getOrDefault(channelName, new HashMap<>());
        semaphoreMap.put(threadId, semaphore);
        channelSemaphores.put(channelName, semaphoreMap);
        return semaphore;
    }
    public synchronized void unsubscribe(String channelName, long threadId) {
        Map<Long, Semaphore> semaphoreMap = channelSemaphores.get(channelName);
        if (semaphoreMap != null) {
            semaphoreMap.remove(threadId);
            if (semaphoreMap.isEmpty()) {
                MessageListener listener = listeners.remove(channelName);
                if (listener != null) {
                    messageListenerContainer.removeMessageListener(listener);
                }
            }
        }
    }
}

對(duì)于 unlock,就只是一段 lua 腳本,這里解釋一下:判斷當(dāng)前客戶端ID+線程ID 這個(gè) field 是否存在,存在說明是自己加的鎖,可以釋放。不存在說明不是自己加的鎖,無需做任何處理。因?yàn)槭强芍厝腈i,每次 unlock 都只是遞減一下 value,只有當(dāng) value 等于0時(shí)才是真正的釋放鎖。釋放鎖的時(shí)候會(huì) del lockKey,再 publish 發(fā)送鎖釋放通知,讓其他客戶端可以繼續(xù)搶鎖。

@Override
public void unlock() {
    long threadId = Thread.currentThread().getId();
    redisTemplate.execute(RedisScript.of(
                    "if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then " +
                            "return nil;end;" +
                            "local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1); " +
                            "if (counter > 0) then " +
                            "return 0; " +
                            "else " +
                            "redis.call('del', KEYS[1]); " +
                            "redis.call('publish', KEYS[2], 1); " +
                            "return 1; " +
                            "end; " +
                            "return nil;", Long.class), Arrays.asList(lockKey, getChannel(lockKey)),
            getLockName(threadId));
}

最后,我們需要一個(gè) RedisLockFactory 來創(chuàng)建鎖對(duì)象,它同時(shí)會(huì)生成客戶端ID

@Component
public class RedisLockFactory {
    private static final String CLIENT_ID = UUID.randomUUID().toString();
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private LockSubscriber lockSubscriber;
    public RedisLock getLock(String lockKey) {
        return new DefaultRedisLock(CLIENT_ID, redisTemplate, lockSubscriber, lockKey);
    }
}

至此,一個(gè)基于 Redis 實(shí)現(xiàn)的分布式可重入鎖就完成了。

尾巴

目前這個(gè)版本的分布式鎖,保證了互斥性、可重入、避免死鎖和誤解鎖、實(shí)現(xiàn)了釋放鎖通知,但是并沒有高可用的保證。如果 Redis 是單實(shí)例部署,就會(huì)存在單點(diǎn)問題,Redis 一旦故障,整個(gè)分布式鎖將不可用。如果 Redis 是主從集群模式部署,雖然有主從自動(dòng)切換,但是 Master 和 Slave 之間的數(shù)據(jù)同步是存在延遲的,分布式鎖可能會(huì)出現(xiàn)問題。比如:客戶端A加鎖成功,lockKey 寫入了 Master,此時(shí) Master 宕機(jī),其它 Slave 升級(jí)成了 Master,但是還沒有同步到 lockKey,客戶端B來加鎖也會(huì)成功,這就沒有保證互斥性。針對(duì)這個(gè)問題,可以參考 RedLock 算法,部署多個(gè)單獨(dú)的 Redis 示例,只要一半以上的Redis節(jié)點(diǎn)加鎖成功就算成功,來盡可能的保證服務(wù)高可用。

到此這篇關(guān)于Redis分布式可重入鎖實(shí)現(xiàn)方案的文章就介紹到這了,更多相關(guān)Redis重入鎖內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 解讀Redis秒殺優(yōu)化方案(阻塞隊(duì)列+基于Stream流的消息隊(duì)列)

    解讀Redis秒殺優(yōu)化方案(阻塞隊(duì)列+基于Stream流的消息隊(duì)列)

    該文章介紹了使用Redis的阻塞隊(duì)列和Stream流的消息隊(duì)列來優(yōu)化秒殺系統(tǒng)的方案,通過將秒殺流程拆分為兩條流水線,使用Redis緩存緩解數(shù)據(jù)庫壓力,并結(jié)合Lua腳本進(jìn)行原子性判斷,使用阻塞隊(duì)列和消息隊(duì)列異步處理訂單,有效提高了系統(tǒng)的并發(fā)處理能力和可用性
    2025-02-02
  • Redis連接池配置方式

    Redis連接池配置方式

    文章介紹了Redis連接池的配置方法,包括與數(shù)據(jù)庫連接時(shí)引入連接池的必要性、Java中使用Redis連接池的示例、jar包準(zhǔn)備、編寫配置代碼以及連接池參數(shù)的設(shè)置
    2024-12-12
  • 詳解RedisTemplate下Redis分布式鎖引發(fā)的系列問題

    詳解RedisTemplate下Redis分布式鎖引發(fā)的系列問題

    這篇文章主要介紹了詳解RedisTemplate下Redis分布式鎖引發(fā)的系列問題,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • Redis 過期鍵刪除策略的實(shí)現(xiàn)示例

    Redis 過期鍵刪除策略的實(shí)現(xiàn)示例

    Redis的過期數(shù)據(jù)刪除策略主要有三種,包括定時(shí)刪除、惰性刪除和定期刪除,本文主要介紹了Redis 過期鍵刪除策略的實(shí)現(xiàn)示例,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-03-03
  • Redis數(shù)據(jù)結(jié)構(gòu)之鏈表與字典的使用

    Redis數(shù)據(jù)結(jié)構(gòu)之鏈表與字典的使用

    這篇文章主要介紹了Redis數(shù)據(jù)結(jié)構(gòu)之鏈表與字典的使用,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-05-05
  • Redis中LFU算法的深入分析

    Redis中LFU算法的深入分析

    這篇文章主要給大家介紹了關(guān)于Redis中LFU算法的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Redis具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-06-06
  • Redis持久化方式之RDB和AOF的原理及優(yōu)缺點(diǎn)

    Redis持久化方式之RDB和AOF的原理及優(yōu)缺點(diǎn)

    在Redis中,數(shù)據(jù)可以分為兩類,即內(nèi)存數(shù)據(jù)和磁盤數(shù)據(jù),Redis?提供了兩種不同的持久化方式,其中?RDB?是快照備份機(jī)制,AOF?則是追加寫操作機(jī)制,本文將詳細(xì)給大家介紹Redis?持久化方式RDB和AOF的原理及優(yōu)缺點(diǎn),感興趣的同學(xué)可以跟著小編一起來學(xué)習(xí)
    2023-06-06
  • Redis?如何清空所有數(shù)據(jù)

    Redis?如何清空所有數(shù)據(jù)

    這篇文章主要介紹了Redis?如何清空所有數(shù)據(jù),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-08-08
  • 基于Redis實(shí)現(xiàn)分布式鎖以及任務(wù)隊(duì)列

    基于Redis實(shí)現(xiàn)分布式鎖以及任務(wù)隊(duì)列

    這篇文章主要介紹了基于Redis實(shí)現(xiàn)分布式鎖以及任務(wù)隊(duì)列,需要的朋友可以參考下
    2015-11-11
  • Redis哨兵模式介紹

    Redis哨兵模式介紹

    這篇文章介紹了Redis哨兵模式,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-02-02

最新評(píng)論