Redis之Redisson原理詳解
1 Redisson
1.1 簡介
Redisson
顧名思義,Redis
的兒子,本質上還是 Redis
加鎖,不過是對 Redis
做了很多封裝,它不僅提供了一系列的分布式的 Java
常用對象,還提供了許多分布式服務。
1.2 與其他客戶端比較
Redisson
和Jedis
、Lettuce
有什么區(qū)別?
Redisson
和它倆的區(qū)別就像一個用鼠標操作圖形化界面,一個用命令行操作文件。Redisson
是更高層的抽象,Jedis
和Lettuce
是Redis
命令的封裝。Jedis
是Redis
官方推出的用于通過Java
連接Redis
客戶端的一個工具包,提供了Redis
的各種命令支持Lettuce
是一種可擴展的線程安全的Redis
客戶端,通訊框架基于Netty
,支持高級的Redis
特性,比如哨兵,集群,管道,自動重新連接和Redis
數(shù)據(jù)模型。Spring Boot 2.x
開始Lettuce
已取代Jedis
成為首選Redis
的客戶端。Redisson
是架設在Redis
基礎上,通訊基于Netty
的綜合的、新型的中間件,企業(yè)級開發(fā)中使用Redis
的最佳范本Jedis
把Redis
命令封裝好,Lettuce
則進一步有了更豐富的Api
,也支持集群等模式。但是兩者只給了你操作Redis
數(shù)據(jù)庫的腳手架,而Redisson
則是基于Redis
、Lua
和Netty
建立起了成熟的分布式解決方案,甚至redis官方都推薦的一種工具集
1.3 操作使用
1.3.1 pom.xml
在引入 Redisson
的依賴后,就可以直接進行調(diào)用:
<!-- 原生 --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.4</version> </dependency> <!-- 或者 另一種Spring集成starter --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.13.6</version> </dependency>
1.3.2 配置
@Configuration public class RedissionConfig { @Value("${spring.redis.host}") private String redisHost; @Value("${spring.redis.password}") private String password; private int port = 6379; @Bean public RedissonClient getRedisson() { Config config = new Config(); config.useSingleServer(). setAddress("redis://" + redisHost + ":" + port). setPassword(password); config.setCodec(new JsonJacksonCodec()); return Redisson.create(config); } }
1.3.3 啟用分布式鎖
@Resource private RedissonClient redissonClient; RLock rLock = redissonClient.getLock(lockName); try { boolean isLocked = rLock.tryLock(expireTime, TimeUnit.MILLISECONDS); if (isLocked) { // TODO } } catch (Exception e) { rLock.unlock(); }
簡潔明了,只需要一個RLock
,既然推薦Redisson,就往里面看看他是怎么實現(xiàn)的。
就是這么簡單,使用方法 jdk
的 ReentrantLock
差不多,并且也支持 ReadWriteLock
(讀寫鎖)、Reentrant Lock
(可重入鎖)、Fair Lock
(公平鎖)、RedLock
(紅鎖)等各種鎖,詳細可以參照redisson官方文檔來查看。
那么 Redisson
到底有哪些優(yōu)勢呢?鎖的自動續(xù)期(默認都是 30 秒)
,如果業(yè)務超長,運行期間會自動給鎖續(xù)上新的 30s
,不用擔心業(yè)務執(zhí)行時間超長而鎖被自動刪掉。
加鎖的業(yè)務只要運行完成,就不會給當前續(xù)期,即便不手動解鎖,鎖默認在 30s
后刪除,不會造成死鎖問題。
前面也提到了鎖的自動續(xù)期,我們來看看 Redisson
是如何來實現(xiàn)的。
1.4 大致操作原理
我們一起來看下Redisson
底層原理圖吧:
只要線程一加鎖成功,就會啟動一個watch dog
看門狗,它是一個后臺線程,會每隔10
秒檢查一下,如果線程一還持有鎖,那么就會不斷的延長鎖key
的生存時間。因此,Redisson
就是使用Redisson
解決了鎖過期釋放,業(yè)務沒執(zhí)行完問題。
1.5 RLock
RLock
是Redisson
分布式鎖的最核心接口
,繼承了concurrent
包的Lock
接口和自己的RLockAsync
接口,RLockAsync
的返回值都是RFuture
,是Redisson
執(zhí)行異步實現(xiàn)的核心邏輯,也是Netty
發(fā)揮的主要陣地。
1.5.1 RLock如何加鎖
從RLock
進入,找到RedissonLock
類,找到 tryLock
方法再遞進到干活的tryAcquireOnceAsync
方法,這是加鎖的主要代碼(版本不一此處實現(xiàn)有差別,和最新3.15.x有一定出入,但是核心邏輯依然未變。此處以3.13.6為例)
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1L) { return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } else { RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e == null) { if (ttlRemaining) { this.scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } }
此處出現(xiàn)leaseTime
時間判斷的2個分支,實際上就是加鎖時是否設置過期時間,未設置過期時間(-1
)時則會有watchDog
的鎖續(xù)約 (下文),一個注冊了加鎖事件的續(xù)約任務。我們先來看有過期時間tryLockInnerAsync
部分,
evalWriteAsync
是eval命令執(zhí)行l(wèi)ua的入口
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { this.internalLockLeaseTime = unit.toMillis(leaseTime); return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)}); }
這里揭開真面目,eval
命令執(zhí)行Lua
腳本的地方,此處的Lua腳本展開
-- 不存在該key時 if (redis.call('exists', KEYS[1]) == 0) then -- 新增該鎖并且hash中該線程id對應的count置1 redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 設置過期時間 redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; -- 存在該key 并且 hash中線程id的key也存在 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then -- 線程重入次數(shù)++ redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);
redisson具體參數(shù)分析
// keyName KEYS[1] = Collections.singletonList(this.getName()) // leaseTime ARGV[1] = this.internalLockLeaseTime // uuid+threadId組合的唯一值 ARGV[2] = this.getLockName(threadId)
總共3個參數(shù)完成了一段邏輯:
- 判斷該鎖是否已經(jīng)有對應
hash
表存在, - 沒有對應的
hash
表:則set
該hash
表中一個entry的key為鎖名稱,value為1,之后設置該hash表失效時間為leaseTime
- 存在對應的
hash
表:則將該lockName
的value
執(zhí)行+1操作,也就是計算進入次數(shù),再設置失效時間leaseTime
- 最后返回這把鎖的ttl剩余時間
也和上述自定義鎖沒有區(qū)別
既然如此,那解鎖的步驟也肯定有對應的-1操作,再看unlock方法,同樣查找方法名,一路到
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "...lua...腳本", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)}); }
拿出Lua部分
-- 不存在key if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end; -- 計數(shù)器 -1 local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then -- 過期時間重設 redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else -- 刪除并發(fā)布解鎖消息 redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;
該Lua KEYS有2個Arrays.asList(getName()
,getChannelName())
- name 鎖名稱
- channelName,用于pubSub發(fā)布消息的channel名稱
ARGV
變量有三個LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)
LockPubSub.UNLOCK_MESSAGE
,channel發(fā)送消息的類別,此處解鎖為0internalLockLeaseTime
,watchDog配置的超時時間,默認為30slockName
這里的lockName指的是uuid和threadId組合的唯一值
步驟如下:
- 如果該鎖不存在則返回nil
- 如果該鎖存在則將其線程的hash key計數(shù)器-1
- 計數(shù)器
counter>0
,重置下失效時間,返回0;否則,刪除該鎖,發(fā)布解鎖消息unlockMessage,返回1;
其中unLock
的時候使用到了Redis
發(fā)布訂閱PubSub
完成消息通知。
而訂閱的步驟就在RedissonLock
的加鎖入口的lock方法里
long threadId = Thread.currentThread().getId(); Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId); if (ttl != null) { // 訂閱 RFuture<RedissonLockEntry> future = this.subscribe(threadId); if (interruptibly) { this.commandExecutor.syncSubscriptionInterrupted(future); } else { this.commandExecutor.syncSubscription(future); } // 省略
當鎖被其他線程占用時,通過監(jiān)聽鎖的釋放通知(在其他線程通過RedissonLock
釋放鎖時,會通過發(fā)布訂閱pub/sub
功能發(fā)起通知),等待鎖被其他線程釋放,也是為了避免自旋的一種常用效率手段
1.5.2 解鎖消息
為了一探究竟通知了什么,通知后又做了什么,進入LockPubSub
。
這里只有一個明顯的監(jiān)聽方法onMessage
,其訂閱和信號量的釋放都在父類PublishSubscribe
,我們只關注監(jiān)聽事件的實際操作
protected void onMessage(RedissonLockEntry value, Long message) { Runnable runnableToExecute; if (message.equals(unlockMessage)) { // 從監(jiān)聽器隊列取監(jiān)聽線程執(zhí)行監(jiān)聽回調(diào) runnableToExecute = (Runnable)value.getListeners().poll(); if (runnableToExecute != null) { runnableToExecute.run(); } // getLatch()返回的是Semaphore,信號量,此處是釋放信號量 // 釋放信號量后會喚醒等待的entry.getLatch().tryAcquire去再次嘗試申請鎖 value.getLatch().release(); } else if (message.equals(readUnlockMessage)) { while(true) { runnableToExecute = (Runnable)value.getListeners().poll(); if (runnableToExecute == null) { value.getLatch().release(value.getLatch().getQueueLength()); break; } runnableToExecute.run(); } } }
發(fā)現(xiàn)一個是默認解鎖消息
,一個是讀鎖解鎖消息
,因為redisson
是有提供讀寫鎖的,而讀寫鎖讀讀情況和讀寫、寫寫情況互斥情況不同,我們只看上面的默認解鎖消息unlockMessage
分支
LockPubSub監(jiān)聽最終執(zhí)行了2件事
runnableToExecute.run()
執(zhí)行監(jiān)聽回調(diào)value.getLatch().release()
; 釋放信號量
Redisson
通過LockPubSub
監(jiān)聽解鎖消息,執(zhí)行監(jiān)聽回調(diào)和釋放信號量通知等待線程可以重新?lián)屾i。
這時再回來看tryAcquireOnceAsync另一分支
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1L) { return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } else { RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e == null) { if (ttlRemaining) { this.scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } }
可以看到,無超時時間時,在執(zhí)行加鎖操作后,還執(zhí)行了一段費解的邏輯
ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e == null) { if (ttlRemaining) { this.scheduleExpirationRenewal(threadId); } } }) } } })
此處涉及到Netty
的Future/Promise-Listener
模型,Redisson
中幾乎全部以這種方式通信(所以說Redisson
是基于Netty
通信機制實現(xiàn)的),理解這段邏輯可以試著先理解
在 Java 的
Future
中,業(yè)務邏輯為一個Callable
或Runnable
實現(xiàn)類,該類的call()或 run()
執(zhí)行完畢意味著業(yè)務邏輯的完結,在Promise
機制中,可以在業(yè)務邏輯中人工設置業(yè)務邏輯的成功與失敗,這樣更加方便的監(jiān)控自己的業(yè)務邏輯。
這塊代碼的表面意義就是,在執(zhí)行異步加鎖的操作后,加鎖成功則根據(jù)加鎖完成返回的ttl是否過期來確認是否執(zhí)行一段定時任務。
這段定時任務的就是watchDog的核心。
1.5.3 鎖續(xù)約
查看RedissonLock.this.scheduleExpirationRenewal(threadId)
private void scheduleExpirationRenewal(long threadId) { RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry(); RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); this.renewExpiration(); } } private void renewExpiration() { RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName()); if (ee != null) { Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() { public void run(Timeout timeout) throws Exception { RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName()); if (ent != null) { Long threadId = ent.getFirstThreadId(); if (threadId != null) { RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e); } else { if (res) { RedissonLock.this.renewExpiration(); } } }); } } } }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); ee.setTimeout(task); } }
拆分來看,這段連續(xù)嵌套且冗長的代碼實際上做了幾步:
- 添加一個
netty
的Timeout
回調(diào)任務,每(internalLockLeaseTime / 3)毫秒執(zhí)行一次,執(zhí)行的方法是renewExpirationAsync
renewExpirationAsync
重置了鎖超時時間,又注冊一個監(jiān)聽器,監(jiān)聽回調(diào)又執(zhí)行了renewExpiration
renewExpirationAsync
的Lua如下
protected RFuture<Boolean> renewExpirationAsync(long threadId) { return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)}); } if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;
重新設置了超時時間。Redisson
加這段邏輯的目的是什么?
目的是為了某種場景下保證業(yè)務不影響,如任務執(zhí)行超時但未結束,鎖已經(jīng)釋放的問題。
當一個線程持有了一把鎖,由于并未設置超時時間leaseTime
,Redisson
默認配置了30S,開啟watchDog
,每10S對該鎖進行一次續(xù)約,維持30S的超時時間,直到任務完成再刪除鎖。
這就是Redisson
的鎖續(xù)約 ,也就是WatchDog
實現(xiàn)的基本思路。
1.5.4 流程概括
通過整體的介紹,流程簡單概括:
- A、B線程爭搶一把鎖,A獲取到后,B阻塞
- B線程阻塞時并非主動
CAS
,而是PubSub
方式訂閱該鎖的廣播消息 - A操作完成釋放了鎖,B線程收到訂閱消息通知
- B被喚醒開始繼續(xù)搶鎖,拿到鎖
詳細加鎖解鎖流程總結如下圖:
1.6 公平鎖
以上介紹的可重入鎖是非公平鎖,Redisson
還基于Redis
的隊列(List)和ZSet實現(xiàn)了公平鎖
1.6.1 java中公平鎖
公平的定義是什么?
公平就是按照客戶端的請求先來后到排隊來獲取鎖,先到先得,也就是FIFO,所以隊列和容器順序編排必不可少
回顧JUC
的ReentrantLock
公平鎖的實現(xiàn)
/** * Sync object for fair locks */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
AQS
已經(jīng)提供了整個實現(xiàn),是否公平取決于實現(xiàn)類取出節(jié)點邏輯是否順序取
AbstractQueuedSynchronizer
是用來構建鎖或者其他同步組件的基礎框架,通過內(nèi)置FIFO
隊列來完成資源獲取線程的排隊工作,自身沒有實現(xiàn)同步接口,僅僅定義了若干同步狀態(tài)獲取和釋放的方法來供自定義同步組件使用(上圖),支持獨占
和共享
獲取,這是基于模版方法模式的一種設計,給公平/非公平提供了土壤。
我們用2張圖來簡單解釋AQS的等待流程
一張是同步隊列(FIFO雙向隊列)管理 獲取同步狀態(tài)失?。〒屾i失?。┑木€程引用、等待狀態(tài)和前驅后繼節(jié)點的流程圖
一張是獨占式獲取同步狀態(tài)的總流程 ,核心acquire(int arg)方法調(diào)用流程
可以看出鎖的獲取流程AQS
維護一個同步隊列,獲取狀態(tài)失敗的線程都會加入到隊列中進行自旋,移出隊列或停止自旋的條件是前驅節(jié)點為頭節(jié)點切成功獲取了同步狀態(tài)。而比較另一段非公平鎖類NonfairSync
可以發(fā)現(xiàn),控制公平和非公平的關鍵代碼,在于hasQueuedPredecessors
方法。
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
NonfairSync
減少了了hasQueuedPredecessors
判斷條件,該方法的作用就是
- 查看同步隊列中當前節(jié)點是否有前驅節(jié)點,如果有比當前線程更早請求獲取鎖則返回true。
- 保證每次都取隊列的第一個節(jié)點(線程)來獲取鎖,這就是公平規(guī)則
為什么JUC以默認非公平鎖呢?
因為當一個線程請求鎖時,只要獲取來同步狀態(tài)即成功獲取。在此前提下,剛釋放的線程再次獲取同步狀態(tài)的幾率會非常大,使得其他線程只能在同步隊列中等待。但這樣帶來的好處是,非公平鎖大大減少了系統(tǒng)線程上下文的切換開銷。
可見公平的代價是性能與吞吐量。
Redis里沒有AQS,但是有List和zSet,看看Redisson是怎么實現(xiàn)公平的
1.6.2 RedissonFairLock
RedissonFairLock
用法依然很簡單
RLock fairLock = redissonClient.getFairLock(lockName);fairLock.lock();
RedissonFairLock
繼承自RedissonLock
,同樣一路向下找到加鎖實現(xiàn)方法tryLockInnerAsync
。
這里有2段冗長的Lua
,但是Debug發(fā)現(xiàn),公平鎖的入口在 command == RedisCommands.EVAL_LONG
之后,此段Lua較長,參數(shù)也多,我們著重分析Lua的實現(xiàn)規(guī)則
參數(shù)
-- lua中的幾個參數(shù) KEYS = Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName) KEYS[1]: lock_name, 鎖名稱 KEYS[2]: "redisson_lock_queue:{xxx}" 線程隊列 KEYS[3]: "redisson_lock_timeout:{xxx}" 線程id對應的超時集合 ARGV = internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime ARGV[1]: "{leaseTime}" 過期時間 ARGV[2]: "{Redisson.UUID}:{threadId}" ARGV[3] = 當前時間 + 線程等待時間:(10:00:00) + 5000毫秒 = 10:00:05 ARGV[4] = 當前時間(10:00:00) 部署服務器時間,非redis-server服務器時間
公平鎖實現(xiàn)的Lua腳本
-- 1.死循環(huán)清除過期key while true do -- 獲取頭節(jié)點 local firstThreadId2 = redis.call('lindex', KEYS[2], 0); -- 首次獲取必空跳出循環(huán) if firstThreadId2 == false then break; end; -- 清除過期key local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2)); if timeout <= tonumber(ARGV[4]) then redis.call('zrem', KEYS[3], firstThreadId2); redis.call('lpop', KEYS[2]); else break; end; end; -- 2.不存在該鎖 && (不存在線程等待隊列 || 存在線程等待隊列而且第一個節(jié)點就是此線程ID),加鎖部分主要邏輯 if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then -- 彈出隊列中線程id元素,刪除Zset中該線程id對應的元素 redis.call('lpop', KEYS[2]); redis.call('zrem', KEYS[3], ARGV[2]); local keys = redis.call('zrange', KEYS[3], 0, -1); -- 遍歷zSet所有key,將key的超時時間(score) - 當前時間ms for i = 1, #keys, 1 do redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]); end; -- 加鎖設置鎖過期時間 redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; -- 3.線程存在,重入判斷 if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then redis.call('hincrby', KEYS[1], ARGV[2],1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; -- 4.返回當前線程剩余存活時間 local timeout = redis.call('zscore', KEYS[3], ARGV[2]); if timeout ~= false then -- 過期時間timeout的值在下方設置,此處的減法算出的依舊是當前線程的ttl return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]); end; -- 5.尾節(jié)點剩余存活時間 local lastThreadId = redis.call('lindex', KEYS[2], -1); local ttl; -- 尾節(jié)點不空 && 尾節(jié)點非當前線程 if lastThreadId ~= false and lastThreadId ~= ARGV[2] then -- 計算隊尾節(jié)點剩余存活時間 ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]); else -- 獲取lock_name剩余存活時間 ttl = redis.call('pttl', KEYS[1]); end; -- 6.末尾排隊 -- zSet 超時時間(score),尾節(jié)點ttl + 當前時間 + 5000ms + 當前時間,無則新增,有則更新 -- 線程id放入隊列尾部排隊,無則插入,有則不再插入 local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]); if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then redis.call('rpush', KEYS[2], ARGV[2]); end; return ttl;
1.6.3 公平鎖加鎖步驟
通過以上Lua,可以發(fā)現(xiàn),lua操作的關鍵結構是列表(list)和有序集合(zSet)。
其中 list 維護了一個等待的線程隊列 redisson_lock_queue:{xxx}
,zSet維護了一個線程超時情況的有序集合 redisson_lock_timeout:{xxx}
,盡管lua較長,但是可以拆分為6個步驟
- 隊列清理
保證隊列中只有未過期的等待線程 - 首次加鎖
hset加鎖,pexpire過期時間 - 重入判斷
此處同可重入鎖lua - 返回ttl
- 計算尾節(jié)點ttl
初始值為鎖的剩余過期時間 - 末尾排隊
ttl + 2 * currentTime + waitTime是score的默認值計算公式
以上就是Redis之Redisson原理詳解的詳細內(nèi)容,更多關于Redisson原理的資料請關注腳本之家其它相關文章!
相關文章
基于Redis6.2.6版本部署Redis?Cluster集群的問題
這篇文章主要介紹了基于Redis6.2.6版本部署Redis?Cluster集群,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-04-04