欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Redisson分布式鎖之加解鎖詳解

 更新時(shí)間:2023年03月16日 09:16:45   作者:宮三公子  
這篇文章主要為大家介紹了Redisson分布式鎖加解鎖的詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

引言

2023的金三銀四來的沒想象中那么激烈,一個(gè)朋友前段時(shí)間投了幾十家,多數(shù)石沉大海,好不容易等來面試機(jī)會(huì),就恰好被問道項(xiàng)目中關(guān)于分布式鎖的應(yīng)用,后涉及Redisson實(shí)現(xiàn)分布式鎖的原理,答不上來。

鎖的可重入性

我們都知道,Java中synchronized和lock都支持可重入,synchronized的鎖關(guān)聯(lián)一個(gè)線程持有者和一個(gè)計(jì)數(shù)器。當(dāng)一個(gè)線程請(qǐng)求成功后,JVM會(huì)記下持有鎖的線程,并將計(jì)數(shù)器計(jì)為1。此時(shí)其他線程請(qǐng)求該鎖,則必須等待。而該持有鎖的線程如果再次請(qǐng)求這個(gè)鎖,就可以再次拿到這個(gè)鎖,同時(shí)計(jì)數(shù)器會(huì)遞增。當(dāng)線程退出一個(gè)synchronized方法/塊時(shí),計(jì)數(shù)器會(huì)遞減,如果計(jì)數(shù)器為0則釋放該鎖;在ReentrantLock中,底層的 AQS 對(duì)應(yīng)的state 同步狀態(tài)值表示線程獲取該鎖的可重入次數(shù),通過CAS方式進(jìn)行設(shè)置,在默認(rèn)情況下,state的值為0 表示當(dāng)前鎖沒有被任何線程持有,原理類似。所以如果想要實(shí)現(xiàn)可重入性,可能須有一個(gè)計(jì)數(shù)器來控制重入次數(shù),實(shí)際Redisson確實(shí)是這么做的。

好的我們通過Redisson客戶端進(jìn)行設(shè)置,并循環(huán)3次,模擬鎖重入:000

for(int i = 0; i < 3; i++) {      
    RedissonLockUtil.tryLock("distributed:lock:distribute_key", TimeUnit.SECONDS, 20, 100); 
 }

連接Redis客戶端進(jìn)行查看:

可以看到,我們?cè)O(shè)置的分布式鎖是存在一個(gè)hash結(jié)構(gòu)中,value看起來是循環(huán)的次數(shù)3,key就不怎么認(rèn)識(shí)了,那這個(gè)key是怎么設(shè)置進(jìn)去的呢,另外為什么要設(shè)置成為Hash類型呢?

加鎖

我們先來看看普通的分布式鎖的上鎖流程:

說明:

  • 客戶端在進(jìn)行加鎖時(shí),會(huì)校驗(yàn)如果業(yè)務(wù)上沒有設(shè)置持有鎖時(shí)長(zhǎng)leaseTime,會(huì)啟動(dòng)看門狗來每隔10s進(jìn)行續(xù)命,否則就直接以leaseTime作為持有的時(shí)長(zhǎng);
  • 并發(fā)場(chǎng)景下,如果客戶端1鎖還未釋放,客戶端2嘗試獲取,加鎖必然失敗,然后會(huì)通過發(fā)布訂閱模式來訂閱Key的釋放通知,并繼續(xù)進(jìn)入后續(xù)的搶鎖流程。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
      long time = unit.toMillis(waitTime);
      long current = System.currentTimeMillis();
      long threadId = Thread.currentThread().getId();
      Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
      if (ttl == null) {
         return true;
      } else {
         // 訂閱分布式Key對(duì)應(yīng)的消息,監(jiān)聽其它鎖持有者釋放,鎖沒有釋放的時(shí)候則會(huì)等待,直到鎖釋放的時(shí)候會(huì)執(zhí)行下面的while循環(huán)
         CompletableFuture subscribeFuture = this.subscribe(threadId);
         subscribeFuture.get(time, TimeUnit.MILLISECONDS);
         try {
            do {
               // 嘗試獲取鎖
               ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
               // 競(jìng)爭(zhēng)獲取鎖成功,退出循環(huán),不再競(jìng)爭(zhēng)。
               if (ttl == null) {
                  return true;
               }
               // 利用信號(hào)量機(jī)制阻塞當(dāng)前線程相應(yīng)時(shí)間,之后再重新獲取鎖
               if (ttl >= 0L && ttl < time) {
                  ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
               } else {
                  ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
               }
               time -= System.currentTimeMillis() - currentTime;
            } while(time > 0L);
         } finally {
            // 競(jìng)爭(zhēng)鎖成功后,取消訂閱該線程Id事件
            this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
         }
      }
   }
}
RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        // 如果設(shè)置了持有鎖的時(shí)長(zhǎng),直接進(jìn)行嘗試加鎖操作
         if (leaseTime != -1L) {
            return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            // 未設(shè)置加鎖時(shí)長(zhǎng),在加鎖成功后,啟動(dòng)續(xù)期任務(wù),初始默認(rèn)持有鎖時(shí)間是30s
            RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            ttlRemainingFuture.addListener(new FutureListener<Long>() {
                public void operationComplete(Future<Long> future) throws Exception {
                    if (future.isSuccess()) {
                        Long ttlRemaining = (Long)future.getNow();
                        if (ttlRemaining == null) {
                            RedissonLock.this.scheduleExpirationRenewal(threadId);
                        }
                    }
                }
            });
            return ttlRemainingFuture;
        }
    }

我們都知道Redis執(zhí)行Lua腳本具有原子性,所以在嘗試加鎖的下層,Redis主要執(zhí)行了一段復(fù)雜的lua腳本:

-- 不存在該key時(shí)
if (redis.call('exists', KEYS[1]) == 0) then
      -- 新增該鎖并且hash中該線程id對(duì)應(yīng)的count置1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 設(shè)置過期時(shí)間
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 存在該key 并且 hash中線程id的key也存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
      -- 線程重入次數(shù)++
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);

參數(shù)說明:

KEYS[1]:對(duì)應(yīng)我們?cè)O(shè)置的分布式key,即:distributed:lock:distribute_key

ARGV[1]:業(yè)務(wù)自定義的加鎖時(shí)長(zhǎng)或者默認(rèn)的30s;

ARGV[2]: 具體的客戶端初始化連接UUID+線程ID: 9d8f0907-1165-47d2-8983-1e130b07ad0c:1

我們從上面的腳本中可以看出核心邏輯其實(shí)不難:

  • 如果分布式鎖Key未被任何端持有,直接根據(jù)“客戶端連接ID+線程ID” 進(jìn)行初始化設(shè)置,并設(shè)置重入次數(shù)為1,并設(shè)置Key的過期時(shí)間;
  • 否則重入次數(shù)+1,并重置過期時(shí)間;

鎖續(xù)命

接下來看看scheduleExpirationRenewal續(xù)命是怎么做的呢?

private void scheduleExpirationRenewal(final long threadId) {
   if (!expirationRenewalMap.containsKey(this.getEntryName())) {
      Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
         public void run(Timeout timeout) throws Exception {
            // 執(zhí)行續(xù)命操作
            RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
            future.addListener(new FutureListener<Boolean>() {
               public void operationComplete(Future<Boolean> future) throws Exception {
                  RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
                          ...
                  // 續(xù)命成功,繼續(xù)
                  if ((Boolean)future.getNow()) {
                     RedissonLock.this.scheduleExpirationRenewal(threadId);
                  }
               }
            });
         }
      }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
   }
}

Tip小知識(shí)點(diǎn):

  • 續(xù)期是用的什么定時(shí)任務(wù)執(zhí)行的?
    Redisson用netty的HashedWheelTimer做命令重試機(jī)制,原因在于一條redis命令的執(zhí)行不論成功或者失敗耗時(shí)都很短,而HashedWheelTimer是單線程的,系統(tǒng)性能開銷小。

而在上面的renewExpirationAsync中續(xù)命操作的執(zhí)行核心Lua腳本要做的事情也非常的簡(jiǎn)單,就是給這個(gè)Key的過期時(shí)間重新設(shè)置為指定的30s.

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end;
return 0;

釋放鎖

釋放鎖主要是除了解鎖本省,另外還要考慮到如果存在續(xù)期的情況,要將續(xù)期任務(wù)刪除:

public RFuture<Void> unlockAsync(long threadId) {
   // 解鎖
   RFuture<Boolean> future = this.unlockInnerAsync(threadId);
   CompletionStage<Void> f = future.handle((opStatus, e) -> {
      // 解除續(xù)期
      this.cancelExpirationRenewal(threadId);
      ...
   });
   return new CompletableFutureWrapper(f);
}

在unlockInnerAsync內(nèi)部,Redisson釋放鎖其實(shí)核心也是執(zhí)行了如下一段核心Lua腳本:

    // 校驗(yàn)是否存在
    if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
      return nil;
      end;
    // 獲取加鎖次數(shù),校驗(yàn)是否為重入鎖
    local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
    // 如果為重入鎖,重置過期時(shí)間,鎖本身不釋放
    if (counter > 0) then
      redis.call('pexpire', KEYS[1], ARGV[2]);
      return 0;
   // 刪除Key
    else redis.call('del', KEYS[1]);
      // 通知阻塞的客戶端可以搶鎖啦
      redis.call('publish', KEYS[2], ARGV[1]);
      return 1;
      end;
      return nil;

其中:

KEYS[1]: 分布式鎖
KEYS[2]: redisson_lock_channel:{分布式鎖} 發(fā)布訂閱消息的管道名稱
ARGV[1]: 發(fā)布的消息內(nèi)容
ARGV[2]: 鎖的過期時(shí)間
ARGV[3]: 線程ID標(biāo)識(shí)名稱

其它問題

  • 紅鎖這么火,但真的靠譜么?
  • Redisson公平鎖是什么情況?

以上就是Redisson分布式鎖第一彈-加解鎖的詳細(xì)內(nèi)容,更多關(guān)于Redisson分布式鎖加解鎖的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Redis Cluster集群數(shù)據(jù)分片機(jī)制原理

    Redis Cluster集群數(shù)據(jù)分片機(jī)制原理

    這篇文章主要介紹了Redis Cluster集群數(shù)據(jù)分片機(jī)制原理,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-04-04
  • Redis的BitMap使用操作命令

    Redis的BitMap使用操作命令

    Redis 為我們提供了位圖這一數(shù)據(jù)結(jié)構(gòu),每個(gè)用戶每天的登錄記錄只占據(jù)一位,365天就是365位,僅僅需要46字節(jié)就可存儲(chǔ),極大地節(jié)約了存儲(chǔ)空間,這篇文章主要介紹了Redis的BitMap使用操作命令,需要的朋友可以參考下
    2023-10-10
  • 虛擬機(jī)下的Redis無法訪問報(bào)錯(cuò)500解決方法

    虛擬機(jī)下的Redis無法訪問報(bào)錯(cuò)500解決方法

    這篇文章主要介紹了虛擬機(jī)下的Redis無法訪問,報(bào)錯(cuò)500解決方法,由于我的redis是在虛擬機(jī)下安裝的,無法訪問redis的原因是因?yàn)樘摂M機(jī)的ip地址和主機(jī)不同,文中通過圖文結(jié)合給出了詳細(xì)的解決方法,需要的朋友可以參考下
    2024-02-02
  • Redis分布式鎖Redlock的實(shí)現(xiàn)

    Redis分布式鎖Redlock的實(shí)現(xiàn)

    本文主要介紹了Redis分布式鎖Redlock的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-08-08
  • 使用redis實(shí)現(xiàn)高效分頁的項(xiàng)目實(shí)踐

    使用redis實(shí)現(xiàn)高效分頁的項(xiàng)目實(shí)踐

    在很多場(chǎng)景下,我們需要對(duì)大量的數(shù)據(jù)進(jìn)行分頁展示,本文主要介紹了使用redis實(shí)現(xiàn)高效分頁的項(xiàng)目實(shí)踐,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-02-02
  • 如何使用?redis?消息隊(duì)列完成秒殺過期訂單處理操作(二)

    如何使用?redis?消息隊(duì)列完成秒殺過期訂單處理操作(二)

    這篇文章主要介紹了如何使用?redis?消息隊(duì)列完成秒殺過期訂單處理操作,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧
    2024-07-07
  • Redis的5種數(shù)據(jù)類型與常用命令講解

    Redis的5種數(shù)據(jù)類型與常用命令講解

    今天小編就為大家分享一篇關(guān)于Redis的5種數(shù)據(jù)類型與常用命令講解,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧
    2019-03-03
  • 淺談一下如何保證Redis緩存與數(shù)據(jù)庫的一致性

    淺談一下如何保證Redis緩存與數(shù)據(jù)庫的一致性

    這篇文章主要介紹了一下如何保證Redis緩存與數(shù)據(jù)庫的一致性,今天這篇文章就帶你詳細(xì)了解一下四種同步策略,需要的朋友可以參考下
    2023-03-03
  • redis獲取所有key的方法

    redis獲取所有key的方法

    本文主要介紹了redis獲取所有key的方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • 解決linux下redis數(shù)據(jù)庫overcommit_memory問題

    解決linux下redis數(shù)據(jù)庫overcommit_memory問題

    這篇文章介紹了解決linux下redis數(shù)據(jù)庫overcommit_memory問題的方法,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-02-02

最新評(píng)論