Redis:Redisson分布式鎖的使用方式(推薦使用)
Redis:Redisson分布式鎖的使用(生產(chǎn)環(huán)境下)(推薦使用)
關(guān)鍵詞
- 基于NIO的Netty框架,生產(chǎn)環(huán)境使用分布式鎖
- redisson加鎖:lua腳本加鎖(其他客戶端自旋)
- 自動延時機制:啟動watch dog,后臺線程,每隔10秒檢查一下客戶端1還持有鎖key,會不斷的延長鎖key的生存時間
- 可重入鎖機制:第二個if判斷 ,myLock :{“8743c9c0-0795-4907-87fd-6c719a6b4586:1”:2 }
- 釋放鎖:無鎖直接返回;有鎖不是我加的,返回;有鎖是我加的,執(zhí)行hincrby -1,當重入鎖減完才執(zhí)行del操作
- Redis使用同一個Lua解釋器來執(zhí)行所有命令,Redis保證以一種原子性的方式來執(zhí)行腳本:當lua腳本在執(zhí)行的時候,不會有其他腳本和命令同時執(zhí)行,這種語義類似于 MULTI/EXEC。從別的客戶端的視角來看,一個
- lua腳本要么不可見,要么已經(jīng)執(zhí)行完
一、 Redisson使用
Redisson是架設(shè)在Redis基礎(chǔ)上的一個Java駐內(nèi)存數(shù)據(jù)網(wǎng)格(In-Memory Data Grid)。
Redisson在基于NIO的Netty框架上,生產(chǎn)環(huán)境使用分布式鎖。
加入jar包的依賴
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>2.7.0</version> </dependency>
配置Redisson
public class RedissonManager { private static Config config = new Config(); //聲明redisso對象 private static Redisson redisson = null; //實例化redisson static{ config.useClusterServers() // 集群狀態(tài)掃描間隔時間,單位是毫秒 .setScanInterval(2000) //cluster方式至少6個節(jié)點(3主3從,3主做sharding,3從用來保證主宕機后可以高可用) .addNodeAddress("redis://127.0.0.1:6379" ) .addNodeAddress("redis://127.0.0.1:6380") .addNodeAddress("redis://127.0.0.1:6381") .addNodeAddress("redis://127.0.0.1:6382") .addNodeAddress("redis://127.0.0.1:6383") .addNodeAddress("redis://127.0.0.1:6384"); //得到redisson對象 redisson = (Redisson) Redisson.create(config); } //獲取redisson對象的方法 public static Redisson getRedisson(){ return redisson; } }
鎖的獲取和釋放
public class DistributedRedisLock { //從配置類中獲取redisson對象 private static Redisson redisson = RedissonManager.getRedisson(); private static final String LOCK_TITLE = "redisLock_"; //加鎖 public static boolean acquire(String lockName){ //聲明key對象 String key = LOCK_TITLE + lockName; //獲取鎖對象 RLock mylock = redisson.getLock(key); //加鎖,并且設(shè)置鎖過期時間3秒,防止死鎖的產(chǎn)生 uuid+threadId mylock.lock(2,3,TimeUtil.SECOND); //加鎖成功 return true; } //鎖的釋放 public static void release(String lockName){ //必須是和加鎖時的同一個key String key = LOCK_TITLE + lockName; //獲取所對象 RLock mylock = redisson.getLock(key); //釋放鎖(解鎖) mylock.unlock();
業(yè)務(wù)邏輯中使用分布式鎖
public String discount() throws IOException{ String key = "lock001"; //加鎖 DistributedRedisLock.acquire(key); //執(zhí)行具體業(yè)務(wù)邏輯 dosoming //釋放鎖 DistributedRedisLock.release(key); //返回結(jié)果 return soming; }
二、Redisson分布式鎖的實現(xiàn)原理
2.1 加鎖機制
如果該客戶端面對的是一個redis cluster集群,他首先會根據(jù)hash節(jié)點選擇一臺機器。
發(fā)送lua腳本到redis服務(wù)器上,腳本如下:
//exists',KEYS[1])==0 不存在,沒鎖 "if (redis.call('exists',KEYS[1])==0) then "+ --看有沒有鎖 // 命令:hset,1:第一回 "redis.call('hset',KEYS[1],ARGV[2],1) ; "+ --無鎖 加鎖 // 配置鎖的生命周期 "redis.call('pexpire',KEYS[1],ARGV[1]) ; "+ "return nil; end ;" + //可重入操作,判斷是不是我加的鎖 "if (redis.call('hexists',KEYS[1],ARGV[2]) ==1 ) then "+ --我加的鎖 //hincrby 在原來的鎖上加1 "redis.call('hincrby',KEYS[1],ARGV[2],1) ; "+ --重入鎖 "redis.call('pexpire',KEYS[1],ARGV[1]) ; "+ "return nil; end ;" + //否則,鎖存在,返回鎖的有效期,決定下次執(zhí)行腳本時間 "return redis.call('pttl',KEYS[1]) ;" --不能加鎖,返回鎖的時間
lua的作用:保證這段復(fù)雜業(yè)務(wù)邏輯執(zhí)行的原子性。
lua的解釋:
- KEYS[1]) : 加鎖的key
- ARGV[1] : key的生存時間,默認為30秒
- ARGV[2] : 加鎖的客戶端ID (UUID.randomUUID()) + “:” + threadId)
第一段if判斷語句,就是用“exists myLock”命令判斷一下,如果你要加鎖的那個鎖key不存在的話,你就進行加鎖。
如何加鎖呢?很簡單,用下面的命令:
hset myLock
8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
通過這個命令設(shè)置一個hash數(shù)據(jù)結(jié)構(gòu),這行命令執(zhí)行后,會出現(xiàn)一個類似下面的數(shù)據(jù)結(jié)構(gòu):
myLock :{“8743c9c0-0795-4907-87fd-6c719a6b4586:1”:1 }
上述就代表“8743c9c0-0795-4907-87fd-6c719a6b4586:1”這個客戶端對“myLock”這個鎖key完成了加鎖。
接著會執(zhí)行“pexpire myLock 30000”命令,設(shè)置myLock這個鎖key的生存時間是30秒。
鎖互斥機制
那么在這個時候,如果客戶端2來嘗試加鎖,執(zhí)行了同樣的一段lua腳本,會咋樣呢?
很簡單,第一個if判斷會執(zhí)行“exists myLock”,發(fā)現(xiàn)myLock這個鎖key已經(jīng)存在了。
接著第二個if判斷,判斷一下,myLock鎖key的hash數(shù)據(jù)結(jié)構(gòu)中,是否包含客戶端2的ID,但是明顯不是的,因為那里包含的是客戶端1的ID。
所以,客戶端2會獲取到pttl myLock返回的一個數(shù)字,這個數(shù)字代表了myLock這個鎖key的剩余生存時間。比如還剩15000毫秒的生存時間。
此時客戶端2會進入一個while循環(huán),不停的嘗試加鎖。
自動延時機制
只要客戶端1一旦加鎖成功,就會啟動一個watch dog看門狗,他是一個后臺線程,會每隔10秒檢查一下,如果客戶端1還持有鎖key,那么就會不斷的延長鎖key的生存時間。
可重入鎖機制
第一個if判斷 肯定不成立,“exists myLock”會顯示鎖key已經(jīng)存在了。
第二個if判斷 會成立,因為myLock的hash數(shù)據(jù)結(jié)構(gòu)中包含的那個ID,就是客戶端1的那個ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”
此時就會執(zhí)行可重入加鎖的邏輯,他會用:incrby myLock 8743c9c0-0795-4907-87fd-6c71a6b4586:1 1
通過這個命令,對客戶端1的加鎖次數(shù),累加1。數(shù)據(jù)結(jié)構(gòu)會變成:myLock :{“8743c9c0-0795-4907-87fd-6c719a6b4586:1”:2 }
2.2 釋放鎖機制
執(zhí)行l(wèi)ua腳本如下:
# 如果key已經(jīng)不存在,說明已經(jīng)被解鎖,直接發(fā)布(publish)redis消息(無鎖,直接返回) "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + # key和field不匹配,說明當前客戶端線程沒有持有鎖,不能主動解鎖。 不是我加的鎖 不能解鎖 (有鎖不是我加的,返回) "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + # 將value減1 (有鎖是我加的,進行hincrby -1 ) "local counter = redis.call('hincrby', KEYS[1], ARGV[3],-1); " + # 如果counter>0說明鎖在重入,不能刪除key "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + # 刪除key并且publish 解鎖消息 # 可重入鎖減完了,進行del操作 "else " + "redis.call('del', KEYS[1]); " + #刪除鎖 "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;",
- – KEYS[1] :需要加鎖的key,這里需要是字符串類型。
- – KEYS[2] :redis消息的ChannelName,一個分布式鎖對應(yīng)唯一的一個channelName: “redisson_lockchannel{” + getName() + “}”
- – ARGV[1] :reids消息體,這里只需要一個字節(jié)的標記就可以,主要標記redis的key已經(jīng)解鎖,再結(jié)合 redis的Subscribe,能喚醒其他訂閱解鎖消息的客戶端線程申請鎖。
- – ARGV[2] :鎖的超時時間,防止死鎖
- – ARGV[3] :鎖的唯一標識,也就是剛才介紹的 id(UUID.randomUUID()) + “:” + threadId
如果執(zhí)行l(wèi)ock.unlock(),就可以釋放分布式鎖,此時的業(yè)務(wù)邏輯也是非常簡單的。
其實說白了,就是每次都對myLock數(shù)據(jù)結(jié)構(gòu)中的那個加鎖次數(shù)減1。
如果發(fā)現(xiàn)加鎖次數(shù)是0了,說明這個客戶端已經(jīng)不再持有鎖了,此時就會用:
- “del myLock”命令,從redis里刪除這個key。
- 然后呢,另外的客戶端2就可以嘗試完成加鎖了。
分布式鎖特性
- 互斥性
- 任意時刻,只能有一個客戶端獲取鎖,不能同時有兩個客戶端獲取到鎖。
- 同一性
- 鎖只能被持有該鎖的客戶端刪除,不能由其它客戶端刪除。
- 可重入性
- 持有某個鎖的客戶端可繼續(xù)對該鎖加鎖,實現(xiàn)鎖的續(xù)租
- 容錯性
- 鎖失效后(超過生命周期)自動釋放鎖(key失效),其他客戶端可以繼續(xù)獲得該鎖,防止死鎖
分布式鎖的實際應(yīng)用
- 數(shù)據(jù)并發(fā)競爭
- 利用分布式鎖可以將處理串行化,前面已經(jīng)講過了。
- 防止庫存超賣
訂單1下單前會先查看庫存,庫存為10,所以下單5本可以成功;
訂單2下單前會先查看庫存,庫存為10,所以下單8本可以成功;
訂單1和訂單2 同時操作,共下單13本,但庫存只有10本,顯然庫存不夠了,這種情況稱為庫存超賣。
可以采用分布式鎖解決這個問題。
訂單1和訂單2都從Redis中獲得分布式鎖(setnx),誰能獲得鎖誰進行下單操作,這樣就把訂單系統(tǒng)下單的順序串行化了,就不會出現(xiàn)超賣的情況了。
偽碼如下:
//加鎖并設(shè)置有效期 if(redis.lock("RDL",200)){ //判斷庫存 if (orderNum<getCount()){ //加鎖成功 ,可以下單 order(5); //釋放鎖 redis,unlock("RDL"); } }
注意此種方法會降低處理效率,這樣不適合秒殺的場景,秒殺可以使用CAS和Redis隊列的方式。
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
關(guān)于在Redis中使用Pipelining加速查詢的問題
這篇文章主要介紹了在Redis中使用Pipelining加速查詢,Redis是一個client-server模式的TCP服務(wù),也被稱為Request/Response協(xié)議的實現(xiàn),本文通過一個例子給大家詳細介紹,感興趣的朋友一起看看吧2022-05-05Redis內(nèi)部數(shù)據(jù)結(jié)構(gòu)Dict的實現(xiàn)方法
這篇文章主要介紹了Redis內(nèi)部數(shù)據(jù)結(jié)構(gòu)Dict的實現(xiàn)方法,本篇文章所述的dict在Redis中最主要的作用就是用于維護Redis數(shù)據(jù)庫中所有Key、value映射的數(shù)據(jù)結(jié)構(gòu),需要的朋友可以參考下2022-05-05聊聊使用RedisTemplat實現(xiàn)簡單的分布式鎖的問題
這篇文章主要介紹了使用RedisTemplat實現(xiàn)簡單的分布式鎖問題,文中給大家介紹在SpringBootTest中編寫測試模塊的詳細代碼,需要的朋友可以參考下2021-11-11