欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Redis之Redisson原理詳解

 更新時間:2023年06月15日 09:45:57   作者:愛吃牛肉的大老虎  
Redisson 顧名思義,Redis 的兒子,本質上還是 Redis 加鎖,不過是對 Redis 做了很多封裝,它不僅提供了一系列的分布式的 Java 常用對象,還提供了許多分布式服務,本文將詳細給大家介紹Redisson原理

1 Redisson

1.1 簡介

Redisson 顧名思義,Redis 的兒子,本質上還是 Redis 加鎖,不過是對 Redis 做了很多封裝,它不僅提供了一系列的分布式的 Java 常用對象,還提供了許多分布式服務。

1.2 與其他客戶端比較

RedissonJedis、Lettuce有什么區(qū)別?

  • Redisson和它倆的區(qū)別就像一個用鼠標操作圖形化界面,一個用命令行操作文件。Redisson是更高層的抽象,JedisLettuceRedis命令的封裝。
  • JedisRedis官方推出的用于通過Java連接Redis客戶端的一個工具包,提供了Redis的各種命令支持
  • Lettuce是一種可擴展的線程安全的 Redis 客戶端,通訊框架基于Netty,支持高級的 Redis 特性,比如哨兵,集群,管道,自動重新連接和Redis數(shù)據(jù)模型。Spring Boot 2.x 開始 Lettuce 已取代 Jedis 成為首選 Redis 的客戶端。
  • Redisson是架設在Redis基礎上,通訊基于Netty的綜合的、新型的中間件,企業(yè)級開發(fā)中使用Redis的最佳范本
  • JedisRedis命令封裝好,Lettuce則進一步有了更豐富的Api,也支持集群等模式。但是兩者只給了你操作Redis數(shù)據(jù)庫的腳手架,而Redisson則是基于RedisLuaNetty建立起了成熟的分布式解決方案,甚至redis官方都推薦的一種工具集

1.3 操作使用

1.3.1 pom.xml

在引入 Redisson 的依賴后,就可以直接進行調(diào)用:

<!-- 原生 -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.4</version>
</dependency>
<!-- 或者 另一種Spring集成starter  -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.13.6</version>
</dependency>

1.3.2 配置

@Configuration
public class RedissionConfig {
    @Value("${spring.redis.host}")
    private String redisHost;
    @Value("${spring.redis.password}")
    private String password;
    private int port = 6379;
    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        config.useSingleServer().
                setAddress("redis://" + redisHost + ":" + port).
                setPassword(password);
        config.setCodec(new JsonJacksonCodec());
        return Redisson.create(config);
    }
}

1.3.3 啟用分布式鎖

@Resource
private RedissonClient redissonClient;
RLock rLock = redissonClient.getLock(lockName);
try {
    boolean isLocked = rLock.tryLock(expireTime, TimeUnit.MILLISECONDS);
    if (isLocked) {
        // TODO
                }
    } catch (Exception e) {
            rLock.unlock();
    }

簡潔明了,只需要一個RLock,既然推薦Redisson,就往里面看看他是怎么實現(xiàn)的。

就是這么簡單,使用方法 jdk 的 ReentrantLock 差不多,并且也支持 ReadWriteLock(讀寫鎖)、Reentrant Lock(可重入鎖)、Fair Lock(公平鎖)、RedLock(紅鎖)等各種鎖,詳細可以參照redisson官方文檔來查看。

那么 Redisson 到底有哪些優(yōu)勢呢?鎖的自動續(xù)期(默認都是 30 秒),如果業(yè)務超長,運行期間會自動給鎖續(xù)上新的 30s,不用擔心業(yè)務執(zhí)行時間超長而鎖被自動刪掉。
加鎖的業(yè)務只要運行完成,就不會給當前續(xù)期,即便不手動解鎖,鎖默認在 30s 后刪除,不會造成死鎖問題。
前面也提到了鎖的自動續(xù)期,我們來看看 Redisson 是如何來實現(xiàn)的。

1.4 大致操作原理

我們一起來看下Redisson底層原理圖吧:

只要線程一加鎖成功,就會啟動一個watch dog看門狗,它是一個后臺線程,會每隔10秒檢查一下,如果線程一還持有鎖,那么就會不斷的延長鎖key的生存時間。因此,Redisson就是使用Redisson解決了鎖過期釋放,業(yè)務沒執(zhí)行完問題。

1.5 RLock

RLockRedisson分布式鎖的最核心接口,繼承了concurrent包的Lock接口和自己的RLockAsync接口,RLockAsync的返回值都是RFuture,是Redisson執(zhí)行異步實現(xiàn)的核心邏輯,也是Netty發(fā)揮的主要陣地。

1.5.1 RLock如何加鎖

RLock進入,找到RedissonLock類,找到 tryLock 方法再遞進到干活的tryAcquireOnceAsync 方法,這是加鎖的主要代碼(版本不一此處實現(xiàn)有差別,和最新3.15.x有一定出入,但是核心邏輯依然未變。此處以3.13.6為例)

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        if (leaseTime != -1L) {
            return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        } else {
            RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
            ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                if (e == null) {
                    if (ttlRemaining) {
                        this.scheduleExpirationRenewal(threadId);
                    }
                }
            });
            return ttlRemainingFuture;
        }
    }

此處出現(xiàn)leaseTime時間判斷的2個分支,實際上就是加鎖時是否設置過期時間,未設置過期時間(-1)時則會有watchDog 的鎖續(xù)約 (下文),一個注冊了加鎖事件的續(xù)約任務。我們先來看有過期時間tryLockInnerAsync 部分,

evalWriteAsync是eval命令執(zhí)行l(wèi)ua的入口

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        this.internalLockLeaseTime = unit.toMillis(leaseTime);
        return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
    }

這里揭開真面目,eval命令執(zhí)行Lua腳本的地方,此處的Lua腳本展開

-- 不存在該key時
if (redis.call('exists', KEYS[1]) == 0) then 
  -- 新增該鎖并且hash中該線程id對應的count置1
  redis.call('hincrby', KEYS[1], ARGV[2], 1); 
  -- 設置過期時間
  redis.call('pexpire', KEYS[1], ARGV[1]); 
  return nil; 
end; 
-- 存在該key 并且 hash中線程id的key也存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
  -- 線程重入次數(shù)++
  redis.call('hincrby', KEYS[1], ARGV[2], 1); 
  redis.call('pexpire', KEYS[1], ARGV[1]); 
  return nil; 
end; 
return redis.call('pttl', KEYS[1]);

redisson具體參數(shù)分析

// keyName
KEYS[1] = Collections.singletonList(this.getName())
// leaseTime
ARGV[1] = this.internalLockLeaseTime
// uuid+threadId組合的唯一值
ARGV[2] = this.getLockName(threadId)

總共3個參數(shù)完成了一段邏輯:

  • 判斷該鎖是否已經(jīng)有對應hash表存在,
  • 沒有對應的hash表:則sethash表中一個entry的key為鎖名稱,value為1,之后設置該hash表失效時間為leaseTime
  • 存在對應的hash表:則將該lockNamevalue執(zhí)行+1操作,也就是計算進入次數(shù),再設置失效時間leaseTime
  • 最后返回這把鎖的ttl剩余時間

也和上述自定義鎖沒有區(qū)別

既然如此,那解鎖的步驟也肯定有對應的-1操作,再看unlock方法,同樣查找方法名,一路到

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "...lua...腳本", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
    }

拿出Lua部分

-- 不存在key
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
  return nil;
end;
-- 計數(shù)器 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then 
  -- 過期時間重設
  redis.call('pexpire', KEYS[1], ARGV[2]); 
  return 0; 
else
  -- 刪除并發(fā)布解鎖消息
  redis.call('del', KEYS[1]); 
  redis.call('publish', KEYS[2], ARGV[1]); 
  return 1;
end; 
return nil;

該Lua KEYS有2個Arrays.asList(getName(),getChannelName())

  • name 鎖名稱
  • channelName,用于pubSub發(fā)布消息的channel名稱

ARGV變量有三個LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)

  • LockPubSub.UNLOCK_MESSAGE,channel發(fā)送消息的類別,此處解鎖為0
  • internalLockLeaseTime,watchDog配置的超時時間,默認為30s
  • lockName 這里的lockName指的是uuid和threadId組合的唯一值

步驟如下:

  • 如果該鎖不存在則返回nil
  • 如果該鎖存在則將其線程的hash key計數(shù)器-1
  • 計數(shù)器 counter>0,重置下失效時間,返回0;否則,刪除該鎖,發(fā)布解鎖消息unlockMessage,返回1;

其中unLock的時候使用到了Redis發(fā)布訂閱PubSub完成消息通知。
而訂閱的步驟就在RedissonLock的加鎖入口的lock方法里

long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl != null) {
    // 訂閱
    RFuture<RedissonLockEntry> future = this.subscribe(threadId);
    if (interruptibly) {
        this.commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        this.commandExecutor.syncSubscription(future);
    }
    // 省略

當鎖被其他線程占用時,通過監(jiān)聽鎖的釋放通知(在其他線程通過RedissonLock釋放鎖時,會通過發(fā)布訂閱pub/sub功能發(fā)起通知),等待鎖被其他線程釋放,也是為了避免自旋的一種常用效率手段

1.5.2 解鎖消息

為了一探究竟通知了什么,通知后又做了什么,進入LockPubSub。

這里只有一個明顯的監(jiān)聽方法onMessage,其訂閱和信號量的釋放都在父類PublishSubscribe,我們只關注監(jiān)聽事件的實際操作

protected void onMessage(RedissonLockEntry value, Long message) {
    Runnable runnableToExecute;
     if (message.equals(unlockMessage)) {
         // 從監(jiān)聽器隊列取監(jiān)聽線程執(zhí)行監(jiān)聽回調(diào)
         runnableToExecute = (Runnable)value.getListeners().poll();
         if (runnableToExecute != null) {
             runnableToExecute.run();
         }
         // getLatch()返回的是Semaphore,信號量,此處是釋放信號量
         // 釋放信號量后會喚醒等待的entry.getLatch().tryAcquire去再次嘗試申請鎖
         value.getLatch().release();
     } else if (message.equals(readUnlockMessage)) {
         while(true) {
             runnableToExecute = (Runnable)value.getListeners().poll();
             if (runnableToExecute == null) {
                 value.getLatch().release(value.getLatch().getQueueLength());
                 break;
             }
             runnableToExecute.run();
         }
     }
 }

發(fā)現(xiàn)一個是默認解鎖消息 ,一個是讀鎖解鎖消息 ,因為redisson是有提供讀寫鎖的,而讀寫鎖讀讀情況和讀寫、寫寫情況互斥情況不同,我們只看上面的默認解鎖消息unlockMessage分支

LockPubSub監(jiān)聽最終執(zhí)行了2件事

  • runnableToExecute.run() 執(zhí)行監(jiān)聽回調(diào)
  • value.getLatch().release(); 釋放信號量

Redisson通過LockPubSub 監(jiān)聽解鎖消息,執(zhí)行監(jiān)聽回調(diào)和釋放信號量通知等待線程可以重新?lián)屾i。
這時再回來看tryAcquireOnceAsync另一分支

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        if (leaseTime != -1L) {
            return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        } else {
            RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
            ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                if (e == null) {
                    if (ttlRemaining) {
                        this.scheduleExpirationRenewal(threadId);
                    }
                }
            });
            return ttlRemainingFuture;
        }
    }

可以看到,無超時時間時,在執(zhí)行加鎖操作后,還執(zhí)行了一段費解的邏輯

ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                if (e == null) {
                    if (ttlRemaining) {
                        this.scheduleExpirationRenewal(threadId);
                    }
                }
            })                   }                 }             })

此處涉及到NettyFuture/Promise-Listener模型,Redisson中幾乎全部以這種方式通信(所以說Redisson是基于Netty通信機制實現(xiàn)的),理解這段邏輯可以試著先理解

在 Java 的 Future 中,業(yè)務邏輯為一個 Callable 或 Runnable 實現(xiàn)類,該類的 call()或 run() 執(zhí)行完畢意味著業(yè)務邏輯的完結,在 Promise 機制中,可以在業(yè)務邏輯中人工設置業(yè)務邏輯的成功與失敗,這樣更加方便的監(jiān)控自己的業(yè)務邏輯。

這塊代碼的表面意義就是,在執(zhí)行異步加鎖的操作后,加鎖成功則根據(jù)加鎖完成返回的ttl是否過期來確認是否執(zhí)行一段定時任務。
這段定時任務的就是watchDog的核心。

1.5.3 鎖續(xù)約

查看RedissonLock.this.scheduleExpirationRenewal(threadId)

private void scheduleExpirationRenewal(long threadId) {
        RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
        RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            this.renewExpiration();
        }
    }
private void renewExpiration() {
        RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
        if (ee != null) {
            Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                public void run(Timeout timeout) throws Exception {
                    RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                    if (ent != null) {
                        Long threadId = ent.getFirstThreadId();
                        if (threadId != null) {
                            RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                            future.onComplete((res, e) -> {
                                if (e != null) {
                                    RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
                                } else {
                                    if (res) {
                                        RedissonLock.this.renewExpiration();
                                    }
                                }
                            });
                        }
                    }
                }
            }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
            ee.setTimeout(task);
        }
    }

拆分來看,這段連續(xù)嵌套且冗長的代碼實際上做了幾步:

  • 添加一個nettyTimeout回調(diào)任務,每(internalLockLeaseTime / 3)毫秒執(zhí)行一次,執(zhí)行的方法是renewExpirationAsync
  • renewExpirationAsync重置了鎖超時時間,又注冊一個監(jiān)聽器,監(jiān)聽回調(diào)又執(zhí)行了renewExpiration

renewExpirationAsync 的Lua如下

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
    }
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
  redis.call('pexpire', KEYS[1], ARGV[1]); 
  return 1; 
end; 
return 0;

重新設置了超時時間。
Redisson加這段邏輯的目的是什么?
目的是為了某種場景下保證業(yè)務不影響,如任務執(zhí)行超時但未結束,鎖已經(jīng)釋放的問題。
當一個線程持有了一把鎖,由于并未設置超時時間leaseTimeRedisson 默認配置了30S,開啟watchDog,每10S對該鎖進行一次續(xù)約,維持30S的超時時間,直到任務完成再刪除鎖。
這就是Redisson的鎖續(xù)約 ,也就是WatchDog 實現(xiàn)的基本思路。

1.5.4 流程概括

通過整體的介紹,流程簡單概括:

  • A、B線程爭搶一把鎖,A獲取到后,B阻塞
  • B線程阻塞時并非主動 CAS,而是PubSub方式訂閱該鎖的廣播消息
  • A操作完成釋放了鎖,B線程收到訂閱消息通知
  • B被喚醒開始繼續(xù)搶鎖,拿到鎖

詳細加鎖解鎖流程總結如下圖:

1.6 公平鎖

以上介紹的可重入鎖是非公平鎖,Redisson還基于Redis的隊列(List)和ZSet實現(xiàn)了公平鎖

1.6.1 java中公平鎖

公平的定義是什么?

公平就是按照客戶端的請求先來后到排隊來獲取鎖,先到先得,也就是FIFO,所以隊列和容器順序編排必不可少

回顧JUCReentrantLock公平鎖的實現(xiàn)

/**
 * Sync object for fair locks
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    final void lock() {
        acquire(1);
    }
    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

AQS已經(jīng)提供了整個實現(xiàn),是否公平取決于實現(xiàn)類取出節(jié)點邏輯是否順序取

AbstractQueuedSynchronizer是用來構建鎖或者其他同步組件的基礎框架,通過內(nèi)置FIFO隊列來完成資源獲取線程的排隊工作,自身沒有實現(xiàn)同步接口,僅僅定義了若干同步狀態(tài)獲取和釋放的方法來供自定義同步組件使用(上圖),支持獨占共享獲取,這是基于模版方法模式的一種設計,給公平/非公平提供了土壤。

我們用2張圖來簡單解釋AQS的等待流程
一張是同步隊列(FIFO雙向隊列)管理 獲取同步狀態(tài)失?。〒屾i失?。┑木€程引用、等待狀態(tài)和前驅后繼節(jié)點的流程圖

一張是獨占式獲取同步狀態(tài)的總流程 ,核心acquire(int arg)方法調(diào)用流程

可以看出鎖的獲取流程
AQS維護一個同步隊列,獲取狀態(tài)失敗的線程都會加入到隊列中進行自旋,移出隊列或停止自旋的條件是前驅節(jié)點為頭節(jié)點切成功獲取了同步狀態(tài)。而比較另一段非公平鎖類NonfairSync可以發(fā)現(xiàn),控制公平和非公平的關鍵代碼,在于hasQueuedPredecessors方法。

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

NonfairSync減少了了hasQueuedPredecessors判斷條件,該方法的作用就是

  • 查看同步隊列中當前節(jié)點是否有前驅節(jié)點,如果有比當前線程更早請求獲取鎖則返回true。
  • 保證每次都取隊列的第一個節(jié)點(線程)來獲取鎖,這就是公平規(guī)則

為什么JUC以默認非公平鎖呢?

因為當一個線程請求鎖時,只要獲取來同步狀態(tài)即成功獲取。在此前提下,剛釋放的線程再次獲取同步狀態(tài)的幾率會非常大,使得其他線程只能在同步隊列中等待。但這樣帶來的好處是,非公平鎖大大減少了系統(tǒng)線程上下文的切換開銷。

可見公平的代價是性能與吞吐量。
Redis里沒有AQS,但是有List和zSet,看看Redisson是怎么實現(xiàn)公平的

1.6.2 RedissonFairLock

RedissonFairLock 用法依然很簡單

RLock fairLock = redissonClient.getFairLock(lockName);fairLock.lock();

RedissonFairLock繼承自RedissonLock,同樣一路向下找到加鎖實現(xiàn)方法tryLockInnerAsync 。

這里有2段冗長的Lua,但是Debug發(fā)現(xiàn),公平鎖的入口在 command == RedisCommands.EVAL_LONG 之后,此段Lua較長,參數(shù)也多,我們著重分析Lua的實現(xiàn)規(guī)則

參數(shù)

-- lua中的幾個參數(shù)
KEYS = Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName)
KEYS[1]: lock_name, 鎖名稱                   
KEYS[2]: "redisson_lock_queue:{xxx}"  線程隊列
KEYS[3]: "redisson_lock_timeout:{xxx}"  線程id對應的超時集合
ARGV =  internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime
ARGV[1]: "{leaseTime}" 過期時間
ARGV[2]: "{Redisson.UUID}:{threadId}"   
ARGV[3] = 當前時間 + 線程等待時間:(10:00:00) + 5000毫秒 = 10:00:05
ARGV[4] = 當前時間(10:00:00)  部署服務器時間,非redis-server服務器時間

公平鎖實現(xiàn)的Lua腳本

-- 1.死循環(huán)清除過期key
while true do 
  -- 獲取頭節(jié)點
    local firstThreadId2 = redis.call('lindex', KEYS[2], 0);
    -- 首次獲取必空跳出循環(huán)
  if firstThreadId2 == false then 
    break;
  end;
  -- 清除過期key
  local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));
  if timeout <= tonumber(ARGV[4]) then
    redis.call('zrem', KEYS[3], firstThreadId2);
    redis.call('lpop', KEYS[2]);
  else
    break;
  end;
end;
-- 2.不存在該鎖 && (不存在線程等待隊列 || 存在線程等待隊列而且第一個節(jié)點就是此線程ID),加鎖部分主要邏輯
if (redis.call('exists', KEYS[1]) == 0) and 
  ((redis.call('exists', KEYS[2]) == 0)  or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then
  -- 彈出隊列中線程id元素,刪除Zset中該線程id對應的元素
  redis.call('lpop', KEYS[2]);
  redis.call('zrem', KEYS[3], ARGV[2]);
  local keys = redis.call('zrange', KEYS[3], 0, -1);
  -- 遍歷zSet所有key,將key的超時時間(score) - 當前時間ms
  for i = 1, #keys, 1 do 
    redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);
  end;
    -- 加鎖設置鎖過期時間
  redis.call('hset', KEYS[1], ARGV[2], 1);
  redis.call('pexpire', KEYS[1], ARGV[1]);
  return nil;
end;
-- 3.線程存在,重入判斷
if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then
  redis.call('hincrby', KEYS[1], ARGV[2],1);
  redis.call('pexpire', KEYS[1], ARGV[1]);
  return nil;
end;
-- 4.返回當前線程剩余存活時間
local timeout = redis.call('zscore', KEYS[3], ARGV[2]);
    if timeout ~= false then
  -- 過期時間timeout的值在下方設置,此處的減法算出的依舊是當前線程的ttl
  return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);
end;
-- 5.尾節(jié)點剩余存活時間
local lastThreadId = redis.call('lindex', KEYS[2], -1);
local ttl;
-- 尾節(jié)點不空 && 尾節(jié)點非當前線程
if lastThreadId ~= false and lastThreadId ~= ARGV[2] then
  -- 計算隊尾節(jié)點剩余存活時間
  ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);
else
  -- 獲取lock_name剩余存活時間
  ttl = redis.call('pttl', KEYS[1]);
end;
-- 6.末尾排隊
-- zSet 超時時間(score),尾節(jié)點ttl + 當前時間 + 5000ms + 當前時間,無則新增,有則更新
-- 線程id放入隊列尾部排隊,無則插入,有則不再插入
local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);
if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then
  redis.call('rpush', KEYS[2], ARGV[2]);
end;
return ttl;

1.6.3 公平鎖加鎖步驟

通過以上Lua,可以發(fā)現(xiàn),lua操作的關鍵結構是列表(list)和有序集合(zSet)。
其中 list 維護了一個等待的線程隊列 redisson_lock_queue:{xxx},zSet維護了一個線程超時情況的有序集合 redisson_lock_timeout:{xxx},盡管lua較長,但是可以拆分為6個步驟

  • 隊列清理
    保證隊列中只有未過期的等待線程
  • 首次加鎖
    hset加鎖,pexpire過期時間
  • 重入判斷
    此處同可重入鎖lua
  • 返回ttl
  • 計算尾節(jié)點ttl
    初始值為鎖的剩余過期時間
  • 末尾排隊
    ttl + 2 * currentTime + waitTime是score的默認值計算公式

以上就是Redis之Redisson原理詳解的詳細內(nèi)容,更多關于Redisson原理的資料請關注腳本之家其它相關文章!

相關文章

  • Redis使用元素刪除的布隆過濾器來解決緩存穿透問題

    Redis使用元素刪除的布隆過濾器來解決緩存穿透問題

    本文主要介紹了Redis使用元素刪除的布隆過濾器來解決緩存穿透問題,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-08-08
  • redis如何設置key的有效期

    redis如何設置key的有效期

    這篇文章主要介紹了redis如何設置key的有效期方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • redis加鎖的三種方式小結

    redis加鎖的三種方式小結

    本文主要介紹了redis加鎖的三種方式小結,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-01-01
  • k8s部署redis哨兵的實現(xiàn)

    k8s部署redis哨兵的實現(xiàn)

    本文主要介紹了k8s部署redis哨兵的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-07-07
  • 基于Redis6.2.6版本部署Redis?Cluster集群的問題

    基于Redis6.2.6版本部署Redis?Cluster集群的問題

    這篇文章主要介紹了基于Redis6.2.6版本部署Redis?Cluster集群,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-04-04
  • redis如何實現(xiàn)清空緩存

    redis如何實現(xiàn)清空緩存

    這篇文章主要介紹了redis如何實現(xiàn)清空緩存,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-08-08
  • 同一份數(shù)據(jù)Redis為什么要存兩次

    同一份數(shù)據(jù)Redis為什么要存兩次

    這篇文章主要介紹了同一份數(shù)據(jù)Redis為什么要存兩次,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-01-01
  • redis常用命令小結

    redis常用命令小結

    這篇文章主要介紹了redis的一些常用命令,需要的朋友可以參考下
    2014-06-06
  • Redis連接池配置方式

    Redis連接池配置方式

    文章介紹了Redis連接池的配置方法,包括與數(shù)據(jù)庫連接時引入連接池的必要性、Java中使用Redis連接池的示例、jar包準備、編寫配置代碼以及連接池參數(shù)的設置
    2024-12-12
  • Redis實現(xiàn)分布式鎖方法詳細

    Redis實現(xiàn)分布式鎖方法詳細

    在單體應用中,如果我們對共享數(shù)據(jù)不進行加鎖操作,會出現(xiàn)數(shù)據(jù)一致性問題,我們的解決辦法通常是加鎖。在分布式架構中,我們同樣會遇到數(shù)據(jù)共享操作問題。本文將介紹Redis實現(xiàn)分布式鎖的五種方式。需要的可以參考一下
    2021-12-12

最新評論