欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java開發(fā)Dubbo負載均衡與集群容錯示例詳解

 更新時間:2021年11月15日 14:59:27   作者:又蠢又笨的懶羊羊程序猿  
這篇文章主要為大家介紹了java開發(fā)Dubbo負載均衡與集群容錯示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步

負載均衡與集群容錯

Invoker

在Dubbo中Invoker就是一個具有調(diào)用功能的對象,在服務(wù)提供端就是實際的服務(wù)實現(xiàn),只是將服務(wù)實現(xiàn)封裝起來變成一個Invoker。

在服務(wù)消費端,從注冊中心得到服務(wù)提供者的信息之后,將一條條信息封裝為Invoker,這個Invoker就具備了遠程調(diào)用的能力。

綜上,Dubbo就是創(chuàng)建了一個統(tǒng)一的模型,將可調(diào)用(可執(zhí)行體)的服務(wù)對象都統(tǒng)一封裝為Invoker。

ClusterInvoker就是將多個服務(wù)引入的Invoker封裝起來,對外統(tǒng)一暴露一個Invoker,并且賦予這些Invoker集群容錯的功能。

服務(wù)目錄

服務(wù)目錄,即Directory,實際上它就是多個Invoker的集合,服務(wù)提供端一般都會集群分布,同樣的服務(wù)會有多個提供者,因此需要一個服務(wù)目錄來統(tǒng)一存放它們,需要調(diào)用服務(wù)的時候便從這個服務(wù)目錄中進行挑選。

同時服務(wù)目錄還是實現(xiàn)了NotifyListener接口,當集群中新增了一臺服務(wù)提供者或者下線了一臺服務(wù)提供者,目錄都會對服務(wù)提供者進行更新,新增或者刪除對應(yīng)的Invoker。

在這里插入圖片描述

從上圖中,可以看到用了一個抽象類AbstractDirectory來實現(xiàn) Directory接口,抽象類中運用到了模板方法模式,將一些公共方法和邏輯寫好,作為一個骨架,然后具體實現(xiàn)由了兩個子類來完成,兩個子類分別為StaticDirectoryRegistryDirectory。

RegistryDirectory

RegistryDirectory實現(xiàn)了NotifyListener接口,可以監(jiān)聽注冊中心的變化,當注冊中心配置發(fā)生變化時,服務(wù)目錄也可以收到變更通知,然后根據(jù)更新之后的配置刷新Invoker列表。

由此可知RegistryDirectory共有三個作用:

獲取Invoker列表監(jiān)聽注冊中心刷新Invoker列表

獲取Invoker列表

RegistryDirectory實現(xiàn)了父類AbstractDirectory的抽象方法doList(),該方法可以得到Invoker列表

public List<Invoker<T>> doList(Invocation invocation) {
    if (this.forbidden) {
        throw new RpcException(....);
    } else {
        List<Invoker<T>> invokers = null;
        Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap;	//獲取方法調(diào)用名和Invoker的映射表
        if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
            String methodName = RpcUtils.getMethodName(invocation);
            Object[] args = RpcUtils.getArguments(invocation);
            //以下就是根據(jù)方法名和方法參數(shù)獲取可調(diào)用的Invoker
            if (args != null && args.length > 0 && args[0] != null && (args[0] instanceof String || args[0].getClass().isEnum())) {
                invokers = (List)localMethodInvokerMap.get(methodName + "." + args[0]);
            }
            if (invokers == null) {
                invokers = (List)localMethodInvokerMap.get(methodName);
            }
            if (invokers == null) {
                invokers = (List)localMethodInvokerMap.get("*");
            }
            if (invokers == null) {
                Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                if (iterator.hasNext()) {
                    invokers = (List)iterator.next();
                }
            }
        }
        return (List)(invokers == null ? new ArrayList(0) : invokers);
    }
}

監(jiān)聽注冊中心

通過實現(xiàn)NotifyListener接口可以感知注冊中心的數(shù)據(jù)變更。

RegistryDirectory定義了三個集合invokerUrls routerUrls configuratorUrls分別處理對應(yīng)的配置然后轉(zhuǎn)化成對象。

public synchronized void notify(List<URL> urls) {
    List<URL> invokerUrls = new ArrayList();
    List<URL> routerUrls = new ArrayList();
    List<URL> configuratorUrls = new ArrayList();
    Iterator i$ = urls.iterator();
    while(true) {
        while(true) {
            while(i$.hasNext()) {
				//....根據(jù)urls填充上述三個列表
            }
            if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
                this.configurators = toConfigurators(configuratorUrls);		//根據(jù)urls轉(zhuǎn)化為configurators配置
            }
            List localConfigurators;
            if (routerUrls != null && !routerUrls.isEmpty()) {
                localConfigurators = this.toRouters(routerUrls);
                if (localConfigurators != null) {
                    this.setRouters(localConfigurators);		//根據(jù)urls轉(zhuǎn)化為routers配置
                }
            }
            localConfigurators = this.configurators;
            this.overrideDirectoryUrl = this.directoryUrl;
            Configurator configurator;
            if (localConfigurators != null && !localConfigurators.isEmpty()) {
                for(Iterator i$ = localConfigurators.iterator(); i$.hasNext(); this.overrideDirectoryUrl = configurator.configure(this.overrideDirectoryUrl)) {
                    configurator = (Configurator)i$.next();
                }
            }
            this.refreshInvoker(invokerUrls);		//根據(jù)invokerUrls刷新invoker列表
            return;
        }
    }
}

刷新Invoker列表

private void refreshInvoker(List<URL> invokerUrls) {
    //如果invokerUrls只有一個URL并且協(xié)議是empty,那么清除所有invoker
    if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && "empty".equals(((URL)invokerUrls.get(0)).getProtocol())) {
        this.forbidden = true;
        this.methodInvokerMap = null;
        this.destroyAllInvokers();
    } else {
        this.forbidden = false;
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap;		//獲取舊的Invoker列表        
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet();
            this.cachedInvokerUrls.addAll(invokerUrls);
        }
        if (invokerUrls.isEmpty()) {
            return;
        }
	    //根據(jù)URL生成InvokerMap
        Map<String, Invoker<T>> newUrlInvokerMap = this.toInvokers(invokerUrls);
        //根據(jù)新的InvokerMap生成方法名和Invoker列表對應(yīng)的Map
        Map<String, List<Invoker<T>>> newMethodInvokerMap = this.toMethodInvokers(newUrlInvokerMap);
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
            return;
        }		
        this.methodInvokerMap = this.multiGroup ? this.toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;

        try {
            this.destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);		//銷毀無效的Invoker
        } catch (Exception var6) {
            logger.warn("destroyUnusedInvokers error. ", var6);
        }
    }
}

上述操作就是根據(jù)invokerUrls數(shù)量以及協(xié)議頭是否為empty來判斷是否禁用所有invokers,如果不禁用的話將invokerUrls轉(zhuǎn)化為Invoker,并且得到<url,Invoker>的映射關(guān)系。

再進一步進行轉(zhuǎn)化,得到<methodName,List>的映射關(guān)系,再將同一組的Invoker進行合并,將合并結(jié)果賦值給methodInvokerMap,這個methodInvokerMap就是在doList中使用到的Map。

最后刷新InvokerMap,銷毀無效的Invoker。

StaticDirectory

StaticDirectory是靜態(tài)目錄,所有Invoker是固定的不會刪減的,并且所有Invoker由構(gòu)造器來傳入。

內(nèi)部邏輯也相當簡單,只定義了一個列表用于存儲Invokers。實現(xiàn)父類的方法也只是將這些Invokers原封不動地返回。

private final List<Invoker<T>> invokers;

protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
    return this.invokers;
}

服務(wù)路由

服務(wù)路由規(guī)定了服務(wù)消費者可以調(diào)用哪些服務(wù)提供者,Dubbo常用的是條件路由ConditionRouter

條件路由由兩個條件組成,格式為[服務(wù)消費者匹配條件] => [服務(wù)提供者匹配條件],例如172.26.29.15 => 172.27.19.89規(guī)定了只有IP為172.26.29.15的服務(wù)消費者才可以訪問IP為172.27.19.89的服務(wù)提供者,不可以調(diào)用其他的服務(wù)。

路由一樣是通過RegistryDirectory中的notify()更新的,在調(diào)用toMethodInvokers()的時候會進行服務(wù)器級別的路由和方法級別的路由。

Cluster

在前面的流程中我們已經(jīng)通過Directory獲取了服務(wù)目錄,并且通過路由獲取了一個或多個Invoker,但是對于服務(wù)消費者還是需要進行選擇,篩選出一個Invoker進行調(diào)用。

Dubbo默認的Cluster實現(xiàn)有多種,如下:

FailoverClusterFailfastClusterFailsafeClusterFailbackClusterBroadcastClusterAvailableCluster

每個Cluster內(nèi)部返回的都是xxxClusterInvoker,例如FailoverCluster:

public class FailoverCluster implements Cluster {
    public static final String NAME = "failover";
    public FailoverCluster() {
    }
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker(directory);
    }
}

FailoverClusterInvoker

FailoverClusterInvoker實現(xiàn)的功能是失敗調(diào)用(有重試次數(shù))自動切換。

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    this.checkInvokers(invokers, invocation);
    //重試次數(shù)
    int len = this.getUrl().getMethodParameter(invocation.getMethodName(), "retries", 2) + 1;
    if (len <= 0) {
        len = 1;
    }
    RpcException le = null;
    List<Invoker<T>> invoked = new ArrayList(invokers.size());
    Set<String> providers = new HashSet(len);

    //根據(jù)重試次數(shù)循環(huán)調(diào)用
    for(int i = 0; i < len; ++i) {
        if (i > 0) {
            this.checkWhetherDestroyed();
            copyinvokers = this.list(invocation);
            this.checkInvokers(copyinvokers, invocation);
        }	    
        //負載均衡篩選出一個Invoker作本次調(diào)用
        Invoker<T> invoker = this.select(loadbalance, invocation, copyinvokers, invoked);
        //將使用過的Invoker保存起來,下次重試時做過濾用
        invoked.add(invoker);
        //記錄到上下文中
        RpcContext.getContext().setInvokers(invoked);
        try {
            //發(fā)起調(diào)用
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("....");
            }
			
            Result var12 = result;
            return var12;
        } catch (RpcException var17) {	//catch異常 繼續(xù)下次循環(huán)重試
            if (var17.isBiz()) {
                throw var17;
            }

            le = var17;
        } catch (Throwable var18) {
            le = new RpcException(var18.getMessage(), var18);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }

    throw new RpcException(....);
}

上述方法中,首先獲取重試次數(shù)len,根據(jù)重試次數(shù)進行循環(huán)調(diào)用,調(diào)用發(fā)生異常會被catch住,然后重新調(diào)用。

每次循環(huán)會通過負載均衡選出一個Invoker,然后利用這個Invoker進行遠程調(diào)用,每次選出的Invoker會記錄下來,在下次調(diào)用的select()中會將使用上次調(diào)用的Invoker進行重試,如果上一次沒有調(diào)用或者上次調(diào)用的Invoker下線了,那么會重新進行負載均衡進行選擇。

FailfastClusterInvoker

FailfastClusterInvoker只會進行一次遠程調(diào)用,如果失敗后立馬拋出異常。

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    this.checkInvokers(invokers, invocation);
    Invoker invoker = this.select(loadbalance, invocation, invokers, (List)null);	//負載均衡選擇Invoker

    try {
        return invoker.invoke(invocation);		//發(fā)起遠程調(diào)用
    } catch (Throwable var6) {	//失敗調(diào)用直接將錯誤拋出
        if (var6 instanceof RpcException && ((RpcException)var6).isBiz()) {
            throw (RpcException)var6;
        } else {
            throw new RpcException(....);
        }
    }
}

FailsafeClusterInvoker

FailsafeClusterInvoker是一種安全失敗的cluster,調(diào)用發(fā)生錯誤僅僅是記錄一下日志,然后就返回了空結(jié)果。

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        this.checkInvokers(invokers, invocation);
        //負載均衡選出Invoker后直接進行調(diào)用
        Invoker<T> invoker = this.select(loadbalance, invocation, invokers, (List)null);	
        return invoker.invoke(invocation);
    } catch (Throwable var5) {	//調(diào)用錯誤只是打印日志
        logger.error("Failsafe ignore exception: " + var5.getMessage(), var5);
        return new RpcResult();
    }
}

FailbackClusterInvoker

FailbackClusterInvoker調(diào)用失敗后,會記錄下本次調(diào)用,然后返回一個空結(jié)果給服務(wù)消費者,并且會通過一個定時任務(wù)對失敗的調(diào)用進行重試。適用于執(zhí)行消息通知等最大努力場景。

protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        this.checkInvokers(invokers, invocation);
        //負載均衡選出Invoker
        Invoker<T> invoker = this.select(loadbalance, invocation, invokers, (List)null);
        //執(zhí)行調(diào)用,執(zhí)行成功返回調(diào)用結(jié)果
        return invoker.invoke(invocation);
    } catch (Throwable var5) {
        //調(diào)用失敗
        logger.error("....");
        //記錄下本次失敗調(diào)用
        this.addFailed(invocation, this);
        //返回空結(jié)果
        return new RpcResult();
    }
}
private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
    if (this.retryFuture == null) {
        synchronized(this) {
            //如果未創(chuàng)建重試本次調(diào)用的定時任務(wù)
            if (this.retryFuture == null) {
                //創(chuàng)建定時任務(wù)
                this.retryFuture = this.scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
                    public void run() {
                        try {
                            //定時進行重試
                            FailbackClusterInvoker.this.retryFailed();
                        } catch (Throwable var2) {
                            FailbackClusterInvoker.logger.error("....", var2);
                        }

                    }
                }, 5000L, 5000L, TimeUnit.MILLISECONDS);
            }
        }
    }	
    //將invocation和router存入map
    this.failed.put(invocation, router);
}
void retryFailed() {
    if (this.failed.size() != 0) {
        Iterator i$ = (new HashMap(this.failed)).entrySet().iterator();
        while(i$.hasNext()) {
            Entry<Invocation, AbstractClusterInvoker<?>> entry = (Entry)i$.next();
            Invocation invocation = (Invocation)entry.getKey();
            Invoker invoker = (Invoker)entry.getValue();
            try {
                //進行重試調(diào)用
                invoker.invoke(invocation);
                //調(diào)用成功未產(chǎn)生異常則移除本次失敗調(diào)用的記錄,銷毀定時任務(wù)
                this.failed.remove(invocation);
            } catch (Throwable var6) {
                logger.error("....", var6);
            }
        }

    }
}

邏輯比較簡單,大致就是當調(diào)用錯誤時返回空結(jié)果,并記錄下本次失敗調(diào)用到failed<invocation,router>中,并且會創(chuàng)建一個定時任務(wù)定時地去調(diào)用failed中記錄的失敗調(diào)用,如果調(diào)用成功了就從failed中移除這個調(diào)用。

ForkingClusterInvoker

ForkingClusterInvoker運行時,會將所有Invoker都放入線程池中并發(fā)調(diào)用,只要有一個Invoker調(diào)用成功了就返回結(jié)果,doInvoker方法立即停止運行。

適用于對實時性比較高的讀寫操作。

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    Result var19;
    try {
        this.checkInvokers(invokers, invocation);
        int forks = this.getUrl().getParameter("forks", 2);
        int timeout = this.getUrl().getParameter("timeout", 1000);        
        final Object selected;
        if (forks > 0 && forks < invokers.size()) {
            selected = new ArrayList();

            for(int i = 0; i < forks; ++i) {
                Invoker<T> invoker = this.select(loadbalance, invocation, invokers, (List)selected);
                if (!((List)selected).contains(invoker)) {
                    //選擇好的Invoker放入這個selected列表
                    ((List)selected).add(invoker);
                }
            }
        } else {
            selected = invokers;
        }
        RpcContext.getContext().setInvokers((List)selected);
        final AtomicInteger count = new AtomicInteger();
        //阻塞隊列
        final BlockingQueue<Object> ref = new LinkedBlockingQueue();
        Iterator i$ = ((List)selected).iterator();
        while(i$.hasNext()) {
            final Invoker<T> invoker = (Invoker)i$.next();
            this.executor.execute(new Runnable() {
                public void run() {
                    try {
                        Result result = invoker.invoke(invocation);
                        ref.offer(result);
                    } catch (Throwable var3) {
                        int value = count.incrementAndGet();
                        if (value >= ((List)selected).size()) {		//等待所有調(diào)用都產(chǎn)生異常才入隊
                            ref.offer(var3);
                        }
                    }

                }
            });
        }
        try {
            //阻塞獲取結(jié)果
            Object ret = ref.poll((long)timeout, TimeUnit.MILLISECONDS);
            if (ret instanceof Throwable) {
                Throwable e = (Throwable)ret;
                throw new RpcException(....);
            }
            var19 = (Result)ret;
        } catch (InterruptedException var14) {
            throw new RpcException(....);
        }
    } finally {
        RpcContext.getContext().clearAttachments();
    }
    return var19;
}

BroadcastClusterInvoker

BroadcastClusterInvoker運行時會將所有Invoker逐個調(diào)用,在最后判斷中如果有一個調(diào)用產(chǎn)生錯誤,則拋出異常。

適用于通知所有提供者更新緩存或日志等本地資源的場景。

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    this.checkInvokers(invokers, invocation);
    RpcContext.getContext().setInvokers(invokers);
    RpcException exception = null;
    Result result = null;
    Iterator i$ = invokers.iterator();
    while(i$.hasNext()) {
        Invoker invoker = (Invoker)i$.next();
        try {
            result = invoker.invoke(invocation);
        } catch (RpcException var9) {
            exception = var9;
            logger.warn(var9.getMessage(), var9);
        } catch (Throwable var10) {
            exception = new RpcException(var10.getMessage(), var10);
            logger.warn(var10.getMessage(), var10);
        }
    }	
    //如果調(diào)用過程中發(fā)生過錯誤 拋出異常
    if (exception != null) {
        throw exception;
    } else {
        //返回調(diào)用結(jié)果
        return result;
    }
}

AbstractClusterInvoker

AbstractClusterInvoker是上述所有類的父類,內(nèi)部結(jié)構(gòu)較為簡單。AvailableCluster內(nèi)部返回結(jié)果就是AvailableClusterInvoker。

public class AvailableCluster implements Cluster {
    public static final String NAME = "available";

    public AvailableCluster() {
    }   
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new AbstractClusterInvoker<T>(directory) {
            public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
                Iterator i$ = invokers.iterator();

                Invoker invoker;
                do {		//循環(huán)判斷:哪個invoker能用就調(diào)用哪個
                    if (!i$.hasNext()) {
                        throw new RpcException("No provider available in " + invokers);
                    }

                    invoker = (Invoker)i$.next();
                } while(!invoker.isAvailable());

                return invoker.invoke(invocation);
            }
        };
    }
}

小結(jié)

上述中有很多種集群的實現(xiàn),各適用于不同的場景,加了Cluster這個中間層,向服務(wù)消費者屏蔽了集群調(diào)用的細節(jié),并且支持不同場景使用不同的模式。

負載均衡

Dubbo中的負載均衡,即LoadBalance,服務(wù)提供者一般都是集群分布,所以需要Dubbo選擇出合適的服務(wù)提供者來給服務(wù)消費者調(diào)用。

Dubbo中提供了多種負載均衡算法:

RandomLoadBalanceLeastActiveLoadBalanceConsistentHashLoadBalanceRoundRobinLoadBalance

AbstractLoadBalance

實現(xiàn)類都繼承了于這個類,該類實現(xiàn)了LoadBalance,使用模板方法模式,將一些公用的邏輯封裝好,而具體的實現(xiàn)由子類自定義。

public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (invokers != null && !invokers.isEmpty()) {
            //子類實現(xiàn)
            return invokers.size() == 1 ? (Invoker)invokers.get(0) : this.doSelect(invokers, url, invocation);
        } else {
            return null;
        }
}
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> var1, URL var2, Invocation var3);

服務(wù)剛啟動需要預(yù)熱,不能突然讓服務(wù)負載過高,需要進行服務(wù)的降權(quán)。

protected int getWeight(Invoker<?> invoker, Invocation invocation) {
    int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), "weight", 100);	//獲得權(quán)重
    if (weight > 0) {
        long timestamp = invoker.getUrl().getParameter("remote.timestamp", 0L);		//啟動時間
        if (timestamp > 0L) {
            int uptime = (int)(System.currentTimeMillis() - timestamp);   	//計算已啟動時長
            int warmup = invoker.getUrl().getParameter("warmup", 600000);
            if (uptime > 0 && uptime < warmup) {
                weight = calculateWarmupWeight(uptime, warmup, weight);		//降權(quán)
            }
        }
    }
    return weight;
}

RandomLoadBalance

使用了加權(quán)隨機算法,假設(shè)現(xiàn)在有三個節(jié)點A,B,C,然后賦予這幾個節(jié)點一定權(quán)重,分別為1,2,3,那么可計算得到總權(quán)重為6,那么這幾個節(jié)點被訪問的可能性分別為1/6,2/6,3/6。

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    int length = invokers.size();		//Invoker個數(shù)
    int totalWeight = 0;		//總權(quán)重
    boolean sameWeight = true;     //權(quán)重是否相同
    int offset;			
    int i;
    for(offset = 0; offset < length; ++offset) {	
        i = this.getWeight((Invoker)invokers.get(offset), invocation);		//得到權(quán)重
        totalWeight += i;	  //計算總權(quán)重
        //是否權(quán)重都相同
        if (sameWeight && offset > 0 && i != this.getWeight((Invoker)invokers.get(offset - 1), invocation)) {
            sameWeight = false;		
        }
    }
    if (totalWeight > 0 && !sameWeight) {
        offset = this.random.nextInt(totalWeight);		//獲得隨機偏移量		
       	//判斷偏移量落在哪個片段上
        for(i = 0; i < length; ++i) {
            offset -= this.getWeight((Invoker)invokers.get(i), invocation);
            if (offset < 0) {
                return (Invoker)invokers.get(i);		
            }
        }
    }	
    return (Invoker)invokers.get(this.random.nextInt(length));
}

LeastActiveLoadBalance

最少活躍數(shù)負載均衡,接收一個請求后,請求活躍數(shù)+1,處理完一個請求后,請求活躍數(shù)-1,請求活躍數(shù)少既說明現(xiàn)在服務(wù)器壓力小也說明該服務(wù)器處理請求快,沒有堆積什么請求。

總的流程是先遍歷Invokers列表,尋找當前請求活躍數(shù)最少的Invoker,如果有多個Invoker具有相同的最小請求活躍數(shù),則根據(jù)他們的權(quán)重來進行篩選。

ConsistentHashLoadBalance

在這里插入圖片描述

將服務(wù)器的IP等信息生成一個Hash值,將這個值映射到Hash圓環(huán)上作為某個節(jié)點,當查找節(jié)點時,通過一個Key來順時針查找。

Dubbo還引入了160個虛擬節(jié)點,使得數(shù)據(jù)更加分散,避免請求積壓在某個節(jié)點上。

并且Hash值是方法級別的,一個服務(wù)的每個方法都有一個ConsistentHashSelector,根據(jù)參數(shù)值來計算得出Hash值,

RoundRobinLoadBalance

加權(quán)輪詢負載均衡,這種輪詢是平滑的,假設(shè)A和B的權(quán)重為10:30,那么輪詢的結(jié)果可能是A、B、B、A、A、B、B、B…,40次調(diào)用下來A調(diào)用了10次,B調(diào)用了30次。

總結(jié)

在這里插入圖片描述

服務(wù)引入時,會將多個遠程調(diào)用塞入Directory,然后通過Cluster來封裝,同時根據(jù)需要提供各種容錯功能,最終統(tǒng)一暴露一個Invoker給服務(wù)消費者,服務(wù)消費者調(diào)用的時候會從目錄得到Invoker列表,經(jīng)過路由的過濾以及負載均衡最終得到一個Invoker發(fā)起調(diào)用。

以上就是java開發(fā)Dubbo負載均衡與集群容錯示例詳解的詳細內(nèi)容,如有不足或錯誤歡迎指正。

更多關(guān)于Dubbo負載均衡與集群容錯的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評論