如何使用Java操作Zookeeper
簡介
Java操作Zookeeper有很多種方式,如zookeeper、zkclient、curator等等,下面介紹下使用zkclient的方式操作Zookeeper。
Maven依賴:
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.11</version> </dependency>
使用zkclient操作Zookeeper
創(chuàng)建節(jié)點:
@Test
public void testCreateNode() {
//建立連接
//zkServers: Zookeeper服務(wù)器IP地址和端口號,如果是集群情況下用逗號分割多個Zookeeper服務(wù)器地址
//sessionTimeout: 會話超時時間
//connectionTimeout: 連接超時時間
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 5000);
//創(chuàng)建數(shù)據(jù)
Student student = new Student();
student.setName("張三");
student.setAge(18);
student.setPhone("1585454xxxx");
//創(chuàng)建持久節(jié)點
zkClient.createPersistent("/p_node", student);
//創(chuàng)建持久順序節(jié)點
zkClient.createPersistentSequential("/ps_node", student);
//創(chuàng)建臨時節(jié)點
zkClient.createEphemeral("/e_node", student);
//創(chuàng)建臨時順序節(jié)點
zkClient.createEphemeralSequential("/ps_node", student);
//關(guān)閉客戶端
//關(guān)閉客戶端的同時,前面創(chuàng)建的臨時節(jié)點也會被刪除
zkClient.close();
}
讀取節(jié)點數(shù)據(jù):
@Test
public void testReadNodeData() {
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 5000);
Stat stat = new Stat();
Student student = zkClient.readData("/p_node", stat);
System.out.println("節(jié)點狀態(tài)信息:" + JSON.toJSONString(student));
System.out.println("節(jié)點數(shù)據(jù):" + JSON.toJSONString(stat));
zkClient.close();
}
//輸出結(jié)果:
//節(jié)點狀態(tài)信息:{"age":18,"name":"張三","phone":"1585454xxxx"}
//節(jié)點數(shù)據(jù):{"aversion":0,"ctime":1619165355431,"cversion":0,"czxid":165,"dataLength":260,"ephemeralOwner":0,"mtime":1619165355431,"mzxid":165,"numChildren":0,"pzxid":165,"version":0}
刪除節(jié)點:
@Test
public void testDeleteNode() {
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 5000);
//刪除節(jié)點,不能刪除帶有子節(jié)點的節(jié)點
zkClient.delete("/p_node");
//刪除節(jié)點,遞歸刪除所有子孫節(jié)點
zkClient.deleteRecursive("/p_node2");
zkClient.close();
}
更新節(jié)點數(shù)據(jù):
@Test
public void testWriteNodeData() {
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 5000);
//更新節(jié)點數(shù)據(jù)
zkClient.writeData("/p_node", "myData1");
//使用CAS更新節(jié)點數(shù)據(jù)
//zkClient.writeData("/p_node", "myData2", 1);
zkClient.close();
}
獲取子節(jié)點列表:
@Test
public void testGetChildNodes() {
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 5000);
//獲取子節(jié)點列表
List<String> childList = zkClient.getChildren("/p_node");
childList.stream().forEach(System.out::println);
zkClient.close();
}
節(jié)點監(jiān)聽
Java節(jié)點監(jiān)聽都是永久的,觸發(fā)一次后不會被刪除。
監(jiān)聽節(jié)點的子節(jié)點變化:
@Test
public void testSubscribeChildChanges() throws IOException {
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 5000);
zkClient.subscribeChildChanges("/p_node", new IZkChildListener(){
//子節(jié)點改變時調(diào)用
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println(parentPath + "子節(jié)點發(fā)生改變");
System.out.println("當(dāng)前子節(jié)點列表:" + currentChilds);
}
});
//阻塞客戶端,便于測試
System.in.read();
}
監(jiān)聽節(jié)點數(shù)據(jù)變化:
@Test
public void testSubscribeDataChanges() throws IOException {
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 5000);
zkClient.subscribeDataChanges("/p_node", new IZkDataListener(){
//節(jié)點數(shù)據(jù)改變時調(diào)用
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println(dataPath + "節(jié)點數(shù)據(jù)發(fā)生變化");
System.out.println("修改后的數(shù)據(jù)為:" + data.toString());
}
//節(jié)點被刪除時調(diào)用
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println(dataPath + "節(jié)點已被刪除");
}
});
//阻塞客戶端,便于測試
System.in.read();
}
監(jiān)聽Zookeeper連接狀態(tài)變化:
@Test
public void testSubscribeStateChanges() throws IOException {
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 5000);
zkClient.subscribeStateChanges(new IZkStateListener(){
//當(dāng)zookeeper連接狀態(tài)改變時調(diào)用
@Override
public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
System.out.println("當(dāng)前狀態(tài)" + state);
}
//在zookeeper會話過期并且創(chuàng)建了一個新的會話之后調(diào)用
@Override
public void handleNewSession() throws Exception {
System.out.println("會話過期,已創(chuàng)建新的會話");
}
//當(dāng)會話不能重新建立時調(diào)用
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
error.printStackTrace();
}
});
//阻塞客戶端,便于測試
System.in.read();
}
以上就是如何使用Java操作Zookeeper的詳細內(nèi)容,更多關(guān)于Java操作Zookeeper的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Springboot初始化啟動報錯Error?creating?bean?with?name?'da
這篇文章主要為大家介紹了Springboot初始化啟動報Error?creating?bean?with?name?'dataSource'?defined?in?class?path?resource解決,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-08-08
基于SpringBoot+Redis實現(xiàn)一個簡單的限流器
在Spring?Boot中使用Redis和過濾器實現(xiàn)請求限流,過濾器將在每個請求到達時檢查請求頻率,并根據(jù)設(shè)定的閾值進行限制,這樣可以保護您的應(yīng)用程序免受惡意請求或高并發(fā)請求的影響,本文我們通過Spring?Boot?+Redis?實現(xiàn)一個輕量級的消息隊列,需要的朋友可以參考下2023-08-08

