Redisson公平鎖的源碼解讀分享
前言
我在上一篇文章聊了Redisson的分布式鎖,這次繼續(xù)來(lái)聊聊Redisson的公平鎖。下面是官方原話:
它保證了當(dāng)多個(gè)Redisson客戶端線程同時(shí)請(qǐng)求加鎖時(shí),優(yōu)先分配給先發(fā)出請(qǐng)求的線程。所有請(qǐng)求線程會(huì)在一個(gè)隊(duì)列中排隊(duì),當(dāng)某個(gè)線程出現(xiàn)宕機(jī)時(shí),Redisson會(huì)等待5秒后繼續(xù)下一個(gè)線程,也就是說(shuō)如果前面有5個(gè)線程都處于等待狀態(tài),那么后面的線程會(huì)等待至少25秒。
源碼版本:3.17.7
這是我 fork 的分支,添加了自己理解的中文注釋?zhuān)?a rel="external nofollow" target="_blank">https://github.com/xiaoguyu/redisson
公平鎖
先上官方例子:
RLock fairLock = redisson.getFairLock("anyLock"); // 嘗試加鎖,最多等待100秒,上鎖以后10秒自動(dòng)解鎖 boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS); ... fairLock.unlock();
因?yàn)樵赗edisson中,公平鎖和普通可重入鎖的邏輯大體上一樣,我在上一篇文章都介紹了,這里就不再贅述。下面開(kāi)始介紹合理邏輯。
加鎖
加鎖的 lua 腳本在 RedissonFairLock#tryLockInnerAsync方法中
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { long wait = threadWaitTime; if (waitTime > 0) { wait = unit.toMillis(waitTime); } long currentTime = System.currentTimeMillis(); if (command == RedisCommands.EVAL_NULL_BOOLEAN) { ...... } if (command == RedisCommands.EVAL_LONG) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, // remove stale threads "while true do " + // list為空,證明沒(méi)有人排隊(duì),退出循環(huán) "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + "if firstThreadId2 == false then " + "break;" + "end;" + // 能到這里,證明有人排隊(duì),拿出在排隊(duì)的第一個(gè)人的超時(shí)時(shí)間,如果超時(shí)了,則移除相應(yīng)數(shù)據(jù) "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + "if timeout <= tonumber(ARGV[4]) then " + // remove the item from the queue and timeout set // NOTE we do not alter any other timeout "redis.call('zrem', KEYS[3], firstThreadId2);" + "redis.call('lpop', KEYS[2]);" + "else " + "break;" + "end;" + "end;" + // check if the lock can be acquired now // 檢查是否可以獲取鎖。如果hash和list都不存在,或者線程隊(duì)列的第一個(gè)是當(dāng)前線程,則可以獲取鎖 "if (redis.call('exists', KEYS[1]) == 0) " + "and ((redis.call('exists', KEYS[2]) == 0) " + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " + // remove this thread from the queue and timeout set // 都獲取鎖了,當(dāng)然要從線程隊(duì)列和時(shí)間隊(duì)列中移除 "redis.call('lpop', KEYS[2]);" + "redis.call('zrem', KEYS[3], ARGV[2]);" + // decrease timeouts for all waiting in the queue // 刷新時(shí)間集合中的時(shí)間 "local keys = redis.call('zrange', KEYS[3], 0, -1);" + "for i = 1, #keys, 1 do " + "redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" + "end;" + // acquire the lock and set the TTL for the lease // 和公平鎖的設(shè)置一樣,值加1并且設(shè)置過(guò)期時(shí)間 "redis.call('hset', KEYS[1], ARGV[2], 1);" + "redis.call('pexpire', KEYS[1], ARGV[1]);" + "return nil;" + "end;" + // check if the lock is already held, and this is a re-entry // 能到這里,證明前面拿不到鎖,但是也要做可重入鎖的處理 "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " + "redis.call('hincrby', KEYS[1], ARGV[2],1);" + "redis.call('pexpire', KEYS[1], ARGV[1]);" + "return nil;" + "end;" + // the lock cannot be acquired // check if the thread is already in the queue // 時(shí)間集合中有值,證明線程已經(jīng)在隊(duì)列中,不需要往后執(zhí)行邏輯了 "local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" + "if timeout ~= false then " + // the real timeout is the timeout of the prior thread // in the queue, but this is approximately correct, and // avoids having to traverse the queue // 因?yàn)橄旅娴膖imeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]) // 所以這里的ttl = timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]) "return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" + "end;" + // add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of // the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the // threadWaitTime "local lastThreadId = redis.call('lindex', KEYS[2], -1);" + "local ttl;" + // 如果最后一個(gè)線程不是當(dāng)前線程,則從時(shí)間集合取出(舉例:線程1/2/3按順序獲取鎖,此時(shí)pttl得到的是線程1的鎖過(guò)期時(shí)間,zscore拿到的是線程2的鎖的過(guò)期時(shí)間,此時(shí)線程3應(yīng)該以線程2的為準(zhǔn)) "if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " + "ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" + "else " + // 否則直接獲取鎖的存活時(shí)間 "ttl = redis.call('pttl', KEYS[1]);" + "end;" + // 過(guò)期時(shí)間 = 鎖存活時(shí)間 + 等待時(shí)間 + 當(dāng)前時(shí)間戳 "local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" + // 如果添加到時(shí)間集合成功,則同時(shí)添加線程集合 "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " + "redis.call('rpush', KEYS[2], ARGV[2]);" + "end;" + "return ttl;", Arrays.asList(getRawName(), threadsQueueName, timeoutSetName), unit.toMillis(leaseTime), getLockName(threadId), wait, currentTime); } throw new IllegalArgumentException(); }
公平鎖總共用了Redis的三種數(shù)據(jù)類(lèi)型,對(duì)應(yīng)著 lua 腳本里面的keys1、2、3的參數(shù):
KEYS[1]
鎖的名字,使用 Hash 數(shù)據(jù)類(lèi)型,是可重入鎖的基礎(chǔ),結(jié)構(gòu)為 {”threadId1”: 1, “thread2”: 1},key為線程id,value是鎖的次數(shù)
KEYS[2]
線程隊(duì)列的名字,使用 List 數(shù)據(jù)類(lèi)型,結(jié)構(gòu)為 [ “threadId1”, “threadId2” ],按順序存放需要獲取鎖的線程的id
KEYS[3]
時(shí)間隊(duì)列的名字,使用 sorted set 數(shù)據(jù)類(lèi)型,結(jié)構(gòu)為 {”threadId2”:123, “threadId1”:190},key為線程id,value為獲取鎖的超時(shí)時(shí)間戳
我下面會(huì)用 鎖、線程隊(duì)列、時(shí)間隊(duì)列 來(lái)表示這3個(gè)數(shù)據(jù)結(jié)構(gòu),需要注意下我的表述。
同樣的,介紹下參數(shù):
- ARGV[1]:leaseTime 鎖的持有時(shí)間
- ARGV[2]:線程id(描述不太準(zhǔn)確,暫時(shí)按這樣理解)
- ARGV[3]:waitTime 嘗試獲取鎖的最大等待時(shí)間
- ARGV[4]:currentTime 當(dāng)前時(shí)間戳
接下來(lái),我們一段一段分析 lua 腳本,首先看最開(kāi)始的 while 循環(huán)
"while true do " + // list為空,證明沒(méi)有人排隊(duì),退出循環(huán) "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + "if firstThreadId2 == false then " + "break;" + "end;" + // 能到這里,證明有人排隊(duì),拿出在排隊(duì)的第一個(gè)人的超時(shí)時(shí)間,如果超時(shí)了,則移除相應(yīng)數(shù)據(jù) "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + "if timeout <= tonumber(ARGV[4]) then " + // 從時(shí)間隊(duì)列和線程隊(duì)列中移除 "redis.call('zrem', KEYS[3], firstThreadId2);" + "redis.call('lpop', KEYS[2]);" + "else " + "break;" + "end;" + "end;" +
具體的邏輯我在注釋中寫(xiě)的很清楚了,看的時(shí)候記住 KEYS[2]、KEYS[3] 對(duì)應(yīng)著線程隊(duì)列和時(shí)間隊(duì)列接口。主要注意的是,線程隊(duì)列只有當(dāng)一個(gè)線程持有鎖,另一個(gè)線程獲取不到鎖時(shí),才會(huì)有值(前面有人才排隊(duì),沒(méi)人排什么隊(duì))。接著看第二段
// 檢查是否可以獲取鎖。當(dāng)鎖不存在,并且線程隊(duì)列不存在或者線程隊(duì)列第一位是當(dāng)前線程,則可以獲取鎖 "if (redis.call('exists', KEYS[1]) == 0) " + "and ((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " + // remove this thread from the queue and timeout set // 都獲取鎖了,當(dāng)然要從線程隊(duì)列和時(shí)間隊(duì)列中移除 "redis.call('lpop', KEYS[2]);" + "redis.call('zrem', KEYS[3], ARGV[2]);" + // decrease timeouts for all waiting in the queue // 刷新時(shí)間隊(duì)列中的時(shí)間 "local keys = redis.call('zrange', KEYS[3], 0, -1);" + "for i = 1, #keys, 1 do " + "redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" + "end;" + // acquire the lock and set the TTL for the lease // 和公平鎖的設(shè)置一樣,值加1并且設(shè)置過(guò)期時(shí)間 "redis.call('hset', KEYS[1], ARGV[2], 1);" + "redis.call('pexpire', KEYS[1], ARGV[1]);" + "return nil;" + "end;" +
翻譯翻譯就是,鎖不存在(別人沒(méi)有持有鎖)并且線程隊(duì)列不存在或者線程隊(duì)列第一位是當(dāng)前線程(不用排隊(duì)或者自己排第一)才能獲得鎖。因?yàn)闀r(shí)間隊(duì)列中存放的是各個(gè)線程等待鎖的超時(shí)時(shí)間戳,所以每次都需要刷新下。繼續(xù)下一段邏輯
// 能到這里,證明前面拿不到鎖,但是也要做可重入鎖的處理 "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " + "redis.call('hincrby', KEYS[1], ARGV[2],1);" + "redis.call('pexpire', KEYS[1], ARGV[1]);" + "return nil;" + "end;" +
這是可重入鎖的處理,繼續(xù)下一段
// 時(shí)間隊(duì)列中有值,證明線程已經(jīng)在隊(duì)列中,不需要往后執(zhí)行邏輯了 "local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" + "if timeout ~= false then " + // the real timeout is the timeout of the prior thread // in the queue, but this is approximately correct, and // avoids having to traverse the queue // 因?yàn)橄旅娴膖imeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]) // 所以這里的ttl = timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]) "return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" + "end;" +
舉例子:線程1持有鎖,線程2嘗試第一次獲取鎖(不進(jìn)入這段if),線程2第二次獲取鎖(進(jìn)入了這段if)。繼續(xù)下一段
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" + "local ttl;" + // 如果最后一個(gè)線程不是當(dāng)前線程,則從時(shí)間集合取出(舉例:線程1/2/3按順序獲取鎖,此時(shí)pttl得到的是線程1的鎖過(guò)期時(shí)間,zscore拿到的是線程2的鎖的過(guò)期時(shí)間,此時(shí)線程3應(yīng)該以線程2的為準(zhǔn)) "if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " + "ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" + "else " + // 否則直接獲取鎖的存活時(shí)間 "ttl = redis.call('pttl', KEYS[1]);" + "end;" + // 過(guò)期時(shí)間 = 鎖存活時(shí)間 + 等待時(shí)間 + 當(dāng)前時(shí)間戳 "local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" + // 如果添加到時(shí)間集合成功,則同時(shí)添加線程集合 "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " + "redis.call('rpush', KEYS[2], ARGV[2]);" + "end;" + "return ttl;",
ttl 這段的獲取邏輯,翻譯翻譯就是,如果前面有人排隊(duì),就以前面的超時(shí)時(shí)間為準(zhǔn),如果沒(méi)人排隊(duì),就拿鎖的超時(shí)時(shí)間。獲取到 ttl ,就對(duì)添加到線程集合和時(shí)間集合。
以上就是公平鎖的加鎖 lua 腳本的全部邏輯。講的有點(diǎn)亂,但是只要能搞清楚keys1、2、3對(duì)應(yīng)著哪種數(shù)據(jù)類(lèi)型,理解整個(gè)邏輯應(yīng)該問(wèn)題不大。
解鎖
解鎖的核心 lua 腳本是下面這段RedissonFairLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // remove stale threads "while true do " // 線程隊(duì)列為空,證明沒(méi)有人排隊(duì),退出循環(huán) + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + "if firstThreadId2 == false then " + "break;" + "end; " // 能到這里,證明有人排隊(duì),拿出在排隊(duì)的第一個(gè)人的超時(shí)時(shí)間,如果超時(shí)了,則移除相應(yīng)數(shù)據(jù) + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + "if timeout <= tonumber(ARGV[4]) then " + "redis.call('zrem', KEYS[3], firstThreadId2); " + "redis.call('lpop', KEYS[2]); " + "else " + "break;" + "end; " + "end;" // 如果鎖不存在,則通過(guò)訂閱發(fā)布機(jī)制通知下一個(gè)等待中的線程 + "if (redis.call('exists', KEYS[1]) == 0) then " + "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " + "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " + "end; " + "return 1; " + "end;" + // 如果當(dāng)前線程已經(jīng)不存在鎖里面,直接返回null "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + // 可重入鎖處理邏輯,對(duì)當(dāng)前線程的鎖次數(shù)減1 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + // 鎖次數(shù)仍然大于0,則刷新鎖的存活時(shí)間 "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "end; " + // 刪除鎖 "redis.call('del', KEYS[1]); " + // 訂閱發(fā)布機(jī)制通知下一個(gè)等待中的線程 "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " + "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " + "end; " + "return 1; ", Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis()); }
算了,不想寫(xiě)了,看注釋吧。
總結(jié)
本文介紹了Redisson的公平鎖,邏輯大體上和普通可重入鎖一致,核心在于 lua 腳本,運(yùn)用了Redis的3種數(shù)據(jù)類(lèi)型。
到此這篇關(guān)于Redisson公平鎖的源碼解讀分享的文章就介紹到這了,更多相關(guān)Redisson公平鎖內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java如何更改數(shù)據(jù)庫(kù)中的數(shù)據(jù)
這篇文章主要介紹了java如何更改數(shù)據(jù)庫(kù)中的數(shù)據(jù),修改數(shù)據(jù)庫(kù)是數(shù)據(jù)庫(kù)操作必不可少的一部分,使用Statement接口中的excuteUpdate()方法可以修改數(shù)據(jù)表中的數(shù)據(jù),感興趣的朋友跟隨小編一起看看吧2021-11-11手寫(xiě)mybatis完整sql插件問(wèn)題及實(shí)現(xiàn)思路
大家在使用mybatis的過(guò)程中,mysql日志功能一般不會(huì)直接放到數(shù)據(jù)庫(kù)中執(zhí)行的,今天小編重點(diǎn)給大家分享手寫(xiě)mybatis完整sql插件問(wèn)題及實(shí)現(xiàn)思路,對(duì)mybatis完整sql插件相關(guān)知識(shí)感興趣的朋友一起看看吧2021-05-05Spring聲明式事務(wù)@Transactional知識(shí)點(diǎn)分享
在本篇文章里小編給大家整理了關(guān)于Spring聲明式事務(wù)@Transactional詳解內(nèi)容,需要的朋友們可以參考下。2020-02-02Java8 LocalDateTime極簡(jiǎn)時(shí)間日期操作小結(jié)
這篇文章主要介紹了Java8-LocalDateTime極簡(jiǎn)時(shí)間日期操作整理,通過(guò)實(shí)例代碼給大家介紹了java8 LocalDateTime 格式化問(wèn)題,需要的朋友可以參考下2020-04-04Java使用枚舉實(shí)現(xiàn)狀態(tài)機(jī)的方法詳解
這篇文章主要介紹了Java使用枚舉實(shí)現(xiàn)狀態(tài)機(jī)的方法詳解,枚舉類(lèi)型很適合用來(lái)實(shí)現(xiàn)狀態(tài)機(jī),狀態(tài)機(jī)可以處于有限數(shù)量的特定狀態(tài),它們通常根據(jù)輸入,從一個(gè)狀態(tài)移動(dòng)到下一個(gè)狀態(tài),但同時(shí)也會(huì)存在瞬態(tài),需要的朋友可以參考下2023-11-11Java項(xiàng)目中大批量數(shù)據(jù)查詢(xún)導(dǎo)致OOM的解決
本文主要介紹了Java項(xiàng)目中大批量數(shù)據(jù)查詢(xún)導(dǎo)致OOM的解決,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-06-06