ZooKeeper框架教程Curator分布式鎖實(shí)現(xiàn)及源碼分析
ZooKeeper入門(mén)教程一簡(jiǎn)介與核心概念
ZooKeeper入門(mén)教程二在單機(jī)和集群環(huán)境下的安裝搭建及使用
ZooKeeper入門(mén)教程三分布式鎖實(shí)現(xiàn)及完整運(yùn)行源碼
上一篇文章中,我們使用zookeeper的java api實(shí)現(xiàn)了分布式排他鎖。
Curator中有著更為標(biāo)準(zhǔn)、規(guī)范的分布式鎖實(shí)現(xiàn)。與其我們自己去實(shí)現(xiàn),不如直接使用Curator。通過(guò)學(xué)習(xí)Curator的源代碼,我們也能了解實(shí)現(xiàn)分布式鎖的最佳實(shí)踐。
Curator中有各種分布式鎖,本文挑選其中一個(gè)---InterProcessMutex進(jìn)行講解。
我們先看一下Curator代碼中對(duì)于InterProcessMutex的注釋?zhuān)?/p>
可重入的互斥鎖,跨JVM工作。使用ZooKeeper來(lái)控制鎖。所有JVM中的任何進(jìn)程,只要使用同樣的鎖路徑,將會(huì)成為跨進(jìn)程的一部分。此外,這個(gè)排他鎖是“公平的”,每個(gè)用戶(hù)按照申請(qǐng)的順序得到排他鎖。
可見(jiàn)InterProcessMutex和我們自己實(shí)現(xiàn)的例子都是一個(gè)排他鎖,此外還可以重入。
如何使用InterProcessMutex
在分析InterProcessMutex代碼前,我們先看一下它是如何使用的,下面代碼簡(jiǎn)單展示了InterProcessMutex的使用:
public static void soldTickWithLock(CuratorFramework client) throws Exception { //創(chuàng)建分布式鎖, 鎖空間的根節(jié)點(diǎn)路徑為/curator/lock InterProcessMutex mutex = new InterProcessMutex(client, "/curator/locks"); mutex.acquire(); //獲得了鎖, 進(jìn)行業(yè)務(wù)流程 //代表復(fù)雜邏輯執(zhí)行了一段時(shí)間 int sleepMillis = (int) (Math.random() * 2000); Thread.sleep(sleepMillis); //完成業(yè)務(wù)流程, 釋放鎖 mutex.release(); }
使用方式和我們自己編寫(xiě)的鎖是一樣的,首先通過(guò)mutex.acquire()獲取鎖,該方法會(huì)阻塞進(jìn)程,直到獲取鎖,然后執(zhí)行你的業(yè)務(wù)方法,最后通過(guò) mutex.release()釋放鎖。
接下來(lái)我們進(jìn)入正題,展開(kāi)分析Curator關(guān)于分布式鎖的實(shí)現(xiàn):
實(shí)現(xiàn)思路
Curator設(shè)計(jì)方式和之前我們自己實(shí)現(xiàn)的方式是類(lèi)似的:
1、創(chuàng)建有序臨時(shí)節(jié)點(diǎn)
2、觸發(fā)“嘗試取鎖邏輯”,如果自己是臨時(shí)鎖節(jié)點(diǎn)序列的第一個(gè),則取得鎖,獲取鎖成功。
3、如果自己不是序列中第一個(gè),則監(jiān)聽(tīng)前一個(gè)鎖節(jié)點(diǎn)變更。同時(shí)阻塞線(xiàn)程。
4、當(dāng)前一個(gè)鎖節(jié)點(diǎn)變更時(shí),通過(guò)watcher恢復(fù)線(xiàn)程,然后再次到步驟2“嘗試取鎖邏輯”
如下圖所示:
代碼實(shí)現(xiàn)概述
Curator對(duì)于排它鎖的頂層實(shí)現(xiàn)邏輯在InterProcessMutex類(lèi)中,它對(duì)客戶(hù)端暴露鎖的使用方法,如獲取鎖和釋放鎖等。但鎖的上述實(shí)現(xiàn)邏輯,是由他持有的LockInternals對(duì)象來(lái)具體實(shí)現(xiàn)的。LockInternals使用StandardLockInternalsDriver類(lèi)中的方法來(lái)做一些處理。
簡(jiǎn)單點(diǎn)解釋?zhuān)覀兇騻€(gè)比方,Curator好比是一家公司承接各種業(yè)務(wù),InterProcessMutex是老板,收到自己客戶(hù)(client)的需求后,分配給自己的下屬LockInternals去具體完成,同時(shí)給他一個(gè)工具StandardLockInternalsDriver,讓他在做任務(wù)的過(guò)程中使用。如下圖展示:
接下來(lái)我們將深入分析InterProcessMutex、LockInternals及StandardLockInternalsDriver類(lèi)。
InterProcessMutex源碼分析
InterProcessMutex類(lèi)是curator中的排它鎖類(lèi),客戶(hù)端直接打交道的就是InterProcessMutex。所以我們從頂層開(kāi)始,先分析InterProcessMutex。
實(shí)現(xiàn)接口
InterProcessMutex實(shí)現(xiàn)了兩個(gè)接口:
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
InterProcessLock是分布式鎖接口,分布式鎖必須實(shí)現(xiàn)接口中的如下方法:
1、獲取鎖,直到鎖可用
public void acquire() throws Exception;
2、在指定等待的時(shí)間內(nèi)獲取鎖。
public boolean acquire(long time, TimeUnit unit) throws Exception;
3、釋放鎖
public void release() throws Exception;
4、當(dāng)前線(xiàn)程是否獲取了鎖
boolean isAcquiredInThisProcess();
以上方法也是InterProcessMutex暴露出來(lái),供客戶(hù)端在使用分布式鎖時(shí)調(diào)用。
Revocable<T>,實(shí)現(xiàn)該接口的鎖,鎖是可以被撤銷(xiāo)的。本編文章重點(diǎn)講解鎖的實(shí)現(xiàn)機(jī)制,關(guān)于撤銷(xiāo)部分不做討論。
屬性
InterProcessMutex屬性如下:
類(lèi)型 | 名稱(chēng) | 說(shuō)明 |
LockInternals | internals | 鎖的實(shí)現(xiàn)都在該類(lèi)中,InterProcessMutex通過(guò)此類(lèi)的方法實(shí)現(xiàn)鎖 |
String | basePath | 鎖節(jié)點(diǎn)在zk中的根路徑 |
ConcurrentMap<Thread, LockData> | threadData | 線(xiàn)程和自己的鎖相關(guān)數(shù)據(jù)映射 |
String | LOCK_NAME | 常量,值為"lock-"。表示鎖節(jié)點(diǎn)的前綴 |
它還有一個(gè)內(nèi)部靜態(tài)類(lèi)LockData,也是threadData中保存的value,它定義了鎖的相關(guān)數(shù)據(jù),包括鎖所屬線(xiàn)程,鎖的全路徑,和該線(xiàn)程加鎖的次數(shù)(InterProcessMutex為可重入鎖)。代碼如下:
private static class LockData { final Thread owningThread; final String lockPath; final AtomicInteger lockCount = new AtomicInteger(1); private LockData(Thread owningThread, String lockPath) { this.owningThread = owningThread; this.lockPath = lockPath; } }
構(gòu)造方法
InterProcessMutex有三個(gè)構(gòu)造方法,根據(jù)入?yún)⒉煌短渍{(diào)用,最終調(diào)用的構(gòu)造方法如下:
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) { basePath = PathUtils.validatePath(path); internals = new LockInternals(client, driver, path, lockName, maxLeases); }
可見(jiàn)構(gòu)造方法最終初始化了兩個(gè)屬性,basePath被設(shè)置為我們傳入的值 "/curator/lock",這是鎖的根節(jié)點(diǎn)。此外就是初始化了internals,前面說(shuō)過(guò)internals是真正實(shí)現(xiàn)鎖功能的對(duì)象。真正干活的是internals。
構(gòu)造完InterProcessMutex對(duì)象后,我們看看它是如何工作的。
方法
InterProcessMutex實(shí)現(xiàn)InterProcessLock接口,關(guān)于分布式鎖的幾個(gè)方法都在這個(gè)接口中,我們看看InterProcessMutex是如何實(shí)現(xiàn)的。
獲得鎖
獲得鎖有兩個(gè)方法,區(qū)別為是否限定了等待鎖的時(shí)間長(zhǎng)度。其實(shí)最終都是調(diào)用的私有方法internalLock()。不限定等待時(shí)長(zhǎng)的代碼如下:
public void acquire() throws Exception { if ( !internalLock(-1, null) ) { throw new IOException("Lost connection while trying to acquire lock: " + basePath); } }
可以看到internalLock()返回false時(shí),只可能因?yàn)檫B接超時(shí),否則會(huì)一直等待獲取鎖。
internalLock邏輯如下:
- 取得當(dāng)前線(xiàn)程在threadData中的lockData
- 如果存在該線(xiàn)程的鎖數(shù)據(jù),說(shuō)明是鎖重入, lockData.lockCount加1,直接返回true。獲取鎖成功
- 如果不存在該線(xiàn)程的鎖數(shù)據(jù),則通過(guò)internals.attemptLock()獲取鎖,此時(shí)線(xiàn)程被阻塞,直至獲得到鎖
- 鎖獲取成功后,把鎖的信息保存到threadData中。
- 如果沒(méi)能獲取到鎖,則返回false。
完整代碼如下:
private boolean internalLock(long time, TimeUnit unit) throws Exception { /* Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn't necessary */ Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null ) { // re-entering lockData.lockCount.incrementAndGet(); return true; } String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; }
可以看到獲取鎖的核心代碼是internals.attemptLock
釋放鎖
釋放鎖的方法為release(),邏輯如下:
從threadData中取得當(dāng)前線(xiàn)程的鎖數(shù)據(jù),有如下情況:
不存在,拋出無(wú)此鎖的異常
存在,而且lockCount-1后大于零,說(shuō)明該線(xiàn)程鎖重入了,所以直接返回,并不在zk中釋放。
存在,而且lockCount-1后小于零,說(shuō)明有某種異常發(fā)生,直接拋異常
存在,而且lockCount-1等于零,這是無(wú)重入的正確狀態(tài),需要做的就是從zk中刪除臨時(shí)節(jié)點(diǎn),通過(guò)internals.releaseLock(),不管結(jié)果如何,在threadData中移除該線(xiàn)程的數(shù)據(jù)。
InterProcessMutex小結(jié)
分布式鎖主要用到的是上面兩個(gè)方法,InterProcessMutex還有些其他的方法,這里就不做具體講解,可以自己看一下,實(shí)現(xiàn)都不復(fù)雜。
通過(guò)對(duì)InterProcessMutex的講解,相信我們已經(jīng)對(duì)鎖的獲得和釋放有了了解,應(yīng)該也意識(shí)到真正實(shí)現(xiàn)鎖的是LockInternals類(lèi)。接下來(lái)我們將重點(diǎn)講解LockInternals。
LockInternals源碼分析
Curator通過(guò)zk實(shí)現(xiàn)分布式鎖的核心邏輯都在LockInternals中,我們按獲取鎖到釋放鎖的流程為指引,逐步分析LockInternals的源代碼。
獲取鎖
在InterProcessMutex獲取鎖的代碼分析中,可以看到它是通過(guò)internals.attemptLock(time, unit, getLockNodeBytes());來(lái)獲取鎖的,那么我們就以這個(gè)方法為入口。此方法的邏輯比較簡(jiǎn)單,如下:
通過(guò)driver在zk上創(chuàng)建鎖節(jié)點(diǎn),獲得鎖節(jié)點(diǎn)路徑。
通過(guò)internalLockLoop()方法阻塞進(jìn)程,直到獲取鎖成功。
核心代碼如下:
ourPath = driver.createsTheLock(client, path, localLockNodeBytes); hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
我們繼續(xù)分析internalLockLoop方法,獲取鎖的核心邏輯在此方法中。
internalLockLoop中通過(guò)while自旋,判斷鎖如果沒(méi)有被獲取,將不斷的去嘗試獲取鎖。
while循環(huán)中邏輯如下:
- 通過(guò)driver查看當(dāng)前鎖節(jié)點(diǎn)序號(hào)是否排在第一位,如果排在第一位,說(shuō)明取鎖成功,跳出循環(huán)
- 如果沒(méi)有排在第一位,則監(jiān)聽(tīng)自己的前序鎖節(jié)點(diǎn),然后阻塞線(xiàn)程。
當(dāng)前序節(jié)點(diǎn)釋放了鎖,監(jiān)聽(tīng)會(huì)被觸發(fā),恢復(fù)線(xiàn)程,此時(shí)主線(xiàn)程又回到while中第一步。
重復(fù)以上邏輯,直至獲取到鎖(自己鎖的序號(hào)排在首位)。
internalLockLoop方法核心代碼如下:
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) { List<String> children = getSortedChildren(); String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if ( predicateResults.getsTheLock() ) { haveTheLock = true; } else { String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak client.getData().usingWatcher(watcher).forPath(previousSequencePath); if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } wait(millisToWait); } else { wait(); } } catch ( KeeperException.NoNodeException e ) { // it has been deleted (i.e. lock released). Try to acquire again } } } }
獲取鎖的主要代碼邏輯我們到這就已經(jīng)分析完了,可見(jiàn)和我們自己的實(shí)現(xiàn)還是基本一樣的。此外上面提到了driver對(duì)象,也就是StandardLockInternalsDriver類(lèi),它提供了一些輔助的方法,比如說(shuō)在zk創(chuàng)建鎖節(jié)點(diǎn),判斷zk上鎖序列第一位是否為當(dāng)前鎖,鎖序列的排序邏輯等。我們就不具體講解了。
釋放鎖
釋放鎖的邏輯很簡(jiǎn)單,移除watcher,刪除鎖節(jié)點(diǎn)。代碼如下:
final void releaseLock(String lockPath) throws Exception { client.removeWatchers(); revocable.set(null); deleteOurPath(lockPath); }
總結(jié)
至此,Curator中InterProcessMutex的源代碼分析全部完成。
簡(jiǎn)單回顧下,InterProcessMutex類(lèi)封裝上層邏輯,對(duì)外暴露鎖的使用方法。而真正的鎖實(shí)現(xiàn)邏輯在LockInternals中,它通過(guò)對(duì)zk臨時(shí)有序鎖節(jié)點(diǎn)的創(chuàng)建和監(jiān)控,判斷自己的鎖序號(hào)是否在首位,來(lái)實(shí)現(xiàn)鎖的獲取。此外它還結(jié)合StandardLockInternalsDriver提供的方法,共同實(shí)現(xiàn)了排他鎖。
希望大家以后多多支持腳本之家!
相關(guān)文章
Spring AOP 切面@Around注解的用法說(shuō)明
這篇文章主要介紹了Spring AOP 切面@Around注解的用法說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-02-02spring boot實(shí)現(xiàn)自動(dòng)輸出word文檔功能的實(shí)例代碼
這篇文章主要介紹了spring boot實(shí)現(xiàn)自動(dòng)輸出word文檔功能的實(shí)例代碼,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04maven多模塊項(xiàng)目依賴(lài)管理與依賴(lài)?yán)^承詳解
這篇文章主要介紹了maven多模塊項(xiàng)目依賴(lài)管理與依賴(lài)?yán)^承詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12Spring?Kafka中如何通過(guò)參數(shù)配置解決超時(shí)問(wèn)題詳解
這篇文章主要給大家介紹了關(guān)于Spring?Kafka中如何通過(guò)參數(shù)配置解決超時(shí)問(wèn)題的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2022-01-01分析xxljob登入功能集成OIDC的統(tǒng)一認(rèn)證
這篇文章主要為大家介紹分析xxljob登入功能集成OIDC的統(tǒng)一認(rèn)證的詳解說(shuō)明,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-02-02解決SpringMVC使用@RequestBody注解報(bào)400錯(cuò)誤的問(wèn)題
這篇文章主要介紹了解決SpringMVC使用@RequestBody注解報(bào)400錯(cuò)誤的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-09-09Java中對(duì)list map根據(jù)map某個(gè)key值進(jìn)行排序的方法
今天小編就為大家分享一篇Java中對(duì)list map根據(jù)map某個(gè)key值進(jìn)行排序的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-07-07