Java zookeeper服務(wù)的使用詳解
Java語言客戶端使用zookeeper
下載zookeeper連接工具,方便我們查看zookeeper存的數(shù)據(jù)。下載地址:
https://pan.baidu.com/s/1UG5_VcYUZUYUkg04QROLYg?pwd=3ych 提取碼: 3ych
下載后解壓就可以使用了:
使用頁(yè)面:
Java語言連接z00keeper
首先引入maven 依賴jar包
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.9</version> </dependency>
編寫Java代碼
public class Test001 { private static final String ADDRES = "127.0.0.1:2181"; private static final int TIMAOUT = 5000; //計(jì)數(shù)器 private static CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException { //zk核心節(jié)點(diǎn)+事件通知 //節(jié)點(diǎn)路徑和界定啊value /** * 參數(shù)一:連接地址 * 參數(shù)二:zk超時(shí)時(shí)間 * 參數(shù)三:事件通知 */ //1、創(chuàng)建zk鏈接 ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { Event.KeeperState state = watchedEvent.getState(); if(state == Event.KeeperState.SyncConnected){ System.out.println("zk鏈接成功"); countDownLatch.countDown(); //計(jì)數(shù)器減 1 } } }); //計(jì)數(shù)器結(jié)果必須是為0 才能繼續(xù)執(zhí)行 System.out.println("zk正在等待連接"); countDownLatch.await(); System.out.println("開始創(chuàng)建我們的連接"); //2、創(chuàng)建我們的節(jié)點(diǎn) /** * 參數(shù)一:路徑名稱 * 參數(shù)二:節(jié)點(diǎn)value * 參數(shù)三:節(jié)點(diǎn)權(quán)限acl * 蠶食四:節(jié)點(diǎn)類型 臨時(shí)和永久 */ String s = zooKeeper.create("/kaico/one", "hello,boy".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(s); zooKeeper.close(); } }
可以利用連接工具查看操作結(jié)果。
zooKeeper類有很多api操作節(jié)點(diǎn),可以創(chuàng)建、刪除。
zookeeper Javaapi文檔: 點(diǎn)擊查看
四種節(jié)點(diǎn)類型
第一種:臨時(shí)節(jié)點(diǎn):會(huì)話關(guān)閉之后,就自動(dòng)消失 CreateMode.PERSISTENT_SEQUENTIAL
第二種:臨時(shí)有序節(jié)點(diǎn) CreateMode.EPHEMERAL
第三種:持久節(jié)點(diǎn):會(huì)話關(guān)閉之后,持久化到硬盤 CreateMode.PERSISTENT
第四種:持久有序節(jié)點(diǎn) CreateMode.PERSISTENT_SEQUENTIAL
ACL權(quán)限
ACL權(quán)限模型,實(shí)際上就是對(duì)樹每個(gè)節(jié)點(diǎn)實(shí)現(xiàn)控制.
身份的認(rèn)證有4種方式:
world:默認(rèn)方式,相當(dāng)于全世界都能訪問.
auth:代表已經(jīng)認(rèn)證通過的用戶(cli中可以通過addauth digest user:pwd來添加當(dāng)前上下文中的授權(quán)用戶).
digest:即用戶名:密碼這種方式認(rèn)證,這也是業(yè)務(wù)系統(tǒng)中最常用的.
ip:使用lp地址認(rèn)證。
代碼案例:使用賬號(hào)密碼實(shí)現(xiàn)權(quán)限控制
1、添加有權(quán)限控制的節(jié)點(diǎn)數(shù)據(jù)
public class Test002 { private static final String ADDRES = "127.0.0.1:2181"; private static final int TIMAOUT = 5000; //計(jì)數(shù)器 private static CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException { /** * 參數(shù)一:連接地址 * 參數(shù)二:zk超時(shí)時(shí)間 * 參數(shù)三:事件通知 */ //1、創(chuàng)建zk鏈接 ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { Event.KeeperState state = watchedEvent.getState(); if(state == Event.KeeperState.SyncConnected){ System.out.println("zk鏈接成功"); countDownLatch.countDown(); //計(jì)數(shù)器減 1 } } }); //計(jì)數(shù)器結(jié)果必須是為0 才能繼續(xù)執(zhí)行 System.out.println("zk正在等待連接"); countDownLatch.await(); System.out.println("開始創(chuàng)建我們的連接"); //創(chuàng)建賬號(hào) admin 可以實(shí)現(xiàn)讀寫操作 Id admin = new Id("digest", DigestAuthenticationProvider.generateDigest("admin:admin123")); ACL acl1 = new ACL(ZooDefs.Perms.ALL, admin); //創(chuàng)建賬號(hào) guest 只允許做讀操作 Id guest = new Id("digest", DigestAuthenticationProvider.generateDigest("guest:guest123")); ACL acl2 = new ACL(ZooDefs.Perms.READ, guest); ArrayList<ACL> acls = new ArrayList<>(); acls.add(acl1); acls.add(acl2); //2、創(chuàng)建我們的節(jié)點(diǎn) /** * 參數(shù)一:路徑名稱 * 參數(shù)二:節(jié)點(diǎn)value * 參數(shù)三:節(jié)點(diǎn)權(quán)限acl * 蠶食四:節(jié)點(diǎn)類型 臨時(shí)和永久 */ String s = zooKeeper.create("/kaico/acl", "hello,boy".getBytes(), acls, CreateMode.PERSISTENT); System.out.println(s); zooKeeper.close(); } }
2、獲取設(shè)置了權(quán)限的節(jié)點(diǎn)數(shù)據(jù)
public class Test003 { private static final String ADDRES = "127.0.0.1:2181"; private static final int TIMAOUT = 5000; //計(jì)數(shù)器 private static CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException { /** * 參數(shù)一:連接地址 * 參數(shù)二:zk超時(shí)時(shí)間 * 參數(shù)三:事件通知 */ //1、創(chuàng)建zk鏈接 ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { Event.KeeperState state = watchedEvent.getState(); if(state == Event.KeeperState.SyncConnected){ System.out.println("zk鏈接成功"); countDownLatch.countDown(); //計(jì)數(shù)器減 1 } } }); //計(jì)數(shù)器結(jié)果必須是為0 才能繼續(xù)執(zhí)行 System.out.println("zk正在等待連接"); countDownLatch.await(); System.out.println("開始創(chuàng)建我們的連接"); //設(shè)置一下zookeeper 的賬號(hào)才有權(quán)限獲取內(nèi)容 zooKeeper.addAuthInfo("digest", "guest:guest123".getBytes()); //獲取節(jié)點(diǎn)的內(nèi)容 byte[] data = zooKeeper.getData("/kaico/acl", null, new Stat()); System.out.println(new String(data)); zooKeeper.close(); } }
實(shí)現(xiàn)事件監(jiān)聽通知
Zookeeper實(shí)現(xiàn)基本的總結(jié):類似于文件存儲(chǔ)系統(tǒng),可以幫助我們解決分布式領(lǐng)域中遇到問題
Zookeeper分布式協(xié)調(diào)工具
特征:
- 定義的節(jié)點(diǎn)包含key (路徑)和 value ,路徑不允許有重復(fù)保證唯一性.
- Zookeeper分為四種類型持久、持久序號(hào)、臨時(shí)、臨時(shí)序號(hào).
- 持久與臨時(shí)節(jié)點(diǎn)區(qū)別:連接如果一旦關(guān)閉,當(dāng)前的節(jié)點(diǎn)自動(dòng)刪除;
- 事件通知監(jiān)聽節(jié)點(diǎn)發(fā)生的變化刪除、修改、子節(jié)點(diǎn)
Java代碼案例:
public class Test004 { private static final String ADDRES = "127.0.0.1:2181"; private static final int TIMAOUT = 5000; //計(jì)數(shù)器 private static CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException { //1、創(chuàng)建zk 連接 ZkClient zkClient = new ZkClient(ADDRES, TIMAOUT); String parentPath = "/kaico/jing"; //2、監(jiān)聽節(jié)點(diǎn)發(fā)生的變化,監(jiān)聽子節(jié)點(diǎn)是否發(fā)生變化,如果發(fā)生變化都可以獲取到回調(diào)通知。 // zkClient.subscribeChildChanges(parentPath, new IZkChildListener() { // @Override // public void handleChildChange(String s, List<String> list) throws Exception { // System.out.println("s:" + s + ",節(jié)點(diǎn)發(fā)生了變化"); // list.forEach((t)->{ // System.out.println("子節(jié)點(diǎn):" + t); // }); // } // }); //監(jiān)聽節(jié)點(diǎn)的內(nèi)容是否發(fā)生變化或刪除 zkClient.subscribeDataChanges(parentPath, new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { System.out.println("修改的節(jié)點(diǎn)為:" + s + ",修改之后的值:" + o); } @Override public void handleDataDeleted(String s) throws Exception { System.out.println("節(jié)點(diǎn):" + s + "被刪除"); } }); //修改值內(nèi)容 zkClient.writeData(parentPath, "666666666666666"); while (true){ } // zkClient.close(); } }
微服務(wù)使用zookeeper作為注冊(cè)中心
調(diào)用接口邏輯圖
使用zookeeper實(shí)現(xiàn)邏輯:
根據(jù)服務(wù)提供方的名稱創(chuàng)建對(duì)應(yīng)的節(jié)點(diǎn),服務(wù)提供方的接口所有的ip+端口作為子節(jié)點(diǎn)的value的值,這樣服務(wù)調(diào)用方根據(jù)服務(wù)提供方的名稱在zookeeper上找到對(duì)應(yīng)的ip+端口從而可以調(diào)用對(duì)應(yīng)的接口,再監(jiān)聽該節(jié)點(diǎn),如果提供接口的機(jī)器發(fā)生宕機(jī)于zookeeper斷開連接,子節(jié)點(diǎn)也相應(yīng)的減少了,服務(wù)調(diào)用方也會(huì)收到通知。
分布式鎖
分布式鎖的概念:解決再多個(gè)jvm中最終只能有一個(gè)jvm 執(zhí)行。
zookeeper實(shí)現(xiàn)分布式鎖的思路:
節(jié)點(diǎn)保證唯一、事件通知、臨時(shí)節(jié)點(diǎn)(生命周期和Session會(huì)關(guān)聯(lián))﹒
創(chuàng)建分布式鎖原理:
1.多個(gè)jvm同時(shí)在Zookeeper 上創(chuàng)建相同的臨時(shí)節(jié)點(diǎn)(lockPath).
2. 因?yàn)榕R時(shí)節(jié)點(diǎn)路徑保證唯一的性,只要誰能夠創(chuàng)建成功誰就能夠獲取鎖,就可以開始執(zhí)
行業(yè)務(wù)邏輯;,
3.如果節(jié)點(diǎn)已經(jīng)給其他請(qǐng)求創(chuàng)建的話或者是創(chuàng)建節(jié)點(diǎn)失敗,當(dāng)前的請(qǐng)求實(shí)現(xiàn)等待;
釋放鎖的原理
因?yàn)槲覀儾捎门R時(shí)節(jié)點(diǎn),當(dāng)前節(jié)點(diǎn)創(chuàng)建成功,表示獲取鎖成功;正常執(zhí)行完業(yè)務(wù)邏輯調(diào)用Session關(guān)閉連接方法,當(dāng)前的節(jié)點(diǎn)會(huì)刪除;----釋放鎖
其他正在等待請(qǐng)求,采用事件監(jiān)聽如果當(dāng)前節(jié)點(diǎn)被刪除的話,又重新進(jìn)入到獲取鎖流程;
臨時(shí)節(jié)點(diǎn)+事件通知。
代碼實(shí)現(xiàn)分布式鎖
實(shí)現(xiàn)分布式鎖的方式有多種:數(shù)據(jù)庫(kù)、redis、zookeeper,這里使用zookeeper實(shí)現(xiàn)。
實(shí)現(xiàn)原理:
因?yàn)閆ookeeper節(jié)點(diǎn)路徑保持唯一,不允許重復(fù) 且有臨時(shí)節(jié)點(diǎn)特性連接關(guān)閉后當(dāng)前節(jié)點(diǎn)會(huì)自動(dòng)消失,從而實(shí)現(xiàn)分布式鎖。
- 多請(qǐng)求同時(shí)創(chuàng)建相同的節(jié)點(diǎn)(lockPath),只要誰能夠創(chuàng)建成功 誰就能夠獲取到鎖;
- 如果創(chuàng)建節(jié)點(diǎn)的時(shí)候,突然該節(jié)點(diǎn)已經(jīng)被其他請(qǐng)求創(chuàng)建的話則直接等待;
- 只要能夠創(chuàng)建節(jié)點(diǎn)成功,則開始進(jìn)入到正常業(yè)務(wù)邏輯操作,其他沒有獲取鎖進(jìn)行等待;
- 正常業(yè)務(wù)邏輯流程執(zhí)行完后,調(diào)用zk關(guān)閉連接方式釋放鎖,從而是其他的請(qǐng)求開始進(jìn)入到獲取鎖的資源。
使用zookeeper 實(shí)現(xiàn)分們式鎖的代碼案例
利用模板設(shè)計(jì)模式實(shí)現(xiàn)分布式鎖
1、定義鎖接口 Lock
public interface Lock { /** * 獲取鎖 */ public void getLock(); /** * 釋放鎖 */ public void unLock(); }
2、定義抽象類實(shí)現(xiàn)鎖接口 Lock ,設(shè)計(jì)其他的方法完成對(duì)鎖的操作
abstract class AbstractTemplzateLock implements Lock { @Override public void getLock() { // 模版方法 定義共同抽象的骨架 if (tryLock()) { System.out.println(">>>" + Thread.currentThread().getName() + ",獲取鎖成功"); } else { // 開始實(shí)現(xiàn)等待 waitLock();// 事件監(jiān)聽 // 重新獲取 getLock(); } } /** * 獲取鎖 * @return */ protected abstract boolean tryLock(); /** * 等待鎖 * @return */ protected abstract void waitLock(); /** * 釋放鎖 * @return */ protected abstract void unImplLock(); @Override public void unLock() { unImplLock(); } }
3、利用zookeeper實(shí)現(xiàn)鎖,繼承抽象類 AbstractTemplzateLock
public class ZkTemplzateImplLock extends AbstractTemplzateLock { //參數(shù)1 連接地址 private static final String ADDRES = "192.168.212.147:2181"; // 參數(shù)2 zk超時(shí)時(shí)間 private static final int TIMEOUT = 5000; // 創(chuàng)建我們的zk連接 private ZkClient zkClient = new ZkClient(ADDRES, TIMEOUT); /** * 共同的創(chuàng)建臨時(shí)節(jié)點(diǎn) */ private String lockPath = "/lockPath"; private CountDownLatch countDownLatch = null; @Override protected boolean tryLock() { // 獲取鎖的思想:多個(gè)jvm同時(shí)創(chuàng)建臨時(shí)節(jié)點(diǎn),只要誰能夠創(chuàng)建成功 誰能夠獲取到鎖 try { zkClient.createEphemeral(lockPath); return true; } catch (Exception e) { // // 如果創(chuàng)建已經(jīng)存在的話 // e.printStackTrace(); return false; } } @Override protected void waitLock() { // 1.使用事件監(jiān)聽 監(jiān)聽lockPath節(jié)點(diǎn)是否已經(jīng)被刪除,如果被刪除的情況下 有可以重新的進(jìn)入到獲取鎖的權(quán)限 IZkDataListener iZkDataListener = new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { } @Override public void handleDataDeleted(String s) throws Exception { if (countDownLatch != null) { countDownLatch.countDown();// 計(jì)數(shù)器變?yōu)? } } }; zkClient.subscribeDataChanges(lockPath, iZkDataListener); // 2.使用countDownLatch等待 if (countDownLatch == null) { countDownLatch = new CountDownLatch(1); } try { countDownLatch.await();// 如果當(dāng)前計(jì)數(shù)器不是為0 就一直等待 } catch (Exception e) { } // 3. 如果當(dāng)前節(jié)點(diǎn)被刪除的情況下,有需要重新進(jìn)入到獲取鎖 zkClient.unsubscribeDataChanges(lockPath, iZkDataListener); } @Override protected void unImplLock() { if (zkClient != null) { zkClient.close(); System.out.println(Thread.currentThread().getName() + ",釋放了鎖>>>"); } } }
4、編寫使用鎖的方法
//main方法使用多線程測(cè)試分布式鎖 public static void main(String[] args) { // OrderService orderService = new OrderService(); for (int i = 0; i < 100; i++) { new Thread(new OrderService()).start(); } // 單個(gè)jvm中多線程同時(shí)生成訂單號(hào)碼如果發(fā)生重復(fù) 如何解決 synchronized或者是lock鎖 // 如果在多個(gè)jvm中同時(shí)生成訂單號(hào)碼如果發(fā)生重復(fù)如何解決 // 注意synchronized或者是lock鎖 只能夠在本地的jvm中有效 // 分布式鎖的概念 } //多線程run 方法 public class OrderService implements Runnable { private OrderNumGenerator orderNumGenerator = new OrderNumGenerator(); private Lock lock = new ZkTemplzateImplLock(); @Override public void run() { getNumber(); } private void getNumber() { try { lock.getLock(); Thread.sleep(50); String number = orderNumGenerator.getNumber(); System.out.println(Thread.currentThread().getName() + ",獲取的number:" + number); // 如果zk超時(shí)了,有做數(shù)據(jù)庫(kù)寫的操作統(tǒng)一直接回滾 } catch (Exception e) { } finally { lock.unLock(); } } // ZkTemplzateImplLock父親 模版類 AbstractTemplzateLock 父親 Lock } //自動(dòng)生成訂單號(hào)的類 public class OrderNumGenerator { /** * 序號(hào) */ private static int count; /** * 生成我們的時(shí)間戳 為訂單號(hào)碼 * @return */ public String getNumber() { SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss"); try { Thread.sleep(30); } catch (Exception e) { } return simpt.format(new Date()) + "-" + ++count; } }
如何防止死鎖?
創(chuàng)建zkClient時(shí)設(shè)置session 連接時(shí)間 sessionTimeout。 也就是設(shè)置Session連接超時(shí)時(shí)間,在規(guī)定的時(shí)間內(nèi)獲取鎖后超時(shí)啦~自動(dòng)回滾當(dāng)前數(shù)據(jù)庫(kù)業(yè)務(wù)邏輯。
注意:等待鎖時(shí),zkClient注冊(cè)的事件最后需要?jiǎng)h除。
到此這篇關(guān)于Java zookeeper服務(wù)的使用詳解的文章就介紹到這了,更多相關(guān)Java zookeeper服務(wù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MySQL?MyBatis?默認(rèn)插入當(dāng)前時(shí)間方式
這篇文章主要介紹了MySQL?MyBatis?默認(rèn)插入當(dāng)前時(shí)間方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-10-10SpringBoot + SpringSecurity 短信驗(yàn)證碼登錄功能實(shí)現(xiàn)
這篇文章主要介紹了SpringBoot + SpringSecurity 短信驗(yàn)證碼登錄功能實(shí)現(xiàn),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-06-06Spring Boot高效數(shù)據(jù)聚合之道深入講解
這篇文章主要給大家介紹了關(guān)于Spring Boot高效數(shù)據(jù)聚合之道的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Spring Boot具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06logback和log4j日志框架堆棧信息添加TraceId方式
這篇文章主要介紹了logback和log4j日志框架堆棧信息添加TraceId方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09SpringBoot中實(shí)現(xiàn)Redis緩存預(yù)熱
緩存預(yù)熱是一種在系統(tǒng)啟動(dòng)后,但在實(shí)際使用前將數(shù)據(jù)加載到緩存中的技術(shù),本文主要來和大家一起探討如何在Spring Boot應(yīng)用程序中實(shí)現(xiàn)Redis緩存預(yù)熱,以確保系統(tǒng)在處理請(qǐng)求前就已經(jīng)處于最佳狀態(tài),感興趣的可以了解下2023-11-11Java之多個(gè)線程順序循環(huán)執(zhí)行的幾種實(shí)現(xiàn)
這篇文章主要介紹了Java之多個(gè)線程順序循環(huán)執(zhí)行的幾種實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09