Kotlin協(xié)程與并發(fā)深入全面講解
協(xié)程與并發(fā)
Kotlin協(xié)程是基于線程執(zhí)行的。經(jīng)過(guò)一層封裝以后,Kotlin協(xié)程面對(duì)并發(fā),處理方式與Java不同。
在java的世界里,并發(fā)往往是多個(gè)線程一起工作,存在共享的變量。需要處理好同步問(wèn)題。要避免把協(xié)程與線程的概念混淆。
runBlocking { var i = 0 launch(Dispatchers.Default) { repeat(1000) { i++ } } delay(1000L) println(i) }
Log
1000
Process finished with exit code 0
上述代碼中沒(méi)有任何并發(fā)任務(wù),launch創(chuàng)建了一個(gè)協(xié)程,所有的計(jì)算都發(fā)生在協(xié)程中。所以不需要考慮同步問(wèn)題。
1.協(xié)程并發(fā)問(wèn)題
多個(gè)協(xié)程并發(fā)執(zhí)行的例子:
runBlocking { var i = 0 val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { i++ } } jobs.add(job) } jobs.joinAll() println(i) }
9933
Process finished with exit code 0
上述代碼中,創(chuàng)建了10個(gè)協(xié)程任務(wù),每個(gè)協(xié)程任務(wù)都會(huì)工作在Default線程池中,這10個(gè)協(xié)程任務(wù)對(duì)i進(jìn)行1000次自增操作,但是因?yàn)?0個(gè)協(xié)程分別運(yùn)行在不同的線程之前,且共享一個(gè)變量,所以會(huì)產(chǎn)生同步問(wèn)題。
2.協(xié)程處理并發(fā)的手段
在Java中的同步手段有:synchronized、Atomic、Lock等;
使用@Synchronized注解或者synchronized(){}代碼塊
runBlocking { var i = 0 val lock = Any() val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { synchronized(lock) { i++ } } } jobs.add(job) } jobs.joinAll() println(i) }
10000
Process finished with exit code 0
如何在上面的synchronized代碼塊中加入掛起函數(shù),則發(fā)現(xiàn)會(huì)報(bào)錯(cuò)。
如下:
runBlocking { suspend fun prepare() { } var i = 0 val lock = Any() val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { synchronized(lock) { prepare() i++ } } } jobs.add(job) } jobs.joinAll() println(i) }
所以可以發(fā)現(xiàn)不能在synchronized{}當(dāng)中調(diào)用掛起函數(shù),編譯器會(huì)報(bào)錯(cuò)。因?yàn)閽炱鸷瘮?shù)會(huì)被翻譯成帶有Continuation的異步函數(shù),造成synchronized代碼塊無(wú)法同步處理。
協(xié)程并發(fā)思路
單線程并發(fā)
在Kotlin協(xié)程中可以實(shí)現(xiàn)單線程并發(fā)。
runBlocking { suspend fun getResult1(): String { printlnCoroutine("Start getResult1") delay(1000L) printlnCoroutine("End getResult1") return "Result1" } suspend fun getResult2(): String { printlnCoroutine("Start getResult2") delay(1000L) printlnCoroutine("End getResult2") return "Result2" } suspend fun getResult3(): String { printlnCoroutine("Start getResult3") delay(1000L) printlnCoroutine("End getResult3") return "Result3" } val results = mutableListOf<String>() val time = measureTimeMillis { val result1 = async { getResult1() } val result2 = async { getResult2() } val result3 = async { getResult3() } results.add(result1.await()) results.add(result2.await()) results.add(result3.await()) } println("Time:$time") println(results) } fun printlnCoroutine(any: Any?) { println("" + any + ";Thread:" + Thread.currentThread().name) }
Log
Start getResult1;Thread:main @coroutine#2
Start getResult2;Thread:main @coroutine#3
Start getResult3;Thread:main @coroutine#4
End getResult1;Thread:main @coroutine#2
End getResult2;Thread:main @coroutine#3
End getResult3;Thread:main @coroutine#4
Time:1028
[Result1, Result2, Result3]
Process finished with exit code 0
上面代碼啟動(dòng)了三個(gè)協(xié)程,它們之間是并發(fā)執(zhí)行的,每個(gè)協(xié)程耗時(shí)1000ms,總耗時(shí)1000多毫秒,而且這幾個(gè)協(xié)程都運(yùn)行在main線程上。
所以 可以考慮將i++邏輯分發(fā)到單線程之上。
runBlocking { val coroutineDispatcher = Executors.newSingleThreadExecutor { Thread(it, "MySingleThread").apply { isDaemon = true } }.asCoroutineDispatcher() var i = 0 val jobs = mutableListOf<Job>() repeat(10) { val job = launch(coroutineDispatcher) { repeat(1000) { i++ } } jobs.add(job) } jobs.joinAll() println(i) }
10000
Process finished with exit code 0
上述代碼把所有協(xié)程任務(wù)分發(fā)到單獨(dú)的線程中執(zhí)行,但這10個(gè)協(xié)程是并發(fā)執(zhí)行的。
Mutex
在java中,Lock之類的同步鎖是阻塞式的,而Kotlin提供了非阻塞式的鎖:Mutex。
runBlocking { val mutex = Mutex() var i = 0 val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { mutex.lock() i++ mutex.unlock() } } jobs.add(job) } jobs.joinAll() println(i) }
Log
10000
Process finished with exit code 0
上述代碼使用mutex.lock()、 mutex.unlock()包裹同步計(jì)算邏輯,實(shí)現(xiàn)多線程同步。Mutex 對(duì)比 JDK 當(dāng)中的鎖,最大的優(yōu)勢(shì)就在于支持掛起和恢復(fù)。
public interface Mutex { public val isLocked: Boolean public fun tryLock(owner: Any? = null): Boolean public suspend fun lock(owner: Any? = null; @Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " + "For additional details please refer to #2794") // WARNING since 1.6.0 public val onLock: SelectClause2<Any?, Mutex> public fun holdsLock(owner: Any): Boolean public fun unlock(owner: Any? = null) }
Mutex 是一個(gè)接口,它的 lock() 方法其實(shí)是一個(gè)掛起函數(shù)。而這就是實(shí)現(xiàn)非阻塞式同步鎖的根本原因。
但是上述代碼中對(duì)于 Mutex 的使用其實(shí)是錯(cuò)誤的,會(huì)存在問(wèn)題。如果代碼在 mutex.lock()、mutex.unlock() 之間發(fā)生異常,從而導(dǎo)致 mutex.unlock() 無(wú)法被調(diào)用。這個(gè)時(shí)候,整個(gè)程序的執(zhí)行流程就會(huì)一直卡住,無(wú)法結(jié)束??聪旅娲a:
runBlocking { val mutex = Mutex() var i = 0 val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { mutex.lock() i++ i/0 mutex.unlock() } } jobs.add(job) } jobs.joinAll() println(i) }
如何解決?使用mutex.withLock{}。
代碼入下:
runBlocking { val mutex = Mutex() var i = 0 val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { mutex.withLock { i++ } } } jobs.add(job) } jobs.joinAll() println(i) }
10000
Process finished with exit code 0
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T { contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } lock(owner) try { return action() } finally { unlock(owner) } }
withLock{} 的本質(zhì),其實(shí)是在 finally{} 當(dāng)中調(diào)用了 unlock()。
Actor
Actor,它本質(zhì)上是基于 Channel 管道消息實(shí)現(xiàn)的。
sealed class Msg object AddMsg : Msg() class ResultMsg(val result: CompletableDeferred<Int>) : Msg() fun testCoroutinueConcurrent10() { runBlocking { suspend fun addActor() = actor<Msg> { var counter = 0 for (msg in channel) { when (msg) { is AddMsg -> counter++ is ResultMsg -> msg.result.complete(counter) } } } val actor = addActor() val jobs = mutableListOf<Job>() repeat(10) { val job = launch(Dispatchers.Default) { repeat(1000) { actor.send(AddMsg) } } jobs.add(job) } jobs.joinAll() val deferred = CompletableDeferred<Int>() actor.send(ResultMsg(deferred)) val result = deferred.await() actor.close() println(result) } }
Log
10000
Process finished with exit code 0
addActor() 掛起函數(shù),它其實(shí)調(diào)用了 actor() 這個(gè)高階函數(shù)。而這個(gè)函數(shù)的返回值類型其實(shí)是 SendChannel。由此可見(jiàn),Kotlin 當(dāng)中的 Actor 其實(shí)就是 Channel 的簡(jiǎn)單封裝。Actor 的多線程同步能力都源自于 Channel。這里,我們借助密封類定義了兩種消息類型,AddMsg、ResultMsg,然后在 actor{} 內(nèi)部,我們處理這兩種消息類型,如果我們收到了 AddMsg,則計(jì)算“i++”;如果收到了 ResultMsg,則返回計(jì)算結(jié)果。而在 actor{} 的外部,我們則只需要發(fā)送 10000 次的 AddMsg 消息,最后再發(fā)送一次 ResultMsg,取回計(jì)算結(jié)果即可。Actor 本質(zhì)上是基于 Channel 管道消息實(shí)現(xiàn)的。
避免共享可變狀態(tài)
runBlocking { val deferreds = mutableListOf<Deferred<Int>>() repeat(10) { val deferred = async(Dispatchers.Default) { var i = 0 repeat(1000) { i++ } return@async i } deferreds.add(deferred) } var result = 0 deferreds.forEach { result += it.await() } println(result) }
Log
10000
Process finished with exit code 0
在每一個(gè)協(xié)程當(dāng)中,都有一個(gè)局部的變量 i,同時(shí)將 launch 都改為了 async,讓每一個(gè)協(xié)程都可以返回計(jì)算結(jié)果。這種方式,相當(dāng)于將 10000 次計(jì)算,平均分配給了 10 個(gè)協(xié)程,讓它們各自計(jì)算 1000 次。這樣一來(lái),每個(gè)協(xié)程都可以進(jìn)行獨(dú)立的計(jì)算,然后我們將 10 個(gè)協(xié)程的結(jié)果匯總起來(lái),最后累加在一起。
runBlocking { val result = (1..10).map { async(Dispatchers.Default) { var i = 0 repeat(1000) { i++ } return@async i } }.awaitAll() .sum() println(result) }
Log
10000
Process finished with exit code 0
到此這篇關(guān)于Kotlin協(xié)程與并發(fā)深入全面講解的文章就介紹到這了,更多相關(guān)Kotlin協(xié)程與并發(fā)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
postman中POST請(qǐng)求時(shí)參數(shù)包含參數(shù)list設(shè)置方式
這篇文章主要介紹了postman中POST請(qǐng)求時(shí)參數(shù)包含參數(shù)list設(shè)置方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-05-05詳解Spring boot使用Redis集群替換mybatis二級(jí)緩存
本篇文章主要介紹了詳解Spring boot使用Redis集群替換mybatis二級(jí)緩存,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-05-05Java實(shí)戰(zhàn)項(xiàng)目練習(xí)之球館在線預(yù)約系統(tǒng)的實(shí)現(xiàn)
理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+SpringBoot+maven+freemark+Mysql實(shí)現(xiàn)一個(gè)球館在線預(yù)約系統(tǒng),大家可以在過(guò)程中查缺補(bǔ)漏,提升水平2022-01-01SpringBoot中使用JeecgBoot的Autopoi導(dǎo)出Excel的方法步驟
這篇文章主要介紹了SpringBoot中使用JeecgBoot的Autopoi導(dǎo)出Excel的方法步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09javaWeb實(shí)現(xiàn)學(xué)生信息管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了javaWeb實(shí)現(xiàn)學(xué)生信息管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-01-01