詳解Java中的ForkJoin
ForkJoin簡(jiǎn)介
Fork/Join框架是Java 7提供的一種用于并行執(zhí)行任務(wù)的框架,它將大任務(wù)分解為若干個(gè)小任務(wù),并行執(zhí)行這些小任務(wù),最終通過合并每個(gè)小任務(wù)的結(jié)果得到大任務(wù)的結(jié)果。
Fork/Join采用的是分而治之的基本思想,分而治之就是將一個(gè)復(fù)雜的任務(wù),按照規(guī)定的閾值劃分成多個(gè)簡(jiǎn)單的小任務(wù),然后將這些小任務(wù)的結(jié)果再進(jìn)行匯總返回,得到最終的任務(wù)。
并行和并發(fā)的區(qū)別
并行和并發(fā)是計(jì)算機(jī)科學(xué)中的兩個(gè)概念,它們之間有一些相似之處,但也有明顯的區(qū)別。
并行是指多個(gè)處理器或者是多核的處理器同時(shí)處理多個(gè)不同的任務(wù)。并行可以在多處理器系統(tǒng)中實(shí)現(xiàn),利用每個(gè)處理機(jī)來處理一個(gè)可并發(fā)執(zhí)行的程序,從而實(shí)現(xiàn)多個(gè)程序的同時(shí)執(zhí)行。在并行執(zhí)行時(shí),每個(gè)處理器可以同時(shí)執(zhí)行多個(gè)程序,從而提高計(jì)算效率。
并發(fā)是指邏輯上的同時(shí)發(fā)生(即 true 的同時(shí)性),而并行是物理上的同時(shí)發(fā)生。在多道程序環(huán)境下,并發(fā)性是指在一段時(shí)間內(nèi)宏觀上有多個(gè)程序在同時(shí)運(yùn)行,但在單處理機(jī)系統(tǒng)中,每一時(shí)刻卻僅能有一道程序執(zhí)行,故微觀上這些程序只能是分時(shí)地交替執(zhí)行。
簡(jiǎn)而言之,并行是指多個(gè)處理器或多核處理器同時(shí)處理多個(gè)任務(wù),而并發(fā)是指在同一時(shí)間內(nèi)多個(gè)任務(wù)同時(shí)發(fā)生。
工作竊取算法
工作竊取算法是指某個(gè)線程從其他隊(duì)列里竊取任務(wù)來執(zhí)行。當(dāng)工作隊(duì)列中有空閑任務(wù)時(shí),就將任務(wù)從原線程的隊(duì)列中竊取過來,執(zhí)行完成后再將結(jié)果返回給原線程。這樣就保證了原線程不會(huì)一直等待空閑任務(wù),從而提高了程序的效率。
Fork/Join框架使用ForkJoinPool這個(gè)特殊的線程池來處理任務(wù)之間有依賴的情況,其實(shí)現(xiàn)了“work-stealing”算法(工作量竊取算法)并執(zhí)行ForkJoinTask對(duì)象。ForkJoinPool保持多個(gè)線程,其線程數(shù)量默認(rèn)為機(jī)器cpu核心數(shù)。每個(gè)線程都有一個(gè)特殊類型的deques隊(duì)列(雙端隊(duì)列),放置該線程的所有任務(wù),而不是所有線程共享一個(gè)公共隊(duì)列。
每個(gè)線程都會(huì)保證將自己隊(duì)列中的任務(wù)執(zhí)行完,當(dāng)自己的任務(wù)執(zhí)行完成之后,在去看其他線程的任務(wù)隊(duì)列中是否有未處理完的任務(wù),如果有則會(huì)幫助其他線程執(zhí)行。
這時(shí)雙端隊(duì)列的優(yōu)勢(shì)就體現(xiàn)出來了,被竊取的任務(wù)只會(huì)從隊(duì)列的頭部獲取任務(wù),而正常處理的線程每次都是從隊(duì)列的尾部獲取任務(wù)。
求1到1億的和
package com.fandf.test.forkjoin;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
/**
* @author fandongfeng
*/
@Slf4j
public class ForkJoinDemo extends RecursiveTask<Long> {
/**
* 小任務(wù)的大小閾值
*/
public static final int TASK_SIZE = 100000;
/**
* 開始數(shù)字
*/
private final Long start;
/**
* 結(jié)束數(shù)字
*/
private final Long end;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0L;
//如果任務(wù)足夠小就計(jì)算任務(wù)
boolean canCompute = (end - start) <= TASK_SIZE;
if (canCompute) {
for (Long i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任務(wù)大于閾值,就分裂成兩個(gè)子任務(wù)計(jì)算
long middle = (start + end) / 2;
ForkJoinDemo leftTask = new ForkJoinDemo(start, middle);
ForkJoinDemo rightTask = new ForkJoinDemo(middle + 1, end);
// 執(zhí)行子任務(wù)
leftTask.fork();
rightTask.fork();
// 等待任務(wù)執(zhí)行結(jié)束合并其結(jié)果
Long leftResult = leftTask.join();
Long rightResult = rightTask.join();
// 合并子任務(wù)
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();
//生成一個(gè)計(jì)算任務(wù),計(jì)算1+2+3+4+...+100000000
ForkJoinDemo task = new ForkJoinDemo(1L, 100000000L);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
//執(zhí)行一個(gè)任務(wù)
Future<Long> result = forkjoinPool.submit(task);
try {
System.out.println("result:" + result.get());
} catch (Exception e) {
log.error("exception", e);
}
stopWatch.stop();
System.out.println("總耗時(shí):" + stopWatch.getTotalTimeMillis() + "毫秒");
System.out.println("getParallelism:" + forkjoinPool.getParallelism());
System.out.println("getPoolSize:" + forkjoinPool.getPoolSize());
}
}
輸出結(jié)果
result:5000000050000000
總耗時(shí):330毫秒
getParallelism:6
getPoolSize:7
ForkJoin框架實(shí)現(xiàn)
ForkJoinPool
ForkJoinPool是用于運(yùn)行ForkJoinTasks的線程池,實(shí)現(xiàn)了Executor接口

public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
ForkJoinPool構(gòu)造方法有四個(gè)參數(shù):
- parallelism:期望并發(fā)數(shù)。默認(rèn)會(huì)使用
Runtime.getRuntime().availableProcessors()的值 - factory:創(chuàng)建ForkJoin工作線程的工廠,默認(rèn)為defaultForkJoinWorkerThreadFactory
- handler:執(zhí)行任務(wù)時(shí)遇到不可恢復(fù)的錯(cuò)誤時(shí)的處理程序,默認(rèn)為null
- asyncMode:工作線程獲取任務(wù)使用FIFO(先進(jìn)先出)模式還是LIFO(后進(jìn)先出)模式,默認(rèn)為L(zhǎng)IFO
ForkJoinTask
ForkJoinTask是對(duì)于在ForkJoinPool中運(yùn)行任務(wù)的抽象類定義。
JDK為我們提供了三種特定類型的ForkJoinTask父類供我們自定義時(shí)繼承使用。
- RecursiveAction:子任務(wù)不返回結(jié)果
- RecursiveTask:子任務(wù)返回結(jié)果
- CountedCompleter:在任務(wù)完成執(zhí)行后會(huì)觸發(fā)執(zhí)行
ForkJoinWorkerThread
ForkJoinPool中用于執(zhí)行ForkJoinTask的線程。
ForkJoinPool實(shí)現(xiàn)了Executor接口。但是和我們常用的ThreadPoolExecutor又有一些區(qū)別。
如果使用ThreadPoolExecutor來實(shí)現(xiàn)上面分治任務(wù),那么每個(gè)子任務(wù)都需要?jiǎng)?chuàng)建一個(gè)線程,如果子任務(wù)的數(shù)量很大,假設(shè)有上萬(wàn)個(gè),那么使用ThreadPoolExecutor創(chuàng)建出上萬(wàn)個(gè)線程,這顯然是不可行也不合理的;
而ForkJoinPool在處理任務(wù)時(shí),并不會(huì)按照任務(wù)開啟線程,而是按照指定的期望并行數(shù)量創(chuàng)建線程。在每個(gè)線程工作時(shí),如果需要繼續(xù)拆分子任務(wù),則會(huì)將當(dāng)前任務(wù)放入ForkJoinWorkerThread的任務(wù)隊(duì)列中,遞歸處理直到最外層的任務(wù)。
ForkJoinTask啟動(dòng)方式
- 異步執(zhí)行
forkjoinPool.execute(task);無(wú)返回結(jié)果 - 同步執(zhí)行
forkjoinPool.invoke(task);等待返回結(jié)果 - 異步執(zhí)行,通過Future獲取結(jié)果
forkjoinPool.submit(task);
總結(jié)
在使用Fork/Join框架時(shí),需要注意以下幾點(diǎn):
- 必須首先創(chuàng)建一個(gè)ForkJoinTask對(duì)象。
- 在分發(fā)任務(wù)時(shí),需要注意線程安全問題,防止多個(gè)線程同時(shí)訪問共享資源??梢允褂胹ynchronized關(guān)鍵字或者Lock對(duì)象來保證線程安全。
- 在合并結(jié)果時(shí),也需要注意線程安全問題,可以使用CountDownLatch對(duì)象來確保每個(gè)Fork執(zhí)行完成后才能提交結(jié)果。
- 在使用Fork/Join框架時(shí),需要考慮算法的效率和性能問題??梢允褂肅ache技術(shù)來減少不必要的計(jì)算,使用join策略來合并結(jié)果等。
總之,F(xiàn)ork/Join框架是一種非常有用的并行計(jì)算框架,可以大大提高程序的執(zhí)行效率和并發(fā)能力。
以上就是詳解Java中的ForkJoin的詳細(xì)內(nèi)容,更多關(guān)于Java ForkJoin的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java超詳細(xì)精講數(shù)據(jù)結(jié)構(gòu)之bfs與雙端隊(duì)列
廣搜BFS的基本思想是: 首先訪問初始點(diǎn)v并將其標(biāo)志為已經(jīng)訪問。接著通過鄰接關(guān)系將鄰接點(diǎn)入隊(duì)。然后每訪問過一個(gè)頂點(diǎn)則出隊(duì)。按照順序,訪問每一個(gè)頂點(diǎn)的所有未被訪問過的頂點(diǎn)直到所有的頂點(diǎn)均被訪問過。廣度優(yōu)先遍歷類似與層次遍歷2022-07-07
Java中的HashMap弱引用之WeakHashMap詳解
這篇文章主要介紹了Java中的HashMap弱引用之WeakHashMap詳解,當(dāng)內(nèi)存空間不足,Java虛擬機(jī)寧愿拋出OutOfMemoryError錯(cuò)誤,使程序異常終止,也不會(huì)靠隨意回收具有強(qiáng)引用的對(duì)象來解決內(nèi)存不足的問題,需要的朋友可以參考下2023-09-09
java 模仿拼多多紅包遞減算法的實(shí)現(xiàn)
這篇文章主要介紹了java 模仿拼多多紅包遞減算法的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-02-02
Spring?Boot?集成JWT實(shí)現(xiàn)前后端認(rèn)證的示例代碼
小程序、H5應(yīng)用的快速發(fā)展,使得前后端分離已經(jīng)成為了趨勢(shì),本文主要介紹了Spring?Boot?集成JWT實(shí)現(xiàn)前后端認(rèn)證,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-04-04
springMVC+ajax實(shí)現(xiàn)文件上傳且?guī)нM(jìn)度條實(shí)例
本篇文章主要介紹了springMVC+ajax實(shí)現(xiàn)文件上傳且?guī)нM(jìn)度條實(shí)例,具有一定的參考價(jià)值,有興趣的可以了解一下。2017-01-01
SpringBoot實(shí)現(xiàn)權(quán)限驗(yàn)證的示例步驟
權(quán)限驗(yàn)證是一種用于控制對(duì)系統(tǒng)資源和操作的訪問的機(jī)制。它允許開發(fā)人員定義誰(shuí)可以執(zhí)行特定操作或訪問特定資源,并確保只有經(jīng)過授權(quán)的用戶才能執(zhí)行這些操作,這篇文章主要介紹了SpringBoot實(shí)現(xiàn)權(quán)限驗(yàn)證,需要的朋友可以參考下2023-08-08
java使用JSCH實(shí)現(xiàn)SFTP文件管理
這篇文章主要為大家詳細(xì)介紹了java使用JSCH實(shí)現(xiàn)SFTP文件管理,實(shí)現(xiàn)上傳、下載等功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-08-08
微信公眾號(hào)測(cè)試賬號(hào)自定義菜單的實(shí)例代碼
這篇文章主要介紹了微信公眾號(hào)測(cè)試賬號(hào)自定義菜單的實(shí)例代碼,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2017-02-02

