Redisson分布式鎖源碼解析
Redisson鎖繼承Implements Reentrant Lock,所以具備 Reentrant Lock 鎖中的一些特性:超時(shí),重試,可中斷等。加上Redisson中Redis具備分布式的特性,所以非常適合用來做Java中的分布式鎖。 下面我們對其加鎖、解鎖過程中的源碼細(xì)節(jié)進(jìn)行一一分析。
鎖的接口定義了一下方法:

分布式鎖當(dāng)中加鎖,我們常用的加鎖接口:
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
下面我們來看一下方法的具體實(shí)現(xiàn):
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
final RFuture subscribeFuture = subscribe(threadId);
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
if (subscribeFuture.isSuccess()) {
unsubscribe(subscribeFuture, threadId);
}
}
});
}
acquireFailed(threadId);
return false;
}
try {
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - currentTime);
if (time = 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
首先我們看到調(diào)用tryAcquire嘗試獲取鎖,在這里是否能獲取到鎖,是根據(jù)鎖名稱的過期時(shí)間TTL來判定的(TTL
下面我們接著看一下tryAcquire的實(shí)現(xiàn):
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
可以看到真正獲取鎖的操作經(jīng)過一層get操作里面執(zhí)行的,這里為何要這么操作,本人也不是太理解,如有理解錯(cuò)誤,歡迎指正。
get 是由CommandAsyncExecutor(一個(gè)線程Executor)封裝的一個(gè)Executor
設(shè)置一個(gè)單線程的同步控制器CountDownLatch,用于控制單個(gè)線程的中斷信息。個(gè)人理解經(jīng)過中間的這么一步:主要是為了支持線程可中斷操作。
public V get(RFuture future) {
if (!future.isDone()) {
final CountDownLatch l = new CountDownLatch(1);
future.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
l.countDown();
}
});
boolean interrupted = false;
while (!future.isDone()) {
try {
l.await();
} catch (InterruptedException e) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
}
// commented out due to blocking issues up to 200 ms per minute for each thread:由于每個(gè)線程的阻塞問題,每分鐘高達(dá)200毫秒
// future.awaitUninterruptibly();
if (future.isSuccess()) {
return future.getNow();
}
throw convertException(future);
}
我們進(jìn)一步往下看:
private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
首先判斷鎖是否有超時(shí)時(shí)間,有過期時(shí)間的話,會在后面獲取鎖的時(shí)候設(shè)置進(jìn)去。沒有過期時(shí)間的話,則會用默認(rèn)的
private long lockWatchdogTimeout = 30 * 1000;
下面我們在進(jìn)一步往下分析真正獲取鎖的操作:
RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
我把里面的重點(diǎn)信息做了以下三點(diǎn)總結(jié):
1:真正執(zhí)行的是一段具有原子性的Lua腳本,并且最終也是由CommandAsynExecutor去執(zhí)行。
2:鎖真正持久化到Redis時(shí),用的hash類型key field value
3:獲取鎖的三個(gè)參數(shù):getName()是邏輯鎖名稱,例如:分布式鎖要鎖住的methodName+params;internalLockLeaseTime是毫秒單位的鎖過期時(shí)間;getLockName則是鎖對應(yīng)的線程級別的名稱,因?yàn)橹С窒嗤€程可重入,不同線程不可重入,所以這里的鎖的生成方式是:UUID+":"threadId。有的同學(xué)可能會問,這樣不是很縝密:不同的JVM可能會生成相同的threadId,所以Redission這里加了一個(gè)區(qū)分度很高的UUID;
Lua腳本中的執(zhí)行分為以下三步:
1:exists檢查redis中是否存在鎖名稱;如果不存在,則獲取成功;同時(shí)把邏輯鎖名稱KEYS[1],線程級別的鎖名稱[ARGV[2],value=1,設(shè)置到redis。并設(shè)置邏輯鎖名稱的過期時(shí)間ARGV[2],返回;
2:如果檢查到存在KEYS[1],[ARGV[2],則說明獲取成功,此時(shí)會自增對應(yīng)的value值,記錄重入次數(shù);并更新鎖的過期時(shí)間
3:key不存,直接返回key的剩余過期時(shí)間(-2)
- 詳解Spring Cache使用Redisson分布式鎖解決緩存擊穿問題
- 使用自定義注解實(shí)現(xiàn)redisson分布式鎖
- 詳解redis分布式鎖(優(yōu)化redis分布式鎖的過程及Redisson使用)
- SpringBoot整合Redisson實(shí)現(xiàn)分布式鎖
- Springboot中如何使用Redisson實(shí)現(xiàn)分布式鎖淺析
- Redisson實(shí)現(xiàn)Redis分布式鎖的幾種方式
- redisson分布式鎖的用法大全
- Java redisson實(shí)現(xiàn)分布式鎖原理詳解
- redisson 實(shí)現(xiàn)分布式鎖的源碼解析
相關(guān)文章
分布式系統(tǒng)中的降級熔斷設(shè)計(jì)問題面試
這篇文章主要為大家介紹了分布式系統(tǒng)中的降級熔斷設(shè)計(jì)問題面試解答,有需要的朋友可以借鑒參考下,希望能有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03
解決Mybatis的@Param()注解導(dǎo)致分頁失效的問題
這篇文章主要介紹了解決Mybatis的@Param()注解導(dǎo)致分頁失效的問題,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04
Java四舍五入時(shí)保留指定小數(shù)位數(shù)的五種方式
這篇文章主要介紹了Java四舍五入時(shí)保留指定小數(shù)位數(shù)的五種方式,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下2020-09-09
SpringMVC數(shù)據(jù)響應(yīng)詳細(xì)介紹
Spring MVC 是 Spring 提供的一個(gè)基于 MVC 設(shè)計(jì)模式的輕量級 Web 開發(fā)框架,本質(zhì)上相當(dāng)于 Servlet,Spring MVC 角色劃分清晰,分工明細(xì),本章來講解SpringMVC數(shù)據(jù)響應(yīng)2023-02-02
數(shù)組和二維數(shù)組感覺用王者榮耀的裝備欄來舉例解釋,應(yīng)該更易懂一些。從基礎(chǔ)開始講,后續(xù)會講到JAVA高級,中間會穿插面試題和項(xiàng)目實(shí)戰(zhàn),希望能給大家?guī)韼椭?/div> 2022-03-03
Java實(shí)現(xiàn)局域網(wǎng)聊天室功能(私聊、群聊)
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)局域網(wǎng)聊天室功能,包括私聊、群聊,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-05-05
Spring AI 使用本地 Ollama Embeddings的操作方法
使用 OpenAI 的 Embeddings 接口是有費(fèi)用的,如果想對大量文檔進(jìn)行測試,使用本地部署的 Embeddings 就能省去大量的費(fèi)用,所以我們嘗試使用本地的 Ollama Embeddings,這篇文章主要介紹了Spring AI 使用本地 Ollama Embeddings,需要的朋友可以參考下2024-05-05最新評論

