Redisson分布式鎖的源碼解讀分享
前言
Redisson是一個在Redis的基礎(chǔ)上實現(xiàn)的Java駐內(nèi)存數(shù)據(jù)網(wǎng)格(In-Memory Data Grid)。Redisson有一樣功能是可重入的分布式鎖。本文來討論一下這個功能的特點以及源碼分析。
前置知識
在講Redisson,咱們先來聊聊分布式鎖的特點以及Redis的發(fā)布/訂閱機制,磨刀不誤砍柴工。
分布式鎖的思考
首先思考下,如果我們自己去實現(xiàn)一個分布式鎖,這個鎖需要具備哪些功能?
- 互斥(這是一個鎖最基本的功能)
- 鎖失效機制(也就是可以設(shè)置鎖定時長,防止死鎖)
- 高性能、高可用
- 阻塞、非阻塞
- 可重入、公平鎖
- 。。。
可見,實現(xiàn)一個分布式鎖,需要考慮的東西有很多。那么,如果用Redis來實現(xiàn)分布式鎖呢?如果只需要具備上面說的1、2點功能,要怎么寫?(ps:我就不寫了,自己想去)
Redis訂閱/發(fā)布機制
Redisson中用到了Redis的訂閱/發(fā)布機制,下面簡單介紹下。
簡單來說就是如果client2 、 client5 和 client1 訂閱了 channel1,當(dāng)有消息發(fā)布到 channel1 的時候,client2 、 client5 和 client1 都會收到這個消息。
圖片來自 菜鳥教程-Redis發(fā)布訂閱
Redisson
源碼版本:3.17.7
下面以Redisson官方的可重入同步鎖例子為入口,解讀下源碼。
RLock lock = redisson.getLock("anyLock"); // 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { lock.unlock(); } }
加鎖
我用時序圖來表示加鎖和訂閱的過程。時序圖中括號后面的c1、c2代表client1,client2
當(dāng)線程2獲取了鎖但還沒釋放鎖時,如果線程1去獲取鎖,會阻塞等待,直到線程2解鎖,通過Redis的發(fā)布訂閱機制喚醒線程1,再次去獲取鎖。
加鎖方法是 lock.tryLock(100, 10, TimeUnit.SECONDS),對應(yīng)著就是RedissonLock#tryLock
/** * 獲取鎖 * @param waitTime 嘗試獲取鎖的最大等待時間,超過這個值,則認(rèn)為獲取鎖失敗 * @param leaseTime 鎖的持有時間,超過這個時間鎖會自動失效(值應(yīng)設(shè)置為大于業(yè)務(wù)處理的時間,確保在鎖有效期內(nèi)業(yè)務(wù)能處理完) * @param unit 時間單位 * @return 獲取鎖成功返回true,失敗返回false */ @Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis();// 當(dāng)前時間 long threadId = Thread.currentThread().getId();// 當(dāng)前線程id // 嘗試加鎖,加鎖成功返回null,失敗返回鎖的剩余超時時間 Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // 獲取鎖成功 if (ttl == null) { return true; } // time小于0代表此時已經(jīng)超過獲取鎖的等待時間,直接返回false time -= System.currentTimeMillis() - current; if (time <= 0) { // 沒看懂這個方法,里面里面空運行,有知道的大神還請不吝賜教 acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis(); CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); try { subscribeFuture.get(time, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { if (!subscribeFuture.cancel(false)) { subscribeFuture.whenComplete((res, ex) -> { // 出現(xiàn)異常,取消訂閱 if (ex == null) { unsubscribe(res, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } catch (ExecutionException e) { acquireFailed(waitTime, unit, threadId); return false; } try { // 判斷是否超時(超過了waitTime) time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } while (true) { // 再次獲取鎖,成功則返回 long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // 阻塞等待信號量喚醒或者超時,接收到訂閱時喚醒 // 使用的是Semaphore#tryAcquire() currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { // 因為是同步操作,所以無論加鎖成功或失敗,都取消訂閱 unsubscribe(commandExecutor.getNow(subscribeFuture), threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
先看一下整體邏輯:
1.嘗試加鎖,成功直接返回true
2.判斷超時
3.訂閱
4.判斷超時
5.循環(huán) ( 嘗試獲取鎖 → 判斷超時 → 阻塞等待 )
tryLock方法看著很長,但是有很多代碼都是重復(fù)的,本小節(jié)重點說一下嘗試加鎖的方法tryAcquire
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; if (leaseTime > 0) { // 調(diào)用lua腳本,嘗試加鎖 ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { // 這里的if、else的區(qū)別就在于,如果沒有設(shè)置leaseTime,就使用默認(rèn)的internalLockLeaseTime(默認(rèn)30秒) ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> { // lock acquired // 如果ttlRemaining為空,也就是tryLockInnerAsync方法中的lua執(zhí)行結(jié)果返回空,證明獲取鎖成功 if (ttlRemaining == null) { if (leaseTime > 0) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { // 如果沒有設(shè)置鎖的持有時間(leaseTime),則啟動看門狗,定時給鎖續(xù)期,防止業(yè)務(wù)邏輯未執(zhí)行完成鎖就過期了 scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper<>(f); }
在tryAcquireAsync方法中,主要分為兩段邏輯:
1.調(diào)用lua腳本加鎖:tryLockInnerAsync
2.看門狗:scheduleExpirationRenewal
看門狗在后面講,本小節(jié)重點還是在加鎖
// RedissonLock#tryLockInnerAsync <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
Redisson使用了 Hash 結(jié)構(gòu)來表示一個鎖,這樣 Hash 里面的 key 為線程id,value 為鎖的次數(shù)。這樣巧妙地解決了可重入鎖的問題。
下面我們來分析下這段 lua 腳本的邏輯(下面說的threadId都是指變量,不是說key就叫’threadId’):
- 如果鎖(hash結(jié)構(gòu))不存在,則創(chuàng)建,并添加一個鍵值對 (threadId : 1),并設(shè)置鎖的過期時間
- 如果鎖存在,則將鍵值對 threadId 對應(yīng)的值 + 1,并設(shè)置鎖的過期時間
- 如果不如何1,2點,則返回鎖的剩余過期時間
訂閱
讓我們把視線重新回到RedissonLock#tryLock中,當(dāng)經(jīng)過一些嘗試獲取鎖,超時判斷之后,代碼來到while循環(huán)中。這個while循環(huán)是個死循環(huán),只有成功獲取鎖或者超時,才會退出。一般死循環(huán)的設(shè)計中,都會有阻塞等待的代碼,否則如果循環(huán)中的邏輯短時間拿不到結(jié)果,會造成資源搶占和浪費。阻塞代碼就是下面這段
if (ttl >= 0 && ttl < time) { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); }
commandExecutor.getNow(subscribeFuture).getLatch() 得到的是一個Semaphore信號量對象,這是jdk的內(nèi)置對象,Semaphore#tryAcquire表示阻塞并等待喚醒。那么信號量什么時候被喚醒呢?在訂閱方法中RedissonLock#subscribe。訂閱方法的邏輯也不少,咱們直接講其最終調(diào)用的處理方法
// LockPubSub#onMessage protected void onMessage(RedissonLockEntry value, Long message) { // 普通的解鎖走的是這個 if (message.equals(UNLOCK_MESSAGE)) { Runnable runnableToExecute = value.getListeners().poll(); if (runnableToExecute != null) { runnableToExecute.run(); } // 這里就是喚醒信號量的地方 value.getLatch().release(); // 這個是讀寫鎖用的 } else if (message.equals(READ_UNLOCK_MESSAGE)) { while (true) { Runnable runnableToExecute = value.getListeners().poll(); if (runnableToExecute == null) { break; } runnableToExecute.run(); } value.getLatch().release(value.getLatch().getQueueLength()); } }
value.getLatch().release() 也就是Semaphore#release ,會喚醒Semaphore#tryAcquire阻塞的線程
解鎖
上面我們聊了加鎖,本小節(jié)來聊下解鎖。調(diào)用路徑如下
// RedissonLock#unlock // RedissonBaseLock#unlockAsync(long threadId) public RFuture<Void> unlockAsync(long threadId) { // 調(diào)用lua解鎖 RFuture<Boolean> future = unlockInnerAsync(threadId); CompletionStage<Void> f = future.handle((opStatus, e) -> { // 取消看門狗 cancelExpirationRenewal(threadId); if (e != null) { throw new CompletionException(e); } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); throw new CompletionException(cause); } return null; }); return new CompletableFutureWrapper<>(f); }
解鎖的邏輯不復(fù)雜,調(diào)用lua腳本解鎖以及取消看門狗??撮T狗晚點說,先說下lua解鎖
// RedissonLock#unlockInnerAsync protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
老規(guī)矩,分析下這段lua:
- 如果鎖不存在,返回null
- 鎖的值減1,如果鎖的值大于0(也就是可重入鎖仍然有加鎖次數(shù)),則重新設(shè)置過期時間
- 如果鎖的值小于等于0,這說明可以真正解鎖了,刪除鎖并通過發(fā)布訂閱機制發(fā)布解鎖消息
從 lua 中可以看到,解鎖時會發(fā)布消息到 channel 中,加鎖方法RedissonLock#tryLock中有相對應(yīng)的訂閱操作。
看門狗
試想一個場景:程序執(zhí)行需要10秒,程序執(zhí)行完成才去解鎖,而鎖的存活時間只有5秒,也就是程序執(zhí)行到一半的時候鎖就可以被其他程序獲取了,這顯然不合適。那么怎么解決呢?
方式一:鎖永遠(yuǎn)存在,直到解鎖。不設(shè)置存活時間。
這種方法的弊端在于,如果程序沒解鎖就掛了,鎖就成了死鎖
方式二:依然設(shè)置鎖存活時間,但是監(jiān)控程序的執(zhí)行,如果程序還沒有執(zhí)行完成,則定期給鎖續(xù)期。
方式二就是Redisson的看門狗機制??撮T狗只有在沒有顯示指定鎖的持有時間(leaseTime)時才會生效。
// RedissonLock#tryAcquireAsync // RedissonBaseLock#scheduleExpirationRenewal protected void scheduleExpirationRenewal(long threadId) { // 創(chuàng)建ExpirationEntry,并放入EXPIRATION_RENEWAL_MAP中,下面的renewExpiration()方法會從map中再拿出來用 ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); try { // 看門狗的具體邏輯 renewExpiration(); } finally { // 如果線程被中斷了,就取消看門狗 if (Thread.currentThread().isInterrupted()) { // 取消看門狗 cancelExpirationRenewal(threadId); } } } }
scheduleExpirationRenewal 方法處理了ExpirationEntry和如果出現(xiàn)異常則取消看門狗,具體看門狗邏輯在 renewExpiration 方法中
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // 創(chuàng)建延時任務(wù),延時時間是internalLockLeaseTime / 3 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // lua腳本判斷,如果鎖存在,則續(xù)期并返回true,不存在則返回false CompletionStage<Boolean> future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } if (res) { // 鎖續(xù)期成功,則再啟動一個延時任務(wù),繼續(xù)監(jiān)測 renewExpiration(); } else { // 取消看門狗 cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
Timeout 是一個延時任務(wù),延時 internalLockLeaseTime / 3 時間執(zhí)行。任務(wù)的內(nèi)容主要是通過 renewExpirationAsync 方法對鎖進(jìn)行續(xù)期,如果續(xù)期失?。ń怄i了、鎖到期等),則取消看門狗,如果續(xù)期成功,則遞歸 renewExpiration 方法,繼續(xù)創(chuàng)建延時任務(wù)。
internalLockLeaseTime 也就是 lockWatchdogTimeout 參數(shù),默認(rèn)是 30 秒。
以上就是Redisson分布式鎖的源碼解讀分享的詳細(xì)內(nèi)容,更多關(guān)于Redisson分布式鎖的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
idea設(shè)置@Author文件頭注釋的實現(xiàn)步驟
本文主要介紹了idea設(shè)置@Author文件頭注釋的實現(xiàn)步驟,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07解決DataInputStream?read不等于-1,socket文件傳輸只能傳輸一個文件無法傳輸多個問題
這篇文章主要介紹了解決DataInputStream?read不等于-1,socket文件傳輸只能傳輸一個文件無法傳輸多個問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-08-08關(guān)于Spring Boot WebSocket整合以及nginx配置詳解
這篇文章主要給大家介紹了關(guān)于Spring Boot WebSocket整合以及nginx配置的相關(guān)資料,文中通過示例代碼給大家介紹的非常詳細(xì),相信對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)下吧。2017-09-09