分布式框架Zookeeper?api的使用介紹
前言
Zookeeper API共包含五個(gè)包,分別為:
- org.apache.zookeeper
- org.apache.zookeeper.data
- org.apache.zookeeper.server
- org.apache.zookeeper.server.quorum
- org.apache.zookeeper.server.upgrade
其中org.apache.zookeeper,包含Zookeeper類,他是我們編程時(shí)最常?的類文件。這個(gè)類是Zookeeper客戶端的主要類文件。如果要使用Zookeeper服務(wù),應(yīng)?程序?先必須創(chuàng)建?個(gè)Zookeeper實(shí)例,這時(shí)就需要使用此類。?旦客戶端和Zookeeper服務(wù)端建立起了連接,Zookeeper系統(tǒng)將會(huì)給本次連接會(huì)話分配?個(gè)ID值,并且客戶端將會(huì)周期性的向服務(wù)器端發(fā)送心跳來(lái)維持會(huì)話連接。只要連接有效,客戶端就可以使?Zookeeper API來(lái)做相應(yīng)處理了
導(dǎo)入依賴
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>建立會(huì)話
package com.lagou.api;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class CreateSession implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
/**
* 建立會(huì)話
*/
public static void main(String[] args) throws IOException, InterruptedException {
/*
客戶端可以通過(guò)創(chuàng)建?個(gè)zk實(shí)例來(lái)連接zk服務(wù)器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 連接地址:IP:端?
sesssionTimeOut:會(huì)話超時(shí)時(shí)間:?jiǎn)挝缓撩?
Wather:監(jiān)聽(tīng)器(當(dāng)特定事件觸發(fā)監(jiān)聽(tīng)時(shí),zk會(huì)通過(guò)watcher通知到客戶端)
*/
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateSession());
System.out.println(zooKeeper.getState());
// 計(jì)數(shù)工具類 CountDownLatch : 不讓main方法結(jié)束,讓線程處于等待阻塞
countDownLatch.await();
System.out.println("客戶端與服務(wù)端會(huì)話真正建立了");
}
/**
* 回調(diào)方法:處理來(lái)自服務(wù)器端的watcher通知
*/
// 當(dāng)前類實(shí)現(xiàn)了Watcher接?,重寫(xiě)了process?法,該?法負(fù)責(zé)處理來(lái)?Zookeeper服務(wù)端的watcher通知,在收到服務(wù)端發(fā)送過(guò)來(lái)的SyncConnected事件之后,解除主程序在CountDownLatch上的等待阻塞,?此,會(huì)話創(chuàng)建完畢
public void process(WatchedEvent watchedEvent) {
//當(dāng)連接創(chuàng)建了,服務(wù)端發(fā)送給客戶端SyncConnected事件
if(watchedEvent.getState() == Event.KeeperState.SyncConnected) {
// 解除主程序在CountDownLatch上的等待阻塞
countDownLatch.countDown();
}
}
}注意,ZooKeeper 客戶端和服務(wù)端會(huì)話的建立是?個(gè)異步的過(guò)程,也就是說(shuō)在程序中,構(gòu)造?法會(huì)在處理完客戶端初始化工作后立即返回,在?多數(shù)情況下,此時(shí)并沒(méi)有真正建立好?個(gè)可用的會(huì)話,在會(huì)話的生命周期中處于“CONNECTING”的狀態(tài)。當(dāng)該會(huì)話真正創(chuàng)建完畢后ZooKeeper服務(wù)端會(huì)向會(huì)話對(duì)應(yīng)的客戶端發(fā)送?個(gè)事件通知,以告知客戶端,客戶端只有在獲取這個(gè)通知之后,才算真正建立了會(huì)話。
創(chuàng)建節(jié)點(diǎn)
package com.lagou.api;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class CreateNote implements Watcher {
private static ZooKeeper zooKeeper;
/**
* 建立會(huì)話
*/
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
/*
客戶端可以通過(guò)創(chuàng)建?個(gè)zk實(shí)例來(lái)連接zk服務(wù)器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 連接地址:IP:端?
sesssionTimeOut:會(huì)話超時(shí)時(shí)間:?jiǎn)挝缓撩?
Wather:監(jiān)聽(tīng)器(當(dāng)特定事件觸發(fā)監(jiān)聽(tīng)時(shí),zk會(huì)通過(guò)watcher通知到客戶端)
*/
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateNote());
System.out.println(zooKeeper.getState());
Thread.sleep(Integer.MAX_VALUE);
}
// 創(chuàng)建節(jié)點(diǎn)的方法
private static void createNoteSync() throws InterruptedException, KeeperException {
/**
* path :節(jié)點(diǎn)創(chuàng)建的路徑
* data[] :節(jié)點(diǎn)創(chuàng)建要保存的數(shù)據(jù),是個(gè)byte類型的
* acl :節(jié)點(diǎn)創(chuàng)建的權(quán)限信息(4種類型)
* ANYONE_ID_UNSAFE : 表示任何?
* AUTH_IDS :此ID僅可?于設(shè)置ACL。它將被客戶機(jī)驗(yàn)證的ID替換。
* OPEN_ACL_UNSAFE :這是?個(gè)完全開(kāi)放的ACL(常?)--> world:anyone
* CREATOR_ALL_ACL :此ACL授予創(chuàng)建者身份驗(yàn)證ID的所有權(quán)限
* createMode :創(chuàng)建節(jié)點(diǎn)的類型(4種類型)
* PERSISTENT:持久節(jié)點(diǎn)
* PERSISTENT_SEQUENTIAL:持久順序節(jié)點(diǎn)
* EPHEMERAL:臨時(shí)節(jié)點(diǎn)
* EPHEMERAL_SEQUENTIAL:臨時(shí)順序節(jié)點(diǎn)
String node = zookeeper.create(path,data,acl,createMode);
*/
// 持久節(jié)點(diǎn)
String note_persistent = zooKeeper.create("/lg-persistent", "持久節(jié)點(diǎn)內(nèi)容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 臨時(shí)節(jié)點(diǎn)
String note_ephemeral = zooKeeper.create("/lg-ephemeral", "臨時(shí)節(jié)點(diǎn)內(nèi)容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// 持久順序節(jié)點(diǎn)
String note_sequential = zooKeeper.create("/lg-sequential", "持久順序節(jié)點(diǎn)內(nèi)容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println("創(chuàng)建的持久節(jié)點(diǎn): " + note_persistent);
System.out.println("創(chuàng)建的臨時(shí)節(jié)點(diǎn): " + note_ephemeral);
System.out.println("創(chuàng)建的持久順序節(jié)點(diǎn): " + note_sequential);
}
/**
* 回調(diào)方法:處理來(lái)自服務(wù)器端的watcher通知
*/
public void process(WatchedEvent watchedEvent) {
// SyncConnected
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
// 創(chuàng)建節(jié)點(diǎn)
try {
createNoteSync();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}獲取節(jié)點(diǎn)數(shù)據(jù)
package com.lagou.api;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.List;
public class GetNoteData implements Watcher {
private static ZooKeeper zooKeeper;
/**
* 建立會(huì)話
*/
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
/*
客戶端可以通過(guò)創(chuàng)建?個(gè)zk實(shí)例來(lái)連接zk服務(wù)器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 連接地址:IP:端?
sesssionTimeOut:會(huì)話超時(shí)時(shí)間:?jiǎn)挝缓撩?
Wather:監(jiān)聽(tīng)器(當(dāng)特定事件觸發(fā)監(jiān)聽(tīng)時(shí),zk會(huì)通過(guò)watcher通知到客戶端)
*/
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new GetNoteData());
System.out.println(zooKeeper.getState());
Thread.sleep(Integer.MAX_VALUE);
}
/**
* 回調(diào)方法:處理來(lái)自服務(wù)器端的watcher通知
*/
public void process(WatchedEvent watchedEvent) {
/*
子節(jié)點(diǎn)列表發(fā)生改變時(shí),服務(wù)端會(huì)發(fā)送noteChildrenChanged事件通知
要重新獲取子節(jié)點(diǎn)列表,同時(shí)注意:通知是一次性的,需要反復(fù)注冊(cè)監(jiān)聽(tīng)
*/
if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
List<String> children = null;
try {
children = zooKeeper.getChildren("/lg-persistent", true);
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(children);
}
// SyncConnected
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
// 獲取節(jié)點(diǎn)數(shù)據(jù)的方法
try {
getNoteData();
// 獲取節(jié)點(diǎn)的子節(jié)點(diǎn)列表方法
getChildren();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
/*
獲取某個(gè)節(jié)點(diǎn)的內(nèi)容
*/
private void getNoteData() throws Exception {
/**
* path : 獲取數(shù)據(jù)的路徑
* watch : 是否開(kāi)啟監(jiān)聽(tīng)
* stat : 節(jié)點(diǎn)狀態(tài)信息
* null: 表示獲取最新版本的數(shù)據(jù)
* zk.getData(path, watch, stat);
*/
byte[] data = zooKeeper.getData("/lg-persistent", false, null);
System.out.println(new String(data));
}
/*
獲取某個(gè)節(jié)點(diǎn)的子節(jié)點(diǎn)列表方法
*/
public static void getChildren() throws InterruptedException, KeeperException {
/*
path:路徑
watch:是否要啟動(dòng)監(jiān)聽(tīng),當(dāng)?節(jié)點(diǎn)列表發(fā)?變化,會(huì)觸發(fā)監(jiān)聽(tīng)
zooKeeper.getChildren(path, watch);
*/
List<String> children = zooKeeper.getChildren("/lg-persistent", true);
System.out.println(children);
}
}修改節(jié)點(diǎn)數(shù)據(jù)
package com.lagou.api;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
public class UpdateNoteData implements Watcher {
private static ZooKeeper zooKeeper;
/**
* 建立會(huì)話
*/
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
/*
客戶端可以通過(guò)創(chuàng)建?個(gè)zk實(shí)例來(lái)連接zk服務(wù)器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 連接地址:IP:端?
sesssionTimeOut:會(huì)話超時(shí)時(shí)間:?jiǎn)挝缓撩?
Wather:監(jiān)聽(tīng)器(當(dāng)特定事件觸發(fā)監(jiān)聽(tīng)時(shí),zk會(huì)通過(guò)watcher通知到客戶端)
*/
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new UpdateNoteData());
System.out.println(zooKeeper.getState());
Thread.sleep(Integer.MAX_VALUE);
}
/**
* 回調(diào)方法:處理來(lái)自服務(wù)器端的watcher通知
*/
public void process(WatchedEvent watchedEvent) {
// SyncConnected
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
// 更新數(shù)據(jù)節(jié)點(diǎn)內(nèi)容的方法
try {
updateNoteSync();
} catch (InterruptedException | KeeperException e) {
throw new RuntimeException(e);
}
}
}
/*
更新數(shù)據(jù)節(jié)點(diǎn)內(nèi)容的方法
*/
private void updateNoteSync() throws InterruptedException, KeeperException {
/*
path:路徑
data:要修改的內(nèi)容 byte[]
version:為-1,表示對(duì)最新版本的數(shù)據(jù)進(jìn)?修改
zooKeeper.setData(path, data,version);
*/
byte[] data = zooKeeper.getData("/lg-persistent", false, null);
System.out.println("修改前的值:" + new String(data));
// 修改 /lg-persistent 的數(shù)據(jù) stat: 狀態(tài)信息對(duì)象
Stat stat = zooKeeper.setData("/lg-persistent", "客戶端修改了節(jié)點(diǎn)數(shù)據(jù)".getBytes(), -1);
byte[] data2 = zooKeeper.getData("/lg-persistent", false, null);
System.out.println("修改后的值:" + new String(data2));
}
}刪除節(jié)點(diǎn)
package com.lagou.api;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
public class DeleteNote implements Watcher {
private static ZooKeeper zooKeeper;
/**
* 建立會(huì)話
*/
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
/*
客戶端可以通過(guò)創(chuàng)建?個(gè)zk實(shí)例來(lái)連接zk服務(wù)器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 連接地址:IP:端?
sesssionTimeOut:會(huì)話超時(shí)時(shí)間:?jiǎn)挝缓撩?
Wather:監(jiān)聽(tīng)器(當(dāng)特定事件觸發(fā)監(jiān)聽(tīng)時(shí),zk會(huì)通過(guò)watcher通知到客戶端)
*/
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new DeleteNote());
System.out.println(zooKeeper.getState());
Thread.sleep(Integer.MAX_VALUE);
}
/**
* 回調(diào)方法:處理來(lái)自服務(wù)器端的watcher通知
*/
public void process(WatchedEvent watchedEvent) {
// SyncConnected
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
// 刪除節(jié)點(diǎn)
try {
deleteNoteSync();
} catch (InterruptedException | KeeperException e) {
throw new RuntimeException(e);
}
}
}
/*
刪除節(jié)點(diǎn)方法
*/
private void deleteNoteSync() throws InterruptedException, KeeperException {
/*
zooKeeper.exists(path,watch) :判斷節(jié)點(diǎn)是否存在
zookeeper.delete(path,version) : 刪除節(jié)點(diǎn)
*/
Stat stat = zooKeeper.exists("/lg-persistent/c1", false);
System.out.println(stat == null ? "該節(jié)點(diǎn)不存在" : "該節(jié)點(diǎn)存在");
if (stat != null) {
zooKeeper.delete("/lg-persistent/c1", -1);
}
Stat stat2 = zooKeeper.exists("/lg-persistent/c1", false);
System.out.println(stat2 == null ? "該節(jié)點(diǎn)不存在" : "該節(jié)點(diǎn)存在");
}
}到此這篇關(guān)于分布式框架Zookeeper api的使用介紹的文章就介紹到這了,更多相關(guān)Zookeeper api內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解HTTP請(qǐng)求與響應(yīng)基礎(chǔ)及實(shí)例
這篇文章主要介紹了詳解HTTP請(qǐng)求與響應(yīng)基礎(chǔ)及實(shí)例的相關(guān)資料,這里對(duì)http的請(qǐng)求和響應(yīng)進(jìn)行詳細(xì)分析并附有實(shí)現(xiàn)實(shí)例,需要的朋友可以參考下2017-07-07
Java將科學(xué)計(jì)數(shù)法數(shù)據(jù)轉(zhuǎn)為字符串的實(shí)例
下面小編就為大家?guī)?lái)一篇Java將科學(xué)計(jì)數(shù)法數(shù)據(jù)轉(zhuǎn)為字符串的實(shí)例。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-12-12
springboot配置多數(shù)據(jù)源并集成Druid和mybatis的操作
這篇文章主要介紹了springboot配置多數(shù)據(jù)源并集成Druid和mybatis的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07
關(guān)于java中構(gòu)造函數(shù)的一些知識(shí)詳解
下面小編就為大家?guī)?lái)一篇關(guān)于java中構(gòu)造函數(shù)的一些知識(shí)詳解。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-12-12
Java中Optional的正確用法與爭(zhēng)議點(diǎn)詳解
這篇文章主要介紹了Java中Optional的正確用法與爭(zhēng)議點(diǎn)的相關(guān)資料,需要的朋友可以參考下2022-11-11
spring注解識(shí)別一個(gè)接口的多個(gè)實(shí)現(xiàn)類方法
下面小編就為大家?guī)?lái)一篇spring注解識(shí)別一個(gè)接口的多個(gè)實(shí)現(xiàn)類方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-04-04
詳解Spring Boot微服務(wù)如何集成fescar解決分布式事務(wù)問(wèn)題
這篇文章主要介紹了詳解Spring Boot微服務(wù)如何集成fescar解決分布式事務(wù)問(wèn)題,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-01-01

