Java中使用ForkJoinPool的實(shí)現(xiàn)示例
背景
使用ForkJoinPool去分解計(jì)算密集型任務(wù)且且并行地執(zhí)行他們以獲得更好的Java應(yīng)用程序的性能。
ForkJoinPool是一個(gè)功能強(qiáng)大的Java類,用于處理計(jì)算密集型任務(wù)。它的工作原理是將任務(wù)分解為更小的子任務(wù),然后并行執(zhí)行。該線程池使用分而治之策略進(jìn)行操作,使其能夠并發(fā)執(zhí)行任務(wù),從而提高吞吐量并減少處理時(shí)間。
ForkJoinPool的一個(gè)獨(dú)特的特點(diǎn)是它的工作竊取算法,用它來優(yōu)化性能。當(dāng)工作線程完成其分配的任務(wù)時(shí),它將從其他線程竊取任務(wù),確保所有線程都能高效工作,并且不會(huì)浪費(fèi)計(jì)算機(jī)資源。
ForkJoinPool廣泛用于Java的并行Stream流和CompletableFutures,使開發(fā)人員能夠輕松地同時(shí)執(zhí)行任務(wù)。此外,其他JVM語言(如Kotlin和Akka)使用此框架來構(gòu)建需要高并發(fā)性和彈性的消息驅(qū)動(dòng)應(yīng)用程序。
1.使用ForkJoinPool的線程池
ForkJoinPool
類存儲(chǔ)工作線程Worker,Worker是在機(jī)器的每個(gè)CPU核心上運(yùn)行的進(jìn)程。每個(gè)進(jìn)程都存儲(chǔ)在deque中,deque代表雙端隊(duì)列。一旦工作線程的任務(wù)用完,它就開始從其他工作線程竊取任務(wù)。
首先,將有一個(gè)分叉任務(wù)的過程;這意味著一個(gè)大任務(wù)將被分解為可以并行執(zhí)行的較小任務(wù)。所有子任務(wù)完成后,它們將重新加入。然后ForkJoinPool類提供一個(gè)結(jié)果,如圖1所示。
當(dāng)任務(wù)被提交到 ForkJoinPool
中,該進(jìn)程將被劃分為較小的進(jìn)程,并推送到共享隊(duì)列中。
一旦調(diào)用了fork()方法,就會(huì)并行調(diào)用任務(wù),直到基本條件為true。一旦處理被分叉,join()方法確保線程相互等待,直到進(jìn)程完成。
最初,所有任務(wù)都將提交到一個(gè)主隊(duì)列,該主隊(duì)列將把任務(wù)推送到工作線程。請(qǐng)注意,任務(wù)是使用LIFO(后進(jìn)先出)策略插入的,該策略與堆棧數(shù)據(jù)結(jié)構(gòu)相同。
另一個(gè)重要的點(diǎn)是ForkJoinPool使用deques來存儲(chǔ)任務(wù)。這提供了使用后進(jìn)先出或先進(jìn)先出(先進(jìn)先出)的能力,這對(duì)于竊取工作的算法是必要的。
2.工作竊取算法
ForkJoinPool中的工作竊取是一種有效的算法,通過平衡池中所有可用線程的工作負(fù)載,可以有效地使用計(jì)算機(jī)資源。
當(dāng)一個(gè)線程變?yōu)榭臻e時(shí),它將嘗試從仍忙于分配工作的其他線程中竊取任務(wù),而不是保持非活動(dòng)狀態(tài)。這個(gè)過程最大限度地利用了計(jì)算資源,并確保沒有線程負(fù)擔(dān)過重,而其他線程保持空閑。工作竊取算法背后的關(guān)鍵概念是,每個(gè)線程都有自己的任務(wù)組,并按后進(jìn)先出的順序執(zhí)行。當(dāng)一個(gè)線程完成自己的任務(wù)并變?yōu)榭臻e時(shí),它將嘗試從另一個(gè)線程的deque的末尾“竊取”任務(wù),遵循FIFO策略,與隊(duì)列數(shù)據(jù)結(jié)構(gòu)相同。這允許空閑線程拾取等待時(shí)間最長(zhǎng)的任務(wù),從而減少總體等待時(shí)間并提高吞吐量。
在下圖中,線程2通過輪詢線程1的deque中的最后一個(gè)元素,從線程1中竊取一個(gè)任務(wù),然后執(zhí)行該任務(wù)。被盜任務(wù)通常是deque中最古老的任務(wù),這確保了工作負(fù)載在池中的所有線程之間均勻分布。
總的來說,F(xiàn)orkJoinPool的工作竊取算法是一個(gè)強(qiáng)大的功能,它可以通過確保所有可用的計(jì)算資源都得到有效利用來顯著提高并行應(yīng)用程序的性能。
3.ForkJoinPool的主要類
讓我們快速了解一下支持使用ForkJoinPool進(jìn)行處理的主要類。
- ForkJoinPool:創(chuàng)建一個(gè)線程池來使用ForkJoin框架。它的工作原理與其他線程池類似。這個(gè)類中最重要的方法是commonPool(),它創(chuàng)建ForkJoin線程池。
- 遞歸操作:這個(gè)類的主要功能是計(jì)算遞歸操作。請(qǐng)記住,在compute()方法中,我們不會(huì)返回值。這是因?yàn)檫f歸發(fā)生在compute()方法中。
- RecursiveTask:這個(gè)類的工作原理類似于RecursiveAction,不同之處在于compute()方法將返回一個(gè)值。
4.使用遞歸操作
要使用RecursiveAction功能,我們需要繼承它并重寫compute()方法。然后,我們用想要實(shí)現(xiàn)的邏輯創(chuàng)建子任務(wù)。
在下面的代碼示例中,我們將以并行和遞歸的方式計(jì)算數(shù)組中每個(gè)數(shù)字的兩倍。我們被限制為并行計(jì)算二乘二的數(shù)組元素。
正如您所看到的,fork()方法調(diào)用compute()方法。一旦整個(gè)數(shù)組的每個(gè)元素都得到了和,遞歸調(diào)用就會(huì)停止。一旦遞歸地求和了數(shù)組的所有元素,我們就會(huì)顯示結(jié)果。
Listing 1. An example of RecursiveAction
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; public class ForkJoinDoubleAction { public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); int[] array = {1, 5, 10, 15, 20, 25, 50}; DoubleNumber doubleNumberTask = new DoubleNumber(array, 0, array.length); // Invokes compute method forkJoinPool.invoke(doubleNumberTask); System.out.println(DoubleNumber.result); } } class DoubleNumber extends RecursiveAction { final int PROCESS_THRESHOLD = 2; int[] array; int startIndex, endIndex; static int result; DoubleNumber(int[] array, int startIndex, int endIndex) { this.array = array; this.startIndex = startIndex; this.endIndex = endIndex; } @Override protected void compute() { if (endIndex - startIndex <= PROCESS_THRESHOLD) { for (int i = startIndex; i < endIndex; i++) { result += array[i] * 2; } } else { int mid = (startIndex + endIndex) / 2; DoubleNumber leftArray = new DoubleNumber(array, startIndex, mid); DoubleNumber rightArray = new DoubleNumber(array, mid, endIndex); // Invokes the compute method recursively leftArray.fork(); rightArray.fork(); // Joins results from recursive invocations leftArray.join(); rightArray.join(); } } }
該計(jì)算的輸出為252。
RecursiveAction需要記住的一點(diǎn)是,它不會(huì)返回值。通過使用分而治之的策略來提高性能,也可以打破這個(gè)過程。
這就是我們?cè)谇鍐?中所做的,我們不是計(jì)算每個(gè)數(shù)組元素的二重,而是通過將數(shù)組分解為多個(gè)部分來并行計(jì)算。
同樣重要的是要注意,RecursiveAction在用于可以有效分解為較小子問題的任務(wù)時(shí)最有效。因此,RecursiveAction和ForkJoinPool應(yīng)該用于計(jì)算密集型任務(wù),在這些任務(wù)中,工作的并行化可以顯著提高性能。否則,由于線程的創(chuàng)建和管理,性能將更差。
5.資源任務(wù)
在下一個(gè)示例中,讓我們研究一個(gè)簡(jiǎn)單的程序,它遞歸地在中間中斷,直到達(dá)到基本條件。在本例中,我們使用的是RecursiveTask類。RecursiveAction和RecursiveTask的區(qū)別在于,使用遞歸任務(wù),我們可以在compute()方法中返回一個(gè)值。Listing 2. An example of RecursiveTask
import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class ForkJoinSumArrayTask extends RecursiveTask<Integer> { private final List<Integer> numbers; public ForkJoinSumArrayTask(List<Integer> numbers) { this.numbers = numbers; } @Override protected Integer compute() { if (numbers.size() <= 2) { return numbers.stream().mapToInt(e -> e).sum(); } else { int mid = numbers.size() / 2; List<Integer> list1 = numbers.subList(0, mid); List<Integer> list2 = numbers.subList(mid, numbers.size()); ForkJoinSumArrayTask task1 = new ForkJoinSumArrayTask(list1); ForkJoinSumArrayTask task2 = new ForkJoinSumArrayTask(list2); task1.fork(); return task1.join() + task2.compute(); } } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); List<Integer> numbers = List.of(1, 3, 5, 7, 9); int output = forkJoinPool.invoke(new ForkJoinSumArrayTask(numbers)); System.out.println(output); } }
在這里,我們遞歸地分解中間的數(shù)組,直到它達(dá)到基本條件。
一旦我們破壞了主數(shù)組,我們將list1和list2發(fā)送到ForkJoinsMarrayTask,然后我們分叉task1,它并行執(zhí)行compute()方法和數(shù)組的其他部分。
一旦遞歸過程達(dá)到基本條件,就會(huì)調(diào)用連接方法,連接結(jié)果。這種情況下的輸出為25。
6.何時(shí)使用ForkJoinPool
ForkJoinPool不應(yīng)該在所有情況下都使用。如前所述,最好將其用于高度密集的并發(fā)進(jìn)程。讓我們具體看看這些情況是什么:
- 遞歸任務(wù):ForkJoinPool非常適合執(zhí)行遞歸算法,如快速排序、合并排序或二進(jìn)制搜索。這些算法可以分解成更小的子問題并并行執(zhí)行,這可以顯著提高性能。
- 令人尷尬的并行問題:如果你有一個(gè)問題可以很容易地劃分為獨(dú)立的子任務(wù),例如圖像處理或數(shù)值模擬,你可以使用ForkJoinPool并行執(zhí)行子任務(wù)。
- 高并發(fā)場(chǎng)景:在高并發(fā)場(chǎng)景中,如web服務(wù)器、數(shù)據(jù)處理管道或其他高性能應(yīng)用程序,可以使用ForkJoinPool跨多個(gè)線程并行執(zhí)行任務(wù),這有助于提高性能和吞吐量。
7.總結(jié)
在本文中,您了解了如何使用最重要的ForkJoinPool功能在單獨(dú)的CPU核心中執(zhí)行繁重的操作。讓我們以這篇文章的要點(diǎn)作為結(jié)論:
- ForkJoinPool是一個(gè)使用分治策略遞歸執(zhí)行任務(wù)的線程池。
- 它被諸如Kotlin和Akka之類的JVM語言用來構(gòu)建消息驅(qū)動(dòng)的應(yīng)用程序。
- ForkJoinPool并行執(zhí)行任務(wù),從而實(shí)現(xiàn)計(jì)算機(jī)資源的高效使用。
- 工作竊取算法通過允許空閑線程從繁忙線程中竊取任務(wù)來優(yōu)化資源利用率。
- 任務(wù)存儲(chǔ)在一個(gè)雙端隊(duì)列中,LIFO策略用于存儲(chǔ),F(xiàn)IFO用于竊取。
- ForkJoinPool框架中的主要類包括ForkJoinPool、RecursiveAction和RecursiveTask:
- RecursiveAction用于計(jì)算遞歸操作,不返回任何值。
- RecursiveTask類似,但返回一個(gè)值。
- compute()方法在兩個(gè)類中都被重寫以實(shí)現(xiàn)自定義邏輯。
- fork()方法調(diào)用compute()方法并將任務(wù)分解為更小的子任務(wù)。
- join()方法等待子任務(wù)完成并合并它們的結(jié)果。
- ForkJoinPool通常與并行流和CompletableFuture一起使用。
到此這篇關(guān)于Java中使用ForkJoinPool的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)Java使用ForkJoinPool內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot整合HTTPS的項(xiàng)目實(shí)踐
HTTPS的主要作用是通過SSL證書保護(hù)用戶數(shù)據(jù)的安全與隱私,增加網(wǎng)站信任度,防止數(shù)據(jù)被竊取和篡改,保護(hù)網(wǎng)站免受釣魚攻擊,本文就來介紹一下,感興趣的可以了解一下2024-10-10SpringBoot服務(wù)上實(shí)現(xiàn)接口限流的方法
這篇文章主要介紹了SpringBoot服務(wù)上實(shí)現(xiàn)接口限流的方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10JavaWeb中請(qǐng)求轉(zhuǎn)發(fā)和請(qǐng)求重定向的區(qū)別以及使用
今天帶大家學(xué)習(xí)JavaWeb的相關(guān)知識(shí),文章圍繞著JavaWeb中請(qǐng)求轉(zhuǎn)發(fā)和請(qǐng)求重定向的區(qū)別以及使用展開,文中有非常詳細(xì)的介紹,需要的朋友可以參考下2021-06-06Spring Boot 整合 Shiro+Thymeleaf過程解析
這篇文章主要介紹了Spring Boot 整合 Shiro+Thymeleaf過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10