Java中的CountDownLatch原理深入解析
1. CountDownLatch是什么?
CountDownLatch是多線程控制的一種同步工具類,它被稱為門閥、 計(jì)數(shù)器或者閉鎖。這個工具經(jīng)常用來用來協(xié)調(diào)多個線程之間的同步,或者說起到線程之間的通信(而不是用作互斥的作用)。
它允許一個或多個線程一直等待,直到其他線程執(zhí)行完后再執(zhí)行。例如,應(yīng)用程序的主線程希望在負(fù)責(zé)啟動框架服務(wù)的線程已經(jīng)啟動所有框架服務(wù)之后執(zhí)行。
當(dāng)然利用ReentrantLock + Condition也可以實(shí)現(xiàn)線程之間通信,達(dá)到同樣的效果
2. 類圖

可以看出CountDownLatch只有一個內(nèi)部類Sync,Sync繼承AbstractQueuedSynchronizer
3. 實(shí)現(xiàn)原理
3.1 示例用法
// N個線程等待主線程
class Driver { // ...
void main() throws InterruptedException {
// 開始信號
CountDownLatch startSignal = new CountDownLatch(1);
// 完成信號
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
// 創(chuàng)建N個工作線程并開始運(yùn)行
new Thread(new Worker(startSignal, doneSignal)).start();
// 做準(zhǔn)備工作
doSomethingElse(); // don't let run yet
// 準(zhǔn)備完畢,喚醒工作線程
startSignal.countDown(); // let all threads proceed
doSomethingElse();
// 等待工作線程結(jié)束
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
// 構(gòu)造方法創(chuàng)建工作線程
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
// 工作線程進(jìn)入等待狀態(tài)
startSignal.await();
// 工作線程工作
doWork();
// 完成工作后,countDown
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}// 主線程等到N個線程
class Driver2 { // ...
void main() throws InterruptedException {
// 完成信號
CountDownLatch doneSignal = new CountDownLatch(N);
// 創(chuàng)建線程執(zhí)行器
Executor e = ...
for (int i = 0; i < N; ++i) // create and start threads
// 創(chuàng)建并執(zhí)行N個準(zhǔn)備工作線程
e.execute(new WorkerRunnable(doneSignal, i));
// 主線程等到準(zhǔn)備工作線程執(zhí)行完畢
doneSignal.await(); // wait for all to finish
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
// 構(gòu)造方法
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
// run
public void run() {
try {
// 完成準(zhǔn)備工作
doWork(i);
// countDown
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}3.2 Sync
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// State即同步狀態(tài),在不同的實(shí)現(xiàn)中叫法不一樣,只是為了方便理解
// 構(gòu)造方法初始化計(jì)數(shù)器計(jì)數(shù)值(即同步狀態(tài)值)
Sync(int count) {
setState(count);
}
// 獲取計(jì)數(shù)值
int getCount() {
return getState();
}
// 共享模式獲取
protected int tryAcquireShared(int acquires) {
// 體現(xiàn)出只有計(jì)數(shù)值為0時,才能算獲取成功
return (getState() == 0) ? 1 : -1;
}
// 共享模式釋放
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
// 如果計(jì)數(shù)值已經(jīng)為0,直接返回false,結(jié)束自旋
if (c == 0)
return false;
// 否則計(jì)數(shù) - 1
int nextc = c-1;
// 通過自旋 + CAS方式改變剩余計(jì)數(shù)
if (compareAndSetState(c, nextc))
// 如果計(jì)數(shù)為0返回true,否則返回false,結(jié)束自旋
// 返回true表示可以喚醒等待的線程
return nextc == 0;
}
}
}通過上面代碼解析可知, CountDownLatch的實(shí)現(xiàn)方法都是在內(nèi)部類Sync里面。
3.3 CountDownLatch
public class CountDownLatch {
// 同步隊(duì)列
private final Sync sync;
// 構(gòu)造方法初始化計(jì)數(shù)值
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// 線程等待
public void await() throws InterruptedException {
// 調(diào)用AQS的acquireSharedInterruptibly方法
// 即共享模式響應(yīng)中斷的獲取
sync.acquireSharedInterruptibly(1);
}
// 計(jì)數(shù) - 1
public void countDown() {
sync.releaseShared(1);
}
}3.3.1 await() 方法解析
// CountDownLatch
public void await() throws InterruptedException {
// 調(diào)用AQS的acquireSharedInterruptibly方法
sync.acquireSharedInterruptibly(1);
}
// 進(jìn)入AQS
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 中斷判斷
if (Thread.interrupted())
throw new InterruptedException();
// 如果沒有獲取到同步狀態(tài),或者說計(jì)數(shù)值不為0
// 則調(diào)用doAcquireSharedInterruptibly方法,進(jìn)入同步隊(duì)列
// 如果計(jì)數(shù)值為0則執(zhí)行后續(xù)業(yè)務(wù)邏輯
if (tryAcquireShared(arg) < 0)
// 該方法的解析參考文章結(jié)尾的鏈接,此處不再贅述
doAcquireSharedInterruptibly(arg);
}
// CountDownLatch 中tryAcquireShared的實(shí)現(xiàn)
protected int tryAcquireShared(int acquires) {
// 當(dāng)計(jì)數(shù)為0時,線程才不會進(jìn)入同步隊(duì)列
return (getState() == 0) ? 1 : -1;
}通過上面代碼可以知道,如果計(jì)數(shù)值為0,表示獲取成功。這就是CountDownLatch的機(jī)制,嘗試獲取latch的線程只有當(dāng)latch的值減到0的時候,才能獲取成功。
3.3.2 countDown() 方法解析
// CountDownLatch
public void countDown() {
// 調(diào)用AQS的releaseShared方法
sync.releaseShared(1);
}
// 進(jìn)入AQS
public final boolean releaseShared(int arg) {
// 共享模式釋放
if (tryReleaseShared(arg)) {
// 如果釋放成功則喚醒等待的線程,并返回true
// 具體喚醒邏輯不再贅述,參考AQS解析文章
doReleaseShared();
return true;
}
return false;
}
// CountDownLatch 中tryReleaseShared的實(shí)現(xiàn)
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
// 通過自旋 + CAS方式改變剩余計(jì)數(shù)
if (compareAndSetState(c, nextc))
// 如果計(jì)數(shù)為0返回true,否則返回false,結(jié)束自旋
// 返回true表示可以喚醒等待的線程
return nextc == 0;
}
}3.3.3 CountDownLatch如何喚醒所有調(diào)用 await() 等待的線程呢?
當(dāng)調(diào)用doReleaseShared()喚醒后繼節(jié)點(diǎn)后,回到線程被掛起的地方,也就是doAcquireSharedInterruptibly(int arg)方法中
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 將當(dāng)前線程加入同步隊(duì)列的尾部
final Node node = addWaiter(Node.SHARED);
try {
// 自旋
for (;;) {
// 獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
// 如果前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn),則嘗試獲取同步狀態(tài)
if (p == head) {
// 當(dāng)前節(jié)點(diǎn)嘗試獲取同步狀態(tài)
int r = tryAcquireShared(arg);
if (r >= 0) {
// 如果獲取成功,則設(shè)置當(dāng)前節(jié)點(diǎn)為頭結(jié)點(diǎn)
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
// 如果當(dāng)前節(jié)點(diǎn)的前驅(qū)不是頭結(jié)點(diǎn),嘗試掛起當(dāng)前線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}當(dāng)頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)被喚醒后,線程將從掛起的地方醒來,繼續(xù)執(zhí)行,因?yàn)闆]有return,所以進(jìn)入下一次循環(huán)。
此時,獲取同步狀態(tài)成功,執(zhí)行setHeadAndPropagate(node, r)。
// 如果執(zhí)行這個函數(shù),那么propagate一定等于1
private void setHeadAndPropagate(Node node, int propagate) {
// 獲取頭結(jié)點(diǎn)
Node h = head;
// 因?yàn)楫?dāng)前節(jié)點(diǎn)被喚醒,設(shè)置當(dāng)前節(jié)點(diǎn)為頭結(jié)點(diǎn)
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 獲取當(dāng)前節(jié)點(diǎn)的下一個節(jié)點(diǎn)
Node s = node.next;
// 如果下一個節(jié)點(diǎn)為null或者節(jié)點(diǎn)為shared節(jié)點(diǎn)
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
// 自旋
for (;;) {
Node h = head;
// 如果隊(duì)列存在排隊(duì)的節(jié)點(diǎn)
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// CAS設(shè)置不成功則不斷循環(huán)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// CAS操作成功后釋放后繼節(jié)點(diǎn),并喚醒線程
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 隊(duì)列不存在排隊(duì)的節(jié)點(diǎn),直接結(jié)束自旋
if (h == head) // loop if head changed
break;
}
調(diào)用doReleaseShared方法喚醒后繼節(jié)點(diǎn),后繼節(jié)點(diǎn)又回到線程被掛起的地方,也就是doAcquireSharedInterruptibly(int arg)方法中,實(shí)現(xiàn)循環(huán)喚醒所有await的線程。
此篇文章只解析了CountDownLatch的實(shí)現(xiàn),它就是一個基于 AQS 的計(jì)數(shù)器,它內(nèi)部的方法都是圍繞 AQS 框架來實(shí)現(xiàn)的。
建議感興趣的同學(xué)先去了解AQS原理,只要明白了AQS的實(shí)現(xiàn)原理,再來看CountDownLatch、Semaphore、ReentrantLock等實(shí)現(xiàn)原理就一目了然了。
到此這篇關(guān)于Java中的CountDownLatch原理深入解析的文章就介紹到這了,更多相關(guān)CountDownLatch原理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java實(shí)現(xiàn)Dijkstra最短路徑算法
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)Dijkstra最短路徑算法,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-01-01
Mybatis中強(qiáng)大的resultMap功能介紹
這篇文章主要給大家介紹了關(guān)于Mybatis中強(qiáng)大的resultMap功能的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用Mybatis具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06
Java中構(gòu)造方法set/get和toString的使用詳解
這篇文章主要介紹了Java中構(gòu)造方法set/get和toString的使用詳解,構(gòu)造函數(shù)的最大作用就是創(chuàng)建對象時完成初始化,當(dāng)我們在new一個對象并傳入?yún)?shù)的時候,會自動調(diào)用構(gòu)造函數(shù)并完成參數(shù)的初始化,需要的朋友可以參考下2019-07-07
spring?boot實(shí)現(xiàn)圖片上傳到后臺的功能(瀏覽器可直接訪問)
這篇文章主要介紹了spring?boot實(shí)現(xiàn)圖片上傳到后臺的功能(瀏覽器可直接訪問),需要的朋友可以參考下2022-04-04
SpringBoot實(shí)現(xiàn)動態(tài)插拔的AOP的完整案例
在現(xiàn)代軟件開發(fā)中,面向切面編程(AOP) 是一種非常重要的技術(shù),能夠有效實(shí)現(xiàn)日志記錄、安全控制、性能監(jiān)控等橫切關(guān)注點(diǎn)的分離,在傳統(tǒng)的 AOP 實(shí)現(xiàn)中,切面邏輯往往是固定的,難以動態(tài)調(diào)整,本文將詳細(xì)探討如何利用 Spring Boot 實(shí)現(xiàn)動態(tài)插拔的 AOP,需要的朋友可以參考下2025-01-01
mybatis plus CU自動填充 和 軟刪除自動填充的實(shí)現(xiàn)方法
這篇文章主要介紹了mybatis plus CU自動填充 和 軟刪除自動填充的實(shí)現(xiàn)方法,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-07-07
SpringBoot使用@PostConstruct注解導(dǎo)入配置方式
這篇文章主要介紹了SpringBoot使用@PostConstruct注解導(dǎo)入配置方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11

