Spring Cloud Feign內(nèi)部實(shí)現(xiàn)代碼細(xì)節(jié)
1. 概述
Feign用于服務(wù)間調(diào)用,它的內(nèi)部實(shí)現(xiàn)是一個(gè)包含Ribbon(負(fù)載均衡)的**JDK-HttpURLConnection(Http)**調(diào)用。雖然調(diào)用形式是類(lèi)似于RPC,但是實(shí)際調(diào)用是Http,這也是為什么Feign被稱(chēng)為偽RPC調(diào)用的原因。
內(nèi)部調(diào)用過(guò)程如下:
2. 代碼細(xì)節(jié)
1) BaseLoadBalancer.java配置初始化
重點(diǎn)功能: 1. 初始化負(fù)載均衡策略 2. 初始化取服務(wù)注冊(cè)列表調(diào)度策略
void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) { ... // 每隔30s Ping一次 int pingIntervalTime = Integer.parseInt("" + clientConfig.getProperty( CommonClientConfigKey.NFLoadBalancerPingInterval, Integer.parseInt("30"))); // 每次最多Ping 2s int maxTotalPingTime = Integer.parseInt("" + clientConfig.getProperty( CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime, Integer.parseInt("2"))); setPingInterval(pingIntervalTime); setMaxTotalPingTime(maxTotalPingTime); // cross associate with each other // i.e. Rule,Ping meet your container LB // LB, these are your Ping and Rule guys ... // 設(shè)置負(fù)載均衡規(guī)則 setRule(rule); // 初始化取服務(wù)注冊(cè)列表調(diào)度策略 setPing(ping); setLoadBalancerStats(stats); rule.setLoadBalancer(this); ... }
2) 負(fù)載均衡策略初始化
重點(diǎn)功能: 1. 默認(rèn)使用輪詢(xún)策略
BaseLoadBalancer.java
public void setRule(IRule rule) { if (rule != null) { this.rule = rule; } else { /* default rule */ // 默認(rèn)使用輪詢(xún)策略 this.rule = new RoundRobinRule(); } if (this.rule.getLoadBalancer() != this) { this.rule.setLoadBalancer(this); } }
RoundRobinRule.java
private AtomicInteger nextServerCyclicCounter; public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { log.warn("no load balancer"); return null; } Server server = null; int count = 0; while (server == null && count++ < 10) { List<Server> reachableServers = lb.getReachableServers(); List<Server> allServers = lb.getAllServers(); int upCount = reachableServers.size(); int serverCount = allServers.size(); if ((upCount == 0) || (serverCount == 0)) { log.warn("No up servers available from load balancer: " + lb); return null; } // 輪詢(xún)重點(diǎn)算法 int nextServerIndex = incrementAndGetModulo(serverCount); server = allServers.get(nextServerIndex); if (server == null) { /* Transient. */ Thread.yield(); continue; } if (server.isAlive() && (server.isReadyToServe())) { return (server); } // Next. server = null; } if (count >= 10) { log.warn("No available alive servers after 10 tries from load balancer: " + lb); } return server; } private int incrementAndGetModulo(int modulo) { for (;;) { int current = nextServerCyclicCounter.get(); int next = (current + 1) % modulo; if (nextServerCyclicCounter.compareAndSet(current, next)) return next; } }
3) 初始化取服務(wù)注冊(cè)列表調(diào)度策略
重點(diǎn)功能: 1. 設(shè)置輪詢(xún)間隔為30s 一次
注意: 這里沒(méi)有做實(shí)際的Ping,只是獲取緩存的注冊(cè)列表的alive服務(wù),原因是為了提高性能
BaseLoadBalancer.java
public void setPing(IPing ping) { if (ping != null) { if (!ping.equals(this.ping)) { this.ping = ping; setupPingTask(); // since ping data changed } } else { this.ping = null; // cancel the timer task lbTimer.cancel(); } } void setupPingTask() { if (canSkipPing()) { return; } if (lbTimer != null) { lbTimer.cancel(); } lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true); // 這里雖然默認(rèn)設(shè)置是10s一次,但是在初始化的時(shí)候,設(shè)置了30s一次 lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000); forceQuickPing(); } class Pinger { private final IPingStrategy pingerStrategy; public Pinger(IPingStrategy pingerStrategy) { this.pingerStrategy = pingerStrategy; } public void runPinger() throws Exception { if (!pingInProgress.compareAndSet(false, true)) { return; // Ping in progress - nothing to do } // we are "in" - we get to Ping Server[] allServers = null; boolean[] results = null; Lock allLock = null; Lock upLock = null; try { /* * The readLock should be free unless an addServer operation is * going on... */ allLock = allServerLock.readLock(); allLock.lock(); allServers = allServerList.toArray(new Server[allServerList.size()]); allLock.unlock(); int numCandidates = allServers.length; results = pingerStrategy.pingServers(ping, allServers); final List<Server> newUpList = new ArrayList<Server>(); final List<Server> changedServers = new ArrayList<Server>(); for (int i = 0; i < numCandidates; i++) { boolean isAlive = results[i]; Server svr = allServers[i]; boolean oldIsAlive = svr.isAlive(); svr.setAlive(isAlive); if (oldIsAlive != isAlive) { changedServers.add(svr); logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}", name, svr.getId(), (isAlive ? "ALIVE" : "DEAD")); } if (isAlive) { newUpList.add(svr); } } upLock = upServerLock.writeLock(); upLock.lock(); upServerList = newUpList; upLock.unlock(); notifyServerStatusChangeListener(changedServers); } finally { pingInProgress.set(false); } } } private static class SerialPingStrategy implements IPingStrategy { @Override public boolean[] pingServers(IPing ping, Server[] servers) { int numCandidates = servers.length; boolean[] results = new boolean[numCandidates]; logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates); for (int i = 0; i < numCandidates; i++) { results[i] = false; /* Default answer is DEAD. */ try { // NOTE: IFF we were doing a real ping // assuming we had a large set of servers (say 15) // the logic below will run them serially // hence taking 15 times the amount of time it takes // to ping each server // A better method would be to put this in an executor // pool // But, at the time of this writing, we dont REALLY // use a Real Ping (its mostly in memory eureka call) // hence we can afford to simplify this design and run // this // serially if (ping != null) { results[i] = ping.isAlive(servers[i]); } } catch (Exception e) { logger.error("Exception while pinging Server: '{}'", servers[i], e); } } return results; } }
4) 最后拼接完整URL使用JDK-HttpURLConnection進(jìn)行調(diào)用
SynchronousMethodHandler.java(io.github.openfeign:feign-core:10.10.1/feign.SynchronousMethodHandler.java)
Object executeAndDecode(RequestTemplate template, Options options) throws Throwable { Request request = this.targetRequest(template); if (this.logLevel != Level.NONE) { this.logger.logRequest(this.metadata.configKey(), this.logLevel, request); } long start = System.nanoTime(); Response response; try { response = this.client.execute(request, options); response = response.toBuilder().request(request).requestTemplate(template).build(); } catch (IOException var13) { if (this.logLevel != Level.NONE) { this.logger.logIOException(this.metadata.configKey(), this.logLevel, var13, this.elapsedTime(start)); } throw FeignException.errorExecuting(request, var13); } ... }
LoadBalancerFeignClient.java
@Override public Response execute(Request request, Request.Options options) throws IOException { try { URI asUri = URI.create(request.url()); String clientName = asUri.getHost(); URI uriWithoutHost = cleanUrl(request.url(), clientName); FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest( this.delegate, request, uriWithoutHost); IClientConfig requestConfig = getClientConfig(options, clientName); return lbClient(clientName) .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse(); } catch (ClientException e) { IOException io = findIOException(e); if (io != null) { throw io; } throw new RuntimeException(e); } }
AbstractLoadBalancerAwareClient.java
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig); try { return command.submit( new ServerOperation<T>() { @Override public Observable<T> call(Server server) { // 獲取真實(shí)訪問(wèn)URL URI finalUri = reconstructURIWithServer(server, request.getUri()); S requestForServer = (S) request.replaceUri(finalUri); try { return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); } catch (Exception e) { return Observable.error(e); } } }) .toBlocking() .single(); } catch (Exception e) { Throwable t = e.getCause(); if (t instanceof ClientException) { throw (ClientException) t; } else { throw new ClientException(e); } } }
FeignLoadBalancer.java
@Override public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride) throws IOException { Request.Options options; if (configOverride != null) { RibbonProperties override = RibbonProperties.from(configOverride); options = new Request.Options(override.connectTimeout(this.connectTimeout), override.readTimeout(this.readTimeout)); } else { options = new Request.Options(this.connectTimeout, this.readTimeout); } Response response = request.client().execute(request.toRequest(), options); return new RibbonResponse(request.getUri(), response); }
feign.Client.java
@Override public Response execute(Request request, Options options) throws IOException { HttpURLConnection connection = convertAndSend(request, options); return convertResponse(connection, request); } Response convertResponse(HttpURLConnection connection, Request request) throws IOException { // 使用HttpURLConnection 真實(shí)進(jìn)行Http調(diào)用 int status = connection.getResponseCode(); String reason = connection.getResponseMessage(); if (status < 0) { throw new IOException(format("Invalid status(%s) executing %s %s", status, connection.getRequestMethod(), connection.getURL())); } Map<String, Collection<String>> headers = new LinkedHashMap<>(); for (Map.Entry<String, List<String>> field : connection.getHeaderFields().entrySet()) { // response message if (field.getKey() != null) { headers.put(field.getKey(), field.getValue()); } } Integer length = connection.getContentLength(); if (length == -1) { length = null; } InputStream stream; if (status >= 400) { stream = connection.getErrorStream(); } else { stream = connection.getInputStream(); } return Response.builder() .status(status) .reason(reason) .headers(headers) .request(request) .body(stream, length) .build(); }
拓展干貨閱讀:一線大廠面試題、高并發(fā)等主流技術(shù)資料
以上就是Spring Cloud Feign內(nèi)部實(shí)現(xiàn)代碼細(xì)節(jié)的詳細(xì)內(nèi)容,更多關(guān)于Spring Cloud Feign的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- SpringCloud Feign的使用簡(jiǎn)介
- Spring Cloud Alibaba 使用 Feign+Sentinel 完成熔斷的示例
- 完美解決SpringCloud-OpenFeign使用okhttp替換不生效問(wèn)題
- 基于springcloud異步線程池、高并發(fā)請(qǐng)求feign的解決方案
- 淺談SpringCloud feign的http請(qǐng)求組件優(yōu)化方案
- SpringCloud Open feign 使用okhttp 優(yōu)化詳解
- 詳解SpringCloud-OpenFeign組件的使用
- SpringCloud Feign轉(zhuǎn)發(fā)請(qǐng)求頭(防止session失效)的解決方案
- 解決Spring Cloud Feign 請(qǐng)求時(shí)附帶請(qǐng)求頭的問(wèn)題
- SpringCloud OpenFeign Post請(qǐng)求400錯(cuò)誤解決方案
- Spring Cloud Feign原理詳解
相關(guān)文章
Springboot使用jxls實(shí)現(xiàn)同sheet多個(gè)列表展示
這篇文章主要介紹了Springboot使用jxls實(shí)現(xiàn)同sheet多個(gè)列表展示,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08idea啟動(dòng)命令過(guò)長(zhǎng)的問(wèn)題及解決
當(dāng)IDEA啟動(dòng)命令過(guò)長(zhǎng)時(shí),可以通過(guò)修改workspace.xml文件或調(diào)整啟動(dòng)類(lèi)配置來(lái)解決,方案一是在.idea文件或項(xiàng)目目錄中修改workspace.xml;方案二是通過(guò)運(yùn)行配置(run->edit)來(lái)保存啟動(dòng)設(shè)置,這兩種方法都可以有效縮短命令長(zhǎng)度,解決啟動(dòng)錯(cuò)誤2024-09-09spring cloud config 配置中心快速實(shí)現(xiàn)過(guò)程解析
這篇文章主要介紹了spring cloud config 配置中心快速實(shí)現(xiàn)過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-08-08mybatis-plus 自定義 Service Vo接口實(shí)現(xiàn)數(shù)據(jù)庫(kù)實(shí)體與 vo
這篇文章主要介紹了mybatis-plus 自定義 Service Vo接口實(shí)現(xiàn)數(shù)據(jù)庫(kù)實(shí)體與 vo 對(duì)象轉(zhuǎn)換返回功能,本文通過(guò)實(shí)例圖文相結(jié)合給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-08-08使用EasyExcel實(shí)現(xiàn)百萬(wàn)級(jí)別數(shù)據(jù)導(dǎo)出的代碼示例
近期需要開(kāi)發(fā)一個(gè)將百萬(wàn)數(shù)據(jù)量MySQL8的數(shù)據(jù)導(dǎo)出到excel的功能,所以本文講給大家介紹了基于EasyExcel實(shí)現(xiàn)百萬(wàn)級(jí)別數(shù)據(jù)導(dǎo)出,文中通過(guò)代碼示例講解的非常詳細(xì),需要的朋友可以參考下2023-12-12