欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

ZooKeeper入門教程三分布式鎖實現及完整運行源碼

 更新時間:2022年01月28日 14:43:52   作者:愛碼叔(稀有氣體)  
本文是ZooKeeper入門系列教程,分布式鎖有多種實現方式,比如通過數據庫、redis都可實現。作為分布式協同工具ZooKeeper,當然也有著標準的實現方式。本文介紹在zookeeper中如何實現排他鎖

ZooKeeper入門教程一簡介與核心概念

ZooKeeper入門教程二在單機和集群環(huán)境下的安裝搭建及使用

1.0版本

首先我們先介紹一個簡單的zookeeper實現分布式鎖的思路:

用zookeeper中一個臨時節(jié)點代表鎖,比如在/exlusive_lock下創(chuàng)建臨時子節(jié)點/exlusive_lock/lock。

  • 所有客戶端爭相創(chuàng)建此節(jié)點,但只有一個客戶端創(chuàng)建成功。
  • 創(chuàng)建成功代表獲取鎖成功,此客戶端執(zhí)行業(yè)務邏輯
  • 未創(chuàng)建成功的客戶端,監(jiān)聽/exlusive_lock變更
  • 獲取鎖的客戶端執(zhí)行完成后,刪除/exlusive_lock/lock,表示鎖被釋放
  • 鎖被釋放后,其他監(jiān)聽/exlusive_lock變更的客戶端得到通知,再次爭相創(chuàng)建臨時子節(jié)點/exlusive_lock/lock。此時相當于回到了第2步。

我們的程序按照上述邏輯直至搶占到鎖,執(zhí)行完業(yè)務邏輯。

上述是較為簡單的分布式鎖實現方式。能夠應付一般使用場景,但存在著如下兩個問題:

1、鎖的獲取順序和最初客戶端爭搶順序不一致,這不是一個公平鎖。每次鎖獲取都是當次最先搶到鎖的客戶端。

2、羊群效應,所有沒有搶到鎖的客戶端都會監(jiān)聽/exlusive_lock變更。當并發(fā)客戶端很多的情況下,所有的客戶端都會接到通知去爭搶鎖,此時就出現了羊群效應。

為了解決上面的問題,我們重新設計。

2.0版本

我們在2.0版本中,讓每個客戶端在/exlusive_lock下創(chuàng)建的臨時節(jié)點為有序節(jié)點,這樣每個客戶端都在/exlusive_lock下有自己對應的鎖節(jié)點,而序號排在最前面的節(jié)點,代表對應的客戶端獲取鎖成功。排在后面的客戶端監(jiān)聽自己前面一個節(jié)點,那么在他前序客戶端執(zhí)行完成后,他將得到通知,獲得鎖成功。邏輯修改如下:

  • 每個客戶端往/exlusive_lock下創(chuàng)建有序臨時節(jié)點/exlusive_lock/lock_。創(chuàng)建成功后/exlusive_lock下面會有每個客戶端對應的節(jié)點,如/exlusive_lock/lock_000000001
  • 客戶端取得/exlusive_lock下子節(jié)點,并進行排序,判斷排在最前面的是否為自己。如果自己的鎖節(jié)點在第一位,代表獲取鎖成功,此客戶端執(zhí)行業(yè)務邏輯
  • 如果自己的鎖節(jié)點不在第一位,則監(jiān)聽自己前一位的鎖節(jié)點。例如,自己鎖節(jié)點lock_000000002,那么則監(jiān)聽lock_000000001.
  • 當前一位鎖節(jié)點(lock_000000001)對應的客戶端執(zhí)行完成,釋放了鎖,將會觸發(fā)監(jiān)聽客戶端(lock_000000002)的邏輯。
  • 監(jiān)聽客戶端重新執(zhí)行第2步邏輯,判斷自己是否獲得了鎖。

如此修改后,每個客戶端只關心自己前序鎖是否釋放,所以每次只會有一個客戶端得到通知。而且,所有客戶端的執(zhí)行順序和最初鎖創(chuàng)建的順序是一致的。解決了1.0版本的兩個問題。

接下來我們看看代碼如何實現。

LockSample類

此類是分布式鎖類,實現了2個分布式鎖的相關方法:

1、獲取鎖

2、釋放鎖

主要程序邏輯圍繞著這兩個方法的實現,特別是獲取鎖的邏輯。我們先看一下該類的成員變量:

private ZooKeeper zkClient;
private static final String LOCK_ROOT_PATH = "/Locks";
private static final String LOCK_NODE_NAME = "Lock_";
private String lockPath;

定義了zkClient,用來操作zookeeper。

鎖的根路徑,及自增節(jié)點的前綴。此處生產環(huán)境應該由客戶端傳入。

當前鎖的路徑。

構造方法

public LockSample() throws IOException {
    zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if(event.getState()== Event.KeeperState.Disconnected){
                System.out.println("失去連接");
 
            }
        }
    });
}

創(chuàng)建zkClient,同時創(chuàng)建了狀態(tài)監(jiān)聽。此監(jiān)聽可以去掉,這里只是打印出失去連接狀態(tài)。

獲取鎖實現

暴露出來的獲取鎖的方法為acquireLock(),邏輯很簡單:

public  void acquireLock() throws InterruptedException, KeeperException {
    //創(chuàng)建鎖節(jié)點
    createLock();
    //嘗試獲取鎖
    attemptLock();
}

首先創(chuàng)建鎖節(jié)點,然后嘗試去取鎖。真正的邏輯都在這兩個方法中。

createLock()

先判斷鎖的根節(jié)點/Locks是否存在,不存在的話創(chuàng)建。然后在/Locks下創(chuàng)建有序臨時節(jié)點,并設置當前的鎖路徑變量lockPath。

代碼如下:

private void createLock() throws KeeperException, InterruptedException {
    //如果根節(jié)點不存在,則創(chuàng)建根節(jié)點
    Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);
    if (stat == null) {
        zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
 
    // 創(chuàng)建EPHEMERAL_SEQUENTIAL類型節(jié)點
    String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
            Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println(Thread.currentThread().getName() + " 鎖創(chuàng)建: " + lockPath);
    this.lockPath=lockPath;
}

attemptLock()

這是最核心的方法,客戶端嘗試去獲取鎖,是對2.0版本邏輯的實現,這里就不再重復邏輯,直接看代碼:

private void attemptLock() throws KeeperException, InterruptedException {
    // 獲取Lock所有子節(jié)點,按照節(jié)點序號排序
    List<String> lockPaths = null;
    lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);
    Collections.sort(lockPaths);
    int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
    // 如果lockPath是序號最小的節(jié)點,則獲取鎖
    if (index == 0) {
        System.out.println(Thread.currentThread().getName() + " 鎖獲得, lockPath: " + lockPath);
        return ;
    } else {
        // lockPath不是序號最小的節(jié)點,監(jiān)聽前一個節(jié)點
        String preLockPath = lockPaths.get(index - 1);
 
        Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
 
        // 假如前一個節(jié)點不存在了,比如說執(zhí)行完畢,或者執(zhí)行節(jié)點掉線,重新獲取鎖
        if (stat == null) {
            attemptLock();
        } else { // 阻塞當前進程,直到preLockPath釋放鎖,被watcher觀察到,notifyAll后,重新acquireLock
            System.out.println(" 等待前鎖釋放,prelocakPath:"+preLockPath);
            synchronized (watcher) {
                watcher.wait();
            }
            attemptLock();
        }
    }
}

注意這一行代碼

Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);

我們在獲取前一個節(jié)點的時候,同時設置了監(jiān)聽watcher。如果前鎖存在,則阻塞主線程。

watcher定義代碼如下:

private Watcher watcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        System.out.println(event.getPath() + " 前鎖釋放");
        synchronized (this) {
            notifyAll();
        }
    }
};

watcher只是notifyAll,讓主線程繼續(xù)執(zhí)行,以便再次調用attemptLock(),去嘗試獲取lock。如果沒有異常情況的話,此時當前客戶端應該能夠成功獲取鎖。

釋放鎖實現

釋放鎖原語實現很簡單,參照releaseLock()方法。代碼如下:

public void releaseLock() throws KeeperException, InterruptedException {
    zkClient.delete(lockPath, -1);
    zkClient.close();
    System.out.println(" 鎖釋放:" + lockPath);
}

關于分布式鎖的代碼到此就講解完了,我們再看下客戶端如何使用它。

我們創(chuàng)建一個TicketSeller類,作為客戶端來使用分布式鎖。

 TicketSeller類

sell()

不帶鎖的業(yè)務邏輯方法,代碼如下:

private void sell(){
    System.out.println("售票開始");
    // 線程隨機休眠數毫秒,模擬現實中的費時操作
    int sleepMillis = (int) (Math.random() * 2000);
    try {
        //代表復雜邏輯執(zhí)行了一段時間
        Thread.sleep(sleepMillis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("售票結束");
}

僅是為了演示,sleep了一段時間。

sellTicketWithLock()

此方法中,加鎖后執(zhí)行業(yè)務邏輯,代碼如下:

public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {
    LockSample lock = new LockSample();
    lock.acquireLock();
    sell();
    lock.releaseLock();
}

測試入口

接下來我們寫一個main函數做測試:

public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
    TicketSeller ticketSeller = new TicketSeller();
    for(int i=0;i<1000;i++){
        ticketSeller.sellTicketWithLock();
    }
}

main函數中我們循環(huán)調用ticketSeller.sellTicketWithLock(),執(zhí)行加鎖后的賣票邏輯。

測試方法

1、先啟動一個java程序運行,可以看到日志輸出如下:

main 鎖創(chuàng)建: /Locks/Lock_0000000391
main 鎖獲得, lockPath: /Locks/Lock_0000000391
售票開始
售票結束
 鎖釋放:/Locks/Lock_0000000391
main 鎖創(chuàng)建: /Locks/Lock_0000000392
main 鎖獲得, lockPath: /Locks/Lock_0000000392
售票開始
售票結束
 鎖釋放:/Locks/Lock_0000000392
main 鎖創(chuàng)建: /Locks/Lock_0000000393
main 鎖獲得, lockPath: /Locks/Lock_0000000393
售票開始
售票結束
 鎖釋放:/Locks/Lock_0000000393

可見每次執(zhí)行都是按照鎖的順序執(zhí)行,而且由于只有一個進程,并沒有鎖的爭搶發(fā)生。

2、我們再啟動一個同樣的程序,鎖的爭搶此時發(fā)生了,可以看到雙方的日志輸出如下:

程序1:

main 鎖獲得, lockPath: /Locks/Lock_0000000471
售票開始
售票結束
 鎖釋放:/Locks/Lock_0000000471
main 鎖創(chuàng)建: /Locks/Lock_0000000473
 等待前鎖釋放,prelocakPath:Lock_0000000472
/Locks/Lock_0000000472 前鎖釋放
main 鎖獲得, lockPath: /Locks/Lock_0000000473
售票開始
售票結束
 鎖釋放:/Locks/Lock_0000000473

可以看到Lock_0000000471執(zhí)行完成后,該進程獲取的鎖為Lock_0000000473,這說明Lock_0000000472被另外一個進程創(chuàng)建了。此時Lock_0000000473在等待前鎖釋放。Lock_0000000472釋放后,Lock_0000000473才獲得鎖,然后才執(zhí)行業(yè)務邏輯。

我們再看程序2的日志:

main 鎖獲得, lockPath: /Locks/Lock_0000000472
售票開始
售票結束
 鎖釋放:/Locks/Lock_0000000472
main 鎖創(chuàng)建: /Locks/Lock_0000000474
 等待前鎖釋放,prelocakPath:Lock_0000000473
/Locks/Lock_0000000473 前鎖釋放
main 鎖獲得, lockPath: /Locks/Lock_0000000474
售票開始
售票結束
 鎖釋放:/Locks/Lock_0000000474

可以看到,確實是進程2獲取了Lock_0000000472。

zookeeper實現分布式鎖就先講到這。注意代碼只做演示用,并不適合生產環(huán)境使用。

代碼清單如下:

1、LockSample

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
 
import java.io.IOException;
import java.util.Collections;
import java.util.List;
 
public class LockSample {
 
    //ZooKeeper配置信息
    private ZooKeeper zkClient;
    private static final String LOCK_ROOT_PATH = "/Locks";
    private static final String LOCK_NODE_NAME = "Lock_";
    private String lockPath;
 
    // 監(jiān)控lockPath的前一個節(jié)點的watcher
    private Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println(event.getPath() + " 前鎖釋放");
            synchronized (this) {
                notifyAll();
            }
 
        }
    };
 
    public LockSample() throws IOException {
        zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if(event.getState()== Event.KeeperState.Disconnected){
                    System.out.println("失去連接");
 
                }
            }
        });
    }
 
    //獲取鎖的原語實現.
    public  void acquireLock() throws InterruptedException, KeeperException {
        //創(chuàng)建鎖節(jié)點
        createLock();
        //嘗試獲取鎖
        attemptLock();
    }
 
    //創(chuàng)建鎖的原語實現。在lock節(jié)點下創(chuàng)建該線程的鎖節(jié)點
    private void createLock() throws KeeperException, InterruptedException {
        //如果根節(jié)點不存在,則創(chuàng)建根節(jié)點
        Stat stat = zkClient.exists(LOCK_ROOT_PATH, false);
        if (stat == null) {
            zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
 
        // 創(chuàng)建EPHEMERAL_SEQUENTIAL類型節(jié)點
        String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
                Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(Thread.currentThread().getName() + " 鎖創(chuàng)建: " + lockPath);
        this.lockPath=lockPath;
    }
 
    private void attemptLock() throws KeeperException, InterruptedException {
        // 獲取Lock所有子節(jié)點,按照節(jié)點序號排序
        List<String> lockPaths = null;
 
        lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false);
 
        Collections.sort(lockPaths);
 
        int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
 
        // 如果lockPath是序號最小的節(jié)點,則獲取鎖
        if (index == 0) {
            System.out.println(Thread.currentThread().getName() + " 鎖獲得, lockPath: " + lockPath);
            return ;
        } else {
            // lockPath不是序號最小的節(jié)點,監(jiān)控前一個節(jié)點
            String preLockPath = lockPaths.get(index - 1);
 
            Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
 
            // 假如前一個節(jié)點不存在了,比如說執(zhí)行完畢,或者執(zhí)行節(jié)點掉線,重新獲取鎖
            if (stat == null) {
                attemptLock();
            } else { // 阻塞當前進程,直到preLockPath釋放鎖,被watcher觀察到,notifyAll后,重新acquireLock
                System.out.println(" 等待前鎖釋放,prelocakPath:"+preLockPath);
                synchronized (watcher) {
                    watcher.wait();
                }
                attemptLock();
            }
        }
    }
 
    //釋放鎖的原語實現
    public void releaseLock() throws KeeperException, InterruptedException {
        zkClient.delete(lockPath, -1);
        zkClient.close();
        System.out.println(" 鎖釋放:" + lockPath);
    }
 
 
}

2、TicketSeller

import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class TicketSeller {
    private void sell(){
        System.out.println("售票開始");
        // 線程隨機休眠數毫秒,模擬現實中的費時操作
        int sleepMillis = (int) (Math.random() * 2000);
        try {
            //代表復雜邏輯執(zhí)行了一段時間
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("售票結束");
    }
 
    public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException {
        LockSample lock = new LockSample();
        lock.acquireLock();
        sell();
        lock.releaseLock();
    }
 
    public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
        TicketSeller ticketSeller = new TicketSeller();
        for(int i=0;i<1000;i++){
            ticketSeller.sellTicketWithLock();
 
        }
    }
}

以上就是ZooKeeper入門教程三分布式鎖實現及完整運行源碼的詳細內容,更多關于ZooKeeper分布式鎖實現源碼的資料請關注腳本之家其它相關文章!

相關文章

  • SpringBoot項目將mybatis升級為mybatis-plus的方法

    SpringBoot項目將mybatis升級為mybatis-plus的方法

    本文主要介紹了SpringBoot項目將mybatis升級為mybatis-plus的方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-01-01
  • 多線程死鎖的產生以及如何避免死鎖方法(詳解)

    多線程死鎖的產生以及如何避免死鎖方法(詳解)

    下面小編就為大家?guī)硪黄嗑€程死鎖的產生以及如何避免死鎖方法(詳解)。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-04-04
  • SpringBoot項目如何將Bean注入到普通類中

    SpringBoot項目如何將Bean注入到普通類中

    這篇文章主要介紹了SpringBoot項目如何將Bean注入到普通類中,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • IDEA配置Gradle及Gradle安裝的實現步驟

    IDEA配置Gradle及Gradle安裝的實現步驟

    本文主要介紹了IDEA配置Gradle及Gradle安裝的實現步驟,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-08-08
  • Java中?equals?重寫時為什么一定也要重寫?hashCode

    Java中?equals?重寫時為什么一定也要重寫?hashCode

    這篇文章主要介紹了Java中?equals?重寫時為什么一定也要重寫?hashCode,equals?方法和?hashCode?方法是?Object?類中的兩個基礎方法,它們共同協作來判斷兩個對象是否相等,所以之間到底有什么聯系呢,接下來和小編一起進入文章學習該內容吧
    2022-05-05
  • spring?項目實現限流方法示例

    spring?項目實現限流方法示例

    這篇文章主要為大家介紹了spring項目實現限流的方法示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-07-07
  • javaweb頁面附件、圖片下載及打開(實現方法)

    javaweb頁面附件、圖片下載及打開(實現方法)

    下面小編就為大家?guī)硪黄猨avaweb頁面附件、圖片下載及打開(實現方法)。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-06-06
  • java 8 lambda表達式list操作分組、過濾、求和、最值、排序、去重代碼詳解

    java 8 lambda表達式list操作分組、過濾、求和、最值、排序、去重代碼詳解

    java8的lambda表達式提供了一些方便list操作的方法,主要涵蓋分組、過濾、求和、最值、排序、去重,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2024-01-01
  • 利用POI生成EXCEL文件的方法實例

    利用POI生成EXCEL文件的方法實例

    Apache POI 是用Java編寫的免費開源的跨平臺的 Java API,Apache POI提供API給Java程式對Microsoft Office格式檔案讀和寫的功能,下面這篇文章主要給大家介紹了關于利用POI生成EXCEL文件的相關資料,需要的朋友可以參考下
    2018-07-07
  • Spring Boot實現微信小程序登錄

    Spring Boot實現微信小程序登錄

    這篇文章主要為大家詳細介紹了Spring Boot實現微信小程序登錄,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-04-04

最新評論