ZooKeeper入門教程三分布式鎖實現及完整運行源碼
ZooKeeper入門教程二在單機和集群環(huán)境下的安裝搭建及使用
1.0版本
首先我們先介紹一個簡單的zookeeper實現分布式鎖的思路:
用zookeeper中一個臨時節(jié)點代表鎖,比如在/exlusive_lock下創(chuàng)建臨時子節(jié)點/exlusive_lock/lock。
- 所有客戶端爭相創(chuàng)建此節(jié)點,但只有一個客戶端創(chuàng)建成功。
- 創(chuàng)建成功代表獲取鎖成功,此客戶端執(zhí)行業(yè)務邏輯
- 未創(chuàng)建成功的客戶端,監(jiān)聽/exlusive_lock變更
- 獲取鎖的客戶端執(zhí)行完成后,刪除/exlusive_lock/lock,表示鎖被釋放
- 鎖被釋放后,其他監(jiān)聽/exlusive_lock變更的客戶端得到通知,再次爭相創(chuàng)建臨時子節(jié)點/exlusive_lock/lock。此時相當于回到了第2步。
我們的程序按照上述邏輯直至搶占到鎖,執(zhí)行完業(yè)務邏輯。
上述是較為簡單的分布式鎖實現方式。能夠應付一般使用場景,但存在著如下兩個問題:
1、鎖的獲取順序和最初客戶端爭搶順序不一致,這不是一個公平鎖。每次鎖獲取都是當次最先搶到鎖的客戶端。
2、羊群效應,所有沒有搶到鎖的客戶端都會監(jiān)聽/exlusive_lock變更。當并發(fā)客戶端很多的情況下,所有的客戶端都會接到通知去爭搶鎖,此時就出現了羊群效應。
為了解決上面的問題,我們重新設計。
2.0版本
我們在2.0版本中,讓每個客戶端在/exlusive_lock下創(chuàng)建的臨時節(jié)點為有序節(jié)點,這樣每個客戶端都在/exlusive_lock下有自己對應的鎖節(jié)點,而序號排在最前面的節(jié)點,代表對應的客戶端獲取鎖成功。排在后面的客戶端監(jiān)聽自己前面一個節(jié)點,那么在他前序客戶端執(zhí)行完成后,他將得到通知,獲得鎖成功。邏輯修改如下:
- 每個客戶端往/exlusive_lock下創(chuàng)建有序臨時節(jié)點/exlusive_lock/lock_。創(chuàng)建成功后/exlusive_lock下面會有每個客戶端對應的節(jié)點,如/exlusive_lock/lock_000000001
- 客戶端取得/exlusive_lock下子節(jié)點,并進行排序,判斷排在最前面的是否為自己。如果自己的鎖節(jié)點在第一位,代表獲取鎖成功,此客戶端執(zhí)行業(yè)務邏輯
- 如果自己的鎖節(jié)點不在第一位,則監(jiān)聽自己前一位的鎖節(jié)點。例如,自己鎖節(jié)點lock_000000002,那么則監(jiān)聽lock_000000001.
- 當前一位鎖節(jié)點(lock_000000001)對應的客戶端執(zhí)行完成,釋放了鎖,將會觸發(fā)監(jiān)聽客戶端(lock_000000002)的邏輯。
- 監(jiān)聽客戶端重新執(zhí)行第2步邏輯,判斷自己是否獲得了鎖。
如此修改后,每個客戶端只關心自己前序鎖是否釋放,所以每次只會有一個客戶端得到通知。而且,所有客戶端的執(zhí)行順序和最初鎖創(chuàng)建的順序是一致的。解決了1.0版本的兩個問題。
接下來我們看看代碼如何實現。
LockSample類
此類是分布式鎖類,實現了2個分布式鎖的相關方法:
1、獲取鎖
2、釋放鎖
主要程序邏輯圍繞著這兩個方法的實現,特別是獲取鎖的邏輯。我們先看一下該類的成員變量:
private ZooKeeper zkClient; private static final String LOCK_ROOT_PATH = "/Locks"; private static final String LOCK_NODE_NAME = "Lock_"; private String lockPath;
定義了zkClient,用來操作zookeeper。
鎖的根路徑,及自增節(jié)點的前綴。此處生產環(huán)境應該由客戶端傳入。
當前鎖的路徑。
構造方法
public LockSample() throws IOException { zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() { @Override public void process(WatchedEvent event) { if(event.getState()== Event.KeeperState.Disconnected){ System.out.println("失去連接"); } } }); }
創(chuàng)建zkClient,同時創(chuàng)建了狀態(tài)監(jiān)聽。此監(jiān)聽可以去掉,這里只是打印出失去連接狀態(tài)。
獲取鎖實現
暴露出來的獲取鎖的方法為acquireLock(),邏輯很簡單:
public void acquireLock() throws InterruptedException, KeeperException { //創(chuàng)建鎖節(jié)點 createLock(); //嘗試獲取鎖 attemptLock(); }
首先創(chuàng)建鎖節(jié)點,然后嘗試去取鎖。真正的邏輯都在這兩個方法中。
createLock()
先判斷鎖的根節(jié)點/Locks是否存在,不存在的話創(chuàng)建。然后在/Locks下創(chuàng)建有序臨時節(jié)點,并設置當前的鎖路徑變量lockPath。
代碼如下:
private void createLock() throws KeeperException, InterruptedException { //如果根節(jié)點不存在,則創(chuàng)建根節(jié)點 Stat stat = zkClient.exists(LOCK_ROOT_PATH, false); if (stat == null) { zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 創(chuàng)建EPHEMERAL_SEQUENTIAL類型節(jié)點 String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName() + " 鎖創(chuàng)建: " + lockPath); this.lockPath=lockPath; }
attemptLock()
這是最核心的方法,客戶端嘗試去獲取鎖,是對2.0版本邏輯的實現,這里就不再重復邏輯,直接看代碼:
private void attemptLock() throws KeeperException, InterruptedException { // 獲取Lock所有子節(jié)點,按照節(jié)點序號排序 List<String> lockPaths = null; lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false); Collections.sort(lockPaths); int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1)); // 如果lockPath是序號最小的節(jié)點,則獲取鎖 if (index == 0) { System.out.println(Thread.currentThread().getName() + " 鎖獲得, lockPath: " + lockPath); return ; } else { // lockPath不是序號最小的節(jié)點,監(jiān)聽前一個節(jié)點 String preLockPath = lockPaths.get(index - 1); Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher); // 假如前一個節(jié)點不存在了,比如說執(zhí)行完畢,或者執(zhí)行節(jié)點掉線,重新獲取鎖 if (stat == null) { attemptLock(); } else { // 阻塞當前進程,直到preLockPath釋放鎖,被watcher觀察到,notifyAll后,重新acquireLock System.out.println(" 等待前鎖釋放,prelocakPath:"+preLockPath); synchronized (watcher) { watcher.wait(); } attemptLock(); } } }
注意這一行代碼
Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
我們在獲取前一個節(jié)點的時候,同時設置了監(jiān)聽watcher。如果前鎖存在,則阻塞主線程。
watcher定義代碼如下:
private Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getPath() + " 前鎖釋放"); synchronized (this) { notifyAll(); } } };
watcher只是notifyAll,讓主線程繼續(xù)執(zhí)行,以便再次調用attemptLock(),去嘗試獲取lock。如果沒有異常情況的話,此時當前客戶端應該能夠成功獲取鎖。
釋放鎖實現
釋放鎖原語實現很簡單,參照releaseLock()方法。代碼如下:
public void releaseLock() throws KeeperException, InterruptedException { zkClient.delete(lockPath, -1); zkClient.close(); System.out.println(" 鎖釋放:" + lockPath); }
關于分布式鎖的代碼到此就講解完了,我們再看下客戶端如何使用它。
我們創(chuàng)建一個TicketSeller類,作為客戶端來使用分布式鎖。
TicketSeller類
sell()
不帶鎖的業(yè)務邏輯方法,代碼如下:
private void sell(){ System.out.println("售票開始"); // 線程隨機休眠數毫秒,模擬現實中的費時操作 int sleepMillis = (int) (Math.random() * 2000); try { //代表復雜邏輯執(zhí)行了一段時間 Thread.sleep(sleepMillis); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("售票結束"); }
僅是為了演示,sleep了一段時間。
sellTicketWithLock()
此方法中,加鎖后執(zhí)行業(yè)務邏輯,代碼如下:
public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException { LockSample lock = new LockSample(); lock.acquireLock(); sell(); lock.releaseLock(); }
測試入口
接下來我們寫一個main函數做測試:
public static void main(String[] args) throws KeeperException, InterruptedException, IOException { TicketSeller ticketSeller = new TicketSeller(); for(int i=0;i<1000;i++){ ticketSeller.sellTicketWithLock(); } }
main函數中我們循環(huán)調用ticketSeller.sellTicketWithLock(),執(zhí)行加鎖后的賣票邏輯。
測試方法
1、先啟動一個java程序運行,可以看到日志輸出如下:
main 鎖創(chuàng)建: /Locks/Lock_0000000391 main 鎖獲得, lockPath: /Locks/Lock_0000000391 售票開始 售票結束 鎖釋放:/Locks/Lock_0000000391 main 鎖創(chuàng)建: /Locks/Lock_0000000392 main 鎖獲得, lockPath: /Locks/Lock_0000000392 售票開始 售票結束 鎖釋放:/Locks/Lock_0000000392 main 鎖創(chuàng)建: /Locks/Lock_0000000393 main 鎖獲得, lockPath: /Locks/Lock_0000000393 售票開始 售票結束 鎖釋放:/Locks/Lock_0000000393
可見每次執(zhí)行都是按照鎖的順序執(zhí)行,而且由于只有一個進程,并沒有鎖的爭搶發(fā)生。
2、我們再啟動一個同樣的程序,鎖的爭搶此時發(fā)生了,可以看到雙方的日志輸出如下:
程序1:
main 鎖獲得, lockPath: /Locks/Lock_0000000471 售票開始 售票結束 鎖釋放:/Locks/Lock_0000000471 main 鎖創(chuàng)建: /Locks/Lock_0000000473 等待前鎖釋放,prelocakPath:Lock_0000000472 /Locks/Lock_0000000472 前鎖釋放 main 鎖獲得, lockPath: /Locks/Lock_0000000473 售票開始 售票結束 鎖釋放:/Locks/Lock_0000000473
可以看到Lock_0000000471執(zhí)行完成后,該進程獲取的鎖為Lock_0000000473,這說明Lock_0000000472被另外一個進程創(chuàng)建了。此時Lock_0000000473在等待前鎖釋放。Lock_0000000472釋放后,Lock_0000000473才獲得鎖,然后才執(zhí)行業(yè)務邏輯。
我們再看程序2的日志:
main 鎖獲得, lockPath: /Locks/Lock_0000000472 售票開始 售票結束 鎖釋放:/Locks/Lock_0000000472 main 鎖創(chuàng)建: /Locks/Lock_0000000474 等待前鎖釋放,prelocakPath:Lock_0000000473 /Locks/Lock_0000000473 前鎖釋放 main 鎖獲得, lockPath: /Locks/Lock_0000000474 售票開始 售票結束 鎖釋放:/Locks/Lock_0000000474
可以看到,確實是進程2獲取了Lock_0000000472。
zookeeper實現分布式鎖就先講到這。注意代碼只做演示用,并不適合生產環(huán)境使用。
代碼清單如下:
1、LockSample
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; public class LockSample { //ZooKeeper配置信息 private ZooKeeper zkClient; private static final String LOCK_ROOT_PATH = "/Locks"; private static final String LOCK_NODE_NAME = "Lock_"; private String lockPath; // 監(jiān)控lockPath的前一個節(jié)點的watcher private Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getPath() + " 前鎖釋放"); synchronized (this) { notifyAll(); } } }; public LockSample() throws IOException { zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() { @Override public void process(WatchedEvent event) { if(event.getState()== Event.KeeperState.Disconnected){ System.out.println("失去連接"); } } }); } //獲取鎖的原語實現. public void acquireLock() throws InterruptedException, KeeperException { //創(chuàng)建鎖節(jié)點 createLock(); //嘗試獲取鎖 attemptLock(); } //創(chuàng)建鎖的原語實現。在lock節(jié)點下創(chuàng)建該線程的鎖節(jié)點 private void createLock() throws KeeperException, InterruptedException { //如果根節(jié)點不存在,則創(chuàng)建根節(jié)點 Stat stat = zkClient.exists(LOCK_ROOT_PATH, false); if (stat == null) { zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 創(chuàng)建EPHEMERAL_SEQUENTIAL類型節(jié)點 String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName() + " 鎖創(chuàng)建: " + lockPath); this.lockPath=lockPath; } private void attemptLock() throws KeeperException, InterruptedException { // 獲取Lock所有子節(jié)點,按照節(jié)點序號排序 List<String> lockPaths = null; lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false); Collections.sort(lockPaths); int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1)); // 如果lockPath是序號最小的節(jié)點,則獲取鎖 if (index == 0) { System.out.println(Thread.currentThread().getName() + " 鎖獲得, lockPath: " + lockPath); return ; } else { // lockPath不是序號最小的節(jié)點,監(jiān)控前一個節(jié)點 String preLockPath = lockPaths.get(index - 1); Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher); // 假如前一個節(jié)點不存在了,比如說執(zhí)行完畢,或者執(zhí)行節(jié)點掉線,重新獲取鎖 if (stat == null) { attemptLock(); } else { // 阻塞當前進程,直到preLockPath釋放鎖,被watcher觀察到,notifyAll后,重新acquireLock System.out.println(" 等待前鎖釋放,prelocakPath:"+preLockPath); synchronized (watcher) { watcher.wait(); } attemptLock(); } } } //釋放鎖的原語實現 public void releaseLock() throws KeeperException, InterruptedException { zkClient.delete(lockPath, -1); zkClient.close(); System.out.println(" 鎖釋放:" + lockPath); } }
2、TicketSeller
import org.apache.zookeeper.KeeperException; import java.io.IOException; public class TicketSeller { private void sell(){ System.out.println("售票開始"); // 線程隨機休眠數毫秒,模擬現實中的費時操作 int sleepMillis = (int) (Math.random() * 2000); try { //代表復雜邏輯執(zhí)行了一段時間 Thread.sleep(sleepMillis); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("售票結束"); } public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException { LockSample lock = new LockSample(); lock.acquireLock(); sell(); lock.releaseLock(); } public static void main(String[] args) throws KeeperException, InterruptedException, IOException { TicketSeller ticketSeller = new TicketSeller(); for(int i=0;i<1000;i++){ ticketSeller.sellTicketWithLock(); } } }
以上就是ZooKeeper入門教程三分布式鎖實現及完整運行源碼的詳細內容,更多關于ZooKeeper分布式鎖實現源碼的資料請關注腳本之家其它相關文章!
相關文章
SpringBoot項目將mybatis升級為mybatis-plus的方法
本文主要介紹了SpringBoot項目將mybatis升級為mybatis-plus的方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-01-01Java中?equals?重寫時為什么一定也要重寫?hashCode
這篇文章主要介紹了Java中?equals?重寫時為什么一定也要重寫?hashCode,equals?方法和?hashCode?方法是?Object?類中的兩個基礎方法,它們共同協作來判斷兩個對象是否相等,所以之間到底有什么聯系呢,接下來和小編一起進入文章學習該內容吧2022-05-05java 8 lambda表達式list操作分組、過濾、求和、最值、排序、去重代碼詳解
java8的lambda表達式提供了一些方便list操作的方法,主要涵蓋分組、過濾、求和、最值、排序、去重,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-01-01