dubbo如何實現(xiàn)consumer從多個group中調(diào)用指定group的provider
背景
在工作中,遇到這樣的場景:
有個es索引構(gòu)建服務(wù),需要從各個業(yè)務(wù)服務(wù)獲取索引的信息,從而構(gòu)建索引,業(yè)務(wù)服務(wù)都實現(xiàn)同一個接口——IndexInfoProvider,通過設(shè)置不同的group來達到區(qū)分的效果(group就是es索引名)。
索引構(gòu)建服務(wù)在內(nèi)存維護了一個Map<String, IndexInfoProvider> providerMap,key是索引名——也就是provider的group,value是IndexInfoProvider服務(wù)的consumer。
為了圖方便,索引構(gòu)建服務(wù)還一次性初始化了所有consumer,假設(shè)總共有200個分組,那么Map就會緩存200個consumer,初始化的時候巨慢。如果不一次性初始化,而是按需的話,代碼會變得復雜一些,可能需要像雙重檢驗這樣的措施。而且因為是緩存,必然也會遇到緩存一致性的問題,例如新增一個索引。
基于這樣的問題,就想著,dubbo的一個consumer,能不能按需調(diào)用指定分組的provider呢~
過程
為什么必須緩存那么多consumer
為什么有200個分組,就要緩存200個consumer,而不是像我們平時那樣,1個就可以呢?

業(yè)務(wù)服務(wù)在實現(xiàn)IndexInfoProvider接口的時候,都指定是分組,而且分組名,就是對應的索引名。
相應的,消費者就必須指定消費的分組,200個索引,自然就需要200個consumer。
consumer只能調(diào)用一個group的provider么
在我們?nèi)粘5氖褂弥校琧onsumer基本都是只能調(diào)用一個group的provider。但實際上,dubbo是支持調(diào)用多個group的provider的。

這樣寫是指定a分組和b分組,多個分組之間用英文逗號分隔

這樣寫,是所有的分組
group="*"能代替Map<String, IndexInfoProvider>么
如果將es索引構(gòu)建服務(wù)的IndexInfoProvider接口的consumer分組設(shè)置為group="*",然后在調(diào)用接口的時候,根據(jù)需要,從一堆provider中篩選出指定group的provider,只調(diào)用這些provider,是不是就可以代替Map緩存了?
初步方案:
- 利用ThreadLocal來指定要調(diào)用的分組,在調(diào)用方法前設(shè)置group到ThreadLocal中。
- 實現(xiàn)dubbo的負載均衡拓展,只選取指定分組的節(jié)點來調(diào)用。
- 調(diào)用結(jié)束,清理ThreadLocal中的分組信息。
似乎是可行的?
理想很美好,現(xiàn)實很骨感
通過源碼和代碼debug發(fā)現(xiàn),consumer持有的Invoker,有點像一個責任鏈。
如果是單分組——!group.contains(",") && !group.equals("*"),那么會是這樣:
MockClusterInvoker->FailoverClusterInvoker
如果是多分組,則會變成:
MockClusterInvoker->MergeableClusterInvoker->FailoverClusterInvoker
負載均衡的機制,是得到了FailoverClusterInvoker才會生效
MockClusterInvoker只是一種降級機制,不是導致問題的原因
問題是出在MergeableClusterInvoker,下面是導致問題的代碼?。?!

代碼大概的意思是:消費者沒有指定合并策略,那么就會調(diào)用第一個有效的服務(wù)提供者,如果都是無效,就直接調(diào)用第一個。
我是沒有指定合并策略的,所以會變成調(diào)用第一個有效的服務(wù)提供者。根本走不到負載均衡那里。
不指定合并策略不行,那指定呢?指定合并策略會怎樣呢?


指定了合并策略,會調(diào)用所有invoker,然后用Merger合并結(jié)果。
這種很適合這樣種場景:
一部分數(shù)據(jù)在服務(wù)A,一部分數(shù)據(jù)在服務(wù)B,需要分別調(diào)用A和B,然后把兩者的結(jié)果集合并
設(shè)想著這樣的方案:
- 指定了合并策略,那么所有分組的invoker都會被調(diào)用到,那么請求就到FailoverClusterInvoker了,負載均衡spi就生效了
- 修改一下負載均衡spi,如果目前調(diào)用的invoker不是指定分組的,那么就直接返回null
- 實現(xiàn)自己的合并spi,返回第一個不會null的結(jié)果
雖然不是什么優(yōu)雅的方式,但是,似乎是能做到的???
實際上,行不通,達不到想要的效果。原因是MergeableClusterInvoker內(nèi)部是用線程池并發(fā)調(diào)用的,ThreadLocal里的分組信息會丟失。

還有機會么
MergeableClusterInvoker類是沒有留拓展的余地啦,還有其他機會么?MergeableClusterInvoker的Invokers列表從哪里來的?

Invokers是從directory來的,這里有沒有拓展的余地呢?
有的,在AbstractDirectory的list方法

看到這,發(fā)現(xiàn)用路由spi,還是有機會的,如果實現(xiàn)動態(tài)路由,每次只給MergeableClusterInvoker返回指定分組的Invoker,是不是就可以呢?怎么讓routers包含我們自定義的路由spi呢?

需要url上攜帶了router參數(shù),但是這里的url的參數(shù)是由誰決定的?@Reference 注解是沒有沒有的指定router參數(shù)的…
最后debuig,兜兜轉(zhuǎn)轉(zhuǎn),發(fā)現(xiàn)只能靠重寫ReferenceConfig類的loadRegistries方法,往url上加上動態(tài)路由的參數(shù)。

成果
AssignGroupRouterFactory
public class AssignGroupRouterFactory implements RouterFactory {
public static final String NAME = "assignGroup";
@Override
public Router getRouter(URL url) {
return new AssignGroupRouter(url);
}
}
AssignGroupRouter
public class AssignGroupRouter implements Router {
private final URL url;
public AssignGroupRouter(URL url) {
this.url = url;
}
@Override
public URL getUrl() {
return url;
}
@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
String assignGroup = IndexInfoProviderInvoker.getAssignGroup();
if (Objects.isNull(assignGroup)) {
return invokers;
}
return invokers.stream()
.filter(invoker -> {
URL invokerUrl = invoker.getUrl();
return Objects.equals(assignGroup, invokerUrl.getParameter(Constants.GROUP_KEY));
}).collect(Collectors.toList());
}
@Override
public int compareTo(Router o) {
return 1;
}
}
AssignGroupReferenceConfig
public class AssignGroupReferenceConfig<T> extends ReferenceConfig<T> {
@Override
protected List<URL> loadRegistries(boolean provider) {
List<URL> urls = super.loadRegistries(provider);
if (CollectionUtils.isEmpty(urls)) {
return urls;
}
// 指定動態(tài)路由,路由方式為assignGroup
return urls.stream()
.map(url -> url.addParameter(Constants.ROUTER_KEY, "assignGroup").addParameter(Constants.RUNTIME_KEY,
"true"))
.collect(Collectors.toList());
}
}
IndexInfoProviderInvokerProxy
@Component
public class IndexInfoProviderInvokerProxy {
public static final ThreadLocal<String> INDEX_TYPE_THREAD_LOCAL = new ThreadLocal<>();
private final IndexInfoProvider indexInfoProvider;
public IndexInfoProviderInvokerProxy(IndexInfoProvider indexInfoProvider) {
this.indexInfoProvider = indexInfoProvider;
}
/**
* 獲取文檔id 在[start, end) 之間的所有索引文檔
*
* @param indexType 索引名
* @param start 起始id
* @param end 結(jié)束id
* @return 文檔列表
*/
public List<IndexDocument> getDocsByIdRange(String indexType, long start, long end) {
return invokeWitchIndexType(indexType, () -> indexInfoProvider.getDocsByIdRange(start, end));
}
private <T> T invokeWitchIndexType(String indexType, Supplier<T> supplier) {
INDEX_TYPE_THREAD_LOCAL.set(indexType);
T result = supplier.get();
INDEX_TYPE_THREAD_LOCAL.remove();
return result;
}
}
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
詳解用Spring Boot Admin來監(jiān)控我們的微服務(wù)
這篇文章主要介紹了用Spring Boot Admin來監(jiān)控我們的微服務(wù),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-08-08
解決springboot項目啟動報錯Field xxxMapper in com...xx
這篇文章主要介紹了解決springboot項目啟動報錯Field xxxMapper in com...xxxContr問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12
spring動態(tài)控制定時任務(wù)的實現(xiàn)
在實際項目中,經(jīng)常需要動態(tài)的控制定時任務(wù),比如通過接口增加、啟動、停止、刪除定時任務(wù),本文主要介紹了spring動態(tài)控制定時任務(wù)的實現(xiàn),感興趣的可以了解一下2024-01-01
關(guān)于MyBatis 查詢數(shù)據(jù)時屬性中多對一的問題(多條數(shù)據(jù)對應一條數(shù)據(jù))
這篇文章主要介紹了MyBatis 查詢數(shù)據(jù)時屬性中多對一的問題(多條數(shù)據(jù)對應一條數(shù)據(jù)),本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01
SpringBoot使用RabbitMQ延時隊列(小白必備)
這篇文章主要介紹了SpringBoot使用RabbitMQ延時隊列(小白必備),詳細的介紹延遲隊列的使用場景及其如何使用,需要的小伙伴可以一起來了解一下2019-12-12
Java GUI實現(xiàn)學生成績管理系統(tǒng)
這篇文章主要為大家詳細介紹了Java GUI實現(xiàn)學生成績管理系統(tǒng),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-01-01
Spring Security 實現(xiàn)“記住我”功能及原理解析
這篇文章主要介紹了Spring Security 實現(xiàn)“記住我”功能及原理解析,需要的朋友可以參考下2020-05-05

