Redis分布式可重入鎖實(shí)現(xiàn)方案
前言
在單進(jìn)程環(huán)境下,要保證一個(gè)代碼塊的同步執(zhí)行,直接用synchronized
關(guān)鍵字或ReetrantLock
即可。在分布式環(huán)境下,要保證多個(gè)節(jié)點(diǎn)的線程對(duì)代碼塊的同步訪問,就必須要用到分布式鎖方案。
分布式鎖實(shí)現(xiàn)方案有很多,有基于關(guān)系型數(shù)據(jù)庫行鎖實(shí)現(xiàn)的;有基于ZooKeeper臨時(shí)順序節(jié)點(diǎn)實(shí)現(xiàn)的;還有基于 Redis setnx 命令實(shí)現(xiàn)的。本文介紹一下基于 Redis 實(shí)現(xiàn)的分布式鎖方案。
理解分布式鎖
實(shí)現(xiàn)分布式鎖有幾個(gè)要求
- 互斥性:任意時(shí)刻,最多只會(huì)有一個(gè)客戶端線程可以獲得鎖
- 可重入:同一客戶端的同一線程,獲得鎖后能夠再次獲得鎖
- 避免死鎖:客戶端獲得鎖后即使宕機(jī),后續(xù)客戶端也可以獲得鎖
- 避免誤解鎖:客戶端A加的鎖只能由A自己釋放
- 釋放鎖通知:持有鎖的客戶端釋放鎖后,最好可以通知其它客戶端繼續(xù)搶鎖
- 高性能和高可用
Redis 服務(wù)端命令是單線程串行執(zhí)行的,天生就是原子的,并且支持執(zhí)行自定義的 lua 腳本,功能上更加強(qiáng)大。
關(guān)于互斥性,我們可以用 setnx 命令實(shí)現(xiàn),Redis 可以保證只會(huì)有一個(gè)客戶端 set 成功。但是由于我們要實(shí)現(xiàn)的是一個(gè)分布式的可重入鎖,數(shù)據(jù)結(jié)構(gòu)得用 hash,用客戶端ID+線程ID作為 field,value 記作鎖的重入次數(shù)即可。
關(guān)于死鎖,代碼里建議把鎖的釋放寫在 finally 里面確保一定執(zhí)行,針對(duì)客戶端搶到鎖后宕機(jī)的場(chǎng)景,可以給 redis key 設(shè)置一個(gè)超時(shí)時(shí)間來解決。
關(guān)于誤解鎖,客戶端在釋放鎖時(shí),必須判斷 field 是否當(dāng)前客戶端ID以及線程ID一致,不一致就不執(zhí)行刪除,這里需要用到 lua 腳本判斷。
關(guān)于釋放鎖通知,可以利用 Redis 發(fā)布訂閱模式,給每個(gè)鎖創(chuàng)建一個(gè)頻道,釋放鎖的客戶端負(fù)責(zé)往頻道里發(fā)送消息通知等待搶鎖的客戶端。
最后關(guān)于高性能和高可用,因?yàn)?Redis 是基于內(nèi)存的,天生就是高性能的。但是 Redis 服務(wù)本身一旦出現(xiàn)問題,分布式鎖也就不可用了,此時(shí)可以多部署幾臺(tái)獨(dú)立的示例,使用 RedLock 算法來解決高可用的問題。
設(shè)計(jì)實(shí)現(xiàn)
首先我們定義一個(gè) RedisLock 鎖對(duì)象的抽象接口,只有嘗試加鎖和釋放鎖方法
public interface RedisLock { boolean tryLock(); boolean tryLock(long waitTime, long leaseTime, TimeUnit unit); void unlock(); }
然后提供一個(gè)默認(rèn)實(shí)現(xiàn) DefaultRedisLock
public class DefaultRedisLock implements RedisLock { // 客戶端ID UUID private final String clientId; private final StringRedisTemplate redisTemplate; // 鎖頻道訂閱器 接收釋放鎖通知 private final LockSubscriber lockSubscriber; // 加鎖的key private final String lockKey; }
關(guān)于tryLock()
,首先執(zhí)行l(wèi)ua腳本嘗試獲取鎖,如果加鎖失敗則返回其它客戶端持有鎖的過期時(shí)間,客戶端訂閱鎖對(duì)應(yīng)的頻道,然后sleep,直到收到鎖釋放的通知再繼續(xù)搶鎖。最終不管有沒有搶到鎖,都會(huì)在 finally 取消頻道訂閱。
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) { final long timeout = System.currentTimeMillis() + unit.toMillis(waitTime); final long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); if (ttl == null) { return true; } if (System.currentTimeMillis() >= timeout) { return false; } final Semaphore semaphore = lockSubscriber.subscribe(getChannel(lockKey), threadId); try { while (true) { if (System.currentTimeMillis() >= timeout) { return false; } ttl = tryAcquire(leaseTime, unit, threadId); if (ttl == null) { return true; } if (System.currentTimeMillis() >= timeout) { return false; } semaphore.tryAcquire(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } } catch (Exception e) { e.printStackTrace(); } finally { lockSubscriber.unsubscribe(getChannel(lockKey), threadId); } return false; }
tryAcquire()
就是執(zhí)行l(wèi)ua腳本來加鎖,解釋一下這段腳本的邏輯:首先判斷 lockKey 是否存在,不存在則直接設(shè)置 lockKey并且設(shè)置過期時(shí)間,返回空,表示加鎖成功。存在則判斷 field 是否和當(dāng)前客戶端ID+線程ID一致,一致則代表鎖重入,遞增一下value即可,不一致代表加鎖失敗,返回鎖的過期時(shí)間
private Long tryAcquire(long leaseTime, TimeUnit timeUnit, long threadId) { return redisTemplate.execute(RedisScript.of( "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; end;" + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; end;" + "return redis.call('pttl', KEYS[1]);", Long.class), Collections.singletonList(lockKey), String.valueOf(timeUnit.toMillis(leaseTime)), getLockName(threadId)); }
lockName是由客戶端ID和線程ID組成的:
private String getLockName(long threadId) { return clientId + ":" + threadId; }
如果加鎖失敗,客戶端會(huì)嘗試訂閱對(duì)應(yīng)的頻道,名稱規(guī)則是:
private String getChannel(String lockKey) { return "__lock_channel__:" + lockKey; }
訂閱方法是LockSubscriber#subscribe
,同一個(gè)頻道無需訂閱多個(gè)監(jiān)聽器,所以用一個(gè) Map 記錄。訂閱成功以后,會(huì)返回當(dāng)前線程對(duì)應(yīng)的一個(gè) Semaphore 對(duì)象,默認(rèn)許可數(shù)是0,當(dāng)前線程會(huì)調(diào)用Semaphore#tryAcquire
等待許可數(shù),監(jiān)聽器在收到鎖釋放消息后會(huì)給 Semaphore 對(duì)象增加許可數(shù),喚醒線程繼續(xù)搶鎖。
@Component public class LockSubscriber { @Autowired private RedisMessageListenerContainer messageListenerContainer; private final Map<String, Map<Long, Semaphore>> channelSemaphores = new HashMap<>(); private final Map<String, MessageListener> listeners = new HashMap<>(); private final StringRedisSerializer serializer = new StringRedisSerializer(); public synchronized Semaphore subscribe(String channelName, long threadId) { MessageListener old = listeners.put(channelName, new MessageListener() { @Override public void onMessage(Message message, byte[] pattern) { String channel = serializer.deserialize(message.getChannel()); String ignore = serializer.deserialize(message.getBody()); Map<Long, Semaphore> semaphoreMap = channelSemaphores.get(channel); if (semaphoreMap != null && !semaphoreMap.isEmpty()) { semaphoreMap.values().stream().findFirst().ifPresent(Semaphore::release); } } }); if (old == null) { messageListenerContainer.addMessageListener(listeners.get(channelName), new ChannelTopic(channelName)); } Semaphore semaphore = new Semaphore(0); Map<Long, Semaphore> semaphoreMap = channelSemaphores.getOrDefault(channelName, new HashMap<>()); semaphoreMap.put(threadId, semaphore); channelSemaphores.put(channelName, semaphoreMap); return semaphore; } public synchronized void unsubscribe(String channelName, long threadId) { Map<Long, Semaphore> semaphoreMap = channelSemaphores.get(channelName); if (semaphoreMap != null) { semaphoreMap.remove(threadId); if (semaphoreMap.isEmpty()) { MessageListener listener = listeners.remove(channelName); if (listener != null) { messageListenerContainer.removeMessageListener(listener); } } } } }
對(duì)于 unlock,就只是一段 lua 腳本,這里解釋一下:判斷當(dāng)前客戶端ID+線程ID 這個(gè) field 是否存在,存在說明是自己加的鎖,可以釋放。不存在說明不是自己加的鎖,無需做任何處理。因?yàn)槭强芍厝腈i,每次 unlock 都只是遞減一下 value,只有當(dāng) value 等于0時(shí)才是真正的釋放鎖。釋放鎖的時(shí)候會(huì) del lockKey,再 publish 發(fā)送鎖釋放通知,讓其他客戶端可以繼續(xù)搶鎖。
@Override public void unlock() { long threadId = Thread.currentThread().getId(); redisTemplate.execute(RedisScript.of( "if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then " + "return nil;end;" + "local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1); " + "if (counter > 0) then " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], 1); " + "return 1; " + "end; " + "return nil;", Long.class), Arrays.asList(lockKey, getChannel(lockKey)), getLockName(threadId)); }
最后,我們需要一個(gè) RedisLockFactory 來創(chuàng)建鎖對(duì)象,它同時(shí)會(huì)生成客戶端ID
@Component public class RedisLockFactory { private static final String CLIENT_ID = UUID.randomUUID().toString(); @Autowired private StringRedisTemplate redisTemplate; @Autowired private LockSubscriber lockSubscriber; public RedisLock getLock(String lockKey) { return new DefaultRedisLock(CLIENT_ID, redisTemplate, lockSubscriber, lockKey); } }
至此,一個(gè)基于 Redis 實(shí)現(xiàn)的分布式可重入鎖就完成了。
尾巴
目前這個(gè)版本的分布式鎖,保證了互斥性、可重入、避免死鎖和誤解鎖、實(shí)現(xiàn)了釋放鎖通知,但是并沒有高可用的保證。如果 Redis 是單實(shí)例部署,就會(huì)存在單點(diǎn)問題,Redis 一旦故障,整個(gè)分布式鎖將不可用。如果 Redis 是主從集群模式部署,雖然有主從自動(dòng)切換,但是 Master 和 Slave 之間的數(shù)據(jù)同步是存在延遲的,分布式鎖可能會(huì)出現(xiàn)問題。比如:客戶端A加鎖成功,lockKey 寫入了 Master,此時(shí) Master 宕機(jī),其它 Slave 升級(jí)成了 Master,但是還沒有同步到 lockKey,客戶端B來加鎖也會(huì)成功,這就沒有保證互斥性。針對(duì)這個(gè)問題,可以參考 RedLock 算法,部署多個(gè)單獨(dú)的 Redis 示例,只要一半以上的Redis節(jié)點(diǎn)加鎖成功就算成功,來盡可能的保證服務(wù)高可用。
到此這篇關(guān)于Redis分布式可重入鎖實(shí)現(xiàn)方案的文章就介紹到這了,更多相關(guān)Redis重入鎖內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
解讀Redis秒殺優(yōu)化方案(阻塞隊(duì)列+基于Stream流的消息隊(duì)列)
該文章介紹了使用Redis的阻塞隊(duì)列和Stream流的消息隊(duì)列來優(yōu)化秒殺系統(tǒng)的方案,通過將秒殺流程拆分為兩條流水線,使用Redis緩存緩解數(shù)據(jù)庫壓力,并結(jié)合Lua腳本進(jìn)行原子性判斷,使用阻塞隊(duì)列和消息隊(duì)列異步處理訂單,有效提高了系統(tǒng)的并發(fā)處理能力和可用性2025-02-02詳解RedisTemplate下Redis分布式鎖引發(fā)的系列問題
這篇文章主要介紹了詳解RedisTemplate下Redis分布式鎖引發(fā)的系列問題,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03Redis數(shù)據(jù)結(jié)構(gòu)之鏈表與字典的使用
這篇文章主要介紹了Redis數(shù)據(jù)結(jié)構(gòu)之鏈表與字典的使用,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-05-05Redis持久化方式之RDB和AOF的原理及優(yōu)缺點(diǎn)
在Redis中,數(shù)據(jù)可以分為兩類,即內(nèi)存數(shù)據(jù)和磁盤數(shù)據(jù),Redis?提供了兩種不同的持久化方式,其中?RDB?是快照備份機(jī)制,AOF?則是追加寫操作機(jī)制,本文將詳細(xì)給大家介紹Redis?持久化方式RDB和AOF的原理及優(yōu)缺點(diǎn),感興趣的同學(xué)可以跟著小編一起來學(xué)習(xí)2023-06-06基于Redis實(shí)現(xiàn)分布式鎖以及任務(wù)隊(duì)列
這篇文章主要介紹了基于Redis實(shí)現(xiàn)分布式鎖以及任務(wù)隊(duì)列,需要的朋友可以參考下2015-11-11