欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

dubbo如何實現(xiàn)consumer從多個group中調(diào)用指定group的provider

 更新時間:2023年03月21日 10:30:14   作者:自東向西  
這篇文章主要介紹了dubbo如何實現(xiàn)consumer從多個group中調(diào)用指定group的provider問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

背景

在工作中,遇到這樣的場景:

有個es索引構(gòu)建服務(wù),需要從各個業(yè)務(wù)服務(wù)獲取索引的信息,從而構(gòu)建索引,業(yè)務(wù)服務(wù)都實現(xiàn)同一個接口——IndexInfoProvider,通過設(shè)置不同的group來達到區(qū)分的效果(group就是es索引名)。

索引構(gòu)建服務(wù)在內(nèi)存維護了一個Map<String, IndexInfoProvider> providerMap,key是索引名——也就是provider的group,value是IndexInfoProvider服務(wù)的consumer。

為了圖方便,索引構(gòu)建服務(wù)還一次性初始化了所有consumer,假設(shè)總共有200個分組,那么Map就會緩存200個consumer,初始化的時候巨慢。如果不一次性初始化,而是按需的話,代碼會變得復雜一些,可能需要像雙重檢驗這樣的措施。而且因為是緩存,必然也會遇到緩存一致性的問題,例如新增一個索引。

基于這樣的問題,就想著,dubbo的一個consumer,能不能按需調(diào)用指定分組的provider呢~

過程

為什么必須緩存那么多consumer

為什么有200個分組,就要緩存200個consumer,而不是像我們平時那樣,1個就可以呢?

業(yè)務(wù)服務(wù)在實現(xiàn)IndexInfoProvider接口的時候,都指定是分組,而且分組名,就是對應的索引名。

相應的,消費者就必須指定消費的分組,200個索引,自然就需要200個consumer。

consumer只能調(diào)用一個group的provider么

在我們?nèi)粘5氖褂弥校琧onsumer基本都是只能調(diào)用一個group的provider。但實際上,dubbo是支持調(diào)用多個group的provider的。

這樣寫是指定a分組和b分組,多個分組之間用英文逗號分隔

這樣寫,是所有的分組

group="*"能代替Map<String, IndexInfoProvider>么

如果將es索引構(gòu)建服務(wù)的IndexInfoProvider接口的consumer分組設(shè)置為group="*",然后在調(diào)用接口的時候,根據(jù)需要,從一堆provider中篩選出指定group的provider,只調(diào)用這些provider,是不是就可以代替Map緩存了?

初步方案:

  • 利用ThreadLocal來指定要調(diào)用的分組,在調(diào)用方法前設(shè)置group到ThreadLocal中。
  • 實現(xiàn)dubbo的負載均衡拓展,只選取指定分組的節(jié)點來調(diào)用。
  • 調(diào)用結(jié)束,清理ThreadLocal中的分組信息。

似乎是可行的?

理想很美好,現(xiàn)實很骨感

通過源碼和代碼debug發(fā)現(xiàn),consumer持有的Invoker,有點像一個責任鏈。

如果是單分組——!group.contains(",") && !group.equals("*"),那么會是這樣:

MockClusterInvoker->FailoverClusterInvoker

如果是多分組,則會變成:

MockClusterInvoker->MergeableClusterInvoker->FailoverClusterInvoker

負載均衡的機制,是得到了FailoverClusterInvoker才會生效

MockClusterInvoker只是一種降級機制,不是導致問題的原因

問題是出在MergeableClusterInvoker,下面是導致問題的代碼?。?!

代碼大概的意思是:消費者沒有指定合并策略,那么就會調(diào)用第一個有效的服務(wù)提供者,如果都是無效,就直接調(diào)用第一個。

我是沒有指定合并策略的,所以會變成調(diào)用第一個有效的服務(wù)提供者。根本走不到負載均衡那里。

不指定合并策略不行,那指定呢?指定合并策略會怎樣呢?

指定了合并策略,會調(diào)用所有invoker,然后用Merger合并結(jié)果。

這種很適合這樣種場景:

一部分數(shù)據(jù)在服務(wù)A,一部分數(shù)據(jù)在服務(wù)B,需要分別調(diào)用A和B,然后把兩者的結(jié)果集合并

設(shè)想著這樣的方案:

  • 指定了合并策略,那么所有分組的invoker都會被調(diào)用到,那么請求就到FailoverClusterInvoker了,負載均衡spi就生效了
  • 修改一下負載均衡spi,如果目前調(diào)用的invoker不是指定分組的,那么就直接返回null
  • 實現(xiàn)自己的合并spi,返回第一個不會null的結(jié)果

雖然不是什么優(yōu)雅的方式,但是,似乎是能做到的???

實際上,行不通,達不到想要的效果。原因是MergeableClusterInvoker內(nèi)部是用線程池并發(fā)調(diào)用的,ThreadLocal里的分組信息會丟失

還有機會么

MergeableClusterInvoker類是沒有留拓展的余地啦,還有其他機會么?MergeableClusterInvoker的Invokers列表從哪里來的?

Invokers是從directory來的,這里有沒有拓展的余地呢?

有的,在AbstractDirectory的list方法

看到這,發(fā)現(xiàn)用路由spi,還是有機會的,如果實現(xiàn)動態(tài)路由,每次只給MergeableClusterInvoker返回指定分組的Invoker,是不是就可以呢?怎么讓routers包含我們自定義的路由spi呢?

需要url上攜帶了router參數(shù),但是這里的url的參數(shù)是由誰決定的?@Reference 注解是沒有沒有的指定router參數(shù)的…

最后debuig,兜兜轉(zhuǎn)轉(zhuǎn),發(fā)現(xiàn)只能靠重寫ReferenceConfig類的loadRegistries方法,往url上加上動態(tài)路由的參數(shù)。

成果

AssignGroupRouterFactory

public class AssignGroupRouterFactory implements RouterFactory {
    public static final String NAME = "assignGroup";

    @Override
    public Router getRouter(URL url) {
        return new AssignGroupRouter(url);
    }
}

AssignGroupRouter

public class AssignGroupRouter implements Router {
    private final URL url;

    public AssignGroupRouter(URL url) {
        this.url = url;
    }

    @Override
    public URL getUrl() {
        return url;
    }

    @Override
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
        String assignGroup = IndexInfoProviderInvoker.getAssignGroup();
        if (Objects.isNull(assignGroup)) {
            return invokers;
        }
        return invokers.stream()
            .filter(invoker -> {
                URL invokerUrl = invoker.getUrl();
                return Objects.equals(assignGroup, invokerUrl.getParameter(Constants.GROUP_KEY));
            }).collect(Collectors.toList());
    }

    @Override
    public int compareTo(Router o) {
        return 1;
    }
}


AssignGroupReferenceConfig

public class AssignGroupReferenceConfig<T> extends ReferenceConfig<T> {
    @Override
    protected List<URL> loadRegistries(boolean provider) {
        List<URL> urls = super.loadRegistries(provider);
        if (CollectionUtils.isEmpty(urls)) {
            return urls;
        }
        // 指定動態(tài)路由,路由方式為assignGroup
        return urls.stream()
            .map(url -> url.addParameter(Constants.ROUTER_KEY, "assignGroup").addParameter(Constants.RUNTIME_KEY,
                "true"))
            .collect(Collectors.toList());
    }
}

IndexInfoProviderInvokerProxy

@Component
public class IndexInfoProviderInvokerProxy {
    public static final ThreadLocal<String> INDEX_TYPE_THREAD_LOCAL = new ThreadLocal<>();

    private final IndexInfoProvider indexInfoProvider;

    public IndexInfoProviderInvokerProxy(IndexInfoProvider indexInfoProvider) {
        this.indexInfoProvider = indexInfoProvider;
    }

    /**
     * 獲取文檔id 在[start, end) 之間的所有索引文檔
     *
     * @param indexType 索引名
     * @param start 起始id
     * @param end 結(jié)束id
     * @return 文檔列表
     */
    public List<IndexDocument> getDocsByIdRange(String indexType, long start, long end) {
        return invokeWitchIndexType(indexType, () -> indexInfoProvider.getDocsByIdRange(start, end));
    }

    private <T> T invokeWitchIndexType(String indexType, Supplier<T> supplier) {
        INDEX_TYPE_THREAD_LOCAL.set(indexType);
        T result = supplier.get();
        INDEX_TYPE_THREAD_LOCAL.remove();
        return result;
    }
}

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • 詳解用Spring Boot Admin來監(jiān)控我們的微服務(wù)

    詳解用Spring Boot Admin來監(jiān)控我們的微服務(wù)

    這篇文章主要介紹了用Spring Boot Admin來監(jiān)控我們的微服務(wù),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-08-08
  • spring aop execution表達式的用法

    spring aop execution表達式的用法

    這篇文章主要介紹了spring aop execution表達式的用法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • 解決springboot項目啟動報錯Field xxxMapper in com...xxxController required

    解決springboot項目啟動報錯Field xxxMapper in com...xx

    這篇文章主要介紹了解決springboot項目啟動報錯Field xxxMapper in com...xxxContr問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • spring動態(tài)控制定時任務(wù)的實現(xiàn)

    spring動態(tài)控制定時任務(wù)的實現(xiàn)

    在實際項目中,經(jīng)常需要動態(tài)的控制定時任務(wù),比如通過接口增加、啟動、停止、刪除定時任務(wù),本文主要介紹了spring動態(tài)控制定時任務(wù)的實現(xiàn),感興趣的可以了解一下
    2024-01-01
  • 關(guān)于MyBatis 查詢數(shù)據(jù)時屬性中多對一的問題(多條數(shù)據(jù)對應一條數(shù)據(jù))

    關(guān)于MyBatis 查詢數(shù)據(jù)時屬性中多對一的問題(多條數(shù)據(jù)對應一條數(shù)據(jù))

    這篇文章主要介紹了MyBatis 查詢數(shù)據(jù)時屬性中多對一的問題(多條數(shù)據(jù)對應一條數(shù)據(jù)),本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-01-01
  • SpringBoot使用RabbitMQ延時隊列(小白必備)

    SpringBoot使用RabbitMQ延時隊列(小白必備)

    這篇文章主要介紹了SpringBoot使用RabbitMQ延時隊列(小白必備),詳細的介紹延遲隊列的使用場景及其如何使用,需要的小伙伴可以一起來了解一下
    2019-12-12
  • Java GUI實現(xiàn)學生成績管理系統(tǒng)

    Java GUI實現(xiàn)學生成績管理系統(tǒng)

    這篇文章主要為大家詳細介紹了Java GUI實現(xiàn)學生成績管理系統(tǒng),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-01-01
  • Java手動配置線程池過程詳解

    Java手動配置線程池過程詳解

    這篇文章主要介紹了Java手動配置線程池過程詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-05-05
  • 深入理解Java中的注解Annotation

    深入理解Java中的注解Annotation

    這篇文章主要介紹了深入理解Java中的注解Annotation,注解在Java中確實也很常見,但是人們常常不會自己定義一個注解拿來用,我們雖然很少去自定義注解,但是學會注解的寫法,注解的定義,學會利用反射解析注解中的信息,在開發(fā)中能夠使用到,這是很關(guān)鍵的,需要的朋友可以參考下
    2023-10-10
  • Spring Security 實現(xiàn)“記住我”功能及原理解析

    Spring Security 實現(xiàn)“記住我”功能及原理解析

    這篇文章主要介紹了Spring Security 實現(xiàn)“記住我”功能及原理解析,需要的朋友可以參考下
    2020-05-05

最新評論