Redisson之分布式鎖原理全面分析
Redisson是一個 Redis的開源客戶端,也提供了分布式鎖的實現(xiàn)。
Redisson官網(wǎng):
Redisson 分布式鎖使用
Redisson分布式鎖使用起來還是蠻簡單的。
1、添加 Redisson 配置類
引入依賴:
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.16.5</version> </dependency>
創(chuàng)建 Redisson 配置類,注入 RedissonClient客戶端。
@Configuration public class RedissonConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private String port; @Value("${spring.redis.password}") private String password; @Bean public RedissonClient getRedisson() { Config config = new Config(); /** * reids配置,支持單機、主從、哨兵、集群等配置。這里使用單機配置 */ config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password); return Redisson.create(config); } }
2、使用 Redisson分布式鎖
代碼如下:
@Autowired private RedisTemplate<String, String> redisTemplate; @Autowired private RedissonClient redissonClient; public void disLockDemo(long productId) { String lockKey = "DISTRIBUTE_LOCK:redissonLock:product_" + productId; //設置鎖定資源名稱,并獲取分布式鎖對象。 RLock redissonLock = redissonClient.getLock(lockKey); //1.加鎖 redissonLock.lock(); //boolean isLock = disLock.tryLock(500, 15000, TimeUnit.MILLISECONDS); try { //2.執(zhí)行業(yè)務代碼 // TODO //if (isLock) { // TODO //} } finally { //3.解鎖 redissonLock.unlock(); } }
Redisson 分布式鎖源碼分析
使用分布式鎖必須要考慮的一些問題:
- 互斥性:在任意時刻,只能有一個進程持有鎖。
- 防死鎖:即使有一個進程在持有鎖的期間崩潰而未能主動釋放鎖,要有其他方式去釋放鎖從而保證其他進程能獲取到鎖。
- 不能釋放別人的鎖:加鎖和解鎖的必須是同一個進程。
- 鎖的續(xù)期問題:業(yè)務執(zhí)行時間超過鎖的過期時間時,需要提前給鎖的續(xù)期。
Redisson 是 Redis 官方推薦分布式鎖實現(xiàn)方案,它采用 Watch Dog機制能夠很好的解決鎖續(xù)期的問題。
執(zhí)行 lua腳本保證了多條命令執(zhí)行的原子性操作。
帶著上面分布式鎖的一些問題查看源碼。
1、獲取分布式鎖對象
簡單了解一下。
1.1 創(chuàng)建 RedissonClient
我們在配置類中通過 Redisson.create(config)
方法創(chuàng)建了 RedissonClient對象,并注入到 IOC容器中。
1.2 獲取分布式鎖對象
使用 Redisson 客戶端來 獲取分布式鎖對象。
2、加鎖代碼
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { //線程id long threadId = Thread.currentThread().getId(); // 1.嘗試獲取鎖 Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId); if (ttl != null) { RFuture<RedissonLockEntry> future = this.subscribe(threadId); if (interruptibly) { this.commandExecutor.syncSubscriptionInterrupted(future); } else { this.commandExecutor.syncSubscription(future); } try { //2.死循環(huán),反復去調用tryAcquire嘗試獲取鎖 while(true) { // 再次嘗試獲取鎖 ttl = this.tryAcquire(-1L, leaseTime, unit, threadId); // ttl為null時表示別的線程已經(jīng)unlock了,自己加鎖成功 if (ttl == null) { return; } // 3.鎖互斥:通過 JDK的信號量 Semaphore來阻塞線程 if (ttl >= 0L) { try { ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException var13) { if (interruptibly) { throw var13; } ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else if (interruptibly) { ((RedissonLockEntry)future.getNow()).getLatch().acquire(); } else { ((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly(); } } } finally { // 4.無論是否獲得鎖,都要取消訂閱解鎖消息 this.unsubscribe(future, threadId); } } }
2.1 異步加鎖機制
查看 tryAcquire()
加鎖方法。
通過源碼,看到加鎖其實是通過一段 lua 腳本實現(xiàn)的,如下:
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end ; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end ; return redis.call('pttl', KEYS[1]);
- KEYS[1] 代表的是你加鎖的 key。
- ARGV[1] 代表的是鎖 key 的默認生存時間,默認 30 秒。
- ARGV[2] 代表的是加鎖的客戶端的線程 ID。通過
getLockName
方法組裝了一下。 - ARGV[2] 后面的 1:為了支持可重入鎖做的計數(shù)統(tǒng)計。
Redisson 實現(xiàn)分布式鎖的共享資源的存儲結構是 hash數(shù)據(jù)結構:
key 是鎖的名稱,field 是客戶端 ID,value 是該客戶端加鎖(可重入)的次數(shù)。
假設此時,客戶端 1 來嘗試加鎖,查看加鎖的 lua 腳本:
第一段 if 判斷語句,如果你要加鎖的那個鎖 key 不存在的話,進行加鎖。此時鎖 key不存在,向Redis中設置一個 hash 結構的數(shù)據(jù),則客戶端 1加鎖成功,返回 null。
2.1.1 鎖的續(xù)期機制
客戶端 1 加鎖的那個鎖 key 默認生存時間才 30 秒,如果超過了 30 秒,客戶端 1 還想一直持有這把鎖,就需要提前進行鎖的續(xù)期操作。
Redisson 提供了一個 Watch dog 機制
來解決鎖的續(xù)期問題, 只要客戶端 1 一旦加鎖成功,就會啟動一個 Watch Dog。
進入 scheduleExpirationRenewal
方法,重點查看 renewExpiration方法。
鎖續(xù)期的 lua 腳本如下:
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end ; return 0;
從源碼我們看到 leaseTime 必須是 -1 才會開啟 Watch Dog 機制,我們發(fā)現(xiàn):
- 如果想開啟 Watch Dog 機制必須使用默認的加鎖時間為 30s。
- 如果自己自定義時間,即使用 tryLock,鎖并不會延長,不會觸發(fā)Watch Dog 機制。
Watch Dog 機制其實就是一個后臺定時任務線程,獲取鎖成功之后,會將持有鎖的線程放入到一個 RedissonLock.EXPIRATION_RENEWAL_MAP里面,
然后每隔 10 秒 (internalLockLeaseTime / 3) 檢查一下,如果客戶端 1 還持有鎖 key(判斷客戶端是否還持有 key,
其實就是遍歷 EXPIRATION_RENEWAL_MAP 里面線程 id 然后根據(jù)線程 id 去 Redis 中查,
如果存在就會延長 key 的時間),那么就會不斷的延長鎖 key 的生存時間。
如果服務宕機了,Watch Dog 機制線程也就沒有了,此時就不會延長 key 的過期時間,到了 30s 之后就會自動過期了,其他線程就可以獲取到鎖。
2.1.2 可重入加鎖機制
Redisson 也是支持可重入鎖的,比如:客戶端 1 加鎖代碼:
@Override public void demo() { RLock lock = redissonSingle.getLock("myLock"); try { lock.lock(); // TODO 執(zhí)行業(yè)務 //鎖重入 lock.lock(); } catch (Exception e) { e.printStackTrace(); } finally { // 釋放鎖 lock.unlock(); lock.unlock(); } }
此時,如果客戶端 1 又來嘗試加鎖,繼續(xù)分析加鎖的 lua 腳本。
- 首先,第一個 if 判斷,你要加鎖的那個鎖 key 已經(jīng)存在了。
- 然后,第二個 if 判斷,判斷一下,加鎖的那個鎖 key的 hash 數(shù)據(jù)結構中,是否包含客戶端 1 的 ID,
- 此時數(shù)據(jù)結構的是客戶端 1 的 ID,即包含客戶端 1的 ID,然后就執(zhí)行行可重入鎖的命令,將 hash 結構的 value數(shù)據(jù) + 1,返回 null。
2.2 鎖互斥機制
上面客戶端 1加鎖成功,此時,如果客戶端 2 來嘗試加鎖,繼續(xù)分析加鎖的 lua 腳本:
- 首先,第一個 if 判斷,你要加鎖的那個鎖 key 已經(jīng)存在了。
- 然后,第二個 if 判斷,判斷一下,加鎖的那個鎖 key的 hash 數(shù)據(jù)結構中,是否包含客戶端 2 的 ID,如果包含就是執(zhí)行可重入鎖的賦值,此時 hash數(shù)據(jù)結構是客戶端 1 的 ID,不包含客戶端 2的 ID,所以,返回加鎖的那個鎖 key的剩余存活時間。
接著查看 lock方法中的 死循環(huán)部分。
流程大致如下:
- 嘗試獲取鎖,返回 null 則說明加鎖成功,返回一個ttl,則說明已經(jīng)存在該鎖,ttl 為鎖的剩余存活時間。
- 如果此時客戶端 2 進程獲取鎖失敗,那么使用客戶端 2 的線程 id,通過 Redis 的 channel 訂閱鎖釋放的事件。
- 進入死循環(huán)中,嘗試重新獲取鎖。
- 如果在重試中拿到了鎖,則直接返回。
- 如果鎖當前還是被占用的,那么等待釋放鎖的消息。通過使用了 JDK 的信號量 Semaphore 來阻塞線程,當 ttl 為鎖的剩余存活時間為0后,信號量的 release() 方法會被調用,此時被信號量阻塞的等待隊列中的一個線程就可以繼續(xù)嘗試獲取鎖了。
注意:
當鎖正在被占用時,等待獲取鎖的進程并不是真正通過一個 while(true) 死循環(huán)去獲取鎖(占 CPU資源),而時使用 JDK 的信號量 Semaphore 來阻塞線程(間斷性的不斷嘗試獲取鎖),是會釋放 CPU資源的。
3、鎖釋放代碼
public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise(); // 1. 異步釋放鎖 RFuture<Boolean> future = this.unlockInnerAsync(threadId); // 2. 取消 Watch Dog 機制 future.onComplete((opStatus, e) -> { this.cancelExpirationRenewal(threadId); if (e != null) { result.tryFailure(e); } else if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId); result.tryFailure(cause); } else { result.trySuccess((Object)null); } }); return result; }
3.1 異步釋放鎖機制
查看unlockInnerAsync方法。
釋放鎖也是執(zhí)行的 lua 腳本:
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end ; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end ; return nil;
首先,第一段 if 判斷語句,判斷 key 是否存在的話,進行加鎖。此時鎖 key不存在,則客戶端 1加鎖成功,向Redis中設置一個 hash 結構的數(shù)據(jù)。返回 null。
然后,第二個 if 判斷,判斷一下該客戶端對應的鎖的 hash 結構的 value 值是否遞減為 0,
- 如果遞減不為 0,則重入鎖的解鎖,返回0。
- 如果遞減為 0,則進行刪除,返回1。
3.2 取消 Watch Dog機制
查看 cancelExpirationRenewal方法。
取消 Watch Dog 機制,即將 RedissonLock.EXPIRATION_RENEWAL_MAP 里面的線程 id 刪除。
3.3 通知阻塞等待的進程
利用 Redis 的發(fā)布訂閱機制,廣播釋放鎖的消息,通知阻塞等待的進程(向通道名為 redisson_lock__channel publish 一條 UNLOCK_MESSAGE 信息)。
總結
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
Java?GUI實現(xiàn)學生成績管理系統(tǒng)
這篇文章主要為大家詳細介紹了Java?GUI實現(xiàn)學生成績管理系統(tǒng),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-01-01使用@JsonFormat和@DateTimeFormat對Date格式化操作
這篇文章主要介紹了使用@JsonFormat和@DateTimeFormat對Date格式化操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08SpringCloud @FeignClient注入Spring容器原理分析
本文詳細分析了Spring Boot中@FeignClient注解的掃描和注入過程,重點探討了@EnableFeignClients注解的工作原理,通過源碼分析,揭示了@EnableFeignClients如何通過@Import注解和FeignClientsRegistrar類實現(xiàn)bean定義的加載2024-12-12解決rocketmq-spring-boot-starter導致的多消費者實例重復消費問題
這篇文章主要介紹了解決rocketmq-spring-boot-starter導致的多消費者實例重復消費問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-06-06JAVA匿名內部類(Anonymous Classes)的具體使用
本文主要介紹了JAVA匿名內部類,匿名內部類在我們JAVA程序員的日常工作中經(jīng)常要用到,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-08-08Quarkus中實現(xiàn)Resteasy的文件上傳下載操作
這篇文章主要為大家介紹了Quarkus中實現(xiàn)Resteasy的文件上傳下載的操作過程步驟,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步2022-02-02