JUC循環(huán)屏障CyclicBarrier與CountDownLatch區(qū)別詳解
前言
jdk中提供了許多的并發(fā)工具類,大家可能比較熟悉的有CountDownLatch
,主要用來阻塞一個線程運行,直到其他線程運行完畢。而jdk還有一個功能類似并發(fā)工具類CyclicBarrier
,你知道它的作用嗎?和CountDownLatch
有什么區(qū)別呢?
對于CountDownLatch不了解的可以參考# CountDownLatch源碼硬核解析
介紹和使用
CyclicBarrier
,循環(huán)屏障,用來進行線程協(xié)作,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,會觸發(fā)自己運行,運行完后,屏障又會開門,所有被屏障攔截的線程又可以繼續(xù)運行。所以CyclicBarrier
是可以重用的。
為了更好的理解,我們舉個例子,如下圖所示:
我們將屏障想成柵欄,5個線程想成5頭豬。5頭豬開始往前跑,直到都跑到柵欄前,柵欄開始做個自己的任務(wù),比如看看豬多重。然后打開柵欄,豬又會繼續(xù)跑,跑到下一個柵欄,就這樣循環(huán)....
API介紹
構(gòu)造方法
public CyclicBarrier(int parties)
: 創(chuàng)建parties
個線程任務(wù)的循環(huán)CyclicBarrier
public CyclicBarrier(int parties, Runnable barrierAction)
: 當parties
個線程到到屏障出,自己執(zhí)行任務(wù)barrierAction
常用API
int await()
:線程調(diào)用await
方法通知CyclicBarrier
本線程已經(jīng)到達屏障
基本使用
我們將上面豬豬的例子通過CyclicBarrier
簡單做一個實現(xiàn)。
@Slf4j(topic = "c.CyclicBarrierPig") public class CyclicBarrierPig { public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(5); CyclicBarrier barrier = new CyclicBarrier(5, () -> { System.out.println("主人看看哪個豬跑最快,最肥..."); }); // 循環(huán)跑3次 for (int i = 0; i < 3; i++) { // 5條豬開始跑 for(int j = 0; j<5; j++) { int finalJ = j; service.submit(() -> { log.info("pig{} is run .....", finalJ); try { // 隨機時間,模擬跑花費的時間 Thread.sleep(new Random().nextInt(5000)); log.info("pig{} reach barrier .....", finalJ); barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); } } service.shutdown(); } }
運行結(jié)果:
實現(xiàn)原理
我們已經(jīng)明白CyclicBarrier
的基本使用了,那我們看看它是如何實現(xiàn)的。
成員屬性
- 全局鎖:利用可重入鎖實現(xiàn)的工具類
// barrier 實現(xiàn)是依賴于Condition條件隊列,condition 條件隊列必須依賴lock才能使用 private final ReentrantLock lock = new ReentrantLock(); // 線程掛起實現(xiàn)使用的 condition 隊列,當前代所有線程到位,這個條件隊列內(nèi)的線程才會被喚醒 private final Condition trip = lock.newCondition();
- 線程數(shù)量
// 代表多少個線程到達屏障開始觸發(fā)線程任務(wù) private final int parties; // 表示當前“代”還有多少個線程未到位,初始值為 parties private int count;
- 當前代中最后一個線程到位后要執(zhí)行的任務(wù)
private final Runnable barrierCommand;
- 代, 也是用標記一次循環(huán)
// 表示 barrier 對象當前 代 private Generation generation = new Generation(); private static class Generation { // 表示當前“代”是否被打破,如果被打破再來到這一代的線程 就會直接拋出 BrokenException 異常 // 且在這一代掛起的線程都會被喚醒,然后拋出 BrokerException 異常。 boolean broken = false; }
構(gòu)造方法
public CyclicBarrie(int parties, Runnable barrierAction) { // 因為小于等于 0 的 barrier 沒有任何意義 if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; // 可以為 null this.barrierCommand = barrierAction; }
成員方法
await()
:阻塞等待所有線程到位
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } // timed:表示當前調(diào)用await方法的線程是否指定了超時時長,如果 true 表示線程是響應(yīng)超時的 // nanos:線程等待超時時長,單位是納秒 private int dowait(boolean timed, long nanos) { final ReentrantLock lock = this.lock; // 加鎖 lock.lock(); try { // 獲取當前代 final Generation g = generation; // 【如果當前代是已經(jīng)被打破狀態(tài),則當前調(diào)用await方法的線程,直接拋出Broken異?!? if (g.broken) throw new BrokenBarrierException(); // 如果當前線程被中斷了,則打破當前代,然后當前線程拋出中斷異常 if (Thread.interrupted()) { // 設(shè)置當前代的狀態(tài)為 broken 狀態(tài),喚醒在 trip 條件隊列內(nèi)的線程 breakBarrier(); throw new InterruptedException(); } // 邏輯到這說明,當前線程中斷狀態(tài)是 false, 當前代的 broken 為 false(未打破狀態(tài)) // 假設(shè) parties 給的是 5,那么index對應(yīng)的值為 4,3,2,1,0 int index = --count; // 條件成立說明當前線程是最后一個到達 barrier 的線程,【需要開啟新代,喚醒阻塞線程】 if (index == 0) { // 柵欄任務(wù)啟動標記 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) // 啟動觸發(fā)的任務(wù) command.run(); // run()未拋出異常的話,啟動標記設(shè)置為 true ranAction = true; // 開啟新的一代,這里會【喚醒所有的阻塞隊列】 nextGeneration(); // 返回 0 因為當前線程是此代最后一個到達的線程,index == 0 return 0; } finally { // 如果 command.run() 執(zhí)行拋出異常的話,會進入到這里 if (!ranAction) breakBarrier(); } } // 自旋,一直到條件滿足、當前代被打破、線程被中斷,等待超時 for (;;) { try { // 根據(jù)是否需要超時等待選擇阻塞方法 if (!timed) // 當前線程釋放掉 lock,【進入到 trip 條件隊列的尾部掛起自己】,等待被喚醒 trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 被中斷后來到這里的邏輯 // 當前代沒有變化并且沒有被打破 if (g == generation && !g.broken) { // 打破屏障 breakBarrier(); // node 節(jié)點在【條件隊列】內(nèi)收到中斷信號時 會拋出中斷異常 throw ie; } else { // 等待過程中代變化了,完成一次自我打斷 Thread.currentThread().interrupt(); } } // 喚醒后的線程,【判斷當前代已經(jīng)被打破,線程喚醒后依次拋出 BrokenBarrier 異?!? if (g.broken) throw new BrokenBarrierException(); // 當前線程掛起期間,最后一個線程到位了,然后觸發(fā)了開啟新的一代的邏輯 if (g != generation) return index; // 當前線程 trip 中等待超時,然后主動轉(zhuǎn)移到阻塞隊列 if (timed && nanos <= 0L) { breakBarrier(); // 拋出超時異常 throw new TimeoutException(); } } } finally { // 解鎖 lock.unlock(); } }
- breakBarrier():打破 Barrier 屏障
private void breakBarrier() { // 將代中的 broken 設(shè)置為 true,表示這一代是被打破了,再來到這一代的線程,直接拋出異常 generation.broken = true; // 重置 count 為 parties count = parties; // 將在trip條件隊列內(nèi)掛起的線程全部喚醒,喚醒后的線程會檢查當前是否是打破的,然后拋出異常 trip.signalAll(); }
- nextGeneration():開啟新的下一代
private void nextGeneration() { // 將在 trip 條件隊列內(nèi)掛起的線程全部喚醒 trip.signalAll(); // 重置 count 為 parties count = parties; // 開啟新的一代,使用一個新的generation對象,表示新的一代,新的一代和上一代【沒有任何關(guān)系】 generation = new Generation(); }
和CountDownLatch的區(qū)別
相同點
二者都能讓一個或多個線程阻塞等待,都可以用在多個線程間的協(xié)調(diào),起到線程同步的作用。
不同點
CountDownLatch
的計數(shù)器只能使用一次,而CyclicBarrier
的計數(shù)器可以反復(fù) 使用。CountDownLatch.await
一般阻塞工作線程,所有的進行預(yù)備工作的線程執(zhí)行countDown
,而CyclicBarrier
通過工作線程調(diào)用await
從而自行阻塞,直到所有工作線程達到指定屏障,所有的線程才會返回各自執(zhí)行自己的工作。CyclicBarrier
還可以提供一個barrierAction
,合并多線程計算結(jié)果。CountDownLatch
會阻塞主線程,CyclicBarrier
不會阻塞主線程,只會阻塞子線程。
總結(jié)
本文講解了CyclicBarrier
的基本功能和使用,同時講解了它大致的實現(xiàn)。關(guān)于CyclicBarrier
具體有什么使用場景,你可能還是比較迷惑,我再舉個例子,比如CyclicBarrier
可以用于多線程計算數(shù)據(jù),最后合并計算結(jié)果的應(yīng)用場景,更多關(guān)于JUC循環(huán)屏障的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java使用BeanUtils.copyProperties方法對象復(fù)制同名字段類型不同賦值為空問題解決方案
這篇文章主要給大家介紹了關(guān)于java使用BeanUtils.copyProperties方法對象復(fù)制同名字段類型不同賦值為空問題的解決方案,文中通過代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-11-11Java輸入流Scanner/BufferedReader使用方法示例
這篇文章主要介紹了Java輸入流Scanner/BufferedReader使用方法,大家看示例吧2013-11-11java遞歸實現(xiàn)復(fù)制一個文件夾下所有文件功能
這篇文章主要介紹了java遞歸實現(xiàn)復(fù)制一個文件夾下所有文件功能n次,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-08-08MyBatis環(huán)境資源配置實現(xiàn)代碼詳解
這篇文章主要介紹了MyBatis環(huán)境資源配置實現(xiàn)代碼解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-08-08Springboot中@Value注解的場景用法及可能遇到的問題詳解
這篇文章主要給大家介紹了關(guān)于Springboot中@Value注解的場景用法及可能遇到問題的相關(guān)資料, @Value通常用于注入外部化屬性,即外部配置屬性的注入,文中通過圖文介紹的非常詳細,需要的朋友可以參考下2023-11-11Java中的InputStreamReader和OutputStreamWriter源碼分析_動力節(jié)點Java學(xué)院整理
本文通過示例代碼給大家解析了Java中的InputStreamReader和OutputStreamWriter知識,需要的的朋友參考下吧2017-05-05