AQS同步組件CyclicBarrier循環(huán)屏障用例剖析
CyclicBarrier原理
CyclicBarrier 的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí),屏障才會(huì)開門,所有被屏障攔截的線程才會(huì)繼續(xù)運(yùn)行。
當(dāng)某個(gè)線程調(diào)用了await方法之后,就會(huì)進(jìn)入等待狀態(tài),并將計(jì)數(shù)器+1,直到所有線程調(diào)用await方法使計(jì)數(shù)器為CyclicBarrier設(shè)置的值,才可以繼續(xù)執(zhí)行,由于計(jì)數(shù)器可以重復(fù)使用,所以我們又叫它循環(huán)屏障。
CyclicBarrier默認(rèn)的構(gòu)造方法是CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。
CyclicBarrier可以用于多線程計(jì)算數(shù)據(jù),最后合并計(jì)算結(jié)果的應(yīng)用場景。
源碼分析
/** * 創(chuàng)建一個(gè)新的CyclicBarrier當(dāng)給定數(shù)量的參與方(線程)等待它時(shí),它將觸發(fā), * 并且在障礙觸發(fā)時(shí)不執(zhí)行預(yù)定義的操作。 * * @param 在barrier被觸發(fā)之前必須調(diào)用await()的線程數(shù) * @throws IllegalArgumentException 如果parties小于1拋出異常 */ public CyclicBarrier(int parties) { this(parties, null); } /** * * 當(dāng)前線程調(diào)用await方法的線程告知CyclicBarrier已經(jīng)到達(dá)屏障,然后當(dāng)前線程被阻塞 * * @return 當(dāng)前線程的到達(dá)索引,其中索引為- 1表示第一個(gè)到達(dá)的,0表示最后一個(gè)到達(dá)的 * @throws InterruptedException 如果當(dāng)前線程在等待時(shí)被中斷 * @throws BrokenBarrierException 如果另一個(gè)線程在當(dāng)前線程等待時(shí)被中斷或超時(shí), * 或者屏障被重置,或者在調(diào)用await方法時(shí)屏障被破壞,或者屏障操作(如果存在)由于異常而失敗 */ public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
使用案例
await()
/** * 線程數(shù)量 */ private final static int threadCount = 15; /** * 屏障攔截的線程數(shù)量為5,表示每次屏障會(huì)攔截5個(gè)線程 */ private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < threadCount; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready {}", threadNum,barrier.getNumberWaiting()); //每次調(diào)用await方法后計(jì)數(shù)器+1,當(dāng)前線程被阻塞 barrier.await(); log.info("{} continue", threadNum); }
輸出結(jié)果:
16:16:40.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 0 is ready 0
16:16:41.244 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 1 is ready 1
16:16:42.244 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 2 is ready 2
16:16:43.244 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 3 is ready 3
16:16:44.245 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 4 is ready 4
16:16:44.245 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 4 continue
16:16:44.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 0 continue
16:16:44.245 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 1 continue
16:16:44.245 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 2 continue
16:16:44.245 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 3 continue
16:16:45.245 [pool-1-thread-6] INFO com.zjq.aqs.CyclicBarrier - 5 is ready 0
16:16:46.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 6 is ready 1
16:16:47.246 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 7 is ready 2
16:16:48.246 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 8 is ready 3
16:16:49.246 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 9 is ready 4
16:16:49.246 [pool-1-thread-6] INFO com.zjq.aqs.CyclicBarrier - 5 continue
16:16:49.246 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 9 continue
16:16:49.246 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 6 continue
16:16:49.246 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 8 continue
16:16:49.246 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 7 continue
16:16:50.247 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 10 is ready 0
16:16:51.247 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 11 is ready 1
16:16:52.247 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 12 is ready 2
16:16:53.248 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 13 is ready 3
16:16:54.248 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 14 is ready 4
16:16:54.248 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 14 continue
16:16:54.248 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 10 continue
16:16:54.248 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 12 continue
16:16:54.248 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 11 continue
16:16:54.248 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 13 continue
通過輸出結(jié)果可以知道,每次屏障會(huì)阻塞5個(gè)線程,5個(gè)線程執(zhí)行后計(jì)數(shù)器達(dá)到預(yù)設(shè)值,繼續(xù)執(zhí)行后續(xù)操作。
await(long timeout, TimeUnit unit)
/** * 線程數(shù)量 */ private final static int threadCount = 15; /** * 屏障攔截的線程數(shù)量為5,表示每次屏障會(huì)攔截5個(gè)線程 */ private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < threadCount; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready{}", threadNum,barrier.getNumberWaiting()); //每次調(diào)用await方法后計(jì)數(shù)器+1,當(dāng)前線程被阻塞 //等待2s.為了使在發(fā)生異常的時(shí)候,不影響其他線程,一定要catch //由于設(shè)置了超時(shí)時(shí)間后阻塞的線程可能會(huì)被中斷,拋出BarrierException異常,如果想繼續(xù)往下執(zhí)行,需要加上try-catch try { barrier.await(2, TimeUnit.SECONDS); }catch (Exception e){ //查看執(zhí)行異常的線程 log.info("線程{} 執(zhí)行異常,阻塞被中斷?{}",threadNum,barrier.isBroken()); } log.info("{} continue", threadNum); }
輸出結(jié)果:
17:06:24.440 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 0 is ready0
17:06:25.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 1 is ready1
17:06:26.435 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 2 is ready2
17:06:26.455 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 線程0 執(zhí)行異常,阻塞被中斷?true
17:06:26.456 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 線程2 執(zhí)行異常,阻塞被中斷?true
17:06:26.456 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 線程1 執(zhí)行異常,阻塞被中斷?true
17:06:26.456 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 0 continue
17:06:26.456 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 2 continue
17:06:26.456 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 1 continue
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 3 is ready0
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 線程3 執(zhí)行異常,阻塞被中斷?true
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 3 continue
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 4 is ready0
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 線程4 執(zhí)行異常,阻塞被中斷?true
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 4 continue
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 5 is ready0
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 線程5 執(zhí)行異常,阻塞被中斷?true
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 5 continue
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 6 is ready0
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 線程6 執(zhí)行異常,阻塞被中斷?true
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 6 continue
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 7 is ready0
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 線程7 執(zhí)行異常,阻塞被中斷?true
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 7 continue
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 8 is ready0
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 線程8 執(zhí)行異常,阻塞被中斷?true
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 8 continue
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 9 is ready0
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 線程9 執(zhí)行異常,阻塞被中斷?true
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 9 continue
CyclicBarrier(int parties, Runnable barrierAction)
/** * 線程到達(dá)屏障時(shí),優(yōu)先執(zhí)行barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場景 */ private static CyclicBarrier barrier = new CyclicBarrier(5, () -> { log.info("callback is running"); });
輸出結(jié)果:
17:11:38.867 [pool-1-thread-1] INFO com.zjq.CyclicBarrier3 - 0 is ready
17:11:38.966 [pool-1-thread-2] INFO com.zjq.CyclicBarrier3 - 1 is ready
17:11:39.067 [pool-1-thread-3] INFO com.zjq.CyclicBarrier3 - 2 is ready
17:11:39.167 [pool-1-thread-4] INFO com.zjq.CyclicBarrier3 - 3 is ready
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - 4 is ready
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - callback is running
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - 4 continue
17:11:39.268 [pool-1-thread-1] INFO com.zjq.CyclicBarrier3 - 0 continue
17:11:39.268 [pool-1-thread-2] INFO com.zjq.CyclicBarrier3 - 1 continue
17:11:39.268 [pool-1-thread-3] INFO com.zjq.CyclicBarrier3 - 2 continue
17:11:39.268 [pool-1-thread-4] INFO com.zjq.CyclicBarrier3 - 3 continue
17:11:39.369 [pool-1-thread-6] INFO com.zjq.CyclicBarrier3 - 5 is ready
17:11:39.470 [pool-1-thread-7] INFO com.zjq.CyclicBarrier3 - 6 is ready
17:11:39.570 [pool-1-thread-8] INFO com.zjq.CyclicBarrier3 - 7 is ready
17:11:39.671 [pool-1-thread-9] INFO com.zjq.CyclicBarrier3 - 8 is ready
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - 9 is ready
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - callback is running
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - 9 continue
17:11:39.772 [pool-1-thread-6] INFO com.zjq.CyclicBarrier3 - 5 continue
17:11:39.772 [pool-1-thread-9] INFO com.zjq.CyclicBarrier3 - 8 continue
17:11:39.772 [pool-1-thread-7] INFO com.zjq.CyclicBarrier3 - 6 continue
17:11:39.772 [pool-1-thread-8] INFO com.zjq.CyclicBarrier3 - 7 continue
CyclicBarrier和CountDownLatch的區(qū)別
- CountDownLatch的計(jì)數(shù)器只能使用一次。而CyclicBarrier的計(jì)數(shù)器可以使用reset()方法重置。所以CyclicBarrier能處理更為復(fù)雜的業(yè)務(wù)場景,比如如果計(jì)算發(fā)生錯(cuò)誤,可以重置計(jì)數(shù)器,并讓線程們重新執(zhí)行一次。
- CountDownLatch主要用于實(shí)現(xiàn)一個(gè)或n個(gè)線程需要等待其他線程完成某項(xiàng)操作之后,才能繼續(xù)往下執(zhí)行,描述的是一個(gè)或n個(gè)線程等待其他線程的關(guān)系,而CyclicBarrier是多個(gè)線程相互等待,知道滿足條件以后再一起往下執(zhí)行。描述的是多個(gè)線程相互等待的場景
- CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得CyclicBarrier阻塞的線程數(shù)量。isBroken方法用來知道阻塞的線程是否被中斷。
以上就是AQS同步組件CyclicBarrier循環(huán)屏障用例剖析的詳細(xì)內(nèi)容,更多關(guān)于AQS同步組件CyclicBarrier的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
springboot實(shí)現(xiàn)攔截器的3種方式及異步執(zhí)行的思考
實(shí)際項(xiàng)目中,我們經(jīng)常需要輸出請求參數(shù),響應(yīng)結(jié)果,方法耗時(shí),統(tǒng)一的權(quán)限校驗(yàn)等。本文首先為大家介紹 HTTP 請求中三種常見的攔截實(shí)現(xiàn),并且比較一下其中的差異。感興趣的可以了解一下2021-07-07關(guān)于設(shè)置Mybatis打印調(diào)試sql的兩種方式
這篇文章主要介紹了關(guān)于設(shè)置Mybatis打印調(diào)試sql的兩種方式,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-08-08講解Java設(shè)計(jì)模式編程中的建造者模式與原型模式
這篇文章主要介紹了Java設(shè)計(jì)模式編程中的建造者模式與原型模式,設(shè)計(jì)模式有利于團(tuán)隊(duì)開發(fā)過程中的代碼維護(hù),需要的朋友可以參考下2016-02-02springboot集成junit編寫單元測試實(shí)戰(zhàn)
在做單元測試時(shí),代碼覆蓋率常常被拿來作為衡量測試好壞的指標(biāo),本文主要介紹了springboot集成junit編寫單元測試實(shí)戰(zhàn),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-02-02java開發(fā)主流定時(shí)任務(wù)解決方案全橫評詳解
這篇文章主要為大家介紹了java開發(fā)主流定時(shí)任務(wù)解決方案全橫評詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-09-09