Redis之Redisson原理詳解
1 Redisson
1.1 簡介
Redisson 顧名思義,Redis 的兒子,本質(zhì)上還是 Redis 加鎖,不過是對 Redis 做了很多封裝,它不僅提供了一系列的分布式的 Java 常用對象,還提供了許多分布式服務(wù)。
1.2 與其他客戶端比較
Redisson和Jedis、Lettuce有什么區(qū)別?
Redisson和它倆的區(qū)別就像一個用鼠標(biāo)操作圖形化界面,一個用命令行操作文件。Redisson是更高層的抽象,Jedis和Lettuce是Redis命令的封裝。Jedis是Redis官方推出的用于通過Java連接Redis客戶端的一個工具包,提供了Redis的各種命令支持Lettuce是一種可擴展的線程安全的Redis客戶端,通訊框架基于Netty,支持高級的Redis特性,比如哨兵,集群,管道,自動重新連接和Redis數(shù)據(jù)模型。Spring Boot 2.x開始Lettuce已取代Jedis成為首選Redis的客戶端。Redisson是架設(shè)在Redis基礎(chǔ)上,通訊基于Netty的綜合的、新型的中間件,企業(yè)級開發(fā)中使用Redis的最佳范本Jedis把Redis命令封裝好,Lettuce則進一步有了更豐富的Api,也支持集群等模式。但是兩者只給了你操作Redis數(shù)據(jù)庫的腳手架,而Redisson則是基于Redis、Lua和Netty建立起了成熟的分布式解決方案,甚至redis官方都推薦的一種工具集
1.3 操作使用
1.3.1 pom.xml
在引入 Redisson 的依賴后,就可以直接進行調(diào)用:
<!-- 原生 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.4</version>
</dependency>
<!-- 或者 另一種Spring集成starter -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
</dependency>1.3.2 配置
@Configuration
public class RedissionConfig {
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.password}")
private String password;
private int port = 6379;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
config.useSingleServer().
setAddress("redis://" + redisHost + ":" + port).
setPassword(password);
config.setCodec(new JsonJacksonCodec());
return Redisson.create(config);
}
}1.3.3 啟用分布式鎖
@Resource
private RedissonClient redissonClient;
RLock rLock = redissonClient.getLock(lockName);
try {
boolean isLocked = rLock.tryLock(expireTime, TimeUnit.MILLISECONDS);
if (isLocked) {
// TODO
}
} catch (Exception e) {
rLock.unlock();
}簡潔明了,只需要一個RLock,既然推薦Redisson,就往里面看看他是怎么實現(xiàn)的。
就是這么簡單,使用方法 jdk 的 ReentrantLock 差不多,并且也支持 ReadWriteLock(讀寫鎖)、Reentrant Lock(可重入鎖)、Fair Lock(公平鎖)、RedLock(紅鎖)等各種鎖,詳細可以參照redisson官方文檔來查看。
那么 Redisson 到底有哪些優(yōu)勢呢?鎖的自動續(xù)期(默認都是 30 秒),如果業(yè)務(wù)超長,運行期間會自動給鎖續(xù)上新的 30s,不用擔(dān)心業(yè)務(wù)執(zhí)行時間超長而鎖被自動刪掉。
加鎖的業(yè)務(wù)只要運行完成,就不會給當(dāng)前續(xù)期,即便不手動解鎖,鎖默認在 30s 后刪除,不會造成死鎖問題。
前面也提到了鎖的自動續(xù)期,我們來看看 Redisson 是如何來實現(xiàn)的。
1.4 大致操作原理
我們一起來看下Redisson底層原理圖吧:

只要線程一加鎖成功,就會啟動一個watch dog看門狗,它是一個后臺線程,會每隔10秒檢查一下,如果線程一還持有鎖,那么就會不斷的延長鎖key的生存時間。因此,Redisson就是使用Redisson解決了鎖過期釋放,業(yè)務(wù)沒執(zhí)行完問題。
1.5 RLock
RLock是Redisson分布式鎖的最核心接口,繼承了concurrent包的Lock接口和自己的RLockAsync接口,RLockAsync的返回值都是RFuture,是Redisson執(zhí)行異步實現(xiàn)的核心邏輯,也是Netty發(fā)揮的主要陣地。
1.5.1 RLock如何加鎖
從RLock進入,找到RedissonLock類,找到 tryLock 方法再遞進到干活的tryAcquireOnceAsync 方法,這是加鎖的主要代碼(版本不一此處實現(xiàn)有差別,和最新3.15.x有一定出入,但是核心邏輯依然未變。此處以3.13.6為例)
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}此處出現(xiàn)leaseTime時間判斷的2個分支,實際上就是加鎖時是否設(shè)置過期時間,未設(shè)置過期時間(-1)時則會有watchDog 的鎖續(xù)約 (下文),一個注冊了加鎖事件的續(xù)約任務(wù)。我們先來看有過期時間tryLockInnerAsync 部分,
evalWriteAsync是eval命令執(zhí)行l(wèi)ua的入口
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}這里揭開真面目,eval命令執(zhí)行Lua腳本的地方,此處的Lua腳本展開
-- 不存在該key時
if (redis.call('exists', KEYS[1]) == 0) then
-- 新增該鎖并且hash中該線程id對應(yīng)的count置1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 設(shè)置過期時間
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 存在該key 并且 hash中線程id的key也存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 線程重入次數(shù)++
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);redisson具體參數(shù)分析
// keyName KEYS[1] = Collections.singletonList(this.getName()) // leaseTime ARGV[1] = this.internalLockLeaseTime // uuid+threadId組合的唯一值 ARGV[2] = this.getLockName(threadId)
總共3個參數(shù)完成了一段邏輯:
- 判斷該鎖是否已經(jīng)有對應(yīng)
hash表存在, - 沒有對應(yīng)的
hash表:則set該hash表中一個entry的key為鎖名稱,value為1,之后設(shè)置該hash表失效時間為leaseTime - 存在對應(yīng)的
hash表:則將該lockName的value執(zhí)行+1操作,也就是計算進入次數(shù),再設(shè)置失效時間leaseTime - 最后返回這把鎖的ttl剩余時間
也和上述自定義鎖沒有區(qū)別
既然如此,那解鎖的步驟也肯定有對應(yīng)的-1操作,再看unlock方法,同樣查找方法名,一路到
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "...lua...腳本", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
}拿出Lua部分
-- 不存在key
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
-- 計數(shù)器 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
-- 過期時間重設(shè)
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
-- 刪除并發(fā)布解鎖消息
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;該Lua KEYS有2個Arrays.asList(getName(),getChannelName())
- name 鎖名稱
- channelName,用于pubSub發(fā)布消息的channel名稱
ARGV變量有三個LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)
LockPubSub.UNLOCK_MESSAGE,channel發(fā)送消息的類別,此處解鎖為0internalLockLeaseTime,watchDog配置的超時時間,默認為30slockName這里的lockName指的是uuid和threadId組合的唯一值
步驟如下:
- 如果該鎖不存在則返回nil
- 如果該鎖存在則將其線程的hash key計數(shù)器-1
- 計數(shù)器
counter>0,重置下失效時間,返回0;否則,刪除該鎖,發(fā)布解鎖消息unlockMessage,返回1;
其中unLock的時候使用到了Redis發(fā)布訂閱PubSub完成消息通知。
而訂閱的步驟就在RedissonLock的加鎖入口的lock方法里
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl != null) {
// 訂閱
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
if (interruptibly) {
this.commandExecutor.syncSubscriptionInterrupted(future);
} else {
this.commandExecutor.syncSubscription(future);
}
// 省略當(dāng)鎖被其他線程占用時,通過監(jiān)聽鎖的釋放通知(在其他線程通過RedissonLock釋放鎖時,會通過發(fā)布訂閱pub/sub功能發(fā)起通知),等待鎖被其他線程釋放,也是為了避免自旋的一種常用效率手段
1.5.2 解鎖消息
為了一探究竟通知了什么,通知后又做了什么,進入LockPubSub。
這里只有一個明顯的監(jiān)聽方法onMessage,其訂閱和信號量的釋放都在父類PublishSubscribe,我們只關(guān)注監(jiān)聽事件的實際操作
protected void onMessage(RedissonLockEntry value, Long message) {
Runnable runnableToExecute;
if (message.equals(unlockMessage)) {
// 從監(jiān)聽器隊列取監(jiān)聽線程執(zhí)行監(jiān)聽回調(diào)
runnableToExecute = (Runnable)value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// getLatch()返回的是Semaphore,信號量,此處是釋放信號量
// 釋放信號量后會喚醒等待的entry.getLatch().tryAcquire去再次嘗試申請鎖
value.getLatch().release();
} else if (message.equals(readUnlockMessage)) {
while(true) {
runnableToExecute = (Runnable)value.getListeners().poll();
if (runnableToExecute == null) {
value.getLatch().release(value.getLatch().getQueueLength());
break;
}
runnableToExecute.run();
}
}
}發(fā)現(xiàn)一個是默認解鎖消息 ,一個是讀鎖解鎖消息 ,因為redisson是有提供讀寫鎖的,而讀寫鎖讀讀情況和讀寫、寫寫情況互斥情況不同,我們只看上面的默認解鎖消息unlockMessage分支
LockPubSub監(jiān)聽最終執(zhí)行了2件事
runnableToExecute.run()執(zhí)行監(jiān)聽回調(diào)value.getLatch().release(); 釋放信號量
Redisson通過LockPubSub 監(jiān)聽解鎖消息,執(zhí)行監(jiān)聽回調(diào)和釋放信號量通知等待線程可以重新?lián)屾i。
這時再回來看tryAcquireOnceAsync另一分支
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}可以看到,無超時時間時,在執(zhí)行加鎖操作后,還執(zhí)行了一段費解的邏輯
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
}) } } })此處涉及到Netty的Future/Promise-Listener模型,Redisson中幾乎全部以這種方式通信(所以說Redisson是基于Netty通信機制實現(xiàn)的),理解這段邏輯可以試著先理解
在 Java 的
Future中,業(yè)務(wù)邏輯為一個Callable或Runnable實現(xiàn)類,該類的call()或 run()執(zhí)行完畢意味著業(yè)務(wù)邏輯的完結(jié),在Promise機制中,可以在業(yè)務(wù)邏輯中人工設(shè)置業(yè)務(wù)邏輯的成功與失敗,這樣更加方便的監(jiān)控自己的業(yè)務(wù)邏輯。
這塊代碼的表面意義就是,在執(zhí)行異步加鎖的操作后,加鎖成功則根據(jù)加鎖完成返回的ttl是否過期來確認是否執(zhí)行一段定時任務(wù)。
這段定時任務(wù)的就是watchDog的核心。
1.5.3 鎖續(xù)約
查看RedissonLock.this.scheduleExpirationRenewal(threadId)
private void scheduleExpirationRenewal(long threadId) {
RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
this.renewExpiration();
}
}
private void renewExpiration() {
RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
} else {
if (res) {
RedissonLock.this.renewExpiration();
}
}
});
}
}
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}拆分來看,這段連續(xù)嵌套且冗長的代碼實際上做了幾步:
- 添加一個
netty的Timeout回調(diào)任務(wù),每(internalLockLeaseTime / 3)毫秒執(zhí)行一次,執(zhí)行的方法是renewExpirationAsync renewExpirationAsync重置了鎖超時時間,又注冊一個監(jiān)聽器,監(jiān)聽回調(diào)又執(zhí)行了renewExpiration
renewExpirationAsync 的Lua如下
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end;
return 0;重新設(shè)置了超時時間。Redisson加這段邏輯的目的是什么?
目的是為了某種場景下保證業(yè)務(wù)不影響,如任務(wù)執(zhí)行超時但未結(jié)束,鎖已經(jīng)釋放的問題。
當(dāng)一個線程持有了一把鎖,由于并未設(shè)置超時時間leaseTime,Redisson 默認配置了30S,開啟watchDog,每10S對該鎖進行一次續(xù)約,維持30S的超時時間,直到任務(wù)完成再刪除鎖。
這就是Redisson的鎖續(xù)約 ,也就是WatchDog 實現(xiàn)的基本思路。
1.5.4 流程概括
通過整體的介紹,流程簡單概括:
- A、B線程爭搶一把鎖,A獲取到后,B阻塞
- B線程阻塞時并非主動
CAS,而是PubSub方式訂閱該鎖的廣播消息 - A操作完成釋放了鎖,B線程收到訂閱消息通知
- B被喚醒開始繼續(xù)搶鎖,拿到鎖
詳細加鎖解鎖流程總結(jié)如下圖:

1.6 公平鎖
以上介紹的可重入鎖是非公平鎖,Redisson還基于Redis的隊列(List)和ZSet實現(xiàn)了公平鎖
1.6.1 java中公平鎖
公平的定義是什么?
公平就是按照客戶端的請求先來后到排隊來獲取鎖,先到先得,也就是FIFO,所以隊列和容器順序編排必不可少
回顧JUC的ReentrantLock公平鎖的實現(xiàn)
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}AQS已經(jīng)提供了整個實現(xiàn),是否公平取決于實現(xiàn)類取出節(jié)點邏輯是否順序取

AbstractQueuedSynchronizer是用來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,通過內(nèi)置FIFO隊列來完成資源獲取線程的排隊工作,自身沒有實現(xiàn)同步接口,僅僅定義了若干同步狀態(tài)獲取和釋放的方法來供自定義同步組件使用(上圖),支持獨占和共享獲取,這是基于模版方法模式的一種設(shè)計,給公平/非公平提供了土壤。
我們用2張圖來簡單解釋AQS的等待流程
一張是同步隊列(FIFO雙向隊列)管理 獲取同步狀態(tài)失?。〒屾i失敗)的線程引用、等待狀態(tài)和前驅(qū)后繼節(jié)點的流程圖

一張是獨占式獲取同步狀態(tài)的總流程 ,核心acquire(int arg)方法調(diào)用流程

可以看出鎖的獲取流程AQS維護一個同步隊列,獲取狀態(tài)失敗的線程都會加入到隊列中進行自旋,移出隊列或停止自旋的條件是前驅(qū)節(jié)點為頭節(jié)點切成功獲取了同步狀態(tài)。而比較另一段非公平鎖類NonfairSync可以發(fā)現(xiàn),控制公平和非公平的關(guān)鍵代碼,在于hasQueuedPredecessors方法。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}NonfairSync減少了了hasQueuedPredecessors判斷條件,該方法的作用就是
- 查看同步隊列中當(dāng)前節(jié)點是否有前驅(qū)節(jié)點,如果有比當(dāng)前線程更早請求獲取鎖則返回true。
- 保證每次都取隊列的第一個節(jié)點(線程)來獲取鎖,這就是公平規(guī)則
為什么JUC以默認非公平鎖呢?
因為當(dāng)一個線程請求鎖時,只要獲取來同步狀態(tài)即成功獲取。在此前提下,剛釋放的線程再次獲取同步狀態(tài)的幾率會非常大,使得其他線程只能在同步隊列中等待。但這樣帶來的好處是,非公平鎖大大減少了系統(tǒng)線程上下文的切換開銷。
可見公平的代價是性能與吞吐量。
Redis里沒有AQS,但是有List和zSet,看看Redisson是怎么實現(xiàn)公平的
1.6.2 RedissonFairLock
RedissonFairLock 用法依然很簡單
RLock fairLock = redissonClient.getFairLock(lockName);fairLock.lock();
RedissonFairLock繼承自RedissonLock,同樣一路向下找到加鎖實現(xiàn)方法tryLockInnerAsync 。
這里有2段冗長的Lua,但是Debug發(fā)現(xiàn),公平鎖的入口在 command == RedisCommands.EVAL_LONG 之后,此段Lua較長,參數(shù)也多,我們著重分析Lua的實現(xiàn)規(guī)則
參數(shù)
-- lua中的幾個參數(shù)
KEYS = Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName)
KEYS[1]: lock_name, 鎖名稱
KEYS[2]: "redisson_lock_queue:{xxx}" 線程隊列
KEYS[3]: "redisson_lock_timeout:{xxx}" 線程id對應(yīng)的超時集合
ARGV = internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime
ARGV[1]: "{leaseTime}" 過期時間
ARGV[2]: "{Redisson.UUID}:{threadId}"
ARGV[3] = 當(dāng)前時間 + 線程等待時間:(10:00:00) + 5000毫秒 = 10:00:05
ARGV[4] = 當(dāng)前時間(10:00:00) 部署服務(wù)器時間,非redis-server服務(wù)器時間公平鎖實現(xiàn)的Lua腳本
-- 1.死循環(huán)清除過期key
while true do
-- 獲取頭節(jié)點
local firstThreadId2 = redis.call('lindex', KEYS[2], 0);
-- 首次獲取必空跳出循環(huán)
if firstThreadId2 == false then
break;
end;
-- 清除過期key
local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));
if timeout <= tonumber(ARGV[4]) then
redis.call('zrem', KEYS[3], firstThreadId2);
redis.call('lpop', KEYS[2]);
else
break;
end;
end;
-- 2.不存在該鎖 && (不存在線程等待隊列 || 存在線程等待隊列而且第一個節(jié)點就是此線程ID),加鎖部分主要邏輯
if (redis.call('exists', KEYS[1]) == 0) and
((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then
-- 彈出隊列中線程id元素,刪除Zset中該線程id對應(yīng)的元素
redis.call('lpop', KEYS[2]);
redis.call('zrem', KEYS[3], ARGV[2]);
local keys = redis.call('zrange', KEYS[3], 0, -1);
-- 遍歷zSet所有key,將key的超時時間(score) - 當(dāng)前時間ms
for i = 1, #keys, 1 do
redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);
end;
-- 加鎖設(shè)置鎖過期時間
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 3.線程存在,重入判斷
if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then
redis.call('hincrby', KEYS[1], ARGV[2],1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 4.返回當(dāng)前線程剩余存活時間
local timeout = redis.call('zscore', KEYS[3], ARGV[2]);
if timeout ~= false then
-- 過期時間timeout的值在下方設(shè)置,此處的減法算出的依舊是當(dāng)前線程的ttl
return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);
end;
-- 5.尾節(jié)點剩余存活時間
local lastThreadId = redis.call('lindex', KEYS[2], -1);
local ttl;
-- 尾節(jié)點不空 && 尾節(jié)點非當(dāng)前線程
if lastThreadId ~= false and lastThreadId ~= ARGV[2] then
-- 計算隊尾節(jié)點剩余存活時間
ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);
else
-- 獲取lock_name剩余存活時間
ttl = redis.call('pttl', KEYS[1]);
end;
-- 6.末尾排隊
-- zSet 超時時間(score),尾節(jié)點ttl + 當(dāng)前時間 + 5000ms + 當(dāng)前時間,無則新增,有則更新
-- 線程id放入隊列尾部排隊,無則插入,有則不再插入
local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);
if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then
redis.call('rpush', KEYS[2], ARGV[2]);
end;
return ttl;1.6.3 公平鎖加鎖步驟
通過以上Lua,可以發(fā)現(xiàn),lua操作的關(guān)鍵結(jié)構(gòu)是列表(list)和有序集合(zSet)。
其中 list 維護了一個等待的線程隊列 redisson_lock_queue:{xxx},zSet維護了一個線程超時情況的有序集合 redisson_lock_timeout:{xxx},盡管lua較長,但是可以拆分為6個步驟
- 隊列清理
保證隊列中只有未過期的等待線程 - 首次加鎖
hset加鎖,pexpire過期時間 - 重入判斷
此處同可重入鎖lua - 返回ttl
- 計算尾節(jié)點ttl
初始值為鎖的剩余過期時間 - 末尾排隊
ttl + 2 * currentTime + waitTime是score的默認值計算公式
以上就是Redis之Redisson原理詳解的詳細內(nèi)容,更多關(guān)于Redisson原理的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
基于Redis6.2.6版本部署Redis?Cluster集群的問題
這篇文章主要介紹了基于Redis6.2.6版本部署Redis?Cluster集群,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-04-04

