Redisson實現(xiàn)分布式鎖、鎖續(xù)約的案例
一、基礎(chǔ)
0)Redisson版本說明、案例
使用當前(2022年12月初)最新的版本:3.18.1;
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.18.1</version>
</dependency>案例
案例采用redis-cluster集群的方式;
public class Main {
public static void main(String[] args) throws Exception {
// 1.配置Redis-Cluster集群節(jié)點的ip和port
Config config = new Config();
config.useClusterServers()
.addNodeAddress("redis://127.0.0.1:7001")
.addNodeAddress("redis://127.0.0.1:7002")
.addNodeAddress("redis://127.0.0.1:7003")
.addNodeAddress("redis://127.0.0.1:7004");
// 2.創(chuàng)建Redisson的客戶端
RedissonClient redisson = Redisson.create(config);
// 3.測試Redisson可重?鎖的加鎖、釋放鎖
testLock(redisson);
}
private static void testLock(RedissonClient redisson) throws InterruptedException {
// 1.獲取key為"anyLock"的鎖對象
final RLock lock = redisson.getLock("test_lock");
boolean locked = true;
try {
//2.1:加鎖
lock.lock();
// 2.2:加鎖,并設(shè)置嘗試獲取鎖超時時間30s、鎖超時?動釋放的時間10s
// locked = lock.tryLock(30, 10, TimeUnit.SECONDS);
if (locked)
System.out.println("加鎖成功!" + new Date());
Thread.sleep(20 * 1000);
System.out.println("鎖邏輯執(zhí)行完畢!" + new Date());
} finally {
// 3.釋放鎖
lock.unlock();
}
}
}
1)Redisson連接Redis的方式
redission支持4種連接redis方式,分別為單機、主從、Sentinel、Cluster 集群;在分布式鎖的實現(xiàn)上區(qū)別在于hash槽的獲取方式。
具體配置方式見Redisson的GitHub(https://github.com/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95#21-%E7%A8%8B%E5%BA%8F%E5%8C%96%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95)

2)用到的Redis命令
分布式鎖主要需要以下redis命令:
EXISTS key:當 key 存在,返回1;不存在,返回0。
GETSET key value:將給定 key 的值設(shè)為 value ,并返回 key 的舊值 (old value);當 key 存在但不是字符串類型時,返回一個錯誤;當key不存在時,返回nil。
GET key:返回 key 所關(guān)聯(lián)的字符串值,如果 key 不存在那么返回 nil。
DEL key [KEY …]:刪除給定的一個或多個 key(不存在的 key 會被忽略),返回實際刪除的key的個數(shù)(integer)。
DEL key1 key2 key3HSET key field value:給一個key 設(shè)置一個{field=value}的組合值,如果key沒有就直接賦值并返回1;如果field已有,那么就更新value的值,并返回0。
HEXISTS key field:當key中存儲著field的時候返回1,如果key或者field有一個不存在返回0。
HINCRBY key field increment:將存儲在key中的哈希(Hash)對象中的指定字段field的值加上增量increment;
如果鍵key不存在,一個保存了哈希對象{field=value}的key將被創(chuàng)建;如果字段field不存在,在進行當前操作前,feild將被創(chuàng)建,且對應(yīng)的值被置為0;返回值是increment。PEXPIRE key milliseconds:設(shè)置存活時間,單位是毫秒。EXPIRE操作單位是秒。
PUBLISH channel message:向channel post一個message內(nèi)容的消息,返回接收消息的客戶端數(shù)。
3)用到的lua腳本語義
Redisson源碼中,執(zhí)行redis命令的是lua腳本,其中主要有如下幾個概念:
- redis.call():執(zhí)行redis命令。
- KEYS[n]:指腳本中第n個參數(shù),比如KEYS[1]指腳本中的第一個參數(shù)。
- ARGV[n]:指腳本中第n個參數(shù)的值,比如ARGV[1]指腳本中的第一個參數(shù)的值。
- 返回值中nil與false同一個意思。
在redis執(zhí)行l(wèi)ua腳本時,相當于一個redis級別的鎖,不能執(zhí)行其他操作,類似于原子操作,這也是redisson實現(xiàn)的一個關(guān)鍵點。
另外,如果lua腳本執(zhí)行過程中出現(xiàn)了異?;蛘遰edis服務(wù)器宕機了,會將腳本中已經(jīng)執(zhí)行的命令在AOF、RDB日志中刪除;即LUA腳本執(zhí)行報錯會進行回滾操作。
二、源碼分析
1、RLock

RLock接口主要繼承了Lock接口,并擴展了部分方法,比如:tryLock(long waitTime, long leaseTime, TimeUnit unit)方法中加入的leaseTime參數(shù),用來設(shè)置鎖的過期時間,如果超過leaseTime還沒有解鎖的話,redis就強制解鎖;leaseTime的默認時間是30s。
獲取RLock對象
RLock lock = redissonClient.getLock("test_lock");RLock對象表示?個鎖對象,我們要某一個key加鎖時,需要先獲取?個鎖對象。

這里并沒有具體請求Redis進行加鎖的邏輯,而只是調(diào)用RedissonLock的構(gòu)造函數(shù),設(shè)置一些變量。
2、加鎖流程
進入到Rlock#lock()方法,先看主流程;關(guān)于競爭鎖等待時間、鎖超時釋放時間的配置、使用,在流程中穿插著聊。
0)加鎖流程圖

1)加鎖到哪臺機器
lock()方法執(zhí)行鏈路:

走到這里,已經(jīng)可以看到加鎖的底層邏輯:LUA腳本。
而lua腳本只是??串字符串,作為evalWriteAsync()?法的?個參數(shù)?已;所以下?步進到evalWriteAsync()?法中:

走到這里會調(diào)用ConnectionManager#getEntry(String)方法;
在創(chuàng)建RedissonClient時,筆者配置的是Redis-Cluster,而走到這里卻會進入到MasterSlaveConnectionManager,實際上實例化的ConnectionManager就是RedisCluster模式下的ClusterConnectionManager,而ClusterConnectionManager繼承自MasterSlaveConnectionManager,并且ClusterConnectionManager沒有重寫getEntry(String)方法,所以會進入到MasterSlaveConnectionManager#getEntry(String)方法。
ConnectionManager#getEntry(String)方法會根據(jù)傳入的key名稱找到相應(yīng)的Redis節(jié)點、目標master。
Redis-Cluster集群中的數(shù)據(jù)分布式是 通過?個?個的hash slot來實現(xiàn)的,Redis-Cluster集群總共16384個hash slot,它們都 會被均勻分布到所有的master節(jié)點上;這里ClusterConnectionManager通過key名稱計算出相應(yīng)的hash slot方式如下:
?先通過key計算出CRC16值,然后 CRC16值對16384進?取模,進?得到hash slot。
@Override
public int calcSlot(String key) {
if (key == null) {
return 0;
}
int start = key.indexOf('{');
if (start != -1) {
int end = key.indexOf('}');
if (end != -1 && start + 1 < end) {
key = key.substring(start + 1, end);
}
}
int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
log.debug("slot {} for {}", result, key);
return result;
}這?計算出key的hash slot之后,就可以通過hash slot 去看?看哪個master上有這個hash slot,如果某個master上有個這個hash slot,那么這個 key當然就會落到該master節(jié)點上,執(zhí)?加鎖指令也就應(yīng)該在該master上執(zhí)?。
下面進入本文重點,可重入鎖的各種加鎖、釋放鎖。
2)Client第一次加鎖
在尋找應(yīng)該在哪臺Redis機器上加鎖時,在RedissonLock#tryLockInnerAsync()方法中我們看到了一堆LUA腳本:

LUA腳本參數(shù)解析:
- KEYS[1] 表示的是 getName() ,即鎖key的名稱,比如案例中的 test_lock;
- ARGV[1] 表示的是 internalLockLeaseTime 默認值是30s;
- ARGV[2] 表示的是 getLockName(threadId) ,唯一標識當前訪問線程,使用鎖對象id+線程id(UUID:ThreadId)方式表示,用于區(qū)分不同服務(wù)器上的線程。
- UUID用來唯?標識?個客戶端,因為會有多個客戶端的多個線程加鎖;
- 結(jié)合起來的UUID:ThreadId 表示:具體哪個客戶端上的哪個線程過來加鎖,通 過這樣的組合?式唯?標識?個線程。
LUA腳本邏輯:
- 如果鎖名稱不存在;
- 則向redis中添加一個key為test_lock的HASH結(jié)構(gòu)、添加一個field為線程id,值=1的鍵值對{field:increment},表示此線程的重入次數(shù)為1;
- 設(shè)置test_lock的過期時間,防止當前服務(wù)器出問題后導致死鎖,然后return nil; end;返回nil,lua腳本執(zhí)行完畢;
- 如果鎖存在,檢測當前線程是否持有鎖;
- 如果是當前線程持有鎖,hincrby將該線程重入的次數(shù)++;并重新設(shè)置鎖的過期時間;返回nil,lua腳本執(zhí)行完畢;
- 如果不是當前線程持有鎖,pttl返回鎖的過期時間,單位ms。
第一次加鎖時,key肯定不存在與master節(jié)點上;
會執(zhí)行下列LUA腳本對應(yīng)的Redis指令:
hset test_lock UUID:ThreadId 1 pexpire test_lock 30000
此時,Redis中多一個Hash結(jié)構(gòu)的key(test_lock):
test_lock :
{
UUID:ThreadId:1
}這里的1使用來做鎖重入的。
pexpire指令為test_lock這個key設(shè)置過期時間為30s,即:30s后這個key會?動過期被刪除,key對應(yīng)的鎖在那時也就被釋放了。
總體來看,加鎖的邏輯很簡單:
在key對應(yīng)的hash數(shù)據(jù)結(jié)構(gòu)中記錄了? 下當前是哪個客戶端的哪個線程過來加鎖了,然后設(shè)置了?下key的過期時間為30s。 3)加鎖成功之后的鎖續(xù)約
成功加鎖后,lua腳本返回nil,即null。

加鎖成功之后,tryLockInnerAsync()?法返回;再結(jié)合Java8的Stream,對加鎖結(jié)果進一步處理;
因為加鎖成功后返回的是nil,這是lua腳本的返回形式,體現(xiàn)到j(luò)ava代碼中的返回值為:null。
又由于RLock#lock()方法傳入的leaseTime是-1,所以進入到scheduleExpirationRenewal(long)方法做鎖續(xù)約。

renewExpirationAsync()方法負責做具體的鎖續(xù)約:
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}這里LUA腳本的邏輯很簡單:
- 判斷當前key中,是否還被線程UUID:ThreadId持有鎖,持有則設(shè)置過期時間為30s(續(xù)命)。
鎖續(xù)約(看門狗機制)其實就是每次加鎖成功后,會?上開啟?個后臺線程, 每隔10s檢查?下key是否存在,如果存在就為key續(xù)期30s。
- 這里的10s,取自配置的
lockWatchdogTimeout參數(shù),默認為30 * 1000 ms; - 所以?個key往往當過期時間慢慢消逝到20s左右時就?會被定時任務(wù)重置為了30s,這樣就能保證:只要這個定時任務(wù)還在、這個key還在,就?直維持加鎖。
如果當前持有鎖的線程被中斷了,會停止鎖續(xù)約,即殺死看門狗;

protected void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}所謂的停止鎖續(xù)約,實際就是將當前線程的threadId從看門狗緩存中移除,后續(xù)在執(zhí)行鎖續(xù)約時,如果發(fā)現(xiàn)看門狗緩存中已經(jīng)沒有了當前線程threadId,則直接退出鎖續(xù)約 并且 不再延時10s開啟一個定時任務(wù)。
如果加鎖時指定了leaseTime > 0,則不會開門狗機制,表示強制鎖leaseTime 毫秒后過期。一共有三種加鎖方式可以做到,如下:
- RLock#lock(long leaseTime, TimeUnit unit)
- RLock#tryLock(long waitTime, long leaseTime, TimeUnit unit)
- RLock#lockInterruptibly(long leaseTime, TimeUnit unit)
4)重入加鎖(相同線程多次加鎖)
再次回到加鎖的LUA腳本:

同一個線程對分布式鎖多次加鎖時,會走以下邏輯:
- 判斷當前key是否被當前線程持有,如果是則增加鎖重入的次數(shù),并重新設(shè)置鎖的過期時間為30s;
對應(yīng)的Redis命令為:
hexists test_lock UUID:ThreadId hincrby test_lock UUID:ThreadId 1 pexpire test_lock 30000
此時Redis中test_key對應(yīng)的數(shù)據(jù)結(jié)構(gòu)從
test_lock :
{
UUID:ThreadId:1
}變成:
test_lock :
{
UUID:ThreadId:2
}并將key的過期時間重新設(shè)置為30s。
鎖重入成功之后,后臺也會開啟?個watchdog后臺線程做鎖續(xù)約,每隔10s檢查?下key,如果key存在就將key的過期時間重新設(shè)置為30s。
Redisson可重?加鎖的語義,實際是通過Hash結(jié)構(gòu)的key中某個線程(UUID:ThreadId)對應(yīng)的加鎖次數(shù)來表示的。
5)鎖競爭(其他線程加鎖失?。?/p>
再再次回到加鎖的LUA腳本:

如果分布式鎖已經(jīng)被其他線程持有,LUA腳本會執(zhí)行以下邏輯:
返回當前key的剩余存活時間,因為不是返回nil,也就代表著加鎖失?。?/p>
對應(yīng)的Redis的命令為:
pttl test_lock
針對加鎖方式的不同,加鎖失敗的邏輯也不同;可以分兩大類:指定了加鎖失敗的等待時間waitTime和未指定waitTime。
- 未執(zhí)行加鎖失敗的等待時間
waitTime:獲取分布式鎖失敗會一直重試,直到獲取鎖成功。比如下列加鎖方法:Rlock#lock():一直嘗試獲取分布式鎖,直到獲取鎖成功。RLock#lockInterruptibly(long leaseTime, TimeUnit unit)RLock#lock(long leaseTime, TimeUnit unit)
- 指定了加鎖失敗的等待時間
waitTime:獲取分布式鎖會超時,超時之后返回加鎖失?。?ul> Rlock#tryLock(long waitTime, TimeUnit unit):指定獲取鎖失敗的等待時間。在等待時間范圍之內(nèi)進行重試,超時則返回加鎖失敗。Rlock#tryLock(long waitTime, long leaseTime, TimeUnit unit):同樣是指定獲取鎖失敗的等待時間,并且強制指定鎖過期的時間(不開啟看門狗)。在等待時間范圍之內(nèi)進行重試,超時則返回加鎖失敗。
可以簡單的概述為RLock接口下的tryLock()方法獲取鎖會失敗,lock()方法獲取鎖一定會成功。
1> 一直重試直到加鎖成功
以Rlock#lock()方法為例:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
try {
while (true) {
// lock() 或 lockInterruptibly()為入口走到這里時。leaseTime為-1,表示會開始開門狗;如果leaseTime大于0,則不會開啟開門狗;
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
// 因為Semaphore的可用資源為0,所以這里就等價于Thread.sleep(ttl);
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(entry, threadId);
}
}
首先訂閱解鎖channel(命名格式:
redisson_lock__channel:{keyName}),其他線程解鎖后,會發(fā)布解鎖的消息;這里收到消息會立即嘗試獲取鎖;訂閱解鎖channel的超時時間默認為7.5s。也就說獲取鎖失敗7.5s之內(nèi),如果其他線程釋放鎖,當前線程可以立即嘗試獲取到鎖。獲取鎖失敗之后會進??個while死循環(huán)中:
每休息鎖的存活時間ttl之后,就嘗試去獲取鎖,直到成功獲取到鎖才會跳出while死循環(huán)。
2> 等待鎖超時返回加鎖失敗
以Rlock#tryLock(long waitTime, TimeUnit unit)為例:
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
// 獲取鎖剩余的等待時長
time -= System.currentTimeMillis() - current;
if (time <= 0) {
// 獲取鎖超時,返回獲取分布式鎖失敗
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
// 訂閱解鎖channel的超時時長為 獲取鎖剩余的等待時長
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
"Unable to acquire subscription lock after " + time + "ms. " +
"Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException e) {
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
// 收到解鎖channel的消息之后,走到這里,再次判斷獲取鎖等待時長是否超時
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// while循環(huán)中嘗試去獲取鎖
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
// 如果獲取鎖失敗后,鎖存活時長 小于 剩余鎖等待時長,則線程睡眠 鎖存活時長
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// 如果獲取鎖失敗后,鎖存活時間 大于等于 剩余鎖等待時長,則線程睡眠 鎖等待時長
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
}
加鎖存在超時時間 相比于 一直重試直到加鎖成功,只是多一個時間限制,具體差異體現(xiàn)在:訂閱解鎖channel的超時時長、獲取鎖失敗后線程的睡眠時長、重試獲取鎖次數(shù)的限制;
獲取分布式鎖失敗之后,立即判斷當前獲取鎖是否超時,如果超時,則返回加鎖失?。?/blockquote>否者,訂閱解鎖channel(命名格式:redisson_lock__channel:{keyName}),其他線程解鎖后,會發(fā)布解鎖的消息;訂閱解鎖channel的超時時間為獲取鎖剩余的等待時長。 在這個時間范圍之內(nèi),如果其他線程釋放鎖,當前線程收到解鎖channel的消息之后再次判斷獲取鎖是否超時,如果不超時,嘗試獲取鎖。獲取鎖之后會進??個while死循環(huán)中: 如果獲取鎖超時,則返回加鎖失?。?/blockquote>否者讓線程睡眠: 如果鎖存活時長ttl小于 剩余鎖等待時長,則線程睡眠 鎖存活時長;如果鎖存活時間ttl大于等于 剩余鎖等待時長,則線程睡眠 鎖等待時長;線程睡眠完之后,判斷獲取鎖是否超時,不超時則嘗試去獲取鎖。3、釋放鎖流程
1)Client主動嘗試釋放鎖
進入到Rlock#unlock()方法;
和加鎖的方式?樣,釋放鎖也是通過lua腳本來完成的;
LUA腳本參數(shù)解析:
- KEYS[1] 表示的是 getName() ,代表的是鎖名 test_lock;
- KEYS[2] 表示getChanelName() 表示的是發(fā)布訂閱過程中使用的Chanel;
- ARGV[1] 表示的是LockPubSub.unLockMessage,解鎖消息,實際代表的是數(shù)字 0,代表解鎖消息;
- ARGV[2] 表示的是internalLockLeaseTime 默認的有效時間 30s;
- ARGV[3] 表示的是 getLockName(thread.currentThread().getId()) 代表的是 UUID:ThreadId 用鎖對象id+線程id, 表示當前訪問線程,用于區(qū)分不同服務(wù)器上的線程。
LUA腳本邏輯:
- 如果鎖名稱不存在;
- 可能是因為鎖過期導致鎖不存在,也可能是并發(fā)解鎖。
- 則發(fā)布鎖解除的消息,返回1,lua腳本執(zhí)行完畢;
- 如果鎖存在,檢測當前線程是否持有鎖;
- 如果是當前線程持有鎖,定義變量counter,接收執(zhí)行incrby將該線程重入的次數(shù)–的結(jié)果;
- 如果重入次數(shù)大于0,表示該線程還有其他任務(wù)需要執(zhí)行;重新設(shè)置鎖的過期時間;返回0,lua腳本執(zhí)行完畢;
- 否則表示該線程執(zhí)行結(jié)束,del刪除該鎖;并且publish發(fā)布該鎖解除的消息;返回1,lua腳本執(zhí)行完畢;
- 如果不是當前線程持有鎖 或 其他情況,都返回nil,lua腳本執(zhí)行完畢。
腳本執(zhí)行結(jié)束之后,如果返回值不是0或1,即當前線程去釋放其他線程的加鎖時,拋出異常。
通過LUA腳本釋放鎖成功之后,會將看門狗殺死;
2)Client主動強制釋放鎖
forceUnlockAsync()方法被調(diào)用的地方很多,大多都是在清理資源時刪除鎖。@Override public RFuture<Boolean> forceUnlockAsync() { cancelExpirationRenewal(null); return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('del', KEYS[1]) == 1) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1 " + "else " + "return 0 " + "end", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE); }LUA腳本邏輯:
邏輯比較簡單粗暴:刪除鎖成功則并發(fā)布鎖被刪除的消息,返回1結(jié)束,否則返回0結(jié)束。
3)Client宕機,鎖超時釋放
如果Redisson客戶端剛加鎖成功,并且未指定releaseTime,后臺會啟動一個定時任務(wù)watchdog每隔10s檢查key:key如果存在就為它?動續(xù)命到30s;在watchdog定時任務(wù)存在的情況下,如果不是主動釋放鎖,那么key將會?直的被watchdog這個定時任務(wù)維持加鎖。
但是如果客戶端宕機了,定時任務(wù)watchdog也就沒了,也就沒有鎖續(xù)約機制了,那么過完30s之后,key會?動被刪除、key對應(yīng)的鎖也自動被釋放了。
4)不啟動鎖續(xù)約的超時釋放鎖
如果在加鎖時指定了leaseTime,加鎖成功之后,后臺并不會啟動一個定時任務(wù)watchdog做鎖續(xù)約;key存活leaseTime 毫秒之后便會自動被刪除、key對應(yīng)的鎖也就自動被釋放了;無論當前線程的業(yè)務(wù)邏輯是否執(zhí)行完畢。
比如使用如下方式加鎖:
- RLock#lock(long leaseTime, TimeUnit unit)
- RLock#tryLock(long waitTime, long leaseTime, TimeUnit unit)
- RLock#lockInterruptibly(long leaseTime, TimeUnit unit)
到此這篇關(guān)于Redisson如何實現(xiàn)分布式鎖、鎖續(xù)約的文章就介紹到這了,更多相關(guān)redisson分布式鎖、鎖續(xù)約內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
如何利用Redis分布式鎖實現(xiàn)控制并發(fā)操作
這篇文章主要介紹了如何利用Redis分布式鎖實現(xiàn)控制并發(fā)操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09
redis key過期監(jiān)聽的實現(xiàn)示例
在Redis中,我們可以為Key設(shè)置過期時間,當Key的過期時間到達后,Redis會自動將該Key標記為已失效,本文就來介紹一下redis key過期監(jiān)聽的實現(xiàn)示例,感興趣的可以了解一下2024-03-03
Redis字典實現(xiàn)、Hash鍵沖突及漸進式rehash詳解
這篇文章主要介紹了Redis字典實現(xiàn)、Hash鍵沖突以及漸進式rehash的相關(guān)知識,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-09-09
Linux下安裝Redis并設(shè)置相關(guān)服務(wù)
這篇文章主要為大家介紹了Linux下安裝Redis并設(shè)置相關(guān)服務(wù),感興趣的小伙伴們可以參考一下2016-01-01



