欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

zookeeper實(shí)現(xiàn)分布式鎖

 更新時(shí)間:2018年05月07日 14:41:14   投稿:lijiao  
這篇文章主要為大家詳細(xì)介紹了基于zookeeper實(shí)現(xiàn)分布式鎖,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下

一、分布式鎖介紹

分布式鎖主要用于在分布式環(huán)境中保護(hù)跨進(jìn)程、跨主機(jī)、跨網(wǎng)絡(luò)的共享資源實(shí)現(xiàn)互斥訪(fǎng)問(wèn),以達(dá)到保證數(shù)據(jù)的一致性。

二、架構(gòu)介紹

 在介紹使用Zookeeper實(shí)現(xiàn)分布式鎖之前,首先看當(dāng)前的系統(tǒng)架構(gòu)圖 

      

解釋?zhuān)?左邊的整個(gè)區(qū)域表示一個(gè)Zookeeper集群,locker是Zookeeper的一個(gè)持久節(jié)點(diǎn),node_1、node_2、node_3是locker這個(gè)持久節(jié)點(diǎn)下面的臨時(shí)順序節(jié)點(diǎn)。client_1、client_2、client_n表示多個(gè)客戶(hù)端,Service表示需要互斥訪(fǎng)問(wèn)的共享資源。

三、分布式鎖獲取思路

1.獲取分布式鎖的總體思路

        在獲取分布式鎖的時(shí)候在locker節(jié)點(diǎn)下創(chuàng)建臨時(shí)順序節(jié)點(diǎn),釋放鎖的時(shí)候刪除該臨時(shí)節(jié)點(diǎn)。客戶(hù)端調(diào)用createNode方法在locker下創(chuàng)建臨時(shí)順序節(jié)點(diǎn),然后調(diào)用getChildren(“l(fā)ocker”)來(lái)獲取locker下面的所有子節(jié)點(diǎn),注意此時(shí)不用設(shè)置任何Watcher??蛻?hù)端獲取到所有的子節(jié)點(diǎn)path之后,如果發(fā)現(xiàn)自己在之前創(chuàng)建的子節(jié)點(diǎn)序號(hào)最小,那么就認(rèn)為該客戶(hù)端獲取到了鎖。如果發(fā)現(xiàn)自己創(chuàng)建的節(jié)點(diǎn)并非locker所有子節(jié)點(diǎn)中最小的,說(shuō)明自己還沒(méi)有獲取到鎖,此時(shí)客戶(hù)端需要找到比自己小的那個(gè)節(jié)點(diǎn),然后對(duì)其調(diào)用exist()方法,同時(shí)對(duì)其注冊(cè)事件監(jiān)聽(tīng)器。之后,讓這個(gè)被關(guān)注的節(jié)點(diǎn)刪除,則客戶(hù)端的Watcher會(huì)收到相應(yīng)通知,此時(shí)再次判斷自己創(chuàng)建的節(jié)點(diǎn)是否是locker子節(jié)點(diǎn)中序號(hào)最小的,如皋是則獲取到了鎖,如果不是則重復(fù)以上步驟繼續(xù)獲取到比自己小的一個(gè)節(jié)點(diǎn)并注冊(cè)監(jiān)聽(tīng)。當(dāng)前這個(gè)過(guò)程中還需要許多的邏輯判斷。

2.獲取分布式鎖的核心算法流程

        下面同個(gè)一個(gè)流程圖來(lái)分析獲取分布式鎖的完整算法,如下:

        解釋?zhuān)?/strong>客戶(hù)端A要獲取分布式鎖的時(shí)候首先到locker下創(chuàng)建一個(gè)臨時(shí)順序節(jié)點(diǎn)(node_n),然后立即獲取locker下的所有(一級(jí))子節(jié)點(diǎn)。

此時(shí)因?yàn)闀?huì)有多個(gè)客戶(hù)端同一時(shí)間爭(zhēng)取鎖,因此locker下的子節(jié)點(diǎn)數(shù)量就會(huì)大于1。對(duì)于順序節(jié)點(diǎn),特點(diǎn)是節(jié)點(diǎn)名稱(chēng)后面自動(dòng)有一個(gè)數(shù)字編號(hào),先創(chuàng)建的節(jié)點(diǎn)數(shù)字編號(hào)小于后創(chuàng)建的,因此可以將子節(jié)點(diǎn)按照節(jié)點(diǎn)名稱(chēng)后綴的數(shù)字順序從小到大排序,這樣排在第一位的就是最先創(chuàng)建的順序節(jié)點(diǎn),此時(shí)它就代表了最先爭(zhēng)取到鎖的客戶(hù)端!此時(shí)判斷最小的這個(gè)節(jié)點(diǎn)是否為客戶(hù)端A之前創(chuàng)建出來(lái)的node_n,如果是則表示客戶(hù)端A獲取到了鎖,如果不是則表示鎖已經(jīng)被其它客戶(hù)端獲取,因此客戶(hù)端A要等待它釋放鎖,也就是等待獲取到鎖的那個(gè)客戶(hù)端B把自己創(chuàng)建的那個(gè)節(jié)點(diǎn)刪除。

此時(shí)就通過(guò)監(jiān)聽(tīng)比node_n次小的那個(gè)順序節(jié)點(diǎn)的刪除事件來(lái)知道客戶(hù)端B是否已經(jīng)釋放了鎖,如果是,此時(shí)客戶(hù)端A再次獲取locker下的所有子節(jié)點(diǎn),再次與自己創(chuàng)建的node_n節(jié)點(diǎn)對(duì)比,直到自己創(chuàng)建的node_n是locker的所有子節(jié)點(diǎn)中順序號(hào)最小的,此時(shí)表示客戶(hù)端A獲取到了鎖!

四、基于Zookeeper的分布式鎖的代碼實(shí)現(xiàn)

1.定義分布式鎖接口

定義的分布式鎖接口如下:

public interface DistributedLock {
 
   /**獲取鎖,如果沒(méi)有得到就等待*/
   public void acquire() throws Exception;
 
   /**
   * 獲取鎖,直到超時(shí)
   * @param time超時(shí)時(shí)間
   * @param unit time參數(shù)的單位
   * @return是否獲取到鎖
   * @throws Exception
   */
   public boolean acquire (long time, TimeUnit unit) throws Exception;
 
   /**
    * 釋放鎖
    * @throws Exception
    */
   public void release() throws Exception;
}

2.定義一個(gè)簡(jiǎn)單的互斥鎖

        定義一個(gè)互斥鎖類(lèi),實(shí)現(xiàn)以上定義的鎖接口,同時(shí)繼承一個(gè)基類(lèi)BaseDistributedLock,該基類(lèi)主要用于與Zookeeper交互,包含一個(gè)嘗試獲取鎖的方法和一個(gè)釋放鎖。      

/**鎖接口的具體實(shí)現(xiàn),主要借助于繼承的父類(lèi)BaseDistributedLock來(lái)實(shí)現(xiàn)的接口方法
 * 該父類(lèi)是基于Zookeeper實(shí)現(xiàn)分布式鎖的具體細(xì)節(jié)實(shí)現(xiàn)*/
public class SimpleDistributedLockMutex extends BaseDistributedLock implements DistributedLock {
  /*用于保存Zookeeper中實(shí)現(xiàn)分布式鎖的節(jié)點(diǎn),如名稱(chēng)為locker:/locker,
 *該節(jié)點(diǎn)應(yīng)該是持久節(jié)點(diǎn),在該節(jié)點(diǎn)下面創(chuàng)建臨時(shí)順序節(jié)點(diǎn)來(lái)實(shí)現(xiàn)分布式鎖 */
 private final String basePath;
 
 /*鎖名稱(chēng)前綴,locker下創(chuàng)建的順序節(jié)點(diǎn)例如都以lock-開(kāi)頭,這樣便于過(guò)濾無(wú)關(guān)節(jié)點(diǎn)
 *這樣創(chuàng)建后的節(jié)點(diǎn)類(lèi)似:lock-00000001,lock-000000002*/
 private staticfinal String LOCK_NAME ="lock-";
 
 /*用于保存某個(gè)客戶(hù)端在locker下面創(chuàng)建成功的順序節(jié)點(diǎn),用于后續(xù)相關(guān)操作使用(如判斷)*/
 private String ourLockPath;
 
 /**
 * 用于獲取鎖資源,通過(guò)父類(lèi)的獲取鎖方法來(lái)獲取鎖
 * @param time獲取鎖的超時(shí)時(shí)間
 * @param unit time的時(shí)間單位
 * @return是否獲取到鎖
 * @throws Exception
 */
 private boolean internalLock (long time, TimeUnit unit) throws Exception {
  //如果ourLockPath不為空則認(rèn)為獲取到了鎖,具體實(shí)現(xiàn)細(xì)節(jié)見(jiàn)attemptLock的實(shí)現(xiàn)
  ourLockPath = attemptLock(time, unit);
  return ourLockPath !=null;
 }
 
 /**
  * 傳入Zookeeper客戶(hù)端連接對(duì)象,和basePath
  * @param client Zookeeper客戶(hù)端連接對(duì)象
  * @param basePath basePath是一個(gè)持久節(jié)點(diǎn)
  */
 public SimpleDistributedLockMutex(ZkClientExt client, String basePath){
  /*調(diào)用父類(lèi)的構(gòu)造方法在Zookeeper中創(chuàng)建basePath節(jié)點(diǎn),并且為basePath節(jié)點(diǎn)子節(jié)點(diǎn)設(shè)置前綴
  *同時(shí)保存basePath的引用給當(dāng)前類(lèi)屬性*/
  super(client,basePath,LOCK_NAME);
  this.basePath = basePath;
 }
 
 /**獲取鎖,直到超時(shí),超時(shí)后拋出異常*/
 public void acquire() throws Exception {
  //-1表示不設(shè)置超時(shí)時(shí)間,超時(shí)由Zookeeper決定
  if (!internalLock(-1,null)){
   throw new IOException("連接丟失!在路徑:'"+basePath+"'下不能獲取鎖!");
  }
 }
 
 /**
 * 獲取鎖,帶有超時(shí)時(shí)間
 */
 public boolean acquire(long time, TimeUnit unit) throws Exception {
  return internalLock(time, unit);
 }
 
 /**釋放鎖*/
 public void release()throws Exception {
  releaseLock(ourLockPath);
 }
}

3. 分布式鎖的實(shí)現(xiàn)細(xì)節(jié)

獲取分布式鎖的重點(diǎn)邏輯在于BaseDistributedLock,實(shí)現(xiàn)了基于Zookeeper實(shí)現(xiàn)分布式鎖的細(xì)節(jié)。

public class BaseDistributedLock {
 
 private final ZkClientExt client;
 private final String path;
 private final String basePath;
 private final String lockName;
 private static final Integer MAX_RETRY_COUNT = 10;
 
 public BaseDistributedLock(ZkClientExt client, String path, String lockName){
 this.client = client;
 this.basePath = path;
 this.path = path.concat("/").concat(lockName); 
 this.lockName = lockName;
 }
 
 private void deleteOurPath(String ourPath) throws Exception{
 client.delete(ourPath);
 }
 
 private String createLockNode(ZkClient client, String path) throws Exception{
 return client.createEphemeralSequential(path, null);
 }
 
 /**
 * 獲取鎖的核心方法
 * @param startMillis
 * @param millisToWait
 * @param ourPath
 * @return
 * @throws Exception
 */
 private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception{
 
 boolean haveTheLock = false;
 boolean doDelete = false;
 
 try{
  while ( !haveTheLock ) {
  //該方法實(shí)現(xiàn)獲取locker節(jié)點(diǎn)下的所有順序節(jié)點(diǎn),并且從小到大排序 
  List<String> children = getSortedChildren();
  String sequenceNodeName = ourPath.substring(basePath.length()+1);
  
  //計(jì)算剛才客戶(hù)端創(chuàng)建的順序節(jié)點(diǎn)在locker的所有子節(jié)點(diǎn)中排序位置,如果是排序?yàn)?,則表示獲取到了鎖
  int ourIndex = children.indexOf(sequenceNodeName);
  
  /*如果在getSortedChildren中沒(méi)有找到之前創(chuàng)建的[臨時(shí)]順序節(jié)點(diǎn),這表示可能由于網(wǎng)絡(luò)閃斷而導(dǎo)致
   *Zookeeper認(rèn)為連接斷開(kāi)而刪除了我們創(chuàng)建的節(jié)點(diǎn),此時(shí)需要拋出異常,讓上一級(jí)去處理
   *上一級(jí)的做法是捕獲該異常,并且執(zhí)行重試指定的次數(shù) 見(jiàn)后面的 attemptLock方法 */
  if ( ourIndex<0 ){
   throw new ZkNoNodeException("節(jié)點(diǎn)沒(méi)有找到: " + sequenceNodeName);
  }

  //如果當(dāng)前客戶(hù)端創(chuàng)建的節(jié)點(diǎn)在locker子節(jié)點(diǎn)列表中位置大于0,表示其它客戶(hù)端已經(jīng)獲取了鎖
  //此時(shí)當(dāng)前客戶(hù)端需要等待其它客戶(hù)端釋放鎖,
  boolean isGetTheLock = ourIndex == 0;
  
  //如何判斷其它客戶(hù)端是否已經(jīng)釋放了鎖?從子節(jié)點(diǎn)列表中獲取到比自己次小的哪個(gè)節(jié)點(diǎn),并對(duì)其建立監(jiān)聽(tīng)
  String pathToWatch = isGetTheLock ? null : children.get(ourIndex - 1);

  if ( isGetTheLock ){
   haveTheLock = true;
  }else{
   //如果次小的節(jié)點(diǎn)被刪除了,則表示當(dāng)前客戶(hù)端的節(jié)點(diǎn)應(yīng)該是最小的了,所以使用CountDownLatch來(lái)實(shí)現(xiàn)等待
   String previousSequencePath = basePath .concat( "/" ) .concat( pathToWatch );
   final CountDownLatch latch = new CountDownLatch(1);
   final IZkDataListener previousListener = new IZkDataListener() {
   
   //次小節(jié)點(diǎn)刪除事件發(fā)生時(shí),讓countDownLatch結(jié)束等待
   //此時(shí)還需要重新讓程序回到while,重新判斷一次!
   public void handleDataDeleted(String dataPath) throws Exception {
    latch.countDown();  
   }
   
   public void handleDataChange(String dataPath, Object data) throws Exception {
    // ignore     
   }
   };

   try{   
   //如果節(jié)點(diǎn)不存在會(huì)出現(xiàn)異常
   client.subscribeDataChanges(previousSequencePath, previousListener);
   
   if ( millisToWait != null ){
    millisToWait -= (System.currentTimeMillis() - startMillis);
    startMillis = System.currentTimeMillis();
    if ( millisToWait <= 0 ){
    doDelete = true; // timed out - delete our node
    break;
    }

    latch.await(millisToWait, TimeUnit.MICROSECONDS);
   }else{
    latch.await();
   }
   
   }catch ( ZkNoNodeException e ){
   //ignore
   }finally{
   client.unsubscribeDataChanges(previousSequencePath, previousListener);
   }
  }
  }
 }catch ( Exception e ){
  //發(fā)生異常需要?jiǎng)h除節(jié)點(diǎn)
  doDelete = true;
  throw e;
  
 }finally{
  //如果需要?jiǎng)h除節(jié)點(diǎn)
  if ( doDelete ){
  deleteOurPath(ourPath);
  }
 }
 return haveTheLock;
 } 
 
 private String getLockNodeNumber(String str, String lockName) {
 int index = str.lastIndexOf(lockName);
 if ( index >= 0 ){
  index += lockName.length();
  return index <= str.length() ? str.substring(index) : "";
 }
 return str;
 }
 
 private List<String> getSortedChildren() throws Exception {
 try{
  List<String> children = client.getChildren(basePath);
  Collections.sort(
  children,
  new Comparator<String>(){
   public int compare(String lhs, String rhs){
   return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName));
   }
  }
  );
  return children;
  
 }catch(ZkNoNodeException e){
  client.createPersistent(basePath, true);
  return getSortedChildren();
 }
 }
 
 
 protected void releaseLock(String lockPath) throws Exception{
 deleteOurPath(lockPath); 
 }
 
 
 protected String attemptLock(long time, TimeUnit unit) throws Exception{
 final long startMillis = System.currentTimeMillis();
 final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;

 String  ourPath = null;
 boolean  hasTheLock = false;
 boolean  isDone = false;
 int  retryCount = 0;
 
 //網(wǎng)絡(luò)閃斷需要重試一試
 while ( !isDone ){
  isDone = true;

  try{
  //createLockNode用于在locker(basePath持久節(jié)點(diǎn))下創(chuàng)建客戶(hù)端要獲取鎖的[臨時(shí)]順序節(jié)點(diǎn)
  ourPath = createLockNode(client, path);
  /**
   * 該方法用于判斷自己是否獲取到了鎖,即自己創(chuàng)建的順序節(jié)點(diǎn)在locker的所有子節(jié)點(diǎn)中是否最小
   * 如果沒(méi)有獲取到鎖,則等待其它客戶(hù)端鎖的釋放,并且稍后重試直到獲取到鎖或者超時(shí)
   */
  hasTheLock = waitToLock(startMillis, millisToWait, ourPath);
  
  }catch ( ZkNoNodeException e ){
  if ( retryCount++ < MAX_RETRY_COUNT ){
   isDone = false;
  }else{
   throw e;
  }
  }
 }
 
 if ( hasTheLock ){
  return ourPath;
 }
 
 return null;
}

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • spring-security關(guān)于hasRole的坑及解決

    spring-security關(guān)于hasRole的坑及解決

    這篇文章主要介紹了spring-security關(guān)于hasRole的坑及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • java 打印一字符串,并在main()方法內(nèi)調(diào)用它

    java 打印一字符串,并在main()方法內(nèi)調(diào)用它

    編寫(xiě)一個(gè)方法(名字自定,但要符合Java編碼規(guī)范),方法內(nèi)打印一字符串,并在main()方法內(nèi)調(diào)用它。
    2017-02-02
  • 詳解springcloud組件consul服務(wù)治理

    詳解springcloud組件consul服務(wù)治理

    Consul是一款由HashiCorp公司開(kāi)源的,用于服務(wù)治理的軟件,Spring Cloud Consul對(duì)其進(jìn)行了封裝,這篇文章主要介紹了springcloud組件consul服務(wù)治理,需要的朋友可以參考下
    2022-08-08
  • Java通過(guò)調(diào)用FFMPEG獲取視頻時(shí)長(zhǎng)

    Java通過(guò)調(diào)用FFMPEG獲取視頻時(shí)長(zhǎng)

    這篇文章主要為大家詳細(xì)介紹了Java通過(guò)調(diào)用FFMPEG獲取視頻時(shí)長(zhǎng),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-04-04
  • Java中CAS機(jī)制實(shí)現(xiàn)方法詳解

    Java中CAS機(jī)制實(shí)現(xiàn)方法詳解

    傳統(tǒng)的并發(fā)控制手段如synchronized和ReentrantLock雖有效防止資源競(jìng)爭(zhēng),卻可能引起性能開(kāi)銷(xiāo),相比之下,CAS(CompareAndSwap)提供一種輕量級(jí)的樂(lè)觀(guān)鎖策略,通過(guò)硬件級(jí)別的原子指令實(shí)現(xiàn)無(wú)鎖并發(fā),提高性能,需要的朋友可以參考下
    2024-09-09
  • Netty 輕松實(shí)現(xiàn)文件上傳功能

    Netty 輕松實(shí)現(xiàn)文件上傳功能

    今天我們來(lái)完成一個(gè)使用netty進(jìn)行文件傳輸?shù)娜蝿?wù)。在實(shí)際項(xiàng)目中,文件傳輸通常采用FTP或者HTTP附件的方式,對(duì)Netty 文件上傳功能感興趣的朋友一起看看吧
    2021-07-07
  • springboot實(shí)現(xiàn)登錄功能的完整步驟

    springboot實(shí)現(xiàn)登錄功能的完整步驟

    這篇文章主要給大家介紹了關(guān)于springboot實(shí)現(xiàn)登錄功能的完整步驟,在web應(yīng)用程序中,用戶(hù)登錄權(quán)限驗(yàn)證是非常重要的一個(gè)步驟,文中通過(guò)代碼以及圖文介紹的非常詳細(xì),需要的朋友可以參考下
    2023-09-09
  • Java中JFrame實(shí)現(xiàn)無(wú)邊框無(wú)標(biāo)題方法

    Java中JFrame實(shí)現(xiàn)無(wú)邊框無(wú)標(biāo)題方法

    這篇文章主要介紹了Java中JFrame實(shí)現(xiàn)無(wú)邊框無(wú)標(biāo)題方法,本文直接給出代碼實(shí)例,需要的朋友可以參考下
    2015-05-05
  • 使用Java實(shí)現(xiàn)2048小游戲代碼實(shí)例

    使用Java實(shí)現(xiàn)2048小游戲代碼實(shí)例

    這篇文章主要介紹了使用Java實(shí)現(xiàn)2048小游戲代碼實(shí)例,2048 游戲是一款益智類(lèi)游戲,玩家需要通過(guò)合并相同數(shù)字的方塊,不斷合成更大的數(shù)字,最終達(dá)到2048,游戲規(guī)則簡(jiǎn)單,但挑戰(zhàn)性很高,需要玩家靈活運(yùn)用策略和計(jì)算能力,本文將使用Java代碼實(shí)現(xiàn),需要的朋友可以參考下
    2023-10-10
  • Maven Plugins報(bào)錯(cuò)的解決方法

    Maven Plugins報(bào)錯(cuò)的解決方法

    本文主要介紹了Maven Plugins報(bào)錯(cuò)的解決方法,文中通過(guò)圖文介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2024-02-02

最新評(píng)論