一文帶你了解Java中的ForkJoin
前言:
ForkJoin是在Java7中新加入的特性,大家可能對其比較陌生,但是Java8中Stream的并行流parallelStream就是依賴于ForkJoin。在ForkJoin體系中最為關鍵的就是ForkJoinTask和ForkJoinPool,F(xiàn)orkJoin就是利用分治的思想將大的任務按照一定規(guī)則Fork拆分成小任務,再通過Join聚合起來。
什么是ForkJoin?
ForkJoin 從字面上看Fork是分岔的意思,Join是結合的意思,我們可以理解為將大任務拆分成小任務進行計算求解,最后將小任務的結果進行結合求出大任務的解,這些裂變出來的小任務,我們就可以交給不同的線程去進行計算,這也就是分布式計算的一種思想。這與大數(shù)據(jù)中的分布式離線計算MapReduce類似,對ForkJoin最經(jīng)典的一個應用就是Java8中的Stream,我們知道Stream分為串行流和并行流,其中并行流parallelStream就是依賴于ForkJoin來實現(xiàn)并行處理的。
下面我們一起來看一下最為核心的ForkJoinTask和ForkJoinPool。
ForkJoinTask 任務
ForkJoinTask本身的依賴關系并不復雜,它與異步任務計算FutureTask一樣均實現(xiàn)了Future接口,F(xiàn)utureTask我們在之前的文章中有講到感興趣的可以閱讀一下——Java從源碼看異步任務計算FutureTask

下面我們就ForkJoinTask的核心源碼來研究一下,該任務是如何通過分治法進行計算。
ForkJoinTask最核心的莫過于fork()和join()方法了。
fork()
- 判斷當前線程是不是ForkJoinWorkerThread線程
- 是 直接將當前線程push到工作隊列中
- 否 調用ForkJoinPool 的externalPush方法
在ForkJoinPool構建了一個靜態(tài)的common對象,這里調用的就是common的externalPush()
join()
- 調用doJoin()方法,等待線程執(zhí)行完成
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
// 獲取結果的方法由子類實現(xiàn)
public abstract V getRawResult(); RecursiveTask 是ForkJoinTask的一個子類主要對獲取結果的方法進行了實現(xiàn),通過泛型約束結果。我們如果需要自己創(chuàng)建任務,仍需要實現(xiàn)RecursiveTask,并去編寫最為核心的計算方法compute()。
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
private static final long serialVersionUID = 5232453952276485270L;
V result;
protected abstract V compute();
public final V getRawResult() {
return result;
}
protected final void setRawResult(V value) {
result = value;
}
protected final boolean exec() {
result = compute();
return true;
}
}ForkJoinPool 線程池
ForkJoinTask 中許多功能都依賴于ForkJoinPool線程池,所以說ForkJoinTask運行離不開ForkJoinPool,F(xiàn)orkJoinPool與ThreadPoolExecutor有許多相似之處,他是專門用來執(zhí)行ForkJoinTask任務的線程池,我之前也有文章對線程池技術進行了介紹,感興趣的可以進行閱讀——從java源碼分析線程池(池化技術)的實現(xiàn)原理
ForkJoinPool與ThreadPoolExecutor的繼承關系幾乎是相同的,他們相當于兄弟關系。

工作竊取算法
ForkJoinPool中采取工作竊取算法,如果每次fork子任務如果都去創(chuàng)建新線程去處理的話,對系統(tǒng)資源的開銷是巨大的,所以必須采取線程池。一般的線程池只有一個任務隊列,但是對于ForkJoinPool來說,由于同一個任務Fork出的各個子任務是平行關系,為了提高效率,減少線程的競爭,需要將這些平行的任務放到不同的隊列中,由于線程處理不同任務的速度不同,這樣就可能存在某個線程先執(zhí)行完了自己隊列中的任務,這時為了提升效率,就可以讓該線程去“竊取”其它任務隊列中的任務,這就是所謂的“工作竊取算法”。
對于一般的隊列來說,入隊元素都是在隊尾,出隊元素在隊首,要滿足“工作竊取”的需求,任務隊列應該支持從“隊尾”出隊元素,這樣可以減少與其它工作線程的沖突(因為其它工作線程會從隊首獲取自己任務隊列中的任務),這時就需要使用雙端阻塞隊列來解決。
構造方法
首先我們來看ForkJoinPool線程池的構造方法,他為我們提供了三種形式的構造,其中最為復雜的是四個入?yún)⒌臉嬙欤旅嫖覀兛匆幌滤膫€入?yún)⒍即硎裁矗?/p>
- int parallelism 可并行級別(不代表最多存在的線程數(shù)量)
- ForkJoinWorkerThreadFactory factory 線程創(chuàng)建工廠
- UncaughtExceptionHandler handler 異常捕獲處理器
- boolean asyncMode 先進先出的工作模式 或者 后進先出的工作模式
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism) {
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}提交方法
下面我們看一下提交任務的方法:
externalPush這個方法我們很眼熟,它正是在fork的時候如果當前線程不是ForkJoinWorkerThread,新提交任務也是會通過這個方法去執(zhí)行任務。由此可見,fork就是新建一個子任務進行提交。
externalSubmit是最為核心的一個方法,它可以首次向池提交第一個任務,并執(zhí)行二次初始化。它還可以檢測外部線程的首次提交,并創(chuàng)建一個新的共享隊列。
signalWork(ws, q)是發(fā)送工作信號,讓工作隊列進行運轉。
public ForkJoinTask<?> submit(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = new ForkJoinTask.AdaptedRunnableAction(task);
externalPush(job);
return job;
}
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
int r = ThreadLocalRandom.getProbe();
int rs = runState;
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
U.putOrderedInt(q, QLOCK, 0);
if (n <= 1)
signalWork(ws, q);
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
externalSubmit(task);
}
private void externalSubmit(ForkJoinTask<?> task) {
int r; // initialize caller's probe
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue[] ws; WorkQueue q; int rs, m, k;
boolean move = false;
if ((rs = runState) < 0) {
tryTerminate(false, false); // help terminate
throw new RejectedExecutionException();
}
else if ((rs & STARTED) == 0 || // initialize
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
rs = lockRunState();
try {
if ((rs & STARTED) == 0) {
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
// create workQueues array with size a power of two
int p = config & SMASK; // ensure at least 2 slots
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
workQueues = new WorkQueue[n];
ns = STARTED;
}
} finally {
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
else if ((q = ws[k = r & m & SQMASK]) != null) {
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a = q.array;
int s = q.top;
boolean submitted = false; // initial submission or resizing
try { // locked version of push
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
submitted = true;
}
} finally {
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
if (submitted) {
signalWork(ws, q);
return;
}
}
move = true; // move on failure
}
else if (((rs = runState) & RSLOCK) == 0) { // create new queue
q = new WorkQueue(this, null);
q.hint = r;
q.config = k | SHARED_QUEUE;
q.scanState = INACTIVE;
rs = lockRunState(); // publish index
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q; // else terminated
unlockRunState(rs, rs & ~RSLOCK);
}
else
move = true; // move if busy
if (move)
r = ThreadLocalRandom.advanceProbe(r);
}
}創(chuàng)建工人(線程)
提交任務后,通過signalWork(ws, q)方法,發(fā)送工作信號,當符合沒有執(zhí)行完畢,且沒有出現(xiàn)異常的條件下,循環(huán)執(zhí)行任務,根據(jù)控制變量嘗試添加工人(線程),通過線程工廠,生成線程,并且啟動線程,也控制著工人(線程)的下崗。
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
while ((c = ctl) < 0L) { // too few active
if ((sp = (int)c) == 0) { // no idle workers
if ((c & ADD_WORKER) != 0L) // too few workers
tryAddWorker(c);
break;
}
if (ws == null) // unstarted/terminated
break;
if (ws.length <= (i = sp & SMASK)) // terminated
break;
if ((v = ws[i]) == null) // terminating
break;
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
int d = sp - v.scanState; // screen CAS
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; // activate v
if ((p = v.parker) != null)
U.unpark(p);
break;
}
if (q != null && q.base == q.top) // no more work
break;
}
}
private void tryAddWorker(long c) {
boolean add = false;
do {
long nc = ((AC_MASK & (c + AC_UNIT)) |
(TC_MASK & (c + TC_UNIT)));
if (ctl == c) {
int rs, stop; // check if terminating
if ((stop = (rs = lockRunState()) & STOP) == 0)
add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK);
if (stop != 0)
break;
if (add) {
createWorker();
break;
}
}
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}
private boolean createWorker() {
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
if (fac != null && (wt = fac.newThread(this)) != null) {
wt.start();
return true;
}
} catch (Throwable rex) {
ex = rex;
}
deregisterWorker(wt, ex);
return false;
}
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null;
if (wt != null && (w = wt.workQueue) != null) {
WorkQueue[] ws; // remove index from array
int idx = w.config & SMASK;
int rs = lockRunState();
if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
ws[idx] = null;
unlockRunState(rs, rs & ~RSLOCK);
}
long c; // decrement counts
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
(TC_MASK & (c - TC_UNIT)) |
(SP_MASK & c))));
if (w != null) {
w.qlock = -1; // ensure set
w.transferStealCount(this);
w.cancelAll(); // cancel remaining tasks
}
for (;;) { // possibly replace
WorkQueue[] ws; int m, sp;
if (tryTerminate(false, false) || w == null || w.array == null ||
(runState & STOP) != 0 || (ws = workQueues) == null ||
(m = ws.length - 1) < 0) // already terminating
break;
if ((sp = (int)(c = ctl)) != 0) { // wake up replacement
if (tryRelease(c, ws[sp & m], AC_UNIT))
break;
}
else if (ex != null && (c & ADD_WORKER) != 0L) {
tryAddWorker(c); // create replacement
break;
}
else // don't need replacement
break;
}
if (ex == null) // help clean on way out
ForkJoinTask.helpExpungeStaleExceptions();
else // rethrow
ForkJoinTask.rethrow(ex);
}
public static interface ForkJoinWorkerThreadFactory {
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
static final class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new ForkJoinWorkerThread(pool);
}
}
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
wt.setDaemon(true); // configure thread
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
WorkQueue w = new WorkQueue(this, wt);
int i = 0; // assign a pool index
int mode = config & MODE_MASK;
int rs = lockRunState();
try {
WorkQueue[] ws; int n; // skip if no array
if ((ws = workQueues) != null && (n = ws.length) > 0) {
int s = indexSeed += SEED_INCREMENT; // unlikely to collide
int m = n - 1;
i = ((s << 1) | 1) & m; // odd-numbered indices
if (ws[i] != null) { // collision
int probes = 0; // step by approx half n
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
while (ws[i = (i + step) & m] != null) {
if (++probes >= n) {
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
}
w.hint = s; // use as random seed
w.config = i | mode;
w.scanState = i; // publication fence
ws[i] = w;
}
} finally {
unlockRunState(rs, rs & ~RSLOCK);
}
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
return w;
}例:ForkJoinTask實現(xiàn)歸并排序
這里我們就用經(jīng)典的歸并排序為例,構建一個我們自己的ForkJoinTask,按照歸并排序的思路,重寫其核心的compute()方法,通過ForkJoinPool.submit(task)提交任務,通過get()同步獲取任務執(zhí)行結果。
package com.zhj.interview;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class Test16 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
int[] bigArr = new int[10000000];
for (int i = 0; i < 10000000; i++) {
bigArr[i] = (int) (Math.random() * 10000000);
}
ForkJoinPool forkJoinPool = new ForkJoinPool();
MyForkJoinTask task = new MyForkJoinTask(bigArr);
long start = System.currentTimeMillis();
forkJoinPool.submit(task).get();
long end = System.currentTimeMillis();
System.out.println("耗時:" + (end-start));
}
}
class MyForkJoinTask extends RecursiveTask<int[]> {
private int source[];
public MyForkJoinTask(int source[]) {
if (source == null) {
throw new RuntimeException("參數(shù)有誤?。。?);
}
this.source = source;
}
@Override
protected int[] compute() {
int l = source.length;
if (l < 2) {
return Arrays.copyOf(source, l);
}
if (l == 2) {
if (source[0] > source[1]) {
int[] tar = new int[2];
tar[0] = source[1];
tar[1] = source[0];
return tar;
} else {
return Arrays.copyOf(source, l);
}
}
if (l > 2) {
int mid = l / 2;
MyForkJoinTask task1 = new MyForkJoinTask(Arrays.copyOf(source, mid));
task1.fork();
MyForkJoinTask task2 = new MyForkJoinTask(Arrays.copyOfRange(source, mid, l));
task2.fork();
int[] res1 = task1.join();
int[] res2 = task2.join();
int tar[] = merge(res1, res2);
return tar;
}
return null;
}
// 合并數(shù)組
private int[] merge(int[] res1, int[] res2) {
int l1 = res1.length;
int l2 = res2.length;
int l = l1 + l2;
int tar[] = new int[l];
for (int i = 0, i1 = 0, i2 = 0; i < l; i++) {
int v1 = i1 >= l1 ? Integer.MAX_VALUE : res1[i1];
int v2 = i2 >= l2 ? Integer.MAX_VALUE : res2[i2];
// 如果條件成立,說明應該取數(shù)組array1中的值
if(v1 < v2) {
tar[i] = v1;
i1++;
} else {
tar[i] = v2;
i2++;
}
}
return tar;
}
}ForkJoin計算流程
通過ForkJoinPool提交任務,獲取結果流程如下,拆分子任務不一定是二分的形式,可參照MapReduce的模式,也可以按照具體需求進行靈活的設計。

到此這篇關于一文帶你了解Java中的ForkJoin的文章就介紹到這了,更多相關Java中的ForkJoin內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
關于微服務使用Dubbo設置的端口和server.port的區(qū)別
這篇文章主要介紹了關于微服務使用Dubbo設置的端口和server.port的區(qū)別,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
springmvc+spring+mybatis實現(xiàn)用戶登錄功能(下)
這篇文章主要為大家詳細介紹了springmvc+spring+mybatis實現(xiàn)用戶登錄功能的第二篇,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-07-07
SpringMVC中解決@ResponseBody注解返回中文亂碼問題
這篇文章主要介紹了SpringMVC中解決@ResponseBody注解返回中文亂碼問題, 小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-04-04
SpringBoot中的@ConfigurationProperties注解解析
這篇文章主要介紹了SpringBoot中的@ConfigurationProperties注解解析,Spring源碼中大量使用了ConfigurationProperties注解,通過與其他注解配合使用,能夠實現(xiàn)Bean的按需配置,該注解可以放在類上,也可以放在方法上,需要的朋友可以參考下2023-11-11

