欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

一文帶你剖析Redisson分布式鎖的原理

 更新時(shí)間:2022年11月22日 16:36:34   作者:尋找的路上  
相信使用過(guò)redis的,或者正在做分布式開(kāi)發(fā)的童鞋都知道redisson組件,它的功能很多,但我們使用最頻繁的應(yīng)該還是它的分布式鎖功能,少量的代碼,卻實(shí)現(xiàn)了加鎖、鎖續(xù)命(看門狗)、鎖訂閱、解鎖、鎖等待(自旋)等功能,我們來(lái)看看都是如何實(shí)現(xiàn)的

相信使用過(guò)redis的,或者正在做分布式開(kāi)發(fā)的童鞋都知道redisson組件,它的功能很多,但我們使用最頻繁的應(yīng)該還是它的分布式鎖功能,少量的代碼,卻實(shí)現(xiàn)了加鎖、鎖續(xù)命(看門狗)、鎖訂閱、解鎖、鎖等待(自旋)等功能,我們來(lái)看看都是如何實(shí)現(xiàn)的。

加鎖

//獲取鎖對(duì)象
RLock redissonLock = redisson.getLock(lockKey);
//加分布式鎖
redissonLock.lock();

根據(jù)redissonLock.lock()方法跟蹤到具體的private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId)方法,真正獲取加鎖的邏輯是在tryAcquireAsync該方法中調(diào)用的tryLockInnerAsync()方法,看看這個(gè)方法是怎么實(shí)現(xiàn)的?

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
               // 判斷是否存在分布式鎖,getName()也就是KEYS[1],也就是鎖key名                     
              "if (redis.call('exists', KEYS[1]) == 0) then " +
               // 加鎖,執(zhí)行hset 鎖key名 1                           
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
               // 設(shè)置過(guò)期時(shí)間                           
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
               // 這個(gè)分支是redisson的重入鎖邏輯,鎖還在,鎖計(jì)數(shù)+1,重新設(shè)置過(guò)期時(shí)長(zhǎng)                 
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              // 返回鎖的剩余過(guò)期時(shí)長(zhǎng)                            
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

發(fā)現(xiàn)底層是結(jié)合lua腳本實(shí)現(xiàn)了加鎖邏輯。

為什么底層結(jié)合了Lua腳本?Redis是在2.6推出了腳本功能,允許開(kāi)發(fā)者使用Lua語(yǔ)言編寫腳本傳到redis執(zhí)行。使用腳本的好處如下:

1、減少網(wǎng)絡(luò)開(kāi)銷:本來(lái)5次網(wǎng)絡(luò)請(qǐng)求的操作,可以用一個(gè)請(qǐng)求完成,原先5次請(qǐng)求的邏輯,可以一次性放到redis中執(zhí)行,較少了網(wǎng)絡(luò)往返時(shí)延。這點(diǎn)跟管道有點(diǎn)類似。

2、原子操作:Redis會(huì)將整個(gè)腳本作為一個(gè)整體執(zhí)行,中間不會(huì)被其他命令插入。管道不是原子的,不過(guò)
redis的批量操作命令(類似mset)是原子的
。

也就意味著雖然腳本中有多條redis指令,那即使有多條線程并發(fā)執(zhí)行,在同一時(shí)刻也只有一個(gè)線程能夠執(zhí)行這段邏輯,等這段邏輯執(zhí)行完,分布式鎖也就獲取到了,其它線程再進(jìn)來(lái)就獲取不到分布式鎖了。

鎖續(xù)命(自旋)

大家都聽(tīng)過(guò)鎖續(xù)命,肯定也知道這里涉及到看門狗的概念。在調(diào)用tryLockInnerAsync()方法時(shí),第一個(gè)參數(shù)是commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()也就是默認(rèn)的看門狗過(guò)期時(shí)間是private long lockWatchdogTimeout = 30 * 1000毫秒。

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    // 添加監(jiān)聽(tīng)器,判斷獲取鎖是否成功,成功的話,添加定時(shí)任務(wù):定期更新鎖過(guò)期時(shí)間
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
            // 根據(jù)tryLockInnerAsync方法,加鎖成功,return nil 也就是null
            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                // 添加定時(shí)任務(wù):定期更新鎖過(guò)期時(shí)間
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

當(dāng)線程獲取到鎖后,會(huì)進(jìn)入if (ttlRemaining == null)分支,調(diào)用定期更新鎖過(guò)期時(shí)間scheduleExpirationRenewal方法,我們看看該方法實(shí)現(xiàn):

private void scheduleExpirationRenewal(final long threadId) {
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            
            RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    // 檢測(cè)KEYS[1]鎖是否還在,在的話再次設(shè)置過(guò)期時(shí)間                               
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return 0;",
                      Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
            
            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can't update lock " + getName() + " expiration", future.cause());
                        return;
                    }
                    // 通過(guò)上面lua腳本執(zhí)行后會(huì)返回1,也就true,再次調(diào)用更新過(guò)期時(shí)間進(jìn)行續(xù)期
                    if (future.getNow()) {
                        // reschedule itself
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }
        // 延遲 internalLockLeaseTime / 3再執(zhí)行續(xù)命
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
        task.cancel();
    }
}

發(fā)現(xiàn)scheduleExpirationRenewal方法只是用了Timeout作為任務(wù),并沒(méi)有使用java的Timer()之類的定時(shí)器,而是在Timeout任務(wù)run()方法中定義了RFuture對(duì)象,通過(guò)給RFuture對(duì)象設(shè)置listener,在listener中通過(guò)Lua腳本執(zhí)行結(jié)果進(jìn)行判斷是否還需要進(jìn)行續(xù)期。通過(guò)這樣的方式來(lái)給分布式鎖進(jìn)行續(xù)期。

這種方式實(shí)現(xiàn)定時(shí)更新確實(shí)很巧妙,定期時(shí)間很靈活。

鎖訂閱及鎖等待

鎖訂閱是針對(duì)那些沒(méi)有獲取到分布式鎖的線程而言的。來(lái)看看整個(gè)獲取鎖的方法:

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired,獲取到鎖,直接退出
        if (ttl == null) {
            return;
        }
		// 沒(méi)有獲取到鎖,進(jìn)行訂閱
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

當(dāng)?shù)谝粋€(gè)線程獲取到鎖后,會(huì)在if (ttl == null)分支進(jìn)行返回,第二個(gè)及以后的線程進(jìn)來(lái)在沒(méi)獲取到鎖時(shí),只能接著走下面的邏輯,進(jìn)行鎖的訂閱。

接著進(jìn)入到一個(gè)while循環(huán),首先還是會(huì)進(jìn)行一次嘗試獲取鎖(萬(wàn)一此時(shí)第一個(gè)線程已經(jīng)釋放鎖了呢),通過(guò)tryAcquire(leaseTime, unit, threadId)方法,如果沒(méi)有獲取到鎖的話,會(huì)返回鎖的剩余過(guò)期時(shí)間,如果剩余過(guò)期時(shí)間大于0,則當(dāng)前線程通過(guò)Semaphore信號(hào)號(hào),將當(dāng)前線程阻塞,底層執(zhí)行LockSupport.parkNanos(this, nanosTimeout)線程掛起剩余過(guò)期時(shí)間后,會(huì)自動(dòng)進(jìn)行喚醒,再次執(zhí)行tryAcquire嘗試獲取鎖。所有沒(méi)有獲取到鎖的線程都會(huì)執(zhí)行這個(gè)流程。

一定要等待剩余過(guò)期時(shí)間后才喚醒嗎?

假設(shè)線程一獲取到鎖,過(guò)期時(shí)間默認(rèn)為30s,當(dāng)前執(zhí)行業(yè)務(wù)邏輯已經(jīng)過(guò)了5s,那其他線程走到這里,則需要等待25s后才行進(jìn)行喚醒,那萬(wàn)一線程一執(zhí)行業(yè)務(wù)邏輯只要10s,那其他線程還需要等待20s嗎?這樣豈不是導(dǎo)致效率很低?

答案是否定的,詳細(xì)看解鎖邏輯。

解鎖

解鎖:redissonLock.unlock();

我們來(lái)看看具體的解鎖邏輯:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 鎖不存在,發(fā)布unlockMessage解鎖消息,通知其他等待線程                              
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            // 不存在該鎖,異常捕捉                              
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // redisson可重入鎖計(jì)數(shù)-1,依舊>0,則重新設(shè)置過(guò)期時(shí)間                              
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            // redis刪除鎖,發(fā)布unlockMessage解鎖消息,通知其他等待線程                         
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

發(fā)現(xiàn)解鎖邏輯底層也是用了一個(gè)lua腳本實(shí)現(xiàn)。具體的說(shuō)明可以看代碼注釋,刪除鎖后,并發(fā)布解鎖消息,通知到其它線程,也就意味著不會(huì)其它等待的線程一直等待。

Semophore信號(hào)量的訂閱中有個(gè)onMessage方法,

protected void onMessage(RedissonLockEntry value, Long message) {
    // 喚醒線程
    value.getLatch().release(message.intValue());
    
    while (true) {
        Runnable runnableToExecute = null;
        synchronized (value) {
            Runnable runnable = value.getListeners().poll();
            if (runnable != null) {
                if (value.getLatch().tryAcquire()) {
                    runnableToExecute = runnable;
                } else {
                    value.addListener(runnable);
                }
            }
        }
        
        if (runnableToExecute != null) {
            runnableToExecute.run();
        } else {
            return;
        }
    }
}

解鎖后通過(guò)if (opStatus)分支取消鎖續(xù)期邏輯。

總結(jié)

總的來(lái)說(shuō),可以借助一張圖加深理解:

分布式鎖的整體實(shí)現(xiàn)很巧妙,借助lua腳本的原子性,實(shí)現(xiàn)了很多功能,當(dāng)然redisson還有其它很多功能,比如為了解決主從集群中的異步復(fù)制會(huì)導(dǎo)致鎖丟失問(wèn)題,引入了redlock機(jī)制,還有分布式下的可重入鎖等。

到此這篇關(guān)于一文帶你剖析Redisson分布式鎖的原理的文章就介紹到這了,更多相關(guān)Redisson分布式鎖內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 帶你了解Java數(shù)據(jù)結(jié)構(gòu)和算法之遞歸

    帶你了解Java數(shù)據(jù)結(jié)構(gòu)和算法之遞歸

    這篇文章主要為大家介紹了Java數(shù)據(jù)結(jié)構(gòu)和算法之遞歸,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助
    2022-01-01
  • java基面試礎(chǔ)知識(shí)詳解

    java基面試礎(chǔ)知識(shí)詳解

    這篇文章主要介紹了java基面試礎(chǔ)知識(shí),非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2019-08-08
  • 一文讓你徹底明白Java中的值傳遞和引用傳遞

    一文讓你徹底明白Java中的值傳遞和引用傳遞

    這篇文章主要給大家介紹了關(guān)于Java中值傳遞和引用傳遞的相關(guān)資料,值傳遞是指在調(diào)用函數(shù)時(shí)將實(shí)際參數(shù)復(fù)制一份傳遞到函數(shù)中,引用傳遞是指在調(diào)用函數(shù)時(shí)將實(shí)際參數(shù)的引用直接傳遞到函數(shù)中,需要的朋友可以參考下
    2023-10-10
  • Java并發(fā)中死鎖、活鎖和饑餓是什么意思

    Java并發(fā)中死鎖、活鎖和饑餓是什么意思

    今天看到的一篇文章,說(shuō)的很好,再敲了一遍,分享一下有關(guān)于死鎖、活鎖及饑餓的概念和區(qū)別,感興趣的可以了解一下
    2021-11-11
  • IDEA解決Java:程序包xxxx不存在的問(wèn)題

    IDEA解決Java:程序包xxxx不存在的問(wèn)題

    這篇文章主要介紹了IDEA解決Java:程序包xxxx不存在的問(wèn)題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-09-09
  • 詳解Spring事務(wù)回滾和事務(wù)提交

    詳解Spring事務(wù)回滾和事務(wù)提交

    這篇文章主要介紹了詳解Spring事務(wù)回滾和事務(wù)提交的相關(guān)資料,幫助大家更好的理解和學(xué)習(xí)使用spring框架,感興趣的朋友可以了解下
    2021-03-03
  • idea使用jclasslib插件查看字節(jié)碼

    idea使用jclasslib插件查看字節(jié)碼

    這篇文章主要為大家介紹了idea使用jclasslib插件查看字節(jié)碼,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-05-05
  • Java異常處理中同時(shí)有finally和return語(yǔ)句的執(zhí)行問(wèn)題

    Java異常處理中同時(shí)有finally和return語(yǔ)句的執(zhí)行問(wèn)題

    這篇文章主要介紹了Java異常處理中同時(shí)有finally和return語(yǔ)句的執(zhí)行問(wèn)題,首先確定的是一般finally語(yǔ)句都會(huì)被執(zhí)行...然后,需要的朋友可以參考下
    2015-11-11
  • IDEA創(chuàng)建Maven工程Servlet的詳細(xì)教程

    IDEA創(chuàng)建Maven工程Servlet的詳細(xì)教程

    這篇文章主要介紹了IDEA創(chuàng)建Maven工程Servlet的詳細(xì)教程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-10-10
  • Spring MVC   文件、cookies的接收 與REST響應(yīng)詳解

    Spring MVC   文件、cookies的接收 與REST響應(yīng)詳

    在SpringMVC中,使用@RequestPart注解可接收文件并處理多部分請(qǐng)求,同時(shí)可以通過(guò)@CookieValue和HttpServletResponse來(lái)獲取和設(shè)置Cookies,本文介紹Spring MVC   文件、cookies的接收 與REST響應(yīng),感興趣的朋友跟隨小編一起看看吧
    2024-09-09

最新評(píng)論