詳解Java中的ForkJoin
ForkJoin簡介
Fork/Join框架是Java 7提供的一種用于并行執(zhí)行任務(wù)的框架,它將大任務(wù)分解為若干個小任務(wù),并行執(zhí)行這些小任務(wù),最終通過合并每個小任務(wù)的結(jié)果得到大任務(wù)的結(jié)果。
Fork/Join采用的是分而治之的基本思想,分而治之就是將一個復雜的任務(wù),按照規(guī)定的閾值劃分成多個簡單的小任務(wù),然后將這些小任務(wù)的結(jié)果再進行匯總返回,得到最終的任務(wù)。
并行和并發(fā)的區(qū)別
并行和并發(fā)是計算機科學中的兩個概念,它們之間有一些相似之處,但也有明顯的區(qū)別。
并行是指多個處理器或者是多核的處理器同時處理多個不同的任務(wù)。并行可以在多處理器系統(tǒng)中實現(xiàn),利用每個處理機來處理一個可并發(fā)執(zhí)行的程序,從而實現(xiàn)多個程序的同時執(zhí)行。在并行執(zhí)行時,每個處理器可以同時執(zhí)行多個程序,從而提高計算效率。
并發(fā)是指邏輯上的同時發(fā)生(即 true 的同時性),而并行是物理上的同時發(fā)生。在多道程序環(huán)境下,并發(fā)性是指在一段時間內(nèi)宏觀上有多個程序在同時運行,但在單處理機系統(tǒng)中,每一時刻卻僅能有一道程序執(zhí)行,故微觀上這些程序只能是分時地交替執(zhí)行。
簡而言之,并行是指多個處理器或多核處理器同時處理多個任務(wù),而并發(fā)是指在同一時間內(nèi)多個任務(wù)同時發(fā)生。
工作竊取算法
工作竊取算法是指某個線程從其他隊列里竊取任務(wù)來執(zhí)行。當工作隊列中有空閑任務(wù)時,就將任務(wù)從原線程的隊列中竊取過來,執(zhí)行完成后再將結(jié)果返回給原線程。這樣就保證了原線程不會一直等待空閑任務(wù),從而提高了程序的效率。
Fork/Join框架使用ForkJoinPool這個特殊的線程池來處理任務(wù)之間有依賴的情況,其實現(xiàn)了“work-stealing”算法(工作量竊取算法)并執(zhí)行ForkJoinTask對象。ForkJoinPool保持多個線程,其線程數(shù)量默認為機器cpu核心數(shù)。每個線程都有一個特殊類型的deques隊列(雙端隊列),放置該線程的所有任務(wù),而不是所有線程共享一個公共隊列。
每個線程都會保證將自己隊列中的任務(wù)執(zhí)行完,當自己的任務(wù)執(zhí)行完成之后,在去看其他線程的任務(wù)隊列中是否有未處理完的任務(wù),如果有則會幫助其他線程執(zhí)行。
這時雙端隊列的優(yōu)勢就體現(xiàn)出來了,被竊取的任務(wù)只會從隊列的頭部獲取任務(wù),而正常處理的線程每次都是從隊列的尾部獲取任務(wù)。
求1到1億的和
package com.fandf.test.forkjoin;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
/**
* @author fandongfeng
*/
@Slf4j
public class ForkJoinDemo extends RecursiveTask<Long> {
/**
* 小任務(wù)的大小閾值
*/
public static final int TASK_SIZE = 100000;
/**
* 開始數(shù)字
*/
private final Long start;
/**
* 結(jié)束數(shù)字
*/
private final Long end;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0L;
//如果任務(wù)足夠小就計算任務(wù)
boolean canCompute = (end - start) <= TASK_SIZE;
if (canCompute) {
for (Long i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任務(wù)大于閾值,就分裂成兩個子任務(wù)計算
long middle = (start + end) / 2;
ForkJoinDemo leftTask = new ForkJoinDemo(start, middle);
ForkJoinDemo rightTask = new ForkJoinDemo(middle + 1, end);
// 執(zhí)行子任務(wù)
leftTask.fork();
rightTask.fork();
// 等待任務(wù)執(zhí)行結(jié)束合并其結(jié)果
Long leftResult = leftTask.join();
Long rightResult = rightTask.join();
// 合并子任務(wù)
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();
//生成一個計算任務(wù),計算1+2+3+4+...+100000000
ForkJoinDemo task = new ForkJoinDemo(1L, 100000000L);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
//執(zhí)行一個任務(wù)
Future<Long> result = forkjoinPool.submit(task);
try {
System.out.println("result:" + result.get());
} catch (Exception e) {
log.error("exception", e);
}
stopWatch.stop();
System.out.println("總耗時:" + stopWatch.getTotalTimeMillis() + "毫秒");
System.out.println("getParallelism:" + forkjoinPool.getParallelism());
System.out.println("getPoolSize:" + forkjoinPool.getPoolSize());
}
}
輸出結(jié)果
result:5000000050000000
總耗時:330毫秒
getParallelism:6
getPoolSize:7
ForkJoin框架實現(xiàn)
ForkJoinPool
ForkJoinPool是用于運行ForkJoinTasks的線程池,實現(xiàn)了Executor接口

public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
ForkJoinPool構(gòu)造方法有四個參數(shù):
- parallelism:期望并發(fā)數(shù)。默認會使用
Runtime.getRuntime().availableProcessors()的值 - factory:創(chuàng)建ForkJoin工作線程的工廠,默認為defaultForkJoinWorkerThreadFactory
- handler:執(zhí)行任務(wù)時遇到不可恢復的錯誤時的處理程序,默認為null
- asyncMode:工作線程獲取任務(wù)使用FIFO(先進先出)模式還是LIFO(后進先出)模式,默認為LIFO
ForkJoinTask
ForkJoinTask是對于在ForkJoinPool中運行任務(wù)的抽象類定義。
JDK為我們提供了三種特定類型的ForkJoinTask父類供我們自定義時繼承使用。
- RecursiveAction:子任務(wù)不返回結(jié)果
- RecursiveTask:子任務(wù)返回結(jié)果
- CountedCompleter:在任務(wù)完成執(zhí)行后會觸發(fā)執(zhí)行
ForkJoinWorkerThread
ForkJoinPool中用于執(zhí)行ForkJoinTask的線程。
ForkJoinPool實現(xiàn)了Executor接口。但是和我們常用的ThreadPoolExecutor又有一些區(qū)別。
如果使用ThreadPoolExecutor來實現(xiàn)上面分治任務(wù),那么每個子任務(wù)都需要創(chuàng)建一個線程,如果子任務(wù)的數(shù)量很大,假設(shè)有上萬個,那么使用ThreadPoolExecutor創(chuàng)建出上萬個線程,這顯然是不可行也不合理的;
而ForkJoinPool在處理任務(wù)時,并不會按照任務(wù)開啟線程,而是按照指定的期望并行數(shù)量創(chuàng)建線程。在每個線程工作時,如果需要繼續(xù)拆分子任務(wù),則會將當前任務(wù)放入ForkJoinWorkerThread的任務(wù)隊列中,遞歸處理直到最外層的任務(wù)。
ForkJoinTask啟動方式
- 異步執(zhí)行
forkjoinPool.execute(task);無返回結(jié)果 - 同步執(zhí)行
forkjoinPool.invoke(task);等待返回結(jié)果 - 異步執(zhí)行,通過Future獲取結(jié)果
forkjoinPool.submit(task);
總結(jié)
在使用Fork/Join框架時,需要注意以下幾點:
- 必須首先創(chuàng)建一個ForkJoinTask對象。
- 在分發(fā)任務(wù)時,需要注意線程安全問題,防止多個線程同時訪問共享資源??梢允褂胹ynchronized關(guān)鍵字或者Lock對象來保證線程安全。
- 在合并結(jié)果時,也需要注意線程安全問題,可以使用CountDownLatch對象來確保每個Fork執(zhí)行完成后才能提交結(jié)果。
- 在使用Fork/Join框架時,需要考慮算法的效率和性能問題??梢允褂肅ache技術(shù)來減少不必要的計算,使用join策略來合并結(jié)果等。
總之,F(xiàn)ork/Join框架是一種非常有用的并行計算框架,可以大大提高程序的執(zhí)行效率和并發(fā)能力。
以上就是詳解Java中的ForkJoin的詳細內(nèi)容,更多關(guān)于Java ForkJoin的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java超詳細精講數(shù)據(jù)結(jié)構(gòu)之bfs與雙端隊列
廣搜BFS的基本思想是: 首先訪問初始點v并將其標志為已經(jīng)訪問。接著通過鄰接關(guān)系將鄰接點入隊。然后每訪問過一個頂點則出隊。按照順序,訪問每一個頂點的所有未被訪問過的頂點直到所有的頂點均被訪問過。廣度優(yōu)先遍歷類似與層次遍歷2022-07-07
Java中的HashMap弱引用之WeakHashMap詳解
這篇文章主要介紹了Java中的HashMap弱引用之WeakHashMap詳解,當內(nèi)存空間不足,Java虛擬機寧愿拋出OutOfMemoryError錯誤,使程序異常終止,也不會靠隨意回收具有強引用的對象來解決內(nèi)存不足的問題,需要的朋友可以參考下2023-09-09
Spring?Boot?集成JWT實現(xiàn)前后端認證的示例代碼
小程序、H5應(yīng)用的快速發(fā)展,使得前后端分離已經(jīng)成為了趨勢,本文主要介紹了Spring?Boot?集成JWT實現(xiàn)前后端認證,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-04-04
springMVC+ajax實現(xiàn)文件上傳且?guī)нM度條實例
本篇文章主要介紹了springMVC+ajax實現(xiàn)文件上傳且?guī)нM度條實例,具有一定的參考價值,有興趣的可以了解一下。2017-01-01
SpringBoot實現(xiàn)權(quán)限驗證的示例步驟
權(quán)限驗證是一種用于控制對系統(tǒng)資源和操作的訪問的機制。它允許開發(fā)人員定義誰可以執(zhí)行特定操作或訪問特定資源,并確保只有經(jīng)過授權(quán)的用戶才能執(zhí)行這些操作,這篇文章主要介紹了SpringBoot實現(xiàn)權(quán)限驗證,需要的朋友可以參考下2023-08-08

