Java中的CountDownLatch原理深入解析
1. CountDownLatch是什么?
CountDownLatch是多線程控制的一種同步工具類,它被稱為門閥、 計(jì)數(shù)器或者閉鎖。這個(gè)工具經(jīng)常用來用來協(xié)調(diào)多個(gè)線程之間的同步,或者說起到線程之間的通信(而不是用作互斥的作用)。
它允許一個(gè)或多個(gè)線程一直等待,直到其他線程執(zhí)行完后再執(zhí)行。例如,應(yīng)用程序的主線程希望在負(fù)責(zé)啟動(dòng)框架服務(wù)的線程已經(jīng)啟動(dòng)所有框架服務(wù)之后執(zhí)行。
當(dāng)然利用ReentrantLock + Condition也可以實(shí)現(xiàn)線程之間通信,達(dá)到同樣的效果
2. 類圖
可以看出CountDownLatch只有一個(gè)內(nèi)部類Sync,Sync繼承AbstractQueuedSynchronizer
3. 實(shí)現(xiàn)原理
3.1 示例用法
// N個(gè)線程等待主線程 class Driver { // ... void main() throws InterruptedException { // 開始信號(hào) CountDownLatch startSignal = new CountDownLatch(1); // 完成信號(hào) CountDownLatch doneSignal = new CountDownLatch(N); for (int i = 0; i < N; ++i) // create and start threads // 創(chuàng)建N個(gè)工作線程并開始運(yùn)行 new Thread(new Worker(startSignal, doneSignal)).start(); // 做準(zhǔn)備工作 doSomethingElse(); // don't let run yet // 準(zhǔn)備完畢,喚醒工作線程 startSignal.countDown(); // let all threads proceed doSomethingElse(); // 等待工作線程結(jié)束 doneSignal.await(); // wait for all to finish } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; // 構(gòu)造方法創(chuàng)建工作線程 Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { // 工作線程進(jìn)入等待狀態(tài) startSignal.await(); // 工作線程工作 doWork(); // 完成工作后,countDown doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } void doWork() { ... } }
// 主線程等到N個(gè)線程 class Driver2 { // ... void main() throws InterruptedException { // 完成信號(hào) CountDownLatch doneSignal = new CountDownLatch(N); // 創(chuàng)建線程執(zhí)行器 Executor e = ... for (int i = 0; i < N; ++i) // create and start threads // 創(chuàng)建并執(zhí)行N個(gè)準(zhǔn)備工作線程 e.execute(new WorkerRunnable(doneSignal, i)); // 主線程等到準(zhǔn)備工作線程執(zhí)行完畢 doneSignal.await(); // wait for all to finish } } class WorkerRunnable implements Runnable { private final CountDownLatch doneSignal; private final int i; // 構(gòu)造方法 WorkerRunnable(CountDownLatch doneSignal, int i) { this.doneSignal = doneSignal; this.i = i; } // run public void run() { try { // 完成準(zhǔn)備工作 doWork(i); // countDown doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } void doWork() { ... } }
3.2 Sync
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; // State即同步狀態(tài),在不同的實(shí)現(xiàn)中叫法不一樣,只是為了方便理解 // 構(gòu)造方法初始化計(jì)數(shù)器計(jì)數(shù)值(即同步狀態(tài)值) Sync(int count) { setState(count); } // 獲取計(jì)數(shù)值 int getCount() { return getState(); } // 共享模式獲取 protected int tryAcquireShared(int acquires) { // 體現(xiàn)出只有計(jì)數(shù)值為0時(shí),才能算獲取成功 return (getState() == 0) ? 1 : -1; } // 共享模式釋放 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); // 如果計(jì)數(shù)值已經(jīng)為0,直接返回false,結(jié)束自旋 if (c == 0) return false; // 否則計(jì)數(shù) - 1 int nextc = c-1; // 通過自旋 + CAS方式改變剩余計(jì)數(shù) if (compareAndSetState(c, nextc)) // 如果計(jì)數(shù)為0返回true,否則返回false,結(jié)束自旋 // 返回true表示可以喚醒等待的線程 return nextc == 0; } } }
通過上面代碼解析可知, CountDownLatch的實(shí)現(xiàn)方法都是在內(nèi)部類Sync里面。
3.3 CountDownLatch
public class CountDownLatch { // 同步隊(duì)列 private final Sync sync; // 構(gòu)造方法初始化計(jì)數(shù)值 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } // 線程等待 public void await() throws InterruptedException { // 調(diào)用AQS的acquireSharedInterruptibly方法 // 即共享模式響應(yīng)中斷的獲取 sync.acquireSharedInterruptibly(1); } // 計(jì)數(shù) - 1 public void countDown() { sync.releaseShared(1); } }
3.3.1 await() 方法解析
// CountDownLatch public void await() throws InterruptedException { // 調(diào)用AQS的acquireSharedInterruptibly方法 sync.acquireSharedInterruptibly(1); } // 進(jìn)入AQS public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 中斷判斷 if (Thread.interrupted()) throw new InterruptedException(); // 如果沒有獲取到同步狀態(tài),或者說計(jì)數(shù)值不為0 // 則調(diào)用doAcquireSharedInterruptibly方法,進(jìn)入同步隊(duì)列 // 如果計(jì)數(shù)值為0則執(zhí)行后續(xù)業(yè)務(wù)邏輯 if (tryAcquireShared(arg) < 0) // 該方法的解析參考文章結(jié)尾的鏈接,此處不再贅述 doAcquireSharedInterruptibly(arg); } // CountDownLatch 中tryAcquireShared的實(shí)現(xiàn) protected int tryAcquireShared(int acquires) { // 當(dāng)計(jì)數(shù)為0時(shí),線程才不會(huì)進(jìn)入同步隊(duì)列 return (getState() == 0) ? 1 : -1; }
通過上面代碼可以知道,如果計(jì)數(shù)值為0,表示獲取成功。這就是CountDownLatch的機(jī)制,嘗試獲取latch的線程只有當(dāng)latch的值減到0的時(shí)候,才能獲取成功。
3.3.2 countDown() 方法解析
// CountDownLatch public void countDown() { // 調(diào)用AQS的releaseShared方法 sync.releaseShared(1); } // 進(jìn)入AQS public final boolean releaseShared(int arg) { // 共享模式釋放 if (tryReleaseShared(arg)) { // 如果釋放成功則喚醒等待的線程,并返回true // 具體喚醒邏輯不再贅述,參考AQS解析文章 doReleaseShared(); return true; } return false; } // CountDownLatch 中tryReleaseShared的實(shí)現(xiàn) protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; // 通過自旋 + CAS方式改變剩余計(jì)數(shù) if (compareAndSetState(c, nextc)) // 如果計(jì)數(shù)為0返回true,否則返回false,結(jié)束自旋 // 返回true表示可以喚醒等待的線程 return nextc == 0; } }
3.3.3 CountDownLatch如何喚醒所有調(diào)用 await() 等待的線程呢?
當(dāng)調(diào)用doReleaseShared()喚醒后繼節(jié)點(diǎn)后,回到線程被掛起的地方,也就是doAcquireSharedInterruptibly(int arg)方法中
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 將當(dāng)前線程加入同步隊(duì)列的尾部 final Node node = addWaiter(Node.SHARED); try { // 自旋 for (;;) { // 獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn) final Node p = node.predecessor(); // 如果前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn),則嘗試獲取同步狀態(tài) if (p == head) { // 當(dāng)前節(jié)點(diǎn)嘗試獲取同步狀態(tài) int r = tryAcquireShared(arg); if (r >= 0) { // 如果獲取成功,則設(shè)置當(dāng)前節(jié)點(diǎn)為頭結(jié)點(diǎn) setHeadAndPropagate(node, r); p.next = null; // help GC return; } } // 如果當(dāng)前節(jié)點(diǎn)的前驅(qū)不是頭結(jié)點(diǎn),嘗試掛起當(dāng)前線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }
當(dāng)頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)被喚醒后,線程將從掛起的地方醒來,繼續(xù)執(zhí)行,因?yàn)闆]有return,所以進(jìn)入下一次循環(huán)。
此時(shí),獲取同步狀態(tài)成功,執(zhí)行setHeadAndPropagate(node, r)。
// 如果執(zhí)行這個(gè)函數(shù),那么propagate一定等于1 private void setHeadAndPropagate(Node node, int propagate) { // 獲取頭結(jié)點(diǎn) Node h = head; // 因?yàn)楫?dāng)前節(jié)點(diǎn)被喚醒,設(shè)置當(dāng)前節(jié)點(diǎn)為頭結(jié)點(diǎn) setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 獲取當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn) Node s = node.next; // 如果下一個(gè)節(jié)點(diǎn)為null或者節(jié)點(diǎn)為shared節(jié)點(diǎn) if (s == null || s.isShared()) doReleaseShared(); } } private void doReleaseShared() { // 自旋 for (;;) { Node h = head; // 如果隊(duì)列存在排隊(duì)的節(jié)點(diǎn) if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { // CAS設(shè)置不成功則不斷循環(huán) if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // CAS操作成功后釋放后繼節(jié)點(diǎn),并喚醒線程 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 隊(duì)列不存在排隊(duì)的節(jié)點(diǎn),直接結(jié)束自旋 if (h == head) // loop if head changed break; }
調(diào)用doReleaseShared方法喚醒后繼節(jié)點(diǎn),后繼節(jié)點(diǎn)又回到線程被掛起的地方,也就是doAcquireSharedInterruptibly(int arg)方法中,實(shí)現(xiàn)循環(huán)喚醒所有await的線程。
此篇文章只解析了CountDownLatch的實(shí)現(xiàn),它就是一個(gè)基于 AQS 的計(jì)數(shù)器,它內(nèi)部的方法都是圍繞 AQS 框架來實(shí)現(xiàn)的。
建議感興趣的同學(xué)先去了解AQS原理,只要明白了AQS的實(shí)現(xiàn)原理,再來看CountDownLatch、Semaphore、ReentrantLock等實(shí)現(xiàn)原理就一目了然了。
到此這篇關(guān)于Java中的CountDownLatch原理深入解析的文章就介紹到這了,更多相關(guān)CountDownLatch原理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java實(shí)現(xiàn)Dijkstra最短路徑算法
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)Dijkstra最短路徑算法,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-01-01Mybatis中強(qiáng)大的resultMap功能介紹
這篇文章主要給大家介紹了關(guān)于Mybatis中強(qiáng)大的resultMap功能的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Mybatis具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06Java中構(gòu)造方法set/get和toString的使用詳解
這篇文章主要介紹了Java中構(gòu)造方法set/get和toString的使用詳解,構(gòu)造函數(shù)的最大作用就是創(chuàng)建對(duì)象時(shí)完成初始化,當(dāng)我們?cè)趎ew一個(gè)對(duì)象并傳入?yún)?shù)的時(shí)候,會(huì)自動(dòng)調(diào)用構(gòu)造函數(shù)并完成參數(shù)的初始化,需要的朋友可以參考下2019-07-07spring?boot實(shí)現(xiàn)圖片上傳到后臺(tái)的功能(瀏覽器可直接訪問)
這篇文章主要介紹了spring?boot實(shí)現(xiàn)圖片上傳到后臺(tái)的功能(瀏覽器可直接訪問),需要的朋友可以參考下2022-04-04SpringBoot實(shí)現(xiàn)動(dòng)態(tài)插拔的AOP的完整案例
在現(xiàn)代軟件開發(fā)中,面向切面編程(AOP) 是一種非常重要的技術(shù),能夠有效實(shí)現(xiàn)日志記錄、安全控制、性能監(jiān)控等橫切關(guān)注點(diǎn)的分離,在傳統(tǒng)的 AOP 實(shí)現(xiàn)中,切面邏輯往往是固定的,難以動(dòng)態(tài)調(diào)整,本文將詳細(xì)探討如何利用 Spring Boot 實(shí)現(xiàn)動(dòng)態(tài)插拔的 AOP,需要的朋友可以參考下2025-01-01mybatis plus CU自動(dòng)填充 和 軟刪除自動(dòng)填充的實(shí)現(xiàn)方法
這篇文章主要介紹了mybatis plus CU自動(dòng)填充 和 軟刪除自動(dòng)填充的實(shí)現(xiàn)方法,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-07-07SpringBoot使用@PostConstruct注解導(dǎo)入配置方式
這篇文章主要介紹了SpringBoot使用@PostConstruct注解導(dǎo)入配置方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11