Java使用Redisson分布式鎖實(shí)現(xiàn)原理
1. 基本用法
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.8.2</version> </dependency>
Config config = new Config(); config.useClusterServers() .setScanInterval(2000) // cluster state scan interval in milliseconds .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001") .addNodeAddress("redis://127.0.0.1:7002"); RedissonClient redisson = Redisson.create(config); RLock lock = redisson.getLock("anyLock"); lock.lock(); try { ... } finally { lock.unlock(); }
針對(duì)上面這段代碼,重點(diǎn)看一下Redisson是如何基于Redis實(shí)現(xiàn)分布式鎖的
Redisson中提供的加鎖的方法有很多,但大致類似,此處只看lock()方法
更多請(qǐng)參見(jiàn)https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers
2. 加鎖
可以看到,調(diào)用getLock()方法后實(shí)際返回一個(gè)RedissonLock對(duì)象,在RedissonLock對(duì)象的lock()方法主要調(diào)用tryAcquire()方法
由于leaseTime == -1,于是走tryLockInnerAsync()方法,這個(gè)方法才是關(guān)鍵
首先,看一下evalWriteAsync方法的定義
最后兩個(gè)參數(shù)分別是keys和params
實(shí)際調(diào)用是這樣的:
單獨(dú)將調(diào)用的那一段摘出來(lái)看
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
結(jié)合上面的參數(shù)聲明,我們可以知道,這里KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)
假設(shè)前面獲取鎖時(shí)傳的name是“abc”,假設(shè)調(diào)用的線程ID是Thread-1,假設(shè)成員變量UUID類型的id是6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c
那么KEYS[1]=abc,ARGV[2]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1
因此,這段腳本的意思是
1、判斷有沒(méi)有一個(gè)叫“abc”的key
2、如果沒(méi)有,則在其下設(shè)置一個(gè)字段為“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”,值為“1”的鍵值對(duì) ,并設(shè)置它的過(guò)期時(shí)間
3、如果存在,則進(jìn)一步判斷“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”是否存在,若存在,則其值加1,并重新設(shè)置過(guò)期時(shí)間
4、返回“abc”的生存時(shí)間(毫秒)
這里用的數(shù)據(jù)結(jié)構(gòu)是hash,hash的結(jié)構(gòu)是: key 字段1 值1 字段2 值2 。。。
用在鎖這個(gè)場(chǎng)景下,key就表示鎖的名稱,也可以理解為臨界資源,字段就表示當(dāng)前獲得鎖的線程
所有競(jìng)爭(zhēng)這把鎖的線程都要判斷在這個(gè)key下有沒(méi)有自己線程的字段,如果沒(méi)有則不能獲得鎖,如果有,則相當(dāng)于重入,字段值加1(次數(shù))
3. 解鎖
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
我們還是假設(shè)name=abc,假設(shè)線程ID是Thread-1
同理,我們可以知道
KEYS[1]是getName(),即KEYS[1]=abc
KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{abc}
ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0
ARGV[2]是生存時(shí)間
ARGV[3]是getLockName(threadId),即ARGV[3]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1
因此,上面腳本的意思是:
1、判斷是否存在一個(gè)叫“abc”的key
2、如果不存在,向Channel中廣播一條消息,廣播的內(nèi)容是0,并返回1
3、如果存在,進(jìn)一步判斷字段6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1是否存在
4、若字段不存在,返回空,若字段存在,則字段值減1
5、若減完以后,字段值仍大于0,則返回0
6、減完后,若字段值小于或等于0,則廣播一條消息,廣播內(nèi)容是0,并返回1;
可以猜測(cè),廣播0表示資源可用,即通知那些等待獲取鎖的線程現(xiàn)在可以獲得鎖了
4. 等待
以上是正常情況下獲取到鎖的情況,那么當(dāng)無(wú)法立即獲取到鎖的時(shí)候怎么辦呢?
再回到前面獲取鎖的位置
@Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } // 訂閱 RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { unsubscribe(future, threadId); } // get(lockAsync(leaseTime, unit)); } protected static final LockPubSub PUBSUB = new LockPubSub(); protected RFuture<RedissonLockEntry> subscribe(long threadId) { return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); } protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) { PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); }
這里會(huì)訂閱Channel,當(dāng)資源可用時(shí)可以及時(shí)知道,并搶占,防止無(wú)效的輪詢而浪費(fèi)資源
當(dāng)資源可用用的時(shí)候,循環(huán)去嘗試獲取鎖,由于多個(gè)線程同時(shí)去競(jìng)爭(zhēng)資源,所以這里用了信號(hào)量,對(duì)于同一個(gè)資源只允許一個(gè)線程獲得鎖,其它的線程阻塞
5. 小結(jié)
6. 其它相關(guān)
《基于Redis的分布式鎖的簡(jiǎn)單實(shí)現(xiàn)》
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
MyBatis中批量插入和批量更新的實(shí)現(xiàn)方法詳解
這篇文章主要介紹了MyBatis中批量插入和批量更新的實(shí)現(xiàn)方法,在日常開(kāi)發(fā)中有時(shí)候需要從A數(shù)據(jù)庫(kù)提取大量數(shù)據(jù)同步到B系統(tǒng),這種情況自然是需要批量操作才行,感興趣想要詳細(xì)了解可以參考下文2023-05-05Java實(shí)現(xiàn)解析.xlsb文件的示例代碼
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)解析.xlsb文件的相關(guān)方法,文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,感興趣的可以了解一下2023-01-01Java 17 更新后的 strictfp 關(guān)鍵字
strictfp 可能是最沒(méi)有存在感的關(guān)鍵字了,很多人寫(xiě)了多年 Java 甚至都不知道它的存在,strictfp,字面意思就是嚴(yán)格的浮點(diǎn)型。這玩意兒居然還有個(gè)關(guān)鍵字,可見(jiàn)其地位還是很高的。下面文章小編就帶大家詳細(xì)介紹其關(guān)鍵字,需要的朋友可以參考一下2021-09-09java.text.DecimalFormat類十進(jìn)制格式化
這篇文章主要為大家詳細(xì)介紹了java.text.DecimalFormat類十進(jìn)制格式化的方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-03-03java 中使用匿名類直接new接口詳解及實(shí)例代碼
這篇文章主要介紹了java 中使用匿名類直接new接口詳解及實(shí)例代碼的相關(guān)資料,需要的朋友可以參考下2017-03-03Mapper批量插入Oracle數(shù)據(jù)@InsertProvider注解
今天小編就為大家分享一篇關(guān)于Mapper批量插入Oracle數(shù)據(jù)@InsertProvider注解,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2019-03-03java通過(guò)注解翻譯字典的實(shí)現(xiàn)示例
本文主要介紹了java通過(guò)注解翻譯字典的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-04-04mybatis多數(shù)據(jù)源動(dòng)態(tài)切換的完整步驟
這篇文章主要給大家介紹了關(guān)于mybatis多數(shù)據(jù)源動(dòng)態(tài)切換的完整步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11Java?ArrayList遍歷foreach與iterator時(shí)remove的區(qū)別
這篇文章主要介紹了Java?ArrayList遍歷foreach與iterator時(shí)remove的區(qū)別,文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的朋友可以參考一下2022-07-07