Java 實(shí)現(xiàn)協(xié)程的方法
協(xié)程(Coroutine)這個(gè)詞其實(shí)有很多叫法,比如有的人喜歡稱為纖程(Fiber),或者綠色線程(GreenThread)。其實(shí)究其本質(zhì),對(duì)于協(xié)程最直觀的解釋是線程的線程。雖然讀上去有點(diǎn)拗口,但本質(zhì)上就是這樣。
協(xié)程的核心在于調(diào)度那塊由他來(lái)負(fù)責(zé)解決,遇到阻塞操作,立刻放棄掉,并且記錄當(dāng)前棧上的數(shù)據(jù),阻塞完后立刻再找一個(gè)線程恢復(fù)棧并把阻塞的結(jié)果放到這個(gè)線程上去跑,這樣看上去好像跟寫同步代碼沒(méi)有任何差別,這整個(gè)流程可以稱為coroutine,而跑在由coroutine負(fù)責(zé)調(diào)度的線程稱為Fiber。
java協(xié)程的實(shí)現(xiàn)
早期,在JVM上實(shí)現(xiàn)協(xié)程一般會(huì)使用kilim,不過(guò)這個(gè)工具已經(jīng)很久不更新了,現(xiàn)在常用的工具是Quasar,而本文章會(huì)全部基于Quasar來(lái)介紹。
下面嘗試通過(guò)Quasar來(lái)實(shí)現(xiàn)類似于go語(yǔ)言的coroutine以及channel。
為了能有明確的對(duì)比,這里先用go語(yǔ)言實(shí)現(xiàn)一個(gè)對(duì)于10以內(nèi)自然數(shù)分別求平方的例子。
func counter(out chan<- int) { for x := 0; x < 10; x++ { out <- x } close(out) } func squarer(out chan<- int, in <-chan int) { for v := range in { out <- v * v } close(out) } func printer(in <-chan int) { for v := range in { fmt.Println(v) } } func main() { //定義兩個(gè)int類型的channel naturals := make(chan int) squares := make(chan int) //產(chǎn)生兩個(gè)Fiber,用go關(guān)鍵字 go counter(naturals) go squarer(squares, naturals) //獲取計(jì)算結(jié)果 printer(squares) }
上面這個(gè)例子,通過(guò)channel兩解耦兩邊的數(shù)據(jù)共享。對(duì)于這個(gè)channel,大家可以理解為Java里的SynchronousQueue。下面我直接上Quasar版JAVA代碼的,幾乎可以原封不動(dòng)的復(fù)制go語(yǔ)言的代碼。
public class Example { private static void printer(Channel<Integer> in) throws SuspendExecution, InterruptedException { Integer v; while ((v = in.receive()) != null) { System.out.println(v); } } public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution { //定義兩個(gè)Channel Channel<Integer> naturals = Channels.newChannel(-1); Channel<Integer> squares = Channels.newChannel(-1); //運(yùn)行兩個(gè)Fiber實(shí)現(xiàn). new Fiber(() -> { for (int i = 0; i < 10; i++) naturals.send(i); naturals.close(); }).start(); new Fiber(() -> { Integer v; while ((v = naturals.receive()) != null) squares.send(v * v); squares.close(); }).start(); printer(squares); } }
兩者對(duì)比,看上去Java似好像更復(fù)雜些,沒(méi)辦法這就是Java的風(fēng)格,而且這還是通過(guò)第三方的庫(kù)來(lái)實(shí)現(xiàn)的。
說(shuō)到這里各位肯定對(duì)Fiber很好奇了。也許你會(huì)表示懷疑Fiber是不是如上面所描述的那樣,下面我們嘗試用Quasar建立一百萬(wàn)個(gè)Fiber,看看內(nèi)存占用多少,我先嘗試了創(chuàng)建百萬(wàn)個(gè)Thread。
for (int i = 0; i < 1_000_000; i++) { new Thread(() -> { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); }
很不幸,直接報(bào)Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread,這是情理之中的。下面是通過(guò)Quasar建立百萬(wàn)個(gè)Fiber。
public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution { int FiberNumber = 1_000_000; CountDownLatch latch = new CountDownLatch(1); AtomicInteger counter = new AtomicInteger(0); for (int i = 0; i < FiberNumber; i++) { new Fiber(() -> { counter.incrementAndGet(); if (counter.get() == FiberNumber) { System.out.println("done"); } Strand.sleep(1000000); }).start(); } latch.await(); }
我這里加了latch,阻止程序跑完就關(guān)閉,Strand.sleep其實(shí)跟Thread.sleep一樣,只是這里針對(duì)的是Fiber。
最終控制臺(tái)是可以輸出done的,說(shuō)明程序已經(jīng)創(chuàng)建了百萬(wàn)個(gè)Fiber,設(shè)置Sleep是為了讓Fiber一直運(yùn)行,從而方便計(jì)算內(nèi)存占用。官方宣稱一個(gè)空閑的Fiber大約占用400Byte,那這里應(yīng)該是占用400MB堆內(nèi)存,但是這里通過(guò)jmap -heap pid顯示大約占用了1000MB,也就是說(shuō)一個(gè)Fiber占用1KB。
Quasar是怎么實(shí)現(xiàn)Fiber的
其實(shí)Quasar實(shí)現(xiàn)的coroutine的方式與Go語(yǔ)言很像,只不過(guò)前者是使用框架來(lái)實(shí)現(xiàn),而go語(yǔ)言則是語(yǔ)言內(nèi)置的功能。
不過(guò)如果你熟悉了Go語(yǔ)言的調(diào)度機(jī)制的話,那么對(duì)于Quasar的調(diào)度機(jī)制就會(huì)好理解很多了,因?yàn)閮烧哂泻芏嘞嗨浦帯?/p>
Quasar里的Fiber其實(shí)是一個(gè)continuation,他可以被Quasar定義的scheduler調(diào)度,一個(gè)continuation記錄著運(yùn)行實(shí)例的狀態(tài),而且會(huì)被隨時(shí)中斷,并且也會(huì)隨后在他被中斷的地方恢復(fù)。
Quasar其實(shí)是通過(guò)修改bytecode來(lái)達(dá)到這個(gè)目的,所以運(yùn)行Quasar程序的時(shí)候,你需要先通過(guò)java-agent在運(yùn)行時(shí)修改你的代碼,當(dāng)然也可以在編譯期間這么干。go語(yǔ)言的內(nèi)置了自己的調(diào)度器,而Quasar則是默認(rèn)使用ForkJoinPool這個(gè)具有work-stealing功能的線程池來(lái)當(dāng)調(diào)度器。work-stealing非常重要,因?yàn)槟悴磺宄膫€(gè)Fiber會(huì)先執(zhí)行完,而work-stealing可以動(dòng)態(tài)的從其他的等等隊(duì)列偷一個(gè)context過(guò)來(lái),這樣可以最大化使用CPU資源。
那這里你會(huì)問(wèn)了,Quasar怎么知道修改哪些字節(jié)碼呢,其實(shí)也很簡(jiǎn)單,Quasar會(huì)通過(guò)java-agent在運(yùn)行時(shí)掃描哪些方法是可以中斷的,同時(shí)會(huì)在方法被調(diào)用前和調(diào)度后的方法內(nèi)插入一些continuation邏輯,如果你在方法上定義了@Suspendable注解,那Quasar會(huì)對(duì)調(diào)用該注解的方法做類似下面的事情。
這里假設(shè)你在方法f上定義了@Suspendable,同時(shí)去調(diào)用了有同樣注解的方法g,那么所有調(diào)用f的方法會(huì)插入一些字節(jié)碼,這些字節(jié)碼的邏輯就是記錄當(dāng)前Fiber棧上的狀態(tài),以便在未來(lái)可以動(dòng)態(tài)的恢復(fù)。(Fiber類似線程也有自己的棧)。在suspendable方法鏈內(nèi)Fiber的父類會(huì)調(diào)用Fiber.park,這樣會(huì)拋出SuspendExecution異常,從而來(lái)停止線程的運(yùn)行,好讓Quasar的調(diào)度器執(zhí)行調(diào)度。這里的SuspendExecution會(huì)被Fiber自己捕獲,業(yè)務(wù)層面上不應(yīng)該捕獲到。如果Fiber被喚醒了(調(diào)度器層面會(huì)去調(diào)用Fiber.unpark),那么f會(huì)在被中斷的地方重新被調(diào)用(這里Fiber會(huì)知道自己在哪里被中斷),同時(shí)會(huì)把g的調(diào)用結(jié)果(g會(huì)return結(jié)果)插入到f的恢復(fù)點(diǎn),這樣看上去就好像g的return是f的local variables了,從而避免了callback嵌套。
上面說(shuō)了一大堆,其實(shí)簡(jiǎn)單點(diǎn)來(lái)講就是,想辦法讓運(yùn)行中的線程棧停下來(lái),然后讓Quasar的調(diào)度器介入。
JVM線程中斷的條件有兩個(gè):
1、拋異常
2、return。
而在Quasar中,一般就是通過(guò)拋異常的方式來(lái)達(dá)到的,所以你會(huì)看到上面的代碼會(huì)拋出SuspendExecution。但是如果你真捕獲到這個(gè)異常,那就說(shuō)明有問(wèn)題了,所以一般會(huì)這么寫。
@Suspendable public int f() { try { // do some stuff return g() * 2; } catch(SuspendExecution s) { //這里不應(yīng)該捕獲到異常. throw new AssertionError(s); } }
以上就是Java 實(shí)現(xiàn)協(xié)程的方法的詳細(xì)內(nèi)容,更多關(guān)于Java 實(shí)現(xiàn)協(xié)程的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring Boot集成Sorl搜索客戶端的實(shí)現(xiàn)代碼
本篇文章主要介紹了Spring Boot集成Sorl搜索客戶端的實(shí)現(xiàn)代碼,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-11-11關(guān)于Https協(xié)議和HttpClient的實(shí)現(xiàn)詳解
這篇文章主要給大家介紹了關(guān)于Https協(xié)議和HttpClient實(shí)現(xiàn)的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2018-05-05Java后端長(zhǎng)時(shí)間無(wú)操作自動(dòng)退出的實(shí)現(xiàn)方式
這篇文章主要介紹了Java后端長(zhǎng)時(shí)間無(wú)操作自動(dòng)退出的實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01SpringCloud配置中心Config過(guò)程解析
這篇文章主要介紹了SpringCloud配置中心Config過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03詳解Springboot之Logback的使用學(xué)習(xí)
Logback是SpringBoot內(nèi)置的日志處理框架,你會(huì)發(fā)現(xiàn)spring-boot-starter其中包含了spring-boot-starter-logging,該依賴內(nèi)容就是Spring Boot默認(rèn)的日志框架logback,本文詳細(xì)介紹了該框架 ,需要的朋友可以參考下2021-05-05IntelliJ IDEA中出現(xiàn)"PSI and index do not match"錯(cuò)誤的解決辦法
今天小編就為大家分享一篇關(guān)于IntelliJ IDEA中出現(xiàn)"PSI and index do not match"錯(cuò)誤的解決辦法,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2018-10-10Java實(shí)現(xiàn)多線程中的靜態(tài)代理模式
靜態(tài)代理屬于設(shè)計(jì)模式中的代理模式。這篇文章主要介紹了Java實(shí)現(xiàn)多線程中的靜態(tài)代理模式,詳細(xì)的介紹了靜態(tài)代理的使用,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-05-05關(guān)于Java中增強(qiáng)for循環(huán)使用的注意事項(xiàng)
for循環(huán)語(yǔ)句是java循環(huán)語(yǔ)句中最常用的循環(huán)語(yǔ)句,一般用在循環(huán)次數(shù)已知的情況下使用,這篇文章主要給大家介紹了關(guān)于Java中增強(qiáng)for循環(huán)使用的注意事項(xiàng),需要的朋友可以參考下2021-06-06SpringBoot集成Hadoop對(duì)HDFS的文件操作方法
這篇文章主要介紹了SpringBoot集成Hadoop對(duì)HDFS的文件操作方法,本文給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-07-07