Java OkHttp框架源碼深入解析
1.OkHttp發(fā)起網(wǎng)絡(luò)請求
可以通過OkHttpClient發(fā)起一個網(wǎng)絡(luò)請求
//創(chuàng)建一個Client,相當(dāng)于打開一個瀏覽器 OkHttpClient okHttpClient = new OkHttpClient.Builder().build(); //創(chuàng)建一個請求。 Request request = new Request.Builder() .url("http://www.baidu.com") .method("GET",null) .build(); //調(diào)用Client 創(chuàng)建一個Call。 Call call = okHttpClient.newCall(request); //Call傳入一個回調(diào)函數(shù),并加入到請求隊列。 call.enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { } @Override public void onResponse(Call call, Response response) throws IOException { } }); }
通過Retrofit發(fā)起一個OkHttp請求
Retrofit retrofit = new Retrofit.Builder() .baseUrl("http://www.baidu.com/") .build(); NetInterface netInterface = retrofit.create(NetInterface.class); Call<Person> call = netInterface.getPerson(); call.enqueue(new Callback<Person>() { @Override public void onResponse(Call<Person> call, Response<Person> response) { } @Override public void onFailure(Call<Person> call, Throwable t) { } });
以上兩種方式都是通過call.enqueue() 把網(wǎng)絡(luò)請求加入到請求隊列的。
這個call是RealCall的一個對象。
public void enqueue(Callback responseCallback) { client.dispatcher().enqueue(new AsyncCall(responseCallback)); }
這里有兩個判斷條件
runningAsyncCalls.size() < maxRequests如果運行隊列數(shù)量大于最大數(shù)量,
runningCallsForHost(call) < maxRequestsPerHost并且訪問同一臺服務(wù)器的請求數(shù)量大于最大數(shù)量,請求會放入等待隊列,否則加入運行隊列,直接執(zhí)行。
//等待隊列 private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); //運行隊列 private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); //運行隊列數(shù)量最大值 private int maxRequests = 64; //訪問不同主機的最大數(shù)量 private int maxRequestsPerHost = 5;
dispatcher.java synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } }
接下來看這行代碼executorService().execute(call);
executorService()拿到一個線程池實例,
public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService;
execute(call)執(zhí)行任務(wù),發(fā)起網(wǎng)絡(luò)請求。
//AsyncCall.java @Override protected void execute() { try { //這個方法去請求網(wǎng)絡(luò),會返回Respose Response response = getResponseWithInterceptorChain(); //請求成功,回調(diào)接口 responseCallback.onResponse(RealCall.this, response); }catch(Exceptrion e){ //失敗回調(diào) responseCallback.onFailure(RealCall.this, e); }finally { //從當(dāng)前運行隊列中刪除這個請求 client.dispatcher().finished(this); } }
getResponseWithInterceptorChain()
這行代碼,使用了設(shè)計模式中的責(zé)任鏈模式。
//這個方法命名:通過攔截器鏈,獲取Response Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. List<Interceptor> interceptors = new ArrayList<>(); // 這個我們自己定義的攔截器。 interceptors.addAll(client.interceptors()); //重試和重定向攔截器 interceptors.add(retryAndFollowUpInterceptor); //請求頭攔截器 interceptors.add(new BridgeInterceptor(client.cookieJar())); //緩存攔截器 interceptors.add(new CacheInterceptor(client.internalCache())); //連接攔截器 interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } //訪問攔截器 interceptors.add(new CallServerInterceptor(forWebSocket)); //攔截器責(zé)任鏈 Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0, originalRequest, this, eventListener, client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis()); //執(zhí)行攔截器集合中的攔截器 return chain.proceed(originalRequest); }
責(zé)任鏈模式中,鏈條的上游持有下游對象的引用。這樣能夠保證在鏈條上的每一個對象,都能對其符合條件的任務(wù)進行處理。
但是在上面的攔截器構(gòu)成責(zé)任鏈中,是把攔截器,放在了一個集合中。
第一個參數(shù)interceptors 是一個攔截器的集合。
第五個參數(shù)0是集合的index,RealInterceptorChain就是根據(jù)這個索引值+1,
對chain.proceed方法循環(huán)調(diào)用,進行集合遍歷,并執(zhí)行攔截器中定義的方法的。
這個責(zé)任鏈模式,并沒有明確的指定下游對象是什么,而是通過集合index值的變化,動態(tài)的指定的。
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0......) chain.proceed(originalRequest); public Response proceed(Request request,...){ //構(gòu)建一個index+1的攔截器鏈 RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec, connection, index + 1,....); //拿到當(dāng)前的攔截器 Interceptor interceptor = interceptors.get(index); //調(diào)用攔截器intercept(next)方法, //在這個方法中繼續(xù)調(diào)用realChain.proceed(),從而進行循環(huán)調(diào)用,index索引值再加1. Response response = interceptor.intercept(next); }
2.OkHttp的連接器
1)RetryAndFollowUpInterceptor:重試和重定向攔截器
public Response intercept(Chain chain){ while (true) { Response response; try { //創(chuàng)建StreamAllocation對象,這個對象會在連接攔截器中用到 StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(request.url()), call, eventListener, callStackTrace); this.streamAllocation = streamAllocation; 調(diào)用責(zé)任鏈下游攔截器 response = realChain.proceed(request, streamAllocation, null, null); } catch (RouteException e) { // The attempt to connect via a route failed. The request will not have been sent. 路由異常,請求還沒發(fā)出去。 這樣這個recover(),如果返回的是false,則拋出異常,不再重試 如果返回的是true,則執(zhí)行下面的continue,進行下一次while循環(huán),進行重試,重新發(fā)起網(wǎng)絡(luò)請求。 if (!recover(e.getLastConnectException(), streamAllocation, false, request)) { throw e.getFirstConnectException(); } releaseConnection = false; continue; } catch (IOException e) { // An attempt to communicate with a server failed. The request may have been sent. 請求已經(jīng)發(fā)出去了,但是和服務(wù)器連接失敗了。 這個recover()返回值的處理邏輯和上面異常一樣。 boolean requestSendStarted = !(e instanceof ConnectionShutdownException); if (!recover(e, streamAllocation, requestSendStarted, request)) throw e; releaseConnection = false; continue; } } finally {//finally是必定會執(zhí)行到的,不管上面的catch中執(zhí)行的是continue還是thow // We're throwing an unchecked exception. Release any resources. if (releaseConnection) { streamAllocation.streamFailed(null); streamAllocation.release(); } } 在這個重試攔截器中,okhttp的做法很巧妙。先是在外面有一個while循環(huán),如果發(fā)生異常, 會在recover方法中對異常類型進行判斷,如果不符合屬于重試,則返回false,并thow e,結(jié)束while循環(huán)。 如果符合重試的條件,則返回true,在上面的catch代碼塊中執(zhí)行continue方法,進入下一個while循環(huán)。 //如果請求正常,并且返回了response,則會進行重定向的邏輯判斷 followUpRequest在這個方法中會根據(jù)ResponseCode,狀態(tài)碼進行重定向的判斷, Request followUp; try { followUp = followUpRequest(response, streamAllocation.route()); } catch (IOException e) { streamAllocation.release(); throw e; } 如果flolowUp 為null,則不需要重定向,直接返回response if (followUp == null) { if (!forWebSocket) { streamAllocation.release(); } return response; } 如果flolowUp 不為null,則進行重定向了請求 如果重定向次數(shù)超過MAX_FOLLOW_UPS=20次,則拋出異常,結(jié)束while循環(huán) if (++followUpCount > MAX_FOLLOW_UPS) { streamAllocation.release(); throw new ProtocolException("Too many follow-up requests: " + followUpCount); } if (followUp.body() instanceof UnrepeatableRequestBody) { streamAllocation.release(); throw new HttpRetryException("Cannot retry streamed HTTP body", response.code()); } if (!sameConnection(response, followUp.url())) { streamAllocation.release(); //從重定向請求中拿到url,封裝一個新的streamAllocation對象, streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(followUp.url()), call, eventListener, callStackTrace); this.streamAllocation = streamAllocation; } else if (streamAllocation.codec() != null) { throw new IllegalStateException("Closing the body of " + response + " didn't close its backing stream. Bad interceptor?"); } //將重定向請求賦值給request 進入下一個重定向的請求的while循環(huán),繼續(xù)走上面的while循環(huán)代碼 request = followUp; priorResponse = response; } } //只有這個方法返回值為false都不進行重試。 private boolean recover(IOException e, StreamAllocation streamAllocation, boolean requestSendStarted, Request userRequest) { streamAllocation.streamFailed(e); // The application layer has forbidden retries. 應(yīng)用層禁止重試。可以通過OkHttpClient進行配置(默認是允許的) if (!client.retryOnConnectionFailure()) return false; // We can't send the request body again. if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false; // This exception is fatal. 致命的異常 判斷是否屬于重試的異常 if (!isRecoverable(e, requestSendStarted)) return false; // No more routes to attempt. 沒有更多可以連接的路由線路 if (!streamAllocation.hasMoreRoutes()) return false; // For failure recovery, use the same route selector with a new connection. return true; } 只有這個方法返回false,都不進行重試。 private boolean isRecoverable(IOException e, boolean requestSendStarted) { // If there was a protocol problem, don't recover. 出現(xiàn)了協(xié)議異常,不再重試 if (e instanceof ProtocolException) { return false; } // If there was an interruption don't recover, but if there was a timeout connecting to a route // we should try the next route (if there is one). requestSendStarted為false時,并且異常類型為Scoket超時異常,將會進行下一次重試 if (e instanceof InterruptedIOException) { return e instanceof SocketTimeoutException && !requestSendStarted; } // Look for known client-side or negotiation errors that are unlikely to be fixed by trying // again with a different route. 如果是一個握手異常,并且證書出現(xiàn)問題,則不能重試 if (e instanceof SSLHandshakeException) { // If the problem was a CertificateException from the X509TrustManager, // do not retry. if (e.getCause() instanceof CertificateException) { return false; } }
2)BridgeInterceptor 橋攔截器:連接服務(wù)器的橋梁,主要是在請求頭中設(shè)置一些參數(shù)配置
如:請求內(nèi)容長度,編碼,gzip壓縮等。
public Response intercept(Chain chain) throws IOException { Request userRequest = chain.request(); Request.Builder requestBuilder = userRequest.newBuilder(); RequestBody body = userRequest.body(); if (body != null) { MediaType contentType = body.contentType(); if (contentType != null) { requestBuilder.header("Content-Type", contentType.toString()); } .................. } 在請求頭中添加gizp,是否壓縮 boolean transparentGzip = false; if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) { transparentGzip = true; requestBuilder.header("Accept-Encoding", "gzip"); } //cookies List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url()); if (!cookies.isEmpty()) { requestBuilder.header("Cookie", cookieHeader(cookies)); } 調(diào)用責(zé)任鏈中下一個攔截器的方法,網(wǎng)絡(luò)請求得到的數(shù)據(jù)封裝到networkResponse中 Response networkResponse = chain.proceed(requestBuilder.build()); 對cookie進行處理 HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers()); 如果設(shè)置了gzip,則會對networkResponse進行解壓縮。 if (transparentGzip && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding")) && HttpHeaders.hasBody(networkResponse)) { GzipSource responseBody = new GzipSource(networkResponse.body().source()); Headers strippedHeaders = networkResponse.headers().newBuilder() .removeAll("Content-Encoding") .removeAll("Content-Length") .build(); responseBuilder.headers(strippedHeaders); String contentType = networkResponse.header("Content-Type"); responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody))); } return responseBuilder.build(); }
3)CacheInterceptor緩存攔截器
public Response intercept(Chain chain){ // this.cache = DiskLruCache.create(fileSystem, directory, 201105, 2, maxSize); 這個緩存在底層使用的是DiskLruCache //以request為key從緩存中拿到response。 Response cacheCandidate = cache != null ? cache.get(chain.request()): null; long now = System.currentTimeMillis(); //緩存策略 CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get(); Request networkRequest = strategy.networkRequest; Response cacheResponse = strategy.cacheResponse; // If we're forbidden from using the network and the cache is insufficient, fail. //如果請求和響應(yīng)都為null,直接返回504 if (networkRequest == null && cacheResponse == null) { return new Response.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(504) .message("Unsatisfiable Request (only-if-cached)") .body(Util.EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); } // If we don't need the network, we're done. //如果請求為null,緩存不為null,則直接使用緩存。 if (networkRequest == null) { return cacheResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build(); } Response networkResponse = null; try { //調(diào)用責(zé)任鏈下一個攔截器 networkResponse = chain.proceed(networkRequest); } finally { } Response response = networkResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); // Offer this request to the cache. //將響應(yīng)存入緩存。 CacheRequest cacheRequest = cache.put(response); }
4)ConnectInterceptor 連接攔截器。當(dāng)一個請求發(fā)出,需要建立連接,然后再通過流進行讀寫。
public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); //在重定向攔截器中創(chuàng)建, StreamAllocation streamAllocation = realChain.streamAllocation(); boolean doExtensiveHealthChecks = !request.method().equals("GET"); //從連接池中,找到一個可以復(fù)用的連接, HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks); // RealConnection 中封裝了一個Socket和一個Socket連接池 RealConnection connection = streamAllocation.connection(); //調(diào)用下一個攔截器 return realChain.proceed(request, streamAllocation, httpCodec, connection); } //遍歷連接池 RealConnection get(Address address, StreamAllocation streamAllocation, Route route) { assert (Thread.holdsLock(this)); for (RealConnection connection : connections) { if (connection.isEligible(address, route)) { streamAllocation.acquire(connection, true); return connection; } } return null; } public boolean isEligible(Address address, @Nullable Route route) { // If this connection is not accepting new streams, we're done. if (allocations.size() >= allocationLimit || noNewStreams) return false; // If the non-host fields of the address don't overlap, we're done. if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false; // If the host exactly matches, we're done: this connection can carry the address. 從連接池中找到一個連接參數(shù)一致且并未占用的連接 if (address.url().host().equals(this.route().address().url().host())) { return true; // This connection is a perfect match. }
5)CallServerInterceptor 請求服務(wù)器攔截器
/** This is the last interceptor in the chain. It makes a network call to the server. */ 這是責(zé)任鏈中最后一個攔截器,這個會去請求服務(wù)器。 public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; HttpCodec httpCodec = realChain.httpStream(); StreamAllocation streamAllocation = realChain.streamAllocation(); RealConnection connection = (RealConnection) realChain.connection(); Request request = realChain.request(); //將請求頭寫入緩存 httpCodec.writeRequestHeaders(request); return response;
到此這篇關(guān)于Java OkHttp框架源碼深入解析的文章就介紹到這了,更多相關(guān)Java OkHttp框架內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java實現(xiàn)一個簡單的Web服務(wù)器實例解析
這篇文章主要介紹了java實現(xiàn)一個簡單的Web服務(wù)器實例解析,分享了相關(guān)代碼示例,小編覺得還是挺不錯的,具有一定借鑒價值,需要的朋友可以參考下2018-02-02Java基礎(chǔ)之?dāng)?shù)組超詳細知識總結(jié)
這篇文章主要介紹了Java基礎(chǔ)之?dāng)?shù)組詳解,文中有非常詳細的代碼示例,對正在學(xué)習(xí)java基礎(chǔ)的小伙伴們有很好的幫助,需要的朋友可以參考下2021-05-05淺談java中Math.random()與java.util.random()的區(qū)別
下面小編就為大家?guī)硪黄獪\談java中Math.random()與java.util.random()的區(qū)別。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-09-09Java中WeakHashMap和HashMap的區(qū)別詳解
這篇文章主要介紹了Java中WeakHashMap和HashMap的區(qū)別詳解,WeakHashMap和HashMap一樣,WeakHashMap也是一個散列表,它存儲的內(nèi)容也是鍵值對(key-value)映射,而且鍵和值都可以為null,需要的朋友可以參考下2023-09-09Executor攔截器高級教程QueryInterceptor的規(guī)范
今天小編就為大家分享一篇關(guān)于Executor攔截器高級教程QueryInterceptor的規(guī)范,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-12-12Idea安裝Eslint插件提示:Plugin NativeScript was not installed的問題
這篇文章主要介紹了Idea安裝Eslint插件提示:Plugin NativeScript was not installed的問題,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-10-10