Java中CyclicBarrier?循環(huán)屏障
一、簡介
CyclicBarrier 字面意思回環(huán)柵欄(循環(huán)屏障),它可以實現讓一組線程等待至某個狀態(tài)(屏障點)之后再全部同時執(zhí)行。叫做回環(huán)是因為當所有等待線程都被釋放以后,CyclicBarrier可以被重用。
CyclicBarrier 作用是讓一組線程相互等待,當達到一個共同點時,所有之前等待的線程再繼續(xù)執(zhí)行,且 CyclicBarrier 功能可重復使用。
二、CyclicBarrier的使用
構造方法:
?// parties表示屏障攔截的線程數量,每個線程調用 await 方法告訴 CyclicBarrier 我已經到達了屏障,然后當前線程被阻塞。 public CyclicBarrier(int parties) // 用于在線程到達屏障時,優(yōu)先執(zhí)行 barrierAction,方便處理更復雜的業(yè)務場景(該線程的執(zhí)行時機是在到達屏障之后再執(zhí)行)
重要方法:
//屏障 指定數量的線程全部調用await()方法時,這些線程不再阻塞 // BrokenBarrierException 表示柵欄已經被破壞,破壞的原因可能是其中一個線程 await() 時被中斷或者超時 public int await() throws InterruptedException, BrokenBarrierException public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException //循環(huán) 通過reset()方法可以進行重置
CyclicBarrier 應用場景
- 利用 CyclicBarrier 可以用于多線程計算數據,最后合并計算結果的場景。
- 利用 CyclicBarrier的計數器能夠重置,屏障可以重復使用的特性,可以支持類似“人滿發(fā)車”的場景
模擬合并計算場景
利用 CyclicBarrier 可以用于多線程計算數據,最后合并計算結果的場景。
public class CyclicBarrierTest2 { //保存每個學生的平均成績 private Conc urrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>(); private ExecutorService threadPool= Executors.newFixedThreadPool(3); private CyclicBarrier cb=new CyclicBarrier(3,()->{ int result=0; Set<String> set = map.keySet(); for(String s:set){ result+=map.get(s); } System.out.println("三人平均成績?yōu)?"+(result/3)+"分"); }); public void count(){ for(int i=0;i<3;i++){ threadPool.execute(new Runnable(){ @Override public void run() { //獲取學生平均成績 int score=(int)(Math.random()*40+60); map.put(Thread.currentThread().getName(), score); System.out.println(Thread.currentThread().getName() +"同學的平均成績?yōu)椋?+score); try { //執(zhí)行完運行await(),等待所有學生平均成績都計算完畢 cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); } } public static void main(String[] args) { CyclicBarrierTest2 cb=new CyclicBarrierTest2(); cb.count(); } }
模擬“人滿發(fā)車”的場景
利用CyclicBarrier的計數器能夠重置,屏障可以重復使用的特性,可以支持類似“人滿發(fā)車”的場景
public class CyclicBarrierTest3 { public static void main(String[] args) { AtomicInteger counter = new AtomicInteger(); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 5, 5, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), (r) -> new Thread(r, counter.addAndGet(1) + " 號 "), new ThreadPoolExecutor.AbortPolicy()); CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("裁判:比賽開始~~")); for (int i = 0; i < 10; i++) { threadPoolExecutor.submit(new Runner(cyclicBarrier)); } } static class Runner extends Thread{ private CyclicBarrier cyclicBarrier; public Runner (CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { int sleepMills = ThreadLocalRandom.current().nextInt(1000); Thread.sleep(sleepMills); System.out.println(Thread.currentThread().getName() + " 選手已就位, 準備共用時: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting()); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } } } }
輸出結果:
3 號 選手已就位, 準備共用時: 78ms0
1 號 選手已就位, 準備共用時: 395ms1
5 號 選手已就位, 準備共用時: 733ms2
2 號 選手已就位, 準備共用時: 776ms3
4 號 選手已就位, 準備共用時: 807ms4
裁判:比賽開始~~
4 號 選手已就位, 準備共用時: 131ms0
3 號 選手已就位, 準備共用時: 256ms1
2 號 選手已就位, 準備共用時: 291ms2
1 號 選手已就位, 準備共用時: 588ms3
5 號 選手已就位, 準備共用時: 763ms4
裁判:比賽開始~~
三、CyclicBarrier 源碼分析
CyclicBarrier 流程
主要是的流程:
- 獲取鎖 如果 count != 0 就進入阻塞;
- 進入阻塞之前,首先需要進入條件隊列,然后釋放鎖,最后阻塞;
- 如果 count != 0 會進行一個喚醒,將所有的條件隊列中的節(jié)點轉換為阻塞隊列;
- 被喚醒過后會進行鎖的獲取,如果鎖獲取失敗,會進入 lock 的阻塞隊列;
- 如果鎖獲取成功,進行鎖的釋放,以及喚醒,同步隊列中的線程。
下面是一個簡單的流程圖:
下面是具體的一些代碼調用的流程:
幾個常見的問題?
- 1.一組線程在觸發(fā)屏障之前互相等待,最后一個線程到達屏障后喚醒邏輯是如何實現的. 喚醒的過程是通過調用
java.util.concurrent.locks.Condition#signalAll
喚醒條件隊列上的所有節(jié)點。 - 2.刪欄循環(huán)使用是如何實現的? 實際上一個互斥鎖 ReentrantLock 的條件隊列和阻塞隊列的轉換。
- 3.條件隊列到同步隊列的轉換實現邏輯 ? 轉換過程中,首先會先將條件隊列中所有的阻塞線程喚醒,然后會去獲取 lock 如果獲取失敗,就進入同步隊列。
CyclicBarrier 與 CountDownLatch的區(qū)別
- CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset() 方法重置。所以CyclicBarrier能處理更為復雜的業(yè)務場景,比如如果計算發(fā)生錯誤,可以重置計數器,并讓線程們重新執(zhí)行一次
- CyclicBarrier還提供getNumberWaiting(可以獲得CyclicBarrier阻塞的線程數量)、isBroken(用來知道阻塞的線程是否被中斷)等方法。
- CountDownLatch會阻塞主線程,CyclicBarrier不會阻塞主線程,只會阻塞子線程。
- CountDownLatch和CyclicBarrier都能夠實現線程之間的等待,只不過它們側重點不同。CountDownLatch一般用于一個或多個線程,等待其他線程執(zhí)行完任務后,再執(zhí)行。CyclicBarrier一般用于一組線程互相等待至某個狀態(tài),然后這一組線程再同時執(zhí)行。
- CyclicBarrier 還可以提供一個 barrierAction,合并多線程計算結果。
- CyclicBarrier是通過ReentrantLock的"獨占鎖"和Conditon來實現一組線程的阻塞喚醒的,而CountDownLatch則是通過AQS的“共享鎖”實現
到此這篇關于Java中CyclicBarrier 循環(huán)屏障的文章就介紹到這了,更多相關Java CyclicBarrier 內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringBoot生產環(huán)境打包如何去除無用依賴
這篇文章主要介紹了SpringBoot生產環(huán)境打包如何去除無用依賴問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-09-09Netty源碼分析NioEventLoop初始化線程選擇器創(chuàng)建
這篇文章主要介紹了Netty源碼分析NioEventLoop初始化線程選擇器創(chuàng)建,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-03-03使用restTemplate.postForEntity()的問題
這篇文章主要介紹了使用restTemplate.postForEntity()的問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-09-09