Java AQS中CyclicBarrier回環(huán)柵欄的使用
一. 簡介
CyclicBarrier ,回環(huán)柵欄(循環(huán)屏障),通過它可以實(shí)現(xiàn)讓一組線程等待至某個狀態(tài)(屏障點(diǎn))之后再全部同時執(zhí)行。叫做回環(huán)是因?yàn)楫?dāng)所有等待線程都被釋放以后,CyclicBarrier可以被重用。
二. CyclicBarrier的使用
構(gòu)造方法
//parties表示屏障攔截的線程數(shù)量,每個線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。 public CyclicBarrier(int parties) { this(parties, null); } //用于在線程到達(dá)屏障時,優(yōu)先執(zhí)行 barrierAction, //方便處理更復(fù)雜的業(yè)務(wù)場景(該線程的執(zhí)行時機(jī)是在到達(dá)屏障之后再執(zhí)行) public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
重要方法
//指定數(shù)量的線程全部調(diào)用await()方法時,這些線程不再阻塞 // BrokenBarrierException 表示柵欄已經(jīng)被破壞,破壞的原因可能是其中一個線程 await() 時被中斷或者超時 public int await() throws InterruptedException, BrokenBarrierException public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException //循環(huán)重置 public void reset() {}
三. CyclicBarrier的應(yīng)用場景
CyclicBarrier 可以用于多線程計算數(shù)據(jù),最后合并計算結(jié)果的場景。
例子一 ,多個線程調(diào)用await之后阻塞,等到達(dá)到屏障攔截的線程數(shù)量之后,再一起執(zhí)行
public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(3); for (int i = 0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + "開始等待其他線程"); cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "開始執(zhí)行"); //TODO 模擬業(yè)務(wù)處理 Thread.sleep(5000); System.out.println(Thread.currentThread().getName() + "執(zhí)行完畢"); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
Thread-0開始等待其他線程
Thread-2開始等待其他線程
Thread-1開始等待其他線程
Thread-4開始等待其他線程
Thread-3開始等待其他線程
Thread-2開始執(zhí)行
Thread-1開始執(zhí)行
Thread-0開始執(zhí)行
例子二 用于多線程計算數(shù)據(jù),最后合并計算結(jié)果的場景
//保存每個學(xué)生的平均成績 private ConcurrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>(); private ExecutorService threadPool= Executors.newFixedThreadPool(3); private CyclicBarrier cb=new CyclicBarrier(3,()->{ int result=0; Set<String> set = map.keySet(); for(String s:set){ result+=map.get(s); } System.out.println("三人平均成績?yōu)?"+(result/3)+"分"); }); public void count(){ for(int i=0;i<3;i++){ threadPool.execute(new Runnable(){ @Override public void run() { //獲取學(xué)生平均成績 int score=(int)(Math.random()*40+60); map.put(Thread.currentThread().getName(), score); System.out.println(Thread.currentThread().getName() +"同學(xué)的平均成績?yōu)椋?+score); try { //執(zhí)行完運(yùn)行await(),等待所有學(xué)生平均成績都計算完畢 cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); } } public static void main(String[] args) { CyclicBarrierTest2 cb=new CyclicBarrierTest2(); cb.count(); }
pool-1-thread-3同學(xué)的平均成績?yōu)椋?2
pool-1-thread-1同學(xué)的平均成績?yōu)椋?4
pool-1-thread-2同學(xué)的平均成績?yōu)椋?5
三人平均成績?yōu)?73分
例子三 利用CyclicBarrier的計數(shù)器能夠重置,屏障可以重復(fù)使用的特性
public static void main(String[] args) { AtomicInteger counter = new AtomicInteger(); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 5, 5, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), (r) -> new Thread(r, counter.addAndGet(1) + " 號 "), new ThreadPoolExecutor.AbortPolicy()); CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("裁判:比賽開始~~")); for (int i = 0; i < 10; i++) { threadPoolExecutor.submit(new Runner(cyclicBarrier)); } } static class Runner extends Thread{ private CyclicBarrier cyclicBarrier; public Runner (CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { int sleepMills = ThreadLocalRandom.current().nextInt(1000); Thread.sleep(sleepMills); System.out.println(Thread.currentThread().getName() + " 選手已就位, 準(zhǔn)備共用時: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting()); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } } }
3 號 選手已就位, 準(zhǔn)備共用時: 223ms0
2 號 選手已就位, 準(zhǔn)備共用時: 315ms1
5 號 選手已就位, 準(zhǔn)備共用時: 471ms2
1 號 選手已就位, 準(zhǔn)備共用時: 556ms3
4 號 選手已就位, 準(zhǔn)備共用時: 923ms4
裁判:比賽開始~~
3 號 選手已就位, 準(zhǔn)備共用時: 285ms0
2 號 選手已就位, 準(zhǔn)備共用時: 413ms1
1 號 選手已就位, 準(zhǔn)備共用時: 533ms2
5 號 選手已就位, 準(zhǔn)備共用時: 661ms3
4 號 選手已就位, 準(zhǔn)備共用時: 810ms4
裁判:比賽開始~~
四. CyclicBarrier與CountDownLatch的區(qū)別
- CountDownLatch的計數(shù)器只能使用一次,而CyclicBarrier的計數(shù)器可以使用reset() 方法重置。所以CyclicBarrier能處理更為復(fù)雜的業(yè)務(wù)場景,比如如果計算發(fā)生錯誤,可以重置計數(shù)器,并讓線程們重新執(zhí)行一次
- CyclicBarrier還提供getNumberWaiting(可以獲得CyclicBarrier阻塞的線程數(shù)量)、isBroken(用來知道阻塞的線程是否被中斷)等方法。
- CountDownLatch會阻塞主線程,CyclicBarrier不會阻塞主線程,只會阻塞子線程。
- CountDownLatch和CyclicBarrier都能夠?qū)崿F(xiàn)線程之間的等待,只不過它們側(cè)重點(diǎn)不同。CountDownLatch一般用于一個或多個線程,等待其他線程執(zhí)行完任務(wù)后,再執(zhí)行。CyclicBarrier一般用于一組線程互相等待至某個狀態(tài),然后這一組線程再同時執(zhí)行。
- CyclicBarrier 還可以提供一個 barrierAction,合并多線程計算結(jié)果。
- CyclicBarrier是通過ReentrantLock的"獨(dú)占鎖"和Conditon來實(shí)現(xiàn)一組線程的阻塞喚醒的,而CountDownLatch則是通過AQS的“共享鎖”實(shí)現(xiàn)
五. CyclicBarrier源碼解析
以例子以為例,thread0 從await方法開始,await會調(diào)用dowait
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
那么dowait里又做了什么
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //如果狀態(tài)為0就將其修改為1,并設(shè)置當(dāng)前線程,調(diào)用await時外面肯定是lock lock.lock(); try { //每一個棧欄算是一代 final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //每個線程執(zhí)行一次,count自減一 int index = --count; // count減到0,說明幾個線程都到達(dá)屏障,就會重置 進(jìn)入下一個屏障 if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) //trip就是一個條件隊列condition,入條件等待隊列,單向鏈表結(jié)構(gòu) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
入隊阻塞的方法在await()中,下面看一下
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //入隊 創(chuàng)建節(jié)點(diǎn) Node node = addConditionWaiter(); // 釋放鎖,這樣其他線程才能獲取鎖,執(zhí)行 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { //不在當(dāng)前隊列中就阻塞 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
addConditionWaiter中創(chuàng)建節(jié)點(diǎn),對于thread0,頭節(jié)點(diǎn)是自己,lastWaiter也是自己
/** * 添加條件等待節(jié)點(diǎn) */ private Node addConditionWaiter() { Node t = lastWaiter; //節(jié)點(diǎn)取消 則移除. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //創(chuàng)建一個節(jié)點(diǎn),thread=Thread.currentThread(),waitstatus= Node.CONDITION =-2 Node node = new Node(Thread.currentThread(), Node.CONDITION); //如果上一個節(jié)點(diǎn)為null,則將該節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn) if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
釋放鎖
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //釋放鎖時state變?yōu)?了 if (c == 0) { free = true; //將當(dāng)前線程設(shè)置為null setExclusiveOwnerThread(null); } //修改state狀態(tài)值 setState(c); return free; }
thread0的大致流程
首先會調(diào)用lock.lock進(jìn)行加鎖,加鎖之后調(diào)用trip.await方法進(jìn)行入隊阻塞,入隊是通過addConditionWaiter添加進(jìn)條件等待隊列,然后通過fullyRelease釋放鎖,設(shè)置當(dāng)前線程為null,然后修改state狀態(tài)值,最后調(diào)用LockSupport.park(this);進(jìn)行阻塞。
threa1的流程大致和thread0一樣,還沒有達(dá)到屏障數(shù)量,入隊的地方和thread0不一樣
private Node addConditionWaiter() { Node t = lastWaiter; //節(jié)點(diǎn)取消 則移除. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //創(chuàng)建一個節(jié)點(diǎn),thread=Thread.currentThread(),waitstatus= Node.CONDITION =-2 Node node = new Node(Thread.currentThread(), Node.CONDITION); //如果上一個節(jié)點(diǎn)為null,則將該節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn) if (t == null) firstWaiter = node; else //thread1執(zhí)行時,t已經(jīng)不為null了 t.nextWaiter = node; //lastWaiter指向當(dāng)前 lastWaiter = node; return node; }
thread1流程
thread2執(zhí)行的時候count已經(jīng)減到0,會執(zhí)行nextGeneration方法
//每個線程執(zhí)行一次,count自減一 int index = --count; // count減到0,說明幾個線程都到達(dá)屏障,就會重置 進(jìn)入下一個屏障 if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //開啟下一代屏障 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } private void nextGeneration() { // 喚醒所有線程,喚醒操作是在同步等待隊列中,所以要將條件等待隊列轉(zhuǎn)換為同步等待隊列 trip.signalAll(); // 重置count count = parties; //創(chuàng)建下一代屏障 generation = new Generation(); } public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } private void doSignalAll(Node first) { //將firstWaiter和lastWaiter都置為null,就沒有首尾節(jié)點(diǎn)了 lastWaiter = firstWaiter = null; //條件隊列的出隊 do { //循環(huán)將條件隊列轉(zhuǎn)同步隊列 Node next = first.nextWaiter; //first的nextWaiter置為null first.nextWaiter = null; //條件隊列轉(zhuǎn)同步隊列,因?yàn)閱拘咽窃谕疥犃兄? transferForSignal(first); first = next; } while (first != null); }
條件隊列的出隊
/** * 條件隊列轉(zhuǎn)同步隊列 */ final boolean transferForSignal(Node node) { //將同步狀態(tài)改為0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; //將ws設(shè)置為-1 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
同步隊列入隊
thread2開始解鎖
private int dowait(boolean timed, long nanos){ //.... } finally { lock.unlock(); } } public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //喚醒線程0 unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) //將ws設(shè)置為0 compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //喚醒線程0 LockSupport.unpark(s.thread); }
Thread0喚醒之后就會獲取鎖,執(zhí)行業(yè)務(wù)邏輯然后再釋放鎖
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 在這里獲取鎖,再釋放鎖 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } //CAS嘗試獲取鎖 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //cas獲取鎖 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
同步隊列就會出隊
Thread0喚醒之后,執(zhí)行業(yè)務(wù)邏輯之后會unlock,又會喚醒Thread1,Thread1喚醒之后,又會喚醒Thread2。
總結(jié):
await方法,
前半段 釋放鎖 進(jìn)入條件隊列,阻塞線程(Thread0 Thread1),
過渡階段 其他線程調(diào)用singnal/signalAll喚醒(Thread2),條件隊列轉(zhuǎn)同步隊列,可以在釋放鎖的時候喚醒head的后續(xù)節(jié)點(diǎn)所在的線程
后半段 (Thread0)被喚醒的線程獲取鎖(如果有競爭,CAS獲取鎖失敗,還會阻塞),Thread0釋放鎖,喚醒同步隊列中head的后續(xù)節(jié)點(diǎn)所在的線程(獨(dú)占鎖的邏輯)
到此這篇關(guān)于Java AQS中CyclicBarrier回環(huán)柵欄的使用的文章就介紹到這了,更多相關(guān)Java CyclicBarrier內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中保證多線程間的數(shù)據(jù)共享的方法詳解
這篇文章詳解的發(fā)給大家介紹了Java中是如何保證多線程間的數(shù)據(jù)共享的,文中通過圖文介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2023-11-11SpringBoot手動開啟事務(wù):DataSourceTransactionManager問題
這篇文章主要介紹了SpringBoot手動開啟事務(wù):DataSourceTransactionManager問題,具有很好的價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07Java程序員的10道常見的XML面試問答題(XML術(shù)語詳解)
包括web開發(fā)人員的Java面試在內(nèi)的各種面試中,XML面試題在各種編程工作的面試中很常見。XML是一種成熟的技術(shù),經(jīng)常作為從一個平臺到其他平臺傳輸數(shù)據(jù)的標(biāo)準(zhǔn)2014-04-04IDEA關(guān)于.properties資源文件的編碼調(diào)整問題
這篇文章主要介紹了IDEA關(guān)于.properties資源文件的編碼調(diào)整問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-06-06Spring Boot(四)之使用JWT和Spring Security保護(hù)REST API
這篇文章主要介紹了Spring Boot(四)之使用JWT和Spring Security保護(hù)REST API的相關(guān)知識,需要的朋友可以參考下2017-04-04關(guān)于Springboot+gateway整合依賴并處理依賴沖突問題
這篇文章主要介紹了Springboot+gateway整合依賴并處理依賴沖突問題,給大家提到了spring boot版本和spring cloud版本,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-01-01