Kotlin協(xié)程之Flow的使用與原理解析
Flow的定義和特點
Flow是一種數(shù)據(jù)流,可以用于協(xié)程間的通信,具有冷、懶、響應(yīng)式等特點。Flow是基于協(xié)程構(gòu)建的,可以提供多個值。Flow在概念上類似于一個數(shù)據(jù)序列,但它可以使用掛起函數(shù)來異步地產(chǎn)生和消費值。這意味著,例如,F(xiàn)low可以安全地發(fā)起網(wǎng)絡(luò)請求來產(chǎn)生下一個值,而不會阻塞主線程。
Flow的特點主要有以下幾點:
- 冷:Flow是冷的,也就是說,它不會在沒有收集器的情況下開始執(zhí)行。只有當(dāng)有收集器訂閱了Flow時,它才會開始發(fā)射值。這與熱的數(shù)據(jù)流不同,熱的數(shù)據(jù)流會在沒有收集器的情況下也產(chǎn)生值。
- 懶:Flow是懶的,也就是說,它只會在需要時才計算值。每個收集器都會觸發(fā)Flow的重新計算,而不是共享之前計算的結(jié)果。這與惰性序列類似,惰性序列只會在迭代時才計算元素。
- 響應(yīng)式:Flow是響應(yīng)式的,也就是說,它可以根據(jù)收集器的需求來調(diào)整發(fā)射速度。如果收集器不能及時處理值,F(xiàn)low可以暫?;蛉∠l(fā)射。這與反應(yīng)式流規(guī)范(Reactive Streams Specification)中定義的背壓(backpressure)機制類似。
Flow的創(chuàng)建和操作
Flow可以通過多種方式創(chuàng)建,最簡單的方式是使用flow{}構(gòu)建器函數(shù),在其中使用emit函數(shù)手動發(fā)射值。例如,下面的代碼創(chuàng)建了一個發(fā)射1到3的整數(shù)值的Flow:
// 創(chuàng)建一個Flow<Int> fun simple(): Flow<Int> = flow { // 發(fā)射1到3 for (i in 1..3) { emit(i) // 發(fā)射下一個值 } }
除了flow{}構(gòu)建器函數(shù)外,還有一些其他方式可以創(chuàng)建Flow,例如:
- 使用flowOf()函數(shù)創(chuàng)建一個包含固定元素的Flow。
- 使用asFlow()擴展函數(shù)將各種集合和序列轉(zhuǎn)換為Flow。
- 使用channelFlow()構(gòu)建器函數(shù)創(chuàng)建一個基于通道(Channel)的Flow。
- 使用callbackFlow()構(gòu)建器函數(shù)創(chuàng)建一個基于回調(diào)(Callback)的Flow。
創(chuàng)建好Flow后,可以使用各種操作符對數(shù)據(jù)進(jìn)行處理。操作符分為兩類:
- 中間操作符:中間操作符用于對數(shù)據(jù)進(jìn)行轉(zhuǎn)換、過濾、組合等操作,但不會終止流。中間操作符返回一個新的Flow,可以鏈?zhǔn)秸{(diào)用多個中間操作符。例如,filter、map、take等操作符都是中間操作符。
- 終止操作符:終止操作符用于對數(shù)據(jù)進(jìn)行收集、聚合、統(tǒng)計等操作,并終止流。終止操作符返回一個非Flow類型的結(jié)果,并觸發(fā)流的執(zhí)行。例如,collect、first、toList等操作符都是終止操作符。
例如,下面的代碼使用了map和filter兩個中間操作符對simple()函數(shù)返回的Flow進(jìn)行了轉(zhuǎn)換和過濾,并使用了collect終止操作符對結(jié)果進(jìn)行了打印:
// 對simple()返回的Flow進(jìn)行處理 fun main() = runBlocking<Unit> { // 啟動并發(fā)協(xié)程以驗證主線程并未阻塞 launch { for (k in 1..3) { println("I'm not blocked $k") delay(100) } } // 收集這個流 simple() .map { it * it } // 數(shù)字求平方 .filter { it % 2 == 0 } // 過濾偶數(shù) .collect { value -> // 終止操作符 println(value) // 打印結(jié)果 } }
輸出結(jié)果為:
I'm not blocked 1
4
I'm not blocked 2
I'm not blocked 3
可以看到,F(xiàn)low的執(zhí)行是在協(xié)程中進(jìn)行的,不會阻塞主線程。同時,F(xiàn)low的操作符也是掛起函數(shù),可以在其中進(jìn)行異步操作,例如:
// 對simple()返回的Flow進(jìn)行處理 fun main() = runBlocking<Unit> { // 收集這個流 simple() .map { request(it) } // 模擬異步請求 .collect { value -> // 終止操作符 println(value) // 打印結(jié)果 } } // 模擬異步請求,返回字符串 suspend fun request(i: Int): String { delay(1000) // 延遲1秒 return "response $i" }
輸出結(jié)果為:
response 1
response 2
response 3
可以看到,每個請求都延遲了1秒,但是不會阻塞主線程或其他請求。
Flow的生命周期和異常處理
Flow提供了一些回調(diào)函數(shù)來監(jiān)聽流的生命周期,例如:
- onStart:在流開始收集之前調(diào)用,可以用于執(zhí)行一些初始化操作,例如打開文件或數(shù)據(jù)庫連接等。
- onEach:在每個元素被發(fā)射之后調(diào)用,可以用于執(zhí)行一些通用操作,例如日志記錄或更新UI等。
- onCompletion:在流完成收集之后調(diào)用,無論是正常完成還是異常終止??梢杂糜趫?zhí)行一些清理操作,例如關(guān)閉文件或數(shù)據(jù)庫連接等。onCompletion可以接收一個可空的Throwable參數(shù),表示流終止的原因,如果為null,則表示正常完成。
例如,下面的代碼使用了onStart和onCompletion兩個回調(diào)函數(shù)來打印流的開始和結(jié)束時間:
// 對simple()返回的Flow進(jìn)行處理 fun main() = runBlocking<Unit> { // 收集這個流 simple() .onStart { println("Flow started at ${System.currentTimeMillis()}") } // 開始回調(diào) .onCompletion { println("Flow completed at ${System.currentTimeMillis()}") } // 結(jié)束回調(diào) .collect { value -> // 終止操作符 println(value) // 打印結(jié)果 } }
輸出結(jié)果為:
Flow started at 1632828678656
1
2
3
Flow completed at 1632828678657
可以看到,流的開始和結(jié)束時間都被打印出來了。
Flow也提供了一些方式來處理異常,例如:
- catch:在流發(fā)生異常時調(diào)用,可以用于捕獲和處理異常,并決定是否繼續(xù)或終止流。catch操作符必須放在可能發(fā)生異常的操作符之后,否則無法捕獲異常。
- try-catch:在收集流時使用try-catch塊包裹collect操作符,可以用于捕獲和處理異常,并決定是否繼續(xù)或終止程序。try-catch塊可以捕獲任何位置發(fā)生的異常。
例如,下面的代碼使用了catch和try-catch兩種方式來處理異常:
// 創(chuàng)建一個可能發(fā)生異常的Flow<Int> fun foo(): Flow<Int> = flow { for (i in 1..3) { println("Emitting $i") emit(i) // 發(fā)射下一個值 } throw RuntimeException() // 拋出異常 } // 對foo()返回的Flow進(jìn)行處理 fun main() = runBlocking<Unit> { // 使用catch操作符捕獲異常,并打印錯誤信息,然后繼續(xù)發(fā)射-1作為錯誤標(biāo)識 foo() .catch { e -> println("Caught $e") } // 捕獲異常 .emit(-1) // 發(fā)射錯誤標(biāo)識 .collect { value -> println(value) } println("Done") // 使用try-catch塊捕獲異常,并打印錯誤信息,然后終止程序 try { foo().collect { value -> println(value) } } catch (e: Throwable) { println("Caught $e") } println("Done") }
輸出結(jié)果為:
Emitting 1
1
Emitting 2
2
Emitting 3
3
Caught java.lang.RuntimeException
-1
Done
Emitting 1
1
Emitting 2
2
Emitting 3
3
Caught java.lang.RuntimeException
Done
可以看到,catch操作符可以在流中處理異常,并繼續(xù)發(fā)射值,而try-catch塊可以在程序中處理異常,并終止程序。
Flow的線程切換
Flow提供了一些操作符來切換上游和下游的上下文,例如:
- flowOn:flowOn操作符用于切換上游的上下文,也就是說,它會影響flow{}構(gòu)建器函數(shù)和之前的中間操作符的執(zhí)行上下文。flowOn操作符可以用于將耗時的計算操作放在后臺線程中執(zhí)行,而不影響主線程。
- launchIn:launchIn操作符用于切換下游的上下文,也就是說,它會影響collect操作符和之后的中間操作符的執(zhí)行上下文。launchIn操作符可以用于將收集操作放在協(xié)程中執(zhí)行,而不阻塞當(dāng)前線程。
例如,下面的代碼使用了flowOn和launchIn兩個操作符來切換上下文:
// 創(chuàng)建一個Flow<Int> fun simple(): Flow<Int> = flow { // 發(fā)射1到3,并打印當(dāng)前線程名 for (i in 1..3) { Thread.sleep(100) // 假裝我們以消耗 CPU 的方式進(jìn)行計算 log("Emitting $i") emit(i) // 發(fā)射下一個值 } }.flowOn(Dispatchers.Default) // 在流構(gòu)建器中改變消耗 CPU 代碼上下文 // 對simple()返回的Flow進(jìn)行處理 fun main() = runBlocking<Unit> { // 收集這個流,并打印當(dāng)前線程名 simple() .collect { value -> log("Collected $value") } println("Done") // 使用launchIn操作符將收集操作放在協(xié)程中執(zhí)行,并打印當(dāng)前線程名 simple() .onEach { value -> log("Collected $value") } .launchIn(this) // 在單獨的協(xié)程中收集并打印結(jié)果 println("Done") }
輸出結(jié)果為:
[DefaultDispatcher-worker-1] Emitting 1
[main] Collected 1
[DefaultDispatcher-worker-1] Emitting 2
[main] Collected 2
[DefaultDispatcher-worker-1] Emitting 3
[main] Collected 3
Done
Done
[DefaultDispatcher-worker-2] Emitting 1
[main] Collected 1
[DefaultDispatcher-worker-2] Emitting 2
[main] Collected 2
[DefaultDispatcher-worker-2] Emitting 3
[main] Collected 3
可以看到,flowOn操作符將發(fā)射操作放在了DefaultDispatcher線程中執(zhí)行,而collect操作仍然在主線程中執(zhí)行。launchIn操作符將收集操作放在了一個單獨的協(xié)程中執(zhí)行,而不阻塞主線程。
以上就是Kotlin協(xié)程之Flow的使用與原理解析的詳細(xì)內(nèi)容,更多關(guān)于Kotlin Flow使用與原理的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Android提高之SurfaceView與多線程的混搭實例
這篇文章主要介紹了Android提高之SurfaceView與多線程的混搭,很實用的功能,需要的朋友可以參考下2014-08-08AndroidStudio上傳本地項目到碼云的方法步驟(OSChina)
這篇文章主要介紹了AndroidStudio上傳本地項目到碼云的方法步驟(OSChina),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-04-04Android菜單操作之創(chuàng)建并響應(yīng)菜單
這篇文章主要介紹了Android菜單操作之創(chuàng)建并響應(yīng)菜單的相關(guān)資料,如何使用代碼創(chuàng)建菜單項,給菜單項分組,及各種響應(yīng)菜單事件的方法,需要的朋友可以參考下2016-04-04