Kotlin協(xié)程與并發(fā)深入全面講解
協(xié)程與并發(fā)
Kotlin協(xié)程是基于線程執(zhí)行的。經(jīng)過一層封裝以后,Kotlin協(xié)程面對并發(fā),處理方式與Java不同。
在java的世界里,并發(fā)往往是多個線程一起工作,存在共享的變量。需要處理好同步問題。要避免把協(xié)程與線程的概念混淆。
runBlocking { var i = 0 launch(Dispatchers.Default) { repeat(1000) { i++ } } delay(1000L) println(i) }
Log
1000
Process finished with exit code 0
上述代碼中沒有任何并發(fā)任務(wù),launch創(chuàng)建了一個協(xié)程,所有的計算都發(fā)生在協(xié)程中。所以不需要考慮同步問題。
1.協(xié)程并發(fā)問題
多個協(xié)程并發(fā)執(zhí)行的例子:
runBlocking { var i = 0 val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { i++ } } jobs.add(job) } jobs.joinAll() println(i) }
9933
Process finished with exit code 0
上述代碼中,創(chuàng)建了10個協(xié)程任務(wù),每個協(xié)程任務(wù)都會工作在Default線程池中,這10個協(xié)程任務(wù)對i進行1000次自增操作,但是因為10個協(xié)程分別運行在不同的線程之前,且共享一個變量,所以會產(chǎn)生同步問題。
2.協(xié)程處理并發(fā)的手段
在Java中的同步手段有:synchronized、Atomic、Lock等;
使用@Synchronized注解或者synchronized(){}代碼塊
runBlocking { var i = 0 val lock = Any() val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { synchronized(lock) { i++ } } } jobs.add(job) } jobs.joinAll() println(i) }
10000
Process finished with exit code 0
如何在上面的synchronized代碼塊中加入掛起函數(shù),則發(fā)現(xiàn)會報錯。
如下:
runBlocking { suspend fun prepare() { } var i = 0 val lock = Any() val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { synchronized(lock) { prepare() i++ } } } jobs.add(job) } jobs.joinAll() println(i) }
所以可以發(fā)現(xiàn)不能在synchronized{}當(dāng)中調(diào)用掛起函數(shù),編譯器會報錯。因為掛起函數(shù)會被翻譯成帶有Continuation的異步函數(shù),造成synchronized代碼塊無法同步處理。
協(xié)程并發(fā)思路
單線程并發(fā)
在Kotlin協(xié)程中可以實現(xiàn)單線程并發(fā)。
runBlocking { suspend fun getResult1(): String { printlnCoroutine("Start getResult1") delay(1000L) printlnCoroutine("End getResult1") return "Result1" } suspend fun getResult2(): String { printlnCoroutine("Start getResult2") delay(1000L) printlnCoroutine("End getResult2") return "Result2" } suspend fun getResult3(): String { printlnCoroutine("Start getResult3") delay(1000L) printlnCoroutine("End getResult3") return "Result3" } val results = mutableListOf<String>() val time = measureTimeMillis { val result1 = async { getResult1() } val result2 = async { getResult2() } val result3 = async { getResult3() } results.add(result1.await()) results.add(result2.await()) results.add(result3.await()) } println("Time:$time") println(results) } fun printlnCoroutine(any: Any?) { println("" + any + ";Thread:" + Thread.currentThread().name) }
Log
Start getResult1;Thread:main @coroutine#2
Start getResult2;Thread:main @coroutine#3
Start getResult3;Thread:main @coroutine#4
End getResult1;Thread:main @coroutine#2
End getResult2;Thread:main @coroutine#3
End getResult3;Thread:main @coroutine#4
Time:1028
[Result1, Result2, Result3]
Process finished with exit code 0
上面代碼啟動了三個協(xié)程,它們之間是并發(fā)執(zhí)行的,每個協(xié)程耗時1000ms,總耗時1000多毫秒,而且這幾個協(xié)程都運行在main線程上。
所以 可以考慮將i++邏輯分發(fā)到單線程之上。
runBlocking { val coroutineDispatcher = Executors.newSingleThreadExecutor { Thread(it, "MySingleThread").apply { isDaemon = true } }.asCoroutineDispatcher() var i = 0 val jobs = mutableListOf<Job>() repeat(10) { val job = launch(coroutineDispatcher) { repeat(1000) { i++ } } jobs.add(job) } jobs.joinAll() println(i) }
10000
Process finished with exit code 0
上述代碼把所有協(xié)程任務(wù)分發(fā)到單獨的線程中執(zhí)行,但這10個協(xié)程是并發(fā)執(zhí)行的。
Mutex
在java中,Lock之類的同步鎖是阻塞式的,而Kotlin提供了非阻塞式的鎖:Mutex。
runBlocking { val mutex = Mutex() var i = 0 val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { mutex.lock() i++ mutex.unlock() } } jobs.add(job) } jobs.joinAll() println(i) }
Log
10000
Process finished with exit code 0
上述代碼使用mutex.lock()、 mutex.unlock()包裹同步計算邏輯,實現(xiàn)多線程同步。Mutex 對比 JDK 當(dāng)中的鎖,最大的優(yōu)勢就在于支持掛起和恢復(fù)。
public interface Mutex { public val isLocked: Boolean public fun tryLock(owner: Any? = null): Boolean public suspend fun lock(owner: Any? = null; @Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " + "For additional details please refer to #2794") // WARNING since 1.6.0 public val onLock: SelectClause2<Any?, Mutex> public fun holdsLock(owner: Any): Boolean public fun unlock(owner: Any? = null) }
Mutex 是一個接口,它的 lock() 方法其實是一個掛起函數(shù)。而這就是實現(xiàn)非阻塞式同步鎖的根本原因。
但是上述代碼中對于 Mutex 的使用其實是錯誤的,會存在問題。如果代碼在 mutex.lock()、mutex.unlock() 之間發(fā)生異常,從而導(dǎo)致 mutex.unlock() 無法被調(diào)用。這個時候,整個程序的執(zhí)行流程就會一直卡住,無法結(jié)束??聪旅娲a:
runBlocking { val mutex = Mutex() var i = 0 val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { mutex.lock() i++ i/0 mutex.unlock() } } jobs.add(job) } jobs.joinAll() println(i) }
如何解決?使用mutex.withLock{}。
代碼入下:
runBlocking { val mutex = Mutex() var i = 0 val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { mutex.withLock { i++ } } } jobs.add(job) } jobs.joinAll() println(i) }
10000
Process finished with exit code 0
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } lock(owner) try { return action() } finally { unlock(owner) } }
withLock{} 的本質(zhì),其實是在 finally{} 當(dāng)中調(diào)用了 unlock()。
Actor
Actor,它本質(zhì)上是基于 Channel 管道消息實現(xiàn)的。
sealed class Msg object AddMsg : Msg() class ResultMsg(val result: CompletableDeferred<Int>) : Msg() fun testCoroutinueConcurrent10() { runBlocking { suspend fun addActor() = actor<Msg> { var counter = 0 for (msg in channel) { when (msg) { is AddMsg -> counter++ is ResultMsg -> msg.result.complete(counter) } } } val actor = addActor() val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { actor.send(AddMsg) } } jobs.add(job) } jobs.joinAll() val deferred = CompletableDeferred<Int>() actor.send(ResultMsg(deferred)) val result = deferred.await() actor.close() println(result) } }
Log
10000
Process finished with exit code 0
addActor() 掛起函數(shù),它其實調(diào)用了 actor() 這個高階函數(shù)。而這個函數(shù)的返回值類型其實是 SendChannel。由此可見,Kotlin 當(dāng)中的 Actor 其實就是 Channel 的簡單封裝。Actor 的多線程同步能力都源自于 Channel。這里,我們借助密封類定義了兩種消息類型,AddMsg、ResultMsg,然后在 actor{} 內(nèi)部,我們處理這兩種消息類型,如果我們收到了 AddMsg,則計算“i++”;如果收到了 ResultMsg,則返回計算結(jié)果。而在 actor{} 的外部,我們則只需要發(fā)送 10000 次的 AddMsg 消息,最后再發(fā)送一次 ResultMsg,取回計算結(jié)果即可。Actor 本質(zhì)上是基于 Channel 管道消息實現(xiàn)的。
避免共享可變狀態(tài)
runBlocking { val deferreds = mutableListOf<Deferred<Int>>() repeat(10) { val deferred = async(Dispatchers.Default) { var i = 0 repeat(1000) { i++ } return@async i } deferreds.add(deferred) } var result = 0 deferreds.forEach { result += it.await() } println(result) }
Log
10000
Process finished with exit code 0
在每一個協(xié)程當(dāng)中,都有一個局部的變量 i,同時將 launch 都改為了 async,讓每一個協(xié)程都可以返回計算結(jié)果。這種方式,相當(dāng)于將 10000 次計算,平均分配給了 10 個協(xié)程,讓它們各自計算 1000 次。這樣一來,每個協(xié)程都可以進行獨立的計算,然后我們將 10 個協(xié)程的結(jié)果匯總起來,最后累加在一起。
runBlocking { val result = (1..10).map { async(Dispatchers.Default) { var i = 0 repeat(1000) { i++ } return@async i } }.awaitAll() .sum() println(result) }
Log
10000
Process finished with exit code 0
到此這篇關(guān)于Kotlin協(xié)程與并發(fā)深入全面講解的文章就介紹到這了,更多相關(guān)Kotlin協(xié)程與并發(fā)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
postman中POST請求時參數(shù)包含參數(shù)list設(shè)置方式
這篇文章主要介紹了postman中POST請求時參數(shù)包含參數(shù)list設(shè)置方式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-05-05詳解Spring boot使用Redis集群替換mybatis二級緩存
本篇文章主要介紹了詳解Spring boot使用Redis集群替換mybatis二級緩存,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-05-05Java實戰(zhàn)項目練習(xí)之球館在線預(yù)約系統(tǒng)的實現(xiàn)
理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+SpringBoot+maven+freemark+Mysql實現(xiàn)一個球館在線預(yù)約系統(tǒng),大家可以在過程中查缺補漏,提升水平2022-01-01SpringBoot中使用JeecgBoot的Autopoi導(dǎo)出Excel的方法步驟
這篇文章主要介紹了SpringBoot中使用JeecgBoot的Autopoi導(dǎo)出Excel的方法步驟,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09javaWeb實現(xiàn)學(xué)生信息管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了javaWeb實現(xiàn)學(xué)生信息管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-01-01