dubbo如何實(shí)現(xiàn)consumer從多個(gè)group中調(diào)用指定group的provider
背景
在工作中,遇到這樣的場(chǎng)景:
有個(gè)es索引構(gòu)建服務(wù),需要從各個(gè)業(yè)務(wù)服務(wù)獲取索引的信息,從而構(gòu)建索引,業(yè)務(wù)服務(wù)都實(shí)現(xiàn)同一個(gè)接口——IndexInfoProvider,通過設(shè)置不同的group來(lái)達(dá)到區(qū)分的效果(group就是es索引名)。
索引構(gòu)建服務(wù)在內(nèi)存維護(hù)了一個(gè)Map<String, IndexInfoProvider> providerMap,key是索引名——也就是provider的group,value是IndexInfoProvider服務(wù)的consumer。
為了圖方便,索引構(gòu)建服務(wù)還一次性初始化了所有consumer,假設(shè)總共有200個(gè)分組,那么Map就會(huì)緩存200個(gè)consumer,初始化的時(shí)候巨慢。如果不一次性初始化,而是按需的話,代碼會(huì)變得復(fù)雜一些,可能需要像雙重檢驗(yàn)這樣的措施。而且因?yàn)槭蔷彺妫厝灰矔?huì)遇到緩存一致性的問題,例如新增一個(gè)索引。
基于這樣的問題,就想著,dubbo的一個(gè)consumer,能不能按需調(diào)用指定分組的provider呢~
過程
為什么必須緩存那么多consumer
為什么有200個(gè)分組,就要緩存200個(gè)consumer,而不是像我們平時(shí)那樣,1個(gè)就可以呢?
業(yè)務(wù)服務(wù)在實(shí)現(xiàn)IndexInfoProvider接口的時(shí)候,都指定是分組,而且分組名,就是對(duì)應(yīng)的索引名。
相應(yīng)的,消費(fèi)者就必須指定消費(fèi)的分組,200個(gè)索引,自然就需要200個(gè)consumer。
consumer只能調(diào)用一個(gè)group的provider么
在我們?nèi)粘5氖褂弥?,consumer基本都是只能調(diào)用一個(gè)group的provider。但實(shí)際上,dubbo是支持調(diào)用多個(gè)group的provider的。
這樣寫是指定a分組和b分組,多個(gè)分組之間用英文逗號(hào)分隔
這樣寫,是所有的分組
group="*"能代替Map<String, IndexInfoProvider>么
如果將es索引構(gòu)建服務(wù)的IndexInfoProvider接口的consumer分組設(shè)置為group="*",然后在調(diào)用接口的時(shí)候,根據(jù)需要,從一堆provider中篩選出指定group的provider,只調(diào)用這些provider,是不是就可以代替Map緩存了?
初步方案:
- 利用ThreadLocal來(lái)指定要調(diào)用的分組,在調(diào)用方法前設(shè)置group到ThreadLocal中。
- 實(shí)現(xiàn)dubbo的負(fù)載均衡拓展,只選取指定分組的節(jié)點(diǎn)來(lái)調(diào)用。
- 調(diào)用結(jié)束,清理ThreadLocal中的分組信息。
似乎是可行的?
理想很美好,現(xiàn)實(shí)很骨感
通過源碼和代碼debug發(fā)現(xiàn),consumer持有的Invoker,有點(diǎn)像一個(gè)責(zé)任鏈。
如果是單分組——!group.contains(",") && !group.equals("*"),那么會(huì)是這樣:
MockClusterInvoker->FailoverClusterInvoker
如果是多分組,則會(huì)變成:
MockClusterInvoker->MergeableClusterInvoker->FailoverClusterInvoker
負(fù)載均衡的機(jī)制,是得到了FailoverClusterInvoker才會(huì)生效
MockClusterInvoker只是一種降級(jí)機(jī)制,不是導(dǎo)致問題的原因
問題是出在MergeableClusterInvoker,下面是導(dǎo)致問題的代碼!??!
代碼大概的意思是:消費(fèi)者沒有指定合并策略,那么就會(huì)調(diào)用第一個(gè)有效的服務(wù)提供者,如果都是無(wú)效,就直接調(diào)用第一個(gè)。
我是沒有指定合并策略的,所以會(huì)變成調(diào)用第一個(gè)有效的服務(wù)提供者。根本走不到負(fù)載均衡那里。
不指定合并策略不行,那指定呢?指定合并策略會(huì)怎樣呢?
指定了合并策略,會(huì)調(diào)用所有invoker,然后用Merger合并結(jié)果。
這種很適合這樣種場(chǎng)景:
一部分?jǐn)?shù)據(jù)在服務(wù)A,一部分?jǐn)?shù)據(jù)在服務(wù)B,需要分別調(diào)用A和B,然后把兩者的結(jié)果集合并
設(shè)想著這樣的方案:
- 指定了合并策略,那么所有分組的invoker都會(huì)被調(diào)用到,那么請(qǐng)求就到FailoverClusterInvoker了,負(fù)載均衡spi就生效了
- 修改一下負(fù)載均衡spi,如果目前調(diào)用的invoker不是指定分組的,那么就直接返回null
- 實(shí)現(xiàn)自己的合并spi,返回第一個(gè)不會(huì)null的結(jié)果
雖然不是什么優(yōu)雅的方式,但是,似乎是能做到的???
實(shí)際上,行不通,達(dá)不到想要的效果。原因是MergeableClusterInvoker內(nèi)部是用線程池并發(fā)調(diào)用的,ThreadLocal里的分組信息會(huì)丟失。
還有機(jī)會(huì)么
MergeableClusterInvoker類是沒有留拓展的余地啦,還有其他機(jī)會(huì)么?MergeableClusterInvoker的Invokers列表從哪里來(lái)的?
Invokers是從directory來(lái)的,這里有沒有拓展的余地呢?
有的,在AbstractDirectory的list方法
看到這,發(fā)現(xiàn)用路由spi,還是有機(jī)會(huì)的,如果實(shí)現(xiàn)動(dòng)態(tài)路由,每次只給MergeableClusterInvoker返回指定分組的Invoker,是不是就可以呢?怎么讓routers包含我們自定義的路由spi呢?
需要url上攜帶了router參數(shù),但是這里的url的參數(shù)是由誰(shuí)決定的?@Reference 注解是沒有沒有的指定router參數(shù)的…
最后debuig,兜兜轉(zhuǎn)轉(zhuǎn),發(fā)現(xiàn)只能靠重寫ReferenceConfig類的loadRegistries方法,往url上加上動(dòng)態(tài)路由的參數(shù)。
成果
AssignGroupRouterFactory
public class AssignGroupRouterFactory implements RouterFactory { public static final String NAME = "assignGroup"; @Override public Router getRouter(URL url) { return new AssignGroupRouter(url); } }
AssignGroupRouter
public class AssignGroupRouter implements Router { private final URL url; public AssignGroupRouter(URL url) { this.url = url; } @Override public URL getUrl() { return url; } @Override public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { String assignGroup = IndexInfoProviderInvoker.getAssignGroup(); if (Objects.isNull(assignGroup)) { return invokers; } return invokers.stream() .filter(invoker -> { URL invokerUrl = invoker.getUrl(); return Objects.equals(assignGroup, invokerUrl.getParameter(Constants.GROUP_KEY)); }).collect(Collectors.toList()); } @Override public int compareTo(Router o) { return 1; } }
AssignGroupReferenceConfig
public class AssignGroupReferenceConfig<T> extends ReferenceConfig<T> { @Override protected List<URL> loadRegistries(boolean provider) { List<URL> urls = super.loadRegistries(provider); if (CollectionUtils.isEmpty(urls)) { return urls; } // 指定動(dòng)態(tài)路由,路由方式為assignGroup return urls.stream() .map(url -> url.addParameter(Constants.ROUTER_KEY, "assignGroup").addParameter(Constants.RUNTIME_KEY, "true")) .collect(Collectors.toList()); } }
IndexInfoProviderInvokerProxy
@Component public class IndexInfoProviderInvokerProxy { public static final ThreadLocal<String> INDEX_TYPE_THREAD_LOCAL = new ThreadLocal<>(); private final IndexInfoProvider indexInfoProvider; public IndexInfoProviderInvokerProxy(IndexInfoProvider indexInfoProvider) { this.indexInfoProvider = indexInfoProvider; } /** * 獲取文檔id 在[start, end) 之間的所有索引文檔 * * @param indexType 索引名 * @param start 起始id * @param end 結(jié)束id * @return 文檔列表 */ public List<IndexDocument> getDocsByIdRange(String indexType, long start, long end) { return invokeWitchIndexType(indexType, () -> indexInfoProvider.getDocsByIdRange(start, end)); } private <T> T invokeWitchIndexType(String indexType, Supplier<T> supplier) { INDEX_TYPE_THREAD_LOCAL.set(indexType); T result = supplier.get(); INDEX_TYPE_THREAD_LOCAL.remove(); return result; } }
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
詳解用Spring Boot Admin來(lái)監(jiān)控我們的微服務(wù)
這篇文章主要介紹了用Spring Boot Admin來(lái)監(jiān)控我們的微服務(wù),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08spring aop execution表達(dá)式的用法
這篇文章主要介紹了spring aop execution表達(dá)式的用法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07解決springboot項(xiàng)目啟動(dòng)報(bào)錯(cuò)Field xxxMapper in com...xx
這篇文章主要介紹了解決springboot項(xiàng)目啟動(dòng)報(bào)錯(cuò)Field xxxMapper in com...xxxContr問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12spring動(dòng)態(tài)控制定時(shí)任務(wù)的實(shí)現(xiàn)
在實(shí)際項(xiàng)目中,經(jīng)常需要?jiǎng)討B(tài)的控制定時(shí)任務(wù),比如通過接口增加、啟動(dòng)、停止、刪除定時(shí)任務(wù),本文主要介紹了spring動(dòng)態(tài)控制定時(shí)任務(wù)的實(shí)現(xiàn),感興趣的可以了解一下2024-01-01關(guān)于MyBatis 查詢數(shù)據(jù)時(shí)屬性中多對(duì)一的問題(多條數(shù)據(jù)對(duì)應(yīng)一條數(shù)據(jù))
這篇文章主要介紹了MyBatis 查詢數(shù)據(jù)時(shí)屬性中多對(duì)一的問題(多條數(shù)據(jù)對(duì)應(yīng)一條數(shù)據(jù)),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-01-01SpringBoot使用RabbitMQ延時(shí)隊(duì)列(小白必備)
這篇文章主要介紹了SpringBoot使用RabbitMQ延時(shí)隊(duì)列(小白必備),詳細(xì)的介紹延遲隊(duì)列的使用場(chǎng)景及其如何使用,需要的小伙伴可以一起來(lái)了解一下2019-12-12Java GUI實(shí)現(xiàn)學(xué)生成績(jī)管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java GUI實(shí)現(xiàn)學(xué)生成績(jī)管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-01-01Spring Security 實(shí)現(xiàn)“記住我”功能及原理解析
這篇文章主要介紹了Spring Security 實(shí)現(xiàn)“記住我”功能及原理解析,需要的朋友可以參考下2020-05-05