Java并行處理的實(shí)現(xiàn)
1. 背景
本文是一個(gè)短文章,介紹Java 中的并行處理。
說(shuō)明:10多分鐘讀完的文章我稱之為短文章,適合快速閱讀。
2.知識(shí)
并行計(jì)算(parallel computing)一般是指許多指令得以同時(shí)進(jìn)行的計(jì)算模式。在同時(shí)進(jìn)行的前提下,可以將計(jì)算的過(guò)程分解成小部分,之后以并發(fā)方式來(lái)加以解決。
也就是分解為幾個(gè)過(guò)程:
1、將一個(gè)大任務(wù)拆分成多個(gè)子任務(wù),子任務(wù)還可以繼續(xù)拆分。
2、各個(gè)子任務(wù)同時(shí)進(jìn)行運(yùn)算執(zhí)行。
3、在執(zhí)行完畢后,可能會(huì)有個(gè) " 歸納 " 的任務(wù),比如 求和,求平均等。
再簡(jiǎn)化一點(diǎn)的理解就是: 先拆分 --> 在同時(shí)進(jìn)行計(jì)算 --> 最后“歸納”
為什么要“并行”,優(yōu)點(diǎn)呢?
1、為了獲得 “節(jié)省時(shí)間”,“快”。適合用于大規(guī)模運(yùn)算的場(chǎng)景。從理論上講,在 n 個(gè)并行處理的執(zhí)行速度可能會(huì)是在單一處理機(jī)上執(zhí)行的速度的 n 倍。
2、以前的計(jì)算機(jī)是單核的,現(xiàn)代的計(jì)算機(jī)Cpu都是多核的,服務(wù)器甚至都是多Cpu的,并行計(jì)算可以充分利用硬件的性能。
3. Java 中的并行處理
JDK 8 新增的Stream API(java.util.stream)將生成環(huán)境的函數(shù)式編程引入了Java庫(kù)中,可以方便開(kāi)發(fā)者能夠?qū)懗龈佑行?、更加?jiǎn)潔的代碼。
steam 的另一個(gè)價(jià)值是創(chuàng)造性地支持并行處理(parallel processing)。示例:
final Collection< Task > tasks = Arrays.asList( new Task( Status.OPEN, 5 ), new Task( Status.OPEN, 13 ), new Task( Status.CLOSED, 8 ) ); // 并行執(zhí)行多個(gè)任務(wù),并 求和 final double totalPoints = tasks .stream() .parallel() .map( task -> task.getPoints() ) // or map( Task::getPoints ) .reduce( 0, Integer::sum ); System.out.println( "Total points (all tasks): " + totalPoints );
對(duì)于上面的tasks集合,上面的代碼計(jì)算所有任務(wù)的點(diǎn)數(shù)之和。
它使用 parallel 方法并行處理所有的task,并使用 reduce 方法計(jì)算最終的結(jié)果。
4. 擴(kuò)展
線程池方式實(shí)現(xiàn)并行處理
jdk1.5引入了并發(fā)包,其中包括了ThreadPoolExecutor,相關(guān)代碼如下:
public class ExecutorServiceTest { public static final int THRESHOLD = 10_000; public static long[] numbers; public static void main(String[] args) throws Exception { numbers = LongStream.rangeClosed(1, 10_000_000).toArray(); ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1); CompletionService<Long> completionService = new ExecutorCompletionService<Long>(executor); int taskSize = (int) (numbers.length / THRESHOLD); for (int i = 1; i <= taskSize; i++) { final int key = i; completionService.submit(new Callable<Long>() { @Override public Long call() throws Exception { return sum((key - 1) * THRESHOLD, key * THRESHOLD); } }); } long sumValue = 0; for (int i = 0; i < taskSize; i++) { sumValue += completionService.take().get(); } // 所有任務(wù)已經(jīng)完成,關(guān)閉線程池 System.out.println("sumValue = " + sumValue); executor.shutdown(); } private static long sum(int start, int end) { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } }
使用 fork/join框架
分支/合并框架的目的是以遞歸的方式將可以并行的認(rèn)為拆分成更小的任務(wù),然后將每個(gè)子任務(wù)的結(jié)果合并起來(lái)生成整體結(jié)果;相關(guān)代碼如下:
public class ForkJoinTest extends java.util.concurrent.RecursiveTask<Long> { private static final long serialVersionUID = 1L; private final long[] numbers; private final int start; private final int end; public static final long THRESHOLD = 10_000; public ForkJoinTest(long[] numbers) { this(numbers, 0, numbers.length); } private ForkJoinTest(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute() { int length = end - start; if (length <= THRESHOLD) { return computeSequentially(); } ForkJoinTest leftTask = new ForkJoinTest(numbers, start, start + length / 2); leftTask.fork(); ForkJoinTest rightTask = new ForkJoinTest(numbers, start + length / 2, end); Long rightResult = rightTask.compute(); // 注:join方法會(huì)阻塞,因此有必要在兩個(gè)子任務(wù)的計(jì)算都開(kāi)始之后才執(zhí)行join方法 Long leftResult = leftTask.join(); return leftResult + rightResult; } private long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } public static void main(String[] args) { System.out.println(forkJoinSum(10_000_000)); } public static long forkJoinSum(long n) { long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinTask<Long> task = new ForkJoinTest(numbers); return new ForkJoinPool().invoke(task); } }
上面的代碼實(shí)現(xiàn)了 遞歸方式拆分子任務(wù),并放入到線程池中執(zhí)行。
5.參考:
https://zh.wikipedia.org/wiki/%E5%B9%B6%E8%A1%8C%E8%AE%A1%E7%AE%97
到此這篇關(guān)于Java并行處理的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Java并行處理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java內(nèi)部類之成員內(nèi)部類、局部?jī)?nèi)部類和匿名內(nèi)部類用法及說(shuō)明
這篇文章主要介紹了java內(nèi)部類之成員內(nèi)部類、局部?jī)?nèi)部類和匿名內(nèi)部類的用法及說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12詳解SpringMVC學(xué)習(xí)系列之國(guó)際化
這篇文章主要介紹了詳解SpringMVC學(xué)習(xí)系列之國(guó)際化,詳細(xì)的介紹了關(guān)于瀏覽器,Session,Cookie,URL請(qǐng)求的國(guó)際化的實(shí)現(xiàn),有興趣的可以了解一下2017-07-07Mybatis-Plus 條件構(gòu)造器 QueryWrapper 的基本用法
這篇文章主要介紹了Mybatis-Plus - 條件構(gòu)造器 QueryWrapper 的使用,通過(guò)實(shí)例代碼給大家介紹了查詢示例代碼及實(shí)現(xiàn)需求,代碼簡(jiǎn)單易懂,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-09-09MyBatis學(xué)習(xí)教程(六)-調(diào)用存儲(chǔ)過(guò)程
這篇文章主要介紹了MyBatis學(xué)習(xí)教程(六)-調(diào)用存儲(chǔ)過(guò)程的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,感興趣的朋友一起看下吧2016-05-05springboot集成kafka消費(fèi)手動(dòng)啟動(dòng)停止操作
這篇文章主要介紹了springboot集成kafka消費(fèi)手動(dòng)啟動(dòng)停止操作,本文給大家介紹項(xiàng)目場(chǎng)景及解決分析,結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-09-09簡(jiǎn)談java并發(fā)FutureTask的實(shí)現(xiàn)
這篇文章主要介紹了簡(jiǎn)談java并發(fā)FutureTask的實(shí)現(xiàn),FutureTask都是用于獲取線程執(zhí)行的返回結(jié)果。文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,,需要的朋友可以參考下2019-06-06Gradle構(gòu)建多模塊項(xiàng)目的方法步驟
這篇文章主要介紹了Gradle構(gòu)建多模塊項(xiàng)目的方法步驟,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-05-05