Flow如何解決背壓問題的方法詳解
前言
隨著時間的推移,越來越多的主流應(yīng)用已經(jīng)開始全面擁抱Kotlin
,協(xié)程的引入,Flow
的誕生,給予了開發(fā)很多便捷,作為協(xié)程與響應(yīng)式編程結(jié)合的流式處理框架,一方面它簡單的數(shù)據(jù)轉(zhuǎn)換與操作符,沒有繁瑣的操作符處理,廣受大部分開發(fā)的青睞,另一方面它并沒有響應(yīng)式編程帶來的背壓問題(BackPressure
)的困擾;接下來,本文將會就Flow如何解決背壓問題進行探討
關(guān)于背壓(BackPressure)
背壓問題是什么
首先我們要明確背壓問題是什么,它是如何產(chǎn)生的?簡單來說,在一般的流處理框架中,消息的接收處理速度跟不上消息的發(fā)送速度,從而導(dǎo)致數(shù)據(jù)不匹配,造成積壓。如果不及時正確處理背壓問題,會導(dǎo)致一些嚴重的問題
- 比如說,消息擁堵了,系統(tǒng)運行不暢從而導(dǎo)致崩潰
- 比如說,資源被消耗殆盡,甚至?xí)l(fā)生數(shù)據(jù)丟失的情況
如下圖所示,可以直觀了解背壓問題的產(chǎn)生,它在生產(chǎn)者的生產(chǎn)速率高于消費者的處理速率的情況下出現(xiàn)
定義背壓策略
既然知道了背壓問題我們已經(jīng)知道是如何產(chǎn)生的,就要去嘗試如何正確處理它,大致意思是,如果你有一個流,你需要一個緩沖區(qū),以防數(shù)據(jù)產(chǎn)生的速度快于消耗的速度,所以往往就會針對這個背壓策略進行些討論
- 定義的中間緩沖區(qū)需要多大才比較合適?
- 如果緩沖區(qū)數(shù)據(jù)已滿了,我們怎么樣處理新的事件?
對于以上問題,通過學(xué)習(xí)Flow
里的背壓策略,相信可以很快就知道答案了
Flow的背壓機制
由于Flow
是基于協(xié)程中使用的,它不需要一些巧妙設(shè)計的解決方案來明確處理背壓,在Flow
中,不同于一些傳統(tǒng)的響應(yīng)式框架,它的背壓管理是使用Kotlin
掛起函數(shù)suspend
實現(xiàn)的,看下源碼你會發(fā)現(xiàn),它里面所有的函數(shù)方法都是使用suspend
修飾符標(biāo)記,這個修飾符就是為了暫停調(diào)度者的執(zhí)行不阻塞線程。因此,Flow<T>
在同一個協(xié)程中發(fā)射和收集時,如果收集器跟不上數(shù)據(jù)流,它可以簡單地暫停元素的發(fā)射,直到它準(zhǔn)備好接收更多??吹竭@,是不是覺得有點難懂.......
簡單舉個例子,假設(shè)我們擁有一個烤箱,可以用來烤面包,由于烤箱容量的限制,一次只能烤4個面包,如果你試著一次烤8個面包,會大大加大烤箱的承載負荷,這已經(jīng)遠遠超過了它的內(nèi)存使用量,很有可能會因此燒掉你的面包。
模擬背壓問題
回顧下之前所說的,當(dāng)我們消耗的速度比生產(chǎn)的速度慢的時候,就會產(chǎn)生背壓,下面用代碼來模擬下這個過程
首先先創(chuàng)建一個方法,用來每秒發(fā)送元素
fun currentTime() = System.currentTimeMillis() fun threadName() = Thread.currentThread().name var start: Long = 0 ? fun createEmitter(): Flow<Int> = (1..5) .asFlow() .onStart { start = currentTime() } .onEach { delay(1000L) print("Emit $it (${currentTime() - start}ms) ") }
接著需要收集元素,這里我們延遲3秒再接收元素, 延遲是為了夸大緩慢的消費者并創(chuàng)建一個超級慢的收集器。
fun main() { runBlocking { val time = measureTimeMillis { createEmitter().collect { print("\nCollect $it starts ${start - currentTime()}ms") delay(3000L) println(" Collect $it ends ${currentTime() - start}ms") } } print("\nCollected in $time ms") } }
看下輸出結(jié)果,如下圖所示
這樣整個過程下來,大概需要20多秒才能結(jié)束,這里我們模擬了接收元素比發(fā)送元素慢的情況,因此就需要一個背壓機制,而這正是Flow本質(zhì)中的,它并不需要另外的設(shè)計來解決背壓
背壓處理方式
使用buffer進行緩存收集
為了使緩沖和背壓處理正常工作,我們需要在單獨的協(xié)程中運行收集器。這就是.buffer()
操作符進來的地方,它是將所有發(fā)出的項目發(fā)送Channel
到在單獨的協(xié)程中運行的收集器。
public fun <T> Flow<T>.buffer( capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): Flow<T>
它還為我們提供了緩沖功能,我們可以指定capacity
我們的緩沖區(qū)和處理策略onBufferOverflow
,所以當(dāng)Buffer
溢出的時候,它為我們提供了三個選項
enum BufferOverflow { SUSPEND, DROP_OLDEST, DROP_LATEST }
- 默認使用
SUSPEND
:會將當(dāng)前協(xié)程掛起,直到緩沖區(qū)中的數(shù)據(jù)被消費了 DROP_OLDEST
:它會丟棄最老的數(shù)據(jù)DROP_LATEST
: 它會丟棄最新的數(shù)據(jù)
好的,我們回到上文所展示的模擬示例,這時候我們可以加入緩沖收集buffer
,不指定任何參數(shù),這樣默認就是使用SUSPEND
,它會將當(dāng)前協(xié)程進行掛起
此時當(dāng)收集器繁忙的時候,程序就開始緩沖,并在第一次收集方法調(diào)用結(jié)束的時候,兩次發(fā)射后再次開始收集,此時流程的耗時時長縮短到大約16秒就可以執(zhí)行完畢,如下圖所示輸出結(jié)果
使用conflate解決
conflate
操作符于Channel
中的Conflate
模式是一直的,新數(shù)據(jù)會直接覆蓋掉舊數(shù)據(jù),它不設(shè)緩沖區(qū),也就是緩沖區(qū)大小為 0,丟棄舊數(shù)據(jù),也就是采取 DROP_OLDEST
策略,那么不就等于buffer(0,BufferOverflow.DROP_OLDEST)
,可以看下它的源碼可以佐證我們的判斷
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
在某些情況下,由于根本原因是解決生產(chǎn)消費速率不匹配的問題,我們需要做一些取舍的操作,conflate
將丟棄掉舊數(shù)據(jù),只有在收集器空閑之前發(fā)出的最后一個元素才被收集,將上文的模擬實例改為conflate
執(zhí)行,你會發(fā)現(xiàn)我們直接丟棄掉了2和4,或者說新的數(shù)據(jù)直接覆蓋掉了它們,整個流程只需要10秒左右就執(zhí)行完成了
使用collectLatest解決
通過官方介紹,我們知道collectLatest
作用在于當(dāng)原始流發(fā)出一個新的值的時候,前一個值的處理將被取消,也就是不會被接收, 和conflate
的區(qū)別在于它不會用新的數(shù)據(jù)覆蓋,而是每一個都會被處理,只不過如果前一個還沒被處理完后一個就來了的話,處理前一個數(shù)據(jù)的邏輯就會被取消
suspend fun <T> Flow<T>.collectLatest(action: suspend (T) -> Unit)
還是上文的模擬實例,這里我們使用collectLatest
看下輸出結(jié)果:
這樣也是有副作用的,如果每個更新都非常重要,例如一些視圖,狀態(tài)刷新,這個時候就不必要用collectLatest
; 當(dāng)然如果有些更新可以無損失的覆蓋,例如數(shù)據(jù)庫刷新,就可以使用到collectLatest
,具體詳細的使用場景,還需要靠開發(fā)者自己去衡量選擇使用
小結(jié)
對于Flow
可以說不需要額外提供什么巧妙的方式解決背壓問題,Flow
的本質(zhì),亦或者說Kotlin
協(xié)程本身就已經(jīng)提供了相應(yīng)的解決方案;開發(fā)者只需要在不同的場景中選擇正確的背壓策略即可。總的來說,它們都是通過使用Kotlin
掛起函數(shù)suspend
,當(dāng)流的收集器不堪重負時,它可以簡單地暫停發(fā)射器,然后在準(zhǔn)備好接受更多元素時恢復(fù)它。
關(guān)于掛起函數(shù)suspend
這里就不過多贅述了,只需要明白的一點是它與傳統(tǒng)的基于線程的同步數(shù)據(jù)管道中背壓管理非常相似,無非就是,緩慢的消費者通過阻塞生產(chǎn)者的線程自動向生產(chǎn)者施加背壓,簡單來說,suspend
通過透明地管理跨線程的背壓而不阻塞它們,將其超越單個線程并進入異步編程領(lǐng)域。
以上就是Flow如何解決背壓問題的方法詳解的詳細內(nèi)容,更多關(guān)于Flow 解決背壓問題的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Kotlin協(xié)程Job生命周期結(jié)構(gòu)化并發(fā)詳解
這篇文章主要為大家介紹了Kotlin協(xié)程Job生命周期結(jié)構(gòu)化并發(fā)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-12-12Android 中RecyclerView頂部刷新實現(xiàn)詳解
這篇文章主要介紹了Android 中RecyclerView頂部刷新實現(xiàn)詳解的相關(guān)資料,希望通過本文能幫助到大家,需要的朋友可以參考下2017-10-10Android開發(fā)之APP安裝后在桌面上不顯示應(yīng)用圖標(biāo)的解決方法
這篇文章主要介紹了Android開發(fā)之APP安裝后在桌面上不顯示應(yīng)用圖標(biāo)的解決方法,涉及Android activity相關(guān)屬性設(shè)置技巧,需要的朋友可以參考下2017-07-07