一文帶你剖析Redisson分布式鎖的原理
相信使用過redis的,或者正在做分布式開發(fā)的童鞋都知道redisson組件,它的功能很多,但我們使用最頻繁的應(yīng)該還是它的分布式鎖功能,少量的代碼,卻實現(xiàn)了加鎖、鎖續(xù)命(看門狗)、鎖訂閱、解鎖、鎖等待(自旋)等功能,我們來看看都是如何實現(xiàn)的。
加鎖
//獲取鎖對象 RLock redissonLock = redisson.getLock(lockKey); //加分布式鎖 redissonLock.lock();
根據(jù)redissonLock.lock()
方法跟蹤到具體的private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId)
方法,真正獲取加鎖的邏輯是在tryAcquireAsync
該方法中調(diào)用的tryLockInnerAsync()
方法,看看這個方法是怎么實現(xiàn)的?
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // 判斷是否存在分布式鎖,getName()也就是KEYS[1],也就是鎖key名 "if (redis.call('exists', KEYS[1]) == 0) then " + // 加鎖,執(zhí)行hset 鎖key名 1 "redis.call('hset', KEYS[1], ARGV[2], 1); " + // 設(shè)置過期時間 "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 這個分支是redisson的重入鎖邏輯,鎖還在,鎖計數(shù)+1,重新設(shè)置過期時長 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 返回鎖的剩余過期時長 "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
發(fā)現(xiàn)底層是結(jié)合lua腳本實現(xiàn)了加鎖邏輯。
為什么底層結(jié)合了Lua腳本?Redis是在2.6推出了腳本功能,允許開發(fā)者使用Lua語言編寫腳本傳到redis執(zhí)行。使用腳本的好處如下:
1、減少網(wǎng)絡(luò)開銷:本來5次網(wǎng)絡(luò)請求的操作,可以用一個請求完成,原先5次請求的邏輯,可以一次性放到redis中執(zhí)行,較少了網(wǎng)絡(luò)往返時延。這點跟管道有點類似。
2、原子操作:Redis會將整個腳本作為一個整體執(zhí)行,中間不會被其他命令插入。管道不是原子的,不過
redis的批量操作命令(類似mset)是原子的。
也就意味著雖然腳本中有多條redis指令,那即使有多條線程并發(fā)執(zhí)行,在同一時刻也只有一個線程能夠執(zhí)行這段邏輯,等這段邏輯執(zhí)行完,分布式鎖也就獲取到了,其它線程再進來就獲取不到分布式鎖了。
鎖續(xù)命(自旋)
大家都聽過鎖續(xù)命,肯定也知道這里涉及到看門狗的概念。在調(diào)用tryLockInnerAsync()
方法時,第一個參數(shù)是commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()
也就是默認的看門狗過期時間是private long lockWatchdogTimeout = 30 * 1000
毫秒。
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); // 添加監(jiān)聽器,判斷獲取鎖是否成功,成功的話,添加定時任務(wù):定期更新鎖過期時間 ttlRemainingFuture.addListener(new FutureListener<Long>() { @Override public void operationComplete(Future<Long> future) throws Exception { if (!future.isSuccess()) { return; } // 根據(jù)tryLockInnerAsync方法,加鎖成功,return nil 也就是null Long ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining == null) { // 添加定時任務(wù):定期更新鎖過期時間 scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
當線程獲取到鎖后,會進入if (ttlRemaining == null)
分支,調(diào)用定期更新鎖過期時間scheduleExpirationRenewal
方法,我們看看該方法實現(xiàn):
private void scheduleExpirationRenewal(final long threadId) { if (expirationRenewalMap.containsKey(getEntryName())) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 檢測KEYS[1]鎖是否還在,在的話再次設(shè)置過期時間 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); future.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { expirationRenewalMap.remove(getEntryName()); if (!future.isSuccess()) { log.error("Can't update lock " + getName() + " expiration", future.cause()); return; } // 通過上面lua腳本執(zhí)行后會返回1,也就true,再次調(diào)用更新過期時間進行續(xù)期 if (future.getNow()) { // reschedule itself scheduleExpirationRenewal(threadId); } } }); } // 延遲 internalLockLeaseTime / 3再執(zhí)行續(xù)命 }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) { task.cancel(); } }
發(fā)現(xiàn)scheduleExpirationRenewal
方法只是用了Timeout作為任務(wù),并沒有使用java的Timer()之類的定時器,而是在Timeout任務(wù)run()方法中定義了RFuture對象,通過給RFuture對象設(shè)置listener,在listener中通過Lua腳本執(zhí)行結(jié)果進行判斷是否還需要進行續(xù)期。通過這樣的方式來給分布式鎖進行續(xù)期。
這種方式實現(xiàn)定時更新確實很巧妙,定期時間很靈活。
鎖訂閱及鎖等待
鎖訂閱是針對那些沒有獲取到分布式鎖的線程而言的。來看看整個獲取鎖的方法:
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired,獲取到鎖,直接退出 if (ttl == null) { return; } // 沒有獲取到鎖,進行訂閱 RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { unsubscribe(future, threadId); } // get(lockAsync(leaseTime, unit)); }
當?shù)谝粋€線程獲取到鎖后,會在if (ttl == null)
分支進行返回,第二個及以后的線程進來在沒獲取到鎖時,只能接著走下面的邏輯,進行鎖的訂閱。
接著進入到一個while循環(huán),首先還是會進行一次嘗試獲取鎖(萬一此時第一個線程已經(jīng)釋放鎖了呢),通過tryAcquire(leaseTime, unit, threadId)
方法,如果沒有獲取到鎖的話,會返回鎖的剩余過期時間,如果剩余過期時間大于0,則當前線程通過Semaphore
信號號,將當前線程阻塞,底層執(zhí)行LockSupport.parkNanos(this, nanosTimeout)
線程掛起剩余過期時間后,會自動進行喚醒,再次執(zhí)行tryAcquire
嘗試獲取鎖。所有沒有獲取到鎖的線程都會執(zhí)行這個流程。
一定要等待剩余過期時間后才喚醒嗎?
假設(shè)線程一獲取到鎖,過期時間默認為30s,當前執(zhí)行業(yè)務(wù)邏輯已經(jīng)過了5s,那其他線程走到這里,則需要等待25s后才行進行喚醒,那萬一線程一執(zhí)行業(yè)務(wù)邏輯只要10s,那其他線程還需要等待20s嗎?這樣豈不是導致效率很低?
答案是否定的,詳細看解鎖邏輯。
解鎖
解鎖:redissonLock.unlock();
我們來看看具體的解鎖邏輯:
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 鎖不存在,發(fā)布unlockMessage解鎖消息,通知其他等待線程 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + // 不存在該鎖,異常捕捉 "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + // redisson可重入鎖計數(shù)-1,依舊>0,則重新設(shè)置過期時間 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + // redis刪除鎖,發(fā)布unlockMessage解鎖消息,通知其他等待線程 "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
發(fā)現(xiàn)解鎖邏輯底層也是用了一個lua腳本實現(xiàn)。具體的說明可以看代碼注釋,刪除鎖后,并發(fā)布解鎖消息,通知到其它線程,也就意味著不會其它等待的線程一直等待。
Semophore
信號量的訂閱中有個onMessage
方法,
protected void onMessage(RedissonLockEntry value, Long message) { // 喚醒線程 value.getLatch().release(message.intValue()); while (true) { Runnable runnableToExecute = null; synchronized (value) { Runnable runnable = value.getListeners().poll(); if (runnable != null) { if (value.getLatch().tryAcquire()) { runnableToExecute = runnable; } else { value.addListener(runnable); } } } if (runnableToExecute != null) { runnableToExecute.run(); } else { return; } } }
解鎖后通過if (opStatus)
分支取消鎖續(xù)期邏輯。
總結(jié)
總的來說,可以借助一張圖加深理解:
分布式鎖的整體實現(xiàn)很巧妙,借助lua腳本的原子性,實現(xiàn)了很多功能,當然redisson還有其它很多功能,比如為了解決主從集群中的異步復(fù)制會導致鎖丟失問題,引入了redlock機制,還有分布式下的可重入鎖等。
到此這篇關(guān)于一文帶你剖析Redisson分布式鎖的原理的文章就介紹到這了,更多相關(guān)Redisson分布式鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
帶你了解Java數(shù)據(jù)結(jié)構(gòu)和算法之遞歸
這篇文章主要為大家介紹了Java數(shù)據(jù)結(jié)構(gòu)和算法之遞歸,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2022-01-01Java異常處理中同時有finally和return語句的執(zhí)行問題
這篇文章主要介紹了Java異常處理中同時有finally和return語句的執(zhí)行問題,首先確定的是一般finally語句都會被執(zhí)行...然后,需要的朋友可以參考下2015-11-11IDEA創(chuàng)建Maven工程Servlet的詳細教程
這篇文章主要介紹了IDEA創(chuàng)建Maven工程Servlet的詳細教程,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-10-10Spring MVC 文件、cookies的接收 與REST響應(yīng)詳
在SpringMVC中,使用@RequestPart注解可接收文件并處理多部分請求,同時可以通過@CookieValue和HttpServletResponse來獲取和設(shè)置Cookies,本文介紹Spring MVC 文件、cookies的接收 與REST響應(yīng),感興趣的朋友跟隨小編一起看看吧2024-09-09