欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kotlin協(xié)程Channel源碼示例淺析

 更新時(shí)間:2022年12月21日 16:13:24   作者:wayne214  
這篇文章主要為大家介紹了Kotlin協(xié)程Channel源碼示例淺析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

結(jié)論先行

Kotlin協(xié)程中的Channel用于處理多個(gè)數(shù)據(jù)組合的流,隨用隨取,時(shí)刻準(zhǔn)備著,就像自來(lái)水一樣,打開開關(guān)就有水了。

Channel使用示例

fun main() = runBlocking {
    logX("開始")
    val channel = Channel<Int> {  }
    launch {
        (1..3).forEach{
            channel.send(it)
            logX("發(fā)送數(shù)據(jù): $it")
        }
        // 關(guān)閉channel, 節(jié)省資源
        channel.close()
    }
    launch {
        for (i in channel){
            logX("接收數(shù)據(jù): $i")
        }
    }
    logX("結(jié)束")
}

示例代碼 使用Channel創(chuàng)建了一組int類型的數(shù)據(jù)流,通過(guò)send發(fā)送數(shù)據(jù),并通過(guò)for循環(huán)取出channel中的數(shù)據(jù),最后channel是一種協(xié)程資源,使用結(jié)束后應(yīng)該及時(shí)調(diào)用close方法關(guān)閉,以免浪費(fèi)不必要的資源。

Channel的源碼

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> {}
        CONFLATED -> {}
        UNLIMITED -> {}
        else -> {}
    }

可以看到Channel的構(gòu)造函數(shù)包含了三個(gè)參數(shù),分別是capacity、onBufferOverflow、onUndeliveredElement.

首先看capacity,這個(gè)參數(shù)代表了管道的容量,默認(rèn)參數(shù)是RENDEZVOUS,取值是0,還有其他一些值:

  • UNLIMITED: Int = Int.MAX_VALUE,沒(méi)有限量
  • CONFLATED: 容量為1,新的覆蓋舊的值
  • BUFFERED: 添加緩沖容量,默認(rèn)值是64,可以通過(guò)修改VM參數(shù):kotlinx.coroutines.channels.defaultBuffer,進(jìn)行修改

接下來(lái)看onBufferOverflow, 顧名思義就是管道容量滿了,怎么辦?默認(rèn)是掛起,也就是suspend,一共有三種分別是: SUSPNED、DROP_OLDEST以及DROP_LATEST

public enum class BufferOverflow {
    /**
     * Suspend on buffer overflow.
     */
    SUSPEND,
    /**
     * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
     */
    DROP_OLDEST,
    /**
     * Drop **the latest** value that is being added to the buffer right now on buffer overflow
     * (so that buffer contents stay the same), do not suspend.
     */
    DROP_LATEST
}
  • SUSPEND,當(dāng)管道的容量滿了以后,如果發(fā)送方還要繼續(xù)發(fā)送,我們就會(huì)掛起當(dāng)前的 send() 方法。由于它是一個(gè)掛起函數(shù),所以我們可以以非阻塞的方式,將發(fā)送方的執(zhí)行流程掛起,等管道中有了空閑位置以后再恢復(fù),有點(diǎn)像生產(chǎn)者-消費(fèi)者模型
  • DROP_OLDEST,顧名思義,就是丟棄最舊的那條數(shù)據(jù),然后發(fā)送新的數(shù)據(jù),有點(diǎn)像LRU算法。
  • DROP_LATEST,丟棄最新的那條數(shù)據(jù)。這里要注意,這個(gè)動(dòng)作的含義是丟棄當(dāng)前正準(zhǔn)備發(fā)送的那條數(shù)據(jù),而管道中的內(nèi)容將維持不變。

最后一個(gè)參數(shù)是onUndeliveredElement,從名字看像是沒(méi)有投遞成功的回調(diào),也確實(shí)如此,當(dāng)管道中某些數(shù)據(jù)沒(méi)有成功接收時(shí),這個(gè)就會(huì)被調(diào)用。

綜合這個(gè)參數(shù)使用一下

fun main() = runBlocking {
    println("開始")
    val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
        println("onUndeliveredElement = $it")
    }
    launch {
        (1..3).forEach{
            channel.send(it)
            println("發(fā)送數(shù)據(jù): $it")
        }
        // 關(guān)閉channel, 節(jié)省資源
        channel.close()
    }
    launch {
        for (i in channel){
            println("接收數(shù)據(jù): $i")
        }
    }
    println("結(jié)束")
}
輸出結(jié)果如下:
開始
結(jié)束
發(fā)送數(shù)據(jù): 1
發(fā)送數(shù)據(jù): 2
發(fā)送數(shù)據(jù): 3
接收數(shù)據(jù): 2
接收數(shù)據(jù): 3

安全的從Channel中取數(shù)據(jù)

先看一個(gè)例子

val channel: ReceiveChannel<Int> = produce {
        (1..100).forEach{
            send(it)
            println("發(fā)送: $it")
        }
    }
while (!channel.isClosedForReceive){
    val i = channel.receive();
    println("接收: $i")
}
輸出報(bào)錯(cuò)信息:
Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed

可以看到使用isClosedForReceive判斷是否關(guān)閉再使用receive方法接收數(shù)據(jù),依然會(huì)報(bào)錯(cuò),所以不推薦使用這種方式。

推薦使用上面for循環(huán)的方式取數(shù)據(jù),還有kotlin推薦的consumeEach方式,看一下示例代碼

val channel: ReceiveChannel<Int> = produce {
        (1..100).forEach{
            send(it)
            println("發(fā)送: $it")
        }
    }
channel.consumeEach {
    println("接收:$it")
}

所以,當(dāng)我們想要獲取Channel當(dāng)中的數(shù)據(jù)時(shí),我們盡量使用 for 循環(huán),或者是channel.consumeEach {},不要直接調(diào)用channel.receive()。

“熱的數(shù)據(jù)流”從何而來(lái)?

先看一下代碼

    println("開始")
    val channel = Channel<Int>(capacity = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
        println("onUndeliveredElement = $it")
    }
    launch {
        (1..3).forEach{
            channel.send(it)
            println("發(fā)送數(shù)據(jù): $it")
        }
    }
    println("結(jié)束")
}
輸出:
開始
結(jié)束
發(fā)送數(shù)據(jù): 1
發(fā)送數(shù)據(jù): 2
發(fā)送數(shù)據(jù): 3

可以看到上述代碼中并沒(méi)有 取channel中的數(shù)據(jù),但是發(fā)送的代碼正常執(zhí)行了,這種“不管有沒(méi)有接收方,發(fā)送方都會(huì)工作”的模式,就是我們將其認(rèn)定為“熱”的原因。

舉個(gè)例子,就像去海底撈吃火鍋一樣,你不需要主動(dòng)要求服務(wù)員加水,服務(wù)員看到你的杯子中水少了,會(huì)自動(dòng)給你添加,你只管拿起水杯喝水就行了。

總的來(lái)說(shuō),不管接收方是否存在,Channel 的發(fā)送方一定會(huì)工作。

Channel能力的來(lái)源

通過(guò)源碼可以看到Channel只是一個(gè)接口,它的能力來(lái)源于SendChannel和ReceiveChannel,一個(gè)發(fā)送管道,一個(gè)接收管道,相當(dāng)于做了一個(gè)組合。

這也是一種良好的設(shè)計(jì)思想,“對(duì)讀取開放,對(duì)寫入封閉”的開閉原則。

以上就是Kotlin協(xié)程Channel源碼示例淺析的詳細(xì)內(nèi)容,更多關(guān)于Kotlin協(xié)程Channel的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論