欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kotlin協(xié)程之Flow的使用與原理解析

 更新時(shí)間:2023年09月07日 10:31:01   作者:淘淘養(yǎng)樂多  
Flow是一種數(shù)據(jù)流,可以用于協(xié)程間的通信,具有冷、懶、響應(yīng)式等特點(diǎn),Flow是基于協(xié)程構(gòu)建的,可以提供多個(gè)值,本文就來(lái)給大家講講Kotlin Flow使用與原理,需要的朋友可以參考下

Flow的定義和特點(diǎn)

Flow是一種數(shù)據(jù)流,可以用于協(xié)程間的通信,具有冷、懶、響應(yīng)式等特點(diǎn)。Flow是基于協(xié)程構(gòu)建的,可以提供多個(gè)值。Flow在概念上類似于一個(gè)數(shù)據(jù)序列,但它可以使用掛起函數(shù)來(lái)異步地產(chǎn)生和消費(fèi)值。這意味著,例如,F(xiàn)low可以安全地發(fā)起網(wǎng)絡(luò)請(qǐng)求來(lái)產(chǎn)生下一個(gè)值,而不會(huì)阻塞主線程。

Flow的特點(diǎn)主要有以下幾點(diǎn):

  • :Flow是冷的,也就是說,它不會(huì)在沒有收集器的情況下開始執(zhí)行。只有當(dāng)有收集器訂閱了Flow時(shí),它才會(huì)開始發(fā)射值。這與熱的數(shù)據(jù)流不同,熱的數(shù)據(jù)流會(huì)在沒有收集器的情況下也產(chǎn)生值。
  • :Flow是懶的,也就是說,它只會(huì)在需要時(shí)才計(jì)算值。每個(gè)收集器都會(huì)觸發(fā)Flow的重新計(jì)算,而不是共享之前計(jì)算的結(jié)果。這與惰性序列類似,惰性序列只會(huì)在迭代時(shí)才計(jì)算元素。
  • 響應(yīng)式:Flow是響應(yīng)式的,也就是說,它可以根據(jù)收集器的需求來(lái)調(diào)整發(fā)射速度。如果收集器不能及時(shí)處理值,F(xiàn)low可以暫停或取消發(fā)射。這與反應(yīng)式流規(guī)范(Reactive Streams Specification)中定義的背壓(backpressure)機(jī)制類似。

Flow的創(chuàng)建和操作

Flow可以通過多種方式創(chuàng)建,最簡(jiǎn)單的方式是使用flow{}構(gòu)建器函數(shù),在其中使用emit函數(shù)手動(dòng)發(fā)射值。例如,下面的代碼創(chuàng)建了一個(gè)發(fā)射1到3的整數(shù)值的Flow:

// 創(chuàng)建一個(gè)Flow<Int>
fun simple(): Flow<Int> = flow {
    // 發(fā)射1到3
    for (i in 1..3) {
        emit(i) // 發(fā)射下一個(gè)值
    }
}

除了flow{}構(gòu)建器函數(shù)外,還有一些其他方式可以創(chuàng)建Flow,例如:

  • 使用flowOf()函數(shù)創(chuàng)建一個(gè)包含固定元素的Flow。
  • 使用asFlow()擴(kuò)展函數(shù)將各種集合和序列轉(zhuǎn)換為Flow。
  • 使用channelFlow()構(gòu)建器函數(shù)創(chuàng)建一個(gè)基于通道(Channel)的Flow。
  • 使用callbackFlow()構(gòu)建器函數(shù)創(chuàng)建一個(gè)基于回調(diào)(Callback)的Flow。

創(chuàng)建好Flow后,可以使用各種操作符對(duì)數(shù)據(jù)進(jìn)行處理。操作符分為兩類:

  • 中間操作符:中間操作符用于對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換、過濾、組合等操作,但不會(huì)終止流。中間操作符返回一個(gè)新的Flow,可以鏈?zhǔn)秸{(diào)用多個(gè)中間操作符。例如,filter、map、take等操作符都是中間操作符。
  • 終止操作符:終止操作符用于對(duì)數(shù)據(jù)進(jìn)行收集、聚合、統(tǒng)計(jì)等操作,并終止流。終止操作符返回一個(gè)非Flow類型的結(jié)果,并觸發(fā)流的執(zhí)行。例如,collect、first、toList等操作符都是終止操作符。

例如,下面的代碼使用了map和filter兩個(gè)中間操作符對(duì)simple()函數(shù)返回的Flow進(jìn)行了轉(zhuǎn)換和過濾,并使用了collect終止操作符對(duì)結(jié)果進(jìn)行了打?。?/p>

// 對(duì)simple()返回的Flow進(jìn)行處理
fun main() = runBlocking<Unit> {
    // 啟動(dòng)并發(fā)協(xié)程以驗(yàn)證主線程并未阻塞
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // 收集這個(gè)流
    simple()
        .map { it * it } // 數(shù)字求平方
        .filter { it % 2 == 0 } // 過濾偶數(shù)
        .collect { value -> // 終止操作符
            println(value) // 打印結(jié)果
        }
}

輸出結(jié)果為:

I'm not blocked 1
4
I'm not blocked 2
I'm not blocked 3

可以看到,F(xiàn)low的執(zhí)行是在協(xié)程中進(jìn)行的,不會(huì)阻塞主線程。同時(shí),F(xiàn)low的操作符也是掛起函數(shù),可以在其中進(jìn)行異步操作,例如:

// 對(duì)simple()返回的Flow進(jìn)行處理
fun main() = runBlocking<Unit> {
    // 收集這個(gè)流
    simple()
        .map { request(it) } // 模擬異步請(qǐng)求
        .collect { value -> // 終止操作符
            println(value) // 打印結(jié)果
        }
}
// 模擬異步請(qǐng)求,返回字符串
suspend fun request(i: Int): String {
    delay(1000) // 延遲1秒
    return "response $i"
}

輸出結(jié)果為:

response 1
response 2
response 3

可以看到,每個(gè)請(qǐng)求都延遲了1秒,但是不會(huì)阻塞主線程或其他請(qǐng)求。

Flow的生命周期和異常處理

Flow提供了一些回調(diào)函數(shù)來(lái)監(jiān)聽流的生命周期,例如:

  • onStart:在流開始收集之前調(diào)用,可以用于執(zhí)行一些初始化操作,例如打開文件或數(shù)據(jù)庫(kù)連接等。
  • onEach:在每個(gè)元素被發(fā)射之后調(diào)用,可以用于執(zhí)行一些通用操作,例如日志記錄或更新UI等。
  • onCompletion:在流完成收集之后調(diào)用,無(wú)論是正常完成還是異常終止。可以用于執(zhí)行一些清理操作,例如關(guān)閉文件或數(shù)據(jù)庫(kù)連接等。onCompletion可以接收一個(gè)可空的Throwable參數(shù),表示流終止的原因,如果為null,則表示正常完成。

例如,下面的代碼使用了onStart和onCompletion兩個(gè)回調(diào)函數(shù)來(lái)打印流的開始和結(jié)束時(shí)間:

// 對(duì)simple()返回的Flow進(jìn)行處理
fun main() = runBlocking<Unit> {
    // 收集這個(gè)流
    simple()
        .onStart { println("Flow started at ${System.currentTimeMillis()}") } // 開始回調(diào)
        .onCompletion { println("Flow completed at ${System.currentTimeMillis()}") } // 結(jié)束回調(diào)
        .collect { value -> // 終止操作符
            println(value) // 打印結(jié)果
        }
}

輸出結(jié)果為:

Flow started at 1632828678656
1
2
3
Flow completed at 1632828678657

可以看到,流的開始和結(jié)束時(shí)間都被打印出來(lái)了。

Flow也提供了一些方式來(lái)處理異常,例如:

  • catch:在流發(fā)生異常時(shí)調(diào)用,可以用于捕獲和處理異常,并決定是否繼續(xù)或終止流。catch操作符必須放在可能發(fā)生異常的操作符之后,否則無(wú)法捕獲異常。
  • try-catch:在收集流時(shí)使用try-catch塊包裹c(diǎn)ollect操作符,可以用于捕獲和處理異常,并決定是否繼續(xù)或終止程序。try-catch塊可以捕獲任何位置發(fā)生的異常。

例如,下面的代碼使用了catch和try-catch兩種方式來(lái)處理異常:

// 創(chuàng)建一個(gè)可能發(fā)生異常的Flow<Int>
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // 發(fā)射下一個(gè)值
    }
    throw RuntimeException() // 拋出異常
}
// 對(duì)foo()返回的Flow進(jìn)行處理
fun main() = runBlocking<Unit> {
    // 使用catch操作符捕獲異常,并打印錯(cuò)誤信息,然后繼續(xù)發(fā)射-1作為錯(cuò)誤標(biāo)識(shí)
    foo()
        .catch { e -> println("Caught $e") } // 捕獲異常
        .emit(-1) // 發(fā)射錯(cuò)誤標(biāo)識(shí)
        .collect { value ->
            println(value)
        }
    println("Done")
// 使用try-catch塊捕獲異常,并打印錯(cuò)誤信息,然后終止程序
    try {
        foo().collect { value ->
            println(value)
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
    println("Done")
}

輸出結(jié)果為:

Emitting 1
1
Emitting 2
2
Emitting 3
3
Caught java.lang.RuntimeException
-1
Done
Emitting 1
1
Emitting 2
2
Emitting 3
3
Caught java.lang.RuntimeException
Done

可以看到,catch操作符可以在流中處理異常,并繼續(xù)發(fā)射值,而try-catch塊可以在程序中處理異常,并終止程序。

Flow的線程切換

Flow提供了一些操作符來(lái)切換上游和下游的上下文,例如:

  • flowOn:flowOn操作符用于切換上游的上下文,也就是說,它會(huì)影響flow{}構(gòu)建器函數(shù)和之前的中間操作符的執(zhí)行上下文。flowOn操作符可以用于將耗時(shí)的計(jì)算操作放在后臺(tái)線程中執(zhí)行,而不影響主線程。
  • launchIn:launchIn操作符用于切換下游的上下文,也就是說,它會(huì)影響collect操作符和之后的中間操作符的執(zhí)行上下文。launchIn操作符可以用于將收集操作放在協(xié)程中執(zhí)行,而不阻塞當(dāng)前線程。

例如,下面的代碼使用了flowOn和launchIn兩個(gè)操作符來(lái)切換上下文:

// 創(chuàng)建一個(gè)Flow<Int>
fun simple(): Flow<Int> = flow {
    // 發(fā)射1到3,并打印當(dāng)前線程名
    for (i in 1..3) {
        Thread.sleep(100) // 假裝我們以消耗 CPU 的方式進(jìn)行計(jì)算
        log("Emitting $i")
        emit(i) // 發(fā)射下一個(gè)值
    }
}.flowOn(Dispatchers.Default) // 在流構(gòu)建器中改變消耗 CPU 代碼上下文
// 對(duì)simple()返回的Flow進(jìn)行處理
fun main() = runBlocking<Unit> {
    // 收集這個(gè)流,并打印當(dāng)前線程名
    simple()
        .collect { value ->
            log("Collected $value")
        }
    println("Done")
    // 使用launchIn操作符將收集操作放在協(xié)程中執(zhí)行,并打印當(dāng)前線程名
    simple()
        .onEach { value ->
            log("Collected $value")
        }
        .launchIn(this) // 在單獨(dú)的協(xié)程中收集并打印結(jié)果
    println("Done")
}

輸出結(jié)果為:

[DefaultDispatcher-worker-1] Emitting 1
[main] Collected 1
[DefaultDispatcher-worker-1] Emitting 2
[main] Collected 2
[DefaultDispatcher-worker-1] Emitting 3
[main] Collected 3
Done
Done
[DefaultDispatcher-worker-2] Emitting 1
[main] Collected 1
[DefaultDispatcher-worker-2] Emitting 2
[main] Collected 2
[DefaultDispatcher-worker-2] Emitting 3
[main] Collected 3

可以看到,flowOn操作符將發(fā)射操作放在了DefaultDispatcher線程中執(zhí)行,而collect操作仍然在主線程中執(zhí)行。launchIn操作符將收集操作放在了一個(gè)單獨(dú)的協(xié)程中執(zhí)行,而不阻塞主線程。

以上就是Kotlin協(xié)程之Flow的使用與原理解析的詳細(xì)內(nèi)容,更多關(guān)于Kotlin Flow使用與原理的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論