ZooKeeper開發(fā)實際應用案例實戰(zhàn)
ZooKeeper入門教程二在單機和集群環(huán)境下的安裝搭建及使用
ZooKeeper框架教程Curator分布式鎖實現及源碼分析
前面幾章,我們學習了zookeeper的概念和使用,并且分析了curator通過zookeeper實現分布式鎖的源代碼,我們已經熟知zookeeper協(xié)調分布式系統(tǒng)的方式,相信大家一定會思考自己的項目場景中是否有zookeeper的用武之地。沒錯,我們學習的最終目的是要去應用它。本章,我通過實際工作中的一個例子,講解zookeeper是如何幫我解決分布式問題,以此引導大家發(fā)現自己系統(tǒng)中可以應用zookeeper的場景。真正把zookeeper使用起來!
項目背景介紹
首先給大家介紹一下本文描述項目的情況。這是一個檢索網站,它讓你能在幾千萬份復雜文檔數據中檢索出你所需要的文檔數據。為了加快檢索速度,項目的數據分布在100臺機器的內存里,我們稱之為數據服務器。除了數據,這100臺機器上均部署著檢索程序。這些server之外,還有數臺給前端提供接口的搜索server,這些機器屬一個集群,我們稱之為檢索服務器。當搜索請求過來時,他們負責把搜索請求轉發(fā)到那100臺機器,待所有機器返回結果后進行合并,最終返回給前端頁面。結構如下圖:
面臨問題
網站上線之初,由于數據只有幾百萬,所以數據服務器只有10多臺。是一個規(guī)模比較小的分布式系統(tǒng),當時沒有做分布式系統(tǒng)的協(xié)調,也能正常工作,偶爾出問題,馬上解決。但是到了近期,機器增長到100臺,網站幾乎每天都會出現問題,導致整個分布式系統(tǒng)掛掉。問題原因如下:
數據服務器之前沒有做分布式協(xié)調。對于檢索服務器來說,并不知道哪些數據服務器還存活,所以檢索服務器每次檢索,都會等待100臺機器返回結果。但假如100臺數據服務中某一臺死掉了,檢索服務器也會長時間等待他的返回。這導致了檢索服務器積累了大量的請求,最終被壓垮。當所有的檢索服務器都被壓垮時,那么網站也就徹底不可用了。
問題的本質為檢索服務器維護的數據服務器列表是靜態(tài)不變的,不能感知數據服務器的上下線。
在10臺數據服務器的時候,某一臺機器出問題的概率很小。但當增長到100臺服務器時,出問題的概率變成了10倍。所以才會導致網站幾乎每天都要死掉一次。
由于一臺機器的問題,導致100臺機器的分布式系統(tǒng)不可用,這是極其不合理,也是無法忍受的。
之前此項目的數據和檢索不由我負責。了解到此問題的時候,我覺得這個問題得立刻解決,否則不但用戶體驗差,而且開發(fā)和運維也要每天疲于系統(tǒng)維護,浪費了大量資源,但由于還有很多新的需求在開發(fā),原來的團隊也沒時間去處理。今年我有機會來解決這個問題,當時正好剛剛研究完zookeeper,立刻想到這正是采用zookeeper的典型場景。
如何解決
我直接說方案,程序分為數據服務器和檢索服務器兩部分。
數據服務器:
1、每臺數據服務器啟動時候以臨時節(jié)點的形式把自己注冊到zookeeper的某節(jié)點下,如/data_servers。這樣當某數據服務器死掉時,session斷開鏈接,該節(jié)點被刪除。
檢索服務器:
1、啟動時,加載/data_servers下所有子節(jié)點數據,獲取了目前所有能提供服務的數據服務器列表,并且加載到內存中。
2、啟動時,同時監(jiān)聽/data_servers節(jié)點,當新的數據server上線或者某個server下線時,獲得通知,然后重新加載/data_servers下所有子節(jié)點數據,刷新內存中數據服務器列表。
通過以上方案,做到數據服務器上下線時,檢索服務器能夠動態(tài)感知。檢索服務器在檢索前,從內存中取得的數據服務器列表將是最新的、可用的。即使在刷新時間差內取到了掉線的數據服務器也沒關系,最多影響本次查詢,而不會拖垮整個集群。見下圖:
代碼講解
捋清思路后,其實代碼就比較簡單了。數據服務器只需要啟動的時候寫zookeeper臨時節(jié)點就好了,同時寫入自己服務器的相關信息,比如ip、port之類。檢索無服務器端會稍微復雜點,不過此處場景和zookeeper官方給的例子十分符合,所以我直接參考官方例子進行修改,實現起來也很簡單。關于官方例子我寫過兩篇博文,可以參考學習:
zookeeper官方例子翻譯:ZooKeeper官方文檔之Java客戶端開發(fā)案例翻譯
zookeeper官方例子解讀:ZooKeeper官方文檔之Java案例解讀
數據服務器
數據服務器程序十分簡單,只會做一件事情:啟動的時候,把自己以臨時節(jié)點的形式注冊到zookeeper。一旦服務器掛掉,zookeeper自動刪除臨時znode。
我們創(chuàng)建ServiceRegister.java實現Runnable,數據服務啟動的時候,單獨線程運行此代碼,實現注冊到zookeeper邏輯。維系和zookeeper的鏈接。
檢索服務器
檢索服務器,代碼設計完全采用官方案例,所以詳細的代碼解讀請參考上面提到的兩篇文章,這里只做下簡述。
代碼有兩個類DataMonitor和LoadSaidsExecutor。LoadSaidsExecutor是啟動入口,他來啟動DataMonitor監(jiān)控zookeeper節(jié)點變化。DataMonitor負責監(jiān)控,初次啟動和發(fā)現變化時,調用LoadSaidsExecutor的方法來加載最新的數據服務器列表信息。
DataMonitor和LoadSaidsExecutor的工作流程如下:
Excutor把自己注冊為DataMonitor的監(jiān)聽
DataMonitor實現watcher接口,并監(jiān)聽znode
znode變化時,觸發(fā)DataMonitor的監(jiān)聽
回調回調中通過ZooKeeper.exist() 再次監(jiān)聽znode
上一步exist的回調方法中,調用監(jiān)聽自己的Executor,執(zhí)行業(yè)務邏輯6
Executor啟新的線程加載數據服務器信息到內存中
注意:圖為以前文章配圖。圖里應該把6,7步改為文字描述的第6步。
檢索服務啟動的時候,單獨線程運行LoadSaIdsExecutor。LoadSaIdsExecutor會阻塞線程,轉為事件驅動。
總結
我們通過一個例子,展示了zookeeper在實際系統(tǒng)中的應用,通過zookeeper解決了分布式系統(tǒng)的問題。其實以上代碼還有很大的優(yōu)化空間。我能想到如下兩點:
1、數據服務器會假死或者變慢,但和zk鏈接還在,并不會從zk中刪除,但已經拖慢了集群的速度。解決此問題,我們可以在數據服務器中加入定時任務,通過定時跑真實業(yè)務查詢,監(jiān)控服務器狀態(tài),一旦達到設定的紅線閾值,強制下線,而不是等到server徹底死掉。
2、檢索服務器每個server都監(jiān)控zookeeper同一個節(jié)點,在節(jié)點變化時會出現羊群效應。當然,檢索服務器如果數量不多還好。其實檢索服務器應該通過zookeeper做一個leader選舉,只由leader去監(jiān)控zookeeper節(jié)點變化,更新redis中的數據服務器列表緩存即可。
附:完整代碼
數據服務端代碼
ServiceRegister.java
public class ServiceRegister implements Runnable{ private ZooKeeper zk; private static final String ZNODE = "/sas"; private static final String SA_NODE_PREFIX = "sa_"; private String hostName="localhost:2181"; public void setHostName(String hostName) { this.hostName = hostName; } public ServiceRegister() throws IOException { zk = new ZooKeeper(hostName, 10000,null); } @Override public void run() { try { createSaNode(); synchronized (this) { wait(); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } //測試用 public static void main(String[] args){ try { new ServiceRegister().run(); } catch (IOException e) { e.printStackTrace(); } } //創(chuàng)建子節(jié)點 private String createSaNode() throws KeeperException, InterruptedException { // 如果根節(jié)點不存在,則創(chuàng)建根節(jié)點 Stat stat = zk.exists(ZNODE, false); if (stat == null) { zk.create(ZNODE, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } String hostName = System.getenv("HOSTNAME"); // 創(chuàng)建EPHEMERAL_SEQUENTIAL類型節(jié)點 String saPath = zk.create(ZNODE + "/" + SA_NODE_PREFIX, hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); return saPath; } }
檢索服務端代碼
DataMonitor.java
public class DataMonitor implements Watcher, AsyncCallback.ChildrenCallback { ZooKeeper zk; String znode; Watcher chainedWatcher; boolean dead; DataMonitorListener listener; List<String> prevSaIds; public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher, DataMonitorListener listener) { this.zk = zk; this.znode = znode; this.chainedWatcher = chainedWatcher; this.listener = listener; // 這是整個監(jiān)控的真正開始,通過獲取children節(jié)點開始。設置了本對象為監(jiān)控對象,回調對象也是本對象。以后均是事件驅動。 zk.getChildren(znode, true, this, null); } /** * 其他和monitor產生交互的類,需要實現此listener */ public interface DataMonitorListener { /** * The existence status of the node has changed. */ void changed(List<String> saIds); /** * The ZooKeeper session is no longer valid. * * @param rc * the ZooKeeper reason code */ void closing(int rc); } /* *監(jiān)控/saids的回調函數。除了處理異常外。 *如果發(fā)生變化,和構造函數中一樣,通過getChildren,再次監(jiān)控,并處理children節(jié)點變化后的業(yè)務 */ public void process(WatchedEvent event) { String path = event.getPath(); if (event.getType() == Event.EventType.None) { // We are are being told that the state of the // connection has changed switch (event.getState()) { case SyncConnected: // In this particular example we don't need to do anything // here - watches are automatically re-registered with // server and any watches triggered while the client was // disconnected will be delivered (in order of course) break; case Expired: // It's all over dead = true; listener.closing(Code.SESSIONEXPIRED.intValue()); break; } } else { if (path != null && path.equals(znode)) { // Something has changed on the node, let's find out zk.getChildren(znode, true, this, null); } } if (chainedWatcher != null) { chainedWatcher.process(event); } } //拿到Children節(jié)點后的回調函數。 @Override public void processResult(int rc, String path, Object ctx, List<String> children) { boolean exists; switch (rc) { case Code.Ok: exists = true; break; case Code.NoNode: exists = false; break; case Code.SessionExpired: case Code.NoAuth: dead = true; listener.closing(rc); return; default: // Retry errors zk.getChildren(znode, true, this, null); return; } List<String> saIds = null; //如果存在,再次查詢到最新children,此時僅查詢,不要設置監(jiān)控了 if (exists) { try { saIds = zk.getChildren(znode,null); } catch (KeeperException e) { // We don't need to worry about recovering now. The watch // callbacks will kick off any exception handling e.printStackTrace(); } catch (InterruptedException e) { return; } } //拿到最新saids后,通過listener(executor),加載Saids。 if ((saIds == null && saIds != prevSaIds) || (saIds != null && !saIds.equals(prevSaIds))) { listener.changed(saIds); prevSaIds = saIds; } } }
LoadSaIdsExecutor.java
public class LoadSaIdsExecutor implements Watcher, Runnable, DataMonitor.DataMonitorListener { private DataMonitor dm; private ZooKeeper zk; private static final String znode = "/sas"; private String hostName="localhost:2181"; public void setHostName(String hostName) { this.hostName = hostName; } /* *初始化zookeeper及DataMonitor * 自己作為zookeeper的監(jiān)控者,監(jiān)控和zookeeper連接的變化 * 自己作為DataMonitor的listener。當dm監(jiān)控到變化時會調用executor執(zhí)行業(yè)務操作 */ public LoadSaIdsExecutor() throws KeeperException, IOException { zk = new ZooKeeper(hostName, 300000, this); dm = new DataMonitor(zk, znode, null, this); } /** * 入口方法,測試用。 */ public static void main(String[] args) { try { new LoadSaIdsExecutor().run(); } catch (Exception e) { e.printStackTrace(); } } /** * 作為單獨線程運行 */ public void run() { try { synchronized (this) { while (!dm.dead) { wait(); } } } catch (InterruptedException e) { } } /* *作為zookeeper監(jiān)控者的回調,直接傳遞事件給monitor的回調函數統(tǒng)一處理 */ @Override public void process(WatchedEvent event) { dm.process(event); } /* *當關閉時,讓線程線繼續(xù)走完 */ public void closing(int rc) { synchronized (this) { notifyAll(); } } /* *監(jiān)控到/saids變化后的處理類 */ static class SaIdsLoader extends Thread { List<String> saIds = null; //構造對象后直接啟動線程 public SaIdsLoader(List<String> saIds){ this.saIds = saIds; start(); } public void run() { System.out.println("------------加載開始------------"); //業(yè)務處理的地方 if(saIds!=null){ saIds.forEach(id->{ System.out.println(id); }); } System.out.println("------------加載結束------------"); } } /* *作為listener對外暴露的方法,在節(jié)點/saids變化時被調用。 */ @Override public void changed(List<String> data) { new SaIdsLoader(data); } }
以上就是ZooKeeper開發(fā)實際應用案例實戰(zhàn)的詳細內容,更多關于ZooKeeper開發(fā)應用案例的資料請關注腳本之家其它相關文章!
相關文章
java后臺調用HttpURLConnection類模擬瀏覽器請求實例(可用于接口調用)
這篇文章主要介紹了java后臺調用HttpURLConnection類模擬瀏覽器請求實例,該實例可用于接口調用,具有一定的實用價值,需要的朋友可以參考下2014-10-10Java Scanner類用法及nextLine()產生的換行符問題實例分析
這篇文章主要介紹了Java Scanner類用法及nextLine()產生的換行符問題,結合實例形式分析了Scanner類功能、hasNextInt()和nextInt()方法使用及nextLine()產生的換行符問題解決方法,需要的朋友可以參考下2019-03-03MyBatis基于pagehelper實現分頁原理及代碼實例
這篇文章主要介紹了MyBatis基于pagehelper實現分頁原理及代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-06-06Elasticsearch開發(fā)AtomicArray使用示例探究
這篇文章主要為大家介紹了Elasticsearch AtomicArray使用示例探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-08-08關于Cannot?resolve?com.microsoft.sqlserver:sqljdbc4:4.0報錯問題解
這篇文章主要給大家介紹了關于Cannot?resolve?com.microsoft.sqlserver:sqljdbc4:4.0報錯問題的解決辦法,這個是在pom文件中添加依賴出現報錯問題,需要的朋友可以參考下2024-02-02