Java并發(fā)中的Fork/Join 框架機(jī)制詳解
什么是 Fork/Join 框架
Fork/Join 框架是一種在 JDk 7 引入的線程池,用于并行執(zhí)行把一個大任務(wù)拆成多個小任務(wù)并行執(zhí)行,最終匯總每個小任務(wù)結(jié)果得到大任務(wù)結(jié)果的特殊任務(wù)。通過其命名也很容易看出框架主要分為 Fork 和 Join 兩個階段,第一階段 Fork 是把一個大任務(wù)拆分為多個子任務(wù)并行的執(zhí)行,第二階段 Join 是合并這些子任務(wù)的所有執(zhí)行結(jié)果,最后得到大任務(wù)的結(jié)果。
這里不難發(fā)現(xiàn)其執(zhí)行主要流程:首先判斷一個任務(wù)是否足夠小,如果任務(wù)足夠小,則直接計算,否則,就拆分成幾個更小的小任務(wù)分別計算,這個過程可以反復(fù)的拆分成一系列小任務(wù)。Fork/Join 框架是一種基于 分治 的算法,通過拆分大任務(wù)成多個獨(dú)立的小任務(wù),然后并行執(zhí)行這些小任務(wù),最后合并小任務(wù)的結(jié)果得到大任務(wù)的最終結(jié)果,通過并行計算以提高效率。。
Fork/Join 框架使用示例
下面通過一個計算列表中所有元素的總和的示例來看看 Fork/Join 框架是如何使用的,總的思路是:將這個列表分成許多子列表,然后對每個子列表的元素進(jìn)行求和,然后,我們再計算所有這些值的總和就得到原始列表的和了。Fork/Join 框架中定義了 ForkJoinTask 來表示一個 Fork/Join 任務(wù),其提供了 fork()、join() 等操作,通常情況下,我們并不需要直接繼承這個 ForkJoinTask 類,而是使用框架提供的兩個 ForkJoinTask 的子類:
- RecursiveAction 用于表示沒有返回結(jié)果的 Fork/Join 任務(wù)。
- RecursiveTask 用于表示有返回結(jié)果的 Fork/Join 任務(wù)。
很顯然,在這個示例中是需要返回結(jié)果的,可以定義 SumAction 類繼承自 RecursiveTask,代碼入下:
/** * @author mghio * @since 2021-07-25 */ public class SumTask extends RecursiveTask<Long> { private static final int SEQUENTIAL_THRESHOLD = 50; private final List<Long> data; public SumTask(List<Long> data) { this.data = data; } @Override protected Long compute() { if (data.size() <= SEQUENTIAL_THRESHOLD) { long sum = computeSumDirectly(); System.out.format("Sum of %s: %d\n", data.toString(), sum); return sum; } else { int mid = data.size() / 2; SumTask firstSubtask = new SumTask(data.subList(0, mid)); SumTask secondSubtask = new SumTask(data.subList(mid, data.size())); // 執(zhí)行子任務(wù) firstSubtask.fork(); secondSubtask.fork(); // 等待子任務(wù)執(zhí)行完成,并獲取結(jié)果 long firstSubTaskResult = firstSubtask.join(); long secondSubTaskResult = secondSubtask.join(); return firstSubTaskResult + secondSubTaskResult; } } private long computeSumDirectly() { long sum = 0; for (Long l : data) { sum += l; } return sum; } public static void main(String[] args) { Random random = new Random(); List<Long> data = random .longs(1_000, 1, 100) .boxed() .collect(Collectors.toList()); ForkJoinPool pool = new ForkJoinPool(); SumTask task = new SumTask(data); pool.invoke(task); System.out.println("Sum: " + pool.invoke(task)); } }
這里當(dāng)列表大小小于 SEQUENTIAL_THRESHOLD 變量的值(閾值)時視為小任務(wù),直接計算求和列表元素結(jié)果,否則再次拆分為小任務(wù),運(yùn)行結(jié)果如下:
通過這個示例代碼可以發(fā)現(xiàn),F(xiàn)ork/Join 框架 中 ForkJoinTask 任務(wù)與平常的一般任務(wù)的主要不同點在于:ForkJoinTask 需要實現(xiàn)抽象方法 compute() 來定義計算邏輯,在這個方法里一般通用的實現(xiàn)模板是,首先先判斷當(dāng)前任務(wù)是否是小任務(wù),如果是,就執(zhí)行執(zhí)行任務(wù),如果不是小任務(wù),則再次拆分為兩個子任務(wù),然后當(dāng)每個子任務(wù)調(diào)用 fork() 方法時,會再次進(jìn)入到 compute() 方法中,檢查當(dāng)前任務(wù)是否需要再拆分為子任務(wù),如果已經(jīng)是小任務(wù),則執(zhí)行當(dāng)前任務(wù)并返回結(jié)果,否則繼續(xù)分割,最后調(diào)用 join() 方法等待所有子任務(wù)執(zhí)行完成并獲得執(zhí)行結(jié)果。偽代碼如下:
if (problem is small) { directly solve problem. } else { Step 1. split problem into independent parts. Step 2. fork new subtasks to solve each part. Step 3. join all subtasks. Step 4. compose result from subresults. }
Fork/Join 框架設(shè)計
Fork/Join 框架核心思想是把一個大任務(wù)拆分成若干個小任務(wù),然后匯總每個小任務(wù)的結(jié)果最終得到大任務(wù)的結(jié)果,如果讓你設(shè)計一個這樣的框架,你會如何實現(xiàn)呢?(建議思考一下),F(xiàn)ork/Join 框架的整個流程正如其名所示,分為兩個步驟:
- 大任務(wù)分割 需要有這么一個的類,用來將大任務(wù)拆分為子任務(wù),可能一次拆分后的子任務(wù)還是比較大,需要多次拆分,直到拆分出來的子任務(wù)符合我們定義的小任務(wù)才結(jié)束。
- 執(zhí)行任務(wù)并合并任務(wù)結(jié)果 第一步拆分出來的子任務(wù)分別存放在一個個 雙端隊列 里面(P.S. 這里為什么要使用雙端隊列請看下文),然后每個隊列啟動一個線程從隊列中獲取任務(wù)執(zhí)行。這些子任務(wù)的執(zhí)行結(jié)果都會放到一個統(tǒng)一的隊列中,然后再啟動一個線程從這個隊列中拿數(shù)據(jù),最后合并這些數(shù)據(jù)返回。
Fork/Join 框架使用了如下兩個類來完成以上兩個步驟:
- ForkJoinTask 類 在上文的實例中也有提到,表示 ForkJoin 任務(wù),在使用框架時首先必須先定義任務(wù),通常只需要繼承自 ForkJoinTask 類的子類 RecursiveAction(無返回結(jié)果) 或者 RecursiveTask(有返回結(jié)果)即可。
- ForkJoinPool 從名字也可以猜到一二了,就是用來執(zhí)行 ForkJoinTask 的線程池。大任務(wù)拆分出的子任務(wù)會添加到當(dāng)前線程的雙端隊列的頭部。
喜歡思考的你,心中想必會想到這么一種場景,當(dāng)我們需要完成一個大任務(wù)時,會先把這個大任務(wù)拆分為多個獨(dú)立的子任務(wù),這些子任務(wù)會放到獨(dú)立的隊列中,并為每個隊列都創(chuàng)建一個單獨(dú)的線程去執(zhí)行隊列里的任務(wù),即這里線程和隊列時一對一的關(guān)系,那么當(dāng)有的線程可能會先把自己隊列的任務(wù)執(zhí)行完成了,而有的線程則沒有執(zhí)行完成,這就導(dǎo)致一些先執(zhí)行完任務(wù)的線程干等了,這是個好問題。
既然是做并發(fā)的,肯定要最大程度壓榨計算機(jī)的性能,對于這種場景并發(fā)大師 Doug Lea 使用了工作竊取算法處理,使用工作竊取算法后,先完成自己隊列中任務(wù)的線程會去其它線程的隊列中”竊取“一個任務(wù)來執(zhí)行,哈哈,一方有難,八方支援。但是此時這個線程和隊列的持有線程會同時訪問同一個隊列,所以為了減少竊取任務(wù)的線程和被竊取任務(wù)的線程之間的競爭,F(xiàn)orkJoin 選擇了雙端隊列這種數(shù)據(jù)結(jié)構(gòu),這樣就可以按照這種規(guī)則執(zhí)行任務(wù)了:被竊取任務(wù)的線程始終從隊列頭部獲取任務(wù)并執(zhí)行,竊取任務(wù)的線程使用從隊列尾部獲取任務(wù)執(zhí)行。這個算法在絕大部分情況下都可以充分利用多線程進(jìn)行并行計算,但是在雙端隊列里只有一個任務(wù)等極端情況下還是會存在一定程度的競爭。
Fork/Join 框架實現(xiàn)原理
Fork/Join 框架的實現(xiàn)核心是 ForkJoinPool 類,該類的重要組成部分為 ForkJoinTask 數(shù)組和 ForkJoinWorkerThread 數(shù)組,其中 ForkJoinTask 數(shù)組用來存放框架使用者給提交給 ForkJoinPool 的任務(wù),F(xiàn)orkJoinWorkerThread 數(shù)組則負(fù)責(zé)執(zhí)行這些任務(wù)。任務(wù)有如下四種狀態(tài):
NORMAL 已完成
CANCELLED 被取消
SIGNAL 信號
EXCEPTIONAL 發(fā)生異常
下面來看看這兩個類的核心方法實現(xiàn)原理,首先來看 ForkJoinTask 的 fork() 方法,源碼如下:
方法對于 ForkJoinWorkerThread 類型的線程,首先會調(diào)用 ForkJoinWorkerThread 的 workQueue 的 push() 方法異步的去執(zhí)行這個任務(wù),然后馬上返回結(jié)果。繼續(xù)跟進(jìn) ForkJoinPool 的 push() 方法,源碼如下:
方法將當(dāng)前任務(wù)添加到 ForkJoinTask 任務(wù)隊列數(shù)組中,然后再調(diào)用 ForkJoinPool 的 signalWork 方法創(chuàng)建或者喚醒一個工作線程來執(zhí)行該任務(wù)。然后再來看看 ForkJoinTask 的 join() 方法,方法源碼如下:
方法首先調(diào)用了 doJoin() 方法,該方法返回當(dāng)前任務(wù)的狀態(tài),根據(jù)返回的任務(wù)狀態(tài)做不同的處理:
- 已完成狀態(tài)則直接返回結(jié)果
- 被取消狀態(tài)則直接拋出異常(CancellationException)
- 發(fā)生異常狀態(tài)則直接拋出對應(yīng)的異常
繼續(xù)跟進(jìn) doJoin() 方法,方法源碼如下:
方法首先判斷當(dāng)前任務(wù)狀態(tài)是否已經(jīng)執(zhí)行完成,如果執(zhí)行完成則直接返回任務(wù)狀態(tài)。如果沒有執(zhí)行完成,則從任務(wù)數(shù)組中(workQueue)取出任務(wù)并執(zhí)行,任務(wù)執(zhí)行完成則設(shè)置任務(wù)狀態(tài)為 NORMAL,如果出現(xiàn)異常則記錄異常并設(shè)置任務(wù)狀態(tài)為 EXCEPTIONAL(在 doExec() 方法中)。
總結(jié)
本文主要介紹了 Java 并發(fā)框架中的 Fork/Join 框架的基本原理和其使用的工作竊取算法(work-stealing)、設(shè)計方式和部分實現(xiàn)源碼。Fork/Join 框架在 JDK 的官方標(biāo)準(zhǔn)庫中也有應(yīng)用。比如 JDK 1.8+ 標(biāo)準(zhǔn)庫提供的 Arrays.parallelSort(array) 可以進(jìn)行并行排序,它的原理就是內(nèi)部通過 Fork/Join 框架對大數(shù)組分拆進(jìn)行并行排序,可以提高排序的速度,還有集合中的 Collection.parallelStream() 方法底層也是基于 Fork/Join 框架實現(xiàn)的,最后就是定義小任務(wù)的閾值往往是需要通過測試驗證才能合理給出,并且保證程序可以達(dá)到最好的性能。
到此這篇關(guān)于Java 并發(fā)中的Fork/Join 框架機(jī)制詳解的文章就介紹到這了,更多相關(guān)Java Fork/Join 框架內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java聊天室之實現(xiàn)接收和發(fā)送Socket
這篇文章主要為大家詳細(xì)介紹了Java簡易聊天室之實現(xiàn)接收和發(fā)送Socket功能,文中的示例代碼講解詳細(xì),具有一定的借鑒價值,需要的可以了解一下2022-10-10Fluent Mybatis學(xué)習(xí)之Update語法實踐
Fluent MyBatis是一個MyBatis的增強(qiáng)工具,沒有對mybatis做任何修改。本篇文章將詳細(xì)介紹對Fluent Mybatis中的update語法進(jìn)行驗證。代碼具有一定價值,感興趣的小伙伴可以學(xué)習(xí)一下2021-11-11Java Swing實現(xiàn)坦克大戰(zhàn)游戲
這篇文章主要介紹了Java Swing實現(xiàn)坦克大戰(zhàn)游戲,文中有非常詳細(xì)的代碼示例,對正在學(xué)習(xí)java的小伙伴們有很大的幫助喲,需要的朋友可以參考下2021-05-05如何在mybatis中向BLOB字段批量插入數(shù)據(jù)
這篇文章主要介紹了如何在mybatis中向BLOB字段批量插入數(shù)據(jù)的相關(guān)知識,本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2020-10-10