Redis實現(xiàn)分布式鎖的示例代碼
什么是分布式鎖
分布式鎖是一種應(yīng)用級的鎖,在分布式系統(tǒng)下一個應(yīng)用通常會有多個節(jié)點部署,那么在高并發(fā)的情況下,就可能出現(xiàn)同一時間多個節(jié)點多個線程同時處理同一條數(shù)據(jù),這種情況很容易出現(xiàn)數(shù)據(jù)不一致。那么分布式鎖的出現(xiàn)就是為了解決此問題,它能夠保證一個代碼塊在同一時間只能被同一線程或同一個節(jié)點執(zhí)行。
分布式鎖有哪些
- 基于數(shù)據(jù)庫:使用數(shù)據(jù)庫設(shè)計排他鎖或者共享鎖來實現(xiàn)。
- 基于緩存:由于緩存操作是原子性的,可以使用Redis或Memcached進(jìn)行實現(xiàn)分布式鎖。
- 基于Zookeeper:利用其臨時順序節(jié)點特性實現(xiàn)。
- 基于分布式協(xié)調(diào)服務(wù):使用Etcd或Consul的API實現(xiàn)。
沒種分布式鎖的實現(xiàn)原理也有些不同,今天我們主要基于Redis緩存進(jìn)一步說明分布式鎖的實現(xiàn)。
適用場景
- 數(shù)據(jù)并發(fā)讀寫:當(dāng)多個節(jié)點需要對同一數(shù)據(jù)進(jìn)行操作時,分布式鎖可以確保在任何時刻只有一個節(jié)點能夠執(zhí)行寫操作,從而避免數(shù)據(jù)的不一致。
- 業(yè)務(wù)流程控制:在復(fù)雜的業(yè)務(wù)流程中,多個節(jié)點可能需要按特定順序執(zhí)行某些操作??梢允褂梅植际芥i來保證這些流程按順序執(zhí)行。
- 資源爭搶:當(dāng)多個節(jié)點需要爭搶有限的資源時,分布式鎖可以避免資源競爭導(dǎo)致的系統(tǒng)壓力增大或資源浪費。
實現(xiàn)原理
使用Redis實現(xiàn)分布式鎖主要分為以下幾個步驟:
1.使用Redis的setNx方法,設(shè)置key-value到緩存中,由于Redis操作是原子性的,并且該方法操作若key在緩存中不存在,則會創(chuàng)建key并設(shè)值后返回狀態(tài)碼1,否則返回狀態(tài)碼0,所以我們只需根據(jù)返回的狀態(tài)碼就可以確定是否有線程正在占用資源,以此作為是否加鎖成功的依據(jù)。
2.使用Redis的delete方法,可以在流程執(zhí)行完成后對緩存進(jìn)行刪除,以此達(dá)到釋放鎖的目的,從而使其他線程可以加鎖進(jìn)行操作。
3.使用Redis的expire方法,設(shè)置緩存過期時間,主要用于防止一些故障導(dǎo)致緩存沒有釋放,而長時間占用鎖從而導(dǎo)致死鎖,同時也可以為長時間的任務(wù)進(jìn)行續(xù)期。
分布式鎖設(shè)計實現(xiàn)
這里我們主要通過SpringBoot已經(jīng)封裝好的RedisTemplate工具類進(jìn)行詳細(xì)講解。
引入maven依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.2.5.RELEASE</version> </dependency>
話不多說直接上代碼
public class RedisLock {
private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);
/**
* 鎖狀態(tài)
*/
private Boolean locked = false;
/**
* Redis操作工具
*/
private RedisTemplate redisTemplate;
/**
* 緩存過期時間(單位:秒)
*/
private Long expireTime = 30L;
/**
* 緩存key
*/
private String key;
public RedisLock(RedisTemplate redisTemplate, String key) {
this.redisTemplate = redisTemplate;
this.key = key;
}
public RedisLock(RedisTemplate redisTemplate, String key, Long expireTime) {
this(redisTemplate, key);
this.expireTime = expireTime;
}
public boolean tryLock(long intervalTimeMs, long timeoutMs) {
// 重試間隔時間,默認(rèn)100
if (intervalTimeMs <= 0) {
intervalTimeMs = 100L;
}
// 獲取鎖超時時間,默認(rèn)100
if (timeoutMs <= intervalTimeMs) {
timeoutMs = 100L;
}
while (timeoutMs >= 0) {
// 設(shè)置緩存,底層調(diào)用setNx方法,返回true即緩存設(shè)置成功,則表示加鎖成功
if (redisTemplate.opsForValue().setIfAbsent(key, "1", expireTime, TimeUnit.SECONDS)) {
locked = true;
return true;
}
// 否則按間隔時間進(jìn)行休眠后再重新嘗試加鎖
sleep(intervalTimeMs);
timeoutMs -= intervalTimeMs;
}
locked = false;
return false;
}
public void releaseLock() {
if (!locked) {
return;
}
// 僅已加鎖才進(jìn)行刪除
redisTemplate.delete(key);
locked = false;
}
/**
* 延長過期時間
*/
public void renewal() {
redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);
}
/**
* 自定義延長過期時間
* @param renewalTime 延長的時間(單位:秒)
*/
public void renewal(long renewalTime) {
redisTemplate.expire(key, renewalTime, TimeUnit.SECONDS);
}
private void sleep(long intervalTimeMs) {
try {
TimeUnit.MILLISECONDS.sleep(intervalTimeMs);
} catch (InterruptedException e) {
logger.error("tryLock:{} timeout exception", key);
}
}
}
public class RedisLockBuild {
public static RedisLock build(RedisTemplate redisTemplate, String key) {
RedisLock redisLock = new RedisLock(redisTemplate, key);
return redisLock;
}
public static RedisLock build(RedisTemplate redisTemplate, String key, long expireTime) {
RedisLock redisLock = new RedisLock(redisTemplate, key, expireTime);
return redisLock;
}
}從以上的兩個代碼類來看是不是覺得如此簡單,主要流程點都有注釋,接下來我們就仔細(xì)說明一下這兩個類的作用:
RedisLock: Redis分布式鎖工具類,主要提供以下幾個方法:
public boolean tryLock(long intervalTimeMs, long timeoutMs)
嘗試獲取鎖,并傳遞參數(shù)作為獲取不到鎖時進(jìn)行等待,以及等待每次重新獲取鎖的時間,在有效時間內(nèi)獲取到鎖就返回成功,否則返回失敗,業(yè)務(wù)側(cè)根據(jù)獲取鎖的結(jié)果進(jìn)行業(yè)務(wù)處理。
public void releaseLock()
釋放鎖,通常為了在鎖使用完成后對其進(jìn)行快速釋放,供其他線程進(jìn)行使用,所以在加鎖的代碼塊都需要使用try-finally進(jìn)行處理,必須主動在finally處進(jìn)行鎖釋放。
public void renewal(long renewalTime)
為防止有個別處理時間較長的流程,防止流程未處理完鎖就過期,提供此方法用于對鎖進(jìn)行續(xù)期,也就是對鎖重新設(shè)置過期時間,以達(dá)到延遲過期的效果,使用需要注意釋放鎖防止造成死鎖。
RedisLockBuild: 作為RedisLock的構(gòu)造器,主要方便我們構(gòu)造RedisLock對象,有需要的可根據(jù)個人所需進(jìn)行封裝。
使用案例
看似簡單的東西,不動手試一下怎么知道好不好用呢,這里我們就舉個栗子來說明一下:
@RestController
@RequestMapping("/redis")
public class RedisController {
private static final Logger logger = LoggerFactory.getLogger(RedisController.class);
@Autowired
private RedisTemplate redisTemplate;
private Integer inventoryNumber = 10;
@PostMapping("/lock")
public String lock() {
CompletableFuture cf1 = CompletableFuture.runAsync(() -> increaseInventory(1));
CompletableFuture cf2 = CompletableFuture.runAsync(() -> decreaseInventory(1));
CompletableFuture<Void> cfAll = CompletableFuture.allOf(cf1, cf2);
cfAll.join();
logger.info("Inventory Number:{}", inventoryNumber);
return "test success";
}
/**
* 增加庫存
*/
@SneakyThrows
public void increaseInventory(int number) {
logger.info("----- increaseInventory start -----");
RedisLock redisLock = RedisLockBuild.build(redisTemplate, "inventory");
try {
if (redisLock.tryLock(500, 3000)) {
inventoryNumber += number;
logger.info("----- increaseInventory success sleep 2 seconds -----");
TimeUnit.SECONDS.sleep(2);
}
else {
logger.info("increaseInventory lock failed.");
}
}
finally {
redisLock.releaseLock();
}
logger.info("----- increaseInventory end -----");
}
/**
* 減少庫存
*/
@SneakyThrows
public void decreaseInventory(int number) {
logger.info("----- decreaseInventory start -----");
RedisLock redisLock = RedisLockBuild.build(redisTemplate, "inventory");
try {
if (redisLock.tryLock(500, 3000)) {
inventoryNumber -= number;
logger.info("----- decreaseInventory success sleep 2 seconds -----");
TimeUnit.SECONDS.sleep(2);
}
else {
logger.info("decreaseInventory lock failed.");
}
}
finally {
redisLock.releaseLock();
}
logger.info("----- decreaseInventory end -----");
}
}以上代碼我們簡單寫了個測試類,設(shè)置一個庫存變量inventoryNumber=10,兩個任務(wù)cf1、cf2,并進(jìn)行異步并發(fā)執(zhí)行,這兩個任務(wù)同時對庫存變量inventoryNumber進(jìn)行增加和遞減,此時我們細(xì)看increaseInventory、decreaseInventory這兩個方法里面的實現(xiàn),同時都對庫存修改的動作進(jìn)行了加分布式鎖操作,方式并發(fā)修改數(shù)據(jù)出現(xiàn)數(shù)據(jù)不一致問題。如此我們就成功實現(xiàn)了分布式鎖。

為了更加可觀,我們在代碼里設(shè)置了休眠2秒的時間,上圖是我們的一次運行結(jié)果,也就是無論哪個任務(wù)先獲取到鎖,另一個任務(wù)就要等休眠2秒過后釋放后才能夠獲取鎖,從而進(jìn)行庫存操作。
總結(jié)
使用分布式鎖時也是需要合理分析場景,并合理設(shè)置好鎖的時間,以達(dá)到鎖資源的合理分配,從而使我們的系統(tǒng)在高效運行的情況下更加安全可靠。
到此這篇關(guān)于Redis實現(xiàn)分布式鎖的示例代碼的文章就介紹到這了,更多相關(guān)Redis 分布式鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
jwt+redis實現(xiàn)登錄認(rèn)證的示例代碼
在登錄業(yè)務(wù)代碼中,當(dāng)用戶登錄成功時,生成一個登錄憑證存儲到redis中,本文主要介紹了jwt+redis實現(xiàn)登錄認(rèn)證的示例代碼,具有一定的參考價值,感興趣的可以了解一下2024-03-03

