CountDownLatch源碼解析之countDown()
CountDownLatch 源碼解析—— countDown()
上一篇文章從源碼層面說了一下CountDownLatch 中 await() 的原理。這篇文章說一下countDown() 。
public void countDown() { //CountDownLatch
sync.releaseShared(1);
}
↓
public final boolean releaseShared(int arg) { //AQS
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
↓
protected boolean tryReleaseShared(int releases) { //CountDownLatch.Sync
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
通過構(gòu)造器 CountDownLatch end = new CountDownLatch(2); state 被設(shè)置為2,所以c == 2,nextc = 2-1,
然后通過下面這個CAS操作將state設(shè)置為1。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
此時nextc還不為0,返回false。一直等到countDown() 方法被調(diào)用兩次,state == 0,nextc ==0,此時返回true。
進(jìn)入doReleaseShared()方法。
doReleaseShared();
↓
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
回顧一下此時的等待隊(duì)列模型。
+--------------------------+ prev +------------------+ head | waitStatus = Node.SIGNAL | <---- node(tail) | currentThread | +--------------------------+ +------------------+
此時head 不為null,也不為tail,waitStatus == Node.SIGNAL,所以進(jìn)入 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 這個判斷。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
↓
/**
* CAS waitStatus field of a node.
*/
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
這個CAS 操作將 state 設(shè)置為 0 ,也就是說此時Head 中的 waitStatus 是0.此時隊(duì)列模型如下所示
+----------------+ prev +------------------+ head | waitStatus = 0 | <---- node(tail) | currentThread | +----------------+ +------------------+
該方法返回true。進(jìn)入unparkSuccessor(h);
unparkSuccessor(h);
↓
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
s 就是head的后繼結(jié)點(diǎn),也就是裝有當(dāng)前線程的結(jié)點(diǎn)。s != null ,并且s.waitStatus ==0 ,所以進(jìn)入 LockSupport.unpark(s.thread);
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
也就是unlock 被阻塞的線程。裁判被允許吹哨了!
countDown() 的原理就此就非常清晰了。
每執(zhí)行一次countDown() 方法,state 就是減1,直到state == 0,則開始釋放被阻塞在隊(duì)列中的線程,根據(jù)前驅(qū)結(jié)點(diǎn)中waitStatus的狀態(tài),釋放后續(xù)結(jié)點(diǎn)中的線程。
OK,回到上一篇文章的問題,什么時候跳出下面這個循環(huán)(await方法中的循環(huán))
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
此時state == 0,所以進(jìn)入 setHeadAndPropagate 方法。
setHeadAndPropagate(node, r);
↓
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
↓
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
這個方法將head 的后繼結(jié)點(diǎn)變?yōu)閔ead。該方法過后,又將node的next結(jié)點(diǎn)設(shè)置為null,模型變成下圖
prev +---------+ next
null <---- node(tail/head) | null | ----> null
+---------+
也就是node head tail 什么的都被置為null,等待GC回收了,這個時候return,跳出了for循環(huán),隊(duì)列被清空。
下面演示一下整個過程
setHeadAndPropagate(node, r);
+----------------+
head(tail) | waitStatus=0 |
| thread =null |
+----------------+
↓
+----------------+ +----------------+
| waitStatus=0 | prev | waitStatus=0 |
head(tail) | thread =null | <---- node | currentThread |
+----------------+ +----------------+
↓
+----------------+ +----------------+
| waitStatus=0 | prev | waitStatus=0 |
head | thread =null | <---- node(tail) | currentThread |
+----------------+ +----------------+
↓
+----------------+ +----------------+
| Node.SIGNAL | prev | waitStatus=0 |
head | thread =null | <---- node(tail) | currentThread |
+----------------+ +----------------+
↓
+----------------+ +----------------+
| waitStatus=0 | prev | waitStatus=0 |
head | thread =null | <---- node(tail) | currentThread |
+----------------+ +----------------+
↓
+----------------+
prev | waitStatus=0 | next
null <---- node(tail/head) | null | ----> null
+----------------+
CountDownLatch 的核心就是一個阻塞線程隊(duì)列,這是由鏈表構(gòu)造而成的隊(duì)列,里面包含thread 和 waitStatus,其中waitStatus說明了后繼結(jié)點(diǎn)線程狀態(tài)。
state 是一個非常重要的標(biāo)志,構(gòu)造時,設(shè)置為對應(yīng)的n值,如果n != 0,阻塞隊(duì)列將一直阻塞,除非中斷線程。
每次調(diào)用countDown() 方法,就是將state-1,而調(diào)用await() 方法就是將調(diào)用該方法的線程加入到阻塞隊(duì)列,直到state==0,才能釋放線程。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- 使用CountDownTimer類輕松實(shí)現(xiàn)倒計時功能
- java線程并發(fā)countdownlatch類使用示例
- 詳解Java多線程編程中CountDownLatch阻塞線程的方法
- Android中CountDownTimer倒計時器用法實(shí)例
- Java多線程編程之CountDownLatch同步工具使用實(shí)例
- Android 列表倒計時的實(shí)現(xiàn)的示例代碼(CountDownTimer)
- Android使用CountDownTimer實(shí)現(xiàn)倒數(shù)定時器效果
- Android基于CountDownTimer實(shí)現(xiàn)倒計時功能
- Android基于CountDownView的時間控件擴(kuò)展
- Java CountDownLatch完成異步回調(diào)實(shí)例詳解
相關(guān)文章
SpringBoot實(shí)現(xiàn)圖片識別文字的四種方式小結(jié)
本文主要介紹了SpringBoot實(shí)現(xiàn)圖片識別文字的四種方式,包括Tess4J,百度智能云,阿里云,騰訊云這四種,具有一定的參考價值,感興趣的可以了解一下2024-02-02
JAVA中的延遲隊(duì)列DelayQueue應(yīng)用解析
這篇文章主要介紹了JAVA中的延遲隊(duì)列DelayQueue應(yīng)用解析,DelayQueue是一個根據(jù)元素的到期時間來排序的隊(duì)列,而并非是一般的隊(duì)列那樣先進(jìn)先出,最快過期的元素排在隊(duì)首,越晚到期的元素排得越后,需要的朋友可以參考下2023-12-12
elasticsearch元數(shù)據(jù)構(gòu)建metadata及routing類源碼分析
這篇文章主要為大家介紹了elasticsearch元數(shù)據(jù)構(gòu)建metadata?routing類內(nèi)部源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-04-04
通過實(shí)例了解JavaBean開發(fā)及使用過程解析
這篇文章主要介紹了通過實(shí)例了解JavaBean開發(fā)及使用過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-08-08

