Java CountDownLatch的源碼硬核解析
前言
對(duì)于并發(fā)執(zhí)行,Java中的CountDownLatch是一個(gè)重要的類,簡(jiǎn)單理解, CountDownLatch中count down是倒數(shù)的意思,latch則是“門(mén)閂”的含義。在數(shù)量倒數(shù)到0的時(shí)候,打開(kāi)“門(mén)閂”, 一起走,否則都等待在“門(mén)閂”的地方。
為了更好的理解CountDownLatch這個(gè)類,本文通過(guò)例子和源碼帶領(lǐng)大家深入解析這個(gè)類的原理。
介紹和使用
例子
我們先通過(guò)一個(gè)例子快速理解下CountDownLatch的妙處。
最近LOL S12賽如火如荼舉行,比如我們玩王者榮耀的時(shí)候,10個(gè)萬(wàn)玩家登入游戲,每個(gè)玩家的網(wǎng)速可能不一樣,只有每個(gè)人進(jìn)度條走完,才會(huì)一起來(lái)到王者峽谷,網(wǎng)速快的要等網(wǎng)速慢的。我們通過(guò)例子模擬下這個(gè)過(guò)程。
@Slf4j(topic = "a.CountDownLatchTest")
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
// 創(chuàng)建一個(gè)倒時(shí)器,默認(rèn)10個(gè)數(shù)量
CountDownLatch latch = new CountDownLatch(10);
ExecutorService service = Executors.newFixedThreadPool(10);
// 設(shè)置進(jìn)度數(shù)據(jù)
String[] personProcess = new String[10];
Random random = new Random();
for (int i = 0; i < 10; i++) {
int finalJ = i;
service.submit(() -> {
// 模擬10個(gè)人的進(jìn)度條
for (int j = 0; j <= 100; j++) {
// 模擬網(wǎng)速快慢,隨機(jī)生成
try {
Thread.sleep(random.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
// 設(shè)置進(jìn)度數(shù)據(jù)
personProcess[finalJ] = j + "%";
log.info("{}", Arrays.toString(personProcess));
}
// 運(yùn)行結(jié)束,倒時(shí)器 - 1
latch.countDown();
});
}
// 打開(kāi)"閥門(mén)"
latch.await();
log.info("王者峽谷到了");
service.shutdown();
}
}
運(yùn)行結(jié)果:

概述
CountDownLatch一般用作多線程倒計(jì)時(shí)計(jì)數(shù)器,強(qiáng)制它們等待其他一組(CountDownLatch的初始化決定)任務(wù)執(zhí)行完成。
構(gòu)造器:
public CountDownLatch(int count):設(shè)置倒數(shù)器需要倒數(shù)的數(shù)量
常用API:
public void await() throws InterruptedException:調(diào)用await()方法的線程會(huì)被掛起,等待直到count值為0再繼續(xù)執(zhí)行。public boolean await(long timeout, TimeUnit unit) throws InterruptedException:同await(),若等待timeout時(shí)長(zhǎng)后,count值還是沒(méi)有變?yōu)?,不再等待,繼續(xù)執(zhí)行。時(shí)間單位如下常用的毫秒、天、小時(shí)、微秒、分鐘、納秒、秒。public void countDown(): count值遞減1public long getCount():獲取當(dāng)前count值
常見(jiàn)使用場(chǎng)景:
一個(gè)程序中有N個(gè)任務(wù)在執(zhí)行,我們可以創(chuàng)建值為N的CountDownLatch,當(dāng)每個(gè)任務(wù)完成后,調(diào)用一下countDown()方法進(jìn)行遞減count值,再在主線程中使用await()方法等待任務(wù)執(zhí)行完成,主線程繼續(xù)執(zhí)行。
實(shí)現(xiàn)思路
通過(guò)前面的例子和介紹我們知道CountDownLatch的大致使用流程:
- 創(chuàng)建
CountDownLatch并設(shè)置計(jì)數(shù)器值。 - 啟動(dòng)多線程并且調(diào)用
CountDownLatch實(shí)例的countDown()方法。 - 主線程調(diào)用
await()方法,這樣主線程的操作就會(huì)在這個(gè)方法上阻塞,直到其他線程完成各自的任務(wù),count值為0,停止阻塞,主線程繼續(xù)執(zhí)行。
不妨我們先思考下,它是怎么實(shí)現(xiàn)的呢?我們可以問(wèn)自己幾個(gè)問(wèn)題?
- 如何做到可以讓主線程阻塞等待在那里?是不是可以調(diào)用
LockSupport.park()方法進(jìn)行阻塞。 - 那么什么時(shí)候該阻塞呢?我們需要有個(gè)變量,比如state, 如果state大于0,就阻塞主線程。
- 那么什么時(shí)候該喚醒呢,又如何喚醒呢?如果任務(wù)執(zhí)行完成后,我們讓state 減去1,也就是調(diào)用
countDown()方法,如果發(fā)現(xiàn)state是0,那么就調(diào)用LockSupport.unpark()喚醒此前阻塞的地方,繼續(xù)執(zhí)行。
是不是很熟悉,這就是我們的AQS共享模式的實(shí)現(xiàn)原理啊,不了解AQS共享模式的可以參考本篇文章:深入淺出理解Java并發(fā)AQS的共享鎖模式
我們把思路理清楚后,直接看CountDownLatch的源碼。
源碼解析
類結(jié)構(gòu)圖

以上是CountDownLatch的類結(jié)構(gòu)圖,
Sync是CountDownLatch的內(nèi)部類,被成員變量sync持有。Sync繼承了AbstractQueuedSynchronizer,也就是我們大名鼎鼎的AQS。
await() 實(shí)現(xiàn)原理
1.線程調(diào)用 await()會(huì)阻塞等待其他線程完成任務(wù)
// CountDownLatch#await
public void await() throws InterruptedException {
// 調(diào)用AbstractQueuedSynchronizer的acquireSharedInterruptibly方法
sync.acquireSharedInterruptibly(1);
}
// AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 判斷線程是否被打斷,拋出打斷異常
if (Thread.interrupted())
throw new InterruptedException();
// 嘗試獲取共享鎖
// 條件成立說(shuō)明 state > 0,此時(shí)線程入隊(duì)阻塞等待,等待其他線程獲取共享資源
// 條件不成立說(shuō)明 state = 0,此時(shí)不需要阻塞線程,直接結(jié)束函數(shù)調(diào)用
if (tryAcquireShared(arg) < 0)
// 阻塞當(dāng)前線程的邏輯
doAcquireSharedInterruptibly(arg);
}
// CountDownLatch.Sync#tryAcquireShared
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
2.doAcquireSharedInterruptibly()方法是實(shí)現(xiàn)線程阻塞的核心邏輯
// AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 將調(diào)用latch.await()方法的線程 包裝成 SHARED 類型的 node 加入到 AQS 的阻塞隊(duì)列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
// 前驅(qū)節(jié)點(diǎn)時(shí)頭節(jié)點(diǎn)就可以嘗試獲取鎖
if (p == head) {
// 再次嘗試獲取鎖,獲取成功返回 1
int r = tryAcquireShared(arg);
if (r >= 0) {
// 獲取鎖成功,設(shè)置當(dāng)前節(jié)點(diǎn)為 head 節(jié)點(diǎn),并且向后傳播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 阻塞在這里
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// 阻塞線程被中斷后拋出異常,進(jìn)入取消節(jié)點(diǎn)的邏輯
if (failed)
cancelAcquire(node);
}
}
3.parkAndCheckInterrupt()方法中會(huì)進(jìn)行阻塞操作
private final boolean parkAndCheckInterrupt() {
// 阻塞線程
LockSupport.park(this);
return Thread.interrupted();
}
countDown()實(shí)現(xiàn)原理
1.任務(wù)結(jié)束調(diào)用 countDown() 完成計(jì)數(shù)器減一(釋放鎖)的操作
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 嘗試釋放共享鎖
if (tryReleaseShared(arg)) {
// 釋放鎖成功開(kāi)始喚醒阻塞節(jié)點(diǎn)
doReleaseShared();
return true;
}
return false;
}
2.調(diào)用tryReleaseShared()方法嘗試釋放鎖,true表示state等于0,去喚醒阻塞線程。
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
// 條件成立說(shuō)明前面【已經(jīng)有線程觸發(fā)喚醒操作】了,這里返回 false
if (c == 0)
return false;
// 計(jì)數(shù)器減一
int nextc = c-1;
if (compareAndSetState(c, nextc))
// 計(jì)數(shù)器為 0 時(shí)返回 true
return nextc == 0;
}
}
3.調(diào)用doReleaseShared()喚醒阻塞的節(jié)點(diǎn)
private void doReleaseShared() {
for (;;) {
Node h = head;
// 判斷隊(duì)列是否是空隊(duì)列
if (h != null && h != tail) {
int ws = h.waitStatus;
// 頭節(jié)點(diǎn)的狀態(tài)為 signal,說(shuō)明后繼節(jié)點(diǎn)沒(méi)有被喚醒過(guò)
if (ws == Node.SIGNAL) {
// cas 設(shè)置頭節(jié)點(diǎn)的狀態(tài)為 0,設(shè)置失敗繼續(xù)自旋
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 喚醒后繼節(jié)點(diǎn)
unparkSuccessor(h);
}
// 如果有其他線程已經(jīng)設(shè)置了頭節(jié)點(diǎn)的狀態(tài),重新設(shè)置為 PROPAGATE 傳播屬性
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 條件不成立說(shuō)明被喚醒的節(jié)點(diǎn)非常積極,直接將自己設(shè)置為了新的head,
// 此時(shí)喚醒它的節(jié)點(diǎn)(前驅(qū))執(zhí)行 h == head 不成立,所以不會(huì)跳出循環(huán),會(huì)繼續(xù)喚醒新的 head 節(jié)點(diǎn)的后繼節(jié)點(diǎn)
if (h == head)
break;
}
}以上就是Java CountDownLatch的源碼硬核解析的詳細(xì)內(nèi)容,更多關(guān)于Java CountDownLatch的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Java?CountDownLatch線程同步源碼硬核解析
- Java使用CountDownLatch實(shí)現(xiàn)網(wǎng)絡(luò)同步請(qǐng)求的示例代碼
- Java CountDownLatch與CyclicBarrier及Semaphore使用教程
- Java之CountDownLatch原理全面解析
- java并發(fā)包工具CountDownLatch源碼分析
- java并發(fā)使用CountDownLatch在生產(chǎn)環(huán)境翻車(chē)剖析
- java線程并發(fā)控制同步工具CountDownLatch
- Java AQS中閉鎖CountDownLatch的使用
相關(guān)文章
JAVA加密算法- 非對(duì)稱加密算法(DH,RSA)的詳細(xì)介紹
這篇文章主要介紹了JAVA加密算法- 非對(duì)稱加密算法(DH,RSA),詳細(xì)介紹了DH,RSA的用法和示例,需要的朋友可以了解一下。2016-11-11
Java關(guān)鍵字instanceof用法及實(shí)現(xiàn)策略
instanceof 運(yùn)算符是用來(lái)在運(yùn)行時(shí)判斷對(duì)象是否是指定類及其父類的一個(gè)實(shí)例。這篇文章主要介紹了Java關(guān)鍵字instanceof用法解析,需要的朋友可以參考下2020-08-08
Java實(shí)現(xiàn)不同的類的屬性之間相互賦值
今天小編就為大家分享一篇關(guān)于Java實(shí)現(xiàn)不同的類的屬性之間相互賦值,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2019-03-03
Springmvc如何實(shí)現(xiàn)向前臺(tái)傳遞數(shù)據(jù)
這篇文章主要介紹了Springmvc如何實(shí)現(xiàn)向前臺(tái)傳遞數(shù)據(jù),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07
springboot實(shí)現(xiàn)返回視圖而不是string的方法
這篇文章主要介紹了springboot實(shí)現(xiàn)返回視圖而不是string的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01

