SpringBoot整合Redisson實現(xiàn)分布式鎖
Redisson是架設在redis基礎上的一個Java駐內(nèi)存數(shù)據(jù)網(wǎng)格(In-Memory Data Grid)。充分的利用了Redis鍵值數(shù)據(jù)庫提供的一系列優(yōu)勢,基于Java實用工具包中常用接口,為使用者提供了一系列具有分布式特性的常用工具類。使得原本作為協(xié)調(diào)單機多線程并發(fā)程序的工具包獲得了協(xié)調(diào)分布式多機多線程并發(fā)系統(tǒng)的能力,大大降低了設計和研發(fā)大規(guī)模分布式系統(tǒng)的難度。同時結(jié)合各富特色的分布式服務,更進一步簡化了分布式環(huán)境中程序相互之間的協(xié)作。
Github地址:https://github.com/redisson/redisson
一、添加依賴
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- springboot整合redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- springboot整合redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.13.6</version> </dependency> </dependencies>
二、redis配置文件
server: port: 8000 spring: redis: host: localhost port: 6379 password: null database: 1 timeout: 30000
三、新建配置類
@Configuration public class MyRedissonConfig { @Value("${spring.redis.host}") String redisHost; @Value("${spring.redis.port}") String redisPort; @Value("${spring.redis.password}") String redisPassword; @Value("${spring.redis.timeout}") Integer redisTimeout; /** * Redisson配置 * @return */ @Bean RedissonClient redissonClient() { //1、創(chuàng)建配置 Config config = new Config(); redisHost = redisHost.startsWith("redis://") ? redisHost : "redis://" + redisHost; SingleServerConfig serverConfig = config.useSingleServer() .setAddress(redisHost + ":" + redisPort) .setTimeout(redisTimeout); if (StringUtils.isNotBlank(redisPassword)) { serverConfig.setPassword(redisPassword); } return Redisson.create(config); } }
//單機 RedissonClient redisson = Redisson.create(); Config config = new Config(); config.useSingleServer().setAddress("myredisserver:6379"); RedissonClient redisson = Redisson.create(config); //主從 Config config = new Config(); config.useMasterSlaveServers() .setMasterAddress("127.0.0.1:6379") .addSlaveAddress("127.0.0.1:6389", "127.0.0.1:6332", "127.0.0.1:6419") .addSlaveAddress("127.0.0.1:6399"); RedissonClient redisson = Redisson.create(config); //哨兵 Config config = new Config(); config.useSentinelServers() .setMasterName("mymaster") .addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379") .addSentinelAddress("127.0.0.1:26319"); RedissonClient redisson = Redisson.create(config); //集群 Config config = new Config(); config.useClusterServers() .setScanInterval(2000) // cluster state scan interval in milliseconds .addNodeAddress("127.0.0.1:7000", "127.0.0.1:7001") .addNodeAddress("127.0.0.1:7002"); RedissonClient redisson = Redisson.create(config);
四、使用分布式鎖
可重入鎖
基于Redis的Redisson分布式可重入鎖RLock對象實現(xiàn)了java.util.concurrent.locks.Lock接口。
@RequestMapping("/redisson") public String testRedisson(){ //獲取分布式鎖,只要鎖的名字一樣,就是同一把鎖 RLock lock = redissonClient.getLock("lock"); //加鎖(阻塞等待),默認過期時間是無限期 lock.lock(); try{ //如果業(yè)務執(zhí)行過長,Redisson會自動給鎖續(xù)期 Thread.sleep(1000); System.out.println("加鎖成功,執(zhí)行業(yè)務邏輯"); } catch (InterruptedException e) { e.printStackTrace(); } finally { //解鎖,如果業(yè)務執(zhí)行完成,就不會繼續(xù)續(xù)期 lock.unlock(); } return "Hello Redisson!"; }
如果拿到分布式鎖的節(jié)點宕機,且這個鎖正好處于鎖住的狀態(tài)時,會出現(xiàn)鎖死的狀態(tài),為了避免這種情況的發(fā)生,鎖都會設置一個過期時間。這樣也存在一個問題,一個線程拿到了鎖設置了30s超時,在30s后這個線程還沒有執(zhí)行完畢,鎖超時釋放了,就會導致問題,Redisson給出了自己的答案,就是 watch dog 自動延期機制。
Redisson提供了一個監(jiān)控鎖的看門狗,它的作用是在Redisson實例被關(guān)閉前,不斷的延長鎖的有效期,也就是說,如果一個拿到鎖的線程一直沒有完成邏輯,那么看門狗會幫助線程不斷的延長鎖超時時間,鎖不會因為超時而被釋放。
默認情況下,看門狗的續(xù)期時間是30s,也可以通過修改Config.lockWatchdogTimeout來另行指定。
另外Redisson 還提供了可以指定leaseTime參數(shù)的加鎖方法來指定加鎖的時間。超過這個時間后鎖便自動解開了,不會延長鎖的有效期。
在RedissonLock類的renewExpiration()方法中,會啟動一個定時任務每隔30/3=10秒給鎖續(xù)期。如果業(yè)務執(zhí)行期間,應用掛了,那么不會自動續(xù)期,到過期時間之后,鎖會自動釋放。
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); return; } if (res) { // reschedule itself renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
另外Redisson還提供了leaseTime的參數(shù)來指定加鎖的時間。超過這個時間后鎖便自動解開了。
// 加鎖以后10秒鐘自動解鎖 // 無需調(diào)用unlock方法手動解鎖 lock.lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
如果指定了鎖的超時時間,底層直接調(diào)用lua腳本,進行占鎖。如果超過leaseTime,業(yè)務邏輯還沒有執(zhí)行完成,則直接釋放鎖,所以在指定leaseTime時,要讓leaseTime大于業(yè)務執(zhí)行時間。RedissonLock類的tryLockInnerAsync()方法
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
讀寫鎖
分布式可重入讀寫鎖允許同時有多個讀鎖和一個寫鎖處于加鎖狀態(tài)。在讀寫鎖中,讀讀共享、讀寫互斥、寫寫互斥。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock"); // 最常見的使用方法 rwlock.readLock().lock(); // 或 rwlock.writeLock().lock();
讀寫鎖測試類,當訪問write接口時,read接口會被阻塞住。
@RestController public class TestController { @Autowired RedissonClient redissonClient; @Autowired StringRedisTemplate redisTemplate; @RequestMapping("/write") public String write(){ RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock"); RLock writeLock = readWriteLock.writeLock(); String s = UUID.randomUUID().toString(); writeLock.lock(); try { redisTemplate.opsForValue().set("wr-lock-key", s); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); }finally { writeLock.unlock(); } return s; } @RequestMapping("/read") public String read(){ RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock"); RLock readLock = readWriteLock.readLock(); String s = ""; readLock.lock(); try { s = redisTemplate.opsForValue().get("wr-lock-key"); } finally { readLock.unlock(); } return s; } }
信號量(Semaphore)
基于Redis的Redisson的分布式信號量(Semaphore)Java對象RSemaphore
采用了與java.util.concurrent.Semaphore
類似的接口和用法
關(guān)于信號量的使用你們能夠想象一下這個場景,有三個停車位,當三個停車位滿了后,其余車就不停了。能夠把車位比做信號,如今有三個信號,停一次車,用掉一個信號,車離開就是釋放一個信號。
咱們用 Redisson 來演示上述停車位的場景。
先定義一個占用停車位的方法:
/** * 停車,占用停車位 * 總共 3 個車位 */ @ResponseBody @RequestMapping("park") public String park() throws InterruptedException { // 獲取信號量(停車場) RSemaphore park = redisson.getSemaphore("park"); // 獲取一個信號(停車位) park.acquire(); return "OK"; }
再定義一個離開車位的方法:
/** * 釋放車位 * 總共 3 個車位 */ @ResponseBody @RequestMapping("leave") public String leave() throws InterruptedException { // 獲取信號量(停車場) RSemaphore park = redisson.getSemaphore("park"); // 釋放一個信號(停車位) park.release(); return "OK"; }
為了簡便,我用 Redis 客戶端添加了一個 key:“park”,值等于 3,表明信號量為 park,總共有三個值。
而后用 postman 發(fā)送 park 請求占用一個停車位。
而后在 redis 客戶端查看 park 的值,發(fā)現(xiàn)已經(jīng)改成 2 了。繼續(xù)調(diào)用兩次,發(fā)現(xiàn) park 的等于 0,當調(diào)用第四次的時候,會發(fā)現(xiàn)請求一直處于等待中
,說明車位不夠了。若是想要不阻塞,能夠用 tryAcquire 或 tryAcquireAsync。
咱們再調(diào)用離開車位的方法,park 的值變?yōu)榱?1,表明車位剩余 1 個。
注意:屢次執(zhí)行釋放信號量操做,剩余信號量會一直增長,而不是到 3 后就封頂了。
閉鎖(CountDownLatch)
CountDownLatch作用:某一線程,等待其他線程執(zhí)行完畢之后,自己再繼續(xù)執(zhí)行。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.trySetCount(1); latch.await(); // 在其他線程或其他JVM里 RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.countDown();
在TestController中添加測試方法,訪問close接口時,調(diào)用await()方法進入阻塞狀態(tài),直到有三次訪問release接口時,close接口才會返回。
@RequestMapping("/close") public String close() throws InterruptedException { RCountDownLatch close = redissonClient.getCountDownLatch("close"); close.trySetCount(3); close.await(); return "close"; } @RequestMapping("/release") public String release(){ RCountDownLatch close = redissonClient.getCountDownLatch("close"); close.countDown(); return "release"; }
到此這篇關(guān)于SpringBoot整合Redisson實現(xiàn)分布式鎖的文章就介紹到這了,更多相關(guān)SpringBoot Redisson分布式鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 關(guān)于SpringBoot 使用 Redis 分布式鎖解決并發(fā)問題
- Springboot整合Redis實現(xiàn)超賣問題還原和流程分析(分布式鎖)
- springboot 集成redission 以及分布式鎖的使用詳解
- SpringBoot之使用Redis實現(xiàn)分布式鎖(秒殺系統(tǒng))
- Redis分布式鎖升級版RedLock及SpringBoot實現(xiàn)方法
- SpringBoot集成redis實現(xiàn)分布式鎖的示例代碼
- 基于springboot實現(xiàn)redis分布式鎖的方法
- SpringBoot3+Redis實現(xiàn)分布式鎖的配置方法
相關(guān)文章
java的各種集合為什么不安全(List、Set、Map)以及代替方案
這篇文章主要介紹了java的各種集合為什么不安全(List、Set、Map)以及代替方案,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-10-10Apache Commons Math3學習之數(shù)值積分實例代碼
這篇文章主要介紹了Apache Commons Math3學習之數(shù)值積分實例代碼,涉及使用辛普森積分的例子,這里分享給大家,供需要的朋友參考。2017-10-10詳解poi+springmvc+springjdbc導入導出excel實例
本篇文章主要介紹了poi+springmvc+springjdbc導入導出excel實例,非常具有實用價值,需要的朋友可以參考下。2017-01-01