Redisson分布式信號(hào)量RSemaphore的使用超詳細(xì)講解
本篇文章基于redisson-3.17.6版本源碼進(jìn)行分析
一、RSemaphore的使用
@Test
public void testRSemaphore() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redissonClient = Redisson.create(config);
RSemaphore rSemaphore = redissonClient.getSemaphore("semaphore");
// 設(shè)置5個(gè)許可,模擬五個(gè)停車位
rSemaphore.trySetPermits(5);
// 創(chuàng)建10個(gè)線程,模擬10輛車過來停車
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
try {
rSemaphore.acquire();
System.out.println(Thread.currentThread().getName() + "進(jìn)入停車場...");
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100));
System.out.println(Thread.currentThread().getName() + "離開停車場...");
rSemaphore.release();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "A" + i).start();
}
try {
TimeUnit.MINUTES.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}二、RSemaphore設(shè)置許可數(shù)量
初始化RSemaphore,需要調(diào)用trySetPermits()設(shè)置許可數(shù)量:
/** * 嘗試設(shè)置許可數(shù)量,設(shè)置成功,返回true,否則返回false */ boolean trySetPermits(int permits);
trySetPermits()內(nèi)部調(diào)用了trySetPermitsAsync():
// 異步設(shè)置許可
@Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 判斷分布式信號(hào)量的key是否存在,如果不存在,才設(shè)置
"local value = redis.call('get', KEYS[1]); " +
"if (value == false) then "
// set "semaphore" permits
// 使用String數(shù)據(jù)結(jié)構(gòu)設(shè)置信號(hào)量的許可數(shù)
+ "redis.call('set', KEYS[1], ARGV[1]); "
// 發(fā)布一條消息到redisson_sc:{semaphore}通道
+ "redis.call('publish', KEYS[2], ARGV[1]); "
// 設(shè)置成功,返回1
+ "return 1;"
+ "end;"
// 否則返回0
+ "return 0;",
Arrays.asList(getRawName(), getChannelName()), permits);
if (log.isDebugEnabled()) {
future.thenAccept(r -> {
if (r) {
log.debug("permits set, permits: {}, name: {}", permits, getName());
} else {
log.debug("unable to set permits, permits: {}, name: {}", permits, getName());
}
});
}
return future;
}可以看到,設(shè)置許可數(shù)量底層使用LUA腳本,實(shí)際上就是使用redis的String數(shù)據(jù)結(jié)構(gòu),保存了我們指定的許可數(shù)量。如下圖:

參數(shù)說明:
- KEYS[1]: 我們指定的分布式信號(hào)量key,例如redissonClient.getSemaphore("semaphore")中的"semaphore")
- KEYS[2]: 釋放鎖的channel名稱,redisson_sc:{分布式信號(hào)量key},在本例中,就是redisson_sc:{semaphore}
- ARGV[1]: 設(shè)置的許可數(shù)量
總結(jié)設(shè)置許可執(zhí)行流程為:
- get semaphore,獲取到semaphore信號(hào)量的當(dāng)前的值
- 第一次數(shù)據(jù)為0, 然后使用set semaphore 3,將這個(gè)信號(hào)量同時(shí)能夠允許獲取鎖的客戶端的數(shù)量設(shè)置為3。(注意到,如果之前設(shè)置過了信號(hào)量,將無法再次設(shè)置,直接返回0。想要更改信號(hào)量總數(shù)可以使用addPermits方法)
- 然后redis發(fā)布一些消息,返回1
三、RSemaphore的加鎖流程
許可數(shù)量設(shè)置好之后,我們就可以調(diào)用acquire()方法獲取了,如果未傳入許可數(shù)量,默認(rèn)獲取一個(gè)許可。
public void acquire() throws InterruptedException {
acquire(1);
}
public void acquire(int permits) throws InterruptedException {
// 嘗試獲取鎖成功,直接返回
if (tryAcquire(permits)) {
return;
}
// 對(duì)于沒有獲取鎖的那些線程,訂閱redisson_sc:{分布式信號(hào)量key}通道的消息
CompletableFuture<RedissonLockEntry> future = subscribe();
semaphorePubSub.timeout(future);
RedissonLockEntry entry = commandExecutor.getInterrupted(future);
try {
// 不斷循環(huán)嘗試獲取許可
while (true) {
if (tryAcquire(permits)) {
return;
}
entry.getLatch().acquire();
}
} finally {
// 取消訂閱
unsubscribe(entry);
}
// get(acquireAsync(permits));
}可以看到,獲取許可的核心邏輯在tryAcquire()方法中,如果tryAcquire()返回true說明獲取許可成功,直接返回;如果返回false,說明當(dāng)前沒有許可可以使用,則對(duì)于沒有獲取鎖的那些線程,訂閱redisson_sc:{分布式信號(hào)量key}通道的消息,并通過死循環(huán)不斷嘗試獲取鎖。
我們看一下tryAcquire()方法的邏輯,內(nèi)部調(diào)用了tryAcquireAsync()方法:
// 異步獲取許可
@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return new CompletableFutureWrapper<>(true);
}
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 獲取當(dāng)前剩余的許可數(shù)量
"local value = redis.call('get', KEYS[1]); " +
// 許可不為空,并且許可數(shù)量 大于等于 當(dāng)前線程申請(qǐng)的許可數(shù)量
"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
// 通過decrby減少剩余可用許可
"local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
// 返回1
"return 1; " +
"end; " +
// 其它情況,返回0
"return 0;",
Collections.<Object>singletonList(getRawName()), permits);
}從源碼可以看到,獲取許可就是操作redis中的數(shù)據(jù),首先獲取到redis中剩余的許可數(shù)量,只有當(dāng)剩余的許可數(shù)量大于線程申請(qǐng)的許可數(shù)量時(shí),才獲取成功,返回1;否則獲取失敗,返回0;
總結(jié)加鎖執(zhí)行流程為:
- get semaphore,獲取到一個(gè)當(dāng)前的值,比如說是3,3 > 1
- decrby semaphore 1,將信號(hào)量允許獲取鎖的客戶端的數(shù)量遞減1,變成2
- decrby semaphore 1
- decrby semaphore 1
- 執(zhí)行3次加鎖后,semaphore值為0
- 此時(shí)如果再來進(jìn)行加鎖則直接返回0,然后進(jìn)入死循環(huán)去獲取鎖
四、RSemaphore的解鎖流程
通過前面對(duì)RSemaphore獲取鎖的分析,我們很容易能猜到,釋放鎖,無非就是歸還許可數(shù)量到redis中。我們查看具體的源碼:
public RFuture<Void> releaseAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
if (permits == 0) {
return new CompletableFutureWrapper<>((Void) null);
}
RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
// 通過incrby增加許可數(shù)量
"local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
// 發(fā)布一條消息到redisson_sc:{semaphore}中
"redis.call('publish', KEYS[2], value); ",
Arrays.asList(getRawName(), getChannelName()), permits);
if (log.isDebugEnabled()) {
future.thenAccept(o -> {
log.debug("released, permits: {}, name: {}", permits, getName());
});
}
return future;
}到此這篇關(guān)于Redisson分布式信號(hào)量RSemaphore的使用超詳細(xì)講解的文章就介紹到這了,更多相關(guān)Redisson RSemaphore內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實(shí)現(xiàn)樹形菜單的方法總結(jié)
當(dāng)我們想要展示層級(jí)結(jié)構(gòu),如文件目錄、組織結(jié)構(gòu)或分類目錄時(shí),樹形菜單是一個(gè)直觀且有效的解決方案,本文為大家整理了java中幾種常見方法,希望對(duì)大家有所幫助2023-08-08
Java控制臺(tái)輸入數(shù)組并逆序輸出的方法實(shí)例
這篇文章主要介紹了Java手動(dòng)輸入數(shù)組并逆向輸出的方法實(shí)例,需要的朋友可以參考下。2017-08-08
SpringBoot整合數(shù)據(jù)庫訪問層的實(shí)戰(zhàn)
本文主要介紹了SpringBoot整合數(shù)據(jù)庫訪問層的實(shí)戰(zhàn),主要包含JdbcTemplate和mybatis框架的整合應(yīng)用,具有一定的參考價(jià)值,感興趣的可以了解一下2022-03-03
Springboot通過ObjectMapper配置json序列化詳解
SpringBoot默認(rèn)集成Jackson庫,其中ObjectMapper類是核心,用于Java對(duì)象與JSON字符串的互轉(zhuǎn),提供配置序列化特性、注冊(cè)模塊等方法,在SpringBoot中可以全局配置JSON格式,如日期格式化、將Long轉(zhuǎn)為字符串,還可以配置序列化時(shí)的各種規(guī)則,感興趣的可以了解一下2024-10-10
Java Grpc實(shí)例創(chuàng)建負(fù)載均衡詳解
這篇文章主要介紹了Java Grpc實(shí)例創(chuàng)建負(fù)載均衡詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03
springboot(thymeleaf)中th:field和th:value的區(qū)別及說明
這篇文章主要介紹了springboot(thymeleaf)中th:field和th:value的區(qū)別及說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-10-10

