CyclicBarrier之多線程中的循環(huán)柵欄詳解
1. CyclicBarrier簡介
現(xiàn)實(shí)生活中我們經(jīng)常會遇到這樣的情景,在進(jìn)行某個活動前需要等待人全部都齊了才開始。例如吃飯時要等全家人都上座了才動筷子,旅游時要等全部人都到齊了才出發(fā),比賽時要等運(yùn)動員都上場后才開始。
在JUC包中為我們提供了一個同步工具類能夠很好的模擬這類場景,它就是CyclicBarrier類。利用CyclicBarrier類可以實(shí)現(xiàn)一組線程相互等待,當(dāng)所有線程都到達(dá)某個屏障點(diǎn)(柵欄)后再進(jìn)行后續(xù)的操作。下圖演示了這一過程:
CyclicBarrier可以使一定數(shù)量的線程反復(fù)地在柵欄(不同輪次或不同代)位置處匯集。
- CyclicBarrier字面意思是“可重復(fù)使用的柵欄”,CyclicBarrier 和 CountDownLatch 很像,只是 CyclicBarrier 可以有不止一個柵欄,因?yàn)樗臇艡冢˙arrier)可以重復(fù)使用(Cyclic)。
- 當(dāng)線程到達(dá)柵欄位置時將調(diào)用await()方法,這個方法將阻塞(當(dāng)前線程)直到所有線程都到達(dá)柵欄位置。如果所有線程都到達(dá)柵欄位置,那么柵欄將打開,此時所有的線程都將被釋放,而柵欄將被重置以便下次使用。
2.CyclicBarrier的使用
2.1 常用方法
//參數(shù)parties:表示要到達(dá)屏障 (柵欄)的線程數(shù)量 //參數(shù)Runnable: 最后一個線程到達(dá)屏障之后要做的任務(wù) ? //構(gòu)造方法1 public CyclicBarrier(int parties) //構(gòu)造方法2 public CyclicBarrier(int parties, Runnable barrierAction) ? //線程調(diào)用await()方法表示當(dāng)前線程已經(jīng)到達(dá)柵欄,然后會被阻塞 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } ? //帶時限的阻塞等待 public int await(long timeout, TimeUnit unit) throws InterruptedException,BrokenBarrierException,TimeoutException { return dowait(true, unit.toNanos(timeout)); }
2.2 使用舉例
適用場景:可用于需要多個線程均到達(dá)某一步之后才能繼續(xù)往下執(zhí)行的場景
//循環(huán)柵欄-可多次循環(huán)使用 CyclicBarrier cyclicBarrier = new CyclicBarrier(5,()->{ System.out.println(Thread.currentThread().getName()+" 完成最后任務(wù)!"); }); ? IntStream.range(1,6).forEach(i->{ new Thread(()->{ try { Thread.sleep(new Double(Math.random()*3000).longValue()); System.out.println(Thread.currentThread().getName()+" 到達(dá)柵欄A"); cyclicBarrier.await();//屏障點(diǎn)A,當(dāng)前線程會阻塞至此,等待計(jì)數(shù)器=0 System.out.println(Thread.currentThread().getName()+" 沖破柵欄A"); }catch (Exception e){ e.printStackTrace(); } }).start(); });
3.CyclicBarrier原理
CyclicBarrier是一道屏障,調(diào)用await()方法后,當(dāng)前線程進(jìn)入阻塞,當(dāng)parties數(shù)量的線程調(diào)用await方法后,所有的await方法會返回并繼續(xù)往下執(zhí)行。
3.1 成員變量
/** CyclicBarrier使用的排他鎖*/ private final ReentrantLock lock = new ReentrantLock(); /** barrier被沖破前,線程等待的condition*/ private final Condition trip = lock.newCondition(); /** barrier被沖破時,需要滿足的參與線程數(shù)。*/ private final int parties; /* barrier被沖破后執(zhí)行的方法。*/ private final Runnable barrierCommand; /** 當(dāng)其輪次 */ private Generation generation = new Generation(); ? /** *目前等待剩余的參與者數(shù)量。從 parties倒數(shù)到0。每個輪次該值會被重置回parties */ private int count;
(1)CyclicBarrier內(nèi)部是通過條件隊(duì)列trip來對線程進(jìn)行阻塞的,并且其內(nèi)部維護(hù)了兩個int型的變量parties和count。
- parties表示每次攔截的線程數(shù),該值在構(gòu)造時進(jìn)行賦值。
- count是內(nèi)部計(jì)數(shù)器,它的初始值和parties相同,以后隨著每次await方法的調(diào)用而減1,直到減為0就將所有線程喚醒。
(2)CyclicBarrier有一個靜態(tài)內(nèi)部類Generation,該類的對象代表柵欄的當(dāng)前代,就像玩游戲時代表的本局游戲,利用它可以實(shí)現(xiàn)循環(huán)等待
(3)barrierCommand表示換代前執(zhí)行的任務(wù),當(dāng)count減為0時表示本局游戲結(jié)束,需要轉(zhuǎn)到下一局。在轉(zhuǎn)到下一局游戲之前會將所有阻塞的線程喚醒,在喚醒所有線程之前你可以通過指定barrierCommand來執(zhí)行自己的任務(wù)。
3.2 構(gòu)造器
//構(gòu)造器1:指定本局要攔截的線程數(shù)parties 及 本局結(jié)束時要執(zhí)行的任務(wù) public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } ? //構(gòu)造器2 public CyclicBarrier(int parties) { this(parties, null); }
3.3 等待的方法
CyclicBarrier類最主要的功能就是使先到達(dá)屏障點(diǎn)的線程阻塞并等待后面的線程,其中它提供了兩種等待的方法,分別是定時等待和非定時等待。源代碼中await()方法最終調(diào)用的是dowait()方法:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException,TimeoutException { // 獲取獨(dú)占鎖 final ReentrantLock lock = this.lock; lock.lock();//對共享資源count,generation操作前,需先上鎖保證線程安全 try { // 當(dāng)前代--當(dāng)前輪次對象的引用 final Generation g = generation; // 如果這輪次broken了,拋出異常 if (g.broken) throw new BrokenBarrierException(); // 如果線程中斷了,拋出異常 if (Thread.interrupted()) { breakBarrier();//如果被打斷,通過此方法設(shè)置當(dāng)前輪次為broken狀態(tài),通知當(dāng)前輪次所有等待的線程 throw new InterruptedException();//拋出異常 } //自旋前 //1、count值-1 int index = --count; // 2、判斷是否到0,若是,則沖破屏障點(diǎn)(說明最后一個線程已經(jīng)到達(dá)) if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; // 3、執(zhí)行柵欄任務(wù)(若CyclicBarrier構(gòu)造時傳入了Runnable,則調(diào)用) if (command != null) command.run(); ranAction = true; // 4、更新一輪次,將count重置,將generation重置,喚醒之前等待的線程 nextGeneration(); return 0; } finally { // 如果執(zhí)行柵欄任務(wù)(command)的時候出現(xiàn)了異常,那么就認(rèn)為本輪次破環(huán)了 if (!ranAction) breakBarrier(); } } //計(jì)數(shù)器沒有到0 =》開始自旋,直到屏障被沖破,或者interrupted或者超時 for (;;) { try { // 開始等待;如果沒有時間限制,則直接等待,直到被喚醒(讓其他線程進(jìn)入到lock的代碼塊執(zhí)行以上邏輯) if (!timed) trip.await();//阻塞,此時會釋放鎖,以讓其他線程進(jìn)入await方法中,等待屏障被沖破后,向后執(zhí)行 // 如果有時間限制,則等待指定時間 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 如果當(dāng)前線程阻塞被interrupt了,并且本輪次還沒有被break,那么修改本輪次狀態(tài)為broken if (g == generation && ! g.broken) { // 讓柵欄失效 breakBarrier(); throw ie; } else { // 上面條件不滿足,說明這個線程不是這輪次的,就不會影響當(dāng)前這代柵欄的執(zhí)行,所以,就打個中斷標(biāo)記 Thread.currentThread().interrupt(); } } // 當(dāng)有任何一個線程中斷了,就會調(diào)用breakBarrier方法,就會喚醒其他的線程,其他線程醒來后,也要拋出異常 if (g.broken) throw new BrokenBarrierException(); // g != generation表示正常換輪次了,返回當(dāng)前線程所在柵欄的下標(biāo) // 如果 g == generation,說明還沒有換,那為什么會醒了? // 因?yàn)橐粋€線程可以使用多個柵欄,當(dāng)別的柵欄喚醒了這個線程,就會走到這里,所以需要判斷是否是當(dāng)前代。 // 正是因?yàn)檫@個原因,才需要generation來保證正確。 if (g != generation) return index; // 如果有時間限制,且時間小于等于0,銷毀柵欄并拋出異常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } }//自旋 } finally { // 釋放獨(dú)占鎖 lock.unlock(); } }
更新本輪次的方法:nextGeneration()
private void nextGeneration() { trip.signalAll();//喚醒本輪次等待的線程 count = parties;//重置count值為初始值,為下一輪次(代)使用 generation = new Generation();//更新本輪次對象,這樣自旋中的線程才會跳出自旋。 } ? private static class Generation { boolean broken = false; } ? private void breakBarrier() { generation.broken = true;//設(shè)置標(biāo)識 count = parties;//重置count值為初始值 trip.signalAll();//喚醒所有等待線程 }
總結(jié)
以上為個人經(jīng)驗(yàn),希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
IDEA 自定義方法注解模板的實(shí)現(xiàn)方法
這篇文章主要介紹了IDEA 自定義方法注解模板的實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09xxl-job定時任務(wù)配置應(yīng)用及添加到springboot項(xiàng)目中實(shí)現(xiàn)動態(tài)API調(diào)用
XXL-JOB是一個分布式任務(wù)調(diào)度平臺,其核心設(shè)計(jì)目標(biāo)是開發(fā)迅速、學(xué)習(xí)簡單、輕量級、易擴(kuò)展,本篇文章主要是對xuxueli的xxl-job做一個簡單的配置,以及將其添加到自己已有的項(xiàng)目中進(jìn)行api調(diào)用,感興趣的朋友跟隨小編一起看看吧2024-04-04深度解析Spring AI請求與響應(yīng)機(jī)制的核心邏輯
我們在前面的兩個章節(jié)中基本上對Spring Boot 3版本的新變化進(jìn)行了全面的回顧,以確保在接下來研究Spring AI時能夠避免任何潛在的問題,本文給大家介紹Spring AI請求與響應(yīng)機(jī)制的核心邏輯,感興趣的朋友跟隨小編一起看看吧2024-11-11SpringMVC 重定向參數(shù)RedirectAttributes實(shí)例
這篇文章主要介紹了SpringMVC 重定向參數(shù)RedirectAttributes實(shí)例,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12里氏代換原則_動力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要為大家詳細(xì)介紹了里氏代換原則的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-08-08詳解微信開發(fā)之Author網(wǎng)頁授權(quán)
微信開發(fā)中,經(jīng)常有這樣的需求:獲得用戶頭像、綁定微信號給用戶發(fā)信息,那么實(shí)現(xiàn)這些的前提就是授權(quán)!本文對此進(jìn)行系統(tǒng)介紹,需要的朋友一起來看下吧2016-12-12springboot實(shí)現(xiàn)mock平臺的示例代碼
本文主要介紹了springboot實(shí)現(xiàn)mock平臺的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06