欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

dubbo如何實(shí)現(xiàn)consumer從多個(gè)group中調(diào)用指定group的provider

 更新時(shí)間:2023年03月21日 10:30:14   作者:自東向西  
這篇文章主要介紹了dubbo如何實(shí)現(xiàn)consumer從多個(gè)group中調(diào)用指定group的provider問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

背景

在工作中,遇到這樣的場(chǎng)景:

有個(gè)es索引構(gòu)建服務(wù),需要從各個(gè)業(yè)務(wù)服務(wù)獲取索引的信息,從而構(gòu)建索引,業(yè)務(wù)服務(wù)都實(shí)現(xiàn)同一個(gè)接口——IndexInfoProvider,通過設(shè)置不同的group來(lái)達(dá)到區(qū)分的效果(group就是es索引名)。

索引構(gòu)建服務(wù)在內(nèi)存維護(hù)了一個(gè)Map<String, IndexInfoProvider> providerMap,key是索引名——也就是provider的group,value是IndexInfoProvider服務(wù)的consumer。

為了圖方便,索引構(gòu)建服務(wù)還一次性初始化了所有consumer,假設(shè)總共有200個(gè)分組,那么Map就會(huì)緩存200個(gè)consumer,初始化的時(shí)候巨慢。如果不一次性初始化,而是按需的話,代碼會(huì)變得復(fù)雜一些,可能需要像雙重檢驗(yàn)這樣的措施。而且因?yàn)槭蔷彺妫厝灰矔?huì)遇到緩存一致性的問題,例如新增一個(gè)索引。

基于這樣的問題,就想著,dubbo的一個(gè)consumer,能不能按需調(diào)用指定分組的provider呢~

過程

為什么必須緩存那么多consumer

為什么有200個(gè)分組,就要緩存200個(gè)consumer,而不是像我們平時(shí)那樣,1個(gè)就可以呢?

業(yè)務(wù)服務(wù)在實(shí)現(xiàn)IndexInfoProvider接口的時(shí)候,都指定是分組,而且分組名,就是對(duì)應(yīng)的索引名。

相應(yīng)的,消費(fèi)者就必須指定消費(fèi)的分組,200個(gè)索引,自然就需要200個(gè)consumer。

consumer只能調(diào)用一個(gè)group的provider么

在我們?nèi)粘5氖褂弥?,consumer基本都是只能調(diào)用一個(gè)group的provider。但實(shí)際上,dubbo是支持調(diào)用多個(gè)group的provider的。

這樣寫是指定a分組和b分組,多個(gè)分組之間用英文逗號(hào)分隔

這樣寫,是所有的分組

group="*"能代替Map<String, IndexInfoProvider>么

如果將es索引構(gòu)建服務(wù)的IndexInfoProvider接口的consumer分組設(shè)置為group="*",然后在調(diào)用接口的時(shí)候,根據(jù)需要,從一堆provider中篩選出指定group的provider,只調(diào)用這些provider,是不是就可以代替Map緩存了?

初步方案:

  • 利用ThreadLocal來(lái)指定要調(diào)用的分組,在調(diào)用方法前設(shè)置group到ThreadLocal中。
  • 實(shí)現(xiàn)dubbo的負(fù)載均衡拓展,只選取指定分組的節(jié)點(diǎn)來(lái)調(diào)用。
  • 調(diào)用結(jié)束,清理ThreadLocal中的分組信息。

似乎是可行的?

理想很美好,現(xiàn)實(shí)很骨感

通過源碼和代碼debug發(fā)現(xiàn),consumer持有的Invoker,有點(diǎn)像一個(gè)責(zé)任鏈。

如果是單分組——!group.contains(",") && !group.equals("*"),那么會(huì)是這樣:

MockClusterInvoker->FailoverClusterInvoker

如果是多分組,則會(huì)變成:

MockClusterInvoker->MergeableClusterInvoker->FailoverClusterInvoker

負(fù)載均衡的機(jī)制,是得到了FailoverClusterInvoker才會(huì)生效

MockClusterInvoker只是一種降級(jí)機(jī)制,不是導(dǎo)致問題的原因

問題是出在MergeableClusterInvoker,下面是導(dǎo)致問題的代碼!??!

代碼大概的意思是:消費(fèi)者沒有指定合并策略,那么就會(huì)調(diào)用第一個(gè)有效的服務(wù)提供者,如果都是無(wú)效,就直接調(diào)用第一個(gè)。

我是沒有指定合并策略的,所以會(huì)變成調(diào)用第一個(gè)有效的服務(wù)提供者。根本走不到負(fù)載均衡那里。

不指定合并策略不行,那指定呢?指定合并策略會(huì)怎樣呢?

指定了合并策略,會(huì)調(diào)用所有invoker,然后用Merger合并結(jié)果。

這種很適合這樣種場(chǎng)景:

一部分?jǐn)?shù)據(jù)在服務(wù)A,一部分?jǐn)?shù)據(jù)在服務(wù)B,需要分別調(diào)用A和B,然后把兩者的結(jié)果集合并

設(shè)想著這樣的方案:

  • 指定了合并策略,那么所有分組的invoker都會(huì)被調(diào)用到,那么請(qǐng)求就到FailoverClusterInvoker了,負(fù)載均衡spi就生效了
  • 修改一下負(fù)載均衡spi,如果目前調(diào)用的invoker不是指定分組的,那么就直接返回null
  • 實(shí)現(xiàn)自己的合并spi,返回第一個(gè)不會(huì)null的結(jié)果

雖然不是什么優(yōu)雅的方式,但是,似乎是能做到的???

實(shí)際上,行不通,達(dá)不到想要的效果。原因是MergeableClusterInvoker內(nèi)部是用線程池并發(fā)調(diào)用的,ThreadLocal里的分組信息會(huì)丟失。

還有機(jī)會(huì)么

MergeableClusterInvoker類是沒有留拓展的余地啦,還有其他機(jī)會(huì)么?MergeableClusterInvoker的Invokers列表從哪里來(lái)的?

Invokers是從directory來(lái)的,這里有沒有拓展的余地呢?

有的,在AbstractDirectory的list方法

看到這,發(fā)現(xiàn)用路由spi,還是有機(jī)會(huì)的,如果實(shí)現(xiàn)動(dòng)態(tài)路由,每次只給MergeableClusterInvoker返回指定分組的Invoker,是不是就可以呢?怎么讓routers包含我們自定義的路由spi呢?

需要url上攜帶了router參數(shù),但是這里的url的參數(shù)是由誰(shuí)決定的?@Reference 注解是沒有沒有的指定router參數(shù)的…

最后debuig,兜兜轉(zhuǎn)轉(zhuǎn),發(fā)現(xiàn)只能靠重寫ReferenceConfig類的loadRegistries方法,往url上加上動(dòng)態(tài)路由的參數(shù)。

成果

AssignGroupRouterFactory

public class AssignGroupRouterFactory implements RouterFactory {
    public static final String NAME = "assignGroup";

    @Override
    public Router getRouter(URL url) {
        return new AssignGroupRouter(url);
    }
}

AssignGroupRouter

public class AssignGroupRouter implements Router {
    private final URL url;

    public AssignGroupRouter(URL url) {
        this.url = url;
    }

    @Override
    public URL getUrl() {
        return url;
    }

    @Override
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
        String assignGroup = IndexInfoProviderInvoker.getAssignGroup();
        if (Objects.isNull(assignGroup)) {
            return invokers;
        }
        return invokers.stream()
            .filter(invoker -> {
                URL invokerUrl = invoker.getUrl();
                return Objects.equals(assignGroup, invokerUrl.getParameter(Constants.GROUP_KEY));
            }).collect(Collectors.toList());
    }

    @Override
    public int compareTo(Router o) {
        return 1;
    }
}


AssignGroupReferenceConfig

public class AssignGroupReferenceConfig<T> extends ReferenceConfig<T> {
    @Override
    protected List<URL> loadRegistries(boolean provider) {
        List<URL> urls = super.loadRegistries(provider);
        if (CollectionUtils.isEmpty(urls)) {
            return urls;
        }
        // 指定動(dòng)態(tài)路由,路由方式為assignGroup
        return urls.stream()
            .map(url -> url.addParameter(Constants.ROUTER_KEY, "assignGroup").addParameter(Constants.RUNTIME_KEY,
                "true"))
            .collect(Collectors.toList());
    }
}

IndexInfoProviderInvokerProxy

@Component
public class IndexInfoProviderInvokerProxy {
    public static final ThreadLocal<String> INDEX_TYPE_THREAD_LOCAL = new ThreadLocal<>();

    private final IndexInfoProvider indexInfoProvider;

    public IndexInfoProviderInvokerProxy(IndexInfoProvider indexInfoProvider) {
        this.indexInfoProvider = indexInfoProvider;
    }

    /**
     * 獲取文檔id 在[start, end) 之間的所有索引文檔
     *
     * @param indexType 索引名
     * @param start 起始id
     * @param end 結(jié)束id
     * @return 文檔列表
     */
    public List<IndexDocument> getDocsByIdRange(String indexType, long start, long end) {
        return invokeWitchIndexType(indexType, () -> indexInfoProvider.getDocsByIdRange(start, end));
    }

    private <T> T invokeWitchIndexType(String indexType, Supplier<T> supplier) {
        INDEX_TYPE_THREAD_LOCAL.set(indexType);
        T result = supplier.get();
        INDEX_TYPE_THREAD_LOCAL.remove();
        return result;
    }
}

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評(píng)論