在Java中操作Zookeeper的示例代碼詳解
依賴
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
連接到zkServer
//連接字符串,zkServer的ip、port,如果是集群逗號分隔
String connectStr = "192.168.1.9:2181";
//zookeeper就是一個zkCli
ZooKeeper zooKeeper = null;
try {
//初始次數(shù)為1。后面要在內(nèi)部類中使用,三種寫法:1、寫成外部類成員變量,不用加final;2、作為函數(shù)局部變量,放在try外面,寫成final;3、寫在try中,不加final
CountDownLatch countDownLatch = new CountDownLatch(1);
//超時時間ms,監(jiān)聽器
zooKeeper = new ZooKeeper(connectStr, 5000, new Watcher() {
public void process(WatchedEvent watchedEvent) {
//如果狀態(tài)變成已連接
if (watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
System.out.println("連接成功");
//次數(shù)-1
countDownLatch.countDown();
}
}
});
//等待,次數(shù)為0時才會繼續(xù)往下執(zhí)行。等待監(jiān)聽器監(jiān)聽到連接成功,才能操作zk
countDownLatch.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
//...操作zk。后面的demo都是寫在此處的
//關(guān)閉連接
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
檢測節(jié)點是否存在
// 檢測節(jié)點是否存在
// 同步方式
Stat exists = null;
try {
//如果存在,返回節(jié)點狀態(tài)Stat;如果不存在,返回null。第二個參數(shù)是watch
exists = zooKeeper.exists("/mall",false);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
if (exists==null){
System.out.println("節(jié)點不存在");
}
else {
System.out.println("節(jié)點存在");
}
//異步回調(diào)
zooKeeper.exists("/mall",false, new AsyncCallback.StatCallback() {
//第二個是path znode路徑,第三個是ctx 后面?zhèn)魅雽崊?,第四個是znode的狀態(tài)
public void processResult(int i, String s, Object o, Stat stat) {
//如果節(jié)點不存在,返回的stat是null
if (stat==null){
System.out.println("節(jié)點不存在");
}
else{
System.out.println("節(jié)點存在");
}
}
// 傳入ctx,Object類型
},null);
操作后,服務(wù)端會返回處理結(jié)果,返回void、null也算處理結(jié)果。
同步指的是當(dāng)前線程阻塞,等待服務(wù)端返回數(shù)據(jù),收到返回的數(shù)據(jù)才繼續(xù)往下執(zhí)行;
異步回調(diào)指的是,把對結(jié)果(返回的數(shù)據(jù))的處理寫在回調(diào)函數(shù)中,當(dāng)前線程不等待返回的數(shù)據(jù),繼續(xù)往下執(zhí)行,收到返回的數(shù)據(jù)時自動調(diào)用回調(diào)函數(shù)來處理。
如果處理返回數(shù)據(jù)的代碼之后的操作,不依賴返回數(shù)據(jù)、對返回數(shù)據(jù)的處理,那么可以把返回數(shù)據(jù)的處理寫成回調(diào)函數(shù)。
創(chuàng)建節(jié)點
//創(chuàng)建節(jié)點
//同步方式
try {
//數(shù)據(jù)要寫成byte[],不攜帶數(shù)據(jù)寫成null;默認(rèn)acl權(quán)限使用ZooDefs.Ids.OPEN_ACL_UNSAFE;最后一個是節(jié)點類型,P是永久,E是臨時,S是有序
zooKeeper.create("/mall", "abcd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("已創(chuàng)建節(jié)點/mall");
//如果節(jié)點已存在,會拋出異常
} catch (KeeperException | InterruptedException e) { System.out.println("創(chuàng)建節(jié)點/mall失敗,請檢查節(jié)點是否已存在");
e.printStackTrace();
}
//異步回調(diào)
zooKeeper.create("/mall", "abcd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.Create2Callback(){
//第二個path,第三個ctx,第四個節(jié)點狀態(tài)
public void processResult(int i, String s, Object o, String s1, Stat stat) {
//回調(diào)方式不拋出異常,返回的stat是創(chuàng)建節(jié)點的狀態(tài),如果節(jié)點已存在,返回的stat是null
if (stat==null){
System.out.println("創(chuàng)建節(jié)點/mall失敗,請檢查節(jié)點是否已存在");
}
else {
System.out.println("節(jié)點創(chuàng)建成功");
}
}
//ctx實參
},null);
刪除節(jié)點
//刪除節(jié)點
//同步方式
try {
//第二個參數(shù)是版本號,-1表示可以是任何版本
zooKeeper.delete("/mall1",-1);
System.out.println("成功刪除節(jié)點/mall");
} catch (InterruptedException | KeeperException e) {
System.out.println("刪除節(jié)點/mall失敗");
e.printStackTrace();
}
//異步回調(diào)
zooKeeper.delete("/mall2", -1, new AsyncCallback.VoidCallback() {
//第二個是path,第三個是ctx
public void processResult(int i, String s, Object o) {
}
//
//ctx實參
},null);
delete()只能刪除沒有子節(jié)點的znode,如果該znode有子節(jié)點會拋出異常。
沒有提供遞歸刪除子節(jié)點的方法,如果要刪除帶有子節(jié)點的znode,需要自己實現(xiàn)遞歸刪除??梢韵萭etChildren()獲取子節(jié)點列表,遍歷列表依次刪除子節(jié)點,再刪除父節(jié)點。
獲取子節(jié)點列表
//獲取子節(jié)點列表,List<String>,比如/mall/user,/mall/order,返回的是["user"、"order"]
//同步方式
List<String> children = null;
try {
//第二個參數(shù)是watch
children = zooKeeper.getChildren("/mall", false);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
System.out.println("子節(jié)點列表:" + children);
//異步
zooKeeper.getChildren("/mall", false, new AsyncCallback.ChildrenCallback() {
//第二個起依次是:path、ctx、返回的子節(jié)點列表
public void processResult(int i, String s, Object o, List<String> list) {
System.out.println("子節(jié)點列表:" + list);
}
//ctx實參
}, null);
只獲取子節(jié)點,不獲取孫節(jié)點。
watch都是:可以寫boolean,要添加監(jiān)聽就寫true,不監(jiān)聽寫false;也可以寫Watcher對象,new一個Watcher對象表示要監(jiān)聽,null表示不監(jiān)聽。
獲取節(jié)點數(shù)據(jù)
//獲取節(jié)點數(shù)據(jù),返回byte[]
//同步方式
byte[] data = null;
try {
//第二個參數(shù)是watch,第三個是stat
data = zooKeeper.getData("/mall", false, null);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
//調(diào)用new String()時要判斷data是否為null,如果是null會拋NPE
if (data==null){
System.out.println("該節(jié)點沒有數(shù)據(jù)");
}
else{
System.out.println("節(jié)點數(shù)據(jù):"+new String(data));
}
//異步回調(diào)
zooKeeper.getData("/mall", false, new AsyncCallback.DataCallback() {
//第二個起依次是:path、ctx、返回的節(jié)點數(shù)據(jù)、節(jié)點狀態(tài)
public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
//不必判斷bytes是否是null,如果節(jié)點沒有數(shù)據(jù),不會調(diào)用回調(diào)函數(shù);執(zhí)行到此,說明bytes不是null
System.out.println("節(jié)點數(shù)據(jù):" + new String(bytes) );
}
//ctx實參
}, null);
設(shè)置|修改節(jié)點數(shù)據(jù)
//設(shè)置|更新節(jié)點據(jù)
//同步方式
try {
//最后一個參數(shù)是版本號
zooKeeper.setData("/mall", "1234".getBytes(), -1);
System.out.println("設(shè)置節(jié)點數(shù)據(jù)成功");
} catch (KeeperException | InterruptedException e) {
System.out.println("設(shè)置節(jié)點數(shù)據(jù)失敗");
e.printStackTrace();
}
//異步回調(diào)
zooKeeper.setData("/mall", "1234".getBytes(), -1, new AsyncCallback.StatCallback() {
//第二個是path,第三個是ctx
public void processResult(int i, String s, Object o, Stat stat) {
}
// ctx
},null);
設(shè)置acl權(quán)限
//設(shè)置acl權(quán)限
//第一個參數(shù)指定權(quán)限,第二個是Id對象
ACL acl = new ACL(ZooDefs.Perms.ALL, new Id("auth", "chy:abcd"));
List<ACL> aclList = new ArrayList<>();
aclList.add(acl);
//如果List中只有一個ACL對象,也可以這樣寫
//List<ACL> aclList = Collections.singletonList(auth);
//驗證權(quán)限,需寫在設(shè)置權(quán)限之前。如果之前沒有設(shè)置權(quán)限,也需要先驗證本次即將設(shè)置的用戶
zooKeeper.addAuthInfo("digest","chy:abcd".getBytes());
//方式一 setAcl
try {
//第二個參數(shù)是List<ACL>,第三個參數(shù)是版本號
zooKeeper.setACL("/mall", aclList, -1);
System.out.println("設(shè)置權(quán)限成功");
} catch (KeeperException | InterruptedException e) {
System.out.println("設(shè)置權(quán)限失敗");
e.printStackTrace();
}
//方式二 在創(chuàng)建節(jié)點時設(shè)置權(quán)限
try {
zooKeeper.create("/mall","abcd".getBytes(),aclList,CreateMode.PERSISTENT);
System.out.println("已創(chuàng)建節(jié)點并設(shè)置權(quán)限");
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
設(shè)置權(quán)限之后,連接zkServer進(jìn)行操作時,都需要先驗證用戶。
此處未寫對應(yīng)的異步回調(diào)。
查看acl權(quán)限
//查看acl權(quán)限
//設(shè)置權(quán)限之后,以后操作時需要先驗證用戶,一次session中驗證一次即可
zooKeeper.addAuthInfo("digest","chy:abcd".getBytes());
//同步方式
try {
List<ACL> aclList = zooKeeper.getACL("/mall", null);
System.out.println("acl權(quán)限:"+aclList);
} catch (KeeperException | InterruptedException e) {
System.out.println("獲取acl權(quán)限失敗");
e.printStackTrace();
}
//異步回調(diào)
zooKeeper.getACL("/mall3", null, new AsyncCallback.ACLCallback() {
//第二個起:path、ctx、獲取到的List<ACL>、節(jié)點狀態(tài)
public void processResult(int i, String s, Object o, List<ACL> list, Stat stat) {
//就算沒有手動設(shè)置acl權(quán)限,默認(rèn)也是有值的
System.out.println("acl權(quán)限:"+list);
}
//ctx實參
},null);
添加監(jiān)聽器
//添加監(jiān)聽 方式一
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper.getData("/mall", new Watcher() {
public void process(WatchedEvent watchedEvent) {
//watcher會監(jiān)聽該節(jié)點所有的事件,不管發(fā)生什么事件都會調(diào)用process()來處理,需要先判斷一下事件類型
if (watchedEvent.getType().equals(Event.EventType.NodeDataChanged)){
System.out.println("節(jié)點數(shù)據(jù)改變了");
//會一直監(jiān)聽,如果只監(jiān)聽一次數(shù)據(jù)改變,將下面這句代碼取消注釋即可
//countDownLatch.countDown();
}
}
}, null);
//默認(rèn)watcher是一次性的,如果要一直監(jiān)聽,需要借助CountDownLatch
countDownLatch.await();
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
ZooKeeper類的exists()、getData()、getChildren()方法都具有添加監(jiān)聽的功能,用法類似。
watchedEvent.getType().equals(Event.EventType.NodeDataChanged)
watchedEvent.getState().equals(Event.KeeperState.SyncConnected)
getType是獲取事件類型,getState是獲取連接狀態(tài)。
上面這種方式,會遞歸監(jiān)聽子孫節(jié)點,子孫節(jié)點的數(shù)據(jù)改變也算NodeDataChanged,子孫節(jié)點的創(chuàng)建|刪除也算NodeCreated|NodeDeleted。
//添加監(jiān)聽 方式二
try {
CountDownLatch countDownLatch1 = new CountDownLatch(1);
zooKeeper.addWatch("/mall", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType().equals(Event.EventType.NodeDataChanged)){
System.out.println("節(jié)點數(shù)據(jù)改變了");
//如果只監(jiān)聽一次數(shù)據(jù)改變,將下面這句代碼注釋掉
//countDownLatch1.countDown();
}
}
//監(jiān)聽模式,PERSISTENT是不監(jiān)聽子孫節(jié)點,PERSISTENT_RECURSIVE是遞歸監(jiān)聽子孫節(jié)點
}, AddWatchMode.PERSISTENT_RECURSIVE);
countDownLatch1.await();
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
countDownLatch1.await();要阻塞線程,最好啟動一條新線程來監(jiān)聽。
只有設(shè)置了監(jiān)聽的zkCli,該節(jié)點發(fā)生事件時才會收到zkServer的通知。
watch只保存在zkServer的內(nèi)存中(zk依賴jdk,運(yùn)行在jvm上,堆中的session對象),不持久化到硬盤,就是說設(shè)置的監(jiān)聽只在本次會話期間有效,zkCli關(guān)閉連接,zkServer在指定時間后(默認(rèn)連續(xù)沒有收到10個心跳),zkServer會自動刪除相關(guān)session,watcher丟失。
移除監(jiān)聽
//移除監(jiān)聽 方式一
try {
zooKeeper.addWatch("/mall",null,AddWatchMode.PERSISTENT);
System.out.println("已移除監(jiān)聽");
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
就是上面添加監(jiān)聽的哪些方法,watch|watcher參數(shù),如果是boolean類型,設(shè)置為false即可關(guān)閉監(jiān)聽;如果是Watcher類型,可以設(shè)置null覆蓋掉之前設(shè)置的監(jiān)聽。
//移除監(jiān)聽 方式二
try {
//第二個參數(shù)是Watcher,原來添加的那個Watcher監(jiān)聽對象,不能是null
//第三個參數(shù)指定要移除監(jiān)聽的哪部分,Any是移除整個監(jiān)聽,Data是移除對數(shù)據(jù)的監(jiān)聽,Children是移除對子節(jié)點的遞歸監(jiān)聽
//最后一個參數(shù)指定未連接到zkServe時,是否移除本地監(jiān)聽部分
zooKeeper.removeWatches("/mall",watcher, Watcher.WatcherType.Children,true);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
監(jiān)聽由2部分組成,一部分在zkServer上,事件發(fā)生時通知對應(yīng)的zkCli;一部分在zkCli,收到zkServer的通知時做出一些處理。
最后一個參數(shù)指定未連接到zkServer,是否移除本地(zkCli)監(jiān)聽部分,true——移除,false——不移除。
比如說沒有連接到zkServer,移除本地監(jiān)聽,10個心跳內(nèi)連上了zkServer,zkServer的監(jiān)聽部分仍在,發(fā)生事件時仍會通知此zkCli,但zkCli本地監(jiān)聽已經(jīng)移除了,對通知不會做出處理。
第一種方式會移除整個監(jiān)聽,不需要傳入監(jiān)聽對象watcher;
第二種方式功能更全,可以指定移除監(jiān)聽的哪個部分,但需要傳入watcher對象,添加監(jiān)聽時要用一個變量來保存watcher對象。
到此這篇關(guān)于在Java中操作Zookeeper的示例代碼詳解的文章就介紹到這了,更多相關(guān)Java操作Zookeeper內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java設(shè)計模式之外觀模式(Facade模式)介紹
這篇文章主要介紹了Java設(shè)計模式之外觀模式(Facade模式)介紹,外觀模式(Facade)的定義:為子系統(tǒng)中的一組接口提供一個一致的界面,需要的朋友可以參考下2015-03-03
Java軟件生產(chǎn)監(jiān)控工具Btrace使用方法詳解
這篇文章主要介紹了Java軟件生產(chǎn)監(jiān)控工具Btrace使用方法詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-07-07
SpringBoot整合Mybatis使用Druid數(shù)據(jù)庫連接池
這篇文章主要介紹了SpringBoot整合Mybatis使用Druid數(shù)據(jù)庫連接池,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-03-03

