Redisson之分布式鎖原理全面分析
Redisson是一個(gè) Redis的開(kāi)源客戶(hù)端,也提供了分布式鎖的實(shí)現(xiàn)。
Redisson官網(wǎng):
Redisson 分布式鎖使用
Redisson分布式鎖使用起來(lái)還是蠻簡(jiǎn)單的。
1、添加 Redisson 配置類(lèi)
引入依賴(lài):
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.5</version>
</dependency>
創(chuàng)建 Redisson 配置類(lèi),注入 RedissonClient客戶(hù)端。
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
/**
* reids配置,支持單機(jī)、主從、哨兵、集群等配置。這里使用單機(jī)配置
*/
config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
return Redisson.create(config);
}
}
2、使用 Redisson分布式鎖
代碼如下:
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private RedissonClient redissonClient;
public void disLockDemo(long productId) {
String lockKey = "DISTRIBUTE_LOCK:redissonLock:product_" + productId;
//設(shè)置鎖定資源名稱(chēng),并獲取分布式鎖對(duì)象。
RLock redissonLock = redissonClient.getLock(lockKey);
//1.加鎖
redissonLock.lock();
//boolean isLock = disLock.tryLock(500, 15000, TimeUnit.MILLISECONDS);
try {
//2.執(zhí)行業(yè)務(wù)代碼
// TODO
//if (isLock) {
// TODO
//}
} finally {
//3.解鎖
redissonLock.unlock();
}
}

Redisson 分布式鎖源碼分析
使用分布式鎖必須要考慮的一些問(wèn)題:
- 互斥性:在任意時(shí)刻,只能有一個(gè)進(jìn)程持有鎖。
- 防死鎖:即使有一個(gè)進(jìn)程在持有鎖的期間崩潰而未能主動(dòng)釋放鎖,要有其他方式去釋放鎖從而保證其他進(jìn)程能獲取到鎖。
- 不能釋放別人的鎖:加鎖和解鎖的必須是同一個(gè)進(jìn)程。
- 鎖的續(xù)期問(wèn)題:業(yè)務(wù)執(zhí)行時(shí)間超過(guò)鎖的過(guò)期時(shí)間時(shí),需要提前給鎖的續(xù)期。
Redisson 是 Redis 官方推薦分布式鎖實(shí)現(xiàn)方案,它采用 Watch Dog機(jī)制能夠很好的解決鎖續(xù)期的問(wèn)題。
執(zhí)行 lua腳本保證了多條命令執(zhí)行的原子性操作。
帶著上面分布式鎖的一些問(wèn)題查看源碼。
1、獲取分布式鎖對(duì)象
簡(jiǎn)單了解一下。
1.1 創(chuàng)建 RedissonClient
我們?cè)谂渲妙?lèi)中通過(guò) Redisson.create(config)方法創(chuàng)建了 RedissonClient對(duì)象,并注入到 IOC容器中。

1.2 獲取分布式鎖對(duì)象
使用 Redisson 客戶(hù)端來(lái) 獲取分布式鎖對(duì)象。

2、加鎖代碼

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//線(xiàn)程id
long threadId = Thread.currentThread().getId();
// 1.嘗試獲取鎖
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl != null) {
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
if (interruptibly) {
this.commandExecutor.syncSubscriptionInterrupted(future);
} else {
this.commandExecutor.syncSubscription(future);
}
try {
//2.死循環(huán),反復(fù)去調(diào)用tryAcquire嘗試獲取鎖
while(true) {
// 再次嘗試獲取鎖
ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
// ttl為null時(shí)表示別的線(xiàn)程已經(jīng)unlock了,自己加鎖成功
if (ttl == null) {
return;
}
// 3.鎖互斥:通過(guò) JDK的信號(hào)量 Semaphore來(lái)阻塞線(xiàn)程
if (ttl >= 0L) {
try {
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException var13) {
if (interruptibly) {
throw var13;
}
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else if (interruptibly) {
((RedissonLockEntry)future.getNow()).getLatch().acquire();
} else {
((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly();
}
}
} finally {
// 4.無(wú)論是否獲得鎖,都要取消訂閱解鎖消息
this.unsubscribe(future, threadId);
}
}
}
2.1 異步加鎖機(jī)制
查看 tryAcquire()加鎖方法。

通過(guò)源碼,看到加鎖其實(shí)是通過(guò)一段 lua 腳本實(shí)現(xiàn)的,如下:
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end ;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end ;
return redis.call('pttl', KEYS[1]);
- KEYS[1] 代表的是你加鎖的 key。
- ARGV[1] 代表的是鎖 key 的默認(rèn)生存時(shí)間,默認(rèn) 30 秒。
- ARGV[2] 代表的是加鎖的客戶(hù)端的線(xiàn)程 ID。通過(guò)
getLockName方法組裝了一下。 - ARGV[2] 后面的 1:為了支持可重入鎖做的計(jì)數(shù)統(tǒng)計(jì)。
Redisson 實(shí)現(xiàn)分布式鎖的共享資源的存儲(chǔ)結(jié)構(gòu)是 hash數(shù)據(jù)結(jié)構(gòu):
key 是鎖的名稱(chēng),field 是客戶(hù)端 ID,value 是該客戶(hù)端加鎖(可重入)的次數(shù)。
假設(shè)此時(shí),客戶(hù)端 1 來(lái)嘗試加鎖,查看加鎖的 lua 腳本:
第一段 if 判斷語(yǔ)句,如果你要加鎖的那個(gè)鎖 key 不存在的話(huà),進(jìn)行加鎖。此時(shí)鎖 key不存在,向Redis中設(shè)置一個(gè) hash 結(jié)構(gòu)的數(shù)據(jù),則客戶(hù)端 1加鎖成功,返回 null。
2.1.1 鎖的續(xù)期機(jī)制
客戶(hù)端 1 加鎖的那個(gè)鎖 key 默認(rèn)生存時(shí)間才 30 秒,如果超過(guò)了 30 秒,客戶(hù)端 1 還想一直持有這把鎖,就需要提前進(jìn)行鎖的續(xù)期操作。
Redisson 提供了一個(gè) Watch dog 機(jī)制來(lái)解決鎖的續(xù)期問(wèn)題, 只要客戶(hù)端 1 一旦加鎖成功,就會(huì)啟動(dòng)一個(gè) Watch Dog。
進(jìn)入 scheduleExpirationRenewal方法,重點(diǎn)查看 renewExpiration方法。

鎖續(xù)期的 lua 腳本如下:
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end ;
return 0;
從源碼我們看到 leaseTime 必須是 -1 才會(huì)開(kāi)啟 Watch Dog 機(jī)制,我們發(fā)現(xiàn):
- 如果想開(kāi)啟 Watch Dog 機(jī)制必須使用默認(rèn)的加鎖時(shí)間為 30s。
- 如果自己自定義時(shí)間,即使用 tryLock,鎖并不會(huì)延長(zhǎng),不會(huì)觸發(fā)Watch Dog 機(jī)制。
Watch Dog 機(jī)制其實(shí)就是一個(gè)后臺(tái)定時(shí)任務(wù)線(xiàn)程,獲取鎖成功之后,會(huì)將持有鎖的線(xiàn)程放入到一個(gè) RedissonLock.EXPIRATION_RENEWAL_MAP里面,
然后每隔 10 秒 (internalLockLeaseTime / 3) 檢查一下,如果客戶(hù)端 1 還持有鎖 key(判斷客戶(hù)端是否還持有 key,
其實(shí)就是遍歷 EXPIRATION_RENEWAL_MAP 里面線(xiàn)程 id 然后根據(jù)線(xiàn)程 id 去 Redis 中查,
如果存在就會(huì)延長(zhǎng) key 的時(shí)間),那么就會(huì)不斷的延長(zhǎng)鎖 key 的生存時(shí)間。
如果服務(wù)宕機(jī)了,Watch Dog 機(jī)制線(xiàn)程也就沒(méi)有了,此時(shí)就不會(huì)延長(zhǎng) key 的過(guò)期時(shí)間,到了 30s 之后就會(huì)自動(dòng)過(guò)期了,其他線(xiàn)程就可以獲取到鎖。
2.1.2 可重入加鎖機(jī)制
Redisson 也是支持可重入鎖的,比如:客戶(hù)端 1 加鎖代碼:
@Override
public void demo() {
RLock lock = redissonSingle.getLock("myLock");
try {
lock.lock();
// TODO 執(zhí)行業(yè)務(wù)
//鎖重入
lock.lock();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 釋放鎖
lock.unlock();
lock.unlock();
}
}
此時(shí),如果客戶(hù)端 1 又來(lái)嘗試加鎖,繼續(xù)分析加鎖的 lua 腳本。
- 首先,第一個(gè) if 判斷,你要加鎖的那個(gè)鎖 key 已經(jīng)存在了。
- 然后,第二個(gè) if 判斷,判斷一下,加鎖的那個(gè)鎖 key的 hash 數(shù)據(jù)結(jié)構(gòu)中,是否包含客戶(hù)端 1 的 ID,
- 此時(shí)數(shù)據(jù)結(jié)構(gòu)的是客戶(hù)端 1 的 ID,即包含客戶(hù)端 1的 ID,然后就執(zhí)行行可重入鎖的命令,將 hash 結(jié)構(gòu)的 value數(shù)據(jù) + 1,返回 null。
2.2 鎖互斥機(jī)制
上面客戶(hù)端 1加鎖成功,此時(shí),如果客戶(hù)端 2 來(lái)嘗試加鎖,繼續(xù)分析加鎖的 lua 腳本:
- 首先,第一個(gè) if 判斷,你要加鎖的那個(gè)鎖 key 已經(jīng)存在了。
- 然后,第二個(gè) if 判斷,判斷一下,加鎖的那個(gè)鎖 key的 hash 數(shù)據(jù)結(jié)構(gòu)中,是否包含客戶(hù)端 2 的 ID,如果包含就是執(zhí)行可重入鎖的賦值,此時(shí) hash數(shù)據(jù)結(jié)構(gòu)是客戶(hù)端 1 的 ID,不包含客戶(hù)端 2的 ID,所以,返回加鎖的那個(gè)鎖 key的剩余存活時(shí)間。
接著查看 lock方法中的 死循環(huán)部分。

流程大致如下:
- 嘗試獲取鎖,返回 null 則說(shuō)明加鎖成功,返回一個(gè)ttl,則說(shuō)明已經(jīng)存在該鎖,ttl 為鎖的剩余存活時(shí)間。
- 如果此時(shí)客戶(hù)端 2 進(jìn)程獲取鎖失敗,那么使用客戶(hù)端 2 的線(xiàn)程 id,通過(guò) Redis 的 channel 訂閱鎖釋放的事件。
- 進(jìn)入死循環(huán)中,嘗試重新獲取鎖。
- 如果在重試中拿到了鎖,則直接返回。
- 如果鎖當(dāng)前還是被占用的,那么等待釋放鎖的消息。通過(guò)使用了 JDK 的信號(hào)量 Semaphore 來(lái)阻塞線(xiàn)程,當(dāng) ttl 為鎖的剩余存活時(shí)間為0后,信號(hào)量的 release() 方法會(huì)被調(diào)用,此時(shí)被信號(hào)量阻塞的等待隊(duì)列中的一個(gè)線(xiàn)程就可以繼續(xù)嘗試獲取鎖了。
注意:
當(dāng)鎖正在被占用時(shí),等待獲取鎖的進(jìn)程并不是真正通過(guò)一個(gè) while(true) 死循環(huán)去獲取鎖(占 CPU資源),而時(shí)使用 JDK 的信號(hào)量 Semaphore 來(lái)阻塞線(xiàn)程(間斷性的不斷嘗試獲取鎖),是會(huì)釋放 CPU資源的。
3、鎖釋放代碼

public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise();
// 1. 異步釋放鎖
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
// 2. 取消 Watch Dog 機(jī)制
future.onComplete((opStatus, e) -> {
this.cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
result.trySuccess((Object)null);
}
});
return result;
}
3.1 異步釋放鎖機(jī)制
查看unlockInnerAsync方法。
釋放鎖也是執(zhí)行的 lua 腳本:
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end ;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end ;
return nil;
首先,第一段 if 判斷語(yǔ)句,判斷 key 是否存在的話(huà),進(jìn)行加鎖。此時(shí)鎖 key不存在,則客戶(hù)端 1加鎖成功,向Redis中設(shè)置一個(gè) hash 結(jié)構(gòu)的數(shù)據(jù)。返回 null。
然后,第二個(gè) if 判斷,判斷一下該客戶(hù)端對(duì)應(yīng)的鎖的 hash 結(jié)構(gòu)的 value 值是否遞減為 0,
- 如果遞減不為 0,則重入鎖的解鎖,返回0。
- 如果遞減為 0,則進(jìn)行刪除,返回1。
3.2 取消 Watch Dog機(jī)制
查看 cancelExpirationRenewal方法。

取消 Watch Dog 機(jī)制,即將 RedissonLock.EXPIRATION_RENEWAL_MAP 里面的線(xiàn)程 id 刪除。
3.3 通知阻塞等待的進(jìn)程
利用 Redis 的發(fā)布訂閱機(jī)制,廣播釋放鎖的消息,通知阻塞等待的進(jìn)程(向通道名為 redisson_lock__channel publish 一條 UNLOCK_MESSAGE 信息)。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java?GUI實(shí)現(xiàn)學(xué)生成績(jī)管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java?GUI實(shí)現(xiàn)學(xué)生成績(jī)管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-01-01
使用@JsonFormat和@DateTimeFormat對(duì)Date格式化操作
這篇文章主要介紹了使用@JsonFormat和@DateTimeFormat對(duì)Date格式化操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08
SpringCloud @FeignClient注入Spring容器原理分析
本文詳細(xì)分析了Spring Boot中@FeignClient注解的掃描和注入過(guò)程,重點(diǎn)探討了@EnableFeignClients注解的工作原理,通過(guò)源碼分析,揭示了@EnableFeignClients如何通過(guò)@Import注解和FeignClientsRegistrar類(lèi)實(shí)現(xiàn)bean定義的加載2024-12-12
spring對(duì)JDBC和orm的支持實(shí)例詳解
這篇文章主要介紹了spring對(duì)JDBC和orm的支持實(shí)例詳解,需要的朋友可以參考下2017-09-09
解決rocketmq-spring-boot-starter導(dǎo)致的多消費(fèi)者實(shí)例重復(fù)消費(fèi)問(wèn)題
這篇文章主要介紹了解決rocketmq-spring-boot-starter導(dǎo)致的多消費(fèi)者實(shí)例重復(fù)消費(fèi)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06
JAVA匿名內(nèi)部類(lèi)(Anonymous Classes)的具體使用
本文主要介紹了JAVA匿名內(nèi)部類(lèi),匿名內(nèi)部類(lèi)在我們JAVA程序員的日常工作中經(jīng)常要用到,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08
java 數(shù)值類(lèi)型分秒時(shí)間格式化的實(shí)例代碼
這篇文章主要介紹了java 數(shù)值類(lèi)型分秒時(shí)間格式化的實(shí)例代碼的相關(guān)資料,將秒或分鐘的值轉(zhuǎn)換為xx天xx小時(shí)xx分鐘xx秒 如果 “xx” 為0 自動(dòng)缺省,需要的朋友可以參考下2017-07-07
Spring時(shí)間戳(日期)格式轉(zhuǎn)換方式
這篇文章主要介紹了Spring時(shí)間戳(日期)格式轉(zhuǎn)換方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2025-03-03
Quarkus中實(shí)現(xiàn)Resteasy的文件上傳下載操作
這篇文章主要為大家介紹了Quarkus中實(shí)現(xiàn)Resteasy的文件上傳下載的操作過(guò)程步驟,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-02-02

