AQS同步組件CyclicBarrier循環(huán)屏障用例剖析
CyclicBarrier原理
CyclicBarrier 的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續(xù)運行。
當某個線程調用了await方法之后,就會進入等待狀態(tài),并將計數器+1,直到所有線程調用await方法使計數器為CyclicBarrier設置的值,才可以繼續(xù)執(zhí)行,由于計數器可以重復使用,所以我們又叫它循環(huán)屏障。
CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然后當前線程被阻塞。
CyclicBarrier可以用于多線程計算數據,最后合并計算結果的應用場景。
源碼分析
/**
* 創(chuàng)建一個新的CyclicBarrier當給定數量的參與方(線程)等待它時,它將觸發(fā),
* 并且在障礙觸發(fā)時不執(zhí)行預定義的操作。
*
* @param 在barrier被觸發(fā)之前必須調用await()的線程數
* @throws IllegalArgumentException 如果parties小于1拋出異常
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
/**
*
* 當前線程調用await方法的線程告知CyclicBarrier已經到達屏障,然后當前線程被阻塞
*
* @return 當前線程的到達索引,其中索引為- 1表示第一個到達的,0表示最后一個到達的
* @throws InterruptedException 如果當前線程在等待時被中斷
* @throws BrokenBarrierException 如果另一個線程在當前線程等待時被中斷或超時,
* 或者屏障被重置,或者在調用await方法時屏障被破壞,或者屏障操作(如果存在)由于異常而失敗
*/
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
使用案例
await()
/**
* 線程數量
*/
private final static int threadCount = 15;
/**
* 屏障攔截的線程數量為5,表示每次屏障會攔截5個線程
*/
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready {}", threadNum,barrier.getNumberWaiting());
//每次調用await方法后計數器+1,當前線程被阻塞
barrier.await();
log.info("{} continue", threadNum);
}
輸出結果:
16:16:40.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 0 is ready 0
16:16:41.244 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 1 is ready 1
16:16:42.244 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 2 is ready 2
16:16:43.244 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 3 is ready 3
16:16:44.245 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 4 is ready 4
16:16:44.245 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 4 continue
16:16:44.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 0 continue
16:16:44.245 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 1 continue
16:16:44.245 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 2 continue
16:16:44.245 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 3 continue
16:16:45.245 [pool-1-thread-6] INFO com.zjq.aqs.CyclicBarrier - 5 is ready 0
16:16:46.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 6 is ready 1
16:16:47.246 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 7 is ready 2
16:16:48.246 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 8 is ready 3
16:16:49.246 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 9 is ready 4
16:16:49.246 [pool-1-thread-6] INFO com.zjq.aqs.CyclicBarrier - 5 continue
16:16:49.246 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 9 continue
16:16:49.246 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 6 continue
16:16:49.246 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 8 continue
16:16:49.246 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 7 continue
16:16:50.247 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 10 is ready 0
16:16:51.247 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 11 is ready 1
16:16:52.247 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 12 is ready 2
16:16:53.248 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 13 is ready 3
16:16:54.248 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 14 is ready 4
16:16:54.248 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 14 continue
16:16:54.248 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 10 continue
16:16:54.248 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 12 continue
16:16:54.248 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 11 continue
16:16:54.248 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 13 continue
通過輸出結果可以知道,每次屏障會阻塞5個線程,5個線程執(zhí)行后計數器達到預設值,繼續(xù)執(zhí)行后續(xù)操作。
await(long timeout, TimeUnit unit)
/**
* 線程數量
*/
private final static int threadCount = 15;
/**
* 屏障攔截的線程數量為5,表示每次屏障會攔截5個線程
*/
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready{}", threadNum,barrier.getNumberWaiting());
//每次調用await方法后計數器+1,當前線程被阻塞
//等待2s.為了使在發(fā)生異常的時候,不影響其他線程,一定要catch
//由于設置了超時時間后阻塞的線程可能會被中斷,拋出BarrierException異常,如果想繼續(xù)往下執(zhí)行,需要加上try-catch
try {
barrier.await(2, TimeUnit.SECONDS);
}catch (Exception e){
//查看執(zhí)行異常的線程
log.info("線程{} 執(zhí)行異常,阻塞被中斷?{}",threadNum,barrier.isBroken());
}
log.info("{} continue", threadNum);
}
輸出結果:
17:06:24.440 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 0 is ready0
17:06:25.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 1 is ready1
17:06:26.435 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 2 is ready2
17:06:26.455 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 線程0 執(zhí)行異常,阻塞被中斷?true
17:06:26.456 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 線程2 執(zhí)行異常,阻塞被中斷?true
17:06:26.456 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 線程1 執(zhí)行異常,阻塞被中斷?true
17:06:26.456 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 0 continue
17:06:26.456 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 2 continue
17:06:26.456 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 1 continue
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 3 is ready0
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 線程3 執(zhí)行異常,阻塞被中斷?true
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 3 continue
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 4 is ready0
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 線程4 執(zhí)行異常,阻塞被中斷?true
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 4 continue
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 5 is ready0
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 線程5 執(zhí)行異常,阻塞被中斷?true
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 5 continue
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 6 is ready0
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 線程6 執(zhí)行異常,阻塞被中斷?true
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 6 continue
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 7 is ready0
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 線程7 執(zhí)行異常,阻塞被中斷?true
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 7 continue
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 8 is ready0
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 線程8 執(zhí)行異常,阻塞被中斷?true
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 8 continue
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 9 is ready0
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 線程9 執(zhí)行異常,阻塞被中斷?true
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 9 continue
CyclicBarrier(int parties, Runnable barrierAction)
/**
* 線程到達屏障時,優(yōu)先執(zhí)行barrierAction,方便處理更復雜的業(yè)務場景
*/
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});
輸出結果:
17:11:38.867 [pool-1-thread-1] INFO com.zjq.CyclicBarrier3 - 0 is ready
17:11:38.966 [pool-1-thread-2] INFO com.zjq.CyclicBarrier3 - 1 is ready
17:11:39.067 [pool-1-thread-3] INFO com.zjq.CyclicBarrier3 - 2 is ready
17:11:39.167 [pool-1-thread-4] INFO com.zjq.CyclicBarrier3 - 3 is ready
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - 4 is ready
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - callback is running
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - 4 continue
17:11:39.268 [pool-1-thread-1] INFO com.zjq.CyclicBarrier3 - 0 continue
17:11:39.268 [pool-1-thread-2] INFO com.zjq.CyclicBarrier3 - 1 continue
17:11:39.268 [pool-1-thread-3] INFO com.zjq.CyclicBarrier3 - 2 continue
17:11:39.268 [pool-1-thread-4] INFO com.zjq.CyclicBarrier3 - 3 continue
17:11:39.369 [pool-1-thread-6] INFO com.zjq.CyclicBarrier3 - 5 is ready
17:11:39.470 [pool-1-thread-7] INFO com.zjq.CyclicBarrier3 - 6 is ready
17:11:39.570 [pool-1-thread-8] INFO com.zjq.CyclicBarrier3 - 7 is ready
17:11:39.671 [pool-1-thread-9] INFO com.zjq.CyclicBarrier3 - 8 is ready
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - 9 is ready
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - callback is running
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - 9 continue
17:11:39.772 [pool-1-thread-6] INFO com.zjq.CyclicBarrier3 - 5 continue
17:11:39.772 [pool-1-thread-9] INFO com.zjq.CyclicBarrier3 - 8 continue
17:11:39.772 [pool-1-thread-7] INFO com.zjq.CyclicBarrier3 - 6 continue
17:11:39.772 [pool-1-thread-8] INFO com.zjq.CyclicBarrier3 - 7 continue
CyclicBarrier和CountDownLatch的區(qū)別
- CountDownLatch的計數器只能使用一次。而CyclicBarrier的計數器可以使用reset()方法重置。所以CyclicBarrier能處理更為復雜的業(yè)務場景,比如如果計算發(fā)生錯誤,可以重置計數器,并讓線程們重新執(zhí)行一次。
- CountDownLatch主要用于實現(xiàn)一個或n個線程需要等待其他線程完成某項操作之后,才能繼續(xù)往下執(zhí)行,描述的是一個或n個線程等待其他線程的關系,而CyclicBarrier是多個線程相互等待,知道滿足條件以后再一起往下執(zhí)行。描述的是多個線程相互等待的場景
- CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得CyclicBarrier阻塞的線程數量。isBroken方法用來知道阻塞的線程是否被中斷。
以上就是AQS同步組件CyclicBarrier循環(huán)屏障用例剖析的詳細內容,更多關于AQS同步組件CyclicBarrier的資料請關注腳本之家其它相關文章!
相關文章
springboot實現(xiàn)攔截器的3種方式及異步執(zhí)行的思考
實際項目中,我們經常需要輸出請求參數,響應結果,方法耗時,統(tǒng)一的權限校驗等。本文首先為大家介紹 HTTP 請求中三種常見的攔截實現(xiàn),并且比較一下其中的差異。感興趣的可以了解一下2021-07-07
springboot集成junit編寫單元測試實戰(zhàn)
在做單元測試時,代碼覆蓋率常常被拿來作為衡量測試好壞的指標,本文主要介紹了springboot集成junit編寫單元測試實戰(zhàn),文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-02-02

