欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

ZooKeeper框架教程Curator分布式鎖實(shí)現(xiàn)及源碼分析

 更新時(shí)間:2022年01月28日 14:45:04   作者:愛(ài)碼叔(稀有氣體)  
本文是ZooKeeper入門(mén)系列教程,本篇為大家介紹zookeeper一個(gè)優(yōu)秀的框架Curator,提供了各種分布式協(xié)調(diào)的服務(wù),Curator中有著更為標(biāo)準(zhǔn)、規(guī)范的分布式鎖實(shí)現(xiàn)

ZooKeeper入門(mén)教程一簡(jiǎn)介與核心概念

ZooKeeper入門(mén)教程二在單機(jī)和集群環(huán)境下的安裝搭建及使用

ZooKeeper入門(mén)教程三分布式鎖實(shí)現(xiàn)及完整運(yùn)行源碼

上一篇文章中,我們使用zookeeper的java api實(shí)現(xiàn)了分布式排他鎖。

Curator中有著更為標(biāo)準(zhǔn)、規(guī)范的分布式鎖實(shí)現(xiàn)。與其我們自己去實(shí)現(xiàn),不如直接使用Curator。通過(guò)學(xué)習(xí)Curator的源代碼,我們也能了解實(shí)現(xiàn)分布式鎖的最佳實(shí)踐。

Curator中有各種分布式鎖,本文挑選其中一個(gè)---InterProcessMutex進(jìn)行講解。

我們先看一下Curator代碼中對(duì)于InterProcessMutex的注釋?zhuān)?/p>

可重入的互斥鎖,跨JVM工作。使用ZooKeeper來(lái)控制鎖。所有JVM中的任何進(jìn)程,只要使用同樣的鎖路徑,將會(huì)成為跨進(jìn)程的一部分。此外,這個(gè)排他鎖是“公平的”,每個(gè)用戶(hù)按照申請(qǐng)的順序得到排他鎖。

可見(jiàn)InterProcessMutex和我們自己實(shí)現(xiàn)的例子都是一個(gè)排他鎖,此外還可以重入。 

  如何使用InterProcessMutex

在分析InterProcessMutex代碼前,我們先看一下它是如何使用的,下面代碼簡(jiǎn)單展示了InterProcessMutex的使用:

    public static void soldTickWithLock(CuratorFramework client) throws Exception {
        //創(chuàng)建分布式鎖, 鎖空間的根節(jié)點(diǎn)路徑為/curator/lock
        InterProcessMutex mutex = new InterProcessMutex(client, "/curator/locks");
        mutex.acquire();
 
        //獲得了鎖, 進(jìn)行業(yè)務(wù)流程
        //代表復(fù)雜邏輯執(zhí)行了一段時(shí)間
        int sleepMillis = (int) (Math.random() * 2000);
        Thread.sleep(sleepMillis);
 
        //完成業(yè)務(wù)流程, 釋放鎖
        mutex.release();
    }

使用方式和我們自己編寫(xiě)的鎖是一樣的,首先通過(guò)mutex.acquire()獲取鎖,該方法會(huì)阻塞進(jìn)程,直到獲取鎖,然后執(zhí)行你的業(yè)務(wù)方法,最后通過(guò) mutex.release()釋放鎖。

接下來(lái)我們進(jìn)入正題,展開(kāi)分析Curator關(guān)于分布式鎖的實(shí)現(xiàn):

  實(shí)現(xiàn)思路

Curator設(shè)計(jì)方式和之前我們自己實(shí)現(xiàn)的方式是類(lèi)似的:

1、創(chuàng)建有序臨時(shí)節(jié)點(diǎn)

2、觸發(fā)“嘗試取鎖邏輯”,如果自己是臨時(shí)鎖節(jié)點(diǎn)序列的第一個(gè),則取得鎖,獲取鎖成功。

3、如果自己不是序列中第一個(gè),則監(jiān)聽(tīng)前一個(gè)鎖節(jié)點(diǎn)變更。同時(shí)阻塞線(xiàn)程。

4、當(dāng)前一個(gè)鎖節(jié)點(diǎn)變更時(shí),通過(guò)watcher恢復(fù)線(xiàn)程,然后再次到步驟2“嘗試取鎖邏輯”

如下圖所示:

   代碼實(shí)現(xiàn)概述

Curator對(duì)于排它鎖的頂層實(shí)現(xiàn)邏輯在InterProcessMutex類(lèi)中,它對(duì)客戶(hù)端暴露鎖的使用方法,如獲取鎖和釋放鎖等。但鎖的上述實(shí)現(xiàn)邏輯,是由他持有的LockInternals對(duì)象來(lái)具體實(shí)現(xiàn)的。LockInternals使用StandardLockInternalsDriver類(lèi)中的方法來(lái)做一些處理。

簡(jiǎn)單點(diǎn)解釋?zhuān)覀兇騻€(gè)比方,Curator好比是一家公司承接各種業(yè)務(wù),InterProcessMutex是老板,收到自己客戶(hù)(client)的需求后,分配給自己的下屬LockInternals去具體完成,同時(shí)給他一個(gè)工具StandardLockInternalsDriver,讓他在做任務(wù)的過(guò)程中使用。如下圖展示:

接下來(lái)我們將深入分析InterProcessMutex、LockInternals及StandardLockInternalsDriver類(lèi)。

  InterProcessMutex源碼分析

InterProcessMutex類(lèi)是curator中的排它鎖類(lèi),客戶(hù)端直接打交道的就是InterProcessMutex。所以我們從頂層開(kāi)始,先分析InterProcessMutex。

  實(shí)現(xiàn)接口

InterProcessMutex實(shí)現(xiàn)了兩個(gè)接口:

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>

InterProcessLock是分布式鎖接口,分布式鎖必須實(shí)現(xiàn)接口中的如下方法:

1、獲取鎖,直到鎖可用

public void acquire() throws Exception;

2、在指定等待的時(shí)間內(nèi)獲取鎖。

public boolean acquire(long time, TimeUnit unit) throws Exception;

3、釋放鎖

public void release() throws Exception;

4、當(dāng)前線(xiàn)程是否獲取了鎖

boolean isAcquiredInThisProcess();

以上方法也是InterProcessMutex暴露出來(lái),供客戶(hù)端在使用分布式鎖時(shí)調(diào)用。

Revocable<T>,實(shí)現(xiàn)該接口的鎖,鎖是可以被撤銷(xiāo)的。本編文章重點(diǎn)講解鎖的實(shí)現(xiàn)機(jī)制,關(guān)于撤銷(xiāo)部分不做討論。

  屬性

InterProcessMutex屬性如下:

類(lèi)型名稱(chēng)說(shuō)明
LockInternalsinternals鎖的實(shí)現(xiàn)都在該類(lèi)中,InterProcessMutex通過(guò)此類(lèi)的方法實(shí)現(xiàn)鎖
StringbasePath鎖節(jié)點(diǎn)在zk中的根路徑
ConcurrentMap<Thread, LockData>threadData線(xiàn)程和自己的鎖相關(guān)數(shù)據(jù)映射
StringLOCK_NAME常量,值為"lock-"。表示鎖節(jié)點(diǎn)的前綴

它還有一個(gè)內(nèi)部靜態(tài)類(lèi)LockData,也是threadData中保存的value,它定義了鎖的相關(guān)數(shù)據(jù),包括鎖所屬線(xiàn)程,鎖的全路徑,和該線(xiàn)程加鎖的次數(shù)(InterProcessMutex為可重入鎖)。代碼如下:

private static class LockData
{
    final Thread owningThread;
    final String lockPath;
    final AtomicInteger lockCount = new AtomicInteger(1);
    private LockData(Thread owningThread, String lockPath)
    {
        this.owningThread = owningThread;
        this.lockPath = lockPath;
    }
}

  構(gòu)造方法

InterProcessMutex有三個(gè)構(gòu)造方法,根據(jù)入?yún)⒉煌短渍{(diào)用,最終調(diào)用的構(gòu)造方法如下:

InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver)
{
    basePath = PathUtils.validatePath(path);
    internals = new LockInternals(client, driver, path, lockName, maxLeases);
}

可見(jiàn)構(gòu)造方法最終初始化了兩個(gè)屬性,basePath被設(shè)置為我們傳入的值 "/curator/lock",這是鎖的根節(jié)點(diǎn)。此外就是初始化了internals,前面說(shuō)過(guò)internals是真正實(shí)現(xiàn)鎖功能的對(duì)象。真正干活的是internals。

構(gòu)造完InterProcessMutex對(duì)象后,我們看看它是如何工作的。

  方法

InterProcessMutex實(shí)現(xiàn)InterProcessLock接口,關(guān)于分布式鎖的幾個(gè)方法都在這個(gè)接口中,我們看看InterProcessMutex是如何實(shí)現(xiàn)的。

  獲得鎖

獲得鎖有兩個(gè)方法,區(qū)別為是否限定了等待鎖的時(shí)間長(zhǎng)度。其實(shí)最終都是調(diào)用的私有方法internalLock()。不限定等待時(shí)長(zhǎng)的代碼如下:

public void acquire() throws Exception
{
    if ( !internalLock(-1, null) )
    {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}

可以看到internalLock()返回false時(shí),只可能因?yàn)檫B接超時(shí),否則會(huì)一直等待獲取鎖。

internalLock邏輯如下:

  • 取得當(dāng)前線(xiàn)程在threadData中的lockData
  • 如果存在該線(xiàn)程的鎖數(shù)據(jù),說(shuō)明是鎖重入, lockData.lockCount加1,直接返回true。獲取鎖成功
  • 如果不存在該線(xiàn)程的鎖數(shù)據(jù),則通過(guò)internals.attemptLock()獲取鎖,此時(shí)線(xiàn)程被阻塞,直至獲得到鎖
  • 鎖獲取成功后,把鎖的信息保存到threadData中。
  • 如果沒(méi)能獲取到鎖,則返回false。

完整代碼如下:

private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    /*
       Note on concurrency: a given lockData instance
       can be only acted on by a single thread so locking isn't necessary
    */
    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // re-entering
        lockData.lockCount.incrementAndGet();
        return true;
    }
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }
    return false;
}

可以看到獲取鎖的核心代碼是internals.attemptLock

  釋放鎖

釋放鎖的方法為release(),邏輯如下:

從threadData中取得當(dāng)前線(xiàn)程的鎖數(shù)據(jù),有如下情況:

不存在,拋出無(wú)此鎖的異常

存在,而且lockCount-1后大于零,說(shuō)明該線(xiàn)程鎖重入了,所以直接返回,并不在zk中釋放。

存在,而且lockCount-1后小于零,說(shuō)明有某種異常發(fā)生,直接拋異常

存在,而且lockCount-1等于零,這是無(wú)重入的正確狀態(tài),需要做的就是從zk中刪除臨時(shí)節(jié)點(diǎn),通過(guò)internals.releaseLock(),不管結(jié)果如何,在threadData中移除該線(xiàn)程的數(shù)據(jù)。 

InterProcessMutex小結(jié)

分布式鎖主要用到的是上面兩個(gè)方法,InterProcessMutex還有些其他的方法,這里就不做具體講解,可以自己看一下,實(shí)現(xiàn)都不復(fù)雜。

通過(guò)對(duì)InterProcessMutex的講解,相信我們已經(jīng)對(duì)鎖的獲得和釋放有了了解,應(yīng)該也意識(shí)到真正實(shí)現(xiàn)鎖的是LockInternals類(lèi)。接下來(lái)我們將重點(diǎn)講解LockInternals。

  LockInternals源碼分析

Curator通過(guò)zk實(shí)現(xiàn)分布式鎖的核心邏輯都在LockInternals中,我們按獲取鎖到釋放鎖的流程為指引,逐步分析LockInternals的源代碼。

  獲取鎖

在InterProcessMutex獲取鎖的代碼分析中,可以看到它是通過(guò)internals.attemptLock(time, unit, getLockNodeBytes());來(lái)獲取鎖的,那么我們就以這個(gè)方法為入口。此方法的邏輯比較簡(jiǎn)單,如下:

通過(guò)driver在zk上創(chuàng)建鎖節(jié)點(diǎn),獲得鎖節(jié)點(diǎn)路徑。

通過(guò)internalLockLoop()方法阻塞進(jìn)程,直到獲取鎖成功。

核心代碼如下:

ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);

我們繼續(xù)分析internalLockLoop方法,獲取鎖的核心邏輯在此方法中。

internalLockLoop中通過(guò)while自旋,判斷鎖如果沒(méi)有被獲取,將不斷的去嘗試獲取鎖。

while循環(huán)中邏輯如下:

  • 通過(guò)driver查看當(dāng)前鎖節(jié)點(diǎn)序號(hào)是否排在第一位,如果排在第一位,說(shuō)明取鎖成功,跳出循環(huán)
  • 如果沒(méi)有排在第一位,則監(jiān)聽(tīng)自己的前序鎖節(jié)點(diǎn),然后阻塞線(xiàn)程。

當(dāng)前序節(jié)點(diǎn)釋放了鎖,監(jiān)聽(tīng)會(huì)被觸發(fā),恢復(fù)線(xiàn)程,此時(shí)主線(xiàn)程又回到while中第一步。

重復(fù)以上邏輯,直至獲取到鎖(自己鎖的序號(hào)排在首位)。

internalLockLoop方法核心代碼如下:

while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
    List<String>        children = getSortedChildren();
    String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
 
    PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
    if ( predicateResults.getsTheLock() )
    {
        haveTheLock = true;
    }
    else
    {
        String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
 
        synchronized(this)
        {
            try 
            {
                // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                if ( millisToWait != null )
                {
                    millisToWait -= (System.currentTimeMillis() - startMillis);
                    startMillis = System.currentTimeMillis();
                    if ( millisToWait <= 0 )
                    {
                        doDelete = true;    // timed out - delete our node
                        break;
                    }
 
                    wait(millisToWait);
                }
                else
                {
                    wait();
                }
            }
            catch ( KeeperException.NoNodeException e ) 
            {
                // it has been deleted (i.e. lock released). Try to acquire again
            }
        }
    }
}

獲取鎖的主要代碼邏輯我們到這就已經(jīng)分析完了,可見(jiàn)和我們自己的實(shí)現(xiàn)還是基本一樣的。此外上面提到了driver對(duì)象,也就是StandardLockInternalsDriver類(lèi),它提供了一些輔助的方法,比如說(shuō)在zk創(chuàng)建鎖節(jié)點(diǎn),判斷zk上鎖序列第一位是否為當(dāng)前鎖,鎖序列的排序邏輯等。我們就不具體講解了。

  釋放鎖

釋放鎖的邏輯很簡(jiǎn)單,移除watcher,刪除鎖節(jié)點(diǎn)。代碼如下:

final void releaseLock(String lockPath) throws Exception
{
client.removeWatchers();
revocable.set(null);
deleteOurPath(lockPath);
}

  總結(jié)

至此,Curator中InterProcessMutex的源代碼分析全部完成。

簡(jiǎn)單回顧下,InterProcessMutex類(lèi)封裝上層邏輯,對(duì)外暴露鎖的使用方法。而真正的鎖實(shí)現(xiàn)邏輯在LockInternals中,它通過(guò)對(duì)zk臨時(shí)有序鎖節(jié)點(diǎn)的創(chuàng)建和監(jiān)控,判斷自己的鎖序號(hào)是否在首位,來(lái)實(shí)現(xiàn)鎖的獲取。此外它還結(jié)合StandardLockInternalsDriver提供的方法,共同實(shí)現(xiàn)了排他鎖。

希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring AOP 切面@Around注解的用法說(shuō)明

    Spring AOP 切面@Around注解的用法說(shuō)明

    這篇文章主要介紹了Spring AOP 切面@Around注解的用法說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-02-02
  • Java異常處理實(shí)例分析

    Java異常處理實(shí)例分析

    這篇文章主要介紹了Java異常處理,實(shí)例分析了java異常處理的常見(jiàn)用法,具有一定參考借鑒價(jià)值,需要的朋友可以參考下
    2015-04-04
  • spring boot實(shí)現(xiàn)自動(dòng)輸出word文檔功能的實(shí)例代碼

    spring boot實(shí)現(xiàn)自動(dòng)輸出word文檔功能的實(shí)例代碼

    這篇文章主要介紹了spring boot實(shí)現(xiàn)自動(dòng)輸出word文檔功能的實(shí)例代碼,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-04-04
  • springmvc的@Validated注解使用

    springmvc的@Validated注解使用

    這篇文章主要介紹了springmvc的@Validated注解使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-12-12
  • maven多模塊項(xiàng)目依賴(lài)管理與依賴(lài)?yán)^承詳解

    maven多模塊項(xiàng)目依賴(lài)管理與依賴(lài)?yán)^承詳解

    這篇文章主要介紹了maven多模塊項(xiàng)目依賴(lài)管理與依賴(lài)?yán)^承詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • Spring?Kafka中如何通過(guò)參數(shù)配置解決超時(shí)問(wèn)題詳解

    Spring?Kafka中如何通過(guò)參數(shù)配置解決超時(shí)問(wèn)題詳解

    這篇文章主要給大家介紹了關(guān)于Spring?Kafka中如何通過(guò)參數(shù)配置解決超時(shí)問(wèn)題的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2022-01-01
  • 分析xxljob登入功能集成OIDC的統(tǒng)一認(rèn)證

    分析xxljob登入功能集成OIDC的統(tǒng)一認(rèn)證

    這篇文章主要為大家介紹分析xxljob登入功能集成OIDC的統(tǒng)一認(rèn)證的詳解說(shuō)明,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步
    2022-02-02
  • 解決SpringMVC使用@RequestBody注解報(bào)400錯(cuò)誤的問(wèn)題

    解決SpringMVC使用@RequestBody注解報(bào)400錯(cuò)誤的問(wèn)題

    這篇文章主要介紹了解決SpringMVC使用@RequestBody注解報(bào)400錯(cuò)誤的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-09-09
  • Java中對(duì)list map根據(jù)map某個(gè)key值進(jìn)行排序的方法

    Java中對(duì)list map根據(jù)map某個(gè)key值進(jìn)行排序的方法

    今天小編就為大家分享一篇Java中對(duì)list map根據(jù)map某個(gè)key值進(jìn)行排序的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2018-07-07
  • idea2023.3安裝及配置詳細(xì)圖文教程

    idea2023.3安裝及配置詳細(xì)圖文教程

    IDEA全稱(chēng)IntelliJ?IDEA,是Java語(yǔ)言對(duì)的集成開(kāi)發(fā)環(huán)境,IDEA在業(yè)界被認(rèn)為是公認(rèn)最好的Java開(kāi)發(fā)工具,這篇文章主要給大家介紹了關(guān)于idea2023.3安裝及配置的相關(guān)資料,需要的朋友可以參考下
    2023-11-11

最新評(píng)論