nacos客戶端一致性hash負(fù)載需求實現(xiàn)
最近接到一個需求,由于文件服務(wù)器上傳文件后,不同節(jié)點之間共享文件需要延遲,上游上傳文件后立刻去下載,如果負(fù)載到其他節(jié)點上可能會找不到文件,所以使用文件服務(wù)器接入nacos根據(jù)相同的trace_id路由到一個節(jié)點上,這樣保證上傳后立刻下載的請求都能路由到同一個節(jié)點上,研究了兩天nacos原生的client發(fā)現(xiàn)并沒有提供相關(guān)功能,于是便產(chǎn)生了一個想法,手?jǐn)]一個客戶端負(fù)載。
要想同一個trace_id需要路由到相同的節(jié)點上,首先想到的方法就是利用hash算法,目前常用于分布式系統(tǒng)中負(fù)載的哈希算法分為兩種:
1.普通hash取模
2.一致性hash
普通hash雖然開發(fā)起來快,看起來也滿足需求,但是當(dāng)集群擴容或者縮容的時候,就會造成trace_id的hash結(jié)果與之前不同,可能不會路由到一個節(jié)點上。
而一致性hash在擴容縮容時只會影響哈希環(huán)中順時針方向的相鄰的節(jié)點, 對其他節(jié)點無影響。但是缺點時數(shù)據(jù)的分布和節(jié)點的位置有關(guān),因為這些節(jié)點不是均勻的分布在哈希環(huán)上的,不過根據(jù)選擇適當(dāng)?shù)膆ash算法也可以避免這個缺點,讓數(shù)據(jù)相對均勻的在hash環(huán)上分布。
網(wǎng)上關(guān)于一致性hash的討論已經(jīng)有很多了,在這里就放一張圖便于大家理解。
其他的不說,先上代碼
1.首先創(chuàng)建NacosClient,監(jiān)聽對應(yīng)的服務(wù):
public class NacosClient { //nacos監(jiān)聽器,處理初始化hash環(huán)等邏輯 private Nodelistener nodelistener; //初始化nacosClient,并且監(jiān)聽服務(wù) public void init() { NamingService naming = null; try { System.out.println(System.getProperty("serveAddr")); naming = NamingFactory.createNamingService(System.getProperty("serveAddr")); //注冊監(jiān)聽器,當(dāng)集群節(jié)點變化的時候調(diào)用nodelistener處理節(jié)點信息 naming.subscribe("test", event -> { if (event instanceof NamingEvent) { nodelistener.handlerChange((NamingEvent)event); } }); } catch (NacosException e) { e.printStackTrace(); } } }
2.創(chuàng)建Nodelistener,主要處理構(gòu)建hash環(huán)等邏輯:
public class Nodelistener { private List<Instance> servers; //利用treeMap構(gòu)建hash環(huán) private volatile SortedMap<Long, Instance> sortedMap = new TreeMap<Long, Instance>(); //虛擬節(jié)點 private int virtualNodeCount = 100; public synchronized void handlerChange(NamingEvent event) { List<Instance> servers = new ArrayList<Instance>(); event.getInstances().stream().filter(instance -> { return instance.isEnabled() && instance.isHealthy(); }).forEach(instance -> servers.add(instance)); this.onChange(servers); } //每次集群節(jié)點變化時,重新構(gòu)建hash環(huán) public void onChange(List<Instance> servers) { //只有一個節(jié)點的時候這里暫不考慮,讀者可以自行處理 if(servers.size() != 1) { SortedMap<Long, Instance> newSortedMap = new TreeMap<Long, Instance>(); for (int i = 0; i < servers.size(); i ++) { for (int j = 0; j < this.virtualNodeCount; j++) { //計算虛擬節(jié)點的hash,這里用到的是MurMurHash,網(wǎng)上還有很多其他hash實現(xiàn), //有興趣可以自行查閱,具體實現(xiàn)細(xì)節(jié)就不列出了 Long hash = HashUtil.getHash(servers.get(i).getIp() + ":" + servers.get(i).getPort() + j); //把虛擬節(jié)點加入hash環(huán) sortedMap.put(hash, servers.get(i)); } } sortedMap = newSortedMap; } this.servers = servers; } //根據(jù)傳入的key獲取hash環(huán)上順時針到hash環(huán)尾部部分所有節(jié)點 public Instance getInstance(String str) { Long hash = HashUtil.getHash(str); SortedMap<Long, Instance> map = sortedMap.tailMap(hash); //這里證明剛好獲取的是尾部,所以返回所有的節(jié)點,其實是獲取第一個節(jié)點 if (map.isEmpty()) { map = sortedMap; } return map.get(map.firstKey()); } }
其中,虛擬節(jié)點是一致性hash經(jīng)常用到的,主要是用于解決hash傾斜問題,即節(jié)點數(shù)比較少時,數(shù)據(jù)落在hash環(huán)上會造成不均衡,下圖即沒有虛擬節(jié)點的情況:
有虛擬節(jié)點的情況,這樣hash環(huán)就均勻分割,相應(yīng)數(shù)據(jù)落入的區(qū)間也會平衡:
3.負(fù)載均衡器:
public class LoadBalance { private Nodelistener nodelistener; //只需要簡單的從hash環(huán)中獲取第一個節(jié)點 public Instance doSelect(String key) { return nodelistener.getInstance(key); } }
測試下結(jié)果:
public void test2() { Map<String, Integer> map = new HashMap<String, Integer>(); Random random = new Random(); for (int i = 0; i < 10000; i ++) { String key = String.valueOf(random.nextLong()); Instance instance = loadBalance.doSelect(key); if(!map.containsKey(instance.getIp())) { map.put(instance.getIp(), 0); }else { map.put(instance.getIp(), map.get(instance.getIp()) + 1); } System.out.println("test2 count :" + i); System.out.printf("select IP is :" + instance.getIp()); } System.out.println(map.toString()); }
此處為了方便就直接用隨機數(shù)模擬trace_id,結(jié)果如下:
select IP is :127.0.0.0{127.0.0.4=2031, 127.0.0.3=2144, 127.0.0.2=1925, 127.0.0.1=1931, 127.0.0.0=1964}
可以看到10000次請求被均勻的分布到了4個節(jié)點上。
思考:
1. 本次我們使用到了treeMap構(gòu)建hash環(huán),那么treeMap構(gòu)建的hash具體的查找效率如何呢?
treeMap是由紅黑樹構(gòu)成的,其 containsKey(),get(),put(), remove() 方法時間復(fù)雜度均為O(logn),均是對數(shù)階,已經(jīng)算相當(dāng)不錯了。
2.在Nodelistener 中我們兩個方法都使用了synchronized 這樣會有什么影響?
首先因為treeMap是線程不安全的,所以我們都使用了方法級別的synchronized,所以兩個方法不會同時執(zhí)行,這樣使用treeMap時,不會造成線程不安全問題,其次可以保證我們在獲取hash環(huán)中節(jié)點的時候,treeMap不會因為節(jié)點變化而變化。但是這樣處理的話就會產(chǎn)生一個問題,我們正在計算trace_id的路由節(jié)點時,機器不巧縮容了,treeMap還沒進行更新,剛好路由到的節(jié)點時下線的機器,那么就會訪問失敗,筆者這里解決這個問題的思路是重試,如果失敗獲取下一個節(jié)點,此時文件服務(wù)器不同節(jié)點之間文件已經(jīng)同步完畢,所以不同節(jié)點訪問是沒問題的。
public void test(String key) throws InterruptedException { //這里可以設(shè)置重試次數(shù) for (;;) { Instance instance = loadBalance.doSelect(key); String addr = instance.getIp() + ":" + instance.getPort(); //測試請求 if(post(addr)) { //成功邏輯 ..... break; }else { //等待兩秒,即可以使文件服務(wù)器不同節(jié)點之間同步文件,還可以等待更新本地hash環(huán) Thread.sleep(2000); //失敗則選取下一個節(jié)點 instance = loadBalance.doSelect(key); //此處可以增加重試次數(shù)邏輯和如果重試到hash環(huán)上最后一個節(jié)點則重新獲取hash環(huán)第一個節(jié)點邏輯, // 在此就不做論述,讀者可以自由發(fā)揮 continue; } } }
那么我們考慮當(dāng)我們計算trace_id路由時,正好擴容的情況,此時treeMap還沒有進行更新,情況如下圖,我們路由到的節(jié)點如果不是圖中標(biāo)記的受影響區(qū)域則不會有影響,如果是圖中受影響的區(qū)域計算得出的路由是擴容前的也就是 127.0.0.2-1(真實節(jié)點是127.0.0.2),那么下次相同的trace_id則會路由到新節(jié)點,此時會出現(xiàn)同一個trace_id路由到的節(jié)點不一樣的問題,筆者在此處也使用的重試機制。(其實這個地方可以使用緩存Key和節(jié)點的關(guān)系,擴容后關(guān)系改變之后再改變圖中受影響的hash環(huán),但是因為trace_id比較特殊,并不適合緩存所有,所以使用了重試機制)
3.每次探知到服務(wù)器節(jié)點變化的時候都需要重新構(gòu)建hash環(huán),這樣操作會比較耗時,可以修改成每次節(jié)點變化只需要改變對應(yīng)虛擬節(jié)點信息,更新本地hash環(huán)時間,可以將onChange方法改造下。
public void onChange(List<Instance> newServers) { //單節(jié)點時這里暫不考慮 if(servers.size() != 1 ) { //TODO .. } Map<String, Instance> oldAddrs = this.servers.stream() .collect(Collectors.toMap(Instance::toInetAddr, instance -> instance)); Map<String, Instance> newAddrs = newServers.stream() .collect(Collectors.toMap(Instance::toInetAddr, instance -> instance)); //remove oldAddrs.forEach((key, value) -> { if (!newAddrs.containsKey(key)) { for(int j = 0; j < virtualNodeCount; j++) { Long hash = HashUtil.getHash( value.toInetAddr() + "&&VM"+ j); sortedMap.remove(hash); } } }); //add newAddrs.forEach((key, value) -> { if (!oldAddrs.containsKey(key)) { for(int j = 0; j < virtualNodeCount; j++) { Long hash = HashUtil.getHash(value.toInetAddr() + "&&VM" + j); sortedMap.put(hash, value); } } }); this.servers = newServers; }
以上就是nacos客戶端一致性hash負(fù)載需求實現(xiàn)的詳細(xì)內(nèi)容,更多關(guān)于nacos客戶端一致性hash負(fù)載的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot2.0整合Shiro框架實現(xiàn)用戶權(quán)限管理的示例
這篇文章主要介紹了SpringBoot2.0整合Shiro框架實現(xiàn)用戶權(quán)限管理的示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-08-08java多線程實現(xiàn)服務(wù)器端與多客戶端之間的通信
本篇文章主要介紹了java多線程實現(xiàn)服務(wù)器端與多客戶端之間的通信,介紹了多線程來實現(xiàn)服務(wù)器與多線程之間的通信的基本步驟,有需要的小伙伴可以參考下。2016-10-10談?wù)凧ava利用原始HttpURLConnection發(fā)送POST數(shù)據(jù)
這篇文章主要給大家介紹java利用原始httpUrlConnection發(fā)送post數(shù)據(jù),設(shè)計到httpUrlConnection類的相關(guān)知識,感興趣的朋友跟著小編一起學(xué)習(xí)吧2015-10-10Eclipse新建項目不可選擇Java Project問題解決方案
這篇文章主要介紹了Eclipse新建項目不可選擇Java Project問題解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-07-07基于OpenCv與JVM實現(xiàn)加載保存圖像功能(JAVA?圖像處理)
openCv有一個名imread的簡單函數(shù),用于從文件中讀取圖像,本文給大家介紹JAVA?圖像處理基于OpenCv與JVM實現(xiàn)加載保存圖像功能,感興趣的朋友一起看看吧2022-01-01