Redis實(shí)現(xiàn)分布式鎖的示例代碼
什么是分布式鎖
分布式鎖是一種應(yīng)用級(jí)的鎖,在分布式系統(tǒng)下一個(gè)應(yīng)用通常會(huì)有多個(gè)節(jié)點(diǎn)部署,那么在高并發(fā)的情況下,就可能出現(xiàn)同一時(shí)間多個(gè)節(jié)點(diǎn)多個(gè)線程同時(shí)處理同一條數(shù)據(jù),這種情況很容易出現(xiàn)數(shù)據(jù)不一致。那么分布式鎖的出現(xiàn)就是為了解決此問題,它能夠保證一個(gè)代碼塊在同一時(shí)間只能被同一線程或同一個(gè)節(jié)點(diǎn)執(zhí)行。
分布式鎖有哪些
- 基于數(shù)據(jù)庫(kù):使用數(shù)據(jù)庫(kù)設(shè)計(jì)排他鎖或者共享鎖來實(shí)現(xiàn)。
- 基于緩存:由于緩存操作是原子性的,可以使用Redis或Memcached進(jìn)行實(shí)現(xiàn)分布式鎖。
- 基于Zookeeper:利用其臨時(shí)順序節(jié)點(diǎn)特性實(shí)現(xiàn)。
- 基于分布式協(xié)調(diào)服務(wù):使用Etcd或Consul的API實(shí)現(xiàn)。
沒種分布式鎖的實(shí)現(xiàn)原理也有些不同,今天我們主要基于Redis緩存進(jìn)一步說明分布式鎖的實(shí)現(xiàn)。
適用場(chǎng)景
- 數(shù)據(jù)并發(fā)讀寫:當(dāng)多個(gè)節(jié)點(diǎn)需要對(duì)同一數(shù)據(jù)進(jìn)行操作時(shí),分布式鎖可以確保在任何時(shí)刻只有一個(gè)節(jié)點(diǎn)能夠執(zhí)行寫操作,從而避免數(shù)據(jù)的不一致。
- 業(yè)務(wù)流程控制:在復(fù)雜的業(yè)務(wù)流程中,多個(gè)節(jié)點(diǎn)可能需要按特定順序執(zhí)行某些操作??梢允褂梅植际芥i來保證這些流程按順序執(zhí)行。
- 資源爭(zhēng)搶:當(dāng)多個(gè)節(jié)點(diǎn)需要爭(zhēng)搶有限的資源時(shí),分布式鎖可以避免資源競(jìng)爭(zhēng)導(dǎo)致的系統(tǒng)壓力增大或資源浪費(fèi)。
實(shí)現(xiàn)原理
使用Redis實(shí)現(xiàn)分布式鎖主要分為以下幾個(gè)步驟:
1.使用Redis的setNx方法,設(shè)置key-value到緩存中,由于Redis操作是原子性的,并且該方法操作若key在緩存中不存在,則會(huì)創(chuàng)建key并設(shè)值后返回狀態(tài)碼1,否則返回狀態(tài)碼0,所以我們只需根據(jù)返回的狀態(tài)碼就可以確定是否有線程正在占用資源,以此作為是否加鎖成功的依據(jù)。
2.使用Redis的delete方法,可以在流程執(zhí)行完成后對(duì)緩存進(jìn)行刪除,以此達(dá)到釋放鎖的目的,從而使其他線程可以加鎖進(jìn)行操作。
3.使用Redis的expire方法,設(shè)置緩存過期時(shí)間,主要用于防止一些故障導(dǎo)致緩存沒有釋放,而長(zhǎng)時(shí)間占用鎖從而導(dǎo)致死鎖,同時(shí)也可以為長(zhǎng)時(shí)間的任務(wù)進(jìn)行續(xù)期。
分布式鎖設(shè)計(jì)實(shí)現(xiàn)
這里我們主要通過SpringBoot已經(jīng)封裝好的RedisTemplate工具類進(jìn)行詳細(xì)講解。
引入maven依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.2.5.RELEASE</version> </dependency>
話不多說直接上代碼
public class RedisLock { private static final Logger logger = LoggerFactory.getLogger(RedisLock.class); /** * 鎖狀態(tài) */ private Boolean locked = false; /** * Redis操作工具 */ private RedisTemplate redisTemplate; /** * 緩存過期時(shí)間(單位:秒) */ private Long expireTime = 30L; /** * 緩存key */ private String key; public RedisLock(RedisTemplate redisTemplate, String key) { this.redisTemplate = redisTemplate; this.key = key; } public RedisLock(RedisTemplate redisTemplate, String key, Long expireTime) { this(redisTemplate, key); this.expireTime = expireTime; } public boolean tryLock(long intervalTimeMs, long timeoutMs) { // 重試間隔時(shí)間,默認(rèn)100 if (intervalTimeMs <= 0) { intervalTimeMs = 100L; } // 獲取鎖超時(shí)時(shí)間,默認(rèn)100 if (timeoutMs <= intervalTimeMs) { timeoutMs = 100L; } while (timeoutMs >= 0) { // 設(shè)置緩存,底層調(diào)用setNx方法,返回true即緩存設(shè)置成功,則表示加鎖成功 if (redisTemplate.opsForValue().setIfAbsent(key, "1", expireTime, TimeUnit.SECONDS)) { locked = true; return true; } // 否則按間隔時(shí)間進(jìn)行休眠后再重新嘗試加鎖 sleep(intervalTimeMs); timeoutMs -= intervalTimeMs; } locked = false; return false; } public void releaseLock() { if (!locked) { return; } // 僅已加鎖才進(jìn)行刪除 redisTemplate.delete(key); locked = false; } /** * 延長(zhǎng)過期時(shí)間 */ public void renewal() { redisTemplate.expire(key, expireTime, TimeUnit.SECONDS); } /** * 自定義延長(zhǎng)過期時(shí)間 * @param renewalTime 延長(zhǎng)的時(shí)間(單位:秒) */ public void renewal(long renewalTime) { redisTemplate.expire(key, renewalTime, TimeUnit.SECONDS); } private void sleep(long intervalTimeMs) { try { TimeUnit.MILLISECONDS.sleep(intervalTimeMs); } catch (InterruptedException e) { logger.error("tryLock:{} timeout exception", key); } } } public class RedisLockBuild { public static RedisLock build(RedisTemplate redisTemplate, String key) { RedisLock redisLock = new RedisLock(redisTemplate, key); return redisLock; } public static RedisLock build(RedisTemplate redisTemplate, String key, long expireTime) { RedisLock redisLock = new RedisLock(redisTemplate, key, expireTime); return redisLock; } }
從以上的兩個(gè)代碼類來看是不是覺得如此簡(jiǎn)單,主要流程點(diǎn)都有注釋,接下來我們就仔細(xì)說明一下這兩個(gè)類的作用:
RedisLock: Redis分布式鎖工具類,主要提供以下幾個(gè)方法:
public boolean tryLock(long intervalTimeMs, long timeoutMs)
嘗試獲取鎖,并傳遞參數(shù)作為獲取不到鎖時(shí)進(jìn)行等待,以及等待每次重新獲取鎖的時(shí)間,在有效時(shí)間內(nèi)獲取到鎖就返回成功,否則返回失敗,業(yè)務(wù)側(cè)根據(jù)獲取鎖的結(jié)果進(jìn)行業(yè)務(wù)處理。
public void releaseLock()
釋放鎖,通常為了在鎖使用完成后對(duì)其進(jìn)行快速釋放,供其他線程進(jìn)行使用,所以在加鎖的代碼塊都需要使用try-finally進(jìn)行處理,必須主動(dòng)在finally處進(jìn)行鎖釋放。
public void renewal(long renewalTime)
為防止有個(gè)別處理時(shí)間較長(zhǎng)的流程,防止流程未處理完鎖就過期,提供此方法用于對(duì)鎖進(jìn)行續(xù)期,也就是對(duì)鎖重新設(shè)置過期時(shí)間,以達(dá)到延遲過期的效果,使用需要注意釋放鎖防止造成死鎖。
RedisLockBuild: 作為RedisLock的構(gòu)造器,主要方便我們構(gòu)造RedisLock對(duì)象,有需要的可根據(jù)個(gè)人所需進(jìn)行封裝。
使用案例
看似簡(jiǎn)單的東西,不動(dòng)手試一下怎么知道好不好用呢,這里我們就舉個(gè)栗子來說明一下:
@RestController @RequestMapping("/redis") public class RedisController { private static final Logger logger = LoggerFactory.getLogger(RedisController.class); @Autowired private RedisTemplate redisTemplate; private Integer inventoryNumber = 10; @PostMapping("/lock") public String lock() { CompletableFuture cf1 = CompletableFuture.runAsync(() -> increaseInventory(1)); CompletableFuture cf2 = CompletableFuture.runAsync(() -> decreaseInventory(1)); CompletableFuture<Void> cfAll = CompletableFuture.allOf(cf1, cf2); cfAll.join(); logger.info("Inventory Number:{}", inventoryNumber); return "test success"; } /** * 增加庫(kù)存 */ @SneakyThrows public void increaseInventory(int number) { logger.info("----- increaseInventory start -----"); RedisLock redisLock = RedisLockBuild.build(redisTemplate, "inventory"); try { if (redisLock.tryLock(500, 3000)) { inventoryNumber += number; logger.info("----- increaseInventory success sleep 2 seconds -----"); TimeUnit.SECONDS.sleep(2); } else { logger.info("increaseInventory lock failed."); } } finally { redisLock.releaseLock(); } logger.info("----- increaseInventory end -----"); } /** * 減少庫(kù)存 */ @SneakyThrows public void decreaseInventory(int number) { logger.info("----- decreaseInventory start -----"); RedisLock redisLock = RedisLockBuild.build(redisTemplate, "inventory"); try { if (redisLock.tryLock(500, 3000)) { inventoryNumber -= number; logger.info("----- decreaseInventory success sleep 2 seconds -----"); TimeUnit.SECONDS.sleep(2); } else { logger.info("decreaseInventory lock failed."); } } finally { redisLock.releaseLock(); } logger.info("----- decreaseInventory end -----"); } }
以上代碼我們簡(jiǎn)單寫了個(gè)測(cè)試類,設(shè)置一個(gè)庫(kù)存變量inventoryNumber=10,兩個(gè)任務(wù)cf1、cf2,并進(jìn)行異步并發(fā)執(zhí)行,這兩個(gè)任務(wù)同時(shí)對(duì)庫(kù)存變量inventoryNumber進(jìn)行增加和遞減,此時(shí)我們細(xì)看increaseInventory、decreaseInventory這兩個(gè)方法里面的實(shí)現(xiàn),同時(shí)都對(duì)庫(kù)存修改的動(dòng)作進(jìn)行了加分布式鎖操作,方式并發(fā)修改數(shù)據(jù)出現(xiàn)數(shù)據(jù)不一致問題。如此我們就成功實(shí)現(xiàn)了分布式鎖。
為了更加可觀,我們?cè)诖a里設(shè)置了休眠2秒的時(shí)間,上圖是我們的一次運(yùn)行結(jié)果,也就是無論哪個(gè)任務(wù)先獲取到鎖,另一個(gè)任務(wù)就要等休眠2秒過后釋放后才能夠獲取鎖,從而進(jìn)行庫(kù)存操作。
總結(jié)
使用分布式鎖時(shí)也是需要合理分析場(chǎng)景,并合理設(shè)置好鎖的時(shí)間,以達(dá)到鎖資源的合理分配,從而使我們的系統(tǒng)在高效運(yùn)行的情況下更加安全可靠。
到此這篇關(guān)于Redis實(shí)現(xiàn)分布式鎖的示例代碼的文章就介紹到這了,更多相關(guān)Redis 分布式鎖內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis 對(duì)過期數(shù)據(jù)的處理方法
這篇文章主要介紹了Redis 對(duì)過期數(shù)據(jù)的處理,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10jwt+redis實(shí)現(xiàn)登錄認(rèn)證的示例代碼
在登錄業(yè)務(wù)代碼中,當(dāng)用戶登錄成功時(shí),生成一個(gè)登錄憑證存儲(chǔ)到redis中,本文主要介紹了jwt+redis實(shí)現(xiàn)登錄認(rèn)證的示例代碼,具有一定的參考價(jià)值,感興趣的可以了解一下2024-03-03基于Redis的分布式鎖的簡(jiǎn)單實(shí)現(xiàn)方法
這篇文章主要介紹了基于Redis的分布式鎖的簡(jiǎn)單實(shí)現(xiàn)方法,Redis官方給出兩種思路,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-10-10Redis集群服務(wù)器的實(shí)現(xiàn)(圖文步驟)
本文介紹了Redis集群服務(wù)器的優(yōu)勢(shì),為讀者提供了全面的Redis集群服務(wù)器知識(shí)和使用技巧,具有一定的參考價(jià)值,感興趣的可以了解一下2023-09-09