詳解Zookeeper基礎(chǔ)知識
1. 簡介
zookeeper是一個(gè)開源的分布式協(xié)調(diào)服務(wù), 提供分布式數(shù)據(jù)一致性解決方案,分布式應(yīng)用程序可以實(shí)現(xiàn)數(shù)據(jù)統(tǒng)一配置管理、統(tǒng)一命名服務(wù)、分布式鎖、集群管理等功能.
ZooKeeper主要服務(wù)于分布式系統(tǒng),使用分布式系統(tǒng)就無法避免對節(jié)點(diǎn)管理的問題(需要實(shí)時(shí)感知節(jié)點(diǎn)的狀態(tài)、對節(jié)點(diǎn)進(jìn)行統(tǒng)一管理等等),而由于這些問題處理起來可能相對麻煩和提高了系統(tǒng)的復(fù)雜性,ZooKeeper作為一個(gè)能夠通用解決這些問題的中間件就應(yīng)運(yùn)而生了。
2. 數(shù)據(jù)模型
2.1 模型結(jié)構(gòu)
ZooKeeper 提供的命名空間很像標(biāo)準(zhǔn)文件系統(tǒng)的命名空間。名稱是由斜杠 (/) 分隔的一系列路徑元素。ZooKeeper 命名空間中的每個(gè)節(jié)點(diǎn)都由路徑標(biāo)識。
與典型的為存儲而設(shè)計(jì)的文件系統(tǒng)不同,ZooKeeper 數(shù)據(jù)保存在內(nèi)存中,這意味著 ZooKeeper 可以實(shí)現(xiàn)高吞吐量和低延遲。

2.2 模型的特點(diǎn)
- 每個(gè)子目錄如
/app1都被稱作一個(gè)znode(節(jié)點(diǎn))。這個(gè)znode是被它所在的路徑唯一標(biāo)識。 - znode可以有子節(jié)點(diǎn)目錄,并且每個(gè)znode可以存儲數(shù)據(jù)。
- znode是有版本的,每個(gè)znode中存儲的數(shù)據(jù)可以有多個(gè)版本,也就是一個(gè)訪問路徑中可以存儲多份數(shù)據(jù)。每次 znode 的數(shù)據(jù)更改時(shí),版本號都會增加。例如,每當(dāng)客戶端檢索數(shù)據(jù)時(shí),它也會收到數(shù)據(jù)的版本。
- 每個(gè)znode都有一個(gè)訪問控制列表 (ACL),它限制了誰可以做什么。
- znode可以被監(jiān)控,包括這個(gè)目錄中存儲的數(shù)據(jù)的修改,子節(jié)點(diǎn)目錄變化等,一旦變化可以通知設(shè)置監(jiān)控的客戶端。
⚠️注意:當(dāng)使用zkCli.sh 創(chuàng)建會話時(shí),節(jié)點(diǎn)的監(jiān)聽事件只能被觸發(fā)一次。
2.3 節(jié)點(diǎn)分類
2.3.1 Persistent
持久節(jié)點(diǎn):節(jié)點(diǎn)被創(chuàng)建后,就一直存在,除非客戶端主動(dòng)刪除這個(gè)節(jié)點(diǎn)。
2.3.2 Persistent Sequential
持久順序節(jié)點(diǎn):有序的持久節(jié)點(diǎn)。在zk中,每個(gè)父節(jié)點(diǎn)會為它的第一級子節(jié)點(diǎn)維護(hù)一份時(shí)序,會記錄每個(gè)子節(jié)點(diǎn)創(chuàng)建的先后順序。例如:
$ create -s /app1 # 創(chuàng)建 /app1 節(jié)點(diǎn) Created /app10000000000 # 創(chuàng)建成功后的節(jié)點(diǎn)名稱為 /app10000000000
2.3.3 Ephemeral
臨時(shí)節(jié)點(diǎn):和持久節(jié)點(diǎn)不同的是,臨時(shí)節(jié)點(diǎn)的生命周期和客戶端會話綁定且臨時(shí)節(jié)點(diǎn)下面不能創(chuàng)建子節(jié)點(diǎn)。如果客戶端會話失效,那么這個(gè)節(jié)點(diǎn)就會自動(dòng)被清除掉。
注意:客戶端失效臨時(shí)節(jié)點(diǎn)會被清除,但如果是斷開鏈接,臨時(shí)節(jié)點(diǎn)并不會立馬被清除。
“立馬”:在會話超時(shí)持續(xù)時(shí)間內(nèi)未從客戶端收到任何心跳信號之后,zk服務(wù)器將刪除該會話的臨時(shí)節(jié)點(diǎn)。但如果正常關(guān)閉會話,臨時(shí)節(jié)點(diǎn)會立馬被清除。
# 1. 創(chuàng)建會話 $ ./zkCli.sh # 2. 創(chuàng)建臨時(shí)節(jié)點(diǎn) /app3 $ create -e /app3 Created /app3 # 3. ctrl + c 關(guān)閉會話 # 4. 緊接著再次創(chuàng)建會話 $ ./zkCli.sh # 5. 查看節(jié)點(diǎn)內(nèi)容 發(fā)現(xiàn)臨時(shí)節(jié)點(diǎn)依舊存在 $ ls / [app10000000000, app3, zookeeper] # 6. 等待幾秒,再次查看 發(fā)現(xiàn)臨時(shí)節(jié)點(diǎn)消失 $ ls / [app10000000000, zookeeper] # 7. 再次創(chuàng)建臨時(shí)節(jié)點(diǎn) /app3 $ create -e /app3 Created /app3 # 8. 正常關(guān)閉會話 $ quit # 9. 再次創(chuàng)建會話 臨時(shí)節(jié)點(diǎn)消失 $ ./zkCli.sh $ ls / [app10000000000, zookeeper]
2.3.4 Ephemeral Sequential
臨時(shí)順序節(jié)點(diǎn):有序的臨時(shí)節(jié)點(diǎn)。創(chuàng)建es節(jié)點(diǎn)時(shí),zk會維護(hù)一份時(shí)序,會記錄每個(gè)節(jié)點(diǎn)的順序。例如:
$ create -s -e /app2 # 創(chuàng)建臨時(shí)有序節(jié)點(diǎn) /app2 Created /app20000000001 # 創(chuàng)建成功后的節(jié)點(diǎn)名稱為 app20000000001
3. 安裝
3.1 官方
官方地址:https://zookeeper.apache.org/releases.html
3.2 docker
$ docker run --name zookeeper --restart always -d zookeeper
3.3 docker-compose
⚠️注意:執(zhí)行腳本前,需先將配置文件掛載到宿主機(jī)上。
version: '3.1'
services:
zk:
image: zookeeper
restart: always
container_name: zookeeper
ports:
- 2181:2181
volumes:
- ./data:/data
- ./logs:/datalog
- ./conf/zoo.cfg:/conf/zoo.cfg
3.4 配置信息
# 數(shù)據(jù)存儲目錄 dataDir=/data # 日志存儲目錄 dataLogDir=/datalog # 集群模式下 節(jié)點(diǎn)之間的心跳時(shí)間2s(每2s進(jìn)行一次心跳檢測) tickTime=2000 # 集群初始化時(shí) 節(jié)點(diǎn)之間同步次數(shù)。(5 * 2s = 10s 10s內(nèi)未初始化成功則初始化失敗) initLimit=5 # 集群模式下 同步數(shù)據(jù)次數(shù)。 (2 * 2s = 4s 4s內(nèi)未同步則超時(shí)失?。? syncLimit=2 # 數(shù)據(jù)快照保留個(gè)數(shù) autopurge.snapRetainCount=3 # 數(shù)據(jù)快照清除時(shí)間間隔(單位為小時(shí)) 0:不清除 ,1:1小時(shí) autopurge.purgeInterval=0 # 最大的客戶端鏈接數(shù) maxClientCnxns=60
4. 基礎(chǔ)命令
不同版本之前的命令會有所差異,本章是基于zk:3.7版本。官方地址
4.1 創(chuàng)建會話
首先執(zhí)行命令,創(chuàng)建新的會話,進(jìn)入終端。
# 進(jìn)入到zk的安裝目錄的bin目錄下執(zhí)行 $ ./zkCli.sh # 如果zk不是在本機(jī) 則可以使用server參數(shù)指定鏈接地址 $ ./zkCli.sh -server 127.0.0.1:2181
4.2 ls
ls [-s] [-w] [-R] path:ls 命令用于查看某個(gè)路徑下目錄列表。
-s:查看此znode狀態(tài)信息。-w:監(jiān)聽此znode目錄變化。-R:遞歸查看
$ ls / # 查看根目錄 [app, app10000000000, zookeeper] $ ls /app [] $ ls -s /app [app01, app02] cZxid = 0x1b ctime = Sun Aug 29 13:07:24 UTC 2021 mZxid = 0x1b mtime = Sun Aug 29 13:07:24 UTC 2021 pZxid = 0x2e cversion = 2 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 2
監(jiān)聽znode目錄變化:

4.3 create
create [-s] [-e] [-c] [-t ttl] path [data] [acl]:create 用于創(chuàng)建znode。
-s:創(chuàng)建順序節(jié)點(diǎn)。-e:創(chuàng)建臨時(shí)節(jié)點(diǎn)。-c:創(chuàng)建一個(gè)容器節(jié)點(diǎn),當(dāng)容器中的最后一個(gè)子節(jié)點(diǎn)被刪除時(shí),容器也隨之消失。-t:設(shè)置節(jié)點(diǎn)存活時(shí)間(毫秒)。
⚠️注意:如果要設(shè)置超時(shí)時(shí)間需在配置文件中激活配置:
zookeeper.extendedTypesEnabled=true
data:設(shè)置的數(shù)據(jù)。acl:訪問控制配置。
$ ls / [app, app10000000000, zookeeper] # 創(chuàng)建持久節(jié)點(diǎn) /app3 $ create /app3 zhangtieniu Created /app3 # 創(chuàng)建有序的持久節(jié)點(diǎn) /app4 $ create -s /app4 zhangsan Created /app40000000011 # 創(chuàng)建臨時(shí)節(jié)點(diǎn) /app5 $ create -e /app5 linshi Created /app5 # 創(chuàng)建臨時(shí)有序節(jié)點(diǎn) /app6 $ create -s -e /app6 linshiyouxu Created /app60000000013 $ ls / [app, app10000000000, app3, app40000000011, app5, app60000000013, zookeeper]
4.4 get
get [-s] [-w] path: 命令用于獲取節(jié)點(diǎn)數(shù)據(jù)和狀態(tài)信息。
-s:查看此znode狀態(tài)信息。-w:監(jiān)聽此znode值變化。
$ get /app3 # 查看/app3 znode內(nèi)容 zhangtieniu
監(jiān)聽znod值變化:

4.5 stat
stat [-w] path:命令用于查看節(jié)點(diǎn)狀態(tài)信息。
-w:監(jiān)聽節(jié)點(diǎn)狀態(tài)變化。
$ stat /app3 cZxid = 0x34 # 創(chuàng)建節(jié)點(diǎn)時(shí)的事務(wù)id ctime = Sun Aug 29 14:31:52 UTC 2021 # 創(chuàng)建時(shí)間 mZxid = 0x34 # 創(chuàng)建的版本號 mtime = Sun Aug 29 14:31:52 UTC 2021 # 修改時(shí)間 pZxid = 0x34 # 子節(jié)點(diǎn)列表最后一次被修改的事務(wù)id cversion = 0 # 節(jié)點(diǎn)版本號 dataVersion = 0 # 數(shù)據(jù)版本號 aclVersion = 0 # 權(quán)限版本號 ephemeralOwner = 0x0 # 臨時(shí)擁有者 dataLength = 11 # 節(jié)點(diǎn)值數(shù)據(jù)長度 numChildren = 0 # 子節(jié)點(diǎn)數(shù)量
4.6 set
set [-s] [-v version] path data: 命令用于修改節(jié)點(diǎn)存儲的數(shù)據(jù)。
-s:查看設(shè)置成功后的狀態(tài)信息。
-v:指定設(shè)置值的版本號,該值只能為該節(jié)點(diǎn)的最新版本??梢允褂闷鋵?shí)現(xiàn)樂觀鎖。
$ set /app wangmazi $ set -s /app lisi cZxid = 0x1b ctime = Sun Aug 29 13:07:24 UTC 2021 mZxid = 0x43 mtime = Sun Aug 29 14:50:52 UTC 2021 pZxid = 0x3f cversion = 4 dataVersion = 6 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 4 numChildren = 4
4.7 delete
delete [-v version] path:刪除指定節(jié)點(diǎn)。
⚠️注意:當(dāng)刪除的節(jié)點(diǎn)有子節(jié)點(diǎn)時(shí),不能使用該命令。需使用deleteall命令刪除。
-v:刪除指定版本。與set同理
$ delete /app3
4.8 quit
關(guān)閉會話。
5. 節(jié)點(diǎn)的監(jiān)聽機(jī)制
zk的監(jiān)聽機(jī)制,可以使客戶端可以監(jiān)聽znode節(jié)點(diǎn)的變化,znode節(jié)點(diǎn)的變化出發(fā)相應(yīng)的事件,然后清除對該節(jié)點(diǎn)的檢測。
⚠️注意:在這里再強(qiáng)調(diào)一次,當(dāng)使用zkCli.sh 創(chuàng)建會話時(shí),對znode的監(jiān)聽只能觸發(fā)一次。但在使用java客戶端鏈接時(shí),可以一直觸發(fā)。
$ ls -w /path # 監(jiān)聽節(jié)點(diǎn)目錄的變化 $ get -w /path # 監(jiān)聽節(jié)點(diǎn)數(shù)據(jù)的變化
6. quick start
6.1 項(xiàng)目結(jié)構(gòu)

.
├── pom.xml
└── src
└── main
├── java
│ └── com
│ └── ldx
│ └── zookeeper
│ ├── ZookeeperApplication.java # 啟動(dòng)類
│ ├── config
│ │ ├── CuratorClientConfig.java # zk配置類
│ │ └── CuratorClientProperties.java # zk 配置屬性文件
│ ├── controller
│ │ └── AppController.java # zk 測試文件
│ └── util
│ └── CuratorClient.java # zk工具類
└── resources
└── application.yaml # 服務(wù)配置文件
6.2 引入依賴
Curator 是 Netflix 公司開源的一套 zookeeper 客戶端框架,解決了很多 Zookeeper 客戶端非常底層的細(xì)節(jié)開發(fā)工作,包括連接重連、反復(fù)注冊 Watcher 和 NodeExistsException 異常等。
curator-recipes:封裝了一些高級特性,如:Cache 事件監(jiān)聽、選舉、分布式鎖、分布式計(jì)數(shù)器、分布式 Barrier 等。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.ldx</groupId>
<artifactId>zookeeper</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>zookeeper</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- client 操作工具包 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
6.3 application.
server:
port: 8080
# curator配置
curator-client:
# 連接字符串
connection-string: localhost:2181
# 根節(jié)點(diǎn)
namespace: ldx
# 節(jié)點(diǎn)數(shù)據(jù)編碼
charset: utf8
# session超時(shí)時(shí)間
session-timeout-ms: 60000
# 連接超時(shí)時(shí)間
connection-timeout-ms: 15000
# 關(guān)閉連接超時(shí)時(shí)間
max-close-wait-ms: 1000
# 默認(rèn)數(shù)據(jù)
default-data: ""
# 當(dāng)半數(shù)以上zookeeper服務(wù)出現(xiàn)故障仍然提供讀服務(wù)
can-be-read-only: false
# 自動(dòng)創(chuàng)建父節(jié)點(diǎn)
use-container-parents-if-available: true
# 重試策略,默認(rèn)使用BoundedExponentialBackoffRetry
retry:
max-sleep-time-ms: 10000
base-sleep-time-ms: 1000
max-retries: 3
# 認(rèn)證信息
#auth:
#scheme: digest
# auth: username:password
6.4 CuratorClientProperties
package com.ldx.zookeeper.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* zk 屬性配置類
*
* @author ludangxin
* @date 2021/8/31
*/
@Data
@ConfigurationProperties(prefix = "curator-client")
public class CuratorClientProperties {
/**
* 連接地址
*/
private String connectionString;
/**
* 命名空間
*/
private String namespace;
/**
* 字符集
*/
private String charset = "utf8";
/**
* 會話超時(shí)時(shí)間 毫秒
*/
private int sessionTimeoutMs = 60000;
/**
* 連接超時(shí)時(shí)間 毫秒
*/
private int connectionTimeoutMs = 15000;
/**
* 最大關(guān)閉等待時(shí)間 毫秒
*/
private int maxCloseWaitMs = 1000;
/**
* 默認(rèn)數(shù)據(jù)
*/
private String defaultData = "";
/**
* 當(dāng)半數(shù)以上zookeeper服務(wù)出現(xiàn)故障仍然提供讀服務(wù)
*/
private boolean canBeReadOnly = false;
/**
* 自動(dòng)創(chuàng)建父節(jié)點(diǎn)
*/
private boolean useContainerParentsIfAvailable = true;
/**
* 線程池名稱
*/
private String threadFactoryClassName;
private Retry retry = new Retry();
private Auth auth = new Auth();
@Data
public static class Retry {
private int maxSleepTimeMs = 10000;
private int baseSleepTimeMs = 1000;
private int maxRetries = 3;
}
@Data
public static class Auth {
private String scheme = "digest";
private String auth;
}
}
6.5 CuratorClientConfig
package com.ldx.zookeeper.config;
import com.ldx.zookeeper.util.CuratorClient;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.ensemble.EnsembleProvider;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.CompressionProvider;
import org.apache.curator.framework.imps.GzipCompressionProvider;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.curator.utils.DefaultZookeeperFactory;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.ThreadFactory;
/**
* zk 配置類
*
* @author ludangxin
* @date 2021/8/31
*/
@Slf4j
@Configuration
@EnableConfigurationProperties(CuratorClientProperties.class)
public class CuratorClientConfig {
@Bean
public EnsembleProvider ensembleProvider(CuratorClientProperties curatorClientProperties) {
return new FixedEnsembleProvider(curatorClientProperties.getConnectionString());
}
@Bean
public RetryPolicy retryPolicy(CuratorClientProperties curatorClientProperties) {
CuratorClientProperties.Retry retry = curatorClientProperties.getRetry();
return new BoundedExponentialBackoffRetry(retry.getBaseSleepTimeMs(), retry.getMaxSleepTimeMs(), retry.getMaxRetries());
}
@Bean
public CompressionProvider compressionProvider() {
return new GzipCompressionProvider();
}
@Bean
public ZookeeperFactory zookeeperFactory() {
return new DefaultZookeeperFactory();
}
@Bean
public ACLProvider aclProvider() {
return new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
@Override
public List<ACL> getAclForPath(String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
};
}
@Bean
@SneakyThrows
public CuratorFrameworkFactory.Builder builder(EnsembleProvider ensembleProvider,
RetryPolicy retryPolicy,
CompressionProvider compressionProvider,
ZookeeperFactory zookeeperFactory,
ACLProvider aclProvider,
CuratorClientProperties curatorClientProperties) {
String charset = curatorClientProperties.getCharset();
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.ensembleProvider(ensembleProvider)
.retryPolicy(retryPolicy)
.compressionProvider(compressionProvider)
.zookeeperFactory(zookeeperFactory)
.namespace(curatorClientProperties.getNamespace())
.sessionTimeoutMs(curatorClientProperties.getSessionTimeoutMs())
.connectionTimeoutMs(curatorClientProperties.getConnectionTimeoutMs())
.maxCloseWaitMs(curatorClientProperties.getMaxCloseWaitMs())
.defaultData(curatorClientProperties.getDefaultData().getBytes(Charset.forName(charset)))
.canBeReadOnly(curatorClientProperties.isCanBeReadOnly());
if (!curatorClientProperties.isUseContainerParentsIfAvailable()) {
builder.dontUseContainerParents();
}
CuratorClientProperties.Auth auth = curatorClientProperties.getAuth();
if (StringUtils.isNotBlank(auth.getAuth())) {
builder.authorization(auth.getScheme(), auth.getAuth().getBytes(Charset.forName(charset)));
builder.aclProvider(aclProvider);
}
String threadFactoryClassName = curatorClientProperties.getThreadFactoryClassName();
if (StringUtils.isNotBlank(threadFactoryClassName)) {
try {
Class<?> cls = Class.forName(threadFactoryClassName);
ThreadFactory threadFactory = (ThreadFactory) cls.newInstance();
builder.threadFactory(threadFactory);
} catch (Exception e) {
log.error("init CuratorClient error", e);
}
}
return builder;
}
@Bean(initMethod = "init", destroyMethod = "stop")
public CuratorClient curatorClient(CuratorFrameworkFactory.Builder builder) {
return new CuratorClient(builder);
}
}
6.6 CuratorClient
package com.ldx.zookeeper.util;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.locks.*;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.zookeeper.CreateMode;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
* zookeeper工具類
*
* @author ludangxin
* @date 2021/8/30
*/
@Slf4j
public class CuratorClient {
/**
* 默認(rèn)的字符編碼集
*/
private static final String DEFAULT_CHARSET = "utf8";
/**
* 客戶端
*/
private final CuratorFramework client;
/**
* 字符集
*/
private String charset = DEFAULT_CHARSET;
@SneakyThrows
public CuratorClient(CuratorFrameworkFactory.Builder builder) {
client = builder.build();
}
@SneakyThrows
public CuratorClient(CuratorFrameworkFactory.Builder builder, String charset) {
client = builder.build();
this.charset = charset;
}
public void init() {
client.start();
client.getConnectionStateListenable().addListener((client, state) -> {
if (state==ConnectionState.LOST) {
// 連接丟失
log.info("lost session with zookeeper");
} else if (state==ConnectionState.CONNECTED) {
// 連接新建
log.info("connected with zookeeper");
} else if (state==ConnectionState.RECONNECTED) {
// 重新連接
log.info("reconnected with zookeeper");
}
});
}
/**
* 關(guān)閉會話
*/
public void stop() {
log.info("zookeeper session close");
client.close();
}
/**
* 創(chuàng)建節(jié)點(diǎn)
*
* @param mode 節(jié)點(diǎn)類型
* 1、PERSISTENT 持久化目錄節(jié)點(diǎn),存儲的數(shù)據(jù)不會丟失。
* 2、PERSISTENT_SEQUENTIAL順序自動(dòng)編號的持久化目錄節(jié)點(diǎn),存儲的數(shù)據(jù)不會丟失
* 3、EPHEMERAL臨時(shí)目錄節(jié)點(diǎn),一旦創(chuàng)建這個(gè)節(jié)點(diǎn)的客戶端與服務(wù)器端口也就是session 超時(shí),這種節(jié)點(diǎn)會被自動(dòng)刪除
* 4、EPHEMERAL_SEQUENTIAL臨時(shí)自動(dòng)編號節(jié)點(diǎn),一旦創(chuàng)建這個(gè)節(jié)點(diǎn)的客戶端與服務(wù)器端口也就是session 超時(shí),這種節(jié)點(diǎn)會被自動(dòng)刪除,并且根據(jù)當(dāng)前已經(jīng)存在的節(jié)點(diǎn)數(shù)自動(dòng)加 1,然后返回給客戶端已經(jīng)成功創(chuàng)建的目錄節(jié)點(diǎn)名。
* @param path 節(jié)點(diǎn)名稱
* @param nodeData 節(jié)點(diǎn)數(shù)據(jù)
*/
@SneakyThrows
public void createNode(CreateMode mode, String path, String nodeData) {
// 使用creatingParentContainersIfNeeded()之后Curator能夠自動(dòng)遞歸創(chuàng)建所有所需的父節(jié)點(diǎn)
client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, nodeData.getBytes(Charset.forName(charset)));
}
/**
* 創(chuàng)建節(jié)點(diǎn)
*
* @param mode 節(jié)點(diǎn)類型
* 1、PERSISTENT 持久化目錄節(jié)點(diǎn),存儲的數(shù)據(jù)不會丟失。
* 2、PERSISTENT_SEQUENTIAL順序自動(dòng)編號的持久化目錄節(jié)點(diǎn),存儲的數(shù)據(jù)不會丟失
* 3、EPHEMERAL臨時(shí)目錄節(jié)點(diǎn),一旦創(chuàng)建這個(gè)節(jié)點(diǎn)的客戶端與服務(wù)器端口也就是session 超時(shí),這種節(jié)點(diǎn)會被自動(dòng)刪除
* 4、EPHEMERAL_SEQUENTIAL臨時(shí)自動(dòng)編號節(jié)點(diǎn),一旦創(chuàng)建這個(gè)節(jié)點(diǎn)的客戶端與服務(wù)器端口也就是session 超時(shí),這種節(jié)點(diǎn)會被自動(dòng)刪除,并且根據(jù)當(dāng)前已經(jīng)存在的節(jié)點(diǎn)數(shù)自動(dòng)加 1,然后返回給客戶端已經(jīng)成功創(chuàng)建的目錄節(jié)點(diǎn)名。
* @param path 節(jié)點(diǎn)名稱
*/
@SneakyThrows
public void createNode(CreateMode mode, String path) {
// 使用creatingParentContainersIfNeeded()之后Curator能夠自動(dòng)遞歸創(chuàng)建所有所需的父節(jié)點(diǎn)
client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
}
/**
* 刪除節(jié)點(diǎn)數(shù)據(jù)
*
* @param path 節(jié)點(diǎn)名稱
*/
@SneakyThrows
public void deleteNode(final String path) {
deleteNode(path, true);
}
/**
* 刪除節(jié)點(diǎn)數(shù)據(jù)
*
* @param path 節(jié)點(diǎn)名稱
* @param deleteChildre 是否刪除子節(jié)點(diǎn)
*/
@SneakyThrows
public void deleteNode(final String path, Boolean deleteChildre) {
if (deleteChildre) {
// guaranteed()刪除一個(gè)節(jié)點(diǎn),強(qiáng)制保證刪除,
// 只要客戶端會話有效,那么Curator會在后臺持續(xù)進(jìn)行刪除操作,直到刪除節(jié)點(diǎn)成功
client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
} else {
client.delete().guaranteed().forPath(path);
}
}
/**
* 設(shè)置指定節(jié)點(diǎn)的數(shù)據(jù)
*
* @param path 節(jié)點(diǎn)名稱
* @param data 節(jié)點(diǎn)數(shù)據(jù)
*/
@SneakyThrows
public void setNodeData(String path, String data) {
client.setData().forPath(path, data.getBytes(Charset.forName(charset)));
}
/**
* 獲取指定節(jié)點(diǎn)的數(shù)據(jù)
*
* @param path 節(jié)點(diǎn)名稱
* @return 節(jié)點(diǎn)數(shù)據(jù)
*/
@SneakyThrows
public String getNodeData(String path) {
return new String(client.getData().forPath(path), Charset.forName(charset));
}
/**
* 獲取數(shù)據(jù)時(shí)先同步
*
* @param path 節(jié)點(diǎn)名稱
* @return 節(jié)點(diǎn)數(shù)據(jù)
*/
public String synNodeData(String path) {
client.sync();
return getNodeData(path);
}
/**
* 判斷節(jié)點(diǎn)是否存在
*
* @param path 節(jié)點(diǎn)名稱
* @return true 節(jié)點(diǎn)存在,false 節(jié)點(diǎn)不存在
*/
@SneakyThrows
public boolean isExistNode(final String path) {
client.sync();
return Objects.nonNull(client.checkExists().forPath(path));
}
/**
* 獲取節(jié)點(diǎn)的子節(jié)點(diǎn)
*
* @param path 節(jié)點(diǎn)名稱
* @return 子節(jié)點(diǎn)集合
*/
@SneakyThrows
public List<String> getChildren(String path) {
return client.getChildren().forPath(path);
}
/**
* 創(chuàng)建排他鎖
*
* @param path 節(jié)點(diǎn)名稱
* @return 排他鎖
*/
public InterProcessSemaphoreMutex getSemaphoreMutexLock(String path) {
return new InterProcessSemaphoreMutex(client, path);
}
/**
* 創(chuàng)建可重入排他鎖
*
* @param path 節(jié)點(diǎn)名稱
* @return 可重入排他鎖
*/
public InterProcessMutex getMutexLock(String path) {
return new InterProcessMutex(client, path);
}
/**
* 創(chuàng)建一組可重入排他鎖
*
* @param paths 節(jié)點(diǎn)名稱集合
* @return 鎖容器
*/
public InterProcessMultiLock getMultiMutexLock(List<String> paths) {
return new InterProcessMultiLock(client, paths);
}
/**
* 創(chuàng)建一組任意類型的鎖
*
* @param locks 鎖集合
* @return 鎖容器
*/
public InterProcessMultiLock getMultiLock(List<InterProcessLock> locks) {
return new InterProcessMultiLock(locks);
}
/**
* 加鎖
*
* @param lock 分布式鎖對象
*/
@SneakyThrows
public void acquire(InterProcessLock lock) {
lock.acquire();
}
/**
* 加鎖
*
* @param lock 分布式鎖對象
* @param time 等待時(shí)間
* @param unit 時(shí)間單位
*/
@SneakyThrows
public void acquire(InterProcessLock lock, long time, TimeUnit unit) {
lock.acquire(time, unit);
}
/**
* 釋放鎖
*
* @param lock 分布式鎖對象
*/
@SneakyThrows
public void release(InterProcessLock lock) {
lock.release();
}
/**
* 檢查是否當(dāng)前jvm的線程獲取了鎖
*
* @param lock 分布式鎖對象
* @return true/false
*/
public boolean isAcquiredInThisProcess(InterProcessLock lock) {
return lock.isAcquiredInThisProcess();
}
/**
* 獲取讀寫鎖
*
* @param path 節(jié)點(diǎn)名稱
* @return 讀寫鎖
*/
public InterProcessReadWriteLock getReadWriteLock(String path) {
return new InterProcessReadWriteLock(client, path);
}
/**
* 監(jiān)聽數(shù)據(jù)節(jié)點(diǎn)的變化情況
*
* @param path 節(jié)點(diǎn)名稱
* @param listener 監(jiān)聽器
* @return 監(jiān)聽節(jié)點(diǎn)的TreeCache實(shí)例
*/
@SneakyThrows
public CuratorCache watch(String path, CuratorCacheListener listener) {
CuratorCache curatorCache = CuratorCache.builder(client, path).build();
curatorCache.listenable().addListener(listener);
curatorCache.start();
return curatorCache;
}
/**
* 監(jiān)聽數(shù)據(jù)節(jié)點(diǎn)的變化情況
*
* @param path 節(jié)點(diǎn)名稱
* @param listener 監(jiān)聽器
* @return 監(jiān)聽節(jié)點(diǎn)的TreeCache實(shí)例
*/
public CuratorCache watch(String path, CuratorCacheListener listener, Executor executor) {
CuratorCache curatorCache = CuratorCache.builder(client, path).build();
curatorCache.listenable().addListener(listener, executor);
curatorCache.start();
return curatorCache;
}
/**
* 取消監(jiān)聽節(jié)點(diǎn)
*
* @param path 節(jié)點(diǎn)名稱
* @param listener 監(jiān)聽器
*/
public void unwatch(String path, CuratorCacheListener listener) {
CuratorCache curatorCache = CuratorCache.builder(client, path).build();
curatorCache.listenable().removeListener(listener);
}
}
6.7 AppController
package com.ldx.zookeeper.controller;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ldx.zookeeper.util.CuratorClient;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.web.bind.annotation.*;
/**
* demo
*
* @author ludangxin
* @date 2021/8/30
*/
@Slf4j
@RestController
@RequestMapping("app")
@RequiredArgsConstructor
public class AppController {
private final CuratorClient curatorClient;
@GetMapping("{appName}")
public String getData(@PathVariable String appName) {
return curatorClient.getNodeData(setPrefix(appName));
}
@PostMapping("{appName}")
public String addApp(@PathVariable String appName, @RequestParam String data) {
curatorClient.createNode(CreateMode.PERSISTENT, setPrefix(appName), data);
return "ok";
}
@PostMapping("{appName}/{childName}")
public String addAppChild(@PathVariable String appName, @PathVariable String childName, @RequestParam String data) {
curatorClient.createNode(CreateMode.PERSISTENT, setPrefix(appName).concat(setPrefix(childName)), data);
return "ok";
}
@PutMapping("{appName}")
public String setData(@PathVariable String appName, String data) {
curatorClient.setNodeData(setPrefix(appName), data);
return "ok";
}
@PutMapping("{appName}/{childName}")
public String setData(@PathVariable String appName, @PathVariable String childName, String data) {
curatorClient.setNodeData(setPrefix(appName).concat(setPrefix(childName)), data);
return "ok";
}
@DeleteMapping("{appName}")
public String delApp(@PathVariable String appName) {
curatorClient.deleteNode(setPrefix(appName));
return "ok";
}
@PostMapping("{appName}/watch/dir")
public String watchAppDir(@PathVariable String appName) {
CuratorCacheListener listener = CuratorCacheListener.builder().forDeletes(obj -> {
String path = obj.getPath();
String data = new String(obj.getData());
Stat stat = obj.getStat();
log.info("節(jié)點(diǎn):{} 被刪除,節(jié)點(diǎn)數(shù)據(jù):{},節(jié)點(diǎn)狀態(tài):\\{version:{},createTime:{}\\}", path, data,
stat.getVersion(), stat.getCtime());
}).build();
curatorClient.watch(setPrefix(appName), listener);
return "ok";
}
@PostMapping("{appName}/watch/data")
public String watchAppData(@PathVariable String appName) {
ObjectMapper mapper = new ObjectMapper();
CuratorCacheListener listener = CuratorCacheListener.builder().forChanges((oldNode, newNode) -> {
try {
String path = oldNode.getPath();
log.info("節(jié)點(diǎn):{} 被修改,修改前:{} ", path, mapper.writeValueAsString(oldNode));
log.info("節(jié)點(diǎn):{} 被修改,修改后:{} ", path, mapper.writeValueAsString(newNode));
} catch(JsonProcessingException e) {
e.printStackTrace();
}
}).build();
curatorClient.watch(setPrefix(appName), listener);
return "ok";
}
private String setPrefix(String appName) {
String prefix = "/";
if(!appName.startsWith(prefix)) {
appName = prefix.concat(appName);
}
return appName;
}
}
7. ZAB 協(xié)議介紹
ZAB(ZooKeeper Atomic Broadcast 原子廣播) 協(xié)議是為分布式協(xié)調(diào)服務(wù) ZooKeeper 專門設(shè)計(jì)的一種支持崩潰恢復(fù)的原子廣播協(xié)議。 在 ZooKeeper 中,主要依賴 ZAB 協(xié)議來實(shí)現(xiàn)分布式數(shù)據(jù)一致性,基于該協(xié)議,ZooKeeper 實(shí)現(xiàn)了一種主備模式的系統(tǒng)架構(gòu)來保持集群中各個(gè)副本之間的數(shù)據(jù)一致性。
ZAB 協(xié)議兩種基本的模式:崩潰恢復(fù)和消息廣播
ZAB協(xié)議包括兩種基本的模式,分別是 崩潰恢復(fù)和消息廣播。當(dāng)整個(gè)服務(wù)框架在啟動(dòng)過程中,或是當(dāng) Leader 服務(wù)器出現(xiàn)網(wǎng)絡(luò)中斷、崩潰退出與重啟等異常情況時(shí),ZAB 協(xié)議就會進(jìn)人恢復(fù)模式并選舉產(chǎn)生新的Leader服務(wù)器。當(dāng)選舉產(chǎn)生了新的 Leader 服務(wù)器,同時(shí)集群中已經(jīng)有過半的機(jī)器與該Leader服務(wù)器完成了狀態(tài)同步之后,ZAB協(xié)議就會退出恢復(fù)模式。其中,所謂的狀態(tài)同步是指數(shù)據(jù)同步,用來保證集群中存在過半的機(jī)器能夠和Leader服務(wù)器的數(shù)據(jù)狀態(tài)保持一致。
當(dāng)集群中已經(jīng)有過半的Follower服務(wù)器完成了和Leader服務(wù)器的狀態(tài)同步,那么整個(gè)服務(wù)框架就可以進(jìn)人消息廣播模式了。 當(dāng)一臺同樣遵守ZAB協(xié)議的服務(wù)器啟動(dòng)后加人到集群中時(shí),如果此時(shí)集群中已經(jīng)存在一個(gè)Leader服務(wù)器在負(fù)責(zé)進(jìn)行消息廣播,那么新加人的服務(wù)器就會自覺地進(jìn)人數(shù)據(jù)恢復(fù)模式:找到Leader所在的服務(wù)器,并與其進(jìn)行數(shù)據(jù)同步,然后一起參與到消息廣播流程中去。正如上文介紹中所說的,ZooKeeper設(shè)計(jì)成只允許唯一的一個(gè)Leader服務(wù)器來進(jìn)行事務(wù)請求的處理。Leader服務(wù)器在接收到客戶端的事務(wù)請求后,會生成對應(yīng)的事務(wù)提案并發(fā)起一輪廣播協(xié)議;而如果集群中的其他機(jī)器接收到客戶端的事務(wù)請求,那么這些非Leader服務(wù)器會首先將這個(gè)事務(wù)請求轉(zhuǎn)發(fā)給Leader服務(wù)器。
以上就是詳解Zookeeper基礎(chǔ)知識的詳細(xì)內(nèi)容,更多關(guān)于Zookeeper基礎(chǔ)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(6)
下面小編就為大家?guī)硪黄狫ava基礎(chǔ)的幾道練習(xí)題(分享)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧,希望可以幫到你2021-07-07
java中@DateTimeFormat和@JsonFormat注解的使用
本文主要介紹了java中@DateTimeFormat和@JsonFormat注解的使用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08
SpringBoot之logback-spring.xml不生效的解決方法
這篇文章主要介紹了SpringBoot之logback-spring.xml不生效的解決方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-01-01
Java簡單統(tǒng)計(jì)字符串中漢字,英文字母及數(shù)字?jǐn)?shù)量的方法
這篇文章主要介紹了Java簡單統(tǒng)計(jì)字符串中漢字,英文字母及數(shù)字?jǐn)?shù)量的方法,涉及java針對字符串的遍歷、編碼轉(zhuǎn)換、判斷等相關(guān)操作技巧,需要的朋友可以參考下2017-06-06
解讀jdk動(dòng)態(tài)代理為什么必須實(shí)現(xiàn)接口
這篇文章主要介紹了解讀jdk動(dòng)態(tài)代理為什么必須實(shí)現(xiàn)接口問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-02-02
idea輸入sout無法自動(dòng)補(bǔ)全System.out.println()的問題
這篇文章主要介紹了idea輸入sout無法自動(dòng)補(bǔ)全System.out.println()的問題,本文給大家分享解決方案,供大家參考,需要的朋友可以參考下2020-07-07

