Java?OkHttp框架源碼超詳細解析
一、自己的理解的OkHttp
我理解的http的本質(zhì)就是基于socket連接,把要傳輸?shù)臄?shù)據(jù)按照http協(xié)議的格式去封裝后,傳輸在網(wǎng)絡中,以此來實現(xiàn)的網(wǎng)絡通信。
而OkHttp協(xié)議就是幫助我們,把我們把要傳輸?shù)臄?shù)據(jù)請求,按照http協(xié)議的格式,傳輸在Socket上,當然還有很多優(yōu)化管理這些請求和連接的方法,例如:對于這些請求的管理:最多同時進行64個請求,同域名的最多同時進行5個請求。還有Socket連接池的管理。
二、OkHttp的使用方法
1.創(chuàng)建一個client,構(gòu)建一個request
OkHttpClient client = new OkHttpClient(); Request request = new Request.Builder() .url("https://www.baidu.com/") .build();
2.同步請求
Response response = client.newCall(request).execute();
3.異步請求
client.newCall(request).enqueue(new Callback() { @Override public void onFailure(@NotNull Call call, @NotNull IOException e) { //todo handle request failed } @Override public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException { //todo handle Response } });
三、基本對象介紹
1.OkHttpClient
一個請求的配置類,采用了建造者模式,方便用戶配置一些請求參數(shù),如配置callTimeout
,cookie
,interceptor
等等。
open class OkHttpClient internal constructor( builder: Builder ) : Cloneable, Call.Factory, WebSocket.Factory { constructor() : this(Builder()) class Builder constructor() { //調(diào)度器 internal var dispatcher: Dispatcher = Dispatcher() //連接池 internal var connectionPool: ConnectionPool = ConnectionPool() //整體流程攔截器 internal val interceptors: MutableList<Interceptor> = mutableListOf() //網(wǎng)絡流程攔截器 internal val networkInterceptors: MutableList<Interceptor> = mutableListOf() //流程監(jiān)聽器 internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory() //連接失敗時是否重連 internal var retryOnConnectionFailure = true //服務器認證設置 internal var authenticator: Authenticator = Authenticator.NONE //是否重定向 internal var followRedirects = true //是否從HTTP重定向到HTTPS internal var followSslRedirects = true //cookie設置 internal var cookieJar: CookieJar = CookieJar.NO_COOKIES //緩存設置 internal var cache: Cache? = null //DNS設置 internal var dns: Dns = Dns.SYSTEM //代理設置 internal var proxy: Proxy? = null //代理選擇器設置 internal var proxySelector: ProxySelector? = null //代理服務器認證設置 internal var proxyAuthenticator: Authenticator = Authenticator.NONE //socket配置 internal var socketFactory: SocketFactory = SocketFactory.getDefault() //https socket配置 internal var sslSocketFactoryOrNull: SSLSocketFactory? = null internal var x509TrustManagerOrNull: X509TrustManager? = null internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS //協(xié)議 internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS //域名校驗 internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT internal var certificateChainCleaner: CertificateChainCleaner? = null //請求超時 internal var callTimeout = 0 //連接超時 internal var connectTimeout = 10_000 //讀取超時 internal var readTimeout = 10_000 //寫入超時 internal var writeTimeout = 10_000 internal var pingInterval = 0 internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE internal var routeDatabase: RouteDatabase? = null ···省略代碼···
2.request
同樣是請求參數(shù)的配置類,也同樣采用了建造者模式,但相比于OkHttpClient
,Request
就十分簡單了,只有四個參數(shù),分別是請求URL、請求方法、請求頭、請求體。
class Request internal constructor( @get:JvmName("url") val url: HttpUrl, @get:JvmName("method") val method: String, @get:JvmName("headers") val headers: Headers, @get:JvmName("body") val body: RequestBody?, internal val tags: Map<Class<*>, Any> ) { open class Builder { //請求的URL internal var url: HttpUrl? = null //請求方法,如:GET、POST.. internal var method: String //請求頭 internal var headers: Headers.Builder //請求體 internal var body: RequestBody? = null ···省略代碼···
3.Call
請求調(diào)用接口,表示這個請求已經(jīng)準備好可以執(zhí)行,也可以取消,只能執(zhí)行一次。
interface Call : Cloneable { /** 返回發(fā)起此調(diào)用的原始請求 */ fun request(): Request /** * 同步請求,立即執(zhí)行。 * * 拋出兩種異常: * 1. 請求失敗拋出IOException; * 2. 如果在執(zhí)行過一回的前提下再次執(zhí)行拋出IllegalStateException;*/ @Throws(IOException::class) fun execute(): Response /** * 異步請求,將請求安排在將來的某個時間點執(zhí)行。 * 如果在執(zhí)行過一回的前提下再次執(zhí)行拋出IllegalStateException */ fun enqueue(responseCallback: Callback) /** 取消請求。已經(jīng)完成的請求不能被取消 */ fun cancel() /** 是否已被執(zhí)行 */ fun isExecuted(): Boolean /** 是否被取消 */ fun isCanceled(): Boolean /** 一個完整Call請求流程的超時時間配置,默認選自[OkHttpClient.Builder.callTimeout] */ fun timeout(): Timeout /** 克隆這個call,創(chuàng)建一個新的相同的Call */ public override fun clone(): Call /** 利用工廠模式來讓 OkHttpClient 來創(chuàng)建 Call對象 */ fun interface Factory { fun newCall(request: Request): Call } }
4.RealCall
OkHttpClient.kt override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
RealCall
是Call接口
的具體實現(xiàn)類,是應用端與網(wǎng)絡層的連接橋,展示應用端原始的請求與連接數(shù)據(jù),以及網(wǎng)絡層返回的response
及其它數(shù)據(jù)流。 通過使用方法也可知,創(chuàng)建RealCall
對象后,就要調(diào)用同步或異步請求方法,所以它里面還包含同步請求 execute()
與異步請求 enqueue()
方法。(后面具體展開分析)
5.AsyncCall
異步請求調(diào)用,是RealCall
的一個內(nèi)部類,就是一個Runnable
,被dispatcher調(diào)度器中的線程池所執(zhí)行。
inner class AsyncCall( //用戶傳入的響應回調(diào)方法 private val responseCallback: Callback ) : Runnable { //同一個域名的請求次數(shù),volatile + AtomicInteger 保證在多線程下及時可見性與原子性 @Volatile var callsPerHost = AtomicInteger(0) private set fun reuseCallsPerHostFrom(other: AsyncCall) { this.callsPerHost = other.callsPerHost } ···省略代碼··· fun executeOn(executorService: ExecutorService) { client.dispatcher.assertThreadDoesntHoldLock() var success = false try { //調(diào)用線程池執(zhí)行 executorService.execute(this) success = true } catch (e: RejectedExecutionException) { val ioException = InterruptedIOException("executor rejected") ioException.initCause(e) noMoreExchanges(ioException) //請求失敗,調(diào)用 Callback.onFailure() 方法 responseCallback.onFailure(this@RealCall, ioException) } finally { if (!success) { //請求失敗,調(diào)用調(diào)度器finish方法 client.dispatcher.finished(this) // This call is no longer running! } } } override fun run() { threadName("OkHttp ${redactedUrl()}") { var signalledCallback = false timeout.enter() try { //請求成功,獲取到服務器返回的response val response = getResponseWithInterceptorChain() signalledCallback = true //調(diào)用 Callback.onResponse() 方法,將 response 傳遞出去 responseCallback.onResponse(this@RealCall, response) } catch (e: IOException) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e) } else { //請求失敗,調(diào)用 Callback.onFailure() 方法 responseCallback.onFailure(this@RealCall, e) } } catch (t: Throwable) { //請求出現(xiàn)異常,調(diào)用cancel方法來取消請求 cancel() if (!signalledCallback) { val canceledException = IOException("canceled due to $t") canceledException.addSuppressed(t) //請求失敗,調(diào)用 Callback.onFailure() 方法 responseCallback.onFailure(this@RealCall, canceledException) } throw t } finally { //請求結(jié)束,調(diào)用調(diào)度器finish方法 client.dispatcher.finished(this) } } } }
6.Dispatcher
調(diào)度器,用來調(diào)度Call
對象,同時包含線程池與異步請求隊列,用來存放與執(zhí)行AsyncCall
對象。
class Dispatcher constructor() { @get:Synchronized @get:JvmName("executorService") val executorService: ExecutorService get() { if (executorServiceOrNull == null) { //創(chuàng)建一個緩存線程池,來處理請求調(diào)用,這個線程池的核心線程數(shù)是0,等待隊列的長度也是0,意味著 //線程池會直接創(chuàng)建新的線程去處理請求 executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS, SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false)) } return executorServiceOrNull!! } /** 已準備好的異步請求隊列 */ @get:Synchronized private val readyAsyncCalls = ArrayDeque<AsyncCall>() /** 正在運行的異步請求隊列, 包含取消但是還未finish的AsyncCall */ private val runningAsyncCalls = ArrayDeque<AsyncCall>() /** 正在運行的同步請求隊列, 包含取消但是還未finish的RealCall */ private val runningSyncCalls = ArrayDeque<RealCall>() ···省略代碼··· }
四、流程分析
1.同步請求
client.newCall(request).execute();
newCall
方法就是創(chuàng)建一個RealCall
對象,然后執(zhí)行其execute()
方法。
RealCall.kt override fun execute(): Response { //CAS判斷是否已經(jīng)被執(zhí)行了, 確保只能執(zhí)行一次,如果已經(jīng)執(zhí)行過,則拋出異常 check(executed.compareAndSet(false, true)) { "Already Executed" } //請求超時開始計時 timeout.enter() //開啟請求監(jiān)聽 callStart() try { //調(diào)用調(diào)度器中的 executed() 方法,調(diào)度器只是將 call 加入到了runningSyncCalls隊列中 client.dispatcher.executed(this) //調(diào)用getResponseWithInterceptorChain 方法拿到 response return getResponseWithInterceptorChain() } finally { //執(zhí)行完畢,調(diào)度器將該 call 從 runningSyncCalls隊列中移除 client.dispatcher.finished(this) } } Dispatcher.kt @Synchronized internal fun executed(call: RealCall) { runningSyncCalls.add(call) }
調(diào)用調(diào)度器executed
方法,就是將當前的RealCall
對象加入到runningSyncCalls
隊列中,然后調(diào)用getResponseWithInterceptorChain
方法拿到response
。
2.異步請求
RealCall.kt override fun enqueue(responseCallback: Callback) { //CAS判斷是否已經(jīng)被執(zhí)行了, 確保只能執(zhí)行一次,如果已經(jīng)執(zhí)行過,則拋出異常 check(executed.compareAndSet(false, true)) { "Already Executed" } //開啟請求監(jiān)聽 callStart() //新建一個AsyncCall對象,通過調(diào)度器enqueue方法加入到readyAsyncCalls隊列中 client.dispatcher.enqueue(AsyncCall(responseCallback)) }
然后調(diào)用調(diào)度器的enqueue
方法
Dispatcher.kt internal fun enqueue(call: AsyncCall) { //加鎖,保證線程安全 synchronized(this) { //將該請求調(diào)用加入到 readyAsyncCalls 隊列中 readyAsyncCalls.add(call) // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to // the same host. if (!call.call.forWebSocket) { //通過域名來查找有沒有相同域名的請求,有則復用。 val existingCall = findExistingCallWithHost(call.host) if (existingCall != null) call.reuseCallsPerHostFrom(existingCall) } } //執(zhí)行請求 promoteAndExecute() } private fun promoteAndExecute(): Boolean { this.assertThreadDoesntHoldLock() val executableCalls = mutableListOf<AsyncCall>() //判斷是否有請求正在執(zhí)行 val isRunning: Boolean //加鎖,保證線程安全 synchronized(this) { //遍歷 readyAsyncCalls 隊列 val i = readyAsyncCalls.iterator() while (i.hasNext()) { val asyncCall = i.next() //runningAsyncCalls 的數(shù)量不能大于最大并發(fā)請求數(shù) 64 if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity. //同域名最大請求數(shù)5,同一個域名最多允許5條線程同時執(zhí)行請求 if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity. //從 readyAsyncCalls 隊列中移除,并加入到 executableCalls 及 runningAsyncCalls 隊列中 i.remove() asyncCall.callsPerHost.incrementAndGet() executableCalls.add(asyncCall) runningAsyncCalls.add(asyncCall) } //通過運行隊列中的請求數(shù)量來判斷是否有請求正在執(zhí)行 isRunning = runningCallsCount() > 0 } //遍歷可執(zhí)行隊列,調(diào)用線程池來執(zhí)行AsyncCall for (i in 0 until executableCalls.size) { val asyncCall = executableCalls[i] asyncCall.executeOn(executorService) } return isRunning }
調(diào)度器的enqueue
方法就是將AsyncCall
加入到readyAsyncCalls
隊列中,然后調(diào)用promoteAndExecute
方法來執(zhí)行請求,promoteAndExecute
方法做的其實就是遍歷readyAsyncCalls
隊列,然后將符合條件的請求用線程池執(zhí)行,也就是會執(zhí)行AsyncCall.run()
方法。
AsyncCall 方法的具體代碼看上面的這邊就不在此展示了,簡單來說就是調(diào)用getResponseWithInterceptorChain
方法拿到response
,然后通過Callback.onResponse
方法傳遞出去。反之,如果請求失敗,捕獲了異常,就通過Callback.onFailure
將異常信息傳遞出去。 最終,請求結(jié)束,調(diào)用調(diào)度器finish
方法。
Dispatcher.kt /** 異步請求調(diào)用結(jié)束方法 */ internal fun finished(call: AsyncCall) { call.callsPerHost.decrementAndGet() finished(runningAsyncCalls, call) } /** 同步請求調(diào)用結(jié)束方法 */ internal fun finished(call: RealCall) { finished(runningSyncCalls, call) } private fun <T> finished(calls: Deque<T>, call: T) { val idleCallback: Runnable? synchronized(this) { //將當前請求調(diào)用從 正在運行隊列 中移除 if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!") idleCallback = this.idleCallback } //繼續(xù)執(zhí)行剩余請求,將call從readyAsyncCalls中取出加入到runningAsyncCalls,然后執(zhí)行 val isRunning = promoteAndExecute() if (!isRunning && idleCallback != null) { //如果執(zhí)行完了所有請求,處于閑置狀態(tài),調(diào)用閑置回調(diào)方法 idleCallback.run() } }
請求結(jié)束,異步請求,把當前同域名的計數(shù)減一,然后后面和同步一樣,都是把請求從正在執(zhí)行的隊列中移除,然后繼續(xù)執(zhí)行剩余請求。
3.獲取Response
接著就是看看getResponseWithInterceptorChain
方法是如何拿到response
的。
internal fun getResponseWithInterceptorChain(): Response { //攔截器列表 val interceptors = mutableListOf<Interceptor>() interceptors += client.interceptors interceptors += RetryAndFollowUpInterceptor(client) interceptors += BridgeInterceptor(client.cookieJar) interceptors += CacheInterceptor(client.cache) interceptors += ConnectInterceptor if (!forWebSocket) { interceptors += client.networkInterceptors } interceptors += CallServerInterceptor(forWebSocket) //構(gòu)建攔截器責任鏈 val chain = RealInterceptorChain( call = this, interceptors = interceptors, index = 0, exchange = null, request = originalRequest, connectTimeoutMillis = client.connectTimeoutMillis, readTimeoutMillis = client.readTimeoutMillis, writeTimeoutMillis = client.writeTimeoutMillis ) //如果call請求完成,那就意味著交互完成了,沒有更多的東西來交換了 var calledNoMoreExchanges = false try { //執(zhí)行攔截器責任鏈來獲取 response val response = chain.proceed(originalRequest) //如果被取消,關(guān)閉響應,拋出異常 if (isCanceled()) { response.closeQuietly() throw IOException("Canceled") } return response } catch (e: IOException) { calledNoMoreExchanges = true throw noMoreExchanges(e) as Throwable } finally { if (!calledNoMoreExchanges) { noMoreExchanges(null) } } }
簡單概括一下:這里采用了責任鏈設計模式,通過攔截器構(gòu)建了以RealInterceptorChain
責任鏈,然后執(zhí)行proceed
方法來得到response
。
那么,這又涉及攔截器是什么?攔截器責任鏈又是什么?
五、Interceptor
只聲明了一個攔截器方法,在子類中具體實現(xiàn),還包含一個Chain
接口,核心方法是proceed(request)
處理請求來獲取response
。
fun interface Interceptor { /** 攔截方法 */ @Throws(IOException::class) fun intercept(chain: Chain): Response interface Chain { /** 原始請求數(shù)據(jù) */ fun request(): Request /** 核心方法,處理請求,獲取response */ @Throws(IOException::class) fun proceed(request: Request): Response fun connection(): Connection? fun call(): Call fun connectTimeoutMillis(): Int fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain fun readTimeoutMillis(): Int fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain fun writeTimeoutMillis(): Int fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain } }
六、RealInterceptorChain
攔截器鏈就是實現(xiàn)Interceptor.Chain
接口,重點就是復寫的proceed
方法。
class RealInterceptorChain( internal val call: RealCall, private val interceptors: List<Interceptor>, private val index: Int, internal val exchange: Exchange?, internal val request: Request, internal val connectTimeoutMillis: Int, internal val readTimeoutMillis: Int, internal val writeTimeoutMillis: Int ) : Interceptor.Chain { ···省略代碼··· private var calls: Int = 0 override fun call(): Call = call override fun request(): Request = request @Throws(IOException::class) override fun proceed(request: Request): Response { check(index < interceptors.size) calls++ if (exchange != null) { check(exchange.finder.sameHostAndPort(request.url)) { "network interceptor ${interceptors[index - 1]} must retain the same host and port" } check(calls == 1) { "network interceptor ${interceptors[index - 1]} must call proceed() exactly once" } } //index+1, 復制創(chuàng)建新的責任鏈,也就意味著調(diào)用責任鏈中的下一個處理者,也就是下一個攔截器 val next = copy(index = index + 1, request = request) //取出當前攔截器 val interceptor = interceptors[index] //執(zhí)行當前攔截器的攔截方法 @Suppress("USELESS_ELVIS") val response = interceptor.intercept(next) ?: throw NullPointerException( "interceptor $interceptor returned null") if (exchange != null) { check(index + 1 >= interceptors.size || next.calls == 1) { "network interceptor $interceptor must call proceed() exactly once" } } check(response.body != null) { "interceptor $interceptor returned a response with no body" } return response } }
鏈式調(diào)用,最終會向下執(zhí)行攔截器列表中的每個攔截器,然后向上返回Response
。
七、攔截器
各類攔截器的總結(jié),按順序:
client.interceptors
:這是由開發(fā)者設置的,會在所有的攔截器處理之前進行最早的攔截處理,可用于添加一些公共參數(shù),如自定義header
、自定義log
等等。RetryAndFollowUpInterceptor
:這里會對連接做一些初始化工作,以及請求失敗的重試工作,重定向的后續(xù)請求工作。跟他的名字一樣,就是做重試工作還有一些連接跟蹤工作。BridgeInterceptor
:是客戶端與服務器之間的溝通橋梁,負責將用戶構(gòu)建的請求轉(zhuǎn)換為服務器需要的請求,以及將網(wǎng)絡請求返回回來的響應轉(zhuǎn)換為用戶可用的響應。CacheInterceptor
:這里主要是緩存的相關(guān)處理,會根據(jù)用戶在OkHttpClient
里定義的緩存配置,然后結(jié)合請求新建一個緩存策略,由它來判斷是使用網(wǎng)絡還是緩存來構(gòu)建response
。ConnectInterceptor
:這里主要就是負責建立連接,會建立TCP連接
或者TLS連接
。Client.networkInterceptors
:這里也是開發(fā)者自己設置的,所以本質(zhì)上和第一個攔截器差不多,但是由于位置不同,所以用處也不同。CallServerInterceptor
:這里就是進行網(wǎng)絡數(shù)據(jù)的請求和響應了,也就是實際的網(wǎng)絡I/O操作,將請求頭與請求體發(fā)送給服務器,以及解析服務器返回的response
。
接下來我們按順序,從上往下,對這些攔截器進行一一解讀。
1.client.interceptors
這是用戶自己定義的攔截器,稱為應用攔截器,會保存在OkHttpClient
的interceptors: List<Interceptor>
列表中。 他是攔截器責任鏈中的第一個攔截器,也就是說會第一個執(zhí)行攔截方法,我們可以通過它來添加自定義Header信息
,如:
class HeaderInterceptor implements Interceptor { @Override public Response intercept(Chain chain) throws IOException { Request request = chain.request().newBuilder() .addHeader("device-android", "xxxxxxxxxxx") .addHeader("country-code", "ZH") .build(); return chain.proceed(request); } } //然后在 OkHttpClient 中加入 OkHttpClient client = new OkHttpClient.Builder() .connectTimeout(60, TimeUnit.SECONDS) .readTimeout(15, TimeUnit.SECONDS) .writeTimeout(15, TimeUnit.SECONDS) .cookieJar(new MyCookieJar()) .addInterceptor(new HeaderInterceptor())//添加自定義Header攔截器 .build();
2.RetryAndFollowUpInterceptor
第二個攔截器,從它的名字也可知道,它負責請求失敗的重試工作與重定向的后續(xù)請求工作,同時它會對連接做一些初始化工作。
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain var request = chain.request val call = realChain.call var followUpCount = 0 var priorResponse: Response? = null var newExchangeFinder = true var recoveredFailures = listOf<IOException>() while (true) { //這里會新建一個ExchangeFinder,ConnectInterceptor會使用到 call.enterNetworkInterceptorExchange(request, newExchangeFinder) var response: Response var closeActiveExchange = true try { if (call.isCanceled()) { throw IOException("Canceled") } try { response = realChain.proceed(request) newExchangeFinder = true } catch (e: RouteException) { //嘗試通過路由連接失敗。該請求將不會被發(fā)送。 if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) { throw e.firstConnectException.withSuppressed(recoveredFailures) } else { recoveredFailures += e.firstConnectException } newExchangeFinder = false continue } catch (e: IOException) { //嘗試與服務器通信失敗。該請求可能已發(fā)送。 if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) { throw e.withSuppressed(recoveredFailures) } else { recoveredFailures += e } newExchangeFinder = false continue } // Attach the prior response if it exists. Such responses never have a body. //嘗試關(guān)聯(lián)上一個response,注意:body是為null if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build() } val exchange = call.interceptorScopedExchange //會根據(jù) responseCode 來判斷,構(gòu)建一個新的request并返回來重試或者重定向 val followUp = followUpRequest(response, exchange) if (followUp == null) { if (exchange != null && exchange.isDuplex) { call.timeoutEarlyExit() } closeActiveExchange = false return response } //如果請求體是一次性的,不需要再次重試 val followUpBody = followUp.body if (followUpBody != null && followUpBody.isOneShot()) { closeActiveExchange = false return response } response.body?.closeQuietly() //最大重試次數(shù),不同的瀏覽器是不同的,比如:Chrome為21,Safari則是16 if (++followUpCount > MAX_FOLLOW_UPS) { throw ProtocolException("Too many follow-up requests: $followUpCount") } request = followUp priorResponse = response } finally { call.exitNetworkInterceptorExchange(closeActiveExchange) } } } /** 判斷是否要進行重連,false->不嘗試重連;true->嘗試重連。*/ private fun recover( e: IOException, call: RealCall, userRequest: Request, requestSendStarted: Boolean ): Boolean { //客戶端禁止重試 if (!client.retryOnConnectionFailure) return false //不能再次發(fā)送該請求體 if (requestSendStarted && requestIsOneShot(e, userRequest)) return false //發(fā)生的異常是致命的,無法恢復,如:ProtocolException if (!isRecoverable(e, requestSendStarted)) return false //沒有更多的路由來嘗試重連 if (!call.retryAfterFailure()) return false // 對于失敗恢復,使用帶有新連接的相同路由選擇器 return true } ···省略代碼···
3.BridgeInterceptor
從它的名字可以看出,他的定位是客戶端與服務器之間的溝通橋梁,負責將用戶構(gòu)建的請求轉(zhuǎn)換為服務器需要的請求,比如:添加Content-Type
,添加Cookie
,添加User-Agent
等等。再將服務器返回的response
做一些處理轉(zhuǎn)換為客戶端需要的response
。比如:移除響應頭中的Content-Encoding
、Content-Length
等等。
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { //獲取原始請求數(shù)據(jù) val userRequest = chain.request() val requestBuilder = userRequest.newBuilder() //重新構(gòu)建請求頭,請求體信息 val body = userRequest.body val contentType = body.contentType() requestBuilder.header("Content-Type", contentType.toString()) requestBuilder.header("Content-Length", contentLength.toString()) requestBuilder.header("Transfer-Encoding", "chunked") requestBuilder.header("Host", userRequest.url.toHostHeader()) requestBuilder.header("Connection", "Keep-Alive") ···省略代碼··· //添加cookie val cookies = cookieJar.loadForRequest(userRequest.url) if (cookies.isNotEmpty()) { requestBuilder.header("Cookie", cookieHeader(cookies)) } //添加user-agent if (userRequest.header("User-Agent") == null) { requestBuilder.header("User-Agent", userAgent) } //重新構(gòu)建一個Request,然后執(zhí)行下一個攔截器來處理該請求 val networkResponse = chain.proceed(requestBuilder.build()) cookieJar.receiveHeaders(userRequest.url, networkResponse.headers) //創(chuàng)建一個新的responseBuilder,目的是將原始請求數(shù)據(jù)構(gòu)建到response中 val responseBuilder = networkResponse.newBuilder() .request(userRequest) if (transparentGzip && "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) && networkResponse.promisesBody()) { val responseBody = networkResponse.body if (responseBody != null) { val gzipSource = GzipSource(responseBody.source()) val strippedHeaders = networkResponse.headers.newBuilder() .removeAll("Content-Encoding") .removeAll("Content-Length") .build() //修改response header信息,移除Content-Encoding,Content-Length信息 responseBuilder.headers(strippedHeaders) val contentType = networkResponse.header("Content-Type") //修改response body信息 responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer())) } } return responseBuilder.build() ···省略代碼···
4.CacheInterceptor
用戶可以通過OkHttpClient.cache
來配置緩存,緩存攔截器通過CacheStrategy
來判斷是使用網(wǎng)絡還是緩存來構(gòu)建response
。
class CacheInterceptor(internal val cache: Cache?) : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val call = chain.call() //通過request從OkHttpClient.cache中獲取緩存 val cacheCandidate = cache?.get(chain.request()) val now = System.currentTimeMillis() //創(chuàng)建一個緩存策略,用來確定怎么使用緩存 val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute() //為空表示不使用網(wǎng)絡,反之,則表示使用網(wǎng)絡 val networkRequest = strategy.networkRequest //為空表示不使用緩存,反之,則表示使用緩存 val cacheResponse = strategy.cacheResponse //追蹤網(wǎng)絡與緩存的使用情況 cache?.trackResponse(strategy) val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE //有緩存但不適用,關(guān)閉它 if (cacheCandidate != null && cacheResponse == null) { cacheCandidate.body?.closeQuietly() } //如果網(wǎng)絡被禁止,但是緩存又是空的,構(gòu)建一個code為504的response,并返回 if (networkRequest == null && cacheResponse == null) { return Response.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(HTTP_GATEWAY_TIMEOUT) .message("Unsatisfiable Request (only-if-cached)") .body(EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build().also { listener.satisfactionFailure(call, it) } } //如果我們禁用了網(wǎng)絡不使用網(wǎng)絡,且有緩存,直接根據(jù)緩存內(nèi)容構(gòu)建并返回response if (networkRequest == null) { return cacheResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build().also { listener.cacheHit(call, it) } } //為緩存添加監(jiān)聽 if (cacheResponse != null) { listener.cacheConditionalHit(call, cacheResponse) } else if (cache != null) { listener.cacheMiss(call) } var networkResponse: Response? = null try { //責任鏈往下處理,從服務器返回response 賦值給 networkResponse networkResponse = chain.proceed(networkRequest) } finally { //捕獲I/O或其他異常,請求失敗,networkResponse為空,且有緩存的時候,不暴露緩存內(nèi)容。 if (networkResponse == null && cacheCandidate != null) { cacheCandidate.body?.closeQuietly() } } //如果有緩存 if (cacheResponse != null) { //且網(wǎng)絡返回response code為304的時候,使用緩存內(nèi)容新構(gòu)建一個Response返回。 if (networkResponse?.code == HTTP_NOT_MODIFIED) { val response = cacheResponse.newBuilder() .headers(combine(cacheResponse.headers, networkResponse.headers)) .sentRequestAtMillis(networkResponse.sentRequestAtMillis) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build() networkResponse.body!!.close() // Update the cache after combining headers but before stripping the // Content-Encoding header (as performed by initContentStream()). cache!!.trackConditionalCacheHit() cache.update(cacheResponse, response) return response.also { listener.cacheHit(call, it) } } else { //否則關(guān)閉緩存響應體 cacheResponse.body?.closeQuietly() } } //構(gòu)建網(wǎng)絡請求的response val response = networkResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build() //如果cache不為null,即用戶在OkHttpClient中配置了緩存,則將上一步新構(gòu)建的網(wǎng)絡請求response存到cache中 if (cache != null) { //根據(jù)response的code,header以及CacheControl.noStore來判斷是否可以緩存 if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) { // 將該response存入緩存 val cacheRequest = cache.put(response) return cacheWritingResponse(cacheRequest, response).also { if (cacheResponse != null) { listener.cacheMiss(call) } } } //根據(jù)請求方法來判斷緩存是否有效,只對Get請求進行緩存,其它方法的請求則移除 if (HttpMethod.invalidatesCache(networkRequest.method)) { try { //緩存無效,將該請求緩存從client緩存配置中移除 cache.remove(networkRequest) } catch (_: IOException) { // The cache cannot be written. } } } return response } ···省略代碼···
5.ConnectInterceptor
負責實現(xiàn)與服務器真正建立起連接,
object ConnectInterceptor : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain //初始化一個exchange對象 val exchange = realChain.call.initExchange(chain) //根據(jù)這個exchange對象來復制創(chuàng)建一個新的連接責任鏈 val connectedChain = realChain.copy(exchange = exchange) //執(zhí)行該連接責任鏈 return connectedChain.proceed(realChain.request) } }
一掃下來,代碼十分簡單,攔截方法里就只有三步。
- 初始化一個
exchange
對象。 - 然后根據(jù)這個
exchange
對象來復制創(chuàng)建一個新的連接責任鏈。 - 執(zhí)行該連接責任鏈。
那這個exchange
對象又是什么呢?
RealCall.kt internal fun initExchange(chain: RealInterceptorChain): Exchange { ...省略代碼... //這里的exchangeFinder就是在RetryAndFollowUpInterceptor中創(chuàng)建的 val exchangeFinder = this.exchangeFinder!! //返回一個ExchangeCodec(是個編碼器,為request編碼以及為response解碼) val codec = exchangeFinder.find(client, chain) //根據(jù)exchangeFinder與codec新構(gòu)建一個Exchange對象,并返回 val result = Exchange(this, eventListener, exchangeFinder, codec) ...省略代碼... return result }
具體看看ExchangeFinder.find()
這一步,
ExchangeFinder.kt fun find( client: OkHttpClient, chain: RealInterceptorChain ): ExchangeCodec { try { //查找合格可用的連接,返回一個 RealConnection 對象 val resultConnection = findHealthyConnection( connectTimeout = chain.connectTimeoutMillis, readTimeout = chain.readTimeoutMillis, writeTimeout = chain.writeTimeoutMillis, pingIntervalMillis = client.pingIntervalMillis, connectionRetryEnabled = client.retryOnConnectionFailure, doExtensiveHealthChecks = chain.request.method != "GET" ) //根據(jù)連接,創(chuàng)建并返回一個請求響應編碼器:Http1ExchangeCodec 或者 Http2ExchangeCodec,分別對應Http1協(xié)議與Http2協(xié)議 return resultConnection.newCodec(client, chain) } catch (e: RouteException) { trackFailure(e.lastConnectException) throw e } catch (e: IOException) { trackFailure(e) throw RouteException(e) } }
繼續(xù)往下看findHealthyConnection
方法
ExchangeFinder.kt private fun findHealthyConnection( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean, doExtensiveHealthChecks: Boolean ): RealConnection { while (true) { //重點:查找連接 val candidate = findConnection( connectTimeout = connectTimeout, readTimeout = readTimeout, writeTimeout = writeTimeout, pingIntervalMillis = pingIntervalMillis, connectionRetryEnabled = connectionRetryEnabled ) //檢查該連接是否合格可用,合格則直接返回該連接 if (candidate.isHealthy(doExtensiveHealthChecks)) { return candidate } //如果該連接不合格,標記為不可用,從連接池中移除 candidate.noNewExchanges() ...省略代碼... } }
所以核心方法就是findConnection
,我們繼續(xù)深入看看該方法:
private fun findConnection( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean ): RealConnection { if (call.isCanceled()) throw IOException("Canceled") //第一次,嘗試重連 call 中的 connection,不需要去重新獲取連接 val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()! if (callConnection != null) { var toClose: Socket? = null synchronized(callConnection) { if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) { toClose = call.releaseConnectionNoEvents() } } //如果 call 中的 connection 還沒有釋放,就重用它。 if (call.connection != null) { check(toClose == null) return callConnection } //如果 call 中的 connection 已經(jīng)被釋放,關(guān)閉Socket. toClose?.closeQuietly() eventListener.connectionReleased(call, callConnection) } //需要一個新的連接,所以重置一些狀態(tài) refusedStreamCount = 0 connectionShutdownCount = 0 otherFailureCount = 0 //第二次,嘗試從連接池中獲取一個連接,不帶路由,不帶多路復用 if (connectionPool.callAcquirePooledConnection(address, call, null, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result } //連接池中是空的,準備下次嘗試連接的路由 val routes: List<Route>? val route: Route ...省略代碼... //第三次,再次嘗試從連接池中獲取一個連接,帶路由,不帶多路復用 if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result } route = localRouteSelection.next() } //第四次,手動創(chuàng)建一個新連接 val newConnection = RealConnection(connectionPool, route) call.connectionToCancel = newConnection try { newConnection.connect( connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener ) } finally { call.connectionToCancel = null } call.client.routeDatabase.connected(newConnection.route()) //第五次,再次嘗試從連接池中獲取一個連接,帶路由,帶多路復用。 //這一步主要是為了校驗一下,比如已經(jīng)有了一條連接了,就可以直接復用,而不用使用手動創(chuàng)建的新連接。 if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) { val result = call.connection!! nextRouteToTry = route newConnection.socket().closeQuietly() eventListener.connectionAcquired(call, result) return result } synchronized(newConnection) { //將手動創(chuàng)建的新連接放入連接池 connectionPool.put(newConnection) call.acquireConnectionNoEvents(newConnection) } eventListener.connectionAcquired(call, newConnection) return newConnection }
在代碼中可以看出,一共做了5次嘗試去得到連接:
- 第一次,嘗試重連 call 中的 connection,不需要去重新獲取連接。
- 第二次,嘗試從連接池中獲取一個連接,不帶路由,不帶多路復用。
- 第三次,再次嘗試從連接池中獲取一個連接,帶路由,不帶多路復用。
- 第四次,手動創(chuàng)建一個新連接。
- 第五次,再次嘗試從連接池中獲取一個連接,帶路由,帶多路復用。
這一步就是為了建立連接。
6.client.networkInterceptors
該攔截器稱為網(wǎng)絡攔截器,與client.interceptors
一樣也是由用戶自己定義的,同樣是以列表的形式存在OkHttpClient
中。
那這兩個攔截器有什么不同呢?
其實他兩的不同都是由于他們所處的位置不同所導致的,應用攔截器處于第一個位置,所以無論如何它都會被執(zhí)行,而且只會執(zhí)行一次。而網(wǎng)絡攔截器處于倒數(shù)第二的位置,它不一定會被執(zhí)行,而且可能會被執(zhí)行多次,比如:在RetryAndFollowUpInterceptor
失敗或者CacheInterceptor
直接返回緩存的情況下,我們的網(wǎng)絡攔截器是不會被執(zhí)行的。
7.CallServerInterceptor
到了這里,客戶端與服務器已經(jīng)建立好了連接,接著就是將請求頭與請求體發(fā)送給服務器,以及解析服務器返回的response
了。
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain val exchange = realChain.exchange!! val request = realChain.request val requestBody = request.body var invokeStartEvent = true var responseBuilder: Response.Builder? = null try { //寫入請求頭 exchange.writeRequestHeaders(request) //如果不是GET請求,并且請求體不為空 if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) { //當請求頭為"Expect: 100-continue"時,在發(fā)送請求體之前需要等待服務器返回"HTTP/1.1 100 Continue" 的response,如果沒有等到該response,就不發(fā)送請求體。 //POST請求,先發(fā)送請求頭,在獲取到100繼續(xù)狀態(tài)后繼續(xù)發(fā)送請求體 if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) { //刷新請求,即發(fā)送請求頭 exchange.flushRequest() //解析響應頭 responseBuilder = exchange.readResponseHeaders(expectContinue = true) exchange.responseHeadersStart() invokeStartEvent = false } //寫入請求體 if (responseBuilder == null) { if (requestBody.isDuplex()) { //如果請求體是雙公體,就先發(fā)送請求頭,稍后在發(fā)送請求體 exchange.flushRequest() val bufferedRequestBody = exchange.createRequestBody(request, true).buffer() //寫入請求體 requestBody.writeTo(bufferedRequestBody) } else { //如果獲取到了"Expect: 100-continue"響應,寫入請求體 val bufferedRequestBody = exchange.createRequestBody(request, false).buffer() requestBody.writeTo(bufferedRequestBody) bufferedRequestBody.close() } ···省略代碼··· //請求結(jié)束,發(fā)送請求體 exchange.finishRequest() ···省略代碼··· try { if (responseBuilder == null) { //讀取響應頭 responseBuilder = exchange.readResponseHeaders(expectContinue = false)!! ···省略代碼··· //構(gòu)建一個response var response = responseBuilder .request(request) .handshake(exchange.connection.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() var code = response.code ···省略代碼··· return response ···省略代碼···
簡單概括一下:寫入發(fā)送請求頭,然后根據(jù)條件是否寫入發(fā)送請求體,請求結(jié)束。解析服務器返回的請求頭,然后構(gòu)建一個新的response
,并返回。 這里CallServerInterceptor
是攔截器責任鏈中最后一個攔截器了,所以他不會再調(diào)用chain.proceed()
方法往下執(zhí)行,而是將這個構(gòu)建的response
往上傳遞給責任鏈中的每個攔截器。
總結(jié)一下流程:
到此這篇關(guān)于Java OkHttp框架源碼超詳細解析的文章就介紹到這了,更多相關(guān)Java OkHttp框架內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java高并發(fā)系統(tǒng)限流算法的實現(xiàn)
這篇文章主要介紹了Java高并發(fā)系統(tǒng)限流算法的應用,在開發(fā)高并發(fā)系統(tǒng)時有三把利器用來保護系統(tǒng):緩存、降級和限流,限流可以認為服務降級的一種,限流是對系統(tǒng)的一種保護措施,需要的朋友可以參考下2022-05-05Java中FTPClient上傳中文目錄、中文文件名亂碼問題解決方法
這篇文章主要介紹了Java中FTPClient上傳中文目錄、中文文件名亂碼問題解決方法,本文使用apache-commons-net工具包時遇到這個問題,解決方法很簡單,需要的朋友可以參考下2015-05-05Seata集成Mybatis-Plus解決多數(shù)據(jù)源事務問題
當進行業(yè)務操作時,訂單發(fā)生異常 ,進行了回滾操作,因為在不同的數(shù)據(jù)庫實例中,余額卻扣除成功,此時發(fā)現(xiàn)數(shù)據(jù)不一致問題,本文給大家介紹Seata集成Mybatis-Plus解決多數(shù)據(jù)源事務問題,感興趣的朋友一起看看吧2023-11-11mybatisPlus 實體類與數(shù)據(jù)庫表映射關(guān)系詳解
這篇文章主要介紹了mybatisPlus 實體類與數(shù)據(jù)庫表映射關(guān)系詳解,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教。2022-01-01IDEA使用Mybatis插件 MyBatisCodeHelper-Pro的圖文教程
這篇文章主要介紹了IDEA使用Mybatis插件 MyBatisCodeHelper-Pro的教程,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-09-09JAVA實現(xiàn)監(jiān)測tomcat是否宕機及控制重啟的方法
這篇文章主要介紹了JAVA實現(xiàn)監(jiān)測tomcat是否宕機及控制重啟的方法,可實現(xiàn)有效的檢測及控制tomcat服務器運行,具有一定參考借鑒價值,需要的朋友可以參考下2015-08-08