Java zookeeper服務(wù)的使用詳解
Java語(yǔ)言客戶端使用zookeeper
下載zookeeper連接工具,方便我們查看zookeeper存的數(shù)據(jù)。下載地址:
https://pan.baidu.com/s/1UG5_VcYUZUYUkg04QROLYg?pwd=3ych 提取碼: 3ych
下載后解壓就可以使用了:

使用頁(yè)面:

Java語(yǔ)言連接z00keeper
首先引入maven 依賴jar包
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.9</version> </dependency>
編寫Java代碼
public class Test001 {
private static final String ADDRES = "127.0.0.1:2181";
private static final int TIMAOUT = 5000;
//計(jì)數(shù)器
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
//zk核心節(jié)點(diǎn)+事件通知
//節(jié)點(diǎn)路徑和界定啊value
/**
* 參數(shù)一:連接地址
* 參數(shù)二:zk超時(shí)時(shí)間
* 參數(shù)三:事件通知
*/
//1、創(chuàng)建zk鏈接
ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
Event.KeeperState state = watchedEvent.getState();
if(state == Event.KeeperState.SyncConnected){
System.out.println("zk鏈接成功");
countDownLatch.countDown(); //計(jì)數(shù)器減 1
}
}
});
//計(jì)數(shù)器結(jié)果必須是為0 才能繼續(xù)執(zhí)行
System.out.println("zk正在等待連接");
countDownLatch.await();
System.out.println("開(kāi)始創(chuàng)建我們的連接");
//2、創(chuàng)建我們的節(jié)點(diǎn)
/**
* 參數(shù)一:路徑名稱
* 參數(shù)二:節(jié)點(diǎn)value
* 參數(shù)三:節(jié)點(diǎn)權(quán)限acl
* 蠶食四:節(jié)點(diǎn)類型 臨時(shí)和永久
*/
String s = zooKeeper.create("/kaico/one", "hello,boy".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(s);
zooKeeper.close();
}
}可以利用連接工具查看操作結(jié)果。
zooKeeper類有很多api操作節(jié)點(diǎn),可以創(chuàng)建、刪除。
zookeeper Javaapi文檔: 點(diǎn)擊查看
四種節(jié)點(diǎn)類型
第一種:臨時(shí)節(jié)點(diǎn):會(huì)話關(guān)閉之后,就自動(dòng)消失 CreateMode.PERSISTENT_SEQUENTIAL
第二種:臨時(shí)有序節(jié)點(diǎn) CreateMode.EPHEMERAL
第三種:持久節(jié)點(diǎn):會(huì)話關(guān)閉之后,持久化到硬盤 CreateMode.PERSISTENT
第四種:持久有序節(jié)點(diǎn) CreateMode.PERSISTENT_SEQUENTIAL
ACL權(quán)限
ACL權(quán)限模型,實(shí)際上就是對(duì)樹(shù)每個(gè)節(jié)點(diǎn)實(shí)現(xiàn)控制.
身份的認(rèn)證有4種方式:
world:默認(rèn)方式,相當(dāng)于全世界都能訪問(wèn).
auth:代表已經(jīng)認(rèn)證通過(guò)的用戶(cli中可以通過(guò)addauth digest user:pwd來(lái)添加當(dāng)前上下文中的授權(quán)用戶).
digest:即用戶名:密碼這種方式認(rèn)證,這也是業(yè)務(wù)系統(tǒng)中最常用的.
ip:使用lp地址認(rèn)證。
代碼案例:使用賬號(hào)密碼實(shí)現(xiàn)權(quán)限控制
1、添加有權(quán)限控制的節(jié)點(diǎn)數(shù)據(jù)
public class Test002 {
private static final String ADDRES = "127.0.0.1:2181";
private static final int TIMAOUT = 5000;
//計(jì)數(shù)器
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
/**
* 參數(shù)一:連接地址
* 參數(shù)二:zk超時(shí)時(shí)間
* 參數(shù)三:事件通知
*/
//1、創(chuàng)建zk鏈接
ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
Event.KeeperState state = watchedEvent.getState();
if(state == Event.KeeperState.SyncConnected){
System.out.println("zk鏈接成功");
countDownLatch.countDown(); //計(jì)數(shù)器減 1
}
}
});
//計(jì)數(shù)器結(jié)果必須是為0 才能繼續(xù)執(zhí)行
System.out.println("zk正在等待連接");
countDownLatch.await();
System.out.println("開(kāi)始創(chuàng)建我們的連接");
//創(chuàng)建賬號(hào) admin 可以實(shí)現(xiàn)讀寫操作
Id admin = new Id("digest", DigestAuthenticationProvider.generateDigest("admin:admin123"));
ACL acl1 = new ACL(ZooDefs.Perms.ALL, admin);
//創(chuàng)建賬號(hào) guest 只允許做讀操作
Id guest = new Id("digest", DigestAuthenticationProvider.generateDigest("guest:guest123"));
ACL acl2 = new ACL(ZooDefs.Perms.READ, guest);
ArrayList<ACL> acls = new ArrayList<>();
acls.add(acl1);
acls.add(acl2);
//2、創(chuàng)建我們的節(jié)點(diǎn)
/**
* 參數(shù)一:路徑名稱
* 參數(shù)二:節(jié)點(diǎn)value
* 參數(shù)三:節(jié)點(diǎn)權(quán)限acl
* 蠶食四:節(jié)點(diǎn)類型 臨時(shí)和永久
*/
String s = zooKeeper.create("/kaico/acl", "hello,boy".getBytes(), acls, CreateMode.PERSISTENT);
System.out.println(s);
zooKeeper.close();
}
}2、獲取設(shè)置了權(quán)限的節(jié)點(diǎn)數(shù)據(jù)
public class Test003 {
private static final String ADDRES = "127.0.0.1:2181";
private static final int TIMAOUT = 5000;
//計(jì)數(shù)器
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
/**
* 參數(shù)一:連接地址
* 參數(shù)二:zk超時(shí)時(shí)間
* 參數(shù)三:事件通知
*/
//1、創(chuàng)建zk鏈接
ZooKeeper zooKeeper = new ZooKeeper(ADDRES, TIMAOUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
Event.KeeperState state = watchedEvent.getState();
if(state == Event.KeeperState.SyncConnected){
System.out.println("zk鏈接成功");
countDownLatch.countDown(); //計(jì)數(shù)器減 1
}
}
});
//計(jì)數(shù)器結(jié)果必須是為0 才能繼續(xù)執(zhí)行
System.out.println("zk正在等待連接");
countDownLatch.await();
System.out.println("開(kāi)始創(chuàng)建我們的連接");
//設(shè)置一下zookeeper 的賬號(hào)才有權(quán)限獲取內(nèi)容
zooKeeper.addAuthInfo("digest", "guest:guest123".getBytes());
//獲取節(jié)點(diǎn)的內(nèi)容
byte[] data = zooKeeper.getData("/kaico/acl", null, new Stat());
System.out.println(new String(data));
zooKeeper.close();
}
}實(shí)現(xiàn)事件監(jiān)聽(tīng)通知
Zookeeper實(shí)現(xiàn)基本的總結(jié):類似于文件存儲(chǔ)系統(tǒng),可以幫助我們解決分布式領(lǐng)域中遇到問(wèn)題
Zookeeper分布式協(xié)調(diào)工具
特征:
- 定義的節(jié)點(diǎn)包含key (路徑)和 value ,路徑不允許有重復(fù)保證唯一性.
- Zookeeper分為四種類型持久、持久序號(hào)、臨時(shí)、臨時(shí)序號(hào).
- 持久與臨時(shí)節(jié)點(diǎn)區(qū)別:連接如果一旦關(guān)閉,當(dāng)前的節(jié)點(diǎn)自動(dòng)刪除;
- 事件通知監(jiān)聽(tīng)節(jié)點(diǎn)發(fā)生的變化刪除、修改、子節(jié)點(diǎn)
Java代碼案例:
public class Test004 {
private static final String ADDRES = "127.0.0.1:2181";
private static final int TIMAOUT = 5000;
//計(jì)數(shù)器
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
//1、創(chuàng)建zk 連接
ZkClient zkClient = new ZkClient(ADDRES, TIMAOUT);
String parentPath = "/kaico/jing";
//2、監(jiān)聽(tīng)節(jié)點(diǎn)發(fā)生的變化,監(jiān)聽(tīng)子節(jié)點(diǎn)是否發(fā)生變化,如果發(fā)生變化都可以獲取到回調(diào)通知。
// zkClient.subscribeChildChanges(parentPath, new IZkChildListener() {
// @Override
// public void handleChildChange(String s, List<String> list) throws Exception {
// System.out.println("s:" + s + ",節(jié)點(diǎn)發(fā)生了變化");
// list.forEach((t)->{
// System.out.println("子節(jié)點(diǎn):" + t);
// });
// }
// });
//監(jiān)聽(tīng)節(jié)點(diǎn)的內(nèi)容是否發(fā)生變化或刪除
zkClient.subscribeDataChanges(parentPath, new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
System.out.println("修改的節(jié)點(diǎn)為:" + s + ",修改之后的值:" + o);
}
@Override
public void handleDataDeleted(String s) throws Exception {
System.out.println("節(jié)點(diǎn):" + s + "被刪除");
}
});
//修改值內(nèi)容
zkClient.writeData(parentPath, "666666666666666");
while (true){
}
// zkClient.close();
}
}微服務(wù)使用zookeeper作為注冊(cè)中心
調(diào)用接口邏輯圖

使用zookeeper實(shí)現(xiàn)邏輯:
根據(jù)服務(wù)提供方的名稱創(chuàng)建對(duì)應(yīng)的節(jié)點(diǎn),服務(wù)提供方的接口所有的ip+端口作為子節(jié)點(diǎn)的value的值,這樣服務(wù)調(diào)用方根據(jù)服務(wù)提供方的名稱在zookeeper上找到對(duì)應(yīng)的ip+端口從而可以調(diào)用對(duì)應(yīng)的接口,再監(jiān)聽(tīng)該節(jié)點(diǎn),如果提供接口的機(jī)器發(fā)生宕機(jī)于zookeeper斷開(kāi)連接,子節(jié)點(diǎn)也相應(yīng)的減少了,服務(wù)調(diào)用方也會(huì)收到通知。
分布式鎖
分布式鎖的概念:解決再多個(gè)jvm中最終只能有一個(gè)jvm 執(zhí)行。
zookeeper實(shí)現(xiàn)分布式鎖的思路:
節(jié)點(diǎn)保證唯一、事件通知、臨時(shí)節(jié)點(diǎn)(生命周期和Session會(huì)關(guān)聯(lián))﹒
創(chuàng)建分布式鎖原理:
1.多個(gè)jvm同時(shí)在Zookeeper 上創(chuàng)建相同的臨時(shí)節(jié)點(diǎn)(lockPath).
2. 因?yàn)榕R時(shí)節(jié)點(diǎn)路徑保證唯一的性,只要誰(shuí)能夠創(chuàng)建成功誰(shuí)就能夠獲取鎖,就可以開(kāi)始執(zhí)
行業(yè)務(wù)邏輯;,
3.如果節(jié)點(diǎn)已經(jīng)給其他請(qǐng)求創(chuàng)建的話或者是創(chuàng)建節(jié)點(diǎn)失敗,當(dāng)前的請(qǐng)求實(shí)現(xiàn)等待;
釋放鎖的原理
因?yàn)槲覀儾捎门R時(shí)節(jié)點(diǎn),當(dāng)前節(jié)點(diǎn)創(chuàng)建成功,表示獲取鎖成功;正常執(zhí)行完業(yè)務(wù)邏輯調(diào)用Session關(guān)閉連接方法,當(dāng)前的節(jié)點(diǎn)會(huì)刪除;----釋放鎖
其他正在等待請(qǐng)求,采用事件監(jiān)聽(tīng)如果當(dāng)前節(jié)點(diǎn)被刪除的話,又重新進(jìn)入到獲取鎖流程;
臨時(shí)節(jié)點(diǎn)+事件通知。
代碼實(shí)現(xiàn)分布式鎖
實(shí)現(xiàn)分布式鎖的方式有多種:數(shù)據(jù)庫(kù)、redis、zookeeper,這里使用zookeeper實(shí)現(xiàn)。
實(shí)現(xiàn)原理:
因?yàn)閆ookeeper節(jié)點(diǎn)路徑保持唯一,不允許重復(fù) 且有臨時(shí)節(jié)點(diǎn)特性連接關(guān)閉后當(dāng)前節(jié)點(diǎn)會(huì)自動(dòng)消失,從而實(shí)現(xiàn)分布式鎖。
- 多請(qǐng)求同時(shí)創(chuàng)建相同的節(jié)點(diǎn)(lockPath),只要誰(shuí)能夠創(chuàng)建成功 誰(shuí)就能夠獲取到鎖;
- 如果創(chuàng)建節(jié)點(diǎn)的時(shí)候,突然該節(jié)點(diǎn)已經(jīng)被其他請(qǐng)求創(chuàng)建的話則直接等待;
- 只要能夠創(chuàng)建節(jié)點(diǎn)成功,則開(kāi)始進(jìn)入到正常業(yè)務(wù)邏輯操作,其他沒(méi)有獲取鎖進(jìn)行等待;
- 正常業(yè)務(wù)邏輯流程執(zhí)行完后,調(diào)用zk關(guān)閉連接方式釋放鎖,從而是其他的請(qǐng)求開(kāi)始進(jìn)入到獲取鎖的資源。
使用zookeeper 實(shí)現(xiàn)分們式鎖的代碼案例
利用模板設(shè)計(jì)模式實(shí)現(xiàn)分布式鎖
1、定義鎖接口 Lock
public interface Lock {
/**
* 獲取鎖
*/
public void getLock();
/**
* 釋放鎖
*/
public void unLock();
}2、定義抽象類實(shí)現(xiàn)鎖接口 Lock ,設(shè)計(jì)其他的方法完成對(duì)鎖的操作
abstract class AbstractTemplzateLock implements Lock {
@Override
public void getLock() {
// 模版方法 定義共同抽象的骨架
if (tryLock()) {
System.out.println(">>>" + Thread.currentThread().getName() + ",獲取鎖成功");
} else {
// 開(kāi)始實(shí)現(xiàn)等待
waitLock();// 事件監(jiān)聽(tīng)
// 重新獲取
getLock();
}
}
/**
* 獲取鎖
* @return
*/
protected abstract boolean tryLock();
/**
* 等待鎖
* @return
*/
protected abstract void waitLock();
/**
* 釋放鎖
* @return
*/
protected abstract void unImplLock();
@Override
public void unLock() {
unImplLock();
}
}3、利用zookeeper實(shí)現(xiàn)鎖,繼承抽象類 AbstractTemplzateLock
public class ZkTemplzateImplLock extends AbstractTemplzateLock {
//參數(shù)1 連接地址
private static final String ADDRES = "192.168.212.147:2181";
// 參數(shù)2 zk超時(shí)時(shí)間
private static final int TIMEOUT = 5000;
// 創(chuàng)建我們的zk連接
private ZkClient zkClient = new ZkClient(ADDRES, TIMEOUT);
/**
* 共同的創(chuàng)建臨時(shí)節(jié)點(diǎn)
*/
private String lockPath = "/lockPath";
private CountDownLatch countDownLatch = null;
@Override
protected boolean tryLock() {
// 獲取鎖的思想:多個(gè)jvm同時(shí)創(chuàng)建臨時(shí)節(jié)點(diǎn),只要誰(shuí)能夠創(chuàng)建成功 誰(shuí)能夠獲取到鎖
try {
zkClient.createEphemeral(lockPath);
return true;
} catch (Exception e) {
// // 如果創(chuàng)建已經(jīng)存在的話
// e.printStackTrace();
return false;
}
}
@Override
protected void waitLock() {
// 1.使用事件監(jiān)聽(tīng) 監(jiān)聽(tīng)lockPath節(jié)點(diǎn)是否已經(jīng)被刪除,如果被刪除的情況下 有可以重新的進(jìn)入到獲取鎖的權(quán)限
IZkDataListener iZkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
if (countDownLatch != null) {
countDownLatch.countDown();// 計(jì)數(shù)器變?yōu)?
}
}
};
zkClient.subscribeDataChanges(lockPath, iZkDataListener);
// 2.使用countDownLatch等待
if (countDownLatch == null) {
countDownLatch = new CountDownLatch(1);
}
try {
countDownLatch.await();// 如果當(dāng)前計(jì)數(shù)器不是為0 就一直等待
} catch (Exception e) {
}
// 3. 如果當(dāng)前節(jié)點(diǎn)被刪除的情況下,有需要重新進(jìn)入到獲取鎖
zkClient.unsubscribeDataChanges(lockPath, iZkDataListener);
}
@Override
protected void unImplLock() {
if (zkClient != null) {
zkClient.close();
System.out.println(Thread.currentThread().getName() + ",釋放了鎖>>>");
}
}
}4、編寫使用鎖的方法
//main方法使用多線程測(cè)試分布式鎖
public static void main(String[] args) {
// OrderService orderService = new OrderService();
for (int i = 0; i < 100; i++) {
new Thread(new OrderService()).start();
}
// 單個(gè)jvm中多線程同時(shí)生成訂單號(hào)碼如果發(fā)生重復(fù) 如何解決 synchronized或者是lock鎖
// 如果在多個(gè)jvm中同時(shí)生成訂單號(hào)碼如果發(fā)生重復(fù)如何解決
// 注意synchronized或者是lock鎖 只能夠在本地的jvm中有效
// 分布式鎖的概念
}
//多線程run 方法
public class OrderService implements Runnable {
private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
private Lock lock = new ZkTemplzateImplLock();
@Override
public void run() {
getNumber();
}
private void getNumber() {
try {
lock.getLock();
Thread.sleep(50);
String number = orderNumGenerator.getNumber();
System.out.println(Thread.currentThread().getName() + ",獲取的number:" + number);
// 如果zk超時(shí)了,有做數(shù)據(jù)庫(kù)寫的操作統(tǒng)一直接回滾
} catch (Exception e) {
} finally {
lock.unLock();
}
}
// ZkTemplzateImplLock父親 模版類 AbstractTemplzateLock 父親 Lock
}
//自動(dòng)生成訂單號(hào)的類
public class OrderNumGenerator {
/**
* 序號(hào)
*/
private static int count;
/**
* 生成我們的時(shí)間戳 為訂單號(hào)碼
* @return
*/
public String getNumber() {
SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
try {
Thread.sleep(30);
} catch (Exception e) {
}
return simpt.format(new Date()) + "-" + ++count;
}
}如何防止死鎖?
創(chuàng)建zkClient時(shí)設(shè)置session 連接時(shí)間 sessionTimeout。 也就是設(shè)置Session連接超時(shí)時(shí)間,在規(guī)定的時(shí)間內(nèi)獲取鎖后超時(shí)啦~自動(dòng)回滾當(dāng)前數(shù)據(jù)庫(kù)業(yè)務(wù)邏輯。
注意:等待鎖時(shí),zkClient注冊(cè)的事件最后需要?jiǎng)h除。
到此這篇關(guān)于Java zookeeper服務(wù)的使用詳解的文章就介紹到這了,更多相關(guān)Java zookeeper服務(wù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MySQL?MyBatis?默認(rèn)插入當(dāng)前時(shí)間方式
這篇文章主要介紹了MySQL?MyBatis?默認(rèn)插入當(dāng)前時(shí)間方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-10-10
Spring Boot之過(guò)濾器 Filter注入的方式解析
這篇文章主要介紹了Spring Boot之過(guò)濾器 Filter注入的方式解析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11
SpringBoot + SpringSecurity 短信驗(yàn)證碼登錄功能實(shí)現(xiàn)
這篇文章主要介紹了SpringBoot + SpringSecurity 短信驗(yàn)證碼登錄功能實(shí)現(xiàn),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-06-06
Spring Boot高效數(shù)據(jù)聚合之道深入講解
這篇文章主要給大家介紹了關(guān)于Spring Boot高效數(shù)據(jù)聚合之道的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Spring Boot具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06
logback和log4j日志框架堆棧信息添加TraceId方式
這篇文章主要介紹了logback和log4j日志框架堆棧信息添加TraceId方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09
SpringBoot中實(shí)現(xiàn)Redis緩存預(yù)熱
緩存預(yù)熱是一種在系統(tǒng)啟動(dòng)后,但在實(shí)際使用前將數(shù)據(jù)加載到緩存中的技術(shù),本文主要來(lái)和大家一起探討如何在Spring Boot應(yīng)用程序中實(shí)現(xiàn)Redis緩存預(yù)熱,以確保系統(tǒng)在處理請(qǐng)求前就已經(jīng)處于最佳狀態(tài),感興趣的可以了解下2023-11-11
Java之多個(gè)線程順序循環(huán)執(zhí)行的幾種實(shí)現(xiàn)
這篇文章主要介紹了Java之多個(gè)線程順序循環(huán)執(zhí)行的幾種實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09

