Java Spring Boot 集成Zookeeper
集成步驟
1.pom.xml文件配置,引入相關(guān)jar包
Curator是Netflix公司開源的一套zookeeper客戶端框架,解決了很多Zookeeper客戶端非常底層的細(xì)節(jié)開發(fā)工作,包括連接重連、反復(fù)注冊Watcher和NodeExistsException異常等等。
<!-- 封裝了一些高級(jí)特性,如:Cache事件監(jiān)聽、選舉、分布式鎖、分布式Barrier --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.10.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.13</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
特殊說明: 1.無需引入curator-framework,因?yàn)閏urator-recipes自動(dòng)關(guān)聯(lián)依賴引入curator-framework。 2.curator會(huì)默認(rèn)引入zookeeper的jar報(bào),需要檢查版本與服務(wù)器的版本是否一致,如果不一致則需要排除引入 3.
2. 核心配置類
@Configuration public class ZookeeperConfig implements Serializable { private static final long serialVersionUID = -9025878246972668136L; private final ZooKeeperProperty zooKeeperProperty; public ZookeeperConfig(ZooKeeperProperty zooKeeperProperty) { this.zooKeeperProperty = zooKeeperProperty; } @Bean public CuratorFramework curatorFramework() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(zooKeeperProperty.getBaseSleepTime(), zooKeeperProperty.getMaxRetries()); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(zooKeeperProperty.getServers()) .connectionTimeoutMs(zooKeeperProperty.getConnectionTimeout()) .sessionTimeoutMs(zooKeeperProperty.getSessionTimeout()) .retryPolicy(retryPolicy) .build(); client.start(); return client; } @Bean @ConditionalOnMissingBean public ZooKeeperUtils zooKeeperTemplate(CuratorFramework client) { return new ZooKeeperUtils(client); } } @ConfigurationProperties(prefix="zookeeper") @Component public class ZooKeeperProperty implements Serializable { private static final long serialVersionUID = 8650758711482699256L; /** * zk連接集群,多個(gè)用逗號(hào)隔開 */ private String servers; /** * 會(huì)話超時(shí)時(shí)間 */ private int sessionTimeout = 60000; /** * 連接超時(shí)時(shí)間 */ private int connectionTimeout = 15000; /** * 初始重試等待時(shí)間(毫秒) */ private int baseSleepTime = 1000; /** * 重試最大次數(shù) */ private int maxRetries = 10; //省略get、set方法 ...... }
3.常用API功能
@Component public class ZooKeeperUtils { private static final Logger logger = LoggerFactory .getLogger(ZooKeeperUtils.class); /** * 路徑分隔符 */ private static final String PATH_SEPARATOR = "/"; /** * zk連接 */ private final CuratorFramework client; public ZooKeeperUtils(CuratorFramework client) { this.client = client; } /** * 創(chuàng)建空節(jié)點(diǎn),默認(rèn)持久節(jié)點(diǎn) * * @param path * 節(jié)點(diǎn)路徑 * @param node * 節(jié)點(diǎn)名稱 * @return 完整路徑 */ public String createNode(String path, String node) { return createNode(path, node, CreateMode.PERSISTENT); } /** * 創(chuàng)建帶類型的空節(jié)點(diǎn) * * @param path * 節(jié)點(diǎn)路徑 * @param node * 節(jié)點(diǎn)名稱 * @param createMode * 類型 CreateMode.PERSISTENT: 創(chuàng)建節(jié)點(diǎn)后,不刪除就永久存在 * CreateMode.PERSISTENT_SEQUENTIAL:節(jié)點(diǎn)path末尾會(huì)追加一個(gè)10位數(shù)的單調(diào)遞增的序列 * CreateMode.EPHEMERAL:創(chuàng)建后,回話結(jié)束節(jié)點(diǎn)會(huì)自動(dòng)刪除 * CreateMode.EPHEMERAL_SEQUENTIAL:節(jié)點(diǎn)path末尾會(huì)追加一個(gè)10位數(shù)的單調(diào)遞增的序列 * @return 路徑 */ public String createNode(String path, String node, CreateMode createMode) { path = buildPath(path, node); logger.info("create node for path: {} with createMode: {}", path, createMode.name()); try { client.create().creatingParentsIfNeeded().withMode(createMode) .forPath(path); logger.info("create node :{} sucessfully", node); return path; } catch (Exception e) { logger.error( "create node for path: {} with createMode: {} failed!", path, createMode.name(), e); return null; } } /** * 創(chuàng)建節(jié)點(diǎn),默認(rèn)持久節(jié)點(diǎn) * * @param path * 節(jié)點(diǎn)路徑 * @param node * 節(jié)點(diǎn)名稱 * @param value * 節(jié)點(diǎn)值 * @return 完整路徑 */ public String createNode(String path, String node, String value) { return createNode(path, node, value, CreateMode.PERSISTENT); } /** * 創(chuàng)建節(jié)點(diǎn),默認(rèn)持久節(jié)點(diǎn) * * @param path * 節(jié)點(diǎn)路徑 * @param node * 節(jié)點(diǎn)名稱 * @param value * 節(jié)點(diǎn)值 * @param createMode * 節(jié)點(diǎn)類型 * @return 完整路徑 */ public String createNode(String path, String node, String value, CreateMode createMode) { if (Objects.isNull(value)) { logger.error("ZooKeeper節(jié)點(diǎn)值不能為空!"); } path = buildPath(path, node); logger.info("create node for path: {}, value: {}, with createMode: {}", path, value, createMode.name()); try { client.create().creatingParentsIfNeeded().withMode(createMode) .forPath(path, value.getBytes()); return path; } catch (Exception e) { logger.error( "create node for path: {}, value: {}, with createMode: {} failed!", path, value, createMode.name(), e); } return null; } /** * 獲取節(jié)點(diǎn)數(shù)據(jù) * * @param path * 路徑 * @param node * 節(jié)點(diǎn)名稱 * @return 完整路徑 */ public String get(String path, String node) { path = buildPath(path, node); try { byte[] bytes = client.getData().forPath(path); if (bytes.length > 0) { return new String(bytes); } } catch (Exception e) { logger.error("get value for path: {}, node: {} failed!", path, node, e); } return null; } /** * 更新節(jié)點(diǎn)數(shù)據(jù) * * @param path * 節(jié)點(diǎn)路徑 * @param node * 節(jié)點(diǎn)名稱 * @param value * 更新值 * @return 完整路徑 */ public String update(String path, String node, String value) { if (Objects.isNull(value)) { logger.error("ZooKeeper節(jié)點(diǎn)值不能為空!"); } path = buildPath(path, node); logger.info("update path: {} to value: {}", path, value); try { client.setData().forPath(path, value.getBytes()); return path; } catch (Exception e) { logger.error("update path: {} to value: {} failed!", path, value); } return null; } /** * 刪除節(jié)點(diǎn),并且遞歸刪除子節(jié)點(diǎn) * * @param path * 路徑 * @param node * 節(jié)點(diǎn)名稱 * @return 路徑 */ public boolean delete(String path, String node) { path = buildPath(path, node); logger.info("delete node for path: {}", path); try { client.delete().deletingChildrenIfNeeded().forPath(path); return true; } catch (Exception e) { logger.error("delete node for path: {} failed!", path); } return false; } /** * 獲取子節(jié)點(diǎn) * * @param path * 節(jié)點(diǎn)路徑 * @return */ public List<String> getChildren(String path) { if (StringUtils.isEmpty(path)) { return null; } if (!path.startsWith(PATH_SEPARATOR)) { path = PATH_SEPARATOR + path; } try { return client.getChildren().forPath(path); } catch (Exception e) { logger.error("get children path:{} error", path, e); } return null; } /** * 判斷節(jié)點(diǎn)是否存在 * * @param path * 路徑 * @param node * 節(jié)點(diǎn)名稱 * @return 結(jié)果 */ public boolean exists(String path, String node) { try { List<String> list = getChildren(path); return !CollectionUtils.isEmpty(list) && list.contains(node); } catch (Exception e) { return false; } } /** * 申請鎖,指定請求等待時(shí)間 * * @param path * 加鎖zk節(jié)點(diǎn) * @param time * 時(shí)間 * @param unit * 時(shí)間單位 * @param runnable * 執(zhí)行方法 */ public void lock(String path, long time, TimeUnit unit, Runnable runnable) { try { InterProcessMutex lock = new InterProcessMutex(client, path); if (lock.acquire(time, unit)) { try { runnable.run(); } finally { lock.release(); } } else { logger.error("獲取鎖超時(shí):{}!", path); } } catch (Exception e) { logger.error("獲取鎖失敗: {}!", path); } } /** * 申請鎖,指定請求等待時(shí)間 * * @param path * 加鎖zk節(jié)點(diǎn) * @param time * 時(shí)間 * @param unit * 時(shí)間單位 * @param callable * 執(zhí)行方法 * @return . */ public <T> T lock(String path, long time, TimeUnit unit, Callable<T> callable) { try { InterProcessMutex lock = new InterProcessMutex(client, path); if (lock.acquire(time, unit)) { try { return callable.call(); } finally { lock.release(); } } else { logger.error("獲取鎖超時(shí):{}!", path); } } catch (Exception e) { logger.error("獲取鎖失敗: {}!", path); } return null; } /* *//** * 對一個(gè)節(jié)點(diǎn)進(jìn)行監(jiān)聽,監(jiān)聽事件包括指定的路徑節(jié)點(diǎn)的增、刪、改的操作 * * @param path * 節(jié)點(diǎn)路徑 * @param listener * 回調(diào)方法 * @throws Exception */ public void watchNode(String path,boolean dataIsCompressed,final ZooKeeperCallback zooKeeperCallback)throws Exception { try { final NodeCache nodeCache = new NodeCache(client, path,dataIsCompressed); nodeCache.getListenable().addListener(new NodeCacheListener() { public void nodeChanged() throws Exception { ChildData childData = nodeCache.getCurrentData(); logger.info("ZNode節(jié)點(diǎn)狀態(tài)改變, path={}", childData.getPath()); logger.info("ZNode節(jié)點(diǎn)狀態(tài)改變, data={}", childData.getData()); logger.info("ZNode節(jié)點(diǎn)狀態(tài)改變, stat={}", childData.getStat()); //處理業(yè)務(wù)邏輯 zooKeeperCallback.call(); } }); nodeCache.start(); } catch (Exception e) { logger.error("創(chuàng)建NodeCache監(jiān)聽失敗, path={}",path); } } /** * 對指定的路徑節(jié)點(diǎn)的一級(jí)子目錄進(jìn)行監(jiān)聽,不對該節(jié)點(diǎn)的操作進(jìn)行監(jiān)聽,對其子目錄的節(jié)點(diǎn)進(jìn)行增、刪、改的操作監(jiān)聽 * * @param path * 節(jié)點(diǎn)路徑 * @param listener * 回調(diào)方法 */ public void watchChildren(String path, PathChildrenCacheListener listener) { try { PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, true); pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL); pathChildrenCache.getListenable().addListener(listener); } catch (Exception e) { logger.error("watch children failed for path: {}", path, e); } } /** * 將指定的路徑節(jié)點(diǎn)作為根節(jié)點(diǎn)(祖先節(jié)點(diǎn)),對其所有的子節(jié)點(diǎn)操作進(jìn)行監(jiān)聽,呈現(xiàn)樹形目錄的監(jiān)聽,可以設(shè)置監(jiān)聽深度,最大監(jiān)聽深度為2147483647( * int類型的最大值) * * @param path * 節(jié)點(diǎn)路徑 * @param maxDepth * 回調(diào)方法 * @param listener * 監(jiān)聽 */ public void watchTree(String path, int maxDepth, TreeCacheListener listener) { try { TreeCache treeCache = TreeCache.newBuilder(client, path) .setMaxDepth(maxDepth).build(); treeCache.start(); treeCache.getListenable().addListener(listener); } catch (Exception e) { logger.error("watch tree failed for path: {}", path, e); } } public String buildPath(String path, String node) { if (StringUtils.isEmpty(path) || StringUtils.isEmpty(node)) { logger.error("ZooKeeper路徑或者節(jié)點(diǎn)名稱不能為空!"); } if (!path.startsWith(PATH_SEPARATOR)) { path = PATH_SEPARATOR + path; } if (PATH_SEPARATOR.equals(path)) { return path + node; } else { return path + PATH_SEPARATOR + node; } } }
4.基本使用
@Autowired private ZooKeeperUtils zooKeeperUtil; @RequestMapping("/addNode") public String addNode() { String path= zooKeeperUtil.createNode("/zookeeper", "node1"); return path; }
特殊說明:關(guān)于zookeeper的分布式鎖,后續(xù)講解常用分布式鎖的時(shí)候,會(huì)詳細(xì)說明。
常見錯(cuò)誤和解決辦法
問題1:調(diào)用api創(chuàng)建zookeeper節(jié)點(diǎn)時(shí),報(bào)KeeperErrorCode = Unimplemented for /test錯(cuò)誤。
原因:服務(wù)器安裝zookeeper的版本與程序中的zookeeper版本不一致。
解決方案: 登錄服務(wù)器,查看zookeeper安裝版本,執(zhí)行如下命令:
echo stat|nc 127.0.0.1 2181
當(dāng)前引入的zookeeper版本為3.4.13,而zookeeper的版本與curator對應(yīng)關(guān)系如下:
Curator 2.x.x - compatible with both ZooKeeper 3.4.x and ZooKeeper 3.5.x Curator 4.x.x - compatible only with ZooKeeper 3.5.x and includes support for new features such as dynamic reconfiguration, etc. Curator 5.x.x compatible only with ZooKeeper 3.6.x+
問題2:啟動(dòng)項(xiàng)目的日志中會(huì)有Will not attempt to authenticate using SASL錯(cuò)誤
起初我認(rèn)為是zookeeper需要進(jìn)行SASL認(rèn)證,但是通過查閱相關(guān)資料后,才知道3.4之前版本,zookeeper默認(rèn)會(huì)采用SASL認(rèn)證,3.4以后的版本沒有此類問題。
到此這篇關(guān)于Java Spring Boot 集成Zookeeper的文章就介紹到這了,更多相關(guān)Spring Boot 集成Zookeeper內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 淺談Java(SpringBoot)基于zookeeper的分布式鎖實(shí)現(xiàn)
- 使用dubbo+zookeeper+spring boot構(gòu)建服務(wù)的方法詳解
- SpringBoot中dubbo+zookeeper實(shí)現(xiàn)分布式開發(fā)的應(yīng)用詳解
- SpringBoot集成Curator實(shí)現(xiàn)Zookeeper基本操作的代碼示例
- SpringBoot系列教程之dubbo和Zookeeper集成方法
- SpringBoot整合Dubbo+Zookeeper實(shí)現(xiàn)RPC調(diào)用
- springboot應(yīng)用訪問zookeeper的流程
- SpringBoot整合Zookeeper詳細(xì)教程
- SpringBoot讀取ZooKeeper(ZK)屬性的方法實(shí)現(xiàn)
相關(guān)文章
Java使用FutureTask實(shí)現(xiàn)預(yù)加載的示例詳解
基于FutureTask的特性,通??梢允褂肍utureTask做一些預(yù)加載工作,比如一些時(shí)間較長的計(jì)算等,本文就來和大家講講具體實(shí)現(xiàn)方法吧,感興趣的可以了解一下2023-06-06Spring boot2基于Mybatis實(shí)現(xiàn)多表關(guān)聯(lián)查詢
這篇文章主要介紹了Spring boot2基于Mybatis實(shí)現(xiàn)多表關(guān)聯(lián)查詢,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04java開發(fā)使用StringUtils.split避坑詳解
這篇文章主要為大家介紹了java開發(fā)使用StringUtils.split避坑詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-11-11