java中的forkjoin框架的使用
fork join框架是java 7中引入框架,這個(gè)框架的引入主要是為了提升并行計(jì)算的能力。
fork join主要有兩個(gè)步驟,第一就是fork,將一個(gè)大任務(wù)分成很多個(gè)小任務(wù),第二就是join,將第一個(gè)任務(wù)的結(jié)果join起來,生成最后的結(jié)果。如果第一步中并沒有任何返回值,join將會(huì)等到所有的小任務(wù)都結(jié)束。
還記得之前的文章我們講到了thread pool的基本結(jié)構(gòu)嗎?
- ExecutorService - ForkJoinPool 用來調(diào)用任務(wù)執(zhí)行。
- workerThread - ForkJoinWorkerThread 工作線程,用來執(zhí)行具體的任務(wù)。
- task - ForkJoinTask 用來定義要執(zhí)行的任務(wù)。
下面我們從這三個(gè)方面來詳細(xì)講解fork join框架。
ForkJoinPool
ForkJoinPool是一個(gè)ExecutorService的一個(gè)實(shí)現(xiàn),它提供了對(duì)工作線程和線程池的一些便利管理方法。
public class ForkJoinPool extends AbstractExecutorService
一個(gè)work thread一次只能處理一個(gè)任務(wù),但是ForkJoinPool并不會(huì)為每個(gè)任務(wù)都創(chuàng)建一個(gè)單獨(dú)的線程,它會(huì)使用一個(gè)特殊的數(shù)據(jù)結(jié)構(gòu)double-ended queue來存儲(chǔ)任務(wù)。這樣的結(jié)構(gòu)可以方便的進(jìn)行工作竊取(work-stealing)。
什么是work-stealing呢?
默認(rèn)情況下,work thread從分配給自己的那個(gè)隊(duì)列頭中取出任務(wù)。如果這個(gè)隊(duì)列是空的,那么這個(gè)work thread會(huì)從其他的任務(wù)隊(duì)列尾部取出任務(wù)來執(zhí)行,或者從全局隊(duì)列中取出。這樣的設(shè)計(jì)可以充分利用work thread的性能,提升并發(fā)能力。
下面看下怎么創(chuàng)建一個(gè)ForkJoinPool。
最常見的方法就是使用ForkJoinPool.commonPool()來創(chuàng)建,commonPool()為所有的ForkJoinTask提供了一個(gè)公共默認(rèn)的線程池。
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
另外一種方式是使用構(gòu)造函數(shù):
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
這里的參數(shù)是并行級(jí)別,2指的是線程池將會(huì)使用2個(gè)處理器核心。
ForkJoinWorkerThread
ForkJoinWorkerThread是使用在ForkJoinPool的工作線程。
public class ForkJoinWorkerThread extends Thread }
和一般的線程不一樣的是它定義了兩個(gè)變量:
final ForkJoinPool pool; // the pool this thread works in final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
一個(gè)是該worker thread所屬的ForkJoinPool。 另外一個(gè)是支持 work-stealing機(jī)制的Queue。
再看一下它的run方法:
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
簡(jiǎn)單點(diǎn)講就是從Queue中取出任務(wù)執(zhí)行。
ForkJoinTask
ForkJoinTask是ForkJoinPool中運(yùn)行的任務(wù)類型。通常我們會(huì)用到它的兩個(gè)子類:RecursiveAction和RecursiveTask<V>。
他們都定義了一個(gè)需要實(shí)現(xiàn)的compute()方法用來實(shí)現(xiàn)具體的業(yè)務(wù)邏輯。不同的是RecursiveAction只是用來執(zhí)行任務(wù),而RecursiveTask<V>可以有返回值。
既然兩個(gè)類都帶了Recursive,那么具體的實(shí)現(xiàn)邏輯也會(huì)跟遞歸有關(guān),我們舉個(gè)使用RecursiveAction來打印字符串的例子:
public class CustomRecursiveAction extends RecursiveAction {
private String workload = "";
private static final int THRESHOLD = 4;
private static Logger logger =
Logger.getAnonymousLogger();
public CustomRecursiveAction(String workload) {
this.workload = workload;
}
@Override
protected void compute() {
if (workload.length() > THRESHOLD) {
ForkJoinTask.invokeAll(createSubtasks());
} else {
processing(workload);
}
}
private List<CustomRecursiveAction> createSubtasks() {
List<CustomRecursiveAction> subtasks = new ArrayList<>();
String partOne = workload.substring(0, workload.length() / 2);
String partTwo = workload.substring(workload.length() / 2, workload.length());
subtasks.add(new CustomRecursiveAction(partOne));
subtasks.add(new CustomRecursiveAction(partTwo));
return subtasks;
}
private void processing(String work) {
String result = work.toUpperCase();
logger.info("This result - (" + result + ") - was processed by "
+ Thread.currentThread().getName());
}
}
上面的例子使用了二分法來打印字符串。
我們?cè)倏匆粋€(gè)RecursiveTask<V>的例子:
public class CustomRecursiveTask extends RecursiveTask<Integer> {
private int[] arr;
private static final int THRESHOLD = 20;
public CustomRecursiveTask(int[] arr) {
this.arr = arr;
}
@Override
protected Integer compute() {
if (arr.length > THRESHOLD) {
return ForkJoinTask.invokeAll(createSubtasks())
.stream()
.mapToInt(ForkJoinTask::join)
.sum();
} else {
return processing(arr);
}
}
private Collection<CustomRecursiveTask> createSubtasks() {
List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, 0, arr.length / 2)));
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
return dividedTasks;
}
private Integer processing(int[] arr) {
return Arrays.stream(arr)
.filter(a -> a > 10 && a < 27)
.map(a -> a * 10)
.sum();
}
}
和上面的例子很像,不過這里我們需要有返回值。
在ForkJoinPool中提交Task
有了上面的兩個(gè)任務(wù),我們就可以在ForkJoinPool中提交了:
int[] intArray= {12,12,13,14,15};
CustomRecursiveTask customRecursiveTask= new CustomRecursiveTask(intArray);
int result = forkJoinPool.invoke(customRecursiveTask);
System.out.println(result);
上面的例子中,我們使用invoke來提交,invoke將會(huì)等待任務(wù)的執(zhí)行結(jié)果。
如果不使用invoke,我們也可以將其替換成fork()和join():
customRecursiveTask.fork(); int result2= customRecursiveTask.join(); System.out.println(result2);
fork() 是將任務(wù)提交給pool,但是并不觸發(fā)執(zhí)行, join()將會(huì)真正的執(zhí)行并且得到返回結(jié)果。
本文的例子可以參考https://github.com/ddean2009/learn-java-concurrency/tree/master/forkjoin
到此這篇關(guān)于java中的fork join框架的使用的文章就介紹到這了,更多相關(guān)java fork join框架內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java多線程編程實(shí)現(xiàn)socket通信示例代碼
這篇文章主要介紹了Java多線程編程實(shí)現(xiàn)socket通信示例代碼,詳細(xì)介紹了tcp、udp協(xié)議,以及基于socket的Java網(wǎng)絡(luò)編程的相關(guān)內(nèi)容及代碼示例,代碼測(cè)試可用,供大家參考。2017-10-10
利用Spring Boot創(chuàng)建docker image的完整步驟
這篇文章主要給大家介紹了關(guān)于如何利用Spring Boot創(chuàng)建docker image的完整步驟,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08
springboot配置Jackson返回統(tǒng)一默認(rèn)值的實(shí)現(xiàn)示例
在項(xiàng)目開發(fā)中,我們返回的數(shù)據(jù)或者對(duì)象沒有的時(shí)候一般直接返回的null,那么如何返回統(tǒng)一默認(rèn)值,感興趣的可以了解一下2021-07-07
Spring Security 密碼驗(yàn)證動(dòng)態(tài)加鹽的驗(yàn)證處理方法
小編最近在改造項(xiàng)目,需要將gateway整合security在一起進(jìn)行認(rèn)證和鑒權(quán),今天小編給大家分享Spring Security 密碼驗(yàn)證動(dòng)態(tài)加鹽的驗(yàn)證處理方法,感興趣的朋友一起看看吧2021-06-06
Java處理科學(xué)計(jì)數(shù)法數(shù)字方式
這篇文章主要介紹了Java處理科學(xué)計(jì)數(shù)法數(shù)字方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-07-07
java生成字母數(shù)字組合的隨機(jī)數(shù)示例 java生成隨機(jī)數(shù)
這篇文章主要介紹了java生成字母數(shù)字組合的隨機(jī)數(shù)的示例,大家參考使用吧2014-01-01
java將一個(gè)整數(shù)轉(zhuǎn)化成二進(jìn)制代碼示例
這篇文章主要介紹了java將一個(gè)整數(shù)轉(zhuǎn)化成二進(jìn)制代碼示例,具有一定借鑒價(jià)值,需要的朋友可以參考下2017-12-12

