欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

ZooKeeper開發(fā)實際應(yīng)用案例實戰(zhàn)

 更新時間:2022年01月28日 14:40:49   作者:愛碼叔(稀有氣體)  
這篇文章主要為大家介紹了ZooKeeper開發(fā)的實際應(yīng)用案例實戰(zhàn),文中附含詳細(xì)開發(fā)應(yīng)用源碼,有需要的朋友可以借鑒參考下,希望能夠有所幫助

ZooKeeper入門教程一簡介與核心概念

ZooKeeper入門教程二在單機和集群環(huán)境下的安裝搭建及使用

ZooKeeper入門教程三分布式鎖實現(xiàn)及完整運行源碼

ZooKeeper框架教程Curator分布式鎖實現(xiàn)及源碼分析

前面幾章,我們學(xué)習(xí)了zookeeper的概念和使用,并且分析了curator通過zookeeper實現(xiàn)分布式鎖的源代碼,我們已經(jīng)熟知zookeeper協(xié)調(diào)分布式系統(tǒng)的方式,相信大家一定會思考自己的項目場景中是否有zookeeper的用武之地。沒錯,我們學(xué)習(xí)的最終目的是要去應(yīng)用它。本章,我通過實際工作中的一個例子,講解zookeeper是如何幫我解決分布式問題,以此引導(dǎo)大家發(fā)現(xiàn)自己系統(tǒng)中可以應(yīng)用zookeeper的場景。真正把zookeeper使用起來!

項目背景介紹

首先給大家介紹一下本文描述項目的情況。這是一個檢索網(wǎng)站,它讓你能在幾千萬份復(fù)雜文檔數(shù)據(jù)中檢索出你所需要的文檔數(shù)據(jù)。為了加快檢索速度,項目的數(shù)據(jù)分布在100臺機器的內(nèi)存里,我們稱之為數(shù)據(jù)服務(wù)器。除了數(shù)據(jù),這100臺機器上均部署著檢索程序。這些server之外,還有數(shù)臺給前端提供接口的搜索server,這些機器屬一個集群,我們稱之為檢索服務(wù)器。當(dāng)搜索請求過來時,他們負(fù)責(zé)把搜索請求轉(zhuǎn)發(fā)到那100臺機器,待所有機器返回結(jié)果后進行合并,最終返回給前端頁面。結(jié)構(gòu)如下圖:

面臨問題

網(wǎng)站上線之初,由于數(shù)據(jù)只有幾百萬,所以數(shù)據(jù)服務(wù)器只有10多臺。是一個規(guī)模比較小的分布式系統(tǒng),當(dāng)時沒有做分布式系統(tǒng)的協(xié)調(diào),也能正常工作,偶爾出問題,馬上解決。但是到了近期,機器增長到100臺,網(wǎng)站幾乎每天都會出現(xiàn)問題,導(dǎo)致整個分布式系統(tǒng)掛掉。問題原因如下:

數(shù)據(jù)服務(wù)器之前沒有做分布式協(xié)調(diào)。對于檢索服務(wù)器來說,并不知道哪些數(shù)據(jù)服務(wù)器還存活,所以檢索服務(wù)器每次檢索,都會等待100臺機器返回結(jié)果。但假如100臺數(shù)據(jù)服務(wù)中某一臺死掉了,檢索服務(wù)器也會長時間等待他的返回。這導(dǎo)致了檢索服務(wù)器積累了大量的請求,最終被壓垮。當(dāng)所有的檢索服務(wù)器都被壓垮時,那么網(wǎng)站也就徹底不可用了。

問題的本質(zhì)為檢索服務(wù)器維護的數(shù)據(jù)服務(wù)器列表是靜態(tài)不變的,不能感知數(shù)據(jù)服務(wù)器的上下線。

在10臺數(shù)據(jù)服務(wù)器的時候,某一臺機器出問題的概率很小。但當(dāng)增長到100臺服務(wù)器時,出問題的概率變成了10倍。所以才會導(dǎo)致網(wǎng)站幾乎每天都要死掉一次。

由于一臺機器的問題,導(dǎo)致100臺機器的分布式系統(tǒng)不可用,這是極其不合理,也是無法忍受的。

之前此項目的數(shù)據(jù)和檢索不由我負(fù)責(zé)。了解到此問題的時候,我覺得這個問題得立刻解決,否則不但用戶體驗差,而且開發(fā)和運維也要每天疲于系統(tǒng)維護,浪費了大量資源,但由于還有很多新的需求在開發(fā),原來的團隊也沒時間去處理。今年我有機會來解決這個問題,當(dāng)時正好剛剛研究完zookeeper,立刻想到這正是采用zookeeper的典型場景。

如何解決

我直接說方案,程序分為數(shù)據(jù)服務(wù)器和檢索服務(wù)器兩部分。

數(shù)據(jù)服務(wù)器:

1、每臺數(shù)據(jù)服務(wù)器啟動時候以臨時節(jié)點的形式把自己注冊到zookeeper的某節(jié)點下,如/data_servers。這樣當(dāng)某數(shù)據(jù)服務(wù)器死掉時,session斷開鏈接,該節(jié)點被刪除。

檢索服務(wù)器:

1、啟動時,加載/data_servers下所有子節(jié)點數(shù)據(jù),獲取了目前所有能提供服務(wù)的數(shù)據(jù)服務(wù)器列表,并且加載到內(nèi)存中。

2、啟動時,同時監(jiān)聽/data_servers節(jié)點,當(dāng)新的數(shù)據(jù)server上線或者某個server下線時,獲得通知,然后重新加載/data_servers下所有子節(jié)點數(shù)據(jù),刷新內(nèi)存中數(shù)據(jù)服務(wù)器列表。

通過以上方案,做到數(shù)據(jù)服務(wù)器上下線時,檢索服務(wù)器能夠動態(tài)感知。檢索服務(wù)器在檢索前,從內(nèi)存中取得的數(shù)據(jù)服務(wù)器列表將是最新的、可用的。即使在刷新時間差內(nèi)取到了掉線的數(shù)據(jù)服務(wù)器也沒關(guān)系,最多影響本次查詢,而不會拖垮整個集群。見下圖:

代碼講解

捋清思路后,其實代碼就比較簡單了。數(shù)據(jù)服務(wù)器只需要啟動的時候?qū)憐ookeeper臨時節(jié)點就好了,同時寫入自己服務(wù)器的相關(guān)信息,比如ip、port之類。檢索無服務(wù)器端會稍微復(fù)雜點,不過此處場景和zookeeper官方給的例子十分符合,所以我直接參考官方例子進行修改,實現(xiàn)起來也很簡單。關(guān)于官方例子我寫過兩篇博文,可以參考學(xué)習(xí):

zookeeper官方例子翻譯:ZooKeeper官方文檔之Java客戶端開發(fā)案例翻譯

zookeeper官方例子解讀:ZooKeeper官方文檔之Java案例解讀

數(shù)據(jù)服務(wù)器

數(shù)據(jù)服務(wù)器程序十分簡單,只會做一件事情:啟動的時候,把自己以臨時節(jié)點的形式注冊到zookeeper。一旦服務(wù)器掛掉,zookeeper自動刪除臨時znode。

我們創(chuàng)建ServiceRegister.java實現(xiàn)Runnable,數(shù)據(jù)服務(wù)啟動的時候,單獨線程運行此代碼,實現(xiàn)注冊到zookeeper邏輯。維系和zookeeper的鏈接。

檢索服務(wù)器

檢索服務(wù)器,代碼設(shè)計完全采用官方案例,所以詳細(xì)的代碼解讀請參考上面提到的兩篇文章,這里只做下簡述。

代碼有兩個類DataMonitor和LoadSaidsExecutor。LoadSaidsExecutor是啟動入口,他來啟動DataMonitor監(jiān)控zookeeper節(jié)點變化。DataMonitor負(fù)責(zé)監(jiān)控,初次啟動和發(fā)現(xiàn)變化時,調(diào)用LoadSaidsExecutor的方法來加載最新的數(shù)據(jù)服務(wù)器列表信息。

DataMonitor和LoadSaidsExecutor的工作流程如下:

Excutor把自己注冊為DataMonitor的監(jiān)聽

DataMonitor實現(xiàn)watcher接口,并監(jiān)聽znode

znode變化時,觸發(fā)DataMonitor的監(jiān)聽

回調(diào)回調(diào)中通過ZooKeeper.exist() 再次監(jiān)聽znode

上一步exist的回調(diào)方法中,調(diào)用監(jiān)聽自己的Executor,執(zhí)行業(yè)務(wù)邏輯6

Executor啟新的線程加載數(shù)據(jù)服務(wù)器信息到內(nèi)存中

注意:圖為以前文章配圖。圖里應(yīng)該把6,7步改為文字描述的第6步。

檢索服務(wù)啟動的時候,單獨線程運行LoadSaIdsExecutor。LoadSaIdsExecutor會阻塞線程,轉(zhuǎn)為事件驅(qū)動。

總結(jié)

我們通過一個例子,展示了zookeeper在實際系統(tǒng)中的應(yīng)用,通過zookeeper解決了分布式系統(tǒng)的問題。其實以上代碼還有很大的優(yōu)化空間。我能想到如下兩點:

1、數(shù)據(jù)服務(wù)器會假死或者變慢,但和zk鏈接還在,并不會從zk中刪除,但已經(jīng)拖慢了集群的速度。解決此問題,我們可以在數(shù)據(jù)服務(wù)器中加入定時任務(wù),通過定時跑真實業(yè)務(wù)查詢,監(jiān)控服務(wù)器狀態(tài),一旦達到設(shè)定的紅線閾值,強制下線,而不是等到server徹底死掉。

2、檢索服務(wù)器每個server都監(jiān)控zookeeper同一個節(jié)點,在節(jié)點變化時會出現(xiàn)羊群效應(yīng)。當(dāng)然,檢索服務(wù)器如果數(shù)量不多還好。其實檢索服務(wù)器應(yīng)該通過zookeeper做一個leader選舉,只由leader去監(jiān)控zookeeper節(jié)點變化,更新redis中的數(shù)據(jù)服務(wù)器列表緩存即可。

附:完整代碼

數(shù)據(jù)服務(wù)端代碼

ServiceRegister.java

public class ServiceRegister implements Runnable{
    private ZooKeeper zk;
    private static final String ZNODE = "/sas"; 
    private static final String SA_NODE_PREFIX = "sa_"; 
    private String hostName="localhost:2181"; 
    public void setHostName(String hostName) {
        this.hostName = hostName;
    }
     public ServiceRegister() throws IOException {
        zk = new ZooKeeper(hostName, 10000,null);
    }
  
    @Override
    public void run() {
        try {
             createSaNode();
            synchronized (this) {
                wait();
            }
 
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 
    //測試用
    public static void main(String[] args){
        try {
            new ServiceRegister().run();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //創(chuàng)建子節(jié)點
    private String createSaNode() throws KeeperException, InterruptedException {
        // 如果根節(jié)點不存在,則創(chuàng)建根節(jié)點
        Stat stat = zk.exists(ZNODE, false);
        if (stat == null) {
            zk.create(ZNODE, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
 
        String hostName = System.getenv("HOSTNAME");
        // 創(chuàng)建EPHEMERAL_SEQUENTIAL類型節(jié)點
        String saPath = zk.create(ZNODE + "/" + SA_NODE_PREFIX,
                hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        return saPath;
    }
}

檢索服務(wù)端代碼

DataMonitor.java

public class DataMonitor implements Watcher, AsyncCallback.ChildrenCallback { 
    ZooKeeper zk;
    String znode; 
    Watcher chainedWatcher; 
    boolean dead;
    DataMonitorListener listener;
     List<String> prevSaIds;
     public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
                       DataMonitorListener listener) {
        this.zk = zk;
        this.znode = znode;
        this.chainedWatcher = chainedWatcher;
        this.listener = listener;
        // 這是整個監(jiān)控的真正開始,通過獲取children節(jié)點開始。設(shè)置了本對象為監(jiān)控對象,回調(diào)對象也是本對象。以后均是事件驅(qū)動。
        zk.getChildren(znode, true, this, null);
    }
 
    /**
     * 其他和monitor產(chǎn)生交互的類,需要實現(xiàn)此listener
     */
    public interface DataMonitorListener {
        /**
         * The existence status of the node has changed.
         */
        void changed(List<String> saIds);
 
        /**
         * The ZooKeeper session is no longer valid.
         *
         * @param rc
         *                the ZooKeeper reason code
         */
        void closing(int rc);
    }
 
   /*
    *監(jiān)控/saids的回調(diào)函數(shù)。除了處理異常外。
    *如果發(fā)生變化,和構(gòu)造函數(shù)中一樣,通過getChildren,再次監(jiān)控,并處理children節(jié)點變化后的業(yè)務(wù)
    */
    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() == Event.EventType.None) {
            // We are are being told that the state of the
            // connection has changed
            switch (event.getState()) {
                case SyncConnected:
                    // In this particular example we don't need to do anything
                    // here - watches are automatically re-registered with
                    // server and any watches triggered while the client was
                    // disconnected will be delivered (in order of course)
                    break;
                case Expired:
                    // It's all over
                    dead = true;
                    listener.closing(Code.SESSIONEXPIRED.intValue());
                    break;
            }
        } else {
            if (path != null && path.equals(znode)) {
                // Something has changed on the node, let's find out
                zk.getChildren(znode, true, this, null);
            }
        }
        if (chainedWatcher != null) {
            chainedWatcher.process(event);
        }
    }
 
    //拿到Children節(jié)點后的回調(diào)函數(shù)。
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children) {
        boolean exists;
        switch (rc) {
            case Code.Ok:
                exists = true;
                break;
            case Code.NoNode:
                exists = false;
                break;
            case Code.SessionExpired:
            case Code.NoAuth:
                dead = true;
                listener.closing(rc);
                return;
            default:
                // Retry errors
                zk.getChildren(znode, true, this, null);
                return;
        }
 
        List<String> saIds = null;
 
        //如果存在,再次查詢到最新children,此時僅查詢,不要設(shè)置監(jiān)控了
        if (exists) {
            try {
                saIds = zk.getChildren(znode,null);
            } catch (KeeperException e) {
                // We don't need to worry about recovering now. The watch
                // callbacks will kick off any exception handling
                e.printStackTrace();
            } catch (InterruptedException e) {
                return;
            }
        }
 
        //拿到最新saids后,通過listener(executor),加載Saids。
        if ((saIds == null && saIds != prevSaIds)
                || (saIds != null && !saIds.equals(prevSaIds))) {
            listener.changed(saIds);
            prevSaIds = saIds;
        }
    }
}

LoadSaIdsExecutor.java

public class LoadSaIdsExecutor
        implements Watcher, Runnable, DataMonitor.DataMonitorListener
{
 
    private DataMonitor dm;
    private ZooKeeper zk;
    private static final String znode = "/sas"; 
    private String hostName="localhost:2181";
    public void setHostName(String hostName) {
        this.hostName = hostName;
    }
 
        /*
         *初始化zookeeper及DataMonitor
         * 自己作為zookeeper的監(jiān)控者,監(jiān)控和zookeeper連接的變化
         * 自己作為DataMonitor的listener。當(dāng)dm監(jiān)控到變化時會調(diào)用executor執(zhí)行業(yè)務(wù)操作
         */
    public LoadSaIdsExecutor() throws KeeperException, IOException {
        zk = new ZooKeeper(hostName, 300000, this);
        dm = new DataMonitor(zk, znode, null, this);
    }
 
    /**
     * 入口方法,測試用。
     */
    public static void main(String[] args) {
        try {
            new LoadSaIdsExecutor().run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 作為單獨線程運行
     */
    public void run() {
        try {
            synchronized (this) {
                while (!dm.dead) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }
    /*
     *作為zookeeper監(jiān)控者的回調(diào),直接傳遞事件給monitor的回調(diào)函數(shù)統(tǒng)一處理
     */
    @Override
    public void process(WatchedEvent event) {
        dm.process(event);
    } 
    /*
     *當(dāng)關(guān)閉時,讓線程線繼續(xù)走完
     */
    public void closing(int rc) {
        synchronized (this) {
            notifyAll();
        }
    }
 
    /*
     *監(jiān)控到/saids變化后的處理類
     */
    static class SaIdsLoader extends Thread {
         List<String> saIds = null; 
        //構(gòu)造對象后直接啟動線程
        public SaIdsLoader(List<String> saIds){
            this.saIds = saIds;
            start();
        }
 
        public void run() {
            System.out.println("------------加載開始------------");
            //業(yè)務(wù)處理的地方
            if(saIds!=null){
                saIds.forEach(id->{
                    System.out.println(id);
                });
            }
            System.out.println("------------加載結(jié)束------------");
        }
    }
 
    /*
     *作為listener對外暴露的方法,在節(jié)點/saids變化時被調(diào)用。
     */
    @Override
    public void changed(List<String> data) {
                new SaIdsLoader(data);
    }
}

以上就是ZooKeeper開發(fā)實際應(yīng)用案例實戰(zhàn)的詳細(xì)內(nèi)容,更多關(guān)于ZooKeeper開發(fā)應(yīng)用案例的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評論