在Java中操作Zookeeper的示例代碼詳解
依賴
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.0</version> </dependency>
連接到zkServer
//連接字符串,zkServer的ip、port,如果是集群逗號分隔 String connectStr = "192.168.1.9:2181"; //zookeeper就是一個zkCli ZooKeeper zooKeeper = null; try { //初始次數(shù)為1。后面要在內(nèi)部類中使用,三種寫法:1、寫成外部類成員變量,不用加final;2、作為函數(shù)局部變量,放在try外面,寫成final;3、寫在try中,不加final CountDownLatch countDownLatch = new CountDownLatch(1); //超時時間ms,監(jiān)聽器 zooKeeper = new ZooKeeper(connectStr, 5000, new Watcher() { public void process(WatchedEvent watchedEvent) { //如果狀態(tài)變成已連接 if (watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) { System.out.println("連接成功"); //次數(shù)-1 countDownLatch.countDown(); } } }); //等待,次數(shù)為0時才會繼續(xù)往下執(zhí)行。等待監(jiān)聽器監(jiān)聽到連接成功,才能操作zk countDownLatch.await(); } catch (IOException | InterruptedException e) { e.printStackTrace(); } //...操作zk。后面的demo都是寫在此處的 //關(guān)閉連接 try { zooKeeper.close(); } catch (InterruptedException e) { e.printStackTrace(); }
檢測節(jié)點(diǎn)是否存在
// 檢測節(jié)點(diǎn)是否存在 // 同步方式 Stat exists = null; try { //如果存在,返回節(jié)點(diǎn)狀態(tài)Stat;如果不存在,返回null。第二個參數(shù)是watch exists = zooKeeper.exists("/mall",false); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } if (exists==null){ System.out.println("節(jié)點(diǎn)不存在"); } else { System.out.println("節(jié)點(diǎn)存在"); } //異步回調(diào) zooKeeper.exists("/mall",false, new AsyncCallback.StatCallback() { //第二個是path znode路徑,第三個是ctx 后面?zhèn)魅雽?shí)參,第四個是znode的狀態(tài) public void processResult(int i, String s, Object o, Stat stat) { //如果節(jié)點(diǎn)不存在,返回的stat是null if (stat==null){ System.out.println("節(jié)點(diǎn)不存在"); } else{ System.out.println("節(jié)點(diǎn)存在"); } } // 傳入ctx,Object類型 },null);
操作后,服務(wù)端會返回處理結(jié)果,返回void、null也算處理結(jié)果。
同步指的是當(dāng)前線程阻塞,等待服務(wù)端返回數(shù)據(jù),收到返回的數(shù)據(jù)才繼續(xù)往下執(zhí)行;
異步回調(diào)指的是,把對結(jié)果(返回的數(shù)據(jù))的處理寫在回調(diào)函數(shù)中,當(dāng)前線程不等待返回的數(shù)據(jù),繼續(xù)往下執(zhí)行,收到返回的數(shù)據(jù)時自動調(diào)用回調(diào)函數(shù)來處理。
如果處理返回數(shù)據(jù)的代碼之后的操作,不依賴返回數(shù)據(jù)、對返回數(shù)據(jù)的處理,那么可以把返回數(shù)據(jù)的處理寫成回調(diào)函數(shù)。
創(chuàng)建節(jié)點(diǎn)
//創(chuàng)建節(jié)點(diǎn) //同步方式 try { //數(shù)據(jù)要寫成byte[],不攜帶數(shù)據(jù)寫成null;默認(rèn)acl權(quán)限使用ZooDefs.Ids.OPEN_ACL_UNSAFE;最后一個是節(jié)點(diǎn)類型,P是永久,E是臨時,S是有序 zooKeeper.create("/mall", "abcd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("已創(chuàng)建節(jié)點(diǎn)/mall"); //如果節(jié)點(diǎn)已存在,會拋出異常 } catch (KeeperException | InterruptedException e) { System.out.println("創(chuàng)建節(jié)點(diǎn)/mall失敗,請檢查節(jié)點(diǎn)是否已存在"); e.printStackTrace(); } //異步回調(diào) zooKeeper.create("/mall", "abcd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.Create2Callback(){ //第二個path,第三個ctx,第四個節(jié)點(diǎn)狀態(tài) public void processResult(int i, String s, Object o, String s1, Stat stat) { //回調(diào)方式不拋出異常,返回的stat是創(chuàng)建節(jié)點(diǎn)的狀態(tài),如果節(jié)點(diǎn)已存在,返回的stat是null if (stat==null){ System.out.println("創(chuàng)建節(jié)點(diǎn)/mall失敗,請檢查節(jié)點(diǎn)是否已存在"); } else { System.out.println("節(jié)點(diǎn)創(chuàng)建成功"); } } //ctx實(shí)參 },null);
刪除節(jié)點(diǎn)
//刪除節(jié)點(diǎn) //同步方式 try { //第二個參數(shù)是版本號,-1表示可以是任何版本 zooKeeper.delete("/mall1",-1); System.out.println("成功刪除節(jié)點(diǎn)/mall"); } catch (InterruptedException | KeeperException e) { System.out.println("刪除節(jié)點(diǎn)/mall失敗"); e.printStackTrace(); } //異步回調(diào) zooKeeper.delete("/mall2", -1, new AsyncCallback.VoidCallback() { //第二個是path,第三個是ctx public void processResult(int i, String s, Object o) { } // //ctx實(shí)參 },null);
delete()只能刪除沒有子節(jié)點(diǎn)的znode,如果該znode有子節(jié)點(diǎn)會拋出異常。
沒有提供遞歸刪除子節(jié)點(diǎn)的方法,如果要刪除帶有子節(jié)點(diǎn)的znode,需要自己實(shí)現(xiàn)遞歸刪除??梢韵萭etChildren()獲取子節(jié)點(diǎn)列表,遍歷列表依次刪除子節(jié)點(diǎn),再刪除父節(jié)點(diǎn)。
獲取子節(jié)點(diǎn)列表
//獲取子節(jié)點(diǎn)列表,List<String>,比如/mall/user,/mall/order,返回的是["user"、"order"] //同步方式 List<String> children = null; try { //第二個參數(shù)是watch children = zooKeeper.getChildren("/mall", false); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } System.out.println("子節(jié)點(diǎn)列表:" + children); //異步 zooKeeper.getChildren("/mall", false, new AsyncCallback.ChildrenCallback() { //第二個起依次是:path、ctx、返回的子節(jié)點(diǎn)列表 public void processResult(int i, String s, Object o, List<String> list) { System.out.println("子節(jié)點(diǎn)列表:" + list); } //ctx實(shí)參 }, null);
只獲取子節(jié)點(diǎn),不獲取孫節(jié)點(diǎn)。
watch都是:可以寫boolean,要添加監(jiān)聽就寫true,不監(jiān)聽寫false;也可以寫Watcher對象,new一個Watcher對象表示要監(jiān)聽,null表示不監(jiān)聽。
獲取節(jié)點(diǎn)數(shù)據(jù)
//獲取節(jié)點(diǎn)數(shù)據(jù),返回byte[] //同步方式 byte[] data = null; try { //第二個參數(shù)是watch,第三個是stat data = zooKeeper.getData("/mall", false, null); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } //調(diào)用new String()時要判斷data是否為null,如果是null會拋NPE if (data==null){ System.out.println("該節(jié)點(diǎn)沒有數(shù)據(jù)"); } else{ System.out.println("節(jié)點(diǎn)數(shù)據(jù):"+new String(data)); } //異步回調(diào) zooKeeper.getData("/mall", false, new AsyncCallback.DataCallback() { //第二個起依次是:path、ctx、返回的節(jié)點(diǎn)數(shù)據(jù)、節(jié)點(diǎn)狀態(tài) public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) { //不必判斷bytes是否是null,如果節(jié)點(diǎn)沒有數(shù)據(jù),不會調(diào)用回調(diào)函數(shù);執(zhí)行到此,說明bytes不是null System.out.println("節(jié)點(diǎn)數(shù)據(jù):" + new String(bytes) ); } //ctx實(shí)參 }, null);
設(shè)置|修改節(jié)點(diǎn)數(shù)據(jù)
//設(shè)置|更新節(jié)點(diǎn)據(jù) //同步方式 try { //最后一個參數(shù)是版本號 zooKeeper.setData("/mall", "1234".getBytes(), -1); System.out.println("設(shè)置節(jié)點(diǎn)數(shù)據(jù)成功"); } catch (KeeperException | InterruptedException e) { System.out.println("設(shè)置節(jié)點(diǎn)數(shù)據(jù)失敗"); e.printStackTrace(); } //異步回調(diào) zooKeeper.setData("/mall", "1234".getBytes(), -1, new AsyncCallback.StatCallback() { //第二個是path,第三個是ctx public void processResult(int i, String s, Object o, Stat stat) { } // ctx },null);
設(shè)置acl權(quán)限
//設(shè)置acl權(quán)限 //第一個參數(shù)指定權(quán)限,第二個是Id對象 ACL acl = new ACL(ZooDefs.Perms.ALL, new Id("auth", "chy:abcd")); List<ACL> aclList = new ArrayList<>(); aclList.add(acl); //如果List中只有一個ACL對象,也可以這樣寫 //List<ACL> aclList = Collections.singletonList(auth); //驗(yàn)證權(quán)限,需寫在設(shè)置權(quán)限之前。如果之前沒有設(shè)置權(quán)限,也需要先驗(yàn)證本次即將設(shè)置的用戶 zooKeeper.addAuthInfo("digest","chy:abcd".getBytes()); //方式一 setAcl try { //第二個參數(shù)是List<ACL>,第三個參數(shù)是版本號 zooKeeper.setACL("/mall", aclList, -1); System.out.println("設(shè)置權(quán)限成功"); } catch (KeeperException | InterruptedException e) { System.out.println("設(shè)置權(quán)限失敗"); e.printStackTrace(); } //方式二 在創(chuàng)建節(jié)點(diǎn)時設(shè)置權(quán)限 try { zooKeeper.create("/mall","abcd".getBytes(),aclList,CreateMode.PERSISTENT); System.out.println("已創(chuàng)建節(jié)點(diǎn)并設(shè)置權(quán)限"); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); }
設(shè)置權(quán)限之后,連接zkServer進(jìn)行操作時,都需要先驗(yàn)證用戶。
此處未寫對應(yīng)的異步回調(diào)。
查看acl權(quán)限
//查看acl權(quán)限 //設(shè)置權(quán)限之后,以后操作時需要先驗(yàn)證用戶,一次session中驗(yàn)證一次即可 zooKeeper.addAuthInfo("digest","chy:abcd".getBytes()); //同步方式 try { List<ACL> aclList = zooKeeper.getACL("/mall", null); System.out.println("acl權(quán)限:"+aclList); } catch (KeeperException | InterruptedException e) { System.out.println("獲取acl權(quán)限失敗"); e.printStackTrace(); } //異步回調(diào) zooKeeper.getACL("/mall3", null, new AsyncCallback.ACLCallback() { //第二個起:path、ctx、獲取到的List<ACL>、節(jié)點(diǎn)狀態(tài) public void processResult(int i, String s, Object o, List<ACL> list, Stat stat) { //就算沒有手動設(shè)置acl權(quán)限,默認(rèn)也是有值的 System.out.println("acl權(quán)限:"+list); } //ctx實(shí)參 },null);
添加監(jiān)聽器
//添加監(jiān)聽 方式一 try { CountDownLatch countDownLatch = new CountDownLatch(1); zooKeeper.getData("/mall", new Watcher() { public void process(WatchedEvent watchedEvent) { //watcher會監(jiān)聽該節(jié)點(diǎn)所有的事件,不管發(fā)生什么事件都會調(diào)用process()來處理,需要先判斷一下事件類型 if (watchedEvent.getType().equals(Event.EventType.NodeDataChanged)){ System.out.println("節(jié)點(diǎn)數(shù)據(jù)改變了"); //會一直監(jiān)聽,如果只監(jiān)聽一次數(shù)據(jù)改變,將下面這句代碼取消注釋即可 //countDownLatch.countDown(); } } }, null); //默認(rèn)watcher是一次性的,如果要一直監(jiān)聽,需要借助CountDownLatch countDownLatch.await(); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); }
ZooKeeper類的exists()、getData()、getChildren()方法都具有添加監(jiān)聽的功能,用法類似。
watchedEvent.getType().equals(Event.EventType.NodeDataChanged)
watchedEvent.getState().equals(Event.KeeperState.SyncConnected)
getType是獲取事件類型,getState是獲取連接狀態(tài)。
上面這種方式,會遞歸監(jiān)聽子孫節(jié)點(diǎn),子孫節(jié)點(diǎn)的數(shù)據(jù)改變也算NodeDataChanged,子孫節(jié)點(diǎn)的創(chuàng)建|刪除也算NodeCreated|NodeDeleted。
//添加監(jiān)聽 方式二 try { CountDownLatch countDownLatch1 = new CountDownLatch(1); zooKeeper.addWatch("/mall", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getType().equals(Event.EventType.NodeDataChanged)){ System.out.println("節(jié)點(diǎn)數(shù)據(jù)改變了"); //如果只監(jiān)聽一次數(shù)據(jù)改變,將下面這句代碼注釋掉 //countDownLatch1.countDown(); } } //監(jiān)聽模式,PERSISTENT是不監(jiān)聽子孫節(jié)點(diǎn),PERSISTENT_RECURSIVE是遞歸監(jiān)聽子孫節(jié)點(diǎn) }, AddWatchMode.PERSISTENT_RECURSIVE); countDownLatch1.await(); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); }
countDownLatch1.await();要阻塞線程,最好啟動一條新線程來監(jiān)聽。
只有設(shè)置了監(jiān)聽的zkCli,該節(jié)點(diǎn)發(fā)生事件時才會收到zkServer的通知。
watch只保存在zkServer的內(nèi)存中(zk依賴jdk,運(yùn)行在jvm上,堆中的session對象),不持久化到硬盤,就是說設(shè)置的監(jiān)聽只在本次會話期間有效,zkCli關(guān)閉連接,zkServer在指定時間后(默認(rèn)連續(xù)沒有收到10個心跳),zkServer會自動刪除相關(guān)session,watcher丟失。
移除監(jiān)聽
//移除監(jiān)聽 方式一 try { zooKeeper.addWatch("/mall",null,AddWatchMode.PERSISTENT); System.out.println("已移除監(jiān)聽"); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); }
就是上面添加監(jiān)聽的哪些方法,watch|watcher參數(shù),如果是boolean類型,設(shè)置為false即可關(guān)閉監(jiān)聽;如果是Watcher類型,可以設(shè)置null覆蓋掉之前設(shè)置的監(jiān)聽。
//移除監(jiān)聽 方式二 try { //第二個參數(shù)是Watcher,原來添加的那個Watcher監(jiān)聽對象,不能是null //第三個參數(shù)指定要移除監(jiān)聽的哪部分,Any是移除整個監(jiān)聽,Data是移除對數(shù)據(jù)的監(jiān)聽,Children是移除對子節(jié)點(diǎn)的遞歸監(jiān)聽 //最后一個參數(shù)指定未連接到zkServe時,是否移除本地監(jiān)聽部分 zooKeeper.removeWatches("/mall",watcher, Watcher.WatcherType.Children,true); } catch (InterruptedException | KeeperException e) { e.printStackTrace(); }
監(jiān)聽由2部分組成,一部分在zkServer上,事件發(fā)生時通知對應(yīng)的zkCli;一部分在zkCli,收到zkServer的通知時做出一些處理。
最后一個參數(shù)指定未連接到zkServer,是否移除本地(zkCli)監(jiān)聽部分,true——移除,false——不移除。
比如說沒有連接到zkServer,移除本地監(jiān)聽,10個心跳內(nèi)連上了zkServer,zkServer的監(jiān)聽部分仍在,發(fā)生事件時仍會通知此zkCli,但zkCli本地監(jiān)聽已經(jīng)移除了,對通知不會做出處理。
第一種方式會移除整個監(jiān)聽,不需要傳入監(jiān)聽對象watcher;
第二種方式功能更全,可以指定移除監(jiān)聽的哪個部分,但需要傳入watcher對象,添加監(jiān)聽時要用一個變量來保存watcher對象。
到此這篇關(guān)于在Java中操作Zookeeper的示例代碼詳解的文章就介紹到這了,更多相關(guān)Java操作Zookeeper內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java設(shè)計模式之外觀模式(Facade模式)介紹
這篇文章主要介紹了Java設(shè)計模式之外觀模式(Facade模式)介紹,外觀模式(Facade)的定義:為子系統(tǒng)中的一組接口提供一個一致的界面,需要的朋友可以參考下2015-03-03Java軟件生產(chǎn)監(jiān)控工具Btrace使用方法詳解
這篇文章主要介紹了Java軟件生產(chǎn)監(jiān)控工具Btrace使用方法詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-07-07SpringBoot整合Mybatis使用Druid數(shù)據(jù)庫連接池
這篇文章主要介紹了SpringBoot整合Mybatis使用Druid數(shù)據(jù)庫連接池,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-03-03