欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Redisson如何解決Redis分布式鎖提前釋放問題

 更新時間:2022年05月26日 15:18:12   作者:柏油  
本文主要介紹了Redisson如何解決Redis分布式鎖提前釋放問題,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧

前言:

在分布式場景下,相信你或多或少需要使用分布式鎖來訪問臨界資源,或者控制耗時操作的并發(fā)性。

當然,實現(xiàn)分布式鎖的方案也比較多,比如數(shù)據(jù)庫、redis、zk 等等。本文主要結(jié)合一個線上案例,講解 redis 分布式鎖的相關(guān)實現(xiàn)。

一、問題描述:

某天線上出現(xiàn)了數(shù)據(jù)重復處理問題,經(jīng)排查后發(fā)現(xiàn),竟然是單次處理時間較長,redis 分布式鎖提前釋放導致相同請求并發(fā)處理。

其實,這是一個鎖續(xù)約的問題,對于一把分布式鎖,我們需要考慮,設置鎖多長時間過期、出現(xiàn)異常如何釋放鎖?

以上問題便是本文要討論的主題。

二、原因分析:

      項目采用較簡單的自定義 redis 分布式鎖,為避免死鎖定義默認過期時間 10s,如下:

    override fun lock() {

        while (true) {
            //嘗試獲取鎖
            if (tryLock()) {
                return
            }
            try {
                Thread.sleep(10)
            } catch (e: InterruptedException) {
                e.printStackTrace()
            }

        }
    }

    override fun tryLock(): Boolean {
        val value = getUniqueSign() // 隨機串
        val flag = redisTemplate!!.opsForValue().setIfAbsent(name, value, 10000, TimeUnit.MILLISECONDS)
        if (flag != null && flag) {
            VALUE_lOCAL.set(value)
            INTO_NUM_LOCAL.set(if (INTO_NUM_LOCAL.get() != null) INTO_NUM_LOCAL.get() + 1 else 1)
            return true
        }
        return false
    }

缺乏對鎖自動續(xù)期等實現(xiàn)。

三、解決方案:

1、思考: 

針對這種場景,可以考慮的是如何給鎖自動續(xù)期-當業(yè)務沒有執(zhí)行結(jié)束的情況下,當然也可以自定義實現(xiàn) 比如開一個后臺線程定時的給這些拿到鎖的線程續(xù)期。

Redisson 也正是基于這種思路實現(xiàn)自動續(xù)期的分布式鎖,各種異常情況也考慮的更加完善,綜合考慮采用 Redisson 的分布式鎖解決方案優(yōu)化。

2、Redisson簡單配置:

@Configuration
@EnableConfigurationProperties(RedissonProperties::class)
class RedissonConfig {

    @Bean
    fun redissonClient(redissonProperties: RedissonProperties): RedissonClient {
        val config = Config()
        val singleServerConfig = redissonProperties.singleServerConfig!!
        config.useSingleServer().setAddress(singleServerConfig.address)
                .setDatabase(singleServerConfig.database)
                .setUsername(singleServerConfig.username)
                .setPassword(singleServerConfig.password)
                .setConnectionPoolSize(singleServerConfig.connectionPoolSize)
              .setConnectionMinimumIdleSize(singleServerConfig.connectionMinimumIdleSize)
                .setConnectTimeout(singleServerConfig.connectTimeout)
                .setIdleConnectionTimeout(singleServerConfig.idleConnectionTimeout)
                .setRetryInterval(singleServerConfig.retryInterval)
                .setRetryAttempts(singleServerConfig.retryAttempts)
                .setTimeout(singleServerConfig.timeout)
        return Redisson.create(config)
    }

}

@ConfigurationProperties(prefix = "xxx.redisson")
class RedissonProperties {
    var singleServerConfig: SingleServerConfig? = null
}

Redis 服務使用的騰訊云的哨兵模式架構(gòu),此架構(gòu)對外開放一個代理地址訪問,因此這里配置單機模式配置即可。

如果你是自己搭建的 redis 哨兵模式架構(gòu),需要按照文檔配置相關(guān)必要參數(shù)

3、使用樣例:

    ...
  
    @Autowired
    lateinit var redissonClient: RedissonClient

 
    ... 

    fun xxx() {

      ...

      val lock = redissonClient.getLock("mylock")
      lock.lock()
      try {
        
        ... 

      } finally {
        lock.unlock()
      }

        ...

    }

使用方式和JDK提供的鎖是不是很像?是不是很簡單?

正是Redisson這類優(yōu)秀的開源產(chǎn)品的出現(xiàn),才讓我們將更多的時間投入到業(yè)務開發(fā)中...

四、源碼分析

下面來看看 Redisson 對常規(guī)分布式鎖的實現(xiàn),主要分析 RedissonLock

1、lock加鎖操作

    @Override
    public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }



    // 租約期限, 也就是expire時間, -1代表未設置 將使用系統(tǒng)默認的30s
    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        // 嘗試拿鎖, 如果能拿到就直接返回
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

        // 如果拿不到鎖就嘗試一直輪循, 直到成功獲取鎖或者異常終止
        try {
            while (true) {
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                ...

            }
        } finally {
            unsubscribe(future, threadId);
        }
    }

1.1、tryAcquire

    private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }

    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        // 調(diào)用真正獲取鎖的操作
        if (leaseTime != -1) {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            // 這里是成功獲取了鎖, 嘗試給鎖續(xù)約
            if (ttlRemaining == null) {
                if (leaseTime != -1) {
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

    // 通過lua腳本真正執(zhí)行加鎖的操作
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        // 如果key不存在, 那正好, 直接set并設置過期時間
        // 如果key存在, 就有兩種情況需要考慮
        //   - 同一線程獲取重入鎖,直接將field(也就是getLockName(threadId))對應的value值+1
        //   - 不同線程競爭鎖, 此次加鎖失敗, 并直接返回此key對應的過期時間
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

1.2、續(xù)約

通過 scheduleExpirationRenewal 給鎖續(xù)約

    protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            // 續(xù)約操作
            renewExpiration();
        }
    }

    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        // 設置延遲任務task, 在時長internalLockLeaseTime/3之后執(zhí)行, 定期給鎖續(xù)期
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                // 真正執(zhí)行續(xù)期命令操作
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getRawName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    
                    // 這次續(xù)期之后, 繼續(xù)schedule自己, 達到持續(xù)續(xù)期的效果
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

    // 所謂續(xù)期, 就是將expire過期時間再延長
    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        // 如果key以及當前線程存在, 則延長expire時間, 并返回1代表成功;否則返回0代表失敗
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }

2、unlock解鎖操作

  public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            ...
        }
     
    }

    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<>();
        // 執(zhí)行解鎖操作
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        // 操作成功之后做的事
        future.onComplete((opStatus, e) -> {
            // 取消續(xù)約task
            cancelExpirationRenewal(threadId);
            
            ...

        });

        return result;
    }

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        // 如果key以及當前線程對應的記錄已經(jīng)不存在, 直接返回空
        // 否在將field(也就是getLockName(threadId))對應的value減1
        //   - 如果減去1之后值還大于0, 那么重新延長過期時間
        //   - 如果減去之后值小于等于0, 那么直接刪除key, 并發(fā)布訂閱消息
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

以上便是 redisson 客戶端工具對 redis 分布式鎖的加/解鎖具體實現(xiàn),主要解決了以下幾個問題

    1、死鎖問題:設置過期時間

    2、可重入問題:重入+1, 釋放鎖-1,當值=0時代表完全釋放鎖

    3、續(xù)約問題:可解決鎖提前釋放問題

    4、鎖釋放:誰加鎖就由誰來釋放

總結(jié):

本文由一個線上問題做引子,通過 redis 分布式鎖的常用實現(xiàn)方案,最終選定 redisson 的解決方案; 并分析 redisson 的具體實現(xiàn)細節(jié)

相關(guān)參考:

到此這篇關(guān)于Redisson如何解決Redis分布式鎖提前釋放問題的文章就介紹到這了,更多相關(guān)Redis分布式鎖提前釋放內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 淺談Redis 緩存的三大問題及其解決方案

    淺談Redis 緩存的三大問題及其解決方案

    Redis 經(jīng)常用于系統(tǒng)中的緩存,這樣可以解決目前 IO 設備無法滿足互聯(lián)網(wǎng)應用海量的讀寫請求的問題。本文主要介紹了淺談Redis 緩存的三大問題及其解決方案,感興趣的可以了解一下
    2021-07-07
  • 一篇文章帶你徹底搞懂Redis?事務

    一篇文章帶你徹底搞懂Redis?事務

    這篇文章主要介紹了一篇文章帶你徹底搞懂Redis?事務的相關(guān)資料,需要的朋友可以參考下
    2022-10-10
  • Redis Cluster 集群搭建你會嗎

    Redis Cluster 集群搭建你會嗎

    這篇文章主要介紹了Redis Cluster 集群搭建過程,本文分步驟通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-08-08
  • 使用Redis實現(xiàn)用戶積分排行榜的教程

    使用Redis實現(xiàn)用戶積分排行榜的教程

    這篇文章主要介紹了使用Redis實現(xiàn)用戶積分排行榜的教程,包括一個用PHP腳本進行操作的例子,需要的朋友可以參考下
    2015-04-04
  • Redis的六種底層數(shù)據(jù)結(jié)構(gòu)(小結(jié))

    Redis的六種底層數(shù)據(jù)結(jié)構(gòu)(小結(jié))

    本文主要介紹了Redis的六種底層數(shù)據(jù)結(jié)構(gòu),文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-01-01
  • 詳解redis desktop manager安裝及連接方式

    詳解redis desktop manager安裝及連接方式

    這篇文章主要介紹了redis desktop manager安裝及連接方式,本文圖文并茂給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下
    2019-09-09
  • 淺析Redis如何保證數(shù)據(jù)不丟失

    淺析Redis如何保證數(shù)據(jù)不丟失

    Redis是一種Nosql類型的數(shù)據(jù)存儲,全稱Remote?Dictionary?Server,也就是遠程字典服務器,本文主要來和大家討論一下Redis如何保證數(shù)據(jù)不丟失,需要的可以參考下
    2024-02-02
  • 淺談Redis處理接口冪等性的兩種方案

    淺談Redis處理接口冪等性的兩種方案

    本文主要介紹了淺談Redis處理接口冪等性的兩種方案,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-08-08
  • redis數(shù)據(jù)傾斜處理方法

    redis數(shù)據(jù)傾斜處理方法

    我們在使用Redis分片集群時,集群最好的狀態(tài)就是每個實例可以處理相同或相近比例的請求,但如果不是這樣,則會出現(xiàn)某些實例壓力特別大,而某些實例特別空閑的情況發(fā)生,本文就一起來看下這種情況是如何發(fā)生的以及如何處理
    2022-12-12
  • Redis的使用模式之計數(shù)器模式實例

    Redis的使用模式之計數(shù)器模式實例

    這篇文章主要介紹了Redis的使用模式之計數(shù)器模式實例,本文講解了匯總計數(shù)器、按時間匯總的計數(shù)器、速度控制、使用 Hash 數(shù)據(jù)類型維護大量計數(shù)器等內(nèi)容,需要的朋友可以參考下
    2015-03-03

最新評論