Java并發(fā)編程中的CyclicBarrier線程屏障詳解
一、簡介
CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續(xù)運行。下圖演示了這一過程。
二、與countdownlatch的區(qū)別
它的主要作用其實和CountDownLanch差不多,都是讓一組線程到達一個屏障時被阻塞,直到最后一個線程到達屏障時,屏障會被打開,所有被屏障阻塞的線程才會繼續(xù)執(zhí)行,不過它是可以循環(huán)執(zhí)行的,這是它與CountDownLanch最大的不同。
三、核心參數(shù)
/** The lock for guarding barrier entry */ // 可重入鎖 private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ // 條件隊列,具體看AQS private final Condition trip = lock.newCondition(); /** The number of parties */ // 參與的線程數(shù)量 private final int parties; /* The command to run when tripped */ // 由最后一個進入 barrier 的線程執(zhí)行的操作 private final Runnable barrierCommand; /** The current generation */ // 當前代 private Generation generation = new Generation(); // 正在等待進入屏障的線程數(shù)量 private int count;
四、構造方法
public CyclicBarrier(int parties, Runnable barrierAction) { // 參與的線程數(shù)量小于等于0,拋出異常 if (parties <= 0) throw new IllegalArgumentException(); // 設置parties this.parties = parties; // 設置count this.count = parties; // 設置barrierCommand this.barrierCommand = barrierAction; }
該構造函數(shù)可以指定關聯(lián)該CyclicBarrier的線程數(shù)量,并且可以指定在所有線程都進入屏障后的執(zhí)行動作,該執(zhí)行動作由最后一個進行屏障的線程執(zhí)行。
五、核心方法
await方法
(1)dowait方法
await中調用dowait
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 保存當前鎖 final ReentrantLock lock = this.lock; // 鎖定 lock.lock(); try { // 保存當前代 final Generation g = generation; if (g.broken) // 屏障被破壞,拋出異常 throw new BrokenBarrierException(); if (Thread.interrupted()) { // 線程被中斷 // 損壞當前屏障,并且喚醒所有的線程,只有擁有鎖的時候才會調用 breakBarrier(); // 拋出異常 throw new InterruptedException(); } // 減少正在等待進入屏障的線程數(shù)量 int index = --count; if (index == 0) { // 正在等待進入屏障的線程數(shù)量為0,所有線程都已經進入 // 運行的動作標識 boolean ranAction = false; try { // 保存運行動作 final Runnable command = barrierCommand; if (command != null) // 動作不為空 // 運行 command.run(); // 設置ranAction狀態(tài) ranAction = true; // 進入下一代 nextGeneration(); return 0; } finally { if (!ranAction) // 沒有運行的動作 // 損壞當前屏障 breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out // 無限循環(huán) for (;;) { try { if (!timed) // 沒有設置等待時間 // 等待 trip.await(); else if (nanos > 0L) // 設置了等待時間,并且等待時間大于0 // 等待指定時長 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { // 等于當前代并且屏障沒有被損壞 // 損壞當前屏障 breakBarrier(); // 拋出異常 throw ie; } else { // 不等于當前帶后者是屏障被損壞 // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. // 中斷當前線程 Thread.currentThread().interrupt(); } } if (g.broken) // 屏障被損壞,拋出異常 throw new BrokenBarrierException(); if (g != generation) // 不等于當前代 // 返回索引 return index; if (timed && nanos <= 0L) { // 設置了等待時間,并且等待時間小于0 // 損壞屏障 breakBarrier(); // 拋出異常 throw new TimeoutException(); } } } finally { // 釋放鎖 lock.unlock(); } }
(2)核心流程
(3)nextGeneration方法
該方法會喚醒所有線程并且重置次數(shù),這也是為什么CyclicBarrier可以循環(huán)調用的原因
private void nextGeneration() { // signal completion of last generation // 喚醒所有線程 trip.signalAll(); // set up next generation // 恢復正在等待進入屏障的線程數(shù)量 count = parties; // 新生一代 generation = new Generation(); }
六、應用
static class MyThread extends Thread { private CyclicBarrier cb; public MyThread(String name, CyclicBarrier cb) { super(name); this.cb = cb; } public void run() { System.out.println(Thread.currentThread().getName() + " going to await"); try { cb.await(); System.out.println(Thread.currentThread().getName() + " continue"); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException, BrokenBarrierException { CyclicBarrier cb = new CyclicBarrier(3, new Thread("barrierAction") { public void run() { System.out.println(Thread.currentThread().getName() + " barrier action"); } }); MyThread t1 = new MyThread("t1", cb); MyThread t2 = new MyThread("t2", cb); t1.start(); t2.start(); System.out.println(Thread.currentThread().getName() + " going to await"); cb.await(); System.out.println(Thread.currentThread().getName() + " continue"); }
運行結果:
所以調用時序為:
到此這篇關于Java并發(fā)編程中的CyclicBarrier線程屏障詳解的文章就介紹到這了,更多相關CyclicBarrier線程屏障內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringBoot開發(fā)案例之配置Druid數(shù)據庫連接池的示例
本篇文章主要介紹了SpringBoot開發(fā)案例之配置Druid數(shù)據庫連接池的示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-03-03Java多線程并發(fā)編程(互斥鎖Reentrant Lock)
這篇文章主要介紹了ReentrantLock 互斥鎖,在同一時間只能被一個線程所占有,在被持有后并未釋放之前,其他線程若想獲得該鎖只能等待或放棄,需要的朋友可以參考下2017-05-05