深度解析Java中CountDownLatch的原理
在高并發(fā)編程中,AbstractQueuedSynchronizer(簡稱AQS)抽象的隊(duì)列同步器是我們必須掌握的,AQS底層提供了二種鎖模式
- 獨(dú)占鎖:ReentrantLock就是基于獨(dú)占鎖模式實(shí)現(xiàn)的
- 共享鎖:CountDownLatch,ReadWriteLock,Semplere都是基于共享鎖模式實(shí)現(xiàn)的
接下來我們通過CountDownLatch底層實(shí)現(xiàn)原理來了解AQS共享鎖模式的實(shí)現(xiàn)原理
CountDownLatch用法
CountDownLatch一般是在需要等待多個線程全部執(zhí)行完畢之后才繼續(xù)執(zhí)行剩下的業(yè)務(wù)邏輯,舉個例子,比如你現(xiàn)在去餐廳吃飯點(diǎn)了份辣子雞。
這時候餐廳有處理雞塊的,有配置調(diào)料的,還有燒菜的等多個廚師一起協(xié)作最后才能完成一道辣子雞,而且這幾個步驟可以是一起執(zhí)行的。一個廚師在配置調(diào)料的同時,另外一個廚師正在處理雞塊,還有一個廚師正在熱油等。
但是作為顧客的我們來說,我們必須等到這幾個廚師全部執(zhí)行完畢之后我們才能吃到辣子雞
public static void main(String[] args) throws Exception{
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(() -> {
System.out.println("處理雞塊");
try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
countDownLatch.countDown();
}).start();
new Thread(() -> {
System.out.println("配置調(diào)料");
try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
countDownLatch.countDown();
}).start();
new Thread(() -> {
System.out.println("起鍋熱油");
try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
countDownLatch.countDown();
}).start();
//會阻塞,等待所有的線程執(zhí)行結(jié)束之后才會繼續(xù)執(zhí)行剩下的邏輯
countDownLatch.await();
//執(zhí)行剩下業(yè)務(wù)邏輯
}首先我們看 countDownLatch.await(); 這段阻塞的代碼,看下底層是如何讓線程進(jìn)入阻塞等待的
進(jìn)入之后到CountDownLatch類中,然后繼續(xù)這個方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}此時就會進(jìn)去AQS的內(nèi)部實(shí)現(xiàn)中
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}首先我們看下 tryAcquireShared(arg) < 0 這個判斷是干嘛的,他是進(jìn)入到CountDownLatch的類中,這里判斷 state的值是否等于0,在初始化 CountDownLatch 的時候,我們將state的值初始化成了3,只有當(dāng)執(zhí)行一次 countDownLatch.countDown(); 的時候,這個值才會減1,但是此時我們的線程還沒有執(zhí)行結(jié)束,所以這個值不會等于0,那么這時候就會返回 -1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}返回-1以后,就會執(zhí)行 doAcquireSharedInterruptibly(arg); 這個業(yè)務(wù)邏輯了
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//創(chuàng)建一個新的共享的Node節(jié)點(diǎn)
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//嘗試判斷state是否已經(jīng)等于0了,如果是,那么主線程就不用阻塞了,
//可以繼續(xù)執(zhí)行了,以此來提高程序性能
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //這里才是真正讓主線程阻塞的核心方法
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}我們看一下這里的 addWaiter(Node.SHARED)方法
private Node addWaiter(Node mode) {
//把當(dāng)前線程,也就是main線程封裝成一個Node,并設(shè)置成共享模式
Node node = new Node(Thread.currentThread(), mode);
//在第一次的時候,這個tail節(jié)點(diǎn)是為null的
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//初始化鏈表
enq(node);
return node;
}分析下初始化雙向鏈表邏輯
private Node enq(final Node node) {
for (;;) { //注意:這里是死循環(huán)
Node t = tail;
//第一次進(jìn)來,因?yàn)閠ail=null,所以會進(jìn)入到if里面去
if (t == null) { // Must initialize
//這里新創(chuàng)建一個空的Node節(jié)點(diǎn)
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}第一次進(jìn)來:因?yàn)榈谝淮芜M(jìn)來的時候tail=null,所以會進(jìn)入到if中去,然后創(chuàng)建一個新的空的節(jié)點(diǎn),然后將頭節(jié)點(diǎn)和尾節(jié)點(diǎn)都指向這個節(jié)點(diǎn)

然后進(jìn)入第二次循環(huán):這時候tail已經(jīng)不為空了,所以會進(jìn)入到else分支里面去,所以的操作就是將當(dāng)前線程封裝成的Node設(shè)置尾巴節(jié)點(diǎn),然后設(shè)置前置節(jié)點(diǎn)和后置節(jié)點(diǎn)的關(guān)系

現(xiàn)在回頭addWaiter()方法已經(jīng)清楚了,繼續(xù)分析剩下的邏輯
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//創(chuàng)建一個新的共享的Node節(jié)點(diǎn)
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) { //這里還是個死循環(huán)
//拿到頭節(jié)點(diǎn)
final Node p = node.predecessor();
if (p == head) {
//繼續(xù)判斷state的值是否等于0,如果已經(jīng)等于0了,那么主線程就不需要阻塞等待了,可以繼續(xù)執(zhí)行了
int r = tryAcquireShared(arg);
//如果state的值等于0,這里r=1,不等于0,r=-1
//我們假設(shè)現(xiàn)在就是不等于0,也就是其它線程還沒有執(zhí)行結(jié)束,所以不會進(jìn)入到if
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}進(jìn)入 shouldParkAfterFailedAcquire()方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//這里獲取Node的waitStatus,在Node初始化之后,默認(rèn)是是0,
//所以會進(jìn)入到 else 分支里面去,將Node的waitStatus的值修改成Node.SIGNAL
//但是在上一步中是一個死循環(huán),所以會再次進(jìn)入到這個方法中,這時候waitStatus的值是Node.SIGNAL
//所以會進(jìn)入到第一個if分支里面去,最后返回true
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}這時候shouldParkAfterFailedAcquire()方法返回了true,就會執(zhí)行 parkAndCheckInterrupt()方法了
private final boolean parkAndCheckInterrupt() {
//真正讓線程阻塞的核心方法
LockSupport.park(this);
return Thread.interrupted();
}當(dāng)主線程掛起之后,只有全部線程執(zhí)行結(jié)束了,才會繼續(xù)執(zhí)行,所以我們來分析下 countDownLatch.countDown();
public void countDown() {
sync.releaseShared(1);
}public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}進(jìn)入tryReleaseShared(arg)方法,是判斷state是否等于0的,
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
//第一次進(jìn)來,因?yàn)閟tate=3,所以不會進(jìn)入if,只有在初始化的時候?qū)tate設(shè)置成0,
//或者你有10個資源,但是有11個線程來獲取資源,最后一個線程進(jìn)來的時候也會等于0
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}一直到第三次進(jìn)來之后,nextc就會等于0,因?yàn)橐还矞p了三次1,也就是最后一個線程執(zhí)行到這里來了,最后返回true,返回true以后就會執(zhí)行doReleaseShared();方法了
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 核心方法,喚醒阻塞線程,這里傳入的是頭節(jié)點(diǎn)
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//拿到真正封裝了當(dāng)前線程的Node
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 執(zhí)行喚醒操作
LockSupport.unpark(s.thread);
}以上就是深度解析Java中CountDownLatch的原理的詳細(xì)內(nèi)容,更多關(guān)于Java CountDownLatch的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java并發(fā)(Runnable+Thread)實(shí)現(xiàn)硬盤文件搜索功能
這篇文章主要介紹了Java并發(fā)(Runnable+Thread)實(shí)現(xiàn)硬盤文件搜索,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01
java中利用List的subList方法實(shí)現(xiàn)對List分頁(簡單易學(xué))
本篇文章主要介紹了java中l(wèi)ist數(shù)據(jù)拆分為sublist實(shí)現(xiàn)頁面分頁的簡單代碼,具有一定的參考價值,有需要的可以了解一下。2016-11-11
Java 梳理總結(jié)關(guān)于static關(guān)鍵字常見問題
static關(guān)鍵字基本概念我們可以一句話來概括:方便在沒有創(chuàng)建對象的情況下來進(jìn)行調(diào)用。也就是說:被static關(guān)鍵字修飾的不需要創(chuàng)建對象去調(diào)用,直接根據(jù)類名就可以去訪問,讓我們來了解一下你可能還不知道情況2022-04-04
java中map和對象互轉(zhuǎn)工具類的實(shí)現(xiàn)示例
這篇文章主要介紹了java中map和對象互轉(zhuǎn)工具類的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-08-08
MySQL?MyBatis?默認(rèn)插入當(dāng)前時間方式
這篇文章主要介紹了MySQL?MyBatis?默認(rèn)插入當(dāng)前時間方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-10-10
解決JAVA項(xiàng)目啟動卡住,無任何異常信息的問題
這篇文章主要介紹了解決JAVA項(xiàng)目啟動卡住,無任何異常信息的問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-03-03

