InterProcessMutex實(shí)現(xiàn)zookeeper分布式鎖原理
原理簡介:
zookeeper實(shí)現(xiàn)分布式鎖的原理就是多個(gè)節(jié)點(diǎn)同時(shí)在一個(gè)指定的節(jié)點(diǎn)下面創(chuàng)建臨時(shí)會(huì)話順序節(jié)點(diǎn),誰創(chuàng)建的節(jié)點(diǎn)序號最小,誰就獲得了鎖,并且其他節(jié)點(diǎn)就會(huì)監(jiān)聽序號比自己小的節(jié)點(diǎn),一旦序號比自己小的節(jié)點(diǎn)被刪除了,其他節(jié)點(diǎn)就會(huì)得到相應(yīng)的事件,然后查看自己是否為序號最小的節(jié)點(diǎn),如果是,則獲取鎖。
zookeeper節(jié)點(diǎn)圖分析
InterProcessMutex實(shí)現(xiàn)的鎖機(jī)制是公平且互斥的,公平的方式是按照每個(gè)請求的順序進(jìn)行排隊(duì)的。
InterProcessMutex實(shí)現(xiàn)的InterProcessLock接口,InterProcessLock主要規(guī)范了如下幾個(gè)方法:
// 獲取互斥鎖 public void acquire() throws Exception; // 在給定的時(shí)間內(nèi)獲取互斥鎖 public boolean acquire(long time, TimeUnit unit) throws Exception; // 釋放鎖處理 public void release() throws Exception; // 如果此JVM中的線程獲取了互斥鎖,則返回true boolean isAcquiredInThisProcess();
接下來我們看看InterProcessMutex中的實(shí)現(xiàn),它究竟有哪些屬性,以及實(shí)現(xiàn)細(xì)節(jié)
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> { // LockInternals是真正實(shí)現(xiàn)操作zookeeper的類,它內(nèi)部包含連接zookeeper客戶端的CuratorFramework // LockInternals的具體實(shí)現(xiàn)后面我會(huì)講到 private final LockInternals internals; // basePath是鎖的根結(jié)點(diǎn),所有的臨時(shí)有序的節(jié)點(diǎn)都是basePath的子節(jié)點(diǎn), private final String basePath; // private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap(); // LockData封裝了請求對應(yīng)的線程(owningThread)、鎖的重入的次數(shù)(lockCount)、線程對應(yīng)的臨時(shí)節(jié)點(diǎn)(lockPath) private static class LockData { final Thread owningThread; final String lockPath; // 原子性的 final AtomicInteger lockCount = new AtomicInteger(1); private LockData(Thread owningThread, String lockPath) { this.owningThread = owningThread; this.lockPath = lockPath; } } private static final String LOCK_NAME = "lock-"; // 獲取互斥鎖,阻塞【InterProcessLock的實(shí)現(xiàn)】 @Override public void acquire() throws Exception { // 獲取鎖,一直等待 if ( !internalLock(-1, null) ) { throw new IOException("Lost connection while trying to acquire lock: " + basePath); } } // 獲取互斥鎖,指定時(shí)間time【InterProcessLock的實(shí)現(xiàn)】 @Override public boolean acquire(long time, TimeUnit unit) throws Exception { return internalLock(time, unit); } // 當(dāng)前線程是否占用鎖中【InterProcessLock的實(shí)現(xiàn)】 @Override public boolean isAcquiredInThisProcess() { return (threadData.size() > 0); } //如果調(diào)用線程與獲取互斥鎖的線程相同,則執(zhí)行一次互斥鎖釋放。如果線程已多次調(diào)用acquire,當(dāng)此方法返回時(shí),互斥鎖仍將保留 【InterProcessLock的實(shí)現(xiàn)】 @Override public void release() throws Exception { Thread currentThread = Thread.currentThread(); //當(dāng)前線程 LockData lockData = threadData.get(currentThread); //線程對應(yīng)的鎖信息 if ( lockData == null ) { throw new IllegalMonitorStateException("You do not own the lock: " + basePath); } // 因?yàn)楂@取到的鎖是可重入的,對lockCount進(jìn)行減1,lockCount=0時(shí)才是真正釋放鎖 int newLockCount = lockData.lockCount.decrementAndGet(); if ( newLockCount > 0 ) { return; } if ( newLockCount < 0 ) { throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath); } try { // 到這里時(shí)lockCount=0,具體釋放鎖的操作交給LockInternals中的releaseLock方法實(shí)現(xiàn) internals.releaseLock(lockData.lockPath); } finally { threadData.remove(currentThread); } } // 獲取basePath根結(jié)點(diǎn)下的所有臨時(shí)節(jié)點(diǎn)的有序集合 public Collection<String> getParticipantNodes() throws Exception { return LockInternals.getParticipantNodes(internals.getClient(), basePath, internals.getLockName(), internals.getDriver()); } boolean isOwnedByCurrentThread() { LockData lockData = threadData.get(Thread.currentThread()); return (lockData != null) && (lockData.lockCount.get() > 0); } protected String getLockPath() { LockData lockData = threadData.get(Thread.currentThread()); return lockData != null ? lockData.lockPath : null; } // acquire()中調(diào)用的internalLock()方法 private boolean internalLock(long time, TimeUnit unit) throws Exception { Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null ) { // 如果當(dāng)前線程已經(jīng)獲取到了鎖,那么將重入次數(shù)lockCount+1,返回true lockData.lockCount.incrementAndGet(); return true; } // attemptLock方法是獲取鎖的真正實(shí)現(xiàn),lockPath是當(dāng)前線程成功在basePath下創(chuàng)建的節(jié)點(diǎn),若lockPath不為空代表成功獲取到鎖 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { // lockPath封裝到當(dāng)前線程對應(yīng)的鎖信息中 LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; } }
接下來我們看看InterProcessMutex中使用的LockInternals類的實(shí)現(xiàn)細(xì)節(jié)
public class LockInternals { private final CuratorFramework client; // 連接zookeeper的客戶端 private final String path; // 等于basePath,InterProcessMutex中傳進(jìn)來的 private final String basePath; // 根結(jié)點(diǎn) private final LockInternalsDriver driver; // 操作zookeeper節(jié)點(diǎn)的driver private final String lockName; // "lock-" private final AtomicReference<RevocationSpec> revocable = new AtomicReference<RevocationSpec>(null); private final CuratorWatcher revocableWatcher = new CuratorWatcher() { @Override public void process(WatchedEvent event) throws Exception { if ( event.getType() == Watcher.Event.EventType.NodeDataChanged ) { checkRevocableWatcher(event.getPath()); } } }; // 監(jiān)聽節(jié)點(diǎn)的監(jiān)聽器,若被監(jiān)聽的節(jié)點(diǎn)有動(dòng)靜,則喚醒 notifyFromWatcher()=>notifyAll(); private final Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { notifyFromWatcher(); } }; private volatile int maxLeases; // 獲取basePath的子節(jié)點(diǎn),排序后的 public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception { List<String> children = client.getChildren().forPath(basePath); List<String> sortedList = Lists.newArrayList(children); Collections.sort ( sortedList, new Comparator<String>() { @Override public int compare(String lhs, String rhs) { return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName)); } } ); return sortedList; } // 嘗試獲取鎖【internalLock=>attemptLock】 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { // 開始時(shí)間 final long startMillis = System.currentTimeMillis(); // 記錄等待時(shí)間 final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; // 重試次數(shù) int retryCount = 0; // 當(dāng)前節(jié)點(diǎn) String ourPath = null; // 是否獲取到鎖的標(biāo)志 boolean hasTheLock = false; // 是否放棄獲取到標(biāo)志 boolean isDone = false; // 不停嘗試獲取 while ( !isDone ) { isDone = true; try { // 創(chuàng)建當(dāng)前線程對應(yīng)的節(jié)點(diǎn) ourPath = driver.createsTheLock(client, path, localLockNodeBytes); // internalLockLoop中獲取 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } catch ( KeeperException.NoNodeException e ) { // 是否可再次嘗試 if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) { isDone = false; } else { throw e; } } } // 獲取到鎖后,返回當(dāng)前線程對應(yīng)創(chuàng)建的節(jié)點(diǎn)路徑 if ( hasTheLock ) { return ourPath; } return null; } // 循環(huán)獲取【attemptLock=>internalLockLoop】 private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; // 是否擁有分布式鎖 boolean doDelete = false; // 是否需要?jiǎng)h除當(dāng)前節(jié)點(diǎn) try { if ( revocable.get() != null ) { client.getData().usingWatcher(revocableWatcher).forPath(ourPath); } // 循環(huán)嘗試獲取鎖 while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) { // 得到basePath下排序后的臨時(shí)子節(jié)點(diǎn) List<String> children = getSortedChildren(); // 獲取之前創(chuàng)建的當(dāng)前線程對應(yīng)的子節(jié)點(diǎn) String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash // 判斷是否獲取到鎖,沒有就返回監(jiān)聽路徑 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); // 成功獲取到 if ( predicateResults.getsTheLock() ) { haveTheLock = true; } else { // 沒有獲取到鎖,監(jiān)聽前一個(gè)臨時(shí)順序節(jié)點(diǎn) String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { // 上一個(gè)臨時(shí)順序節(jié)點(diǎn)如果被刪除,會(huì)喚醒當(dāng)前線程繼續(xù)競爭鎖 client.getData().usingWatcher(watcher).forPath(previousSequencePath); if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); // 獲取鎖超時(shí) if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } wait(millisToWait); } else { wait(); } } catch ( KeeperException.NoNodeException e ) { // it has been deleted (i.e. lock released). Try to acquire again } } } } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); doDelete = true; throw e; } finally { if ( doDelete ) { // 因?yàn)楂@取鎖超時(shí),所以刪除之前創(chuàng)建的臨時(shí)子節(jié)點(diǎn) deleteOurPath(ourPath); } } return haveTheLock; } private void deleteOurPath(String ourPath) throws Exception { try { // 刪除 client.delete().guaranteed().forPath(ourPath); } catch ( KeeperException.NoNodeException e ) { // ignore - already deleted (possibly expired session, etc.) } } }
StandardLockInternalsDriver implements LockInternalsDriver
// 前面internalLockLoop方法中driver.getsTheLock執(zhí)行的方法 @Override public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { // 獲取子節(jié)點(diǎn)在臨時(shí)順序節(jié)點(diǎn)列表中的位置 int ourIndex = children.indexOf(sequenceNodeName); // 檢驗(yàn)子節(jié)點(diǎn)在臨時(shí)順序節(jié)點(diǎn)列表中是否有效 validateOurIndex(sequenceNodeName, ourIndex); // 若當(dāng)前子節(jié)點(diǎn)的位置<maxLeases,代表可獲取鎖【maxLeases默認(rèn)=1,若ourIndex=0,代筆自己位置最小】 boolean getsTheLock = ourIndex < maxLeases; // getsTheLock=true,則不需要監(jiān)聽前maxLeases的節(jié)點(diǎn)【maxLeases默認(rèn)=1,代表監(jiān)聽前面最靠近自己的節(jié)點(diǎn)】 String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases); return new PredicateResults(pathToWatch, getsTheLock); }
用InterProcessMutex在自己業(yè)務(wù)實(shí)現(xiàn)分布式鎖,請點(diǎn)擊此鏈接閱讀點(diǎn)我
到此這篇關(guān)于InterProcessMutex實(shí)現(xiàn)zookeeper分布式鎖原理的文章就介紹到這了,更多相關(guān)InterProcessMutex實(shí)現(xiàn)zookeeper分布式鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MyBatis Plus Mapper CRUD接口測試方式
在數(shù)據(jù)庫管理系統(tǒng)中,插入記錄是添加新數(shù)據(jù)條目,而刪除操作包括根據(jù)主鍵ID單條刪除和批量刪除,也可以基于特定條件進(jìn)行刪除,刪除操作的SQL語句是通過鍵值對在Map中拼接而成,如delete from 表 where key1=value1 AND key2=value22024-09-09基于Java web服務(wù)器簡單實(shí)現(xiàn)一個(gè)Servlet容器
這篇文章主要為大家詳細(xì)介紹了基于Java web服務(wù)器簡單實(shí)現(xiàn)一個(gè)Servlet容器,感興趣的小伙伴們可以參考一下2016-06-06如何實(shí)現(xiàn)springboot中controller之間的相互調(diào)用
這篇文章主要介紹了實(shí)現(xiàn)springboot中controller之間的相互調(diào)用方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06Spring Cloud中FeignClient實(shí)現(xiàn)文件上傳功能
這篇文章主要為大家詳細(xì)介紹了Spring Cloud中FeignClient實(shí)現(xiàn)文件上傳功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-04-04springMvc異步的DeferredResult long?polling應(yīng)用示例解析
這篇文章主要為大家介紹了springMvc中DeferredResult的long?polling應(yīng)用示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03解決一個(gè)JSON反序列化問題的辦法(空字符串變?yōu)榭占?
在平時(shí)的業(yè)務(wù)開發(fā)中,經(jīng)常會(huì)有拿到一串序列化后的字符串要來反序列化,下面這篇文章主要給大家介紹了如何解決一個(gè)JSON反序列化問題的相關(guān)資料,空字符串變?yōu)榭占?需要的朋友可以參考下2024-03-03java 輸入一個(gè)數(shù)字,反轉(zhuǎn)輸出這個(gè)數(shù)字的值(實(shí)現(xiàn)方法)
下面小編就為大家?guī)硪黄猨ava 輸入一個(gè)數(shù)字,反轉(zhuǎn)輸出這個(gè)數(shù)字的值(實(shí)現(xiàn)方法)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-10-10