詳解Java中的ForkJoin
ForkJoin簡介
Fork/Join框架是Java 7提供的一種用于并行執(zhí)行任務的框架,它將大任務分解為若干個小任務,并行執(zhí)行這些小任務,最終通過合并每個小任務的結果得到大任務的結果。
Fork/Join采用的是分而治之的基本思想,分而治之就是將一個復雜的任務,按照規(guī)定的閾值劃分成多個簡單的小任務,然后將這些小任務的結果再進行匯總返回,得到最終的任務。
并行和并發(fā)的區(qū)別
并行和并發(fā)是計算機科學中的兩個概念,它們之間有一些相似之處,但也有明顯的區(qū)別。
并行是指多個處理器或者是多核的處理器同時處理多個不同的任務。并行可以在多處理器系統(tǒng)中實現(xiàn),利用每個處理機來處理一個可并發(fā)執(zhí)行的程序,從而實現(xiàn)多個程序的同時執(zhí)行。在并行執(zhí)行時,每個處理器可以同時執(zhí)行多個程序,從而提高計算效率。
并發(fā)是指邏輯上的同時發(fā)生(即 true 的同時性),而并行是物理上的同時發(fā)生。在多道程序環(huán)境下,并發(fā)性是指在一段時間內宏觀上有多個程序在同時運行,但在單處理機系統(tǒng)中,每一時刻卻僅能有一道程序執(zhí)行,故微觀上這些程序只能是分時地交替執(zhí)行。
簡而言之,并行是指多個處理器或多核處理器同時處理多個任務,而并發(fā)是指在同一時間內多個任務同時發(fā)生。
工作竊取算法
工作竊取算法是指某個線程從其他隊列里竊取任務來執(zhí)行。當工作隊列中有空閑任務時,就將任務從原線程的隊列中竊取過來,執(zhí)行完成后再將結果返回給原線程。這樣就保證了原線程不會一直等待空閑任務,從而提高了程序的效率。
Fork/Join框架使用ForkJoinPool這個特殊的線程池來處理任務之間有依賴的情況,其實現(xiàn)了“work-stealing”算法(工作量竊取算法)并執(zhí)行ForkJoinTask對象。ForkJoinPool保持多個線程,其線程數(shù)量默認為機器cpu核心數(shù)。每個線程都有一個特殊類型的deques隊列(雙端隊列),放置該線程的所有任務,而不是所有線程共享一個公共隊列。
每個線程都會保證將自己隊列中的任務執(zhí)行完,當自己的任務執(zhí)行完成之后,在去看其他線程的任務隊列中是否有未處理完的任務,如果有則會幫助其他線程執(zhí)行。
這時雙端隊列的優(yōu)勢就體現(xiàn)出來了,被竊取的任務只會從隊列的頭部獲取任務,而正常處理的線程每次都是從隊列的尾部獲取任務。
求1到1億的和
package com.fandf.test.forkjoin; import lombok.extern.slf4j.Slf4j; import org.springframework.util.StopWatch; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; /** * @author fandongfeng */ @Slf4j public class ForkJoinDemo extends RecursiveTask<Long> { /** * 小任務的大小閾值 */ public static final int TASK_SIZE = 100000; /** * 開始數(shù)字 */ private final Long start; /** * 結束數(shù)字 */ private final Long end; public ForkJoinDemo(Long start, Long end) { this.start = start; this.end = end; } @Override protected Long compute() { long sum = 0L; //如果任務足夠小就計算任務 boolean canCompute = (end - start) <= TASK_SIZE; if (canCompute) { for (Long i = start; i <= end; i++) { sum += i; } } else { // 如果任務大于閾值,就分裂成兩個子任務計算 long middle = (start + end) / 2; ForkJoinDemo leftTask = new ForkJoinDemo(start, middle); ForkJoinDemo rightTask = new ForkJoinDemo(middle + 1, end); // 執(zhí)行子任務 leftTask.fork(); rightTask.fork(); // 等待任務執(zhí)行結束合并其結果 Long leftResult = leftTask.join(); Long rightResult = rightTask.join(); // 合并子任務 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkjoinPool = new ForkJoinPool(); //生成一個計算任務,計算1+2+3+4+...+100000000 ForkJoinDemo task = new ForkJoinDemo(1L, 100000000L); StopWatch stopWatch = new StopWatch(); stopWatch.start(); //執(zhí)行一個任務 Future<Long> result = forkjoinPool.submit(task); try { System.out.println("result:" + result.get()); } catch (Exception e) { log.error("exception", e); } stopWatch.stop(); System.out.println("總耗時:" + stopWatch.getTotalTimeMillis() + "毫秒"); System.out.println("getParallelism:" + forkjoinPool.getParallelism()); System.out.println("getPoolSize:" + forkjoinPool.getPoolSize()); } }
輸出結果
result:5000000050000000
總耗時:330毫秒
getParallelism:6
getPoolSize:7
ForkJoin框架實現(xiàn)
ForkJoinPool
ForkJoinPool
是用于運行ForkJoinTasks
的線程池,實現(xiàn)了Executor
接口
public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); }
ForkJoinPool構造方法有四個參數(shù):
- parallelism:期望并發(fā)數(shù)。默認會使用
Runtime.getRuntime().availableProcessors()
的值 - factory:創(chuàng)建ForkJoin工作線程的工廠,默認為defaultForkJoinWorkerThreadFactory
- handler:執(zhí)行任務時遇到不可恢復的錯誤時的處理程序,默認為null
- asyncMode:工作線程獲取任務使用FIFO(先進先出)模式還是LIFO(后進先出)模式,默認為LIFO
ForkJoinTask
ForkJoinTask
是對于在ForkJoinPool
中運行任務的抽象類定義。
JDK為我們提供了三種特定類型的ForkJoinTask父類供我們自定義時繼承使用。
- RecursiveAction:子任務不返回結果
- RecursiveTask:子任務返回結果
- CountedCompleter:在任務完成執(zhí)行后會觸發(fā)執(zhí)行
ForkJoinWorkerThread
ForkJoinPool
中用于執(zhí)行ForkJoinTask
的線程。
ForkJoinPool實現(xiàn)了Executor接口。但是和我們常用的ThreadPoolExecutor又有一些區(qū)別。
如果使用ThreadPoolExecutor來實現(xiàn)上面分治任務,那么每個子任務都需要創(chuàng)建一個線程,如果子任務的數(shù)量很大,假設有上萬個,那么使用ThreadPoolExecutor創(chuàng)建出上萬個線程,這顯然是不可行也不合理的;
而ForkJoinPool在處理任務時,并不會按照任務開啟線程,而是按照指定的期望并行數(shù)量創(chuàng)建線程。在每個線程工作時,如果需要繼續(xù)拆分子任務,則會將當前任務放入ForkJoinWorkerThread的任務隊列中,遞歸處理直到最外層的任務。
ForkJoinTask啟動方式
- 異步執(zhí)行
forkjoinPool.execute(task);無返回結果 - 同步執(zhí)行
forkjoinPool.invoke(task);等待返回結果 - 異步執(zhí)行,通過Future獲取結果
forkjoinPool.submit(task);
總結
在使用Fork/Join框架時,需要注意以下幾點:
- 必須首先創(chuàng)建一個ForkJoinTask對象。
- 在分發(fā)任務時,需要注意線程安全問題,防止多個線程同時訪問共享資源??梢允褂胹ynchronized關鍵字或者Lock對象來保證線程安全。
- 在合并結果時,也需要注意線程安全問題,可以使用CountDownLatch對象來確保每個Fork執(zhí)行完成后才能提交結果。
- 在使用Fork/Join框架時,需要考慮算法的效率和性能問題??梢允褂肅ache技術來減少不必要的計算,使用join策略來合并結果等。
總之,F(xiàn)ork/Join框架是一種非常有用的并行計算框架,可以大大提高程序的執(zhí)行效率和并發(fā)能力。
以上就是詳解Java中的ForkJoin的詳細內容,更多關于Java ForkJoin的資料請關注腳本之家其它相關文章!
相關文章
Java超詳細精講數(shù)據(jù)結構之bfs與雙端隊列
廣搜BFS的基本思想是: 首先訪問初始點v并將其標志為已經(jīng)訪問。接著通過鄰接關系將鄰接點入隊。然后每訪問過一個頂點則出隊。按照順序,訪問每一個頂點的所有未被訪問過的頂點直到所有的頂點均被訪問過。廣度優(yōu)先遍歷類似與層次遍歷2022-07-07Java中的HashMap弱引用之WeakHashMap詳解
這篇文章主要介紹了Java中的HashMap弱引用之WeakHashMap詳解,當內存空間不足,Java虛擬機寧愿拋出OutOfMemoryError錯誤,使程序異常終止,也不會靠隨意回收具有強引用的對象來解決內存不足的問題,需要的朋友可以參考下2023-09-09Spring?Boot?集成JWT實現(xiàn)前后端認證的示例代碼
小程序、H5應用的快速發(fā)展,使得前后端分離已經(jīng)成為了趨勢,本文主要介紹了Spring?Boot?集成JWT實現(xiàn)前后端認證,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-04-04springMVC+ajax實現(xiàn)文件上傳且?guī)нM度條實例
本篇文章主要介紹了springMVC+ajax實現(xiàn)文件上傳且?guī)нM度條實例,具有一定的參考價值,有興趣的可以了解一下。2017-01-01