分布式鎖redisson實現(xiàn)原理源碼詳解
一、簡介
? 現(xiàn)在項目一般都是使用分布式集群部署,對后臺業(yè)務(wù)數(shù)據(jù)的某些操作需要考慮加鎖的問題,而jdk的synchronize加鎖機制已經(jīng)不適合做集群部署的操作,因為synchronize關(guān)鍵字只是針對于單體部署的單臺虛擬機有用??紤]到現(xiàn)在系統(tǒng)使用redis做緩存比較高效,此處推薦使用redis下的分布式鎖redisson進(jìn)行加鎖操作。官網(wǎng)參考:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95。
二、工程引入配置
1.工程中需要引入redis、redisson依賴,pom.xml中引入:
<!--redis依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.3.2.RELEASE</version>
</dependency>
<!--redisson依賴-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.4</version>
</dependency>
<!--使用redis時需要此jar包-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>2.配置文件yml中添加redis連接信息。
spring:
redis:
database: 0
host: xx.xx.xx.xx
port: 1316
password: xxxx
timeout: 3000
lettuce:
pool:
max-active: 20
max-idle: 10
max-wait: -1
min-idle: 03.操作redis的客戶端選擇RedisTemplate,需要配置下存儲在redis的序列化值,使用@Bean注解,當(dāng)程序啟動時加載到spring容器中供后期使用,redis的相關(guān)操作,不在此處進(jìn)行,有需求的可以參考筆者的這篇博文:http://www.dbjr.com.cn/article/220998.htm。
@Configuration
public class RedisConfig {
Logger logger = LoggerFactory.getLogger(RedisConfig.class);
@Bean
public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory) {
logger.debug("redisTemplate實例化 {}");
RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(factory);
FastJsonRedisSerializer fastJsonRedisSerializer = new FastJsonRedisSerializer<Object>(Object.class);
// key的序列化采用StringRedisSerializer
redisTemplate.setKeySerializer(new StringRedisSerializer());
// value值的序列化采用fastJsonRedisSerializer
redisTemplate.setValueSerializer(fastJsonRedisSerializer);
// hash的key也采用String的序列化方式
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
// hash的value序列化方式采用fastJsonRedisSerializer
redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}4.配置RedissonClient客戶端,用于加鎖操作,使用@Bean注解,當(dāng)程序啟動時加載到spring容器中供后期使用,配置客戶端需要根據(jù)redis服務(wù)的模式配置,有集群、主從、哨兵等模式,具體配置參考官網(wǎng):https://github.com/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95;此處使用的單節(jié)點模式配置。
@Configuration
public class RedissonConfig {
//redis相關(guān)配置
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.port}")
private String redisPort;
@Value("${spring.redis.database}")
private int database;
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.timeout}")
private int timeout;
//創(chuàng)建redisson客戶端,此時默認(rèn)使用單節(jié)點
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://"+redisHost+":"+redisPort);
config.useSingleServer().setDatabase(database);
config.useSingleServer().setPassword(password);
config.useSingleServer().setTimeout(timeout);
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}三、加鎖操作
? 操作特別簡單,通過RedissonClient獲取鎖,然后調(diào)用lock即可加鎖,解鎖使用unlock即可。
//在需要使用分布式鎖的類里面注入RedissonClient客戶端
@Autowired
RedissonClient redissonClient;
//根據(jù)鎖名稱獲取鎖
RLock lock = redissonClient.getLock("anyLock");
//加鎖
// 最常見的使用方法
lock.lock();
// 加鎖以后10秒鐘自動解鎖
lock.lock(10, TimeUnit.SECONDS);
// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
//解鎖
lock.unlock();
}
}四、原理分析
1.程序啟動創(chuàng)建RedissonClient時做了啥?
//創(chuàng)建客戶端 RedissonClient redisson = Redisson.create(config)
根據(jù)配置的config信息創(chuàng)建RedissonClient客戶端,創(chuàng)建連接redis的管理器、執(zhí)行redis命令的執(zhí)行器,并生成一個uuid值作為此客戶端的id,此id將會貫穿程序的一生,后面加鎖時需要使用此客戶端id+進(jìn)程號作為鎖hash值的key。此處創(chuàng)建的執(zhí)行器,會在后面創(chuàng)建鎖時復(fù)用。
protected Redisson(Config config) {
this.config = config;
//復(fù)制一份配置信息
Config configCopy = new Config(config);
//根據(jù)配置信息連接redis的方式創(chuàng)建連接管理器,分為單節(jié)點、集群、哨兵模式等
//此處會出創(chuàng)建UUID id = UUID.randomUUID()作為客戶端的唯一id
this.connectionManager = ConfigSupport.createConnectionManager(configCopy);
RedissonObjectBuilder objectBuilder = null;
if (config.isReferenceEnabled()) {
objectBuilder = new RedissonObjectBuilder(this);
}
//創(chuàng)建操作redis的執(zhí)行器
this.commandExecutor = new CommandSyncService(this.connectionManager, objectBuilder);
this.evictionScheduler = new EvictionScheduler(this.commandExecutor);
this.writeBehindService = new WriteBehindService(this.commandExecutor);
}2.創(chuàng)建鎖的時候做了啥?
//創(chuàng)建鎖
RLock lock = redissonClient.getLock("mylock");根據(jù)一個key值,進(jìn)行鎖的創(chuàng)建,最終的創(chuàng)建會在Redisson類中實現(xiàn),因為Redisson實現(xiàn)了RedissonClient接口。
public RLock getLock(String name) {
//this.commandExecutor執(zhí)行器為程序啟動時,創(chuàng)建RedissonClient客戶端時已經(jīng)生成
return new RedissonLock(this.commandExecutor, name);
}創(chuàng)建RedissonLock的時候,會連帶創(chuàng)建它的父類RedissonBaseLock、RedissonExpirable、RedissonObject,并把執(zhí)行器和鎖key一并傳遞過去,供后面程序調(diào)用使用。
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
//創(chuàng)建父類RedissonBaseLock
super(commandExecutor, name);
//程序啟動時創(chuàng)建的執(zhí)行器復(fù)制一份給此類變量
this.commandExecutor = commandExecutor;
//WatchDog 鎖續(xù)期的時間,默認(rèn)是30秒
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}RedissonLock類的父級關(guān)系:

3.嘗試獲取鎖都做了啥?
//嘗試獲取鎖 boolean b = lock.tryLock(30, TimeUnit.SECONDS)
首先會調(diào)用到j(luò)dk包java.util.concurrent.locks下的嘗試獲取鎖方法:
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
由于繼承和實現(xiàn)接口,最終調(diào)用到RedissonLock的tryLock方法:
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
//waitTime為獲取鎖等待的時間,超過此時間獲取不到鎖則獲取鎖失敗,-1L表示沒有設(shè)置加鎖時間,默認(rèn)的加鎖30秒,為后續(xù)判斷是否加延期watch dog做標(biāo)識,unit為時間單位
return this.tryLock(waitTime, -1L, unit);
}具體看下獲取鎖的方法:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
//獲取鎖等待時間
long time = unit.toMillis(waitTime);
//當(dāng)前時間,用于后面計算使用
long current = System.currentTimeMillis();
//當(dāng)前線程的id,用于后面加鎖、訂閱信息等使用
long threadId = Thread.currentThread().getId();
//嘗試獲取鎖,若是此key已經(jīng)加鎖,且不是當(dāng)前線程加的鎖,則返回此鎖還有多久過期,若是返回的是null則標(biāo)識加鎖成功
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
//加鎖成功,直接返回獲取鎖成功
return true;
} else {
//計算鎖等待時間
time -= System.currentTimeMillis() - current;
if (time <= 0L) { //鎖等待時間小于0,則加鎖失敗,直接返回false
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
//再次獲取當(dāng)前時間
current = System.currentTimeMillis();
//根據(jù)線程id,訂閱鎖釋放事件,添加監(jiān)聽,當(dāng)鎖釋放了,通知等待的線程爭搶鎖資源
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
//當(dāng)await返回的為false,表示等待時間已經(jīng)超出獲取鎖最大等待時間,取消訂閱并返回獲取鎖失敗
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
this.unsubscribe(subscribeFuture, threadId);
}
});
}
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
try {
//重新計算剩余等待時間
time -= System.currentTimeMillis() - current;
//等待時間為負(fù)數(shù),直接返回false
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
boolean var20 = false;
return var20;
} else {
boolean var16;
do {
long currentTime = System.currentTimeMillis();
//再次嘗試獲取鎖
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
//獲取鎖成功直接返回
if (ttl == null) {
var16 = true;
return var16;
}
//計算剩余等待時間
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) { //獲取鎖失敗
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
//當(dāng)鎖釋放的時間ttl小于等待獲取鎖的時間time,則讓線程掛起ttl的時間再進(jìn)行鎖的獲取,避免了無效的鎖申請浪費資源,使用LockSupport類的UNSAFE.park讓線程掛起一段時間
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
//當(dāng)?shù)却@取鎖的時間time小于鎖釋放的時間ttl,則讓線程掛起time的時間,再去獲取鎖,避免了無效的鎖申請浪費資源,使用LockSupport類的UNSAFE.park讓線程掛起一段時間,此時肯定是獲取鎖失敗,因為鎖還沒到釋放的時間
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
//經(jīng)過時間的消耗,重新計算鎖的等待時間
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L); //執(zhí)行循環(huán)獲取鎖的條件是:獲取鎖的剩余時間還大于0,否則跳出循環(huán),執(zhí)行后面的獲取鎖失敗程序
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
} finally {
//不管是否獲取到鎖,都需要取消訂閱鎖釋放事件
this.unsubscribe(subscribeFuture, threadId);
}
}
}
}
}首先根據(jù)進(jìn)程id獲取鎖,若是ttl返回的為null則表示獲取鎖成功,直接返回true;若是獲取到的ttl是一個時間值,則表示此鎖被其他線程占用,此值表示鎖過期的時間,則進(jìn)行后續(xù)的鎖釋放訂閱事件,通過redis的channel信道,異步信號量來監(jiān)聽鎖釋放機制,當(dāng)鎖釋放,繼續(xù)嘗試獲取鎖;每一步操作花費一定的時候后,都需要根據(jù)當(dāng)前時間減去進(jìn)行操作前的時間,結(jié)果值與獲取鎖等待的剩余時間做計算,若是等待時間小于0,則直接返回獲取鎖失敗。為了減少do while里面重復(fù)無效獲取鎖浪費資源,使用了LockSupport類的UNSAFE.park讓線程掛起一段時間,直到獲取鎖等待時間小于0則退出while循環(huán)。
嘗試獲取鎖的方法this.tryAcquire執(zhí)行的redis語句是一個lua腳本,之所以使用lua腳本是為了保證執(zhí)行的原子性,如下:
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);KEYS[1]代表加鎖的key上面的"mylock",ARGV[1]代表鎖的生存時間,默認(rèn)是30秒,ARGV[2]代表加鎖的hash值的key,由客戶端id+當(dāng)前線程id組成,客戶端id是程序啟動創(chuàng)建RedissonClient客戶端是生成的uuid。
上面redis含義為:
redis.call('exists', KEYS[1]) == 0:當(dāng)前key的鎖不存在
redis.call('hincrby', KEYS[1], ARGV[2], 1):則進(jìn)行加鎖,加鎖次數(shù)加1,類似于redis執(zhí)行HINCRBY myLock we65768xs-6752-4c23-278a-67ee2f1986jhf:43 1
redis.call('pexpire', KEYS[1], ARGV[1]):設(shè)置鎖過期時間,類似于pexpire myLock 30000
redis.call('hexists', KEYS[1], ARGV[2]) == 1:key已經(jīng)被當(dāng)前客戶端當(dāng)前線程加鎖了
redis.call('hincrby', KEYS[1], ARGV[2], 1):重新加鎖,加鎖次數(shù)加1
redis.call('pexpire', KEYS[1], ARGV[1]):設(shè)置加鎖時間,類似于pexpire myLock 30000
return redis.call('pttl', KEYS[1]):上面兩個if條件都不滿足,表示此key已經(jīng)加鎖了,且不是此線程加的鎖,返回此鎖還有多久過期語句連貫解釋:(1)當(dāng)此key鎖不存在,則進(jìn)行加鎖,存儲類型為hash,hash值的key為客戶端id+線程id,value為1,設(shè)置鎖的過期時間,默認(rèn)是30秒,語句結(jié)束返回(2)1不滿足,判斷是否為此客戶端的此線程加的鎖,若是,則加鎖次數(shù)value值加1,重新賦值鎖的過期時間,語句結(jié)束返回(3)前面兩個都不滿足,說明此key的鎖已經(jīng)被其他客戶端或者相同客戶端不同線程加上了,此時查詢此鎖的過期時間返回。
加鎖key的結(jié)構(gòu)說明:

同一客戶端+同一線程多次去獲取鎖,獲取到的話值value加1,redisson是可重入鎖,下面這樣重復(fù)加鎖的方式是允許的,幾次加鎖,需要使用幾個unlock解鎖。
//加鎖 lock.lock(); //加鎖 lock.lock(); //釋放鎖 lock.unlock(); lock.unlock();
測試重復(fù)加鎖可以使用debug得到當(dāng)前客戶端id+進(jìn)程號作為鎖hash值key,value給1,手動向redis中添加一行記錄,再執(zhí)行嘗試獲取鎖,此時redis中hash值value會加1變?yōu)?。

4.獲取不到鎖,訂閱鎖釋放機制是如何實現(xiàn)的?
? 當(dāng)獲取不到鎖,返回的ttl為鎖的過期時間時,往下繼續(xù)執(zhí)行獲取鎖的實現(xiàn),此時會執(zhí)行redis的訂閱鎖機制。
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
具體訂閱機制實現(xiàn):
//參數(shù)entryName為創(chuàng)建鎖時創(chuàng)建RedissonLock的時候,連帶創(chuàng)建它的父類RedissonBaseLock,值為客戶端id加key值:this.entryName = this.id + ":" + name;
//參數(shù)channelName信道名稱為創(chuàng)建鎖時創(chuàng)建RedissonLock的時候,連帶創(chuàng)建它的父類RedissonObject,值為固定字符加鎖key:redisson_lock__channel+":{" + name + "}"
public RFuture<E> subscribe(String entryName, String channelName) {
//根據(jù)信道名稱創(chuàng)建信道,再獲取異步信號量
AsyncSemaphore semaphore = this.service.getSemaphore(new ChannelName(channelName));
//創(chuàng)建redisson信號量
RPromise<E> newPromise = new RedissonPromise();
//semaphore.acquire:Lambda表達(dá)式,把當(dāng)前創(chuàng)建的信號量添加到listeners線程集合中進(jìn)行監(jiān)聽
semaphore.acquire(() -> {
//redisson信號量取消
if (!newPromise.setUncancellable()) {
//釋放信號量
semaphore.release();
} else {
//從private final ConcurrentMap<String, E> entries = new ConcurrentHashMap();中獲取是否有此entry的信號量,此處使用ConcurrentHashMap是為了線程安全,并且提高效率,因為ConcurrentHashMap是線程安全的分段鎖
E entry = (PubSubEntry)this.entries.get(entryName);
//存在此信號量,則釋放信號量
if (entry != null) {
entry.acquire();
semaphore.release();
entry.getPromise().onComplete(new TransferListener(newPromise));
} else {
//不存在此entry,則創(chuàng)建
E value = this.createEntry(newPromise);
value.acquire();
//此處使用ConcurrentHashMap的putIfAbsent校驗是否已經(jīng)存在此entry,存在則不添加
E oldValue = (PubSubEntry)this.entries.putIfAbsent(entryName, value);
if (oldValue != null) { //存在則不添加,釋放信號量
oldValue.acquire();
semaphore.release();
oldValue.getPromise().onComplete(new TransferListener(newPromise));
} else { //不存在此entry的信號量
//創(chuàng)建一個信道的監(jiān)聽
RedisPubSubListener<Object> listener = this.createListener(channelName, value);
//訂閱信道的監(jiān)聽事件,當(dāng)鎖釋放時,信號量的release會被調(diào)用
this.service.subscribe(LongCodec.INSTANCE, channelName, semaphore, new RedisPubSubListener[]{listener});
}
}
}
});
return newPromise;
}通過redis的channel信道訂閱鎖釋放的事件,創(chuàng)建異步信號量AsyncSemaphore監(jiān)聽鎖釋放的機制,當(dāng)鎖釋放時調(diào)用信號量的release釋放方法,此時被信號量阻塞的線程就可以繼續(xù)嘗試獲取鎖,釋放鎖的方法如下:
//定義一個AtomicInteger類型的counter變量,記錄線程數(shù),AtomicInteger能保證在多線程下的安全性,其特性是加和減的時候先用當(dāng)前需要變化后的值和舊的值進(jìn)行比較,例如當(dāng)前需要加1,則用加后的結(jié)果值減去1,再和舊的值比較,一致了才進(jìn)行覆蓋的操作,保證多線程下的安全性
private final AtomicInteger counter;
//嘗試執(zhí)行線程
private void tryRun() {
if (this.counter.decrementAndGet() >= 0) {
Runnable listener = (Runnable)this.listeners.poll();
if (listener == null) {
this.counter.incrementAndGet();
return;
}
listener.run();
} else if (this.counter.incrementAndGet() > 0) {
this.tryRun();
}
}
//釋放信號量的方法
public void release() {
//信號量加1
this.counter.incrementAndGet();
//啟動監(jiān)聽
this.tryRun();
}5.避免無效申請鎖浪費資源是怎么實現(xiàn)的?
//當(dāng)鎖釋放的時間ttl小于等待獲取鎖的時間time,則讓線程掛起ttl的時間再進(jìn)行鎖的獲取,避免了無效的鎖申請浪費資源,使用LockSupport類的UNSAFE.park讓線程掛起一段時間
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
//當(dāng)?shù)却@取鎖的時間time小于鎖釋放的時間ttl,則讓線程掛起time的時間,再去獲取鎖,避免了無效的鎖申請浪費資源,,使用LockSupport類的UNSAFE.park讓線程掛起一段時間,此時肯定是獲取鎖失敗,因為鎖還沒到釋放的時間
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}ttl是鎖釋放的時間,time是獲取鎖剩余的等待時間,此方法是放在do while里面的,為了不讓程序無效申請鎖浪費資源,在此處做的優(yōu)化。讓線程掛起一定的時間后再執(zhí)行獲取鎖,掛起的時間根據(jù)ttl和time的大小來定,若是滿足if條件,代表獲取鎖等待的時間比鎖釋放的時間要長,則讓程序掛起ttl的時間,這樣鎖已經(jīng)釋放了,再去獲?。蝗羰遣粷M足if條件,代表釋放鎖的時間比鎖等待的時間要長,則讓程序掛起time的時間,此時鎖還沒有釋放,但是獲取鎖的等待時間已經(jīng)到達(dá),繼續(xù)執(zhí)行while循環(huán),此時會跳出while,表示獲取鎖失敗。
LockSupport類掛起線程的方法:
public static void parkNanos(Object blocker, long nanos) {
//掛起時間大于0
if (nanos > 0) {
//獲取當(dāng)前線程
Thread t = Thread.currentThread();
//設(shè)置掛起的線程
setBlocker(t, blocker);
//public native void park(boolean isAbsolute, long time);第一個參數(shù)是是否是絕對時間,第二個參數(shù)是等待時間值
UNSAFE.park(false, nanos);
//移除掛起的線程
setBlocker(t, null);
}
}
//設(shè)置掛起的線程
private static void setBlocker(Thread t, Object arg) {
UNSAFE.putObject(t, parkBlockerOffset, arg);
}6.當(dāng)加鎖時間內(nèi)處理不完業(yè)務(wù),鎖續(xù)時是怎么處理的?
? 當(dāng)在加鎖時間范圍內(nèi),處理不完業(yè)務(wù),需要更新此鎖的過期時間,此處就需要redisson的一個watch dog機制進(jìn)行處理。注意watch dog機制只適用于鎖過期時間為默認(rèn)30秒的方式,自己配置的過期時間,盡管是配的30秒過期也不會啟用watch dog機制。
//嘗試獲取鎖,等待獲取時間30秒,此種方式會啟用watch dog boolean b = lock.tryLock(30, TimeUnit.SECONDS); //嘗試獲取鎖,等待獲取時間20秒,鎖的過期時間是30秒或者配置為其他任意值,都不會啟用watch dog boolean b = lock.tryLock(20, 30,TimeUnit.SECONDS);
源碼中當(dāng)沒有配置過期時間,默認(rèn)傳遞一個-1L,此-1L作為后面判斷是否要創(chuàng)建定時任務(wù)啟動watch dog的標(biāo)識:
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return this.tryLock(waitTime, -1L, unit);
}若是配置了鎖的過期時間,盡管是配的30秒,也不會啟動watch dog,源碼中是直接把過期時間傳遞過去的,沒有做30秒轉(zhuǎn)成-1L的操作:
boolean tryLock(long var1, long var3, TimeUnit var5) throws InterruptedException;
源碼中獲取鎖,判斷是否啟用定時器的邏輯代碼:
//waitTime:獲取鎖等待的時間
//leaseTime:加鎖時間,沒有配置的情況傳遞的是-1L,有配置過期時間則直接傳遞的是過期時間
//unit:時間單位
//threadId:當(dāng)前線程號
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture ttlRemainingFuture;
if (leaseTime != -1L) { //不等于-1L表示用戶自己配置了過期時間,加鎖時傳遞用戶配置的過期時間
ttlRemainingFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else { //沒有配置鎖的過期時間,使用默認(rèn)的時間;internalLockLeaseTime為程序啟動時創(chuàng)建RedissonClient客戶端時設(shè)置的默認(rèn)值30秒,在創(chuàng)建鎖時,創(chuàng)建RedissonLock時已經(jīng)給此值賦上值
ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
//e==null表示獲取到鎖,獲取不到鎖會有鎖過期的時間
if (e == null) {
//ttlRemaining == null表示獲取到鎖
if (ttlRemaining == null) {
if (leaseTime != -1L) { //當(dāng)用戶配置了過期時間,則把過期時間賦值給internalLockLeaseTime變量,記錄鎖的過期時間
this.internalLockLeaseTime = unit.toMillis(leaseTime);
} else { //使用默認(rèn)的30秒過期時間,則創(chuàng)建定時器啟動watch dog續(xù)時
this.scheduleExpirationRenewal(threadId);
}
}
}
});
return ttlRemainingFuture;
}當(dāng)用戶配置了過期時間(leaseTime != -1L)時,獲取到鎖,在redis中存放的過期時間為用戶配置的時間;當(dāng)使用默認(rèn)的過期時間30秒,則在redis中存放的過期時間為程序啟動時默認(rèn)配置的30秒。當(dāng)獲取鎖的結(jié)果為null即成功時,進(jìn)行判斷是否要啟動watch dog 續(xù)時機制,若是用戶自己配置的過期時間,則給類中記錄此鎖過期的變量賦值上用戶設(shè)置的數(shù)據(jù),若是默認(rèn)30秒過期時間,則添加定時器啟動watch dog。
看下設(shè)置定時器的源碼:
//根據(jù)進(jìn)程id設(shè)置定時器啟動watch dog
protected void scheduleExpirationRenewal(long threadId) {
//創(chuàng)建一個entry對象
RedissonBaseLock.ExpirationEntry entry = new RedissonBaseLock.ExpirationEntry();
//從ConcurrentHashMap類型的EXPIRATION_RENEWAL_MAP變量中判斷此entry是否已經(jīng)存在
RedissonBaseLock.ExpirationEntry oldEntry = (RedissonBaseLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) { //已經(jīng)存在,更新進(jìn)程號
oldEntry.addThreadId(threadId);
} else { //不存在則添加,值引用直接添加進(jìn)程號
entry.addThreadId(threadId);
try {
//檢查表達(dá)式
this.renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
//進(jìn)程中斷了,EXPIRATION_RENEWAL_MAP中移除entry,停止定時器
this.cancelExpirationRenewal(threadId);
}
}
}
}根據(jù)進(jìn)程號,從全局變量EXPIRATION_RENEWAL_MAP中看是否有此進(jìn)程的entry存在,存在則更新進(jìn)程號,不存在則添加進(jìn)去;當(dāng)線程終止了,需要從EXPIRATION_RENEWAL_MAP中移除entry,停止定時器。添加完成后開始檢驗鎖的過期時間,源碼為:
//檢查表達(dá)式
private void renewExpiration() {
//從全局變量中查詢線程的entry對象
RedissonBaseLock.ExpirationEntry ee = (RedissonBaseLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) { //entry對象存在
//創(chuàng)建一個定時器,定時器執(zhí)行的時間this.internalLockLeaseTime / 3L,10秒鐘執(zhí)行一次
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
//從全局變量中查詢線程的entry對象
RedissonBaseLock.ExpirationEntry ent = (RedissonBaseLock.ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());
if (ent != null) {//entry對象存在
//獲取線程號
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
//異步執(zhí)行判斷是否還持有鎖,持有鎖的話,再把鎖的過期時間更新為30秒,也是一個lua執(zhí)行腳本
RFuture<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) { //執(zhí)行更新鎖過期時間失敗
RedissonBaseLock.log.error("Can't update lock " + RedissonBaseLock.this.getRawName() + " expiration", e);
//從全局變量中移除entry RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());
} else {
if (res) { //更新鎖為30秒過期成功,則重新調(diào)用renewExpiration方法,再次添加定時器檢查
RedissonBaseLock.this.renewExpiration();
} else {//已經(jīng)不存在此鎖,任務(wù)已經(jīng)完成,則EXPIRATION_RENEWAL_MAP中移除entry,停止定時器
RedissonBaseLock.this.cancelExpirationRenewal((Long)null);
}
}
});
}
}
}
//設(shè)置task的定時時間,指定時間單位
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
//添加到定時器中
ee.setTimeout(task);
}
}從全局變量EXPIRATION_RENEWAL_MAP中獲取線程的entry,當(dāng)entry存在,則執(zhí)行一段檢查鎖是否存在,存在則更新過期時間為30秒的lua腳本,當(dāng)lua腳本的執(zhí)行結(jié)果返回成功,則重新調(diào)用renewExpiration方法,重新添加定時器任務(wù);當(dāng)lua腳本執(zhí)行失敗,則從EXPIRATION_RENEWAL_MAP中移除entry,停止定時器。
檢查鎖key是否存在,并更新鎖過期時間的方法renewExpirationAsync中l(wèi)ua腳本:
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end;
return 0;redis腳本含義:
redis.call('hexists', KEYS[1], ARGV[2]) == 1:存在此key的鎖,并且是當(dāng)前客戶端下當(dāng)前線程所擁有
redis.call('pexpire', KEYS[1], ARGV[1]):重新給此key設(shè)置過期時間,更新為30秒每次執(zhí)行這段lua腳本滿足if條件,并且執(zhí)行成功,則此key的過期時間被重置為30秒,業(yè)務(wù)一直沒有處理完的話,會每隔十秒過期時間被重置為30秒。
五、方案優(yōu)缺點
1.優(yōu)點
(1)通過watch dog機制實現(xiàn)了鎖的續(xù)期問題。
(2)結(jié)合著redis一塊使用,系統(tǒng)性能更高。
(3)操作redis使用lua腳本,保證執(zhí)行的原子性。
(4)支持可重入鎖。
(5)使用了LockSupport的Unsafe.park使線程掛起,避免了重復(fù)無效獲取鎖浪費資源。
2.缺點
(1)在redis主從模式或者集群模式下,當(dāng)客戶端1在master節(jié)點加鎖成功,但是master節(jié)點還沒有異步復(fù)制數(shù)據(jù)給其他slave節(jié)點時,master節(jié)點宕機了,此時客戶端2來申請加鎖,會在新的master節(jié)點上加鎖成功,此時會存在多個客戶端加鎖成功的情況,可能會產(chǎn)生不必要的臟數(shù)據(jù)。
(2)watch dog 機制10秒觸發(fā)一次會消耗一定的服務(wù)器資源。
到此這篇關(guān)于源碼詳解分布式鎖redisson實現(xiàn)原理的文章就介紹到這了,更多相關(guān)分布式鎖redisson實現(xiàn)原理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
@RequestMapping對不同參數(shù)的接收方式示例詳解
Spring?MVC框架中,@RequestMapping注解用于映射URL到控制器方法,不同的參數(shù)類型如簡單參數(shù)、實體參數(shù)、數(shù)組參數(shù)、集合參數(shù)、日期參數(shù)和JSON參數(shù),本文給大家介紹@RequestMapping對不同參數(shù)的接收方式,感興趣的朋友一起看看吧2024-10-10
Java gRPC攔截器簡單實現(xiàn)分布式日志鏈路追蹤器過程詳解
有請求的發(fā)送、處理,當(dāng)然就會有攔截器的需求,例如在服務(wù)端通過攔截器統(tǒng)一進(jìn)行請求認(rèn)證等操作,這些就需要攔截器來完成,今天松哥先和小伙伴們來聊一聊gRPC中攔截器的基本用法,后面我再整一篇文章和小伙伴們做一個基于攔截器實現(xiàn)的JWT認(rèn)證的gRPC2023-03-03
FeignClient實現(xiàn)接口調(diào)用方式(不同參數(shù)形式)
這篇文章主要介紹了FeignClient實現(xiàn)接口調(diào)用方式(不同參數(shù)形式),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03
Java畢業(yè)設(shè)計實戰(zhàn)之財務(wù)預(yù)算管理系統(tǒng)的實現(xiàn)
這是一個使用了java+SSM+Jsp+Mysql+Layui+Maven開發(fā)的財務(wù)預(yù)算管理系統(tǒng),是一個畢業(yè)設(shè)計的實戰(zhàn)練習(xí),具有財務(wù)預(yù)算管理該有的所有功能,感興趣的朋友快來看看吧2022-02-02

