詳解如何判斷Java線程池任務已執(zhí)行完
無論是在項目開發(fā)中,還是在面試中過程中,總會被問到或使用到并發(fā)編程來完成項目中的某個功能。
例如某個復雜的查詢,無法使用一個查詢語句來完成此功能,此時我們就需要執(zhí)行多個查詢語句,然后再將各自查詢的結果,組裝之后返回給前端了,那么這種場景下,我們就必須使用線程池來進行并發(fā)查詢了。
PS:磊哥做的最復雜的查詢,總共關聯了 21 張表,在和產品及需求方的溝通多次溝通下,才將查詢的業(yè)務從 21 張表,降到了至少要查詢 12 張表(非常難搞),那么這種場景下是無法使用一個查詢語句來實現的,那么并發(fā)查詢是必須要給安排上的。
1.需求分析
線程池的使用并不復雜,麻煩的是如何判斷線程池中的任務已經全部執(zhí)行完了?因為我們要等所有任務都執(zhí)行完之后,才能進行數據的組裝和返回,所以接下來,我們就來看如何判斷線程中的任務是否已經全部執(zhí)行完?
2.實現概述
判斷線程池中的任務是否執(zhí)行完的方法有很多,比如以下幾個:
- 使用 getCompletedTaskCount() 統(tǒng)計已經執(zhí)行完的任務,和 getTaskCount() 線程池的總任務進行對比,如果相等則說明線程池的任務執(zhí)行完了,否則既未執(zhí)行完。
- 使用 FutureTask 等待所有任務執(zhí)行完,線程池的任務就執(zhí)行完了。
- 使用 CountDownLatch 或 CyclicBarrier 等待所有線程都執(zhí)行完之后,再執(zhí)行后續(xù)流程。
具體實現代碼如下。
3.具體實現
3.1 統(tǒng)計完成任務數
通過判斷線程池中的計劃執(zhí)行任務數和已完成任務數,來判斷線程池是否已經全部執(zhí)行完,如果計劃執(zhí)行任務數=已完成任務數,那么線程池的任務就全部執(zhí)行完了,否則就未執(zhí)行完。示例代碼如下:
private?static?void?isCompletedByTaskCount(ThreadPoolExecutor?threadPool)?{ ????while?(threadPool.getTaskCount()?!=?threadPool.getCompletedTaskCount())?{ ????} }
以上程序執(zhí)行結果如下:
方法說明
- getTaskCount():返回計劃執(zhí)行的任務總數。由于任務和線程的狀態(tài)可能在計算過程中動態(tài)變化,因此返回的值只是一個近似值。
- getCompletedTaskCount():返回完成執(zhí)行任務的總數。因為任務和線程的狀態(tài)可能在計算過程中動態(tài)地改變,所以返回的值只是一個近似值,但是在連續(xù)的調用中并不會減少。
缺點分析
此判斷方法的缺點是 getTaskCount() 和 getCompletedTaskCount() 返回的是一個近似值,因為線程池中的任務和線程的狀態(tài)可能在計算過程中動態(tài)變化,所以它們兩個返回的都是一個近似值。
3.2 FutureTask
FutrueTask 的優(yōu)勢是任務判斷精準,調用每個 FutrueTask 的 get 方法就是等待該任務執(zhí)行完,如下代碼所示:
import?java.util.concurrent.ExecutionException; import?java.util.concurrent.ExecutorService; import?java.util.concurrent.Executors; import?java.util.concurrent.FutureTask; /** ?*?使用?FutrueTask?等待線程池執(zhí)行完全部任務 ?*/ public?class?FutureTaskDemo?{ ????public?static?void?main(String[]?args)?throws?ExecutionException,?InterruptedException?{ ????????//?創(chuàng)建一個固定大小的線程池 ????????ExecutorService?executor?=?Executors.newFixedThreadPool(3); ????????//?創(chuàng)建任務 ????????FutureTask<Integer>?task1?=?new?FutureTask<>(()?->?{ ????????????System.out.println("Task?1?start"); ????????????Thread.sleep(2000); ????????????System.out.println("Task?1?end"); ????????????return?1; ????????}); ????????FutureTask<Integer>?task2?=?new?FutureTask<>(()?->?{ ????????????System.out.println("Task?2?start"); ????????????Thread.sleep(3000); ????????????System.out.println("Task?2?end"); ????????????return?2; ????????}); ????????FutureTask<Integer>?task3?=?new?FutureTask<>(()?->?{ ????????????System.out.println("Task?3?start"); ????????????Thread.sleep(1500); ????????????System.out.println("Task?3?end"); ????????????return?3; ????????}); ????????//?提交三個任務給線程池 ????????executor.submit(task1); ????????executor.submit(task2); ????????executor.submit(task3); ????????//?等待所有任務執(zhí)行完畢并獲取結果 ????????int?result1?=?task1.get(); ????????int?result2?=?task2.get(); ????????int?result3?=?task3.get(); ????????System.out.println("Do?main?thread."); ????} }
以上程序的執(zhí)行結果如下:
3.3 CountDownLatch和CyclicBarrier
CountDownLatch 和 CyclicBarrier 類似,都是等待所有任務到達某個點之后,再進行后續(xù)的操作,如下圖所示:
CountDownLatch 使用的示例代碼如下:
public?static?void?main(String[]?args)?throws?InterruptedException?{ ????//?創(chuàng)建線程池 ????ThreadPoolExecutor?threadPool?=?new?ThreadPoolExecutor(10,?20, ?????0,?TimeUnit.SECONDS,?new?LinkedBlockingDeque<>(1024)); ????final?int?taskCount?=?5;????//?任務總數 ????//?單次計數器 ????CountDownLatch?countDownLatch?=?new?CountDownLatch(taskCount);?//?① ????//?添加任務 ????for?(int?i?=?0;?i?<?taskCount;?i++)?{ ????????final?int?finalI?=?i; ????????threadPool.submit(new?Runnable()?{ ????????????@Override ????????????public?void?run()?{ ????????????????try?{ ????????????????????//?隨機休眠?0-4s ????????????????????int?sleepTime?=?new?Random().nextInt(5); ????????????????????TimeUnit.SECONDS.sleep(sleepTime); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ????????????????System.out.println(String.format("任務%d執(zhí)行完成",?finalI)); ????????????????//?線程執(zhí)行完,計數器?-1 ????????????????countDownLatch.countDown();??//?② ????????????} ????????}); ????} ????//?阻塞等待線程池任務執(zhí)行完 ????countDownLatch.await();??//?③ ????//?線程池執(zhí)行完 ????System.out.println(); ????System.out.println("線程池任務執(zhí)行完成!"); }
代碼說明:以上代碼中標識為 ①、②、③ 的代碼行是核心實現代碼,其中:① 是聲明一個包含了 5 個任務的計數器;② 是每個任務執(zhí)行完之后計數器 -1;③ 是阻塞等待計數器 CountDownLatch 減為 0,表示任務都執(zhí)行完了,可以執(zhí)行 await 方法后面的業(yè)務代碼了。
以上程序的執(zhí)行結果如下:
缺點分析
CountDownLatch 缺點是計數器只能使用一次,CountDownLatch 創(chuàng)建之后不能被重復使用。CyclicBarrier 和 CountDownLatch 類似,它可以理解為一個可以重復使用的循環(huán)計數器,CyclicBarrier 可以調用 reset 方法將自己重置到初始狀態(tài),CyclicBarrier 具體實現代碼如下:
public?static?void?main(String[]?args)?throws?InterruptedException?{ ????//?創(chuàng)建線程池 ????ThreadPoolExecutor?threadPool?=?new?ThreadPoolExecutor(10,?20, ?????0,?TimeUnit.SECONDS,?new?LinkedBlockingDeque<>(1024)); ????final?int?taskCount?=?5;????//?任務總數 ????//?循環(huán)計數器?① ????CyclicBarrier?cyclicBarrier?=?new?CyclicBarrier(taskCount,?new?Runnable()?{ ????????@Override ????????public?void?run()?{ ????????????//?線程池執(zhí)行完 ????????????System.out.println(); ????????????System.out.println("線程池所有任務已執(zhí)行完!"); ????????} ????}); ????//?添加任務 ????for?(int?i?=?0;?i?<?taskCount;?i++)?{ ????????final?int?finalI?=?i; ????????threadPool.submit(new?Runnable()?{ ????????????@Override ????????????public?void?run()?{ ????????????????try?{ ????????????????????//?隨機休眠?0-4s ????????????????????int?sleepTime?=?new?Random().nextInt(5); ????????????????????TimeUnit.SECONDS.sleep(sleepTime); ????????????????????System.out.println(String.format("任務%d執(zhí)行完成",?finalI)); ????????????????????//?線程執(zhí)行完 ????????????????????cyclicBarrier.await();?//?② ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????}?catch?(BrokenBarrierException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ????????????} ????????}); ????} }
以上程序的執(zhí)行結果如下:
方法說明
CyclicBarrier 有 3 個重要的方法:
1.構造方法:構造方法可以傳遞兩個參數,參數 1 是計數器的數量 parties,參數 2 是計數器為 0 時,也就是任務都執(zhí)行完之后可以執(zhí)行的事件(方法)。
2.await 方法:在 CyclicBarrier 上進行阻塞等待,當調用此方法時 CyclicBarrier 的內部計數器會 -1,直到發(fā)生以下情形之一:
- 在 CyclicBarrier 上等待的線程數量達到 parties,也就是計數器的聲明數量時,則所有線程被釋放,繼續(xù)執(zhí)行。
- 當前線程被中斷,則拋出 InterruptedException 異常,并停止等待,繼續(xù)執(zhí)行。
- 其他等待的線程被中斷,則當前線程拋出 BrokenBarrierException 異常,并停止等待,繼續(xù)執(zhí)行。
- 其他等待的線程超時,則當前線程拋出 BrokenBarrierException 異常,并停止等待,繼續(xù)執(zhí)行。
- 其他線程調用 CyclicBarrier.reset() 方法,則當前線程拋出 BrokenBarrierException 異常,并停止等待,繼續(xù)執(zhí)行。
3.reset 方法:使得CyclicBarrier回歸初始狀態(tài),直觀來看它做了兩件事:
- 如果有正在等待的線程,則會拋出 BrokenBarrierException 異常,且這些線程停止等待,繼續(xù)執(zhí)行。
- 將是否破損標志位 broken 置為 false。
優(yōu)缺點分析
CyclicBarrier 從設計的復雜度到使用的復雜度都高于 CountDownLatch,相比于 CountDownLatch 來說它的優(yōu)點是可以重復使用(只需調用 reset 就能恢復到初始狀態(tài)),缺點是使用難度較高。
小結
在實現判斷線程池任務是否執(zhí)行完成的方案中,通過統(tǒng)計線程池執(zhí)行完任務的方式(實現方法 1),以及實現方法 3(CountDownLatch 或 CyclicBarrier)等統(tǒng)計,都是“不記名”的,只關注數量,不關注(具體)對象,所以這些方式都有可能受到外界代碼的影響,因此使用 FutureTask 等待具體任務執(zhí)行完的方式是最推薦的判斷方法。
到此這篇關于詳解如何判斷Java線程池任務已執(zhí)行完的文章就介紹到這了,更多相關Java線程池任務內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!