dubbo服務(wù)引用之創(chuàng)建Invoker流程詳解
1、創(chuàng)建Invoker流程
1.1、收集引用參數(shù)
ReferenceConfig#init方法的結(jié)尾處,調(diào)用createProxy方法,采集的參數(shù)集合作為入?yún)鬟f到該方法中。
1.2、創(chuàng)建Invoker
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) private T createProxy(Map<String, String> map) { ... // 創(chuàng)建服務(wù)代理 return (T) proxyFactory.getProxy(invoker); }
方法主要邏輯就是
- 1、默認(rèn)情況下如果本地有服務(wù)暴露,則引用本地服務(wù).
- 2、用戶寫死了引用的URL,指定的URL可能是對(duì)點(diǎn)對(duì)直連地址,也可能是注冊(cè)中心URL
- 3、通過注冊(cè)中心配置拼裝URL,List<URL> us = loadRegistries(false); 用戶配置了幾個(gè)注冊(cè)中心,就會(huì)產(chǎn)生幾個(gè)URL
不管走哪種引用類型,都會(huì)執(zhí)行下面的核心代碼
invoker = refprotocol.refer(interfaceClass, url);
refprotocol是一個(gè)Protocol接口,getAdaptiveExtension返回的是一個(gè)Protocol$Adpative,在該類的源碼及其生產(chǎn)方法。
private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
又看到了Protocol這個(gè)接口
@SPI("dubbo") public interface Protocol { int getDefaultPort(); @Adaptive <T> Exporter<T> export(Invoker<T> var1) throws RpcException; @Adaptive <T> Invoker<T> refer(Class<T> var1, URL var2) throws RpcException; void destroy(); }
主要任務(wù)是,暴露遠(yuǎn)程服務(wù)、引用遠(yuǎn)程服務(wù)、釋放協(xié)議【釋放暴露于引用服務(wù)時(shí)占用的資源】,dubbo支持多種協(xié)議,http,thrift,RMI等,真是通過dubbo的SPI機(jī)制,才可以靈活的在這些協(xié)議來回切換。
我們第二種為例講一下核心邏輯,同服務(wù)暴露時(shí)的一樣,因?yàn)镈ubbo的AOP機(jī)制,在獲RegistryProtocol時(shí),會(huì)經(jīng)過兩個(gè)Wrapper類的包裝
這個(gè)地方也不例外,但是兩個(gè)Wrapper類的ProtocolFilterWrapper,ProtocolListenerWrapper并無實(shí)際的業(yè)務(wù)邏輯,我們直接跳過。
執(zhí)行refprotocol.refer(interfaceClass, url)即執(zhí)行RegistryProtocol#refer代碼。
@SuppressWarnings("unchecked") public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); String group = qs.get(Constants.GROUP_KEY); if (group != null && group.length() > 0) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } return doRefer(cluster, registry, type, url); }
1.2.1、 連接zookeeper
Registry registry = registryFactory.getRegistry(url);
在服務(wù)發(fā)布的時(shí)候,已經(jīng)講過了,其主要核心作用就是連接zookeeper服務(wù)器,并返回一個(gè)ZookeeperRegistry實(shí)例。
在RegistryProtocol#doRefer方法中,通過ZookeeperRegistry#register的執(zhí)行,創(chuàng)建引用服務(wù)的consumers節(jié)點(diǎn)。創(chuàng)建如下節(jié)點(diǎn):
/dubbo/com.alibaba.dubbo.demo.DemoService/consumers/consumer%3A%2F%2F192.168.43.156%2Fcom.alibaba.dubbo.demo.DemoService%3Fapplication%3Ddemo-consumer%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D15775%26side%3Dconsumer%26timestamp%3D1525073802234
1.2.2、 監(jiān)聽節(jié)點(diǎn)
1.2.3、 加入集群
cluster.join(directory)
cluster也是一個(gè)帶有Adaptive注解的擴(kuò)展類,默認(rèn)實(shí)現(xiàn)時(shí)FailoverCluster
@SPI(FailoverCluster.NAME) public interface Cluster { /** * Merge the directory invokers to a virtual invoker. * * @param <T> * @param directory * @return cluster invoker * @throws RpcException */ @Adaptive <T> Invoker<T> join(Directory<T> directory) throws RpcException; }
進(jìn)入FailoverCluster#join,返回FailoverClusterInvoker,一個(gè)可以失敗轉(zhuǎn)移的Invoker,
public class FailoverCluster implements Cluster { public final static String NAME = "failover"; public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailoverClusterInvoker<T>(directory); } }
FailoverClusterInvoker源碼中,它實(shí)現(xiàn)了父類中的一個(gè)模板子方法doInvoke。
父類AbstractClusterInvoker的invoke方法,
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance; List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
方法list(invocation)返回了List<Invoker<T>> invokers,這些Invoker就是實(shí)際與服務(wù)交互的對(duì)象,
protected List<Invoker<T>> list(Invocation invocation) throws RpcException { List<Invoker<T>> invokers = directory.list(invocation); return invokers; }
我們?cè)跇?gòu)造FailoverClusterInvoker時(shí),傳入的Directory實(shí)現(xiàn)類是RegistryDirectory,即AbstractDirectory#list方法。
1.2.4、 核心類DubboInvoker
DubboInvoker 最終Invoker執(zhí)行的方法是:
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
調(diào)用invoker,白話描述就是:
將通過遠(yuǎn)程通信將Invocation信息傳遞給服務(wù)器端,服務(wù)器端接收到該Invocation信息后,找到對(duì)應(yīng)的本地Invoker,然后通過反射執(zhí)行相應(yīng)的方法,將方法的返回值再通過遠(yuǎn)程通信將結(jié)果傳遞給客戶端。
這里分3種情況:
- 執(zhí)行方法不需要返回值:直接調(diào)用ExchangeClient.send()方法
- 執(zhí)行方法的結(jié)果需要異步返回:使用ExchangeClient.request()方法返回一個(gè)ResponseFuture對(duì)象,通過RpcContext中的ThreadLocal使ResponseFuture和當(dāng)前線程綁定,未等服務(wù)端響應(yīng)結(jié)果就直接返回,然后服務(wù)端通過ProtocolFilterWrapper.buildInvokerChain()方法會(huì)調(diào)用Filter.invoke()方法,即FutureFilter.invoker()->asyncCallback(),會(huì)獲取RpcContext的ResponseFuture對(duì)象,異步返回結(jié)果
- 執(zhí)行方法的結(jié)果需要同步返回:使用ExchangeClient.request()方法,返回一個(gè)ResponseFuture,一直阻塞到服務(wù)端返回響應(yīng)結(jié)果
返回FailoverClusterInvoker
以上就是dubbo服務(wù)引用二之創(chuàng)建Invoker的詳細(xì)內(nèi)容,更多關(guān)于dubbo服務(wù)引用創(chuàng)建Invoker的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
使用mybatis-plus的insert方法遇到的問題及解決方法(添加時(shí)id值不存在異常)
這篇文章主要介紹了使用mybatis-plus的insert方法遇到的問題及解決方法(添加時(shí)id值不存在異常),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08SpringBoot項(xiàng)目中如何解決跨域問題的最新方案?
跨域問題是瀏覽器為了保護(hù)用戶的信息安全,實(shí)施了同源策略(Same-Origin Policy),即只允許頁面請(qǐng)求同源(相同協(xié)議、域名和端口)的資源,當(dāng) JavaScript 發(fā)起的請(qǐng)求跨越了同源策略,即請(qǐng)求的目標(biāo)與當(dāng)前頁面的域名、端口、協(xié)議不一致時(shí),瀏覽器會(huì)阻止請(qǐng)求的發(fā)送或接收2025-03-03Mybatis分頁插件Pagehelper的PageInfo字段屬性使用及解釋
這篇文章主要介紹了Mybatis分頁插件Pagehelper的PageInfo字段屬性使用及解釋,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05springmvc參數(shù)為對(duì)象,數(shù)組的操作
這篇文章主要介紹了springmvc參數(shù)為對(duì)象,數(shù)組的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08Java?guava框架LoadingCache及CacheBuilder本地小容量緩存框架總結(jié)
Guava?Cache本地緩存框架主要是一種將本地?cái)?shù)據(jù)緩存到內(nèi)存中,但數(shù)據(jù)量并不能太大,否則將會(huì)占用過多的內(nèi)存,本文給大家介紹Java?guava框架?LoadingCache及CacheBuilder?本地小容量緩存框架總結(jié),感興趣的朋友一起看看吧2023-12-12非常適合新手學(xué)生的Java線程池超詳細(xì)分析
作者是一個(gè)來自河源的大三在校生,以下筆記都是作者自學(xué)之路的一些淺薄經(jīng)驗(yàn),如有錯(cuò)誤請(qǐng)指正,將來會(huì)不斷的完善筆記,幫助更多的Java愛好者入門2022-03-03