Java并發(fā)編程中的CyclicBarrier線程屏障詳解
一、簡(jiǎn)介
CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí),屏障才會(huì)開門,所有被屏障攔截的線程才會(huì)繼續(xù)運(yùn)行。下圖演示了這一過程。
二、與countdownlatch的區(qū)別
它的主要作用其實(shí)和CountDownLanch差不多,都是讓一組線程到達(dá)一個(gè)屏障時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí),屏障會(huì)被打開,所有被屏障阻塞的線程才會(huì)繼續(xù)執(zhí)行,不過它是可以循環(huán)執(zhí)行的,這是它與CountDownLanch最大的不同。
三、核心參數(shù)
/** The lock for guarding barrier entry */ // 可重入鎖 private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ // 條件隊(duì)列,具體看AQS private final Condition trip = lock.newCondition(); /** The number of parties */ // 參與的線程數(shù)量 private final int parties; /* The command to run when tripped */ // 由最后一個(gè)進(jìn)入 barrier 的線程執(zhí)行的操作 private final Runnable barrierCommand; /** The current generation */ // 當(dāng)前代 private Generation generation = new Generation(); // 正在等待進(jìn)入屏障的線程數(shù)量 private int count;
四、構(gòu)造方法
public CyclicBarrier(int parties, Runnable barrierAction) { // 參與的線程數(shù)量小于等于0,拋出異常 if (parties <= 0) throw new IllegalArgumentException(); // 設(shè)置parties this.parties = parties; // 設(shè)置count this.count = parties; // 設(shè)置barrierCommand this.barrierCommand = barrierAction; }
該構(gòu)造函數(shù)可以指定關(guān)聯(lián)該CyclicBarrier的線程數(shù)量,并且可以指定在所有線程都進(jìn)入屏障后的執(zhí)行動(dòng)作,該執(zhí)行動(dòng)作由最后一個(gè)進(jìn)行屏障的線程執(zhí)行。
五、核心方法
await方法
(1)dowait方法
await中調(diào)用dowait
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 保存當(dāng)前鎖 final ReentrantLock lock = this.lock; // 鎖定 lock.lock(); try { // 保存當(dāng)前代 final Generation g = generation; if (g.broken) // 屏障被破壞,拋出異常 throw new BrokenBarrierException(); if (Thread.interrupted()) { // 線程被中斷 // 損壞當(dāng)前屏障,并且喚醒所有的線程,只有擁有鎖的時(shí)候才會(huì)調(diào)用 breakBarrier(); // 拋出異常 throw new InterruptedException(); } // 減少正在等待進(jìn)入屏障的線程數(shù)量 int index = --count; if (index == 0) { // 正在等待進(jìn)入屏障的線程數(shù)量為0,所有線程都已經(jīng)進(jìn)入 // 運(yùn)行的動(dòng)作標(biāo)識(shí) boolean ranAction = false; try { // 保存運(yùn)行動(dòng)作 final Runnable command = barrierCommand; if (command != null) // 動(dòng)作不為空 // 運(yùn)行 command.run(); // 設(shè)置ranAction狀態(tài) ranAction = true; // 進(jìn)入下一代 nextGeneration(); return 0; } finally { if (!ranAction) // 沒有運(yùn)行的動(dòng)作 // 損壞當(dāng)前屏障 breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out // 無限循環(huán) for (;;) { try { if (!timed) // 沒有設(shè)置等待時(shí)間 // 等待 trip.await(); else if (nanos > 0L) // 設(shè)置了等待時(shí)間,并且等待時(shí)間大于0 // 等待指定時(shí)長(zhǎng) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { // 等于當(dāng)前代并且屏障沒有被損壞 // 損壞當(dāng)前屏障 breakBarrier(); // 拋出異常 throw ie; } else { // 不等于當(dāng)前帶后者是屏障被損壞 // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. // 中斷當(dāng)前線程 Thread.currentThread().interrupt(); } } if (g.broken) // 屏障被損壞,拋出異常 throw new BrokenBarrierException(); if (g != generation) // 不等于當(dāng)前代 // 返回索引 return index; if (timed && nanos <= 0L) { // 設(shè)置了等待時(shí)間,并且等待時(shí)間小于0 // 損壞屏障 breakBarrier(); // 拋出異常 throw new TimeoutException(); } } } finally { // 釋放鎖 lock.unlock(); } }
(2)核心流程
(3)nextGeneration方法
該方法會(huì)喚醒所有線程并且重置次數(shù),這也是為什么CyclicBarrier可以循環(huán)調(diào)用的原因
private void nextGeneration() { // signal completion of last generation // 喚醒所有線程 trip.signalAll(); // set up next generation // 恢復(fù)正在等待進(jìn)入屏障的線程數(shù)量 count = parties; // 新生一代 generation = new Generation(); }
六、應(yīng)用
static class MyThread extends Thread { private CyclicBarrier cb; public MyThread(String name, CyclicBarrier cb) { super(name); this.cb = cb; } public void run() { System.out.println(Thread.currentThread().getName() + " going to await"); try { cb.await(); System.out.println(Thread.currentThread().getName() + " continue"); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException, BrokenBarrierException { CyclicBarrier cb = new CyclicBarrier(3, new Thread("barrierAction") { public void run() { System.out.println(Thread.currentThread().getName() + " barrier action"); } }); MyThread t1 = new MyThread("t1", cb); MyThread t2 = new MyThread("t2", cb); t1.start(); t2.start(); System.out.println(Thread.currentThread().getName() + " going to await"); cb.await(); System.out.println(Thread.currentThread().getName() + " continue"); }
運(yùn)行結(jié)果:
所以調(diào)用時(shí)序?yàn)椋?/p>
到此這篇關(guān)于Java并發(fā)編程中的CyclicBarrier線程屏障詳解的文章就介紹到這了,更多相關(guān)CyclicBarrier線程屏障內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot開發(fā)案例之配置Druid數(shù)據(jù)庫連接池的示例
本篇文章主要介紹了SpringBoot開發(fā)案例之配置Druid數(shù)據(jù)庫連接池的示例,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-03-03MyBatis查詢時(shí)屬性名和字段名不一致問題的解決方法
這篇文章主要給大家介紹了關(guān)于MyBatis查詢時(shí)屬性名和字段名不一致問題的解決方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01Java遞歸實(shí)現(xiàn)評(píng)論多級(jí)回復(fù)功能
這篇文章主要介紹了Java遞歸實(shí)現(xiàn)評(píng)論多級(jí)回復(fù)功能,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-06-06Java中的static關(guān)鍵字用法總結(jié)
這篇文章主要介紹了Java中的static關(guān)鍵字用法總結(jié),static是Java50個(gè)關(guān)鍵字之一,static關(guān)鍵字可以用來修飾代碼塊表示靜態(tài)代碼塊,修飾成員變量表示全局靜態(tài)成員變量,修飾方法表示靜態(tài)方法,需要的朋友可以參考下2023-11-11詳解spring項(xiàng)目中如何動(dòng)態(tài)刷新bean
這篇文章主要為大家介紹了詳解spring項(xiàng)目中如何動(dòng)態(tài)刷新bean,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08Java多線程并發(fā)編程(互斥鎖Reentrant Lock)
這篇文章主要介紹了ReentrantLock 互斥鎖,在同一時(shí)間只能被一個(gè)線程所占有,在被持有后并未釋放之前,其他線程若想獲得該鎖只能等待或放棄,需要的朋友可以參考下2017-05-05