圖解 Kotlin SharedFlow 緩存系統(tǒng)及示例詳解
前言
Kotlin 為我們提供了兩種創(chuàng)建“熱流”的工具:StateFlow 和 SharedFlow。StateFlow 經(jīng)常被用來替代 LiveData 充當架構組件使用,所以大家相對熟悉。其實 StateFlow 只是 SharedFlow 的一種特化形式,SharedFlow 的功能更強大、使用場景更多,這得益于其自帶的緩存系統(tǒng),本文用圖解的方式,帶大家更形象地理解 SharedFlow 的緩存系統(tǒng)。
創(chuàng)建 SharedFlow 需要使用到 MutableSharedFlow()
方法,我們通過方法的三個參數(shù)配置緩存:
fun <T> MutableSharedFlow( replay: Int = 0, extraBufferCapacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): MutableSharedFlow<T>
接下來,我們通過時序圖的形式介紹這三個關鍵參數(shù)對緩存的影響。正文之前讓我們先統(tǒng)一一下用語:
- Emitter:Flow 數(shù)據(jù)的生產(chǎn)者,從上游發(fā)射數(shù)據(jù)
- Subcriber:Flow 數(shù)據(jù)的消費者,在下游接收數(shù)據(jù)
replay
當 Subscriber 訂閱 SharedFlow 時,有機會接收到之前已發(fā)送過的數(shù)據(jù),replay 指定了可以收到 subscribe 之前數(shù)據(jù)的數(shù)量。replay 不能為負數(shù),默認值為 0 表示 Subscriber 只能接收到 subscribe 之后 emit 的數(shù)據(jù):
上圖展示的是 replay = 0 的情況,Subscriber 無法收到 subscribe 之前 emit 的 ?,只能接收到 ? 和 ?。
當 replay = n ( n > 0)時,SharedFlow 會啟用緩存,此時 BufferSize 為 n,意味著可以緩存發(fā)射過的最近 n 個數(shù)據(jù),并發(fā)送給新增的 Subscriber。
上圖以 n = 1 為例 :
- Emitter 發(fā)送 ? ,并被 Buffer 緩存
- Subscriber 訂閱 SharedFlow 后,接收到緩存的 ?
- Emitter 相繼發(fā)送 ? ? ,Buffer 緩存的數(shù)據(jù)相繼依次被更新
在生產(chǎn)者消費者模型中,有時消費的速度趕不及生產(chǎn),此時要加以控制,要么停止生產(chǎn),要么丟棄數(shù)據(jù)。SharedFlow 也同樣如此。有時 Subscriber 的處理速度較慢,Buffer 緩存的數(shù)據(jù)得不到及時處理,當 Buffer 為空時,emit 默認將會被掛起 ( onBufferOverflow = SUSPEND)
上面的圖展示了 replay = 1 時 emit 發(fā)生 suspend 場景:
- Emitter 發(fā)送 ? 并被緩存
- Subscriber 訂閱 SharedFlow ,接收 replay 的 ? 開始處理
- Emitter 發(fā)送 ? ,緩存數(shù)據(jù)更新為 ? ,由于 Subscriber 對 ? 的處理尚未結束,? 在緩存中沒有及時被消費
- Emitter 發(fā)送 ?,由于緩存的 ? 尚未被 Subscriber 消費,emit 發(fā)生掛起
- Subscriber 開始消費 ? ,Buffer 緩存 ? , Emitter 可以繼續(xù) emit 新數(shù)據(jù)
注意 SharedFlow 作為一個多播可以有多個 Subscriber,所以上面例子中,? 被消費的時間點,取決于最后一個開始處理的 Subscriber。
extraBufferCapacity
extraBufferCapacity 中的 extra 表示 replay-cache 之外為 Buffer 還可以額外追加的緩存。
若 replay = n, extraBufferCapacity = m,則 BufferSize = m + n。
extraBufferCapacity 默認為 0,設置 extraBufferCapacity 有助于提升 Emitter 的吞吐量
在上圖的基礎之上,我們再設置 extraBufferCapacity = 1,效果如下圖:
上圖中 BufferSize = 1 + 1 = 2 :
- Emitter 發(fā)送 ? 并得到 Subscriber1 的處理 ,? 作為 replay 的一個數(shù)據(jù)被緩存,
- Emitter 發(fā)送 ?,Buffer 中 replay-cache 的數(shù)據(jù)更新為 ?
- Emitter 發(fā)送 ?,Buffer 在存儲了 replay 數(shù)據(jù) ? 之上,作為 extra 又存儲了 ?
- Emitter 發(fā)送 ?,此時 Buffer 已沒有空余位置,emit 掛起
- Subscriber2 訂閱 SharedFlow。雖然此時 Buffer 中存有 ? ? 兩個數(shù)據(jù),但是由于 replay = 1,所以 Subscriber2 只能收到最近的一個數(shù)據(jù) ?
- Subscriber1 處理完 ? 后,依次處理 Buffer 中的下一個數(shù)據(jù),開始消費 ?
- 對于 SharedFlow 來說,已經(jīng)不存在沒有消費 ? 的 Subscriber,? 移除緩存,? 的 emit 繼續(xù),并進入緩存,此時 Buffer 又有兩個數(shù)據(jù) ? ? ,
- Subscriber1 處理完 ? ,開始消費 ?
- 不存在沒有消費 ? 的 Subscriber, ? 移除緩存。
onBufferOverflow
前面的例子中,當 Buffer 被填滿時,emit 會被掛起,這都是建立在 onBufferOverflow 為 SUSPEND 的前提下的。onBufferOverflow 用來指定緩存移除時的策略,除了默認的 SUSPEND,還有兩個數(shù)據(jù)丟棄策略:
- DROP_LATEST:丟棄最新的數(shù)據(jù)
- DROP_OLDEST:丟棄最老的數(shù)據(jù)
需要特別注意的是,當 BufferSize = 0 時,extraBufferCapacity 只支持 SUSPEND,其他丟棄策略是無效的。這很好理解,因為 Buffer 中沒有數(shù)據(jù),所以丟棄無從下手,所以啟動丟棄策略的前提是 Buffer 至少有一個緩沖區(qū),且數(shù)據(jù)被填滿
上圖展示 DROP_LATEST 的效果。假設 replay = 2,extra = 0
- Emitter 發(fā)送 ? 時,由于 ? 已經(jīng)被消費,所以 Buffer 數(shù)據(jù)從 ?? 變?yōu)???
- Emitter 發(fā)送 ? 時,由于 ? 還未被消費,Buffer 處于填滿狀態(tài), ? 直接被丟棄
- Emitter 發(fā)送 ? 時,由于 ? 已經(jīng)被費,可以移除緩存,Buffer 數(shù)據(jù)變?yōu)???
上圖展示了 DROP_OLDEST 的效果,與 DROP_LATEST 比較后非常明顯,緩存中永遠會儲存最新的兩個數(shù)據(jù),但是較老的數(shù)據(jù)不管有沒有被消費,都可能會從 Buffer 移除,所以 Subscriber 可以消費當前最新的數(shù)據(jù),但是有可能漏掉中間的數(shù)據(jù),比如圖中漏掉了 ?
注意:當 extraBufferCapacity 設為 SUSPEND 可以保證 Subscriber 一個不漏的消費掉所有數(shù)據(jù),但是會影響 Emitter 的速度;當設置為 DROP_XXX 時,可以保證 emit 調用后立即返回,但是 Subscriber 可能會漏掉部分數(shù)據(jù)。
如果我們不想讓 emit 發(fā)生掛起,除了設置 DROP_XXX 之外,還有一個方法就是調用 tryEmit
,這是一個非 suspend 版本的 emit
abstract suspend override fun emit(value: T) abstract fun tryEmit(value: T): Boolean
tryEmit 返回一個 boolean 值,你可以這樣判斷返回值,當使用 emit 會掛起時,使用 tryEmit 會返回 false,其余情況都是 true。這意味著 tryEmit 返回 false 的前提是 extraBufferCapacity 必須設為 SUSPEND,且 Buffer 中空余位置為 0 。此時使用 tryEmit 的效果等同于 DROP_LATEST。
SharedFlow Buffer
前面介紹的 MutableSharedFlow 的三個參數(shù),其本質都是圍繞 SharedFlow 的 Buffer 進行工作的。那么這個 Buffer 具體結構是怎樣的呢?
上面這個圖是 SharedFlow 源碼中關于 Buffer 的注釋,這個圖形象地告訴了我們 Buffer 是一個線性數(shù)據(jù)結構(就是一個普通的數(shù)組 Array<Any?>
),但是這個圖不能直觀反應 Buffer 運行機制。下面通過一個例子,看一下 Buffer 在運行時的具體更新過程:
val sharedFlow = MutableSharedFlow<Int>( replay = 2, extraBufferCapacity = 2, onBufferOverflow = BufferOverflow.SUSPEND ) var emitValue = 1 fun main() { runBlocking { launch { sharedFlow.onEach { delay(200) // simulate the consume of data }.collect() } repeat(12) { sharedFlow.emit(emitValue) emitValue++ delay(50) } } }
上面的代碼很簡單,SharedFlow 的 BufferSize = 2+2 = 4,Emitter 生產(chǎn)的速度大于 Subscriber 消費的速度,所以過程中會出現(xiàn) Buffer 的填充和更新,下面依舊用圖的方式展示 Buffer 的變化
先看一下代碼對應的時序圖:
有前面的介紹,相信這個時序圖很容易理解,這里就不再贅述了,下面重點圖解一下 Buffer 的內存變化。SharedFlow 的 Buffer 本質上是一個基于 Array 實現(xiàn)的 queue,通過指針移動從往隊列增刪元素,避免了元素在實際數(shù)組中的移動。這里關鍵的指針有三個:
- head:隊列的 head 指向 Buffer 的第一個有效數(shù)據(jù),這是時間上最早進入緩存的數(shù)據(jù),在數(shù)據(jù)被所有的 Subscriber 消費之前不會移除緩存。因此 head 也代表了最慢的 Subscriber 的處理進度
- replay:Buffer 為 replay-cache 預留空間的其實位置,當有新的 Subscriber 訂閱發(fā)生時,從此位置開始處理數(shù)據(jù)。
- end:新數(shù)據(jù)進入緩存時的位置,end 這也代表了最快的 Subscriber 的處理進度。
如果 bufferSize 表示當前 Buffer 中存儲數(shù)據(jù)的個數(shù),則我們可知三指針 index 符合如下關系:
- replay <= head + bufferSize
- end = head + bufferSize
了解了三指針的含義后,我們再來看上圖中的 Buffer 是如何工作的:
最后,總結一下 Buffer 的特點:
- 基于數(shù)組實現(xiàn),當數(shù)組空間不夠時進行 2n 的擴容
- 元素進入數(shù)組后的位置保持不變,通過移動指針,決定數(shù)據(jù)的消費起點
- 指針移動到數(shù)組尾部后,會重新指向頭部,數(shù)組空間可循環(huán)使用
以上就是圖解 Kotlin SharedFlow 緩存系統(tǒng)及示例詳解的詳細內容,更多關于Kotlin SharedFlow 緩存的資料請關注腳本之家其它相關文章!
相關文章
Android開發(fā)實現(xiàn)的電話竊聽和攔截應用
這篇文章主要介紹了Android開發(fā)實現(xiàn)的電話竊聽和攔截應用,結合實例形式分析了Android針對電話的監(jiān)聽與攔截的相關技巧,需要的朋友可以參考下2016-08-08Android中WebView與Js交互的實現(xiàn)方法
本文給大家介紹android中webview與js交互的實現(xiàn)方法,本文介紹的非常詳細,具有參考借鑒價值,感興趣的朋友一起學習2016-05-05Android NavigationView頭部設置監(jiān)聽事件
這篇文章主要為大家詳細介紹了Android NavigationView頭部設置監(jiān)聽事件,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-10-10Android開發(fā)實現(xiàn)從相冊中選擇照片功能詳解
這篇文章主要介紹了Android開發(fā)實現(xiàn)從相冊中選擇照片功能,涉及Android權限控制、事件綁定、文件路徑與獲取等相關操作技巧,需要的朋友可以參考下2019-03-03Android使用緩存機制實現(xiàn)文件下載及異步請求圖片加三級緩存
這篇文章主要介紹了Android使用緩存機制實現(xiàn)文件下載及異步請求圖片加三級緩存的相關資料,需要的朋友可以參考下2016-02-02