欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kotlin協(xié)程與并發(fā)深入全面講解

 更新時間:2022年11月24日 09:53:20   作者:且聽真言  
Android官方對協(xié)程的定義-協(xié)程是一種并發(fā)設(shè)計模式,您可以在Android平臺上使用它來簡化異步執(zhí)行的代碼。協(xié)程是在版本1.3中添加到Kotlin的,它基于來自其他語言的既定概念

協(xié)程與并發(fā)

Kotlin協(xié)程是基于線程執(zhí)行的。經(jīng)過一層封裝以后,Kotlin協(xié)程面對并發(fā),處理方式與Java不同。

在java的世界里,并發(fā)往往是多個線程一起工作,存在共享的變量。需要處理好同步問題。要避免把協(xié)程與線程的概念混淆。

runBlocking {
        var i = 0
        launch(Dispatchers.Default) {
            repeat(1000) {
                i++
            }
        }
        delay(1000L)
        println(i)
    }

Log
1000
 
Process finished with exit code 0

上述代碼中沒有任何并發(fā)任務(wù),launch創(chuàng)建了一個協(xié)程,所有的計算都發(fā)生在協(xié)程中。所以不需要考慮同步問題。

1.協(xié)程并發(fā)問題

多個協(xié)程并發(fā)執(zhí)行的例子:

runBlocking {
        var i = 0
        val jobs = mutableListOf<Job>()
        repeat(10) {
            val job = launch(Dispatchers.Default) {
                repeat(1000) {
                    i++
                }
            }
            jobs.add(job)
        }
        jobs.joinAll()
        println(i)
    }

9933
 
Process finished with exit code 0

上述代碼中,創(chuàng)建了10個協(xié)程任務(wù),每個協(xié)程任務(wù)都會工作在Default線程池中,這10個協(xié)程任務(wù)對i進行1000次自增操作,但是因為10個協(xié)程分別運行在不同的線程之前,且共享一個變量,所以會產(chǎn)生同步問題。

2.協(xié)程處理并發(fā)的手段

在Java中的同步手段有:synchronized、Atomic、Lock等;

使用@Synchronized注解或者synchronized(){}代碼塊

 runBlocking {
        var i = 0
        val lock = Any()
        val jobs = mutableListOf<Job>()
        repeat(10) {
            val job = launch(Dispatchers.Default) {
                repeat(1000) {
                    synchronized(lock) {
                        i++
                    }
                }
            }
            jobs.add(job)
        }
        jobs.joinAll()
        println(i)
    }

10000
 
Process finished with exit code 0

如何在上面的synchronized代碼塊中加入掛起函數(shù),則發(fā)現(xiàn)會報錯。

如下:

  runBlocking {
        suspend fun prepare() {
        }
        var i = 0
        val lock = Any()
        val jobs = mutableListOf<Job>()
        repeat(10) {
            val job = launch(Dispatchers.Default) {
                repeat(1000) {
                    synchronized(lock) {
                        prepare()
                        i++
                    }
                }
            }
            jobs.add(job)
        }
        jobs.joinAll()
        println(i)
    }

所以可以發(fā)現(xiàn)不能在synchronized{}當(dāng)中調(diào)用掛起函數(shù),編譯器會報錯。因為掛起函數(shù)會被翻譯成帶有Continuation的異步函數(shù),造成synchronized代碼塊無法同步處理。

協(xié)程并發(fā)思路

單線程并發(fā)

在Kotlin協(xié)程中可以實現(xiàn)單線程并發(fā)。

runBlocking {
        suspend fun getResult1(): String {
            printlnCoroutine("Start getResult1")
            delay(1000L)
            printlnCoroutine("End getResult1")
            return "Result1"
        }
        suspend fun getResult2(): String {
            printlnCoroutine("Start getResult2")
            delay(1000L)
            printlnCoroutine("End getResult2")
            return "Result2"
        }
        suspend fun getResult3(): String {
            printlnCoroutine("Start getResult3")
            delay(1000L)
            printlnCoroutine("End getResult3")
            return "Result3"
        }
        val results = mutableListOf<String>()
        val time = measureTimeMillis {
            val result1 = async {
                getResult1()
            }
            val result2 = async {
                getResult2()
            }
            val result3 = async {
                getResult3()
            }
            results.add(result1.await())
            results.add(result2.await())
            results.add(result3.await())
        }
        println("Time:$time")
        println(results)
    }
fun printlnCoroutine(any: Any?) {
    println("" + any + ";Thread:" + Thread.currentThread().name)
}

Log
 
Start getResult1;Thread:main @coroutine#2
Start getResult2;Thread:main @coroutine#3
Start getResult3;Thread:main @coroutine#4
End getResult1;Thread:main @coroutine#2
End getResult2;Thread:main @coroutine#3
End getResult3;Thread:main @coroutine#4
Time:1028
[Result1, Result2, Result3]
 
Process finished with exit code 0

上面代碼啟動了三個協(xié)程,它們之間是并發(fā)執(zhí)行的,每個協(xié)程耗時1000ms,總耗時1000多毫秒,而且這幾個協(xié)程都運行在main線程上。

所以 可以考慮將i++邏輯分發(fā)到單線程之上。

 runBlocking {
        val coroutineDispatcher = Executors.newSingleThreadExecutor {
            Thread(it, "MySingleThread").apply {
                isDaemon = true
            }
        }.asCoroutineDispatcher()
        var i = 0
        val jobs = mutableListOf<Job>()
        repeat(10) {
            val job = launch(coroutineDispatcher) {
                repeat(1000) {
                        i++
                }
            }
            jobs.add(job)
        }
        jobs.joinAll()
        println(i)
    }

10000
 
Process finished with exit code 0

上述代碼把所有協(xié)程任務(wù)分發(fā)到單獨的線程中執(zhí)行,但這10個協(xié)程是并發(fā)執(zhí)行的。

Mutex

在java中,Lock之類的同步鎖是阻塞式的,而Kotlin提供了非阻塞式的鎖:Mutex。

   runBlocking {
        val mutex = Mutex()
        var i = 0
        val jobs = mutableListOf<Job>()
        repeat(10) {
            val job = launch(Dispatchers.Default) {
                repeat(1000) {
                    mutex.lock()
                    i++
                    mutex.unlock()
                }
            }
            jobs.add(job)
        }
        jobs.joinAll()
        println(i)
    }

Log
10000
 
Process finished with exit code 0

上述代碼使用mutex.lock()、 mutex.unlock()包裹同步計算邏輯,實現(xiàn)多線程同步。Mutex 對比 JDK 當(dāng)中的鎖,最大的優(yōu)勢就在于支持掛起和恢復(fù)。

public interface Mutex {
    public val isLocked: Boolean
    public fun tryLock(owner: Any? = null): Boolean
    public suspend fun lock(owner: Any? = null;
    @Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " +
        "For additional details please refer to #2794") // WARNING since 1.6.0
    public val onLock: SelectClause2<Any?, Mutex>
    public fun holdsLock(owner: Any): Boolean
    public fun unlock(owner: Any? = null)
}

Mutex 是一個接口,它的 lock() 方法其實是一個掛起函數(shù)。而這就是實現(xiàn)非阻塞式同步鎖的根本原因。

但是上述代碼中對于 Mutex 的使用其實是錯誤的,會存在問題。如果代碼在 mutex.lock()、mutex.unlock() 之間發(fā)生異常,從而導(dǎo)致 mutex.unlock() 無法被調(diào)用。這個時候,整個程序的執(zhí)行流程就會一直卡住,無法結(jié)束??聪旅娲a:

runBlocking {
        val mutex = Mutex()
        var i = 0
        val jobs = mutableListOf<Job>()
        repeat(10) {
            val job = launch(Dispatchers.Default) {
                repeat(1000) {
                    mutex.lock()
                    i++
                    i/0
                    mutex.unlock()
                }
            }
            jobs.add(job)
        }
        jobs.joinAll()
        println(i)
    }

如何解決?使用mutex.withLock{}。

代碼入下:

 runBlocking {
        val mutex = Mutex()
        var i = 0
        val jobs = mutableListOf<Job>()
        repeat(10) {
            val job = launch(Dispatchers.Default) {
                repeat(1000) {
                    mutex.withLock {
                        i++
                    }
                }
            }
            jobs.add(job)
        }
        jobs.joinAll()
        println(i)
    }

10000
 
Process finished with exit code 0

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
    contract { 
        callsInPlace(action, InvocationKind.EXACTLY_ONCE)
    }
    lock(owner)
    try {
        return action()
    } finally {
        unlock(owner)
    }
}

withLock{} 的本質(zhì),其實是在 finally{} 當(dāng)中調(diào)用了 unlock()。

Actor

Actor,它本質(zhì)上是基于 Channel 管道消息實現(xiàn)的。

sealed class Msg
object AddMsg : Msg()
class ResultMsg(val result: CompletableDeferred<Int>) : Msg()
fun testCoroutinueConcurrent10() {
    runBlocking {
        suspend fun addActor() = actor<Msg> {
            var counter = 0
            for (msg in channel) {
                when (msg) {
                    is AddMsg -> counter++
                    is ResultMsg -> msg.result.complete(counter)
                }
            }
        }
        val actor = addActor()
        val jobs = mutableListOf<Job>()
        repeat(10) {
            val job = launch(Dispatchers.Default) {
                repeat(1000) {
                    actor.send(AddMsg)
                }
            }
            jobs.add(job)
        }
        jobs.joinAll()
        val deferred = CompletableDeferred<Int>()
        actor.send(ResultMsg(deferred))
        val result = deferred.await()
        actor.close()
        println(result)
    }
}

Log
10000
 
Process finished with exit code 0

addActor() 掛起函數(shù),它其實調(diào)用了 actor() 這個高階函數(shù)。而這個函數(shù)的返回值類型其實是 SendChannel。由此可見,Kotlin 當(dāng)中的 Actor 其實就是 Channel 的簡單封裝。Actor 的多線程同步能力都源自于 Channel。這里,我們借助密封類定義了兩種消息類型,AddMsg、ResultMsg,然后在 actor{} 內(nèi)部,我們處理這兩種消息類型,如果我們收到了 AddMsg,則計算“i++”;如果收到了 ResultMsg,則返回計算結(jié)果。而在 actor{} 的外部,我們則只需要發(fā)送 10000 次的 AddMsg 消息,最后再發(fā)送一次 ResultMsg,取回計算結(jié)果即可。Actor 本質(zhì)上是基于 Channel 管道消息實現(xiàn)的。

避免共享可變狀態(tài)

 runBlocking {
        val deferreds = mutableListOf<Deferred<Int>>()
        repeat(10) {
            val deferred = async(Dispatchers.Default) {
                var i = 0
                repeat(1000) {
                    i++
                }
                return@async i
            }
            deferreds.add(deferred)
        }
        var result = 0
        deferreds.forEach {
            result += it.await()
        }
        println(result)
    }

Log
 
10000
 
Process finished with exit code 0

在每一個協(xié)程當(dāng)中,都有一個局部的變量 i,同時將 launch 都改為了 async,讓每一個協(xié)程都可以返回計算結(jié)果。這種方式,相當(dāng)于將 10000 次計算,平均分配給了 10 個協(xié)程,讓它們各自計算 1000 次。這樣一來,每個協(xié)程都可以進行獨立的計算,然后我們將 10 個協(xié)程的結(jié)果匯總起來,最后累加在一起。

 runBlocking {
        val result = (1..10).map {
            async(Dispatchers.Default) {
                var i = 0
                repeat(1000) {
                    i++
                }
                return@async i
            }
        }.awaitAll()
            .sum()
        println(result)
    }

Log
10000
 
Process finished with exit code 0

到此這篇關(guān)于Kotlin協(xié)程與并發(fā)深入全面講解的文章就介紹到這了,更多相關(guān)Kotlin協(xié)程與并發(fā)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java的jps命令簡介及使用示例詳解

    Java的jps命令簡介及使用示例詳解

    jps是jdk提供的一個查看當(dāng)前java進程的小工具,?可以看做是JavaVirtual?Machine?Process?Status?Tool的縮寫,非常簡單實用,本文重點給大家介紹下Java的jps命令使用,感興趣的朋友一起看看吧
    2022-03-03
  • spring?batch線上異常定位記錄

    spring?batch線上異常定位記錄

    這篇文章主要為大家介紹了spring?batch線上異常定位記錄及異常解決,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步早日升職加薪
    2022-03-03
  • postman中POST請求時參數(shù)包含參數(shù)list設(shè)置方式

    postman中POST請求時參數(shù)包含參數(shù)list設(shè)置方式

    這篇文章主要介紹了postman中POST請求時參數(shù)包含參數(shù)list設(shè)置方式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-05-05
  • 詳解Spring boot使用Redis集群替換mybatis二級緩存

    詳解Spring boot使用Redis集群替換mybatis二級緩存

    本篇文章主要介紹了詳解Spring boot使用Redis集群替換mybatis二級緩存,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-05-05
  • Java數(shù)組的基本操作方法整理

    Java數(shù)組的基本操作方法整理

    這篇文章主要給大家介紹了關(guān)于Java中數(shù)組的定義和使用的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-06-06
  • Java中的ThreadLocal詳解

    Java中的ThreadLocal詳解

    這篇文章主要介紹了Java中的ThreadLocal詳解,ThreadLocal?是一個線程局部變量,其實的功用非常簡單,就是為每一個使用該變量的線程都提供一個變量值的副本,是Java中一種較為特殊的線程綁定機制,需要的朋友可以參考下
    2023-09-09
  • Java實戰(zhàn)項目練習(xí)之球館在線預(yù)約系統(tǒng)的實現(xiàn)

    Java實戰(zhàn)項目練習(xí)之球館在線預(yù)約系統(tǒng)的實現(xiàn)

    理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+SpringBoot+maven+freemark+Mysql實現(xiàn)一個球館在線預(yù)約系統(tǒng),大家可以在過程中查缺補漏,提升水平
    2022-01-01
  • SpringBoot中使用JeecgBoot的Autopoi導(dǎo)出Excel的方法步驟

    SpringBoot中使用JeecgBoot的Autopoi導(dǎo)出Excel的方法步驟

    這篇文章主要介紹了SpringBoot中使用JeecgBoot的Autopoi導(dǎo)出Excel的方法步驟,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-09-09
  • javaWeb實現(xiàn)學(xué)生信息管理系統(tǒng)

    javaWeb實現(xiàn)學(xué)生信息管理系統(tǒng)

    這篇文章主要為大家詳細(xì)介紹了javaWeb實現(xiàn)學(xué)生信息管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-01-01
  • JVM指令的使用深入詳解

    JVM指令的使用深入詳解

    這篇文章主要給大家介紹了關(guān)于JVM指令使用的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-01-01

最新評論