Redisson之分布式鎖原理全面分析
Redisson是一個(gè) Redis的開源客戶端,也提供了分布式鎖的實(shí)現(xiàn)。
Redisson官網(wǎng):
Redisson 分布式鎖使用
Redisson分布式鎖使用起來還是蠻簡單的。
1、添加 Redisson 配置類
引入依賴:
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.16.5</version> </dependency>
創(chuàng)建 Redisson 配置類,注入 RedissonClient客戶端。
@Configuration public class RedissonConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private String port; @Value("${spring.redis.password}") private String password; @Bean public RedissonClient getRedisson() { Config config = new Config(); /** * reids配置,支持單機(jī)、主從、哨兵、集群等配置。這里使用單機(jī)配置 */ config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password); return Redisson.create(config); } }
2、使用 Redisson分布式鎖
代碼如下:
@Autowired private RedisTemplate<String, String> redisTemplate; @Autowired private RedissonClient redissonClient; public void disLockDemo(long productId) { String lockKey = "DISTRIBUTE_LOCK:redissonLock:product_" + productId; //設(shè)置鎖定資源名稱,并獲取分布式鎖對(duì)象。 RLock redissonLock = redissonClient.getLock(lockKey); //1.加鎖 redissonLock.lock(); //boolean isLock = disLock.tryLock(500, 15000, TimeUnit.MILLISECONDS); try { //2.執(zhí)行業(yè)務(wù)代碼 // TODO //if (isLock) { // TODO //} } finally { //3.解鎖 redissonLock.unlock(); } }
Redisson 分布式鎖源碼分析
使用分布式鎖必須要考慮的一些問題:
- 互斥性:在任意時(shí)刻,只能有一個(gè)進(jìn)程持有鎖。
- 防死鎖:即使有一個(gè)進(jìn)程在持有鎖的期間崩潰而未能主動(dòng)釋放鎖,要有其他方式去釋放鎖從而保證其他進(jìn)程能獲取到鎖。
- 不能釋放別人的鎖:加鎖和解鎖的必須是同一個(gè)進(jìn)程。
- 鎖的續(xù)期問題:業(yè)務(wù)執(zhí)行時(shí)間超過鎖的過期時(shí)間時(shí),需要提前給鎖的續(xù)期。
Redisson 是 Redis 官方推薦分布式鎖實(shí)現(xiàn)方案,它采用 Watch Dog機(jī)制能夠很好的解決鎖續(xù)期的問題。
執(zhí)行 lua腳本保證了多條命令執(zhí)行的原子性操作。
帶著上面分布式鎖的一些問題查看源碼。
1、獲取分布式鎖對(duì)象
簡單了解一下。
1.1 創(chuàng)建 RedissonClient
我們?cè)谂渲妙愔型ㄟ^ Redisson.create(config)
方法創(chuàng)建了 RedissonClient對(duì)象,并注入到 IOC容器中。
1.2 獲取分布式鎖對(duì)象
使用 Redisson 客戶端來 獲取分布式鎖對(duì)象。
2、加鎖代碼
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { //線程id long threadId = Thread.currentThread().getId(); // 1.嘗試獲取鎖 Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId); if (ttl != null) { RFuture<RedissonLockEntry> future = this.subscribe(threadId); if (interruptibly) { this.commandExecutor.syncSubscriptionInterrupted(future); } else { this.commandExecutor.syncSubscription(future); } try { //2.死循環(huán),反復(fù)去調(diào)用tryAcquire嘗試獲取鎖 while(true) { // 再次嘗試獲取鎖 ttl = this.tryAcquire(-1L, leaseTime, unit, threadId); // ttl為null時(shí)表示別的線程已經(jīng)unlock了,自己加鎖成功 if (ttl == null) { return; } // 3.鎖互斥:通過 JDK的信號(hào)量 Semaphore來阻塞線程 if (ttl >= 0L) { try { ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException var13) { if (interruptibly) { throw var13; } ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else if (interruptibly) { ((RedissonLockEntry)future.getNow()).getLatch().acquire(); } else { ((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly(); } } } finally { // 4.無論是否獲得鎖,都要取消訂閱解鎖消息 this.unsubscribe(future, threadId); } } }
2.1 異步加鎖機(jī)制
查看 tryAcquire()
加鎖方法。
通過源碼,看到加鎖其實(shí)是通過一段 lua 腳本實(shí)現(xiàn)的,如下:
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end ; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end ; return redis.call('pttl', KEYS[1]);
- KEYS[1] 代表的是你加鎖的 key。
- ARGV[1] 代表的是鎖 key 的默認(rèn)生存時(shí)間,默認(rèn) 30 秒。
- ARGV[2] 代表的是加鎖的客戶端的線程 ID。通過
getLockName
方法組裝了一下。 - ARGV[2] 后面的 1:為了支持可重入鎖做的計(jì)數(shù)統(tǒng)計(jì)。
Redisson 實(shí)現(xiàn)分布式鎖的共享資源的存儲(chǔ)結(jié)構(gòu)是 hash數(shù)據(jù)結(jié)構(gòu):
key 是鎖的名稱,field 是客戶端 ID,value 是該客戶端加鎖(可重入)的次數(shù)。
假設(shè)此時(shí),客戶端 1 來嘗試加鎖,查看加鎖的 lua 腳本:
第一段 if 判斷語句,如果你要加鎖的那個(gè)鎖 key 不存在的話,進(jìn)行加鎖。此時(shí)鎖 key不存在,向Redis中設(shè)置一個(gè) hash 結(jié)構(gòu)的數(shù)據(jù),則客戶端 1加鎖成功,返回 null。
2.1.1 鎖的續(xù)期機(jī)制
客戶端 1 加鎖的那個(gè)鎖 key 默認(rèn)生存時(shí)間才 30 秒,如果超過了 30 秒,客戶端 1 還想一直持有這把鎖,就需要提前進(jìn)行鎖的續(xù)期操作。
Redisson 提供了一個(gè) Watch dog 機(jī)制
來解決鎖的續(xù)期問題, 只要客戶端 1 一旦加鎖成功,就會(huì)啟動(dòng)一個(gè) Watch Dog。
進(jìn)入 scheduleExpirationRenewal
方法,重點(diǎn)查看 renewExpiration方法。
鎖續(xù)期的 lua 腳本如下:
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end ; return 0;
從源碼我們看到 leaseTime 必須是 -1 才會(huì)開啟 Watch Dog 機(jī)制,我們發(fā)現(xiàn):
- 如果想開啟 Watch Dog 機(jī)制必須使用默認(rèn)的加鎖時(shí)間為 30s。
- 如果自己自定義時(shí)間,即使用 tryLock,鎖并不會(huì)延長,不會(huì)觸發(fā)Watch Dog 機(jī)制。
Watch Dog 機(jī)制其實(shí)就是一個(gè)后臺(tái)定時(shí)任務(wù)線程,獲取鎖成功之后,會(huì)將持有鎖的線程放入到一個(gè) RedissonLock.EXPIRATION_RENEWAL_MAP里面,
然后每隔 10 秒 (internalLockLeaseTime / 3) 檢查一下,如果客戶端 1 還持有鎖 key(判斷客戶端是否還持有 key,
其實(shí)就是遍歷 EXPIRATION_RENEWAL_MAP 里面線程 id 然后根據(jù)線程 id 去 Redis 中查,
如果存在就會(huì)延長 key 的時(shí)間),那么就會(huì)不斷的延長鎖 key 的生存時(shí)間。
如果服務(wù)宕機(jī)了,Watch Dog 機(jī)制線程也就沒有了,此時(shí)就不會(huì)延長 key 的過期時(shí)間,到了 30s 之后就會(huì)自動(dòng)過期了,其他線程就可以獲取到鎖。
2.1.2 可重入加鎖機(jī)制
Redisson 也是支持可重入鎖的,比如:客戶端 1 加鎖代碼:
@Override public void demo() { RLock lock = redissonSingle.getLock("myLock"); try { lock.lock(); // TODO 執(zhí)行業(yè)務(wù) //鎖重入 lock.lock(); } catch (Exception e) { e.printStackTrace(); } finally { // 釋放鎖 lock.unlock(); lock.unlock(); } }
此時(shí),如果客戶端 1 又來嘗試加鎖,繼續(xù)分析加鎖的 lua 腳本。
- 首先,第一個(gè) if 判斷,你要加鎖的那個(gè)鎖 key 已經(jīng)存在了。
- 然后,第二個(gè) if 判斷,判斷一下,加鎖的那個(gè)鎖 key的 hash 數(shù)據(jù)結(jié)構(gòu)中,是否包含客戶端 1 的 ID,
- 此時(shí)數(shù)據(jù)結(jié)構(gòu)的是客戶端 1 的 ID,即包含客戶端 1的 ID,然后就執(zhí)行行可重入鎖的命令,將 hash 結(jié)構(gòu)的 value數(shù)據(jù) + 1,返回 null。
2.2 鎖互斥機(jī)制
上面客戶端 1加鎖成功,此時(shí),如果客戶端 2 來嘗試加鎖,繼續(xù)分析加鎖的 lua 腳本:
- 首先,第一個(gè) if 判斷,你要加鎖的那個(gè)鎖 key 已經(jīng)存在了。
- 然后,第二個(gè) if 判斷,判斷一下,加鎖的那個(gè)鎖 key的 hash 數(shù)據(jù)結(jié)構(gòu)中,是否包含客戶端 2 的 ID,如果包含就是執(zhí)行可重入鎖的賦值,此時(shí) hash數(shù)據(jù)結(jié)構(gòu)是客戶端 1 的 ID,不包含客戶端 2的 ID,所以,返回加鎖的那個(gè)鎖 key的剩余存活時(shí)間。
接著查看 lock方法中的 死循環(huán)部分。
流程大致如下:
- 嘗試獲取鎖,返回 null 則說明加鎖成功,返回一個(gè)ttl,則說明已經(jīng)存在該鎖,ttl 為鎖的剩余存活時(shí)間。
- 如果此時(shí)客戶端 2 進(jìn)程獲取鎖失敗,那么使用客戶端 2 的線程 id,通過 Redis 的 channel 訂閱鎖釋放的事件。
- 進(jìn)入死循環(huán)中,嘗試重新獲取鎖。
- 如果在重試中拿到了鎖,則直接返回。
- 如果鎖當(dāng)前還是被占用的,那么等待釋放鎖的消息。通過使用了 JDK 的信號(hào)量 Semaphore 來阻塞線程,當(dāng) ttl 為鎖的剩余存活時(shí)間為0后,信號(hào)量的 release() 方法會(huì)被調(diào)用,此時(shí)被信號(hào)量阻塞的等待隊(duì)列中的一個(gè)線程就可以繼續(xù)嘗試獲取鎖了。
注意:
當(dāng)鎖正在被占用時(shí),等待獲取鎖的進(jìn)程并不是真正通過一個(gè) while(true) 死循環(huán)去獲取鎖(占 CPU資源),而時(shí)使用 JDK 的信號(hào)量 Semaphore 來阻塞線程(間斷性的不斷嘗試獲取鎖),是會(huì)釋放 CPU資源的。
3、鎖釋放代碼
public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise(); // 1. 異步釋放鎖 RFuture<Boolean> future = this.unlockInnerAsync(threadId); // 2. 取消 Watch Dog 機(jī)制 future.onComplete((opStatus, e) -> { this.cancelExpirationRenewal(threadId); if (e != null) { result.tryFailure(e); } else if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId); result.tryFailure(cause); } else { result.trySuccess((Object)null); } }); return result; }
3.1 異步釋放鎖機(jī)制
查看unlockInnerAsync方法。
釋放鎖也是執(zhí)行的 lua 腳本:
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end ; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end ; return nil;
首先,第一段 if 判斷語句,判斷 key 是否存在的話,進(jìn)行加鎖。此時(shí)鎖 key不存在,則客戶端 1加鎖成功,向Redis中設(shè)置一個(gè) hash 結(jié)構(gòu)的數(shù)據(jù)。返回 null。
然后,第二個(gè) if 判斷,判斷一下該客戶端對(duì)應(yīng)的鎖的 hash 結(jié)構(gòu)的 value 值是否遞減為 0,
- 如果遞減不為 0,則重入鎖的解鎖,返回0。
- 如果遞減為 0,則進(jìn)行刪除,返回1。
3.2 取消 Watch Dog機(jī)制
查看 cancelExpirationRenewal方法。
取消 Watch Dog 機(jī)制,即將 RedissonLock.EXPIRATION_RENEWAL_MAP 里面的線程 id 刪除。
3.3 通知阻塞等待的進(jìn)程
利用 Redis 的發(fā)布訂閱機(jī)制,廣播釋放鎖的消息,通知阻塞等待的進(jìn)程(向通道名為 redisson_lock__channel publish 一條 UNLOCK_MESSAGE 信息)。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java?GUI實(shí)現(xiàn)學(xué)生成績管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java?GUI實(shí)現(xiàn)學(xué)生成績管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-01-01使用@JsonFormat和@DateTimeFormat對(duì)Date格式化操作
這篇文章主要介紹了使用@JsonFormat和@DateTimeFormat對(duì)Date格式化操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08SpringCloud @FeignClient注入Spring容器原理分析
本文詳細(xì)分析了Spring Boot中@FeignClient注解的掃描和注入過程,重點(diǎn)探討了@EnableFeignClients注解的工作原理,通過源碼分析,揭示了@EnableFeignClients如何通過@Import注解和FeignClientsRegistrar類實(shí)現(xiàn)bean定義的加載2024-12-12spring對(duì)JDBC和orm的支持實(shí)例詳解
這篇文章主要介紹了spring對(duì)JDBC和orm的支持實(shí)例詳解,需要的朋友可以參考下2017-09-09解決rocketmq-spring-boot-starter導(dǎo)致的多消費(fèi)者實(shí)例重復(fù)消費(fèi)問題
這篇文章主要介紹了解決rocketmq-spring-boot-starter導(dǎo)致的多消費(fèi)者實(shí)例重復(fù)消費(fèi)問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06JAVA匿名內(nèi)部類(Anonymous Classes)的具體使用
本文主要介紹了JAVA匿名內(nèi)部類,匿名內(nèi)部類在我們JAVA程序員的日常工作中經(jīng)常要用到,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08java 數(shù)值類型分秒時(shí)間格式化的實(shí)例代碼
這篇文章主要介紹了java 數(shù)值類型分秒時(shí)間格式化的實(shí)例代碼的相關(guān)資料,將秒或分鐘的值轉(zhuǎn)換為xx天xx小時(shí)xx分鐘xx秒 如果 “xx” 為0 自動(dòng)缺省,需要的朋友可以參考下2017-07-07Spring時(shí)間戳(日期)格式轉(zhuǎn)換方式
這篇文章主要介紹了Spring時(shí)間戳(日期)格式轉(zhuǎn)換方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2025-03-03Quarkus中實(shí)現(xiàn)Resteasy的文件上傳下載操作
這篇文章主要為大家介紹了Quarkus中實(shí)現(xiàn)Resteasy的文件上傳下載的操作過程步驟,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-02-02