淺談Java Fork/Join并行框架
初步了解Fork/Join框架
Fork/Join 框架是java7中加入的一個(gè)并行任務(wù)框架,可以將任務(wù)分割成足夠小的小任務(wù),然后讓不同的線程來做這些分割出來的小事情,然后完成之后再進(jìn)行join,將小任務(wù)的結(jié)果組裝成大任務(wù)的結(jié)果。下面的圖片展示了這種框架的工作模型:
使用Fork/Join并行框架的前提是我們的任務(wù)可以拆分成足夠小的任務(wù),而且可以根據(jù)小任務(wù)的結(jié)果來組裝出大任務(wù)的結(jié)果,一個(gè)最簡(jiǎn)單的例子是使用Fork/Join框架來求一個(gè)數(shù)組中的最大/最小值,這個(gè)任務(wù)就可以拆成很多小任務(wù),大任務(wù)就是尋找一個(gè)大數(shù)組中的最大/最小值,我們可以將一個(gè)大數(shù)組拆成很多小數(shù)組,然后分別求解每個(gè)小數(shù)組中的最大/最小值,然后根據(jù)這些任務(wù)的結(jié)果組裝出最后的最大最小值,下面的代碼展示了如何通過Fork/Join求解數(shù)組的最大值:
import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * Created by hujian06 on 2017/9/28. * * fork/join demo */ public class ForkJoinDemo { /** * how to find the max number in array by Fork/Join */ private static class MaxNumber extends RecursiveTask<Integer> { private int threshold = 2; private int[] array; // the data array private int index0 = 0; private int index1 = 0; public MaxNumber(int[] array, int index0, int index1) { this.array = array; this.index0 = index0; this.index1 = index1; } @Override protected Integer compute() { int max = Integer.MIN_VALUE; if ((index1 - index0) <= threshold) { for (int i = index0;i <= index1; i ++) { max = Math.max(max, array[i]); } } else { //fork/join int mid = index0 + (index1 - index0) / 2; MaxNumber lMax = new MaxNumber(array, index0, mid); MaxNumber rMax = new MaxNumber(array, mid + 1, index1); lMax.fork(); rMax.fork(); int lm = lMax.join(); int rm = rMax.join(); max = Math.max(lm, rm); } return max; } } public static void main(String ... args) throws ExecutionException, InterruptedException, TimeoutException { ForkJoinPool pool = new ForkJoinPool(); int[] array = {100,400,200,90,80,300,600,10,20,-10,30,2000,1000}; MaxNumber task = new MaxNumber(array, 0, array.length - 1); Future<Integer> future = pool.submit(task); System.out.println("Result:" + future.get(1, TimeUnit.SECONDS)); } }
可以通過設(shè)置不同的閾值來拆分成小任務(wù),閾值越小代表拆出來的小任務(wù)越多。
工作竊取算法
Fork/Join在實(shí)現(xiàn)上,大任務(wù)拆分出來的小任務(wù)會(huì)被分發(fā)到不同的隊(duì)列里面,每一個(gè)隊(duì)列都會(huì)用一個(gè)線程來消費(fèi),這是為了獲取任務(wù)時(shí)的多線程競(jìng)爭(zhēng),但是某些線程會(huì)提前消費(fèi)完自己的隊(duì)列。而有些線程沒有及時(shí)消費(fèi)完隊(duì)列,這個(gè)時(shí)候,完成了任務(wù)的線程就會(huì)去竊取那些沒有消費(fèi)完成的線程的任務(wù)隊(duì)列,為了減少線程競(jìng)爭(zhēng),F(xiàn)ork/Join使用雙端隊(duì)列來存取小任務(wù),分配給這個(gè)隊(duì)列的線程會(huì)一直從頭取得一個(gè)任務(wù)然后執(zhí)行,而竊取線程總是從隊(duì)列的尾端拉取task。
Frok/Join框架的實(shí)現(xiàn)細(xì)節(jié)
在上面的示例代碼中,我們發(fā)現(xiàn)Fork/Join的任務(wù)是通過ForkJoinPool來執(zhí)行的,所以框架的一個(gè)核心是任務(wù)的fork和join,然后就是這個(gè)ForkJoinPool。關(guān)于任務(wù)的fork和join,我們可以想象,而且也是由我們的代碼自己控制的,所以要分析Fork/Join,那么ForkJoinPool最值得研究。
上面的圖片展示了ForkJoinPool的類關(guān)系圖,可以看到本質(zhì)上它就是一個(gè)Executor。在ForkJoinPool里面,有兩個(gè)特別重要的成員如下:
volatile WorkQueue[] workQueues; final ForkJoinWorkerThreadFactory factory;
workQueues 用于保存向ForkJoinPool提交的任務(wù),而具體的執(zhí)行有ForkJoinWorkerThread執(zhí)行,而ForkJoinWorkerThreadFactory可以用于生產(chǎn)出ForkJoinWorkerThread??梢钥匆恍〧orkJoinWorkerThread,可以發(fā)現(xiàn)每一個(gè)ForkJoinWorkerThread會(huì)有一個(gè)pool和一個(gè)workQueue,和我們上面描述的是一致的,每個(gè)線程都被分配了一個(gè)任務(wù)隊(duì)列,而執(zhí)行這個(gè)任務(wù)隊(duì)列的線程由pool提供。
下面我們看一下當(dāng)我們fork的時(shí)候發(fā)生了什么:
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; }
看上面的fork代碼,可以看到首先取到了當(dāng)前線程,然后判斷是否是我們的ForkJoinPool專用線程,如果是,則強(qiáng)制類型轉(zhuǎn)換(向下轉(zhuǎn)換)成ForkJoinWorkerThread,然后將任務(wù)push到這個(gè)線程負(fù)責(zé)的隊(duì)列里面去。如果當(dāng)前線程不是ForkJoinWorkerThread類型的線程,那么就會(huì)走else之后的邏輯,大概的意思是首先嘗試將任務(wù)提交給當(dāng)前線程,如果不成功,則使用例外的處理方法,關(guān)于底層實(shí)現(xiàn)較為復(fù)雜,和我們使用Fork/Join關(guān)系也不太大,如果希望搞明白具體原理,可以看源碼。
下面看一下join的流程:
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); } private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); } final int doExec() { int s; boolean completed; if ((s = status) >= 0) { try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s; } /** * Implements execution conventions for RecursiveTask. */ protected final boolean exec() { result = compute(); return true; }
上面展示了主要的調(diào)用鏈路,我們發(fā)現(xiàn)最后落到了我們?cè)诖a里編寫的compute方法,也就是執(zhí)行它,所以,我們需要知道的一點(diǎn)是,fork僅僅是分割任務(wù),只有當(dāng)我們執(zhí)行join的時(shí)候,我們的額任務(wù)才會(huì)被執(zhí)行。
如何使用Fork/Join并行框架
前文首先展示了一個(gè)求數(shù)組中最大值得例子,然后介紹了“工作竊取算法”,然后分析了Fork/Join框架的一些細(xì)節(jié),下面才是我們最關(guān)心的,怎么使用Fork/Join框架呢?
為了使用Fork/Join框架,我們只需要繼承類RecursiveTask或者RecursiveAction。前者適用于有返回值的場(chǎng)景,而后者適合于沒有返回值的場(chǎng)景。
以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Java中實(shí)現(xiàn)Unicode編碼解碼的方法
在Java編程中,Unicode編碼解碼是一項(xiàng)基本的操作,Unicode是一種用于表示文字字符的標(biāo)準(zhǔn)編碼,它包含了世界上幾乎所有的字符,包括各種語言的字母、符號(hào)和表情符號(hào)等,在Java中通過Unicode編碼,我們可以將任意字符轉(zhuǎn)換為字節(jié)流進(jìn)行傳輸和存儲(chǔ)2024-02-02springboot 返回json格式數(shù)據(jù)時(shí)間格式配置方式
這篇文章主要介紹了springboot 返回json格式數(shù)據(jù)時(shí)間格式配置方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11Servlet第一個(gè)項(xiàng)目的發(fā)布(入門)
這篇文章主要介紹了Servlet第一個(gè)項(xiàng)目的發(fā)布,下面是用servlet實(shí)現(xiàn)的一個(gè)簡(jiǎn)單的web項(xiàng)目,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2021-04-04Java數(shù)據(jù)結(jié)構(gòu)中雙向鏈表的實(shí)現(xiàn)
這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)中雙向鏈表的實(shí)現(xiàn),雙向鏈表是一種常見的數(shù)據(jù)結(jié)構(gòu),它允許在鏈表中的任意位置進(jìn)行高效的插入和刪除操作,需要的朋友可以參考下2022-05-05Java的Struts框架中<results>標(biāo)簽的使用方法
這篇文章主要介紹了Java的Struts框架中<results>標(biāo)簽的使用方法,Struts框架是Java的SSH三大web開發(fā)框架之一,需要的朋友可以參考下2015-11-11SpringBoot+Jersey跨域文件上傳的實(shí)現(xiàn)示例
在SpringBoot開發(fā)后端服務(wù)時(shí),我們一般是提供接口給前端使用,本文主要介紹了SpringBoot+Jersey跨域文件上傳的實(shí)現(xiàn)示例,具有一定的參考價(jià)值,感興趣的可以了解一下2024-07-07