Java并發(fā)編程之Fork/Join框架詳解
一、簡介
Fork/Join框架是Java7提供的一個用于并行執(zhí)行任務(wù)的框架,是一個把大任務(wù)分割成若干個小任務(wù),最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。
我們再通過Fork和Join這兩個單詞來理解一下Fork/Join框架。
Fork就是把一個大任務(wù)切分為若干子任務(wù)并行的執(zhí)行,Join就是合并這些子任務(wù)的執(zhí)行結(jié)果,最后得到這個大任務(wù)的結(jié)果。
比如計算1+2+…+10000,可以分割成10個子任務(wù),每個子任務(wù)分別對1000個數(shù)進行求和,最終匯總這10個子任務(wù)的結(jié)果。
二、流程
1.分割任務(wù):
首先我們需要有一個fork類來把大任務(wù)分割成子任務(wù),有可能子任務(wù)還是很大,所以還需要不停地分割,直到分割出的子任務(wù)足夠小。
2.執(zhí)行任務(wù)并合并結(jié)果。
分割的子任務(wù)分別放在雙端隊列里,然后幾個啟動線程分別從雙端隊列里獲取任務(wù)執(zhí)行。子任務(wù)執(zhí)行完的結(jié)果都統(tǒng)一放在一個隊列里,啟動一個線程從隊列里拿數(shù)據(jù),然后合并這些數(shù)據(jù)。

三、工作竊取算法
工作竊?。╳ork-stealing)算法是指某個線程從其他隊列里竊取任務(wù)來執(zhí)行。
假如我們需要做一個比較大的任務(wù),可以把這個任務(wù)分割為若干互不依賴的子任務(wù),為了減少線程間的競爭,把這些子任務(wù)分別放到不同的隊列里,并為每個隊列創(chuàng)建一個單獨的線程來執(zhí)行隊列里的任務(wù),線程和隊列一一對應(yīng)。
比如A線程負(fù)責(zé)處理A隊列里的任務(wù)。但是,有的線程會先把自己隊列里的任務(wù)干完,而其他線程對應(yīng)的隊列里還有任務(wù)等待處理。
干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務(wù)來執(zhí)行。
而在這時它們會訪問同一個隊列,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競爭,通常會使用雙端隊列,被竊取任務(wù)線程永遠(yuǎn)從雙端隊列的頭部拿任務(wù)執(zhí)行,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊列的尾部拿任務(wù)執(zhí)行

優(yōu)點:充分利用線程進行并行計算,減少了線程間的競爭。
缺點:在某些情況下還是存在競爭,比如雙端隊列里只有一個任務(wù)時。并且該算法會消耗了更多的系統(tǒng)資源,比如創(chuàng)建多個線程和多個雙端隊列。
四、ForkJoin應(yīng)用
Fork/Join使用兩個類來完成分治的操作
1.ForkJoinTask
我們要使用ForkJoin框架,必須首先創(chuàng)建一個ForkJoin任務(wù)。它提供在任務(wù)中執(zhí)行fork()和join()操作的機制。通常情況下,我們不需要直接繼承ForkJoinTask類,只需要繼承它的子類,F(xiàn)ork/Join框架提供了以下兩個子類。
·RecursiveAction:用于沒有返回結(jié)果的任務(wù)。·RecursiveTask:用于有返回結(jié)果的任務(wù)核心方法:
fork():在當(dāng)前線程運行的線程池中創(chuàng)建一個子任務(wù);join():模塊子任務(wù)完成的時候返回任務(wù)結(jié)果;invoke():執(zhí)行任務(wù),也可以實時等待最終執(zhí)行結(jié)果
2.ForkJoinPool
線程池最大的特點就是分叉(fork)合并(join)模式,將一個大任務(wù)拆分成多個小任務(wù),并行執(zhí)行,再結(jié)合工作竊取算法提高整體的執(zhí)行效率,充分利用CPU資源。
3.實例
private static final Integer MAX = 200;
static class MyForkJoinTask extends RecursiveTask<Integer> {
// 子任務(wù)開始計算的值
private Integer startValue;
// 子任務(wù)結(jié)束計算的值
private Integer endValue;
public MyForkJoinTask(Integer startValue , Integer endValue) {
this.startValue = startValue;
this.endValue = endValue;
}
@Override
protected Integer compute() {
// 如果條件成立,說明這個任務(wù)所需要計算的數(shù)值分為足夠小了
// 可以正式進行累加計算了
if(endValue - startValue < MAX) {
System.out.println("開始計算的部分:startValue = " + startValue + ";endValue = " + endValue);
Integer totalValue = 0;
for(int index = this.startValue ; index <= this.endValue ; index++) {
totalValue += index;
}
return totalValue;
}
// 否則再進行任務(wù)拆分,拆分成兩個任務(wù)
else {
MyForkJoinTask subTask1 = new MyForkJoinTask(startValue, (startValue + endValue) / 2);
subTask1.fork();
MyForkJoinTask subTask2 = new MyForkJoinTask((startValue + endValue) / 2 + 1 , endValue);
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
}
public static void main(String[] args) {
// 這是Fork/Join框架的線程池
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Integer> taskFuture = pool.submit(new MyForkJoinTask(1,1001));
try {
Integer result = taskFuture.get();
System.out.println("result = " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace(System.out);
}
}到此這篇關(guān)于Java并發(fā)編程之Fork/Join框架詳解的文章就介紹到這了,更多相關(guān)Fork/Join框架內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring超詳細(xì)講解創(chuàng)建BeanDefinition流程
Spring在初始化過程中,將xml中定義的對象解析到了BeanDefinition對象中,我們有必要了解一下BeanDefinition的內(nèi)部結(jié)構(gòu),有助于我們理解Spring的初始化流程2022-06-06
SpringBoot注解@ConditionalOnClass底層源碼實現(xiàn)
這篇文章主要為大家介紹了SpringBoot注解@ConditionalOnClass底層源碼實現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-02-02
Springboot?配置SqlSessionFactory方式
這篇文章主要介紹了Springboot?配置SqlSessionFactory方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
SpringBoot整合RabbitMQ實現(xiàn)交換機與隊列的綁定
這篇文章將通過幾個實例為大家介紹一些SpringBoot中RabbitMQ如何綁定交換機(交換器)與隊列,文中的示例代碼講解詳細(xì),感興趣的可以了解一下2022-05-05
Java實現(xiàn)字符串反轉(zhuǎn)的常用方法小結(jié)
在Java中,你可以使用多種方法來反轉(zhuǎn)字符串,這篇文章主要為大家整理了幾種常見的反轉(zhuǎn)字符串的方法,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-03-03

