ZooKeeper命令及JavaAPI操作代碼
ZooKeeper數(shù)據(jù)模型
- ZooKeeper是一個(gè)樹(shù)形目錄服務(wù),其數(shù)據(jù)模型和Uiix的文件目錄樹(shù)很類(lèi)似,擁有一個(gè)層次化結(jié)構(gòu)。
- 這里面的每一個(gè)節(jié)點(diǎn)都被稱(chēng)為:ZNode,每個(gè)節(jié)點(diǎn)上都會(huì)保存自己的數(shù)據(jù)和節(jié)點(diǎn)信息。
- 節(jié)點(diǎn)可以擁有子節(jié)點(diǎn),同時(shí)也允許少量(1MB)數(shù)據(jù)存儲(chǔ)在該節(jié)點(diǎn)之下。
- 節(jié)點(diǎn)可以分為四大類(lèi):
- PEFSISTENT持久化節(jié)點(diǎn)
- EPHEMERAL臨時(shí)節(jié)點(diǎn):-e
- PERSISTENT_SEQUENTIAL持久化順序節(jié)點(diǎn):-s
- EPHEMERAL_SEQUENTIAL臨時(shí)順序節(jié)點(diǎn):-es
ZooKeeper服務(wù)端常用命令
- 啟動(dòng)ZooKeeper服務(wù):
./zkServer.sh start - 查看ZooKeeper服務(wù):
./zkServer.sh status - 停止ZooKeeper服務(wù):
./zkServer.sh stop - 重啟ZooKeeper服務(wù):
./zkServer.sh restart
ZooKeeper客戶端命令
- ./zkCli.sh -server localhost:2181連接服務(wù)端,如果是單機(jī)后面的可以省略不寫(xiě)。
- ls [/] :查看指定節(jié)點(diǎn)下子節(jié)點(diǎn)
- create [/app] [hrbu]:創(chuàng)建一個(gè)名為/app1的子節(jié)點(diǎn),并存放數(shù)據(jù)。
- get [/app] :獲取節(jié)點(diǎn)下的數(shù)據(jù)。
- set [/app] [hrbu]:給指定節(jié)點(diǎn)設(shè)置數(shù)據(jù)
- delete [/app] :刪除指定節(jié)點(diǎn) ps:此命令無(wú)法刪除存在子節(jié)點(diǎn)的節(jié)點(diǎn),如果要?jiǎng)h除帶有子節(jié)點(diǎn)的節(jié)點(diǎn)可以是使用deleteall [/app] 命令。
- quit 斷開(kāi)連接
- help 查看命令幫助
- create -e [/app] 創(chuàng)建臨時(shí)節(jié)點(diǎn),會(huì)話關(guān)閉就會(huì)刪除
- create -s [/app] 創(chuàng)建順序節(jié)點(diǎn)
- create -es [/app] 創(chuàng)建臨時(shí)順序節(jié)點(diǎn)
- ls -s [/app] 查看節(jié)點(diǎn)的詳細(xì)信息
使用Curator API操作Zookeeper
建立連接
@Test
public void testConnect() {
//重試策略
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000, 10);
//第一種方式
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.130.120:2181", 60 * 1000, 15 * 1000, retry);
//第二種方式
CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("192.168.130.120:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retry).namespace("hrbu").build();
//開(kāi)啟連接
client.start();
}參數(shù)解讀
- connectString – list of servers to connect to (ZooKeeper的地址)
sessionTimeoutMs – session timeout (會(huì)話超時(shí)時(shí)間)
connectionTimeoutMs – connection timeout (連接超時(shí)時(shí)間)
retryPolicy – retry policy to use (重試策略)

會(huì)話超時(shí)時(shí)間和連接超時(shí)時(shí)間有默認(rèn)值。
第二種鏈?zhǔn)骄幊痰姆绞娇梢灾付ㄒ粋€(gè)工作空間,在此客戶端下的所有操作都會(huì)將此工作空間作為根目錄。
注意
如果使用的是云服務(wù)器需要將指定端口打開(kāi)
firewall-cmd --zone=public --add-port=2181/tcp --permanent 開(kāi)放端口
firewall-cmd --zone=public --list-ports 查看已經(jīng)開(kāi)放的端口
systemctl restart firewalld 重啟防火墻生效
最后別忘了在服務(wù)器的安全組里面添加端口,將2181端口打開(kāi)
添加節(jié)點(diǎn)
@Test
public void testCreate1() throws Exception {
//基本創(chuàng)建
CreateBuilder createBuilder = client.create();
//創(chuàng)建時(shí)不指定數(shù)據(jù),會(huì)將當(dāng)前客戶端ip存到里面
createBuilder.forPath("/app1");
//指定數(shù)據(jù)
createBuilder.forPath("/app2", "hello".getBytes());
}
@Test
public void testCreate2() throws Exception {
CreateBuilder createBuilder = client.create();
//設(shè)置節(jié)點(diǎn)類(lèi)型,默認(rèn)的類(lèi)型是持久化
//CreateMode是枚舉類(lèi)型
createBuilder.withMode(CreateMode.EPHEMERAL).forPath("/app3");
}
@Test
public void testCreate3() throws Exception {
CreateBuilder createBuilder = client.create();
//創(chuàng)建多級(jí)節(jié)點(diǎn),如果父節(jié)點(diǎn)不存在,則創(chuàng)建父節(jié)點(diǎn)。
createBuilder.creatingParentContainersIfNeeded().forPath("/app4/app4_1");
}查詢(xún)節(jié)點(diǎn)
@Test
public void testGet() throws Exception {
//查詢(xún)數(shù)據(jù)
byte[] bytes = client.getData().forPath("/app1");
System.out.println(new String(bytes));
//查詢(xún)子節(jié)點(diǎn)
List<String> strings = client.getChildren().forPath("/app4");
strings.forEach(System.out::println);
//查詢(xún)節(jié)點(diǎn)狀態(tài)信息
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");
System.out.println(stat);
}修改節(jié)點(diǎn)
@Test
public void testSet() throws Exception {
//修改數(shù)據(jù)
client.setData().forPath("/app1","hrbu".getBytes());
//根據(jù)版本修改
int version = 0;
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");
version = stat.getVersion();
client.setData().withVersion(version).forPath("/app1", "HRBU".getBytes());
}刪除節(jié)點(diǎn)
@Test
public void testDelete() throws Exception {
//刪除單個(gè)節(jié)點(diǎn)
client.delete().forPath("/app4/app4_1");
//刪除帶有子節(jié)點(diǎn)的節(jié)點(diǎn)
client.delete().deletingChildrenIfNeeded().forPath("/app4");
//強(qiáng)制刪除
client.delete().guaranteed().forPath("/app4");
//回調(diào)
client.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("執(zhí)行刪除操作");
}
}).forPath("/app4");
}Watch事件監(jiān)聽(tīng)
- Zookeeper允許用戶在指定節(jié)點(diǎn)上注冊(cè)一些Watcher,并且在一些特定事件觸發(fā)的時(shí)候,ZooKeeper服務(wù)端會(huì)將事件通知到感興趣的客戶端上去,該機(jī)制是ZooKeeper實(shí)現(xiàn)分布式協(xié)調(diào)服務(wù)的重要特性。
- ZooKeeper中引入了Watcher機(jī)制來(lái)實(shí)現(xiàn)了發(fā)布/訂閱功能,能夠讓多個(gè)訂閱者同時(shí)監(jiān)聽(tīng)某一個(gè)對(duì)象,當(dāng)一個(gè)對(duì)象自身狀態(tài)變化時(shí),會(huì)通知所有訂閱者。
- ZooKeeper原生支持通過(guò)注冊(cè)Watcher來(lái)進(jìn)行事件監(jiān)聽(tīng),但是使用并不是特別方便,需要開(kāi)發(fā)人員自己反復(fù)注冊(cè)Watcher,比較繁瑣。
- Curator引入了Cache來(lái)時(shí)限對(duì)Zookeeper服務(wù)端事件的監(jiān)聽(tīng)。
- ZooKeeper提供了三種Watcher:
- NodeCache:只是監(jiān)聽(tīng)某一個(gè)特定的節(jié)點(diǎn)。
- PathChildrenCache:監(jiān)控一個(gè)Node的子節(jié)點(diǎn)。
- TreeCache:可以監(jiān)控整個(gè)樹(shù)上的所有節(jié)點(diǎn),類(lèi)似于PathChildrenCache和NodeCache的組合。
NodeCache
@Test
public void testNodeCache() throws Exception {
//NodeCache:指定一個(gè)節(jié)點(diǎn)注冊(cè)監(jiān)聽(tīng)器
//創(chuàng)建NodeCache對(duì)象
final NodeCache nodeCache = new NodeCache(client, "/app1");
//注冊(cè)監(jiān)聽(tīng)
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("app1節(jié)點(diǎn)發(fā)生變化");
//獲取修改節(jié)點(diǎn)后的數(shù)據(jù)
byte[] data = nodeCache.getCurrentData().getData();
System.out.println("變化后的節(jié)點(diǎn):"+new String(data));
}
});
//開(kāi)啟監(jiān)聽(tīng),如果為true,則開(kāi)啟則開(kāi)啟監(jiān)聽(tīng),加載緩沖數(shù)據(jù)
nodeCache.start(true);
}PathChildrenCache
@Test
public void testPathChildrenCache() throws Exception {
//PathChildrenCache:監(jiān)聽(tīng)某個(gè)節(jié)點(diǎn)的所有子節(jié)點(diǎn)
//創(chuàng)建監(jiān)聽(tīng)對(duì)象
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/hrbu", true);
//綁定監(jiān)聽(tīng)器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子節(jié)點(diǎn)發(fā)生變化");
System.out.println(pathChildrenCacheEvent);
//監(jiān)聽(tīng)子節(jié)點(diǎn)的數(shù)據(jù)變更,并且得到變更后的數(shù)據(jù)
//獲取類(lèi)型
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
//判斷類(lèi)型
if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
//獲取數(shù)據(jù)
byte[] data = pathChildrenCacheEvent.getData().getData();
System.out.println(new String(data));
}
}
});
//開(kāi)啟
pathChildrenCache.start();
}
TreeCache
@Test
public void testTreeCache() throws Exception {
//創(chuàng)建監(jiān)聽(tīng)器
TreeCache treeCache = new TreeCache(client, "/");
//注冊(cè)監(jiān)聽(tīng)
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println("節(jié)點(diǎn)發(fā)生變化");
System.out.println(treeCacheEvent);
}
});
//開(kāi)啟
treeCache.start();
}分布式鎖實(shí)現(xiàn)
概述
- 我們?cè)谶M(jìn)行單機(jī)應(yīng)用開(kāi)發(fā),涉及并發(fā)同步的時(shí)候,,我們往往采用synchronized或者lock的方式來(lái)解決多線程間的代碼同步問(wèn)題,這時(shí)候多線程的運(yùn)行都是在同一個(gè)JVM之下,沒(méi)有任何問(wèn)題。
- 但當(dāng)我們的應(yīng)用時(shí)分布式集群工作的情況下,屬于多JVM下的工作環(huán)境,跨JVM之間已經(jīng)無(wú)法通過(guò)多線程的鎖解決同步問(wèn)題。
- 那么就需要一種更加高級(jí)的鎖機(jī)制,來(lái)處理跨機(jī)器進(jìn)程之間的數(shù)據(jù)同步問(wèn)題,這就是分布式鎖。
Zookeeper分布式鎖原理
- 核心思想:當(dāng)客戶端要獲取鎖,則創(chuàng)建節(jié)點(diǎn),使用完鎖,則刪除該節(jié)點(diǎn)。
- 1.客戶端獲取鎖時(shí),在lock節(jié)點(diǎn)下創(chuàng)建臨時(shí)順序節(jié)點(diǎn)。
- 2.然后獲取lock下面的所有子節(jié)點(diǎn),客戶端獲取到所有的子節(jié)之后,如果發(fā)現(xiàn)自己創(chuàng)建的子節(jié)點(diǎn)序號(hào)最小,那么就認(rèn)為該客戶端獲取到了鎖。使用完鎖后,將該節(jié)點(diǎn)刪除。
- 3.如果發(fā)現(xiàn)自己創(chuàng)建的節(jié)點(diǎn)并非lock所有子節(jié)點(diǎn)中最小的,說(shuō)明自己還沒(méi)有獲取到鎖,此時(shí)客戶端需要找到比自己小的那個(gè)節(jié)點(diǎn),同時(shí)對(duì)其注冊(cè)事件監(jiān)聽(tīng)器,監(jiān)聽(tīng)刪除事件。
- 4.如果發(fā)現(xiàn)比自己小的那個(gè)節(jié)點(diǎn)被刪除,則客戶端的Watcher會(huì)收到相應(yīng)通知,此時(shí)再次判斷自己創(chuàng)建的節(jié)點(diǎn)是否時(shí)lock子節(jié)點(diǎn)中序號(hào)最小的,如果是則獲取到了鎖,如果不是則重復(fù)以上步驟繼續(xù)獲取比自己小的一個(gè)節(jié)點(diǎn)并注冊(cè)監(jiān)聽(tīng)。
Curator實(shí)現(xiàn)分布式鎖API
在Curator中有五種鎖方案:
- InterProcessSemaphoreMutex:分布式排它鎖(非可重入鎖)
- InterProcessMutex:分布式可重入排它鎖
- InterProcessReadWriteLock:分布式讀寫(xiě)鎖
- InterProcessMultiLock:將多個(gè)鎖作為單個(gè)實(shí)體管理的容器
- InterProcessSemaphoreV2:共享信號(hào)量
package com.hrbu.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
public class Ticket12306 implements Runnable{
private int tickets = 10;//數(shù)據(jù)庫(kù)的票數(shù)
private InterProcessMutex lock ;
public Ticket12306(){
//重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("8.130.32.75:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.build();
//開(kāi)啟連接
client.start();
lock = new InterProcessMutex(client,"/lock");
}
@Override
public void run() {
while(true){
//獲取鎖
try {
lock.acquire(3, TimeUnit.SECONDS);
if(tickets > 0){
System.out.println(Thread.currentThread()+":"+tickets);
Thread.sleep(100);
tickets--;
}
} catch (Exception e) {
e.printStackTrace();
}finally {
//釋放鎖
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}package com.hrbu.curator;
public class LockTest {
public static void main(String[] args) {
Ticket12306 ticket12306 = new Ticket12306();
//創(chuàng)建客戶端
Thread t1 = new Thread(ticket12306,"攜程");
Thread t2 = new Thread(ticket12306,"飛豬");
t1.start();
t2.start();
}
}到此這篇關(guān)于ZooKeeper命令及JavaAPI操作的文章就介紹到這了,更多相關(guān)ZooKeeper JavaAPI操作內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis Plus 增刪改查的實(shí)現(xiàn)(小白教程)
本文主要介紹了Mybatis Plus 增刪改查,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-09-09
SpringBoot2.0整合SpringCloud Finchley @hystrixcommand注解找不到解決方案
這篇文章主要介紹了SpringBoot2.0整合SpringCloud Finchley @hystrixcommand注解找不到解決方案,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-08-08
SpringBoot通過(guò)Filter實(shí)現(xiàn)整個(gè)項(xiàng)目接口的SQL注入攔截詳解
這篇文章主要介紹了SpringBoot通過(guò)Filter實(shí)現(xiàn)整個(gè)項(xiàng)目接口的SQL注入攔截詳解,SQL注入是比較常見(jiàn)的網(wǎng)絡(luò)攻擊方式之一,在客戶端在向服務(wù)器發(fā)送請(qǐng)求的時(shí)候,sql命令通過(guò)表單提交或者url字符串拼接傳遞到后臺(tái)持久層,最終達(dá)到欺騙服務(wù)器執(zhí)行惡意的SQL命令,需要的朋友可以參考下2023-12-12
解決SpringBoot引用別的模塊無(wú)法注入的問(wèn)題
這篇文章主要介紹了解決SpringBoot引用別的模塊無(wú)法注入的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02
java Volatile與Synchronized的區(qū)別
這篇文章主要介紹了java Volatile與Synchronized的區(qū)別,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下2020-12-12
RestTemplate自定義請(qǐng)求失敗異常處理示例解析
這篇文章主要為大家介紹了RestTemplate自定義請(qǐng)求失敗異常處理的示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-03-03

