欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java連接zookeeper實現zookeeper教程

 更新時間:2021年11月11日 08:54:15   作者:長河  
這篇文章主要介紹了java連接zookeeper實現zookeeper教程,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

java連接zookeeper實現zookeeper

Java服務端連接Zookeeper,進行節點信息的獲取,管理…整理成一個基本工具

添加依賴:

<dependency>
   <groupId>org.apache.zookeeper</groupId>
   <artifactId>zookeeper</artifactId>
   <version>3.3.6</version>
</dependency>

具體代碼如下:

package com; 
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;  
 
public class BaseZookeeper implements Watcher{ 
   private ZooKeeper zookeeper;
    /**
     * 超時時間
     */
   private static final int SESSION_TIME_OUT = 2000;
   private CountDownLatch countDownLatch = new CountDownLatch(1);
   @Override
   public void process(WatchedEvent event) {
      if (event.getState() == KeeperState.SyncConnected) {
         System.out.println("Watch received event");
         countDownLatch.countDown();
      }
   }   
  
   /**連接zookeeper
    * @param host
    * @throws Exception
    */
   public void connectZookeeper(String host) throws Exception{
      zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
      countDownLatch.await();
      System.out.println("zookeeper connection success");
   }
  
   /**
    * 創(chuàng)建節(jié)點
    * @param path
    * @param data
    * @throws Exception
    */
   public String createNode(String path,String data) throws Exception{
      return this.zookeeper.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
   }
  
   /**
    * 獲取路徑下所有子節(jié)點
    * @param path
    * @return
    * @throws KeeperException
    * @throws InterruptedException
    */
   public List<String> getChildren(String path) throws KeeperException, InterruptedException{
      List<String> children = zookeeper.getChildren(path, false);
      return children;
   }
  
   /**
    * 獲取節(jié)點上面的數(shù)據(jù)
    * @param path  路徑
    * @return
    * @throws KeeperException
    * @throws InterruptedException
    */
   public String getData(String path) throws KeeperException, InterruptedException{
      byte[] data = zookeeper.getData(path, false, null);
      if (data == null) {
         return "";
      }
      return new String(data);
   }
  
   /**
    * 設(shè)置節(jié)點信息
    * @param path  路徑
    * @param data  數(shù)據(jù)
    * @return
    * @throws KeeperException
    * @throws InterruptedException
    */
   public Stat setData(String path,String data) throws KeeperException, InterruptedException{
      Stat stat = zookeeper.setData(path, data.getBytes(), -1);
      return stat;
   }
  
   /**
    * 刪除節(jié)點
    * @param path
    * @throws InterruptedException
    * @throws KeeperException
    */
   public void deleteNode(String path) throws InterruptedException, KeeperException{
      zookeeper.delete(path, -1);
   }
  
   /**
    * 獲取創(chuàng)建時間
    * @param path
    * @return
    * @throws KeeperException
    * @throws InterruptedException
    */
   public String getCTime(String path) throws KeeperException, InterruptedException{
      Stat stat = zookeeper.exists(path, false);
      return String.valueOf(stat.getCtime());
   }
  
   /**
    * 獲取某個路徑下孩子的數(shù)量
    * @param path
    * @return
    * @throws KeeperException
    * @throws InterruptedException
    */
   public Integer getChildrenNum(String path) throws KeeperException, InterruptedException{
      int childenNum = zookeeper.getChildren(path, false).size();
      return childenNum;
   }
   /**
    * 關(guān)閉連接
    * @throws InterruptedException
    */
   public void closeConnection() throws InterruptedException{
      if (zookeeper != null) {
         zookeeper.close();
      }
   }  
}  
 

測試:

/** Minimal usage example: connects, prints the children of the root node and disconnects. */
public class Demo {
    public static void main(String[] args) throws Exception {
        BaseZookeeper zookeeper = new BaseZookeeper();
        try {
            zookeeper.connectZookeeper("192.168.0.1:2181");
            List<String> children = zookeeper.getChildren("/");
            System.out.println(children);
        } finally {
            // Always release the session, even when connecting or listing fails.
            zookeeper.closeConnection();
        }
    }
}

ZookeeperJavaAPI基本操作

Zookeeper官方提供了兩種語言的API,Java和C,在這里只演示JavaAPI

操作API的類中的變量,以下方法都會使用到

static Logger logg = LoggerFactory.getLogger(ZKApi.class);
// Connect string used by every example below. The article also listed the remote
// address "10.33.57.28:2181"; only one declaration of zkServerPath can exist, so
// swap the value here to target another server.
private static final String zkServerPath = "127.0.0.1:2181";
// Session timeout in milliseconds, passed to the ZooKeeper constructor.
private static final Integer timeOut = 5000;
// Shared Stat instance that the read examples pass to getData/getChildren to be filled in.
private static Stat stat = new Stat();

以及實現接口Watcher的實現方法process

/**
 * Watcher callback used by the examples.
 *
 * <p>On a data change it re-reads {@code /test1} and prints the new value and version;
 * on a children change it prints the current children of the affected path. Node
 * creation and deletion only release the {@code countDown} latch.
 *
 * @param event the event delivered by the ZooKeeper client
 */
@Override
public void process(WatchedEvent event) {
    Event.EventType type = event.getType();
    try {
        if (type == Event.EventType.NodeDataChanged) {
            ZooKeeper zk = ZKApi.getZkConnect();
            try {
                byte[] resByt = zk.getData("/test1", false, stat);
                // A node may hold no data; print an empty value instead of failing.
                String resStr = resByt == null
                        ? ""
                        : new String(resByt, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("更改后的值:" + resStr);
                System.out.println("版本號的變化:" + stat.getVersion());
                System.out.println("-------");
            } finally {
                // The client was opened only for this read; release its session.
                zk.close();
            }
            countDown.countDown();
        } else if (type == Event.EventType.NodeChildrenChanged) {
            System.out.println("NodeChildrenChanged");
            ZooKeeper zk = ZKApi.getZkConnect();
            try {
                List<String> srcChildList = zk.getChildren(event.getPath(), false);
                for (String child : srcChildList) {
                    System.out.println(child);
                }
            } finally {
                zk.close();
            }
            countDown.countDown();
        } else if (type == Event.EventType.NodeCreated) {
            countDown.countDown();
        } else if (type == Event.EventType.NodeDeleted) {
            // The original repeated the NodeCreated test here, which could never match.
            countDown.countDown();
        }
    } catch (KeeperException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        // Preserve the interrupt for the calling thread instead of swallowing it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

1.連接客戶端

創建客戶端連接使用Zookeeper類的構造函數

Zookeeper構造函數總共四個如下:

/*
ZooKeeper constructor overloads, as listed by the article.
@param connectString server address and port, e.g. 127.0.0.1:2181; separate several servers with commas
@param sessionTimeout session timeout in milliseconds
@param watcher watcher to register; it must implement the Watcher interface and its process method
@param sessionId session id, usable as a parameter when recovering a session
@param sessionPasswd session password, usable as a parameter when recovering a session
@param canBeReadOnly read-only mode, added in ZooKeeper 3.4
* */
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
	long sessionId, byte[] sessionPasswd)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

連接客戶端代碼

/**
 * Opens a new ZooKeeper client against {@code zkServerPath}, registering a fresh
 * {@code ZKApi} instance as its watcher, and logs the client's state right after creation.
 *
 * @return the newly created client
 * @throws IOException if the client cannot be created
 */
public static ZooKeeper getZkConnect() throws IOException {
    final ZKApi watcher = new ZKApi();
    final ZooKeeper client = new ZooKeeper(zkServerPath, timeOut, watcher);
    logg.debug("連接狀態(tài):{}", client.getState());
    return client;
}
DEBUG [main] - zookeeper.disableAutoWatchReset is false
DEBUG [main] - 連接狀態(tài):CONNECTING

2.恢復會話

/**
 * Demonstrates session recovery: opens a client, waits for its session to be
 * established, then opens a second client with the first one's session id and password.
 *
 * @throws IOException if a client cannot be created
 * @throws InterruptedException if one of the pauses is interrupted
 */
public static void recoveryConnect() throws IOException, InterruptedException {
    ZooKeeper zooKeeper = new ZooKeeper(zkServerPath, timeOut, new ZKApi());
    logg.debug("開始連接服務(wù)器 . . .");
    logg.debug("連接狀態(tài):{}",zooKeeper.getState());
    // Give the handshake time to finish before asking for the session credentials.
    Thread.sleep(1000);
    // Read the id and password only now: straight after construction the client is still
    // CONNECTING and the id is 0, so the second client would start a brand-new session.
    long sessionId = zooKeeper.getSessionId();
    byte[] sessionPasswd = zooKeeper.getSessionPasswd();
    logg.debug("開始重連 . . . ");
    ZooKeeper zooSession = new ZooKeeper(zkServerPath, timeOut, new ZKApi(), sessionId, sessionPasswd);
    logg.debug("重連狀態(tài):{}",zooSession.getState());
    Thread.sleep(200);
    logg.debug("重連狀態(tài):{}",zooSession.getState());
}
DEBUG [main] - 開始連接服務(wù)器 . . .
DEBUG [main] - 連接狀態(tài):CONNECTING
DEBUG [main-SendThread(hdfa67:2181)] - Canonicalized address to hdfa67
 INFO [main-SendThread(hdfa67:2181)] - Opening socket connection to server hdfa67/10.33.57.67:2181. Will not attempt to authenticate using SASL (unknown error)
 INFO [main-SendThread(hdfa67:2181)] - Socket connection established to hdfa67/10.33.57.67:2181, initiating session
DEBUG [main-SendThread(hdfa67:2181)] - Session establishment request sent on hdfa67/10.33.57.67:2181
 INFO [main-SendThread(hdfa67:2181)] - Session establishment complete on server hdfa67/10.33.57.67:2181, sessionid = 0x10000ea59aa0011, negotiated timeout = 5000
DEBUG [main] - 開始重連 . . . 
 INFO [main] - Initiating client connection, connectString=10.33.57.67:2181 sessionTimeout=5000 watcher=ZKApi@73a28541 sessionId=0 sessionPasswd=<hidden>
DEBUG [main] - 重連狀態(tài):CONNECTING
DEBUG [main-SendThread(hdfa67:2181)] - Canonicalized address to hdfa67
 INFO [main-SendThread(hdfa67:2181)] - Opening socket connection to server hdfa67/10.33.57.67:2181. Will not attempt to authenticate using SASL (unknown error)
 INFO [main-SendThread(hdfa67:2181)] - Socket connection established to hdfa67/10.33.57.67:2181, initiating session
DEBUG [main-SendThread(hdfa67:2181)] - Session establishment request sent on hdfa67/10.33.57.67:2181
 INFO [main-SendThread(hdfa67:2181)] - Session establishment complete on server hdfa67/10.33.57.67:2181, sessionid = 0x10000ea59aa0012, negotiated timeout = 5000
DEBUG [main] - 重連狀態(tài):CONNECTED

3.創建節點

創建節點通過zk客戶端對象的create方法進行創建,主要有兩個方法:一種是同步,一種是異步,接下來的修改等方法同樣如此,就不多加解釋了

/**
ZooKeeper create overloads, as listed by the article: a synchronous one returning a
String and an asynchronous one taking a callback and a context object.
@param path path of the node to create
@param data node data
@param acl access control list
@param createMode type of node to create
@param cb callback for the asynchronous variant
@param ctx context object handed to the callback
*/
public String create(final String path, byte data[], List<ACL> acl,
        CreateMode createMode)
public void create(final String path, byte data[], List<ACL> acl,
        CreateMode createMode,  StringCallback cb, Object ctx)
public static void createZkNode1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    String result = zk.create("/test1", "test-data".getBytes(), 
    	ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//創(chuàng)建一個/test的持續(xù)節(jié)點
    System.out.println(result);
//輸出/test1
/**
 * Asynchronously creates the persistent node {@code /test2} with an open ACL; the
 * outcome is delivered to a {@code CreateCallBack} together with the context string.
 *
 * @throws IOException if the client cannot be created
 * @throws KeeperException declared for parity with the synchronous example
 * @throws InterruptedException if the pause is interrupted
 */
public static void createZkNode2() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    try {
        String ctx = "{'create': 'success'}";
        zk.create("/test2", "test-data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT, new CreateCallBack(), ctx);
        // Per the original note, the create fails without a short pause here.
        // sleep is static: call it on Thread, not on a throwaway instance.
        Thread.sleep(2000);
    } finally {
        zk.close();
    }
}

4.修改節點

// ZooKeeper setData overloads, as listed by the article: the first returns a Stat,
// the second takes a StatCallback plus a context object and returns void.
public Stat setData(final String path, byte data[], int version)
public void setData(final String path, byte data[], int version,
        StatCallback cb, Object ctx)
/**
 * Synchronously replaces the data of {@code /test1}, expecting node version 0, and
 * prints the version reported in the returned Stat.
 *
 * @throws IOException if the client cannot be created
 * @throws KeeperException if the server rejects the request
 * @throws InterruptedException if the call is interrupted
 */
public static void setZkNode1() throws IOException, KeeperException, InterruptedException{
    ZooKeeper zk = getZkConnect();
    try {
        Stat stat = zk.setData("/test1", "modifyed-data".getBytes(), 0);
        System.out.println(stat.getVersion());
    } finally {
        // Release the session opened for this example.
        zk.close();
    }
}
/**
 * Asynchronously replaces the data of {@code /test1}, expecting node version 0; the
 * outcome is delivered to a {@code ModifyCalback} together with the context string.
 *
 * @throws IOException if the client cannot be created
 * @throws KeeperException declared for parity with the synchronous example
 * @throws InterruptedException if the pause is interrupted
 */
public static void setZkNode2() throws IOException, KeeperException, InterruptedException{
    ZooKeeper zk = getZkConnect();
    try {
        String ctx = "{'modify': 'success'}";
        zk.setData("/test1", "modifyed-data".getBytes(), 0, new ModifyCalback(), ctx);
        // Per the original note, the callback does not succeed without this pause.
        Thread.sleep(1000);
    } finally {
        zk.close();
    }
}

5.刪除節點

// ZooKeeper delete overloads, as listed by the article: a synchronous one and one
// taking a VoidCallback plus a context object.
public void delete(final String path, int version)
public void delete(final String path, int version, VoidCallback cb,
        Object ctx)
/**
 * Synchronously deletes {@code /test1}, expecting node version 1.
 *
 * @throws IOException if the client cannot be created
 * @throws KeeperException if the server rejects the request
 * @throws InterruptedException if the call is interrupted
 */
public static void deleteZkNode1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    try {
        // Original note: a node that still has children cannot be deleted this way.
        zk.delete("/test1", 1);
    } finally {
        // Release the session opened for this example.
        zk.close();
    }
}
/**
 * Asynchronously deletes {@code /test2}, expecting node version 0; the outcome is
 * delivered to a {@code DeleteCallBack} together with the context string.
 *
 * @throws IOException if the client cannot be created
 * @throws InterruptedException if the pause is interrupted
 */
public static void deleteZkNode2() throws IOException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    try {
        String ctx = "{'delete': 'success'}";
        // Original note: a node that still has children cannot be deleted this way.
        zk.delete("/test2", 0, new DeleteCallBack(), ctx);
        // Per the original note, the callback does not succeed without this pause.
        Thread.sleep(1000);
    } finally {
        zk.close();
    }
}

6.查詢節點

// ZooKeeper getData overloads, as listed by the article: two return byte[] and fill a
// Stat, two take a DataCallback plus a context object and return void.
public byte[] getData(String path, boolean watch, Stat stat)
public byte[] getData(final String path, Watcher watcher, Stat stat)
public void getData(final String path, Watcher watcher,DataCallback cb, Object ctx)             
public void getData(String path, boolean watch, DataCallback cb, Object ctx)        
/** Latch the read examples wait on; the watcher's process method counts it down. */
public static CountDownLatch countDown = new CountDownLatch(1);

/**
 * Reads {@code /test1} with the watch flag set, prints its value, then blocks until
 * {@code countDown} is released.
 *
 * @throws IOException if the client cannot be created
 * @throws KeeperException if the server rejects the request
 * @throws InterruptedException if the wait is interrupted
 */
public static void selectData1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    try {
        byte[] data = zk.getData("/test1", true, stat);
        // A node may hold no data; print an empty value instead of failing.
        String s = data == null ? "" : new String(data, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("value: "+s);
        countDown.await();
    } finally {
        // Release the session once the watch has fired (or the wait was aborted).
        zk.close();
    }
}
// Excerpt repeating the NodeDataChanged branch of process(WatchedEvent) shown earlier in
// this article; the closing brace of the if is not part of the excerpt.
if (event.getType() == Event.EventType.NodeDataChanged) {
            ZooKeeper zk = null;
            zk = ZKApi.getZkConnect();
            byte[] resByt = new byte[0];
            // Re-read /test1 without setting a new watch; stat receives the node metadata.
            resByt = zk.getData("/test1", false, stat);
            String resStr = new String(resByt);
            System.out.println("更改后的值:" + resStr);
            System.out.println("版本號的變化:" + stat.getVersion());
            System.out.println("-------");
            // Release whoever is waiting on the countDown latch.
            countDown.countDown();
        

由于更改之后,觸發了監聽器,再次在命令行中進行更改,出現了以下結果。

在這里插入圖片描述

7.查詢子節點

查詢子節點的方法

// ZooKeeper getChildren overloads: synchronous variants return the child names (optionally
// filling a Stat); asynchronous variants deliver the result to a ChildrenCallback, or to a
// Children2Callback when the Stat is wanted as well.
public List<String> getChildren(final String path, Watcher watcher)
public List<String> getChildren(String path, boolean watch)
public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)
public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
public List<String> getChildren(final String path, Watcher watcher, Stat stat)
public List<String> getChildren(String path, boolean watch, Stat stat)
public void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx)
public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)
            

代碼實現

/** Latch the read examples wait on; the watcher's process method counts it down. */
public static CountDownLatch countDown = new CountDownLatch(1);

/**
 * Lists the children of {@code /test} with the watch flag set, prints them, then
 * blocks until {@code countDown} is released.
 *
 * @throws IOException if the client cannot be created
 * @throws KeeperException if the server rejects the request
 * @throws InterruptedException if the wait is interrupted
 */
public static void selectchildData1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    try {
        List<String> srcChildList = zk.getChildren("/test", true, stat);
        for (String child : srcChildList) {
            System.out.println(child);
        }
        countDown.await();
    } finally {
        // Release the session once the watch has fired (or the wait was aborted).
        zk.close();
    }
}
// Excerpt repeating the NodeChildrenChanged branch of process(WatchedEvent) shown earlier
// in this article; the closing brace of the if is not part of the excerpt.
if(event.getType() == Event.EventType.NodeChildrenChanged){
    System.out.println("NodeChildrenChanged");
    ZooKeeper zk = null;
    zk = ZKApi.getZkConnect();
    // List the children of the path the event refers to, without setting a new watch.
    List<String> srcChildList = zk.getChildren(event.getPath(), false);
    for (String child:srcChildList){
        System.out.println(child);
    }

運行結果完成后,觸發監聽器,再次刪除test1

在這里插入圖片描述

第二種異步方式實現

/**
 * Asynchronously lists the children of {@code /test}; the result is delivered to a
 * {@code ChildrenCallback} together with the context string.
 *
 * @throws IOException if the client cannot be created
 * @throws KeeperException declared for parity with the synchronous example
 * @throws InterruptedException if the pause is interrupted
 */
public static void selectchildData2() throws IOException, KeeperException, InterruptedException{
    ZooKeeper zk = getZkConnect();
    try {
        String ctx = "{'selectChild': 'success'}";
        zk.getChildren("/test", false, new ChildrenCallback(), ctx);
        // Pause before returning, as in the other asynchronous examples.
        // sleep is static: call it on Thread, not on a throwaway instance.
        Thread.sleep(1000);
    } finally {
        zk.close();
    }
}

8.使用遞歸得到所有的節點

/**
 * Prints every node path by walking the tree from the root with {@code getChild}.
 *
 * @throws IOException propagated from {@code getChild}
 * @throws KeeperException propagated from {@code getChild}
 * @throws InterruptedException propagated from {@code getChild}
 */
public static void selectchildData3() throws IOException, KeeperException, InterruptedException{
    final String rootPath = "/";
    getChild(rootPath);
}
/**
 * Prints {@code path} and, recursively, the path of every node below it.
 *
 * <p>One client is opened for the whole walk and closed afterwards; the original
 * opened a new, never-closed connection for every visited node.
 *
 * @param path absolute path of the node to start from, e.g. {@code "/"}
 * @throws IOException if the client cannot be created
 * @throws KeeperException if the server rejects a request
 * @throws InterruptedException if a call is interrupted
 */
public static void getChild(String path) throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    try {
        printSubtree(zk, path);
    } finally {
        zk.close();
    }
}

/** Prints {@code path}, then descends into each child using the given client. */
private static void printSubtree(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
    System.out.println(path);
    List<String> childrenList = zk.getChildren(path, false, stat);
    // Check for null before calling isEmpty(); the original order could never catch null.
    if (childrenList == null || childrenList.isEmpty()) {
        return;
    }
    for (String child : childrenList) {
        // The root already ends with '/', so only deeper levels need a separator.
        String childPath = path.equals("/") ? path + child : path + "/" + child;
        printSubtree(zk, childPath);
    }
}

運行結果:

/zookeeper
/zookeeper/config
/zookeeper/quota
/ldd
/ldd/l
/loo
/t1
/test1
/seq
/seq/seq30000000002
/seq/seq20000000001
/seq/se0000000003
/seq/seq10000000000

9.判斷節點是否存在

/**
 * Checks whether {@code /ff} exists and prints the returned Stat.
 *
 * @throws IOException if the client cannot be created
 * @throws KeeperException if the server rejects the request
 * @throws InterruptedException if the call is interrupted
 */
public static void existNode() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    try {
        Stat stat = zk.exists("/ff", true);
        System.out.println(stat);
    } finally {
        // Release the session opened for this example.
        zk.close();
    }
}
// Prints null when the node does not exist.

10.自定義權限

/**
 * Creates {@code /test2} with a custom ACL: digest user {@code id1} gets all
 * permissions and digest user {@code id2} gets delete permission only.
 *
 * @throws Exception if the digest cannot be computed or a ZooKeeper call fails
 */
public static void oneSelfACL() throws Exception {
    ZooKeeper zk = getZkConnect();
    try {
        ArrayList<ACL> acls = new ArrayList<ACL>();
        // For a node anyone may access, pass ZooDefs.Ids.OPEN_ACL_UNSAFE to create instead.
        Id id1 = new Id("digest", ACLUtils.getDigestUserPassword("id1:123456"));
        Id id2 = new Id("digest", ACLUtils.getDigestUserPassword("id2:123456"));
        // An IP-based identity would be: new Id("ip", "127.0.0.1")
        acls.add(new ACL(ZooDefs.Perms.ALL, id1));
        acls.add(new ACL(ZooDefs.Perms.DELETE, id2));
        // A registered user must authenticate through addAuthInfo before operating on the node.
        zk.addAuthInfo("digest", "id1:123456".getBytes());
        zk.create("/test2", "test2-data".getBytes(), acls, CreateMode.PERSISTENT);
    } finally {
        // Release the session opened for this example.
        zk.close();
    }
}

結果如下:

在這里插入圖片描述

直接登錄id1,由于在程序中已經注冊完成

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評論