Kotlin Flow操作符及基本使用詳解
一、Flow的基本概念
Kotlin 的 Flow 相信大家都或多或少使用過,畢竟目前比較火,目前我把Flow的使用整理了一下。希望和大家所學對照一下,能有所啟發(fā)。
Kotlin 的 Flow 用于流式編程,作用于 Kotlin 的協(xié)程內(nèi)。與協(xié)程的生命周期綁定,當協(xié)程取消的時候,F(xiàn)low 也會取消。
Flow 的操作符和 RxJava 類似,如果之前會 RxJava 那么可以輕松上手,相比 RxJava ,F(xiàn)low 更加的簡單與場景化。
按照 Flow 的數(shù)據(jù)流順序發(fā)送的過程,我們對數(shù)據(jù)流的三個角色交提供方(創(chuàng)建),中介(轉(zhuǎn)換),使用方(接收)。
按照 Flow 流 是否由接收者開始接收觸發(fā)整個流的啟動,我們分為冷流(由接收方啟動)與熱流(不由接收方啟動)。
基本的使用:
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* runBlocking { flow { for (i in 1..5) { delay(100) emit(i) } }.collect { println(it) } }
打印結(jié)果:
基本的使用默認是冷流,從 collect 方法開始發(fā)送數(shù)據(jù),這里簡單的定義一個創(chuàng)建者和一個接收者。
二、Flow的生命周期與異常處理
和 RxJava 一樣 ,我們同樣的可以監(jiān)聽任務(wù)流的生命周期,開始,結(jié)束,與接收
2.1 開始與結(jié)束
runBlocking { flow { for (i in 1..5) { delay(100) emit(i) } }.onStart { YYLogUtils.w("onStart") }.onCompletion { exception -> YYLogUtils.w("onCompletion: $exception") }.collect { v -> YYLogUtils.w(v.toString()) } }
打印結(jié)果:
2.2 異常的處理
假如如果我們手動的拋一個異常,看看日志的打印順序
val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable -> YYLogUtils.e(throwable.message ?: "Unkown Error") } lifecycleScope.launch(exceptionHandler) { flow { for (i in 1..5) { delay(100) emit(i) if (i == 3) throw RuntimeException("自定義錯誤") } }.onStart { YYLogUtils.w("onStart") }.onCompletion { exception -> YYLogUtils.w("onCompletion: $exception") }.collect { v -> YYLogUtils.w(v.toString()) } }
看到打印結(jié)果:
我們可以在 onCompletion 中打印出錯誤的對象。但是大家看到我這里使用的協(xié)程的上下文來捕獲的異常,因為如果不捕獲這個異常,程序會崩潰,那可不可以不使用協(xié)程的異常捕獲?可以,我們可以使用 Flow 的異常捕獲。
runBlocking { flow { for (i in 1..5) { delay(100) emit(i) if (i == 3) throw RuntimeException("自定義錯誤") } }.onStart { YYLogUtils.w("onStart") }.onCompletion { exception -> YYLogUtils.w("onCompletion: $exception") }.catch { exception -> YYLogUtils.e("catch: $exception") }.collect { v -> YYLogUtils.w(v.toString()) } }
可以看到打印的順序是和上面的一樣的,只是換成了Flow來捕獲異常了
2.3 retry的處理
retry 方法可以讓我們程序在錯誤或異常的時候嘗試重新執(zhí)行創(chuàng)建者的操作。
runBlocking { flow { for (i in 1..5) { delay(100) emit(i) if (i == 3) throw RuntimeException("自定義錯誤") } }.retry(2).onStart { YYLogUtils.w("onStart") }.onCompletion { exception -> YYLogUtils.w("onCompletion: $exception") }.catch { exception -> YYLogUtils.e("catch: $exception") }.collect { v -> YYLogUtils.w(v.toString()) } }
可以看到打印的結(jié)果:
只會走一次開始的回調(diào)和結(jié)束的回調(diào)。
2.4 超時的處理
measureTimeMillis { }
,withTimeout(xx) { }
這些都是協(xié)程內(nèi)部的快捷處理方法,處理時間相關(guān)。
withTimeout 超時的處理使用的時候協(xié)程的快捷函數(shù),其實跟Flow沒什么關(guān)系,并不是Flow包下面的,所以我們不能使用Flow的異常來處理,因為它是Flow的父親協(xié)程里面的錯誤,我們可以使用try-catch。也可以使用協(xié)程上下文來處理異常。
val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable -> YYLogUtils.e(throwable.message ?: "Unkown Error") } lifecycleScope.launch(exceptionHandler) { withTimeout(200) { flow { for (i in 1..5) { delay(100) emit(i) if (i == 3) throw RuntimeException("自定義錯誤") } }.retry(2).onStart { YYLogUtils.w("onStart") }.onCompletion { exception -> YYLogUtils.w("onCompletion: $exception") }.catch { exception -> YYLogUtils.e("catch: $exception") }.collect { v -> YYLogUtils.w(v.toString()) } } }
打印結(jié)果如下:
如果我想計算Flow的耗時,其實就是計算協(xié)程內(nèi)的一個任務(wù)的耗時,跟是不是Flow沒關(guān)系。
lifecycleScope.launch(exceptionHandler) { val time = measureTimeMillis { flow { for (i in 1..5) { delay(100) emit(i) } }.onStart { YYLogUtils.w("onStart") }.onCompletion { exception -> YYLogUtils.w("onCompletion: $exception") }.catch { exception -> YYLogUtils.e("catch: $exception") }.collect { v -> YYLogUtils.w(v.toString()) } } YYLogUtils.w("總耗時time:" + time) }
打印的結(jié)果:
2.5 Flow的取消
同樣的 Flow 并沒有提供取消的方法,因為Flow是運行在協(xié)程中的我們可以依賴協(xié)程的運行與取消來實現(xiàn) Flow 的取消。
我們知道默認的flow是冷流 ,flow.collect 才是觸發(fā)點,它標記為 suspend 函數(shù),需要執(zhí)行在協(xié)程中。我們就能把Flow的創(chuàng)建和接收分開,并且取消 flow.collect 的作用域協(xié)程。
runBlocking { val flow = flow { for (i in 1..5) { delay(100) emit(i) } }.onStart { YYLogUtils.w("onStart") }.onCompletion { exception -> YYLogUtils.w("onCompletion: $exception") }.catch { exception -> YYLogUtils.e("catch: $exception") } //Flow的取消依賴協(xié)程的取消 withTimeoutOrNull(210) { flow.collect { YYLogUtils.w("collect:$it") } } }
效果如下:
甚至我們Flow的創(chuàng)建一般都不需要在協(xié)程中,比如我還能這么改
val flow = (1..5).asFlow().onStart { YYLogUtils.w("onStart") }.onCompletion { exception -> YYLogUtils.w("onCompletion: $exception") }.catch { exception -> YYLogUtils.e("catch: $exception") } runBlocking { //Flow的取消依賴協(xié)程的取消 withTimeoutOrNull(210) { flow.collect { YYLogUtils.w("collect:$it") } } }
這里先講到Flow的基本生命周期與異常處理,F(xiàn)low的超時,計時,取消等概念,下面我們看看Flow的創(chuàng)建的方式
三、Flow的創(chuàng)建方式
上面的代碼中我們可以看到Flow的創(chuàng)建有幾種方式,這里總結(jié)一下
flow:創(chuàng)建Flow的操作符。 flowof:構(gòu)造一組數(shù)據(jù)的Flow進行發(fā)送。 asFlow:將其他數(shù)據(jù)轉(zhuǎn)換成Flow,一般是其他數(shù)據(jù)格式向Flow的轉(zhuǎn)換
flow構(gòu)建器 是經(jīng)常被使用的流構(gòu)建器,emit 是 suspend 的需要在協(xié)程中執(zhí)行
flow { for (i in 1..5) { delay(100) emit(i) } }
flowOf構(gòu)建器 可以用于定義能夠發(fā)射固定數(shù)量值的流
flowOf(1,2,3,4,5)
asFlow構(gòu)建器可以將各種集合轉(zhuǎn)為Flow 例如 LongRange IntRange IntArray Array Sequence
(1..10).asFlow()
LiveData同樣可以使用 asFlow 擴展方法轉(zhuǎn)換為flow
val liveData = MutableLiveData<Int>(1) val flow = liveData.asFlow() .onStart { YYLogUtils.w("onStart") }.onCompletion { exception -> YYLogUtils.w("onCompletion: $exception") }.catch { exception -> YYLogUtils.e("catch: $exception") } runBlocking { //Flow的取消依賴協(xié)程的取消 withTimeoutOrNull(210) { flow.collect { YYLogUtils.w("collect:$it") } } }
四、Flow的接收方式
Flow的接收函數(shù)操作符常見的有
collect、數(shù)據(jù)收集操作符,默認的flow是冷流,即當執(zhí)行collect時,上游才會被觸發(fā)執(zhí)行。
- collectIndexed、帶下標的收集操作,如collectIndexed{ index, value -> }
- collectLatest、與collect的區(qū)別:當新值從上游發(fā)出時,如果上個收集還未完成,會取消上個值得收集操作
- toList、toSet等 將flow{}結(jié)果轉(zhuǎn)化為集合。
- single 確保流發(fā)射單個值
- first 僅僅取第一個值
- reduce 如果發(fā)射的是 Int,最終會得到一個 Int,可做累加操作
- fold reduce 的升級版,多了一個初始值
其他的終端操作符,大家看名字都知道怎么使用了,我就不一一演示了,這里看看后面2個 reduce 怎么使用的。
runBlocking { val reduce = flowOf(1, 2, 3).reduce { accumulator, value -> YYLogUtils.w("accumulator:" + accumulator + " value:" + value) accumulator + value } YYLogUtils.w("reduce:" + reduce) }
看Log可以看到 accumulator 是已經(jīng)加過的數(shù)值 value是當前數(shù)值
那么fold和 reduce類似,只是多了一個初始值
runBlocking { val fold = flowOf(1, 2, 3).fold(4) { accumulator, value -> YYLogUtils.w("accumulator: $accumulator value: $value") accumulator + value } println(fold) YYLogUtils.w("fold:" + fold) }
比較難理解的就是這2個了。
五、Flow的轉(zhuǎn)換操作符
我們除了Flow創(chuàng)建于接收之外的一些操作符,另外都是一些中間操作符,要說起flow操作符可太多了,這里只說下常用的幾個操作符吧
5.1 基本操作符
transform transform 它會調(diào)用 Flow 的 collect() 函數(shù),然后構(gòu)建一個新的 Flow. 如果想要繼續(xù)發(fā)射值,需要重新調(diào)用 emit() 函數(shù)。
runBlocking { flowOf(1, 2, 3).transform { emit("transformed $it") }.collect { println("Collect: $it") } }
map實際上就是 transform + emit ,自動封裝了
runBlocking { flowOf(1, 2, 3).map { "mapped $it" }.collect { println("Collect: $it") } }
drop 根據(jù)條件不要表達式內(nèi)的數(shù)據(jù)
runBlocking { flowOf(1, 2, 3).dropWhile { it < 2 }.collect { println("Collect $it") } }
打印值為: 2 3
filter 根據(jù)條件只要表達式內(nèi)的數(shù)據(jù)
runBlocking { flowOf(1, 2, 3).filter { it < 2 }.collect { println("Collect $it") } }
打印值為: 1
debounce 防抖動函數(shù),當用戶在很短的時間內(nèi)輸入 “d”,“dh”,“dhl”,但是用戶可能只對 “dhl” 的搜索結(jié)果感興趣,因此我們必須舍棄 “d”,“dh” 過濾掉不需要的請求,針對于這個情況,我們可以使用 debounce 函數(shù),在指定時間內(nèi)出現(xiàn)多個字符串,debounce 始終只會發(fā)出最后一個字符串
runBlocking { val result = flow { emit("h") emit("i") emit("d") delay(90) emit("dh") emit("dhl") }.debounce(200).toList() println(result) }
打印值為:dhl
distinctUntilChanged 用來過濾掉重復(fù)的請求,只有當前值與最后一個值不同時才將其發(fā)出
runBlocking { val result = flow { emit("d") emit("d") emit("d") emit("d") emit("dhl") emit("dhl") emit("dhl") emit("dhl") }.distinctUntilChanged().toList() println(result) }
打印值為:d, dhl
flatMapLatest當有新值發(fā)送時,會取消掉之前還未轉(zhuǎn)換完成的值
runBlocking { flow { emit("dh") emit("dhl") }.flatMapLatest { value -> flow<String> { delay(100) println("collected $value") // 最后輸出 collected dhl } }.collect() }
場景如下,正在查詢 “dh”,然后用戶輸入 “dhl”
打印值為:dhl
5.2 特殊操作符
這里涉及到上下游數(shù)據(jù)耗時的問題,類似RxJava的背壓的概念,比如發(fā)出的數(shù)據(jù)的速度比較快,但是接受的數(shù)據(jù)的速度比較慢。場景如下
runBlocking { val time = measureTimeMillis { flow { (1..5).forEach { delay(200) println("emit: $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}") emit(it) } }.collect { delay(500) println("Collect $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}") } } println("time: $time") }
那么就會阻塞掉,5*700 = 3500
可以看到上面的結(jié)果是串行等待執(zhí)行的,那么我們可以通過下面的操作符讓代碼并發(fā)執(zhí)行優(yōu)化效率
buffer 該運算符會在執(zhí)行期間為流創(chuàng)建一個單獨的協(xié)程。從而實現(xiàn)并發(fā)效果。
runBlocking { val time = measureTimeMillis { flow { (1..5).forEach { delay(200) println("emit: $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}") emit(it) } }.buffer().collect { delay(500) println("Collect $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}") } } println("time: $time") }
可以看到是并發(fā)執(zhí)行的。時間是200 + 5*500 = 2700
conflate 發(fā)射數(shù)據(jù)太快,只處理最新發(fā)射的
runBlocking { val time = measureTimeMillis { flow { (1..5).forEach { delay(200) println("emit: $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}") emit(it) } }.conflate().collect { delay(500) println("Collect $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}") } } println("time: $time") }
注意會丟失數(shù)據(jù)的,2和4沒有接收到
collectLatest 接收處理太慢,只處理最新接收的,但是注意一下這個不是中間操作符,這個是接收操作符,這里作為對比在這里演示
runBlocking { val time = measureTimeMillis { flow { (1..5).forEach { delay(200) println("emit: $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}") emit(it) } }.collectLatest { // 消費效率較低 delay(500) println("Collect $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}") } } println("time: $time") }
這樣就只會接收到最后一個 5
5.3 組合與展平操作符
zip 組合兩個流,將2個Flow合并為1個Flow
runBlocking { flowOf("a", "b", "c").zip(flowOf(1, 2, 3)) { a, b -> a + b //自己定義規(guī)則 }.collect { println(it) } }
打印 a1 b2 c3
combine 可以合并多個不同的 Flow 數(shù)據(jù)流,生成一個新的流。只要其中某個子 Flow 數(shù)據(jù)流有產(chǎn)生新數(shù)據(jù)的時候,就會觸發(fā) combine 操作,進行重新計算,生成一個新的數(shù)據(jù)。
val bannerFlow = MutableStateFlow<String?>(null) val listFlow = MutableStateFlow<String?>(null) lifecycleScope.launch { combine(bannerFlow, listFlow) { banner, list -> val resultList = mutableListOf<String?>() if (banner != null) { resultList.add(banner) } if (list != null) { resultList.add(list) } return@combine resultList }.collect { list -> YYLogUtils.w("list:" + list) } withContext(Dispatchers.Default) { delay(1000) bannerFlow.emit("Banner") } withContext(Dispatchers.Default) { delay(3000) listFlow.emit("list") } }
可以看到只要其中一個Flow更新了數(shù)據(jù)都會刷新
對比zip與combine 同樣的代碼我們對比zip與combine的區(qū)別,注意看Log的打印
val bannerFlow = MutableStateFlow<String?>(null) val listFlow = MutableStateFlow<String?>(null) lifecycleScope.launch { bannerFlow.zip(listFlow){ banner, list -> val resultList = mutableListOf<String?>() if (banner != null) { resultList.add(banner) } if (list != null) { resultList.add(list) } return@zip resultList }.collect { list -> YYLogUtils.w("list:" + list) } } lifecycleScope.launch { withContext(Dispatchers.Default) { delay(1000) bannerFlow.emit("Banner") } withContext(Dispatchers.Default) { delay(3000) listFlow.emit("list") } }
zip只有在都更新了才會觸發(fā),這也是最重要的他們的不同點
merge
merge 操作符想說 活都被你們干完了,我還活不活了!????
其實我們可以理解 同一類型我們可以用 merge ,不同的類型我們用zip、combine 。 zip、combine需要我們自定義拼接方式,而 merge 則是需要兩者類型一樣,直接合并為一個對象。
val bannerFlow = MutableStateFlow<String?>(null) val listFlow = MutableStateFlow<String?>(null) lifecycleScope.launch { listOf(bannerFlow, listFlow).merge().collect { YYLogUtils.w("value:$it") } } lifecycleScope.launch { withContext(Dispatchers.Default) { delay(1000) bannerFlow.emit("Banner") } withContext(Dispatchers.Default) { delay(3000) listFlow.emit("list") } }
打印結(jié)果:
展平操作符 flattenConcat 以順序方式將給定的流展開為單個流
runBlocking { flow { emit(flowOf(1, 2)) emit(flowOf(3,4)) } .flattenConcat().collect { value-> print(value) } }
執(zhí)行結(jié)果:1 2 3 4
flattenMerge 作用和 flattenConcat 一樣,但是可以設(shè)置并發(fā)收集流的數(shù)量
runBlocking { flow { emit(flowOf(1, 2)) emit(flowOf(3,4)) } .flattenMerge(2).collect { value-> print(value) } }
執(zhí)行結(jié)果:1 2 3 4
flatMapConcat通過展平操作,用這兩個元素各自構(gòu)建出一個新的流 it + "一年級", it + "二年級", it + "三年級"。
runBlocking { flowOf("初中", "高中").flatMapConcat { flowOf(it + "一年級", it + "二年級", it + "三年級") }.collect { YYLogUtils.w(it) } }
flatMapMerge
runBlocking { flowOf("初中", "高中").flatMapMerge { flowOf(it + "一年級", it + "二年級", it + "三年級") }.collect { YYLogUtils.w(it) } }
實現(xiàn)的效果是一樣的,和上面的 flatten 效果類似,后綴為 Concat 是串行,后綴為 Merge 的是并行,效率更高。
5.4 切換線程
Flow的切換線程相比協(xié)程的更加簡單,至于使用的方式大家可能都不陌生了,這里我簡單的舉例。
切換線程的操作分一個中間操作符和一個接收操作符,代碼如下:
flow { YYLogUtils.w( "start: ${Thread.currentThread().name}") repeat(3) { delay(1000) this.emit(it) } YYLogUtils.w( "end: ${Thread.currentThread().name}") } .flowOn(Dispatchers.Main) .onEach { YYLogUtils.w( "collect: $it, ${Thread.currentThread().name}") } .launchIn(CoroutineScope(Dispatchers.IO))
打印結(jié)果如下:
launchIn為接收操作符,指明此Flow運行的協(xié)程作用域?qū)ο?,一般我們可以指定作用域,指定運行線程,例如CoroutineScope(Dispatchers.IO)、 MainScope() 、lifecycleScope等。
flowOn則是切換線程,需要指明協(xié)程的上下文對象 ,一般我們用于切換線程。
總結(jié)
不知不覺文章又超長了,內(nèi)容有點太多了,下面總結(jié)一下。
總的來說以上應(yīng)該都是 Flow 使用的基礎(chǔ)了,看下來感覺和RxJava還是很像的吧,我們使用 Kotlin 構(gòu)建項目的過程中,我們真心是不需要 RxJava 了,基本上 Flow 能替代 RxJava 完成我們想要的效果了。畢竟導(dǎo)入一個 RxJava 的庫也不小。
Flow 的使用還是很常見的,同時還有它的一些封裝類 SharedFlow 與 StateFlow 也比較常用。
以上就是Kotlin Flow操作符及基本使用詳解的詳細內(nèi)容,更多關(guān)于Kotlin Flow操作符的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
ERROR/AndroidRuntime(17121)的問題解決
ERROR/AndroidRuntime(17121)的問題解決,需要的朋友可以參考一下2013-05-05Android應(yīng)用實現(xiàn)安裝后自啟動的方法
今天小編就為大家分享一篇Android應(yīng)用實現(xiàn)安裝后自啟動的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-08-08Android DrawerLayout實現(xiàn)側(cè)拉菜單功能
這篇文章主要介紹了Android DrawerLayout實現(xiàn)側(cè)拉菜單功能,非常不錯,具有參考借鑒價值,需要的朋友可以參考下2017-06-06音量控制鍵控制的音頻流(setVolumeControlStream)描述
當開發(fā)多媒體應(yīng)用或者游戲應(yīng)用的時候,需要使用音量控制鍵來設(shè)置程序的音量大小,在Android系統(tǒng)中有多種音頻流,感興趣的朋友可以了解下2013-01-01Android開發(fā)四大組件之實現(xiàn)電話攔截和電話錄音
這篇文章給大家介紹Android開發(fā)四大組件之實現(xiàn)電話攔截和電話錄音,涉及到android四大基本組件在程序中的應(yīng)用,對android四大基本組件感興趣的朋友可以參考下本篇文章2015-10-10Android Studio項目適配AndroidX(Android 9.0)的方法步驟
這篇文章主要介紹了Android Studio項目適配AndroidX(Android 9.0)的方法步驟,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-11-11