zookeeper實戰(zhàn)之實現(xiàn)分布式鎖的方法
一、分布式鎖的通用實現(xiàn)思路
分布式鎖的概念以及常規(guī)解決方案可以參考之前的博客:聊聊分布式鎖的解決方案;今天我們先分析下分布式鎖的實現(xiàn)思路;
- 首先,需要保證唯一性,即某一時點只能有一個線程訪問某一資源;比方說待辦短信通知功能,每天早上九點短信提醒所有工單的處理人處理工單,假設(shè)服務(wù)部署了20個容器,那么早上九點的時候會有20個線程啟動準(zhǔn)備發(fā)送短信,此時我們只能讓一個線程執(zhí)行短信發(fā)送,否則用戶會收到20條相同的短信;
- 其次,需要考慮下何時應(yīng)該釋放鎖?這又分三種情況,一是拿到鎖的線程正常結(jié)束,另一種是獲取鎖的線程異常退出,還有種是獲取鎖的線程一直阻塞;第一種情況直接釋放即可,第二種情況可以通過定義下鎖的過期時間然后通過定時任務(wù)去釋放鎖;zk的話直接通過臨時節(jié)點即可;最后一種阻塞的情況也可以通過定時任務(wù)來釋放,但是需要根據(jù)業(yè)務(wù)來綜合判斷,如果業(yè)務(wù)本身就是長時間耗時的操作那么鎖的過期時間就得設(shè)置的久一點
- 最后,當(dāng)拿到鎖的線程釋放鎖的時候,如何通知其他線程可以搶鎖了呢
這里簡單介紹兩種解決方案,一種是所有需要鎖的線程主動輪詢,固定時間去訪問下看鎖是否釋放,但是這種方案無端增加服務(wù)器壓力并且時效性無法保證;另一種就是zk的watch,監(jiān)聽鎖所在的目錄,一有變化立馬得到通知
二、ZK實現(xiàn)分布式鎖的思路
zk通過每個線程在同一父目錄下創(chuàng)建臨時有序節(jié)點,然后通過比較節(jié)點的id大小來實現(xiàn)分布式鎖功能;再通過zk的watch機制實時獲取節(jié)點的狀態(tài),如果被刪除立即重新爭搶鎖;具體流程見線圖:
提示:需要關(guān)注下圖里判斷自身不是最小節(jié)點時的監(jiān)聽情況,為什么不監(jiān)聽父節(jié)點?原因圖里已有描述,這里就不再贅述
三、ZK實現(xiàn)分布式鎖的編碼實現(xiàn)
1、核心工具類實現(xiàn)
通過不斷的調(diào)試,我封裝了一個ZkLockHelper
類,里面封裝了上鎖和釋放鎖的方法,為了方便我將zk的一些監(jiān)聽和回調(diào)機智也融合到一起了,并沒有抽出來,下面貼上該類的全部代碼
package com.darling.service.zookeeper.lock; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.platform.commons.util.StringUtils; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.CountDownLatch; /** * @description: * @author: dll * @date: Created in 2022/11/4 8:41 * @version: * @modified By: */ @Data @Slf4j public class ZkLockHelper implements AsyncCallback.StringCallback, AsyncCallback.StatCallback,Watcher, AsyncCallback.ChildrenCallback { private final String lockPath = "/lockItem"; ZooKeeper zkClient; String threadName; CountDownLatch cd = new CountDownLatch(1); private String pathName; /** * 上鎖 */ public void tryLock() { try { log.info("線程:{}正在創(chuàng)建節(jié)點",threadName); zkClient.create(lockPath,(threadName).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"AAA"); log.info("線程:{}正在阻塞......",threadName); // 由于上面是異步創(chuàng)建所以這里需要阻塞住當(dāng)前線程 cd.await(); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 釋放鎖 */ public void unLock() { try { zkClient.delete(pathName,-1); System.out.println(threadName + " 工作結(jié)束...."); } catch (Exception e) { e.printStackTrace(); } } /** * create方法的回調(diào),創(chuàng)建成功后在此處獲取/DCSLock的子目錄,比較節(jié)點ID是否最小,是則拿到鎖。。。 * @param rc 狀態(tài)碼 * @param path create方法的path入?yún)? * @param ctx create方法的上下文入?yún)? * @param name 創(chuàng)建成功的臨時有序節(jié)點的名稱,即在path的后面加上了zk維護的自增ID; * 注意如果創(chuàng)建的不是有序節(jié)點,那么此處的name和path的內(nèi)容一致 */ @Override public void processResult(int rc, String path, Object ctx, String name) { log.info(">>>>>>>>>>>>>>>>>processResult,rx:{},path:{},ctx:{},name:{}",rc,path,ctx.toString(),name); if (StringUtils.isNotBlank(name)) { try { pathName = name ; // 此處path需注意要寫/ zkClient.getChildren("/", false,this,"123"); // List<String> children = zkClient.getChildren("/", false); // log.info(">>>>>threadName:{},children:{}",threadName,children); // // 給children排序 // Collections.sort(children); // int i = children.indexOf(pathName.substring(1)); // // 判斷自身是否第一個 // if (Objects.equals(i,0)) { // // 是第一個則表示搶到了鎖 // log.info("線程{}搶到了鎖",threadName); // cd.countDown(); // }else { // // 表示沒搶到鎖 // log.info("線程{}搶鎖失敗,重新注冊監(jiān)聽器",threadName); // zkClient.exists("/"+children.get(i-1),this,this,"AAA"); // } } catch (Exception e) { e.printStackTrace(); } } } /** * exists方法的回調(diào),此處暫不做處理 * @param rc * @param path * @param ctx * @param stat */ @Override public void processResult(int rc, String path, Object ctx, Stat stat) { } /** * exists的watch監(jiān)聽 * @param event */ @Override public void process(WatchedEvent event) { //如果第一個線程鎖釋放了,等價于第一個線程刪除了節(jié)點,此時只有第二個線程會監(jiān)控的到 switch (event.getType()) { case None: break; case NodeCreated: break; case NodeDeleted: zkClient.getChildren("/", false,this,"123"); // // 此處path需注意要寫"/" // List<String> children = null; // try { // children = zkClient.getChildren("/", false); // } catch (KeeperException e) { // e.printStackTrace(); // } catch (InterruptedException e) { // e.printStackTrace(); // } // log.info(">>>>>threadName:{},children:{}",threadName,children); // // 給children排序 // Collections.sort(children); // int i = children.indexOf(pathName.substring(1)); // // 判斷自身是否第一個 // if (Objects.equals(i,0)) { // // 是第一個則表示搶到了鎖 // log.info("線程{}搶到了鎖",threadName); // cd.countDown(); // }else { // /** // * 表示沒搶到鎖;需要判斷前置節(jié)點存不存在,其實這里并不是特別關(guān)心前置節(jié)點存不存在,所以其回調(diào)可以不處理; // * 但是這里關(guān)注的前置節(jié)點的監(jiān)聽,當(dāng)前置節(jié)點監(jiān)聽到被刪除時就是其他線程搶鎖之時 // */ // zkClient.exists("/"+children.get(i-1),this,this,"AAA"); // } break; case NodeDataChanged: break; case NodeChildrenChanged: break; } } /** * getChildren方法的回調(diào) * @param rc * @param path * @param ctx * @param children */ @Override public void processResult(int rc, String path, Object ctx, List<String> children) { try { log.info(">>>>>threadName:{},children:{}", threadName, children); if (Objects.isNull(children)) { return; } // 給children排序 Collections.sort(children); int i = children.indexOf(pathName.substring(1)); // 判斷自身是否第一個 if (Objects.equals(i, 0)) { // 是第一個則表示搶到了鎖 log.info("線程{}搶到了鎖", threadName); cd.countDown(); } else { // 表示沒搶到鎖 log.info("線程{}搶鎖失敗,重新注冊監(jiān)聽器", threadName); /** * 表示沒搶到鎖;需要判斷前置節(jié)點存不存在,其實這里并不是特別關(guān)心前置節(jié)點存不存在,所以其回調(diào)可以不處理; * 但是這里關(guān)注的前置節(jié)點的監(jiān)聽,當(dāng)前置節(jié)點監(jiān)聽到被刪除時就是其他線程搶鎖之時 */ zkClient.exists("/" + children.get(i - 1), this, this, "AAA"); } } catch (Exception e) { e.printStackTrace(); } } }
提示:代碼中注釋的代碼塊可以關(guān)注下,原本是直接阻塞式編程,將獲取所有子節(jié)點并釋放鎖的操作直接寫在getChildren方法的回調(diào)里,后來發(fā)現(xiàn)當(dāng)節(jié)點被刪除時我們還要重新?lián)屾i,那么代碼就冗余了,于是結(jié)合響應(yīng)式編程的思想,將這段核心代碼放到
getChildren方法的回調(diào)
里,這樣代碼簡潔了并且可以讓業(yè)務(wù)更只關(guān)注于getChildren
這件事了
2、測試代碼編寫
線程安全問題復(fù)現(xiàn)
package com.darling.service.zookeeper.lock; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.junit.Test; /** * @description: 開啟是個線程給i做遞減操作,未加鎖的情況下會有線程安全問題 * @author: dll * @date: Created in 2022/11/8 8:32 * @version: * @modified By: */ @Slf4j public class ZkLockTest02 { private int i = 10; @Test public void test() throws InterruptedException { for (int n = 0; n < 10; n++) { new Thread(new Runnable() { @SneakyThrows @Override public void run() { Thread.sleep(100); incre(); } }).start(); } Thread.sleep(5000); log.info("i = {}",i); } /** * i遞減 線程不安全 */ public void incre(){ // i.incrementAndGet(); log.info("當(dāng)前線程:{},i = {}",Thread.currentThread().getName(),i--); } }
上面代碼運行結(jié)果如下:
使用上面封裝的ZkLockHelper
實現(xiàn)的分布式鎖
package com.darling.service.zookeeper.lock; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; /** * @description: 使用zk實現(xiàn)的分布式鎖解決線程安全問題 * @author: dll * @date: Created in 2022/11/8 8:32 * @version: * @modified By: */ @Slf4j public class ZkLockTest03 { ZooKeeper zkClient; @Before public void conn (){ zkClient = ZkUtil.getZkClient(); } @After public void close (){ try { zkClient.close(); } catch (InterruptedException e) { e.printStackTrace(); } } private int i = 10; @Test public void test() throws InterruptedException { for (int n = 0; n < 10; n++) { new Thread(new Runnable() { @SneakyThrows @Override public void run() { Thread.sleep(100); ZkLockHelper zkHelper = new ZkLockHelper(); // 這里給zkHelper設(shè)置threadName是為了后續(xù)調(diào)試的時候日志打印,便于觀察存在的問題 String threadName = Thread.currentThread().getName(); zkHelper.setThreadName(threadName); zkHelper.setZkClient(zkClient); // tryLock上鎖 zkHelper.tryLock(); incre(); log.info("線程{}正在執(zhí)行業(yè)務(wù)代碼...",threadName); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 釋放鎖 zkHelper.unLock(); } }).start(); } while (true) { } } /** * i遞減 線程不安全 */ public void incre(){ // i.incrementAndGet(); log.info("☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆當(dāng)前線程:{},i = {}",Thread.currentThread().getName(),i--); } }
運行結(jié)果如下:
由于日志中摻雜著zk的日志所有此處并未截全,但是也能看到i是在按規(guī)律遞減的,不會出現(xiàn)通過線程拿到相同值的情況
四、zk實現(xiàn)分布式鎖的優(yōu)缺點
優(yōu)點
- 集群部署不存在單點故障問題
- 統(tǒng)一視圖
zk集群每個節(jié)點對外提供的數(shù)據(jù)是一致的,數(shù)據(jù)一致性有所報障 - 臨時有序節(jié)點
zk提供臨時有序節(jié)點,這樣當(dāng)客戶端失去連接時會自動釋放鎖,不用像其他方案一樣當(dāng)拿到鎖的實例服務(wù)不可用時,需要定時任務(wù)去刪除鎖;臨時節(jié)點的特性就是當(dāng)客戶端失去連接會自動刪除 - watch能力加持
當(dāng)獲取不到鎖時,無需客戶端定期輪詢爭搶,只需watch前一節(jié)點即可,當(dāng)有變化時會及時通知,比普通方案即及時又高效;注意這里最好只watch前一節(jié)點,如果watch整個父目錄的話,當(dāng)客戶端并發(fā)較大時會不斷有請求進出zk,給zk性能帶來壓力
缺點
- 與單機版redis比較的話性能肯定較差,但是當(dāng)客戶端集群足夠龐大且業(yè)務(wù)量足夠多時肯定還是集群更加穩(wěn)定
好了,zk實現(xiàn)分布式鎖的編碼實現(xiàn)就到這了,后續(xù)有時間再寫偏redis的,其實思路縷清了,編碼實現(xiàn)還是很簡單的
到此這篇關(guān)于zookeeper實戰(zhàn)之實現(xiàn)分布式鎖的方法的文章就介紹到這了,更多相關(guān)zookeeper分布式鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot 啟動如何修改application.properties的參數(shù)
這篇文章主要介紹了springboot 啟動如何修改application.properties的參數(shù)方法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08java Arrays快速打印數(shù)組的數(shù)據(jù)元素列表案例
這篇文章主要介紹了java Arrays快速打印數(shù)組的數(shù)據(jù)元素列表案例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09SpringBoot中過濾器Filter+JWT令牌實現(xiàn)登錄驗證
本文主要介紹了SpringBoot中過濾器Filter+JWT令牌實現(xiàn)登錄驗證,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-04-04Java實現(xiàn)Dbhelper支持大數(shù)據(jù)增刪改
這篇文章主要介紹了Java實現(xiàn)Dbhelper支持大數(shù)據(jù)增刪改功能的實現(xiàn)過程,感興趣的小伙伴們可以參考一下2016-01-01解決SpringCloud下spring-boot-maven-plugin插件的打包問題
這篇文章主要介紹了SpringCloud下spring-boot-maven-plugin插件的打包問題,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-03-03