欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kotlin Flow操作符及基本使用詳解

 更新時間:2022年08月31日 14:27:05   作者:newki  
這篇文章主要為大家介紹了Kotlin Flow操作符及基本使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

一、Flow的基本概念

Kotlin 的 Flow 相信大家都或多或少使用過,畢竟目前比較火,目前我把Flow的使用整理了一下。希望和大家所學對照一下,能有所啟發(fā)。

Kotlin 的 Flow 用于流式編程,作用于 Kotlin 的協(xié)程內(nèi)。與協(xié)程的生命周期綁定,當協(xié)程取消的時候,F(xiàn)low 也會取消。

Flow 的操作符和 RxJava 類似,如果之前會 RxJava 那么可以輕松上手,相比 RxJava ,F(xiàn)low 更加的簡單與場景化。

按照 Flow 的數(shù)據(jù)流順序發(fā)送的過程,我們對數(shù)據(jù)流的三個角色交提供方(創(chuàng)建),中介(轉(zhuǎn)換),使用方(接收)。

按照 Flow 流 是否由接收者開始接收觸發(fā)整個流的啟動,我們分為冷流(由接收方啟動)與熱流(不由接收方啟動)。

基本的使用:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
   runBlocking {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect {
            println(it)
        }
    }

打印結(jié)果:

基本的使用默認是冷流,從 collect 方法開始發(fā)送數(shù)據(jù),這里簡單的定義一個創(chuàng)建者和一個接收者。

二、Flow的生命周期與異常處理

和 RxJava 一樣 ,我們同樣的可以監(jiān)聽任務(wù)流的生命周期,開始,結(jié)束,與接收

2.1 開始與結(jié)束

      runBlocking {
            flow {
                for (i in 1..5) {
                    delay(100)
                    emit(i)
                }
            }.onStart {
                YYLogUtils.w("onStart")
            }.onCompletion { exception ->
                YYLogUtils.w("onCompletion: $exception")
            }.collect { v ->
                YYLogUtils.w(v.toString())
            }
        }

打印結(jié)果:

2.2 異常的處理

假如如果我們手動的拋一個異常,看看日志的打印順序

        val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
            YYLogUtils.e(throwable.message ?: "Unkown Error")
        }
        lifecycleScope.launch(exceptionHandler) {
            flow {
                for (i in 1..5) {
                    delay(100)
                    emit(i)
                    if (i == 3) throw RuntimeException("自定義錯誤")
                }
            }.onStart {
                YYLogUtils.w("onStart")
            }.onCompletion { exception ->
                YYLogUtils.w("onCompletion: $exception")
            }.collect { v ->
                YYLogUtils.w(v.toString())
            }
        }

看到打印結(jié)果:

我們可以在 onCompletion 中打印出錯誤的對象。但是大家看到我這里使用的協(xié)程的上下文來捕獲的異常,因為如果不捕獲這個異常,程序會崩潰,那可不可以不使用協(xié)程的異常捕獲?可以,我們可以使用 Flow 的異常捕獲。

        runBlocking {
            flow {
                for (i in 1..5) {
                    delay(100)
                    emit(i)
                    if (i == 3) throw RuntimeException("自定義錯誤")
                }
            }.onStart {
                YYLogUtils.w("onStart")
            }.onCompletion { exception ->
                YYLogUtils.w("onCompletion: $exception")
            }.catch { exception ->
                YYLogUtils.e("catch: $exception")
            }.collect { v ->
                YYLogUtils.w(v.toString())
            }
        }

可以看到打印的順序是和上面的一樣的,只是換成了Flow來捕獲異常了

2.3 retry的處理

retry 方法可以讓我們程序在錯誤或異常的時候嘗試重新執(zhí)行創(chuàng)建者的操作。

       runBlocking {
            flow {
                for (i in 1..5) {
                    delay(100)
                    emit(i)
                    if (i == 3) throw RuntimeException("自定義錯誤")
                }
            }.retry(2).onStart {
                YYLogUtils.w("onStart")
            }.onCompletion { exception ->
                YYLogUtils.w("onCompletion: $exception")
            }.catch { exception ->
                YYLogUtils.e("catch: $exception")
            }.collect { v ->
                YYLogUtils.w(v.toString())
            }
        }

可以看到打印的結(jié)果:

只會走一次開始的回調(diào)和結(jié)束的回調(diào)。

2.4 超時的處理

measureTimeMillis { } ,withTimeout(xx) { } 這些都是協(xié)程內(nèi)部的快捷處理方法,處理時間相關(guān)。

withTimeout 超時的處理使用的時候協(xié)程的快捷函數(shù),其實跟Flow沒什么關(guān)系,并不是Flow包下面的,所以我們不能使用Flow的異常來處理,因為它是Flow的父親協(xié)程里面的錯誤,我們可以使用try-catch。也可以使用協(xié)程上下文來處理異常。

       val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
            YYLogUtils.e(throwable.message ?: "Unkown Error")
        }
        lifecycleScope.launch(exceptionHandler) {
            withTimeout(200) {
                flow {
                    for (i in 1..5) {
                        delay(100)
                        emit(i)
                        if (i == 3) throw RuntimeException("自定義錯誤")
                    }
                }.retry(2).onStart {
                    YYLogUtils.w("onStart")
                }.onCompletion { exception ->
                    YYLogUtils.w("onCompletion: $exception")
                }.catch { exception ->
                    YYLogUtils.e("catch: $exception")
                }.collect { v ->
                    YYLogUtils.w(v.toString())
                }
            }
        }

打印結(jié)果如下:

如果我想計算Flow的耗時,其實就是計算協(xié)程內(nèi)的一個任務(wù)的耗時,跟是不是Flow沒關(guān)系。

       lifecycleScope.launch(exceptionHandler) {
            val time = measureTimeMillis {
                flow {
                    for (i in 1..5) {
                        delay(100)
                        emit(i)
                    }
                }.onStart {
                    YYLogUtils.w("onStart")
                }.onCompletion { exception ->
                    YYLogUtils.w("onCompletion: $exception")
                }.catch { exception ->
                    YYLogUtils.e("catch: $exception")
                }.collect { v ->
                    YYLogUtils.w(v.toString())
                }
            }
            YYLogUtils.w("總耗時time:" + time)
        }

打印的結(jié)果:

2.5 Flow的取消

同樣的 Flow 并沒有提供取消的方法,因為Flow是運行在協(xié)程中的我們可以依賴協(xié)程的運行與取消來實現(xiàn) Flow 的取消。

我們知道默認的flow是冷流 ,flow.collect 才是觸發(fā)點,它標記為 suspend 函數(shù),需要執(zhí)行在協(xié)程中。我們就能把Flow的創(chuàng)建和接收分開,并且取消 flow.collect 的作用域協(xié)程。

       runBlocking {
            val flow = flow {
                for (i in 1..5) {
                    delay(100)
                    emit(i)
                }
            }.onStart {
                YYLogUtils.w("onStart")
            }.onCompletion { exception ->
                YYLogUtils.w("onCompletion: $exception")
            }.catch { exception ->
                YYLogUtils.e("catch: $exception")
            }
            //Flow的取消依賴協(xié)程的取消
            withTimeoutOrNull(210) {
                flow.collect {
                    YYLogUtils.w("collect:$it")
                }
            }
        }

效果如下:

甚至我們Flow的創(chuàng)建一般都不需要在協(xié)程中,比如我還能這么改

     val flow = (1..5).asFlow().onStart {
            YYLogUtils.w("onStart")
        }.onCompletion { exception ->
            YYLogUtils.w("onCompletion: $exception")
        }.catch { exception ->
            YYLogUtils.e("catch: $exception")
        }
        runBlocking {
            //Flow的取消依賴協(xié)程的取消
            withTimeoutOrNull(210) {
                flow.collect {
                    YYLogUtils.w("collect:$it")
                }
            }
        }

這里先講到Flow的基本生命周期與異常處理,F(xiàn)low的超時,計時,取消等概念,下面我們看看Flow的創(chuàng)建的方式

三、Flow的創(chuàng)建方式

上面的代碼中我們可以看到Flow的創(chuàng)建有幾種方式,這里總結(jié)一下

flow:創(chuàng)建Flow的操作符。 flowof:構(gòu)造一組數(shù)據(jù)的Flow進行發(fā)送。 asFlow:將其他數(shù)據(jù)轉(zhuǎn)換成Flow,一般是其他數(shù)據(jù)格式向Flow的轉(zhuǎn)換

flow構(gòu)建器 是經(jīng)常被使用的流構(gòu)建器,emit 是 suspend 的需要在協(xié)程中執(zhí)行

flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

flowOf構(gòu)建器 可以用于定義能夠發(fā)射固定數(shù)量值的流

flowOf(1,2,3,4,5)

asFlow構(gòu)建器可以將各種集合轉(zhuǎn)為Flow 例如 LongRange IntRange IntArray Array Sequence

(1..10).asFlow()

LiveData同樣可以使用 asFlow 擴展方法轉(zhuǎn)換為flow

  val liveData = MutableLiveData<Int>(1)
        val flow = liveData.asFlow()
            .onStart {
                YYLogUtils.w("onStart")
            }.onCompletion { exception ->
                YYLogUtils.w("onCompletion: $exception")
            }.catch { exception ->
                YYLogUtils.e("catch: $exception")
            }
        runBlocking {
            //Flow的取消依賴協(xié)程的取消
            withTimeoutOrNull(210) {
                flow.collect {
                    YYLogUtils.w("collect:$it")
                }
            }
        }

四、Flow的接收方式

Flow的接收函數(shù)操作符常見的有

  • collect、數(shù)據(jù)收集操作符,默認的flow是冷流,即當執(zhí)行collect時,上游才會被觸發(fā)執(zhí)行。

  • collectIndexed、帶下標的收集操作,如collectIndexed{ index, value -> }
  • collectLatest、與collect的區(qū)別:當新值從上游發(fā)出時,如果上個收集還未完成,會取消上個值得收集操作
  • toList、toSet等 將flow{}結(jié)果轉(zhuǎn)化為集合。
  • single 確保流發(fā)射單個值
  • first 僅僅取第一個值
  • reduce 如果發(fā)射的是 Int,最終會得到一個 Int,可做累加操作
  • fold reduce 的升級版,多了一個初始值

其他的終端操作符,大家看名字都知道怎么使用了,我就不一一演示了,這里看看后面2個 reduce 怎么使用的。

       runBlocking {
            val reduce = flowOf(1, 2, 3).reduce { accumulator, value ->
                YYLogUtils.w("accumulator:" + accumulator + " value:" + value)
                accumulator + value
            }
            YYLogUtils.w("reduce:" + reduce)
        }

看Log可以看到 accumulator 是已經(jīng)加過的數(shù)值 value是當前數(shù)值

那么fold和 reduce類似,只是多了一個初始值

        runBlocking {
            val fold = flowOf(1, 2, 3).fold(4) { accumulator, value ->
                YYLogUtils.w("accumulator: $accumulator value: $value")
                accumulator + value
            }
            println(fold)
            YYLogUtils.w("fold:" + fold)
        }

比較難理解的就是這2個了。

五、Flow的轉(zhuǎn)換操作符

我們除了Flow創(chuàng)建于接收之外的一些操作符,另外都是一些中間操作符,要說起flow操作符可太多了,這里只說下常用的幾個操作符吧

5.1 基本操作符

transform transform 它會調(diào)用 Flow 的 collect() 函數(shù),然后構(gòu)建一個新的 Flow. 如果想要繼續(xù)發(fā)射值,需要重新調(diào)用 emit() 函數(shù)。

runBlocking {
    flowOf(1, 2, 3).transform {
        emit("transformed $it")
    }.collect {
        println("Collect: $it")
    }
}

map實際上就是 transform + emit ,自動封裝了

runBlocking {
    flowOf(1, 2, 3).map {
        "mapped $it"
    }.collect {
        println("Collect: $it")
    }
}

drop 根據(jù)條件不要表達式內(nèi)的數(shù)據(jù)

runBlocking {
    flowOf(1, 2, 3).dropWhile {
        it < 2
    }.collect {
        println("Collect $it")
    }
}

打印值為: 2 3

filter 根據(jù)條件只要表達式內(nèi)的數(shù)據(jù)

runBlocking {
    flowOf(1, 2, 3).filter {
        it < 2
    }.collect {
        println("Collect $it")
    }
}

打印值為: 1

debounce 防抖動函數(shù),當用戶在很短的時間內(nèi)輸入 “d”,“dh”,“dhl”,但是用戶可能只對 “dhl” 的搜索結(jié)果感興趣,因此我們必須舍棄 “d”,“dh” 過濾掉不需要的請求,針對于這個情況,我們可以使用 debounce 函數(shù),在指定時間內(nèi)出現(xiàn)多個字符串,debounce 始終只會發(fā)出最后一個字符串

runBlocking {
   val result = flow {
    emit("h")
    emit("i")
    emit("d")
    delay(90)
    emit("dh")
    emit("dhl")
}.debounce(200).toList()
println(result)
}

打印值為:dhl

distinctUntilChanged 用來過濾掉重復(fù)的請求,只有當前值與最后一個值不同時才將其發(fā)出

runBlocking {
   val result = flow {
    emit("d")
    emit("d")
    emit("d")
    emit("d")
    emit("dhl")
    emit("dhl")
    emit("dhl")
    emit("dhl")
}.distinctUntilChanged().toList()
println(result)
}

打印值為:d, dhl

flatMapLatest當有新值發(fā)送時,會取消掉之前還未轉(zhuǎn)換完成的值

runBlocking {
flow {
    emit("dh")
    emit("dhl")
}.flatMapLatest { value ->
    flow<String> {
        delay(100)
        println("collected $value") // 最后輸出 collected dhl
    }
}.collect()
}

場景如下,正在查詢 “dh”,然后用戶輸入 “dhl”

打印值為:dhl

5.2 特殊操作符

這里涉及到上下游數(shù)據(jù)耗時的問題,類似RxJava的背壓的概念,比如發(fā)出的數(shù)據(jù)的速度比較快,但是接受的數(shù)據(jù)的速度比較慢。場景如下

        runBlocking {
            val time = measureTimeMillis {
                flow {
                    (1..5).forEach {
                        delay(200)
                        println("emit: $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}")
                        emit(it)
                    }
                }.collect {
                    delay(500)
                    println("Collect $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}")
                }
            }
            println("time: $time")
        }

那么就會阻塞掉,5*700 = 3500

可以看到上面的結(jié)果是串行等待執(zhí)行的,那么我們可以通過下面的操作符讓代碼并發(fā)執(zhí)行優(yōu)化效率

buffer 該運算符會在執(zhí)行期間為流創(chuàng)建一個單獨的協(xié)程。從而實現(xiàn)并發(fā)效果。

        runBlocking {
            val time = measureTimeMillis {
                flow {
                    (1..5).forEach {
                        delay(200)
                        println("emit: $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}")
                        emit(it)
                    }
                }.buffer().collect {
                    delay(500)
                    println("Collect $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}")
                }
            }
            println("time: $time")
        }

可以看到是并發(fā)執(zhí)行的。時間是200 + 5*500 = 2700

conflate 發(fā)射數(shù)據(jù)太快,只處理最新發(fā)射的

        runBlocking {
            val time = measureTimeMillis {
                flow {
                    (1..5).forEach {
                        delay(200)
                        println("emit: $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}")
                        emit(it)
                    }
                }.conflate().collect {
                    delay(500)
                    println("Collect $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}")
                }
            }
            println("time: $time")
        }

注意會丟失數(shù)據(jù)的,2和4沒有接收到

collectLatest 接收處理太慢,只處理最新接收的,但是注意一下這個不是中間操作符,這個是接收操作符,這里作為對比在這里演示

        runBlocking {
            val time = measureTimeMillis {
                flow {
                    (1..5).forEach {
                        delay(200)
                        println("emit: $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}")
                        emit(it)
                    }
                }.collectLatest {
                    // 消費效率較低
                    delay(500)
                    println("Collect $it, ${System.currentTimeMillis()}, ${Thread.currentThread().name}")
                }
            }
            println("time: $time")
        }

這樣就只會接收到最后一個 5

5.3 組合與展平操作符

zip 組合兩個流,將2個Flow合并為1個Flow

runBlocking {
    flowOf("a", "b", "c").zip(flowOf(1, 2, 3)) { a, b -&gt;
        a + b  //自己定義規(guī)則
    }.collect {
        println(it)
    }
}

打印 a1 b2 c3

combine 可以合并多個不同的 Flow 數(shù)據(jù)流,生成一個新的流。只要其中某個子 Flow 數(shù)據(jù)流有產(chǎn)生新數(shù)據(jù)的時候,就會觸發(fā) combine 操作,進行重新計算,生成一個新的數(shù)據(jù)。

      val bannerFlow = MutableStateFlow<String?>(null)
        val listFlow = MutableStateFlow<String?>(null)
        lifecycleScope.launch {
            combine(bannerFlow, listFlow) { banner, list ->
                val resultList = mutableListOf<String?>()
                if (banner != null) {
                    resultList.add(banner)
                }
                if (list != null) {
                    resultList.add(list)
                }
                return@combine resultList
            }.collect { list ->
                YYLogUtils.w("list:" + list)
            }
            withContext(Dispatchers.Default) {
                delay(1000)
                bannerFlow.emit("Banner")
            }
            withContext(Dispatchers.Default) {
                delay(3000)
                listFlow.emit("list")
            }
        }

可以看到只要其中一個Flow更新了數(shù)據(jù)都會刷新

對比zip與combine 同樣的代碼我們對比zip與combine的區(qū)別,注意看Log的打印

        val bannerFlow = MutableStateFlow<String?>(null)
        val listFlow = MutableStateFlow<String?>(null)
        lifecycleScope.launch {
            bannerFlow.zip(listFlow){ banner, list ->
                val resultList = mutableListOf<String?>()
                if (banner != null) {
                    resultList.add(banner)
                }
                if (list != null) {
                    resultList.add(list)
                }
                return@zip resultList
            }.collect { list ->
                YYLogUtils.w("list:" + list)
            }
        }
        lifecycleScope.launch {
            withContext(Dispatchers.Default) {
                delay(1000)
                bannerFlow.emit("Banner")
            }
            withContext(Dispatchers.Default) {
                delay(3000)
                listFlow.emit("list")
            }
        }

zip只有在都更新了才會觸發(fā),這也是最重要的他們的不同點

merge

merge 操作符想說 活都被你們干完了,我還活不活了!????

其實我們可以理解 同一類型我們可以用 merge ,不同的類型我們用zip、combine 。 zip、combine需要我們自定義拼接方式,而 merge 則是需要兩者類型一樣,直接合并為一個對象。

       val bannerFlow = MutableStateFlow<String?>(null)
        val listFlow = MutableStateFlow<String?>(null)
        lifecycleScope.launch {
            listOf(bannerFlow, listFlow).merge().collect {
                YYLogUtils.w("value:$it")
            }
        }
        lifecycleScope.launch {
            withContext(Dispatchers.Default) {
                delay(1000)
                bannerFlow.emit("Banner")
            }
            withContext(Dispatchers.Default) {
                delay(3000)
                listFlow.emit("list")
            }
        }

打印結(jié)果:

展平操作符 flattenConcat 以順序方式將給定的流展開為單個流

    runBlocking {
        flow {
            emit(flowOf(1, 2))
            emit(flowOf(3,4))
        } .flattenConcat().collect { value->
            print(value)
        }
    }

執(zhí)行結(jié)果:1 2 3 4

flattenMerge 作用和 flattenConcat 一樣,但是可以設(shè)置并發(fā)收集流的數(shù)量

    runBlocking {
        flow {
            emit(flowOf(1, 2))
            emit(flowOf(3,4))
        } .flattenMerge(2).collect { value->
            print(value)
        }
    }

執(zhí)行結(jié)果:1 2 3 4

flatMapConcat通過展平操作,用這兩個元素各自構(gòu)建出一個新的流 it + "一年級", it + "二年級", it + "三年級"。

runBlocking {
    flowOf("初中", "高中").flatMapConcat {
        flowOf(it + "一年級", it + "二年級", it + "三年級")
    }.collect {
        YYLogUtils.w(it)
    }
}

flatMapMerge

runBlocking {
    flowOf("初中", "高中").flatMapMerge {
        flowOf(it + "一年級", it + "二年級", it + "三年級")
    }.collect {
        YYLogUtils.w(it)
    }
}

實現(xiàn)的效果是一樣的,和上面的 flatten 效果類似,后綴為 Concat 是串行,后綴為 Merge 的是并行,效率更高。

5.4 切換線程

Flow的切換線程相比協(xié)程的更加簡單,至于使用的方式大家可能都不陌生了,這里我簡單的舉例。

切換線程的操作分一個中間操作符和一個接收操作符,代碼如下:

        flow {
            YYLogUtils.w( "start: ${Thread.currentThread().name}")
            repeat(3) {
                delay(1000)
                this.emit(it)
            }
            YYLogUtils.w( "end: ${Thread.currentThread().name}")
        }
            .flowOn(Dispatchers.Main)
            .onEach {
                YYLogUtils.w( "collect: $it, ${Thread.currentThread().name}")
            }
            .launchIn(CoroutineScope(Dispatchers.IO))

打印結(jié)果如下:

launchIn為接收操作符,指明此Flow運行的協(xié)程作用域?qū)ο?,一般我們可以指定作用域,指定運行線程,例如CoroutineScope(Dispatchers.IO)、 MainScope() 、lifecycleScope等。

flowOn則是切換線程,需要指明協(xié)程的上下文對象 ,一般我們用于切換線程。

總結(jié)

不知不覺文章又超長了,內(nèi)容有點太多了,下面總結(jié)一下。

總的來說以上應(yīng)該都是 Flow 使用的基礎(chǔ)了,看下來感覺和RxJava還是很像的吧,我們使用 Kotlin 構(gòu)建項目的過程中,我們真心是不需要 RxJava 了,基本上 Flow 能替代 RxJava 完成我們想要的效果了。畢竟導(dǎo)入一個 RxJava 的庫也不小。

Flow 的使用還是很常見的,同時還有它的一些封裝類 SharedFlow 與 StateFlow 也比較常用。

以上就是Kotlin Flow操作符及基本使用詳解的詳細內(nèi)容,更多關(guān)于Kotlin Flow操作符的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評論