java基于quasar實現(xiàn)協(xié)程池的方法示例
業(yè)務(wù)場景:golang與swoole都擁抱了協(xié)程,在同任務(wù)并發(fā)數(shù)量下,協(xié)程可比線程多幾倍。所以最近在查詢java時了解java本身是沒有協(xié)程的,但是某牛自行實現(xiàn)了協(xié)程,也就是本文的主角quasar(纖程)!在csdn中基本都是對它的基本使用,用法和線程差不多。不過沒看到誰公開一下手寫協(xié)程池的騷操作(誰會直接new它用?那是沒挨過社會的毒打呀~)
一個線程可以多個協(xié)程,一個進程也可以單獨擁有多個協(xié)程。
線程進程都是同步機制,而協(xié)程則是異步。
協(xié)程能保留上一次調(diào)用時的狀態(tài),每次過程重入時,就相當于進入上一次調(diào)用的狀態(tài)。
線程是搶占式,而協(xié)程是非搶占式的,所以需要用戶自己釋放使用權(quán)來切換到其他協(xié)程,因此同一時間其實只有一個協(xié)程擁有運行權(quán),相當于單線程的能力。
協(xié)程并不是取代線程, 而且抽象于線程之上, 線程是被分割的CPU資源, 協(xié)程是組織好的代碼流程, 協(xié)程需要線程來承載運行, 線程是協(xié)程的資源, 但協(xié)程不會直接使用線程, 協(xié)程直接利用的是執(zhí)行器(Interceptor), 執(zhí)行器可以關(guān)聯(lián)任意線程或線程池, 可以使當前線程, UI線程, 或新建新程.。
線程是協(xié)程的資源。協(xié)程通過Interceptor來間接使用線程這個資源。
廢話不多說,直接上代碼:
導入包:
<dependency> <groupId>co.paralleluniverse</groupId> <artifactId>quasar-core</artifactId> <version>0.7.9</version> <classifier>jdk8</classifier> </dependency>
WorkTools工具類:
package com.example.ai; ? import co.paralleluniverse.fibers.Fiber; import co.paralleluniverse.fibers.SuspendExecution; import co.paralleluniverse.strands.SuspendableRunnable; ? import java.util.concurrent.ArrayBlockingQueue; ? ? public class WorkTools { ? ? //協(xié)程池中默認協(xié)程的個數(shù)為5 ? ? private static int WORK_NUM = 5; ? ? //隊列默認任務(wù)為100 ? ? private static int TASK_COUNT = 100; ? ? ? //工做協(xié)程數(shù)組 ? ? private Fiber[] workThreads; ? ? //等待隊列 ? ? private final ArrayBlockingQueue<SuspendableRunnable> taskQueue; ? ? ? //用戶在構(gòu)造這個協(xié)程池時,但愿啟動的協(xié)程數(shù) ? ? private final int workerNum; ? ? ? ? //構(gòu)造方法:建立具備默認協(xié)程個數(shù)的協(xié)程池 ? ? public WorkTools() { ? ? ? ? this(WORK_NUM,TASK_COUNT); ? ? } ? ? ? //建立協(xié)程池,workNum為協(xié)程池中工做協(xié)程的個數(shù) ? ? public WorkTools(int workerNum, int taskCount) { ? ? ? ? if (workerNum <= 0) { ? ? ? ? ? ? workerNum = WORK_NUM; ? ? ? ? } ? ? ? ? if (taskCount <= 0) { ? ? ? ? ? ? taskCount = TASK_COUNT; ? ? ? ? } ? ? ? ? this.workerNum = workerNum; ? ? ? ? taskQueue = new ArrayBlockingQueue(taskCount); ? ? ? ? workThreads = new Fiber[workerNum]; ? ? ? ? for (int i = 0; i < workerNum; i++) { ? ? ? ? ? ? int finalI = i; ? ? ? ? ? ? workThreads[i] = new Fiber<>(new SuspendableRunnable() { ? ? ? ? ? ? ? ? @Override ? ? ? ? ? ? ? ? public void run() throws SuspendExecution, InterruptedException { ? ? ? ? ? ? ? ? ? ? SuspendableRunnable runnable = null; ? ? ? ? ? ? ? ? ? ? while (true){ ? ? ? ? ? ? ? ? ? ? ? ? try{ ? ? ? ? ? ? ? ? ? ? ? ? ? ? //取任務(wù),沒有則阻塞。 ? ? ? ? ? ? ? ? ? ? ? ? ? ? runnable = taskQueue.take(); ? ? ? ? ? ? ? ? ? ? ? ? }catch (Exception e){ ? ? ? ? ? ? ? ? ? ? ? ? ? ? System.out.println(e.getMessage()); ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? ? ? //存在任務(wù)則運行。 ? ? ? ? ? ? ? ? ? ? ? ? if(runnable != null){ ? ? ? ? ? ? ? ? ? ? ? ? ? ? runnable.run(); ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? ? ? ? runnable = null; ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? } ? ? ? ? ? ? }); ?//new一個工做協(xié)程 ? ? ? ? ? ? ? workThreads[i].start(); ?//啟動工做協(xié)程 ? ? ? ? ? } ? ? ? ? ? Runtime.getRuntime().availableProcessors(); ? ? } ? ? //執(zhí)行任務(wù),其實就是把任務(wù)加入任務(wù)隊列,何時執(zhí)行由協(xié)程池管理器決定 ? ? public void execute(SuspendableRunnable task) { ? ? ? ? try { ? ? ? ? ? ? taskQueue.put(task); ? //put:阻塞接口的插入 ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? // TODO: handle exception ? ? ? ? ? ? System.out.println("阻塞"); ? ? ? ? } ? ? } ? ? //銷毀協(xié)程池,該方法保證全部任務(wù)都完成的狀況下才銷毀全部協(xié)程,不然等待任務(wù)完成再銷毀 ? ? public void destory() { ? ? ? ? //工做協(xié)程中止工做,且置為null ? ? ? ? System.out.println("ready close thread..."); ? ? ? ? for (int i = 0; i < workerNum; i++) { ? ? ? ? ? ? ? workThreads[i] = null; //help gc ? ? ? ? } ? ? ? ? taskQueue.clear(); ?//清空等待隊列 ? ? } ? ? //覆蓋toString方法,返回協(xié)程信息:工做協(xié)程個數(shù)和已完成任務(wù)個數(shù) ? ? @Override ? ? public String toString() { ? ? ? ? return "WorkThread number:" + workerNum + " ==分割線== wait task number:" + taskQueue.size(); ? ? } }
測試代碼:
package com.example.ai; import co.paralleluniverse.strands.SuspendableRunnable; import lombok.SneakyThrows; import org.springframework.boot.autoconfigure.SpringBootApplication; import java.util.concurrent.CountDownLatch; @SpringBootApplication public class AiApplication { @SneakyThrows public static void main(String[] args) { //等待協(xié)程任務(wù)完畢后再結(jié)束主線程 CountDownLatch cdl = new CountDownLatch(50); //開啟5個協(xié)程,50個任務(wù)列隊。 WorkTools myThreadPool = new WorkTools(5, 50); for (int i = 0; i< 50; i++){ int finalI = i; myThreadPool.execute(new SuspendableRunnable() { @Override public void run() { System.out.println(finalI); try { //延遲1秒 Thread.sleep(1000); cdl.countDown(); } catch (InterruptedException e) { System.out.println("阻塞中"); } } }); } //阻塞 cdl.await(); } }
具體代碼都有注釋了,自行了解。我也是以線程池寫法實現(xiàn)。
當前為解決問題:在協(xié)程阻塞過程中Fiber類會報阻塞警告,滿臉懵逼啊,看著很討厭。暫時沒有辦法處理,看各位大神誰有招下方評論提供給下思路。萬分感謝~
到此這篇關(guān)于java基于quasar實現(xiàn)協(xié)程池的方法示例的文章就介紹到這了,更多相關(guān)java quasar協(xié)程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
如何使用Java統(tǒng)計gitlab代碼行數(shù)
這篇文章主要介紹了如何使用Java統(tǒng)計gitlab代碼行數(shù),實現(xiàn)方式通過git腳本將所有的項目拉下來并然后通過進行代碼行數(shù)的統(tǒng)計,需要的朋友可以參考下2023-10-10通過weblogic API解析如何獲取weblogic中服務(wù)的IP和端口操作
這篇文章主要介紹了通過weblogic API解析如何獲取weblogic中服務(wù)的IP和端口操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06Java實現(xiàn)過濾掉map集合中key或value為空的值示例
這篇文章主要介紹了Java實現(xiàn)過濾掉map集合中key或value為空的值,涉及java針對map的簡單遍歷、判斷、移除等相關(guān)操作技巧,需要的朋友可以參考下2018-06-06Spring集成MyBatis和PageHelper分頁插件整合過程詳解
Spring?整合?MyBatis?是將?MyBatis?數(shù)據(jù)訪問框架與?Spring?框架進行集成,以實現(xiàn)更便捷的開發(fā)和管理,在集成過程中,Spring?提供了許多特性和功能,如依賴注入、聲明式事務(wù)管理、AOP?等,這篇文章主要介紹了Spring集成MyBatis和PageHelper分頁插件整合,需要的朋友可以參考下2023-08-08解決Request.getParameter獲取不到特殊字符bug問題
這篇文章主要介紹了解決Request.getParameter獲取不到特殊字符bug問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07淺析Java迭代器Iterator和Iterable的區(qū)別
Java語言中,Iterator和Iterable都是用來遍歷集合類數(shù)據(jù)結(jié)構(gòu)的接口,雖然它們有很多相似的地方,但在具體實現(xiàn)中卻有著一些不同之處,本文將詳細分析它們的區(qū)別,并提供相應(yīng)的代碼示例,需要的朋友可以參考下2023-07-07