Kotlin Flow操作符及基本使用詳解
一、Flow的基本概念
Kotlin 的 Flow 相信大家都或多或少使用過,畢竟目前比較火,目前我把Flow的使用整理了一下。希望和大家所學(xué)對照一下,能有所啟發(fā)。
Kotlin 的 Flow 用于流式編程,作用于 Kotlin 的協(xié)程內(nèi)。與協(xié)程的生命周期綁定,當(dāng)協(xié)程取消的時候,F(xiàn)low 也會取消。
Flow 的操作符和 RxJava 類似,如果之前會 RxJava 那么可以輕松上手,相比 RxJava ,F(xiàn)low 更加的簡單與場景化。
按照 Flow 的數(shù)據(jù)流順序發(fā)送的過程,我們對數(shù)據(jù)流的三個角色交提供方(創(chuàng)建),中介(轉(zhuǎn)換),使用方(接收)。
按照 Flow 流 是否由接收者開始接收觸發(fā)整個流的啟動,我們分為冷流(由接收方啟動)與熱流(不由接收方啟動)。
基本的使用:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect {
println(it)
}
}
打印結(jié)果:

基本的使用默認(rèn)是冷流,從 collect 方法開始發(fā)送數(shù)據(jù),這里簡單的定義一個創(chuàng)建者和一個接收者。
二、Flow的生命周期與異常處理
和 RxJava 一樣 ,我們同樣的可以監(jiān)聽任務(wù)流的生命周期,開始,結(jié)束,與接收
2.1 開始與結(jié)束
runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.onStart {
YYLogUtils.w("onStart")
}.onCompletion { exception ->
YYLogUtils.w("onCompletion: $exception")
}.collect { v ->
YYLogUtils.w(v.toString())
}
}
打印結(jié)果:

2.2 異常的處理
假如如果我們手動的拋一個異常,看看日志的打印順序
val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
YYLogUtils.e(throwable.message ?: "Unkown Error")
}
lifecycleScope.launch(exceptionHandler) {
flow {
for (i in 1..5) {
delay(100)
emit(i)
if (i == 3) throw RuntimeException("自定義錯誤")
}
}.onStart {
YYLogUtils.w("onStart")
}.onCompletion { exception ->
YYLogUtils.w("onCompletion: $exception")
}.collect { v ->
YYLogUtils.w(v.toString())
}
}
看到打印結(jié)果:

我們可以在 onCompletion 中打印出錯誤的對象。但是大家看到我這里使用的協(xié)程的上下文來捕獲的異常,因為如果不捕獲這個異常,程序會崩潰,那可不可以不使用協(xié)程的異常捕獲?可以,我們可以使用 Flow 的異常捕獲。
runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
if (i == 3) throw RuntimeException("自定義錯誤")
}
}.onStart {
YYLogUtils.w("onStart")
}.onCompletion { exception ->
YYLogUtils.w("onCompletion: $exception")
}.catch { exception ->
YYLogUtils.e("catch: $exception")
}.collect { v ->
YYLogUtils.w(v.toString())
}
}
可以看到打印的順序是和上面的一樣的,只是換成了Flow來捕獲異常了

2.3 retry的處理
retry 方法可以讓我們程序在錯誤或異常的時候嘗試重新執(zhí)行創(chuàng)建者的操作。
runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
if (i == 3) throw RuntimeException("自定義錯誤")
}
}.retry(2).onStart {
YYLogUtils.w("onStart")
}.onCompletion { exception ->
YYLogUtils.w("onCompletion: $exception")
}.catch { exception ->
YYLogUtils.e("catch: $exception")
}.collect { v ->
YYLogUtils.w(v.toString())
}
}
可以看到打印的結(jié)果:

只會走一次開始的回調(diào)和結(jié)束的回調(diào)。
2.4 超時的處理
measureTimeMillis { } ,withTimeout(xx) { } 這些都是協(xié)程內(nèi)部的快捷處理方法,處理時間相關(guān)。
withTimeout 超時的處理使用的時候協(xié)程的快捷函數(shù),其實跟Flow沒什么關(guān)系,并不是Flow包下面的,所以我們不能使用Flow的異常來處理,因為它是Flow的父親協(xié)程里面的錯誤,我們可以使用try-catch。也可以使用協(xié)程上下文來處理異常。
val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
YYLogUtils.e(throwable.message ?: "Unkown Error")
}
lifecycleScope.launch(exceptionHandler) {
withTimeout(200) {
flow {
for (i in 1..5) {
delay(100)
emit(i)
if (i == 3) throw RuntimeException("自定義錯誤")
}
}.retry(2).onStart {
YYLogUtils.w("onStart")
}.onCompletion { exception ->
YYLogUtils.w("onCompletion: $exception")
}.catch { exception ->
YYLogUtils.e("catch: $exception")
}.collect { v ->
YYLogUtils.w(v.toString())
}
}
}
打印結(jié)果如下:

如果我想計算Flow的耗時,其實就是計算協(xié)程內(nèi)的一個任務(wù)的耗時,跟是不是Flow沒關(guān)系。
lifecycleScope.launch(exceptionHandler) {
val time = measureTimeMillis {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.onStart {
YYLogUtils.w("onStart")
}.onCompletion { exception ->
YYLogUtils.w("onCompletion: $exception")
}.catch { exception ->
YYLogUtils.e("catch: $exception")
}.collect { v ->
YYLogUtils.w(v.toString())
}
}
YYLogUtils.w("總耗時time:" + time)
}
打印的結(jié)果:

2.5 Flow的取消
同樣的 Flow 并沒有提供取消的方法,因為Flow是運行在協(xié)程中的我們可以依賴協(xié)程的運行與取消來實現(xiàn) Flow 的取消。
我們知道默認(rèn)的flow是冷流 ,flow.collect 才是觸發(fā)點,它標(biāo)記為 suspend 函數(shù),需要執(zhí)行在協(xié)程中。我們就能把Flow的創(chuàng)建和接收分開,并且取消 flow.collect 的作用域協(xié)程。
runBlocking {
val flow = flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.onStart {
YYLogUtils.w("onStart")
}.onCompletion { exception ->
YYLogUtils.w("onCompletion: $exception")
}.catch { exception ->
YYLogUtils.e("catch: $exception")
}
//Flow的取消依賴協(xié)程的取消
withTimeoutOrNull(210) {
flow.collect {
YYLogUtils.w("collect:$it")
}
}
}
效果如下:

甚至我們Flow的創(chuàng)建一般都不需要在協(xié)程中,比如我還能這么改
val flow = (1..5).asFlow().onStart {
YYLogUtils.w("onStart")
}.onCompletion { exception ->
YYLogUtils.w("onCompletion: $exception")
}.catch { exception ->
YYLogUtils.e("catch: $exception")
}
runBlocking {
//Flow的取消依賴協(xié)程的取消
withTimeoutOrNull(210) {
flow.collect {
YYLogUtils.w("collect:$it")
}
}
}
這里先講到Flow的基本生命周期與異常處理,F(xiàn)low的超時,計時,取消等概念,下面我們看看Flow的創(chuàng)建的方式
三、Flow的創(chuàng)建方式
上面的代碼中我們可以看到Flow的創(chuàng)建有幾種方式,這里總結(jié)一下
flow:創(chuàng)建Flow的操作符。 flowof:構(gòu)造一組數(shù)據(jù)的Flow進行發(fā)送。 asFlow:將其他數(shù)據(jù)轉(zhuǎn)換成Flow,一般是其他數(shù)據(jù)格式向Flow的轉(zhuǎn)換
flow構(gòu)建器 是經(jīng)常被使用的流構(gòu)建器,emit 是 suspend 的需要在協(xié)程中執(zhí)行
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}
flowOf構(gòu)建器 可以用于定義能夠發(fā)射固定數(shù)量值的流
flowOf(1,2,3,4,5)
asFlow構(gòu)建器可以將各種集合轉(zhuǎn)為Flow 例如 LongRange IntRange IntArray Array Sequence
(1..10).asFlow()
LiveData同樣可以使用 asFlow 擴展方法轉(zhuǎn)換為flow
val liveData = MutableLiveData<Int>(1)
val flow = liveData.asFlow()
.onStart {
YYLogUtils.w("onStart")
}.onCompletion { exception ->
YYLogUtils.w("onCompletion: $exception")
}.catch { exception ->
YYLogUtils.e("catch: $exception")
}
runBlocking {
//Flow的取消依賴協(xié)程的取消
withTimeoutOrNull(210) {
flow.collect {
YYLogUtils.w("collect:$it")
}
}
}

四、Flow的接收方式
Flow的接收函數(shù)操作符常見的有
collect、數(shù)據(jù)收集操作符,默認(rèn)的flow是冷流,即當(dāng)執(zhí)行collect時,上游才會被觸發(fā)執(zhí)行。
- collectIndexed、帶下標(biāo)的收集操作,如collectIndexed{ index, value -> }
- collectLatest、與collect的區(qū)別:當(dāng)新值從上游發(fā)出時,如果上個收集還未完成,會取消上個值得收集操作
- toList、toSet等 將flow{}結(jié)果轉(zhuǎn)化為集合。
- single 確保流發(fā)射單個值
- first 僅僅取第一個值
- reduce 如果發(fā)射的是 Int,最終會得到一個 Int,可做累加操作
- fold reduce 的升級版,多了一個初始值
其他的終端操作符,大家看名字都知道怎么使用了,我就不一一演示了,這里看看后面2個 reduce 怎么使用的。
runBlocking {
val reduce = flowOf(1, 2, 3).reduce { accumulator, value ->
YYLogUtils.w("accumulator:" + accumulator + " value:" + value)
accumulator + value
}
YYLogUtils.w("reduce:" + reduce)
}
看Log可以看到 accumulator 是已經(jīng)加過的數(shù)值 value是當(dāng)前數(shù)值

那么fold和 reduce類似,只是多了一個初始值
runBlocking {
val fold = flowOf(1, 2, 3).fold(4) { accumulator, value ->
YYLogUtils.w("accumulator: $accumulator value: $value")
accumulator + value
}
println(fold)
YYLogUtils.w("fold:" + fold)
}

比較難理解的就是這2個了。
五、Flow的轉(zhuǎn)換操作符
我們除了Flow創(chuàng)建于接收之外的一些操作符,另外都是一些中間操作符,要說起flow操作符可太多了,這里只說下常用的幾個操作符吧
5.1 基本操作符
transform transform 它會調(diào)用 Flow 的 collect() 函數(shù),然后構(gòu)建一個新的 Flow. 如果想要繼續(xù)發(fā)射值,需要重新調(diào)用 emit() 函數(shù)。
runBlocking {
flowOf(1, 2, 3).transform {
emit("transformed $it")
}.collect {
println("Collect: $it")
}
}
map實際上就是 transform + emit ,自動封裝了
runBlocking {
flowOf(1, 2, 3).map {
"mapped $it"
}.collect {
println("Collect: $it")
}
}
drop 根據(jù)條件不要表達式內(nèi)的數(shù)據(jù)
runBlocking {
flowOf(1, 2, 3).dropWhile {
it < 2
}.collect {
println("Collect $it")
}
}
打印值為: 2 3
filter 根據(jù)條件只要表達式內(nèi)的數(shù)據(jù)
runBlocking {
flowOf(1, 2, 3).filter {
it < 2
}.collect {
println("Collect $it")
}
}
打印值為: 1
debounce 防抖動函數(shù),當(dāng)用戶在很短的時間內(nèi)輸入 “d”,“dh”,“dhl”,但是用戶可能只對 “dhl” 的搜索結(jié)果感興趣,因此我們必須舍棄 “d”,“dh” 過濾掉不需要的請求,針對于這個情況,我們可以使用 debounce 函數(shù),在指定時間內(nèi)出現(xiàn)多個字符串,debounce 始終只會發(fā)出最后一個字符串
runBlocking {
val result = flow {
emit("h")
emit("i")
emit("d")
delay(90)
emit("dh")
emit("dhl")
}.debounce(200).toList()
println(result)
}
打印值為:dhl
distinctUntilChanged 用來過濾掉重復(fù)的請求,只有當(dāng)前值與最后一個值不同時才將其發(fā)出
runBlocking {
val result = flow {
emit("d")
emit("d")
emit("d")
emit("d")
emit("dhl")
emit("dhl")
emit("dhl")
emit("dhl")
}.distinctUntilChanged().toList()
println(result)
}
打印值為:d, dhl
flatMapLatest當(dāng)有新值發(fā)送時,會取消掉之前還未轉(zhuǎn)換完成的值
runBlocking {
flow {
emit("dh")
emit("dhl")
}.flatMapLatest { value ->
flow<String> {
delay(100)
println("collected $value") // 最后輸出 collected dhl
}
}.collect()
}
場景如下,正在查詢 “dh”,然后用戶輸入 “dhl”
打印值為:dhl
5.2 特殊操作符
這里涉及到上下游數(shù)據(jù)耗時的問題,類似RxJava的背壓的概念,比如發(fā)出的數(shù)據(jù)的速度比較快,但是接受的數(shù)據(jù)的速度比較慢。場景如下
runBlocking {
val time = measureTimeMillis {
flow {
(1..5).forEach {
delay(200)
println("emit: $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}")
emit(it)
}
}.collect {
delay(500)
println("Collect $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}")
}
}
println("time: $time")
}
那么就會阻塞掉,5*700 = 3500

可以看到上面的結(jié)果是串行等待執(zhí)行的,那么我們可以通過下面的操作符讓代碼并發(fā)執(zhí)行優(yōu)化效率
buffer 該運算符會在執(zhí)行期間為流創(chuàng)建一個單獨的協(xié)程。從而實現(xiàn)并發(fā)效果。
runBlocking {
val time = measureTimeMillis {
flow {
(1..5).forEach {
delay(200)
println("emit: $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}")
emit(it)
}
}.buffer().collect {
delay(500)
println("Collect $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}")
}
}
println("time: $time")
}
可以看到是并發(fā)執(zhí)行的。時間是200 + 5*500 = 2700

conflate 發(fā)射數(shù)據(jù)太快,只處理最新發(fā)射的
runBlocking {
val time = measureTimeMillis {
flow {
(1..5).forEach {
delay(200)
println("emit: $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}")
emit(it)
}
}.conflate().collect {
delay(500)
println("Collect $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}")
}
}
println("time: $time")
}
注意會丟失數(shù)據(jù)的,2和4沒有接收到

collectLatest 接收處理太慢,只處理最新接收的,但是注意一下這個不是中間操作符,這個是接收操作符,這里作為對比在這里演示
runBlocking {
val time = measureTimeMillis {
flow {
(1..5).forEach {
delay(200)
println("emit: $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}")
emit(it)
}
}.collectLatest {
// 消費效率較低
delay(500)
println("Collect $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}")
}
}
println("time: $time")
}
這樣就只會接收到最后一個 5

5.3 組合與展平操作符
zip 組合兩個流,將2個Flow合并為1個Flow
runBlocking {
flowOf("a", "b", "c").zip(flowOf(1, 2, 3)) { a, b ->
a + b //自己定義規(guī)則
}.collect {
println(it)
}
}
打印 a1 b2 c3
combine 可以合并多個不同的 Flow 數(shù)據(jù)流,生成一個新的流。只要其中某個子 Flow 數(shù)據(jù)流有產(chǎn)生新數(shù)據(jù)的時候,就會觸發(fā) combine 操作,進行重新計算,生成一個新的數(shù)據(jù)。
val bannerFlow = MutableStateFlow<String?>(null)
val listFlow = MutableStateFlow<String?>(null)
lifecycleScope.launch {
combine(bannerFlow, listFlow) { banner, list ->
val resultList = mutableListOf<String?>()
if (banner != null) {
resultList.add(banner)
}
if (list != null) {
resultList.add(list)
}
return@combine resultList
}.collect { list ->
YYLogUtils.w("list:" + list)
}
withContext(Dispatchers.Default) {
delay(1000)
bannerFlow.emit("Banner")
}
withContext(Dispatchers.Default) {
delay(3000)
listFlow.emit("list")
}
}
可以看到只要其中一個Flow更新了數(shù)據(jù)都會刷新

對比zip與combine 同樣的代碼我們對比zip與combine的區(qū)別,注意看Log的打印
val bannerFlow = MutableStateFlow<String?>(null)
val listFlow = MutableStateFlow<String?>(null)
lifecycleScope.launch {
bannerFlow.zip(listFlow){ banner, list ->
val resultList = mutableListOf<String?>()
if (banner != null) {
resultList.add(banner)
}
if (list != null) {
resultList.add(list)
}
return@zip resultList
}.collect { list ->
YYLogUtils.w("list:" + list)
}
}
lifecycleScope.launch {
withContext(Dispatchers.Default) {
delay(1000)
bannerFlow.emit("Banner")
}
withContext(Dispatchers.Default) {
delay(3000)
listFlow.emit("list")
}
}
zip只有在都更新了才會觸發(fā),這也是最重要的他們的不同點

merge
merge 操作符想說 活都被你們干完了,我還活不活了!????
其實我們可以理解 同一類型我們可以用 merge ,不同的類型我們用zip、combine 。 zip、combine需要我們自定義拼接方式,而 merge 則是需要兩者類型一樣,直接合并為一個對象。
val bannerFlow = MutableStateFlow<String?>(null)
val listFlow = MutableStateFlow<String?>(null)
lifecycleScope.launch {
listOf(bannerFlow, listFlow).merge().collect {
YYLogUtils.w("value:$it")
}
}
lifecycleScope.launch {
withContext(Dispatchers.Default) {
delay(1000)
bannerFlow.emit("Banner")
}
withContext(Dispatchers.Default) {
delay(3000)
listFlow.emit("list")
}
}
打印結(jié)果:

展平操作符 flattenConcat 以順序方式將給定的流展開為單個流
runBlocking {
flow {
emit(flowOf(1, 2))
emit(flowOf(3,4))
} .flattenConcat().collect { value->
print(value)
}
}
執(zhí)行結(jié)果:1 2 3 4
flattenMerge 作用和 flattenConcat 一樣,但是可以設(shè)置并發(fā)收集流的數(shù)量
runBlocking {
flow {
emit(flowOf(1, 2))
emit(flowOf(3,4))
} .flattenMerge(2).collect { value->
print(value)
}
}
執(zhí)行結(jié)果:1 2 3 4
flatMapConcat通過展平操作,用這兩個元素各自構(gòu)建出一個新的流 it + "一年級", it + "二年級", it + "三年級"。
runBlocking {
flowOf("初中", "高中").flatMapConcat {
flowOf(it + "一年級", it + "二年級", it + "三年級")
}.collect {
YYLogUtils.w(it)
}
}

flatMapMerge
runBlocking {
flowOf("初中", "高中").flatMapMerge {
flowOf(it + "一年級", it + "二年級", it + "三年級")
}.collect {
YYLogUtils.w(it)
}
}

實現(xiàn)的效果是一樣的,和上面的 flatten 效果類似,后綴為 Concat 是串行,后綴為 Merge 的是并行,效率更高。
5.4 切換線程
Flow的切換線程相比協(xié)程的更加簡單,至于使用的方式大家可能都不陌生了,這里我簡單的舉例。
切換線程的操作分一個中間操作符和一個接收操作符,代碼如下:
flow {
YYLogUtils.w( "start: ${Thread.currentThread().name}")
repeat(3) {
delay(1000)
this.emit(it)
}
YYLogUtils.w( "end: ${Thread.currentThread().name}")
}
.flowOn(Dispatchers.Main)
.onEach {
YYLogUtils.w( "collect: $it, ${Thread.currentThread().name}")
}
.launchIn(CoroutineScope(Dispatchers.IO))
打印結(jié)果如下:

launchIn為接收操作符,指明此Flow運行的協(xié)程作用域?qū)ο?,一般我們可以指定作用域,指定運行線程,例如CoroutineScope(Dispatchers.IO)、 MainScope() 、lifecycleScope等。
flowOn則是切換線程,需要指明協(xié)程的上下文對象 ,一般我們用于切換線程。
總結(jié)
不知不覺文章又超長了,內(nèi)容有點太多了,下面總結(jié)一下。
總的來說以上應(yīng)該都是 Flow 使用的基礎(chǔ)了,看下來感覺和RxJava還是很像的吧,我們使用 Kotlin 構(gòu)建項目的過程中,我們真心是不需要 RxJava 了,基本上 Flow 能替代 RxJava 完成我們想要的效果了。畢竟導(dǎo)入一個 RxJava 的庫也不小。
Flow 的使用還是很常見的,同時還有它的一些封裝類 SharedFlow 與 StateFlow 也比較常用。
以上就是Kotlin Flow操作符及基本使用詳解的詳細內(nèi)容,更多關(guān)于Kotlin Flow操作符的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
ERROR/AndroidRuntime(17121)的問題解決
ERROR/AndroidRuntime(17121)的問題解決,需要的朋友可以參考一下2013-05-05
Android應(yīng)用實現(xiàn)安裝后自啟動的方法
今天小編就為大家分享一篇Android應(yīng)用實現(xiàn)安裝后自啟動的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-08-08
Android DrawerLayout實現(xiàn)側(cè)拉菜單功能
這篇文章主要介紹了Android DrawerLayout實現(xiàn)側(cè)拉菜單功能,非常不錯,具有參考借鑒價值,需要的朋友可以參考下2017-06-06
音量控制鍵控制的音頻流(setVolumeControlStream)描述
當(dāng)開發(fā)多媒體應(yīng)用或者游戲應(yīng)用的時候,需要使用音量控制鍵來設(shè)置程序的音量大小,在Android系統(tǒng)中有多種音頻流,感興趣的朋友可以了解下2013-01-01
Android開發(fā)四大組件之實現(xiàn)電話攔截和電話錄音
這篇文章給大家介紹Android開發(fā)四大組件之實現(xiàn)電話攔截和電話錄音,涉及到android四大基本組件在程序中的應(yīng)用,對android四大基本組件感興趣的朋友可以參考下本篇文章2015-10-10
Android Studio項目適配AndroidX(Android 9.0)的方法步驟
這篇文章主要介紹了Android Studio項目適配AndroidX(Android 9.0)的方法步驟,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11

