java連接zookeeper實現(xiàn)zookeeper教程
java連接zookeeper實現(xiàn)zookeeper
Java服務(wù)端連接Zookeeper,進行節(jié)點信息的獲取,管理…整理成一個基本工具
添加依賴:
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.3.6</version> </dependency>
具體代碼如下:
package com; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class BaseZookeeper implements Watcher{ private ZooKeeper zookeeper; /** * 超時時間 */ private static final int SESSION_TIME_OUT = 2000; private CountDownLatch countDownLatch = new CountDownLatch(1); @Override public void process(WatchedEvent event) { if (event.getState() == KeeperState.SyncConnected) { System.out.println("Watch received event"); countDownLatch.countDown(); } } /**連接zookeeper * @param host * @throws Exception */ public void connectZookeeper(String host) throws Exception{ zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this); countDownLatch.await(); System.out.println("zookeeper connection success"); } /** * 創(chuàng)建節(jié)點 * @param path * @param data * @throws Exception */ public String createNode(String path,String data) throws Exception{ return this.zookeeper.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } /** * 獲取路徑下所有子節(jié)點 * @param path * @return * @throws KeeperException * @throws InterruptedException */ public List<String> getChildren(String path) throws KeeperException, InterruptedException{ List<String> children = zookeeper.getChildren(path, false); return children; } /** * 獲取節(jié)點上面的數(shù)據(jù) * @param path 路徑 * @return * @throws KeeperException * @throws InterruptedException */ public String getData(String path) throws KeeperException, InterruptedException{ byte[] data = zookeeper.getData(path, false, null); if (data == null) { return ""; } return new String(data); } /** * 設(shè)置節(jié)點信息 * @param path 路徑 * @param data 數(shù)據(jù) * @return * @throws KeeperException * @throws InterruptedException */ public Stat setData(String path,String data) throws KeeperException, InterruptedException{ Stat stat = zookeeper.setData(path, data.getBytes(), -1); return stat; } /** * 刪除節(jié)點 * @param path * @throws InterruptedException * @throws KeeperException */ public void deleteNode(String path) throws InterruptedException, KeeperException{ zookeeper.delete(path, -1); } /** * 獲取創(chuàng)建時間 * @param path * @return * @throws KeeperException * @throws InterruptedException */ public String getCTime(String path) throws KeeperException, InterruptedException{ Stat stat = zookeeper.exists(path, false); return String.valueOf(stat.getCtime()); } /** * 獲取某個路徑下孩子的數(shù)量 * @param path * @return * @throws KeeperException * @throws InterruptedException */ public Integer getChildrenNum(String path) throws KeeperException, InterruptedException{ int childenNum = zookeeper.getChildren(path, false).size(); return childenNum; } /** * 關(guān)閉連接 * @throws InterruptedException */ public void closeConnection() throws InterruptedException{ if (zookeeper != null) { zookeeper.close(); } } }
測試:
public class Demo { public static void main(String[] args) throws Exception { BaseZookeeper zookeeper = new BaseZookeeper(); zookeeper.connectZookeeper("192.168.0.1:2181"); List<String> children = zookeeper.getChildren("/"); System.out.println(children); } }
ZookeeperJavaAPI基本操作
Zookeeper官方提供了兩種語言的API,Java和C,在這里只演示JavaAPI
操作API的類中的變量,一下方法都會使用到
static Logger logg = LoggerFactory.getLogger(ZKApi.class); private static final String zkServerPath = "10.33.57.28:2181"; private static final String zkServerPath = "127.0.0.1:2181"; private static final Integer timeOut = 5000; private static Stat stat = new Stat();
以及實現(xiàn)接口Watcher的實現(xiàn)方法process
public void process(WatchedEvent event) { try { if (event.getType() == Event.EventType.NodeDataChanged) { ZooKeeper zk = null; zk = ZKApi.getZkConnect(); byte[] resByt = new byte[0]; resByt = zk.getData("/test1", false, stat); String resStr = new String(resByt); System.out.println("更改后的值:" + resStr); System.out.println("版本號的變化:" + stat.getVersion()); System.out.println("-------"); countDown.countDown(); }else if(event.getType() == Event.EventType.NodeChildrenChanged){ System.out.println("NodeChildrenChanged"); ZooKeeper zk = null; zk = ZKApi.getZkConnect(); List<String> srcChildList = zk.getChildren(event.getPath(), false); for (String child:srcChildList){ System.out.println(child); } countDown.countDown(); }else if(event.getType() == Event.EventType.NodeCreated){ countDown.countDown(); }else if (event.getType() == Event.EventType.NodeCreated){ countDown.countDown(); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }
1.連接客戶端
創(chuàng)建客戶端連接使用Zookeeper類的構(gòu)造函數(shù)
Zookeeper構(gòu)造函數(shù)總共四個如下:
/* @param connectString zk連接地址以及端口號 格式如:127.0.0.1:2181,如果多個zk,則使用逗號分隔 @param sessionTimeout session超時時間 單位ms @param watcher 監(jiān)聽器,使用watcher必須實現(xiàn)接口Watcher實現(xiàn)process方法 @sessionId session id 可以用作恢復(fù)回話的參數(shù) @sessionPassword session password 可以用作恢復(fù)回話的參數(shù) @canbeReadOnly zk3.4添加的 只讀模式 * */ public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd) public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
連接客戶端代碼
public static ZooKeeper getZkConnect() throws IOException { ZooKeeper zk = new ZooKeeper(zkServerPath, timeOut, new ZKApi()); logg.debug("連接狀態(tài):{}", zk.getState()); return zk; }
DEBUG [main] - zookeeper.disableAutoWatchReset is false DEBUG [main] - 連接狀態(tài):CONNECTING
2.恢復(fù)回話
public static void recoveryConnect() throws IOException, InterruptedException { ZooKeeper zooKeeper = new ZooKeeper(zkServerPath, timeOut, new ZKApi()); long sessionId = zooKeeper.getSessionId(); byte[] sessionPasswd = zooKeeper.getSessionPasswd(); logg.debug("開始連接服務(wù)器 . . ."); logg.debug("連接狀態(tài):{}",zooKeeper.getState()); new Thread().sleep(1000 ); logg.debug("開始重連 . . . "); ZooKeeper zooSession = new ZooKeeper(zkServerPath, timeOut, new ZKApi(), sessionId, sessionPasswd); logg.debug("重連狀態(tài):{}",zooSession.getState()); new Thread().sleep(200); logg.debug("重連狀態(tài):{}",zooSession.getState()); }
DEBUG [main] - 開始連接服務(wù)器 . . . DEBUG [main] - 連接狀態(tài):CONNECTING DEBUG [main-SendThread(hdfa67:2181)] - Canonicalized address to hdfa67 INFO [main-SendThread(hdfa67:2181)] - Opening socket connection to server hdfa67/10.33.57.67:2181. Will not attempt to authenticate using SASL (unknown error) INFO [main-SendThread(hdfa67:2181)] - Socket connection established to hdfa67/10.33.57.67:2181, initiating session DEBUG [main-SendThread(hdfa67:2181)] - Session establishment request sent on hdfa67/10.33.57.67:2181 INFO [main-SendThread(hdfa67:2181)] - Session establishment complete on server hdfa67/10.33.57.67:2181, sessionid = 0x10000ea59aa0011, negotiated timeout = 5000 DEBUG [main] - 開始重連 . . . INFO [main] - Initiating client connection, connectString=10.33.57.67:2181 sessionTimeout=5000 watcher=ZKApi@73a28541 sessionId=0 sessionPasswd=<hidden> DEBUG [main] - 重連狀態(tài):CONNECTING DEBUG [main-SendThread(hdfa67:2181)] - Canonicalized address to hdfa67 INFO [main-SendThread(hdfa67:2181)] - Opening socket connection to server hdfa67/10.33.57.67:2181. Will not attempt to authenticate using SASL (unknown error) INFO [main-SendThread(hdfa67:2181)] - Socket connection established to hdfa67/10.33.57.67:2181, initiating session DEBUG [main-SendThread(hdfa67:2181)] - Session establishment request sent on hdfa67/10.33.57.67:2181 INFO [main-SendThread(hdfa67:2181)] - Session establishment complete on server hdfa67/10.33.57.67:2181, sessionid = 0x10000ea59aa0012, negotiated timeout = 5000 DEBUG [main] - 重連狀態(tài):CONNECTED
3.創(chuàng)建節(jié)點
創(chuàng)建節(jié)點通過zk客戶端對象的create方法進行創(chuàng)建,主要有兩個方法:一種是同步,一種是異步,接下來的修改等方法同樣如此,就不多加解釋了
/** @param path 創(chuàng)建的節(jié)點路徑 @param data 節(jié)點數(shù)據(jù) @param acl 權(quán)限列表, @param createMode 指定之創(chuàng)建節(jié)點的類型 @param cb 異步調(diào)用方法 @param ctx 回調(diào)對象 */ public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)
public static void createZkNode1() throws IOException, KeeperException, InterruptedException { ZooKeeper zk = getZkConnect(); String result = zk.create("/test1", "test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//創(chuàng)建一個/test的持續(xù)節(jié)點 System.out.println(result); //輸出/test1 public static void createZkNode2() throws IOException, KeeperException, InterruptedException { ZooKeeper zk = getZkConnect(); String ctx = "{'create': 'success'}"; zk.create("/test2", "test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,new CreateCallBack() ,ctx); new Thread().sleep(2000);//需要暫停一會,否則創(chuàng)建失敗 }
4.修改節(jié)點
public Stat setData(final String path, byte data[], int version) public void setData(final String path, byte data[], int version, StatCallback cb, Object ctx)
public static void setZkNode1() throws IOException, KeeperException, InterruptedException{ ZooKeeper zk = getZkConnect(); Stat stat = zk.setData("/test1", "modifyed-data".getBytes(), 0); System.out.println(stat.getVersion()); } public static void setZkNode2() throws IOException, KeeperException, InterruptedException{ ZooKeeper zk = getZkConnect(); String ctx = "{'modify': 'success'}"; zk.setData("/test1", "modifyed-data".getBytes(),0,new ModifyCalback(),ctx); new Thread().sleep(1000);//必須加上,否則回掉不成功 }
5.刪除節(jié)點
public void delete(final String path, int version) public void delete(final String path, int version, VoidCallback cb, Object ctx)
public static void deleteZkNode1() throws IOException, KeeperException, InterruptedException { ZooKeeper zk = getZkConnect(); zk.delete("/test1",1);//不能夠刪除子節(jié)點 } public static void deleteZkNode2() throws IOException, InterruptedException { ZooKeeper zk = getZkConnect(); String ctx = "{'delete': 'success'}"; zk.delete("/test2",0,new DeleteCallBack(),ctx);//不能夠刪除子節(jié)點 new Thread().sleep(1000);//必須加上,否則回掉不成功 }
6.查詢節(jié)點
public byte[] getData(String path, boolean watch, Stat stat) public byte[] getData(final String path, Watcher watcher, Stat stat) public void getData(final String path, Watcher watcher,DataCallback cb, Object ctx) public void getData(String path, boolean watch, DataCallback cb, Object ctx)
public static CountDownLatch countDown = new CountDownLatch(1); public static void selectData1() throws IOException, KeeperException, InterruptedException { ZooKeeper zk = getZkConnect(); byte[] data = zk.getData("/test1", true, stat); String s = new String(data); System.out.println("value: "+s); countDown.await(); }
if (event.getType() == Event.EventType.NodeDataChanged) { ZooKeeper zk = null; zk = ZKApi.getZkConnect(); byte[] resByt = new byte[0]; resByt = zk.getData("/test1", false, stat); String resStr = new String(resByt); System.out.println("更改后的值:" + resStr); System.out.println("版本號的變化:" + stat.getVersion()); System.out.println("-------"); countDown.countDown();
由于更改之后,觸發(fā)了監(jiān)聽器,再次在命令行中進行更改,出現(xiàn)了一下結(jié)果。
7.查詢子節(jié)點
查詢子節(jié)點的方法
public List<String> getChildren(final String path, Watcher watcher) public List<String> getChildren(String path, boolean watch) public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx) public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx) public List<String> getChildren(final String path, Watcher watcher, Stat stat) public List<String> getChildren(String path, boolean watch, Stat stat) public void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx) public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)
代碼實現(xiàn)
public static CountDownLatch countDown = new CountDownLatch(1); public static void selectchildData1() throws IOException, KeeperException, InterruptedException { ZooKeeper zk = getZkConnect(); List<String> srcChildList = zk.getChildren("/test", true, stat); for (String child:srcChildList){ System.out.println(child); } countDown.await(); }
if(event.getType() == Event.EventType.NodeChildrenChanged){ System.out.println("NodeChildrenChanged"); ZooKeeper zk = null; zk = ZKApi.getZkConnect(); List<String> srcChildList = zk.getChildren(event.getPath(), false); for (String child:srcChildList){ System.out.println(child); }
運行結(jié)果完成后,觸監(jiān)聽器,再次刪除test1
第二種異步方式實現(xiàn)
public static void selectchildData2() throws IOException, KeeperException, InterruptedException{ ZooKeeper zk = getZkConnect(); String ctx = "{'selectChild': 'success'}"; zk.getChildren("/test",false,new ChildrenCallback(),ctx); new Thread().sleep(1000); }
8.使用遞歸得到所有的節(jié)點
public static void selectchildData3() throws IOException, KeeperException, InterruptedException{ getChild("/"); } public static void getChild(String path) throws IOException, KeeperException, InterruptedException { System.out.println(path); ZooKeeper zk = getZkConnect(); List<String> childrenList = zk.getChildren(path, false, stat); if(childrenList.isEmpty() || childrenList ==null) return; for(String s:childrenList){ if(path.equals("/")) getChild(path+s); else { getChild(path+"/"+s); } } }
運行結(jié)果:
/zookeeper
/zookeeper/config
/zookeeper/quota
/ldd
/ldd/l
/loo
/t1
/test1
/seq
/seq/seq30000000002
/seq/seq20000000001
/seq/se0000000003
/seq/seq10000000000
9.判斷節(jié)點是否存在
public static void existNode() throws IOException, KeeperException, InterruptedException { ZooKeeper zk = getZkConnect(); Stat stat = zk.exists("/ff", true); System.out.println(stat); } //輸出null則不存在
10.自定義權(quán)限
public static void oneSelfACL() throws Exception { ZooKeeper zk = getZkConnect(); ArrayList<ACL> acls = new ArrayList<ACL>(); // zk.create("/test1","test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); //所有人均可訪問 Id id1 = new Id("digest", ACLUtils.getDigestUserPassword("id1:123456")); Id id2 = new Id("digest", ACLUtils.getDigestUserPassword("id2:123456")); // Id ipId = new Id("ip","127.0.0.1");ip設(shè)置 // acls.add(new ACL(ZooDefs.Perms.ALL,id1)); acls.add(new ACL(ZooDefs.Perms.ALL,id1)); acls.add(new ACL(ZooDefs.Perms.DELETE,id2)); //注冊過的用戶必須通過addAuthInfo才可以操作節(jié)點 zk.addAuthInfo("digest","id1:123456".getBytes()); zk.create("/test2","test2-data".getBytes(), acls,CreateMode.PERSISTENT); }
結(jié)果如下:
直接登錄id1由于在程序已經(jīng)注冊完成
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
java反射機制的一些學(xué)習(xí)心得小結(jié)
這篇文章主要給大家介紹了關(guān)于java反射機制的一些學(xué)習(xí)心得,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02springcloud feign調(diào)其他微服務(wù)時參數(shù)是對象的問題
這篇文章主要介紹了springcloud feign調(diào)其他微服務(wù)時參數(shù)是對象的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03Struts2單選按鈕詳解及枚舉類型的轉(zhuǎn)換代碼示例
這篇文章主要介紹了Struts2單選按鈕詳解及枚舉類型的轉(zhuǎn)換代碼示例,分享了相關(guān)代碼示例,小編覺得還是挺不錯的,具有一定借鑒價值,需要的朋友可以參考下2018-02-02解決JAVA非對稱加密不同系統(tǒng)加密結(jié)果不一致的問題
這篇文章主要介紹了解決JAVA非對稱加密不同系統(tǒng)加密結(jié)果不一致的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-10-10SpringCloud集成Eureka并實現(xiàn)負載均衡的過程詳解
這篇文章主要給大家詳細介紹了SpringCloud集成Eureka并實現(xiàn)負載均衡的過程,文章通過代碼示例和圖文講解的非常詳細,對大家的學(xué)習(xí)或工作有一定的參考價值,需要的朋友可以參考下2023-11-11java實現(xiàn)合并2個文件中的內(nèi)容到新文件中
這篇文章主要介紹了java實現(xiàn)合并2個文件中的內(nèi)容到新文件中,思路非常不錯,這里推薦給大家。2015-03-03springboot+springsecurity如何實現(xiàn)動態(tài)url細粒度權(quán)限認證
這篇文章主要介紹了springboot+springsecurity如何實現(xiàn)動態(tài)url細粒度權(quán)限認證的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06詳解用JWT對SpringCloud進行認證和鑒權(quán)
這篇文章主要介紹了詳解用JWT對SpringCloud進行認證和鑒權(quán),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03