Redisson如何解決Redis分布式鎖提前釋放問題
前言:
在分布式場景下,相信你或多或少需要使用分布式鎖來訪問臨界資源,或者控制耗時操作的并發(fā)性。
當然,實現(xiàn)分布式鎖的方案也比較多,比如數(shù)據(jù)庫、redis、zk 等等。本文主要結(jié)合一個線上案例,講解 redis 分布式鎖的相關(guān)實現(xiàn)。
一、問題描述:
某天線上出現(xiàn)了數(shù)據(jù)重復處理問題,經(jīng)排查后發(fā)現(xiàn),竟然是單次處理時間較長,redis 分布式鎖提前釋放
導致相同請求并發(fā)處理。
其實,這是一個鎖續(xù)約
的問題,對于一把分布式鎖,我們需要考慮,設置鎖多長時間過期、出現(xiàn)異常如何釋放鎖?
以上問題便是本文要討論的主題。
二、原因分析:
項目采用較簡單的自定義 redis 分布式鎖,為避免死鎖定義默認過期時間 10s,如下:
override fun lock() { while (true) { //嘗試獲取鎖 if (tryLock()) { return } try { Thread.sleep(10) } catch (e: InterruptedException) { e.printStackTrace() } } } override fun tryLock(): Boolean { val value = getUniqueSign() // 隨機串 val flag = redisTemplate!!.opsForValue().setIfAbsent(name, value, 10000, TimeUnit.MILLISECONDS) if (flag != null && flag) { VALUE_lOCAL.set(value) INTO_NUM_LOCAL.set(if (INTO_NUM_LOCAL.get() != null) INTO_NUM_LOCAL.get() + 1 else 1) return true } return false }
缺乏對鎖自動續(xù)期等實現(xiàn)。
三、解決方案:
1、思考:
針對這種場景,可以考慮的是如何給鎖自動續(xù)期-當業(yè)務沒有執(zhí)行結(jié)束的情況下,當然也可以自定義實現(xiàn) 比如開一個后臺線程定時的給這些拿到鎖的線程續(xù)期。
Redisson 也正是基于這種思路實現(xiàn)自動續(xù)期的分布式鎖,各種異常情況也考慮的更加完善,綜合考慮采用 Redisson 的分布式鎖解決方案優(yōu)化。
2、Redisson簡單配置:
@Configuration @EnableConfigurationProperties(RedissonProperties::class) class RedissonConfig { @Bean fun redissonClient(redissonProperties: RedissonProperties): RedissonClient { val config = Config() val singleServerConfig = redissonProperties.singleServerConfig!! config.useSingleServer().setAddress(singleServerConfig.address) .setDatabase(singleServerConfig.database) .setUsername(singleServerConfig.username) .setPassword(singleServerConfig.password) .setConnectionPoolSize(singleServerConfig.connectionPoolSize) .setConnectionMinimumIdleSize(singleServerConfig.connectionMinimumIdleSize) .setConnectTimeout(singleServerConfig.connectTimeout) .setIdleConnectionTimeout(singleServerConfig.idleConnectionTimeout) .setRetryInterval(singleServerConfig.retryInterval) .setRetryAttempts(singleServerConfig.retryAttempts) .setTimeout(singleServerConfig.timeout) return Redisson.create(config) } } @ConfigurationProperties(prefix = "xxx.redisson") class RedissonProperties { var singleServerConfig: SingleServerConfig? = null }
Redis 服務使用的騰訊云的哨兵模式架構(gòu),此架構(gòu)對外開放一個代理地址訪問,因此這里配置單機模式配置即可。
如果你是自己搭建的 redis 哨兵模式架構(gòu),需要按照文檔配置相關(guān)必要參數(shù)
3、使用樣例:
... @Autowired lateinit var redissonClient: RedissonClient ... fun xxx() { ... val lock = redissonClient.getLock("mylock") lock.lock() try { ... } finally { lock.unlock() } ... }
使用方式和JDK提供的鎖是不是很像?是不是很簡單?
正是Redisson這類優(yōu)秀的開源產(chǎn)品的出現(xiàn),才讓我們將更多的時間投入到業(yè)務開發(fā)中...
四、源碼分析
下面來看看 Redisson 對常規(guī)分布式鎖的實現(xiàn),主要分析 RedissonLock
1、lock加鎖操作
@Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } } // 租約期限, 也就是expire時間, -1代表未設置 將使用系統(tǒng)默認的30s private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { // 嘗試拿鎖, 如果能拿到就直接返回 long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } RFuture<RedissonLockEntry> future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } // 如果拿不到鎖就嘗試一直輪循, 直到成功獲取鎖或者異常終止 try { while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } ... } } finally { unsubscribe(future, threadId); } }
1.1、tryAcquire
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; // 調(diào)用真正獲取鎖的操作 if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired // 這里是成功獲取了鎖, 嘗試給鎖續(xù)約 if (ttlRemaining == null) { if (leaseTime != -1) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } // 通過lua腳本真正執(zhí)行加鎖的操作 <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { // 如果key不存在, 那正好, 直接set并設置過期時間 // 如果key存在, 就有兩種情況需要考慮 // - 同一線程獲取重入鎖,直接將field(也就是getLockName(threadId))對應的value值+1 // - 不同線程競爭鎖, 此次加鎖失敗, 并直接返回此key對應的過期時間 return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
1.2、續(xù)約
通過 scheduleExpirationRenewal 給鎖續(xù)約
protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); // 續(xù)約操作 renewExpiration(); } } private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // 設置延遲任務task, 在時長internalLockLeaseTime/3之后執(zhí)行, 定期給鎖續(xù)期 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // 真正執(zhí)行續(xù)期命令操作 RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } // 這次續(xù)期之后, 繼續(xù)schedule自己, 達到持續(xù)續(xù)期的效果 if (res) { // reschedule itself renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); } // 所謂續(xù)期, 就是將expire過期時間再延長 protected RFuture<Boolean> renewExpirationAsync(long threadId) { // 如果key以及當前線程存在, 則延長expire時間, 并返回1代表成功;否則返回0代表失敗 return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
2、unlock解鎖操作
public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { ... } } public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise<>(); // 執(zhí)行解鎖操作 RFuture<Boolean> future = unlockInnerAsync(threadId); // 操作成功之后做的事 future.onComplete((opStatus, e) -> { // 取消續(xù)約task cancelExpirationRenewal(threadId); ... }); return result; } protected RFuture<Boolean> unlockInnerAsync(long threadId) { // 如果key以及當前線程對應的記錄已經(jīng)不存在, 直接返回空 // 否在將field(也就是getLockName(threadId))對應的value減1 // - 如果減去1之后值還大于0, 那么重新延長過期時間 // - 如果減去之后值小于等于0, 那么直接刪除key, 并發(fā)布訂閱消息 return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
以上便是 redisson 客戶端工具對 redis 分布式鎖的加/解鎖具體實現(xiàn),主要解決了以下幾個問題
1、死鎖問題:設置過期時間
2、可重入問題:重入+1, 釋放鎖-1,當值=0時代表完全釋放鎖
3、續(xù)約問題:可解決鎖提前釋放問題
4、鎖釋放:誰加鎖就由誰來釋放
總結(jié):
本文由一個線上問題做引子,通過 redis 分布式鎖的常用實現(xiàn)方案,最終選定 redisson 的解決方案; 并分析 redisson 的具體實現(xiàn)細節(jié)
相關(guān)參考:
到此這篇關(guān)于Redisson如何解決Redis分布式鎖提前釋放問題的文章就介紹到這了,更多相關(guān)Redis分布式鎖提前釋放內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis的六種底層數(shù)據(jù)結(jié)構(gòu)(小結(jié))
本文主要介紹了Redis的六種底層數(shù)據(jù)結(jié)構(gòu),文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-01-01詳解redis desktop manager安裝及連接方式
這篇文章主要介紹了redis desktop manager安裝及連接方式,本文圖文并茂給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2019-09-09