Java ForkJoinPool線程池的使用之并行計(jì)算數(shù)組求和實(shí)例
Java ForkJoinPool線程池的使用之并行計(jì)算數(shù)組求和
package com.zhangxueliang.juc;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
public class ForkJoinPoolDemo {
static int[] nums = new int[1000000];
static final int MAX_NUM = 50000;
static Random r = new Random();
static {
for(int i=0; i<nums.length; i++) {
nums[i] = r.nextInt(100);
}
System.out.println("---" + Arrays.stream(nums).sum()); //stream api
}
static class AddTask extends RecursiveAction {
int start, end;
AddTask(int s, int e) {
start = s;
end = e;
}
@Override
protected void compute() {
if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
System.out.println("from:" + start + " to:" + end + " = " + sum);
} else {
int middle = start + (end-start)/2;
AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork();
subTask2.fork();
}
}
}
static class AddTaskRet extends RecursiveTask<Long> {
private static final long serialVersionUID = 1L;
int start, end;
AddTaskRet(int s, int e) {
start = s;
end = e;
}
@Override
protected Long compute() {
if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
return sum;
}
int middle = start + (end-start)/2;
AddTaskRet subTask1 = new AddTaskRet(start, middle);
AddTaskRet subTask2 = new AddTaskRet(middle, end);
subTask1.fork();
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
public static void main(String[] args) throws IOException {
/*ForkJoinPool fjp = new ForkJoinPool();
AddTask task = new AddTask(0, nums.length);
fjp.execute(task);*/
ForkJoinPoolDemo temp = new ForkJoinPoolDemo();
ForkJoinPool fjp = new ForkJoinPool();
AddTaskRet task = new AddTaskRet(0, nums.length);
fjp.execute(task);
long result = task.join();
System.out.println(result);
//System.in.read();
}
}
ForkJoinPool 示例代碼解析
這段代碼演示了 Java 中 ForkJoinPool 框架的使用,展示了兩種不同的任務(wù)分割方式:
RecursiveAction(無返回值)RecursiveTask(有返回值)
代碼結(jié)構(gòu)分析
1. 初始化部分
static int[] nums = new int[1000000]; // 創(chuàng)建包含100萬個(gè)元素的數(shù)組
static final int MAX_NUM = 50000; // 任務(wù)分割的閾值
static Random r = new Random(); // 隨機(jī)數(shù)生成器
// 靜態(tài)初始化塊:填充數(shù)組并計(jì)算總和
static {
for(int i=0; i<nums.length; i++) {
nums[i] = r.nextInt(100); // 每個(gè)元素賦值為0-99的隨機(jī)數(shù)
}
System.out.println("---" + Arrays.stream(nums).sum()); // 使用stream API計(jì)算總和作為驗(yàn)證基準(zhǔn)
}2. RecursiveAction 實(shí)現(xiàn)(無返回值)
static class AddTask extends RecursiveAction {
int start, end;
AddTask(int s, int e) {
start = s;
end = e;
}
@Override
protected void compute() {
if(end-start <= MAX_NUM) { // 如果任務(wù)足夠小,直接計(jì)算
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
System.out.println("from:" + start + " to:" + end + " = " + sum);
} else { // 否則分割任務(wù)
int middle = start + (end-start)/2;
AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork(); // 異步執(zhí)行子任務(wù)
subTask2.fork();
}
}
}3. RecursiveTask 實(shí)現(xiàn)(有返回值)
static class AddTaskRet extends RecursiveTask<Long> {
int start, end;
AddTaskRet(int s, int e) {
start = s;
end = e;
}
@Override
protected Long compute() {
if(end-start <= MAX_NUM) { // 如果任務(wù)足夠小,直接計(jì)算并返回結(jié)果
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
return sum;
}
// 分割任務(wù)
int middle = start + (end-start)/2;
AddTaskRet subTask1 = new AddTaskRet(start, middle);
AddTaskRet subTask2 = new AddTaskRet(middle, end);
subTask1.fork(); // 異步執(zhí)行子任務(wù)
subTask2.fork();
return subTask1.join() + subTask2.join(); // 合并子任務(wù)結(jié)果
}
}4. 主方法
public static void main(String[] args) throws IOException {
// 創(chuàng)建ForkJoinPool實(shí)例
ForkJoinPool fjp = new ForkJoinPool();
// 創(chuàng)建有返回值的任務(wù)
AddTaskRet task = new AddTaskRet(0, nums.length);
// 執(zhí)行任務(wù)
fjp.execute(task);
// 獲取并打印結(jié)果
long result = task.join();
System.out.println(result);
}關(guān)鍵概念解釋
ForkJoinPool:
- Java 7引入的線程池實(shí)現(xiàn)
- 使用工作竊取(work-stealing)算法提高并行效率
- 特別適合分治(divide-and-conquer)算法
RecursiveAction:
- 用于不返回結(jié)果的任務(wù)
- 需要實(shí)現(xiàn)
compute()方法 - 示例中的
AddTask只打印結(jié)果不返回
RecursiveTask:
- 用于需要返回結(jié)果的任務(wù)
- 需要實(shí)現(xiàn)
compute()方法并返回指定類型 - 示例中的
AddTaskRet返回子數(shù)組的和
fork()和join():
fork(): 異步安排任務(wù)執(zhí)行join(): 等待任務(wù)完成并獲取結(jié)果
執(zhí)行流程
- 初始化一個(gè)包含100萬個(gè)隨機(jī)數(shù)的數(shù)組
- 使用Stream API計(jì)算總和作為基準(zhǔn)
- 創(chuàng)建ForkJoinPool
- 創(chuàng)建AddTaskRet任務(wù),范圍是整個(gè)數(shù)組
- 任務(wù)會(huì)根據(jù)MAX_NUM閾值(50000)不斷分割,直到足夠小
- 小任務(wù)直接計(jì)算子數(shù)組和
- 合并所有子任務(wù)的結(jié)果得到最終總和
- 打印結(jié)果(應(yīng)與Stream API計(jì)算的結(jié)果一致)
使用建議
- 對于計(jì)算密集型任務(wù),F(xiàn)orkJoinPool通常比傳統(tǒng)線程池更高效
- 任務(wù)分割的閾值需要合理設(shè)置,太小會(huì)導(dǎo)致過多任務(wù)創(chuàng)建開銷,太大會(huì)降低并行度
- 有返回結(jié)果需求時(shí)使用RecursiveTask,否則使用RecursiveAction
- 注意join()是阻塞調(diào)用,會(huì)等待任務(wù)完成
這段代碼很好地展示了ForkJoin框架的分治思想和使用方法,是并行計(jì)算數(shù)組求和的經(jīng)典示例。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
JFrame中添加和設(shè)置JPanel的方法實(shí)例解析
這篇文章主要介紹了JFrame中添加和設(shè)置JPanel的方法實(shí)例解析,具有一定借鑒價(jià)值2018-01-01
關(guān)于文件上傳MultipartBody的使用方法
這篇文章主要介紹了關(guān)于文件上傳MultipartBody的使用方法,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06
JavaEE通過response實(shí)現(xiàn)請求重定向
這篇文章主要介紹了JavaEE通過response實(shí)現(xiàn)請求重定向的方法,非常的簡單實(shí)用,有需要的朋友可以參考下2014-10-10
SpringBoot集成Hadoop對HDFS的文件操作方法
這篇文章主要介紹了SpringBoot集成Hadoop對HDFS的文件操作方法,本文給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-07-07
升級dubbo2.7.4.1版本平滑遷移到注冊中心nacos
這篇文章主要為大家介紹了2.7.4.1的dubbo平滑遷移到注冊中心nacos的兩種版本升級方案,以及為什要升級,有需要的朋友可以借鑒參考下,希望能夠有所幫助2022-02-02
如何在 Linux 上搭建 java 部署環(huán)境(安裝jdk/tomcat/mys
這篇文章主要介紹了如何在 Linux 上搭建 java 部署環(huán)境(安裝jdk/tomcat/mysql) + 將程序部署到云服務(wù)器上的操作),本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-01-01

