Java隊(duì)列同步器之CountDownLatch實(shí)現(xiàn)詳解
CountDownLatch使用場(chǎng)景
CountDownLatch是一個(gè)同步工具類,它允許一個(gè)或多個(gè)線程一直等待,直到其他線程執(zhí)行完后再執(zhí)行。
例如,應(yīng)用程序的主線程希望在負(fù)責(zé)啟動(dòng)框架服務(wù)的線程已經(jīng)啟動(dòng)所有框架服務(wù)之后執(zhí)行。CountDownLatch在多線程并發(fā)編程中充當(dāng)一個(gè)計(jì)時(shí)器的功能。
典型的使用例子如下:
public class CountDownLatchTest { private static CountDownLatch latch = new CountDownLatch(2); public static void main(String[] args) throws Exception { long now = System.currentTimeMillis(); ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); executor.execute(new QuickTask(latch)); executor.execute(new SlowTask(latch)); latch.await(); System.out.println("Both QuickTask and SlowTask are finished cost: " + (System.currentTimeMillis() - now)); executor.shutdown(); } static final class QuickTask implements Runnable { private CountDownLatch countDownLatch; public QuickTask(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { try { System.out.println("QuickTaskThread: " + Thread.currentThread().getName()); Thread.sleep(3000); System.out.println("QuickTaskThread finished"); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (countDownLatch != null) { countDownLatch.countDown(); } } } } static final class SlowTask implements Runnable { private CountDownLatch countDownLatch; public SlowTask(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { try { System.out.println("SlowTaskThread: " + Thread.currentThread().getName()); Thread.sleep(5000); System.out.println("SlowTaskThread finished"); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (countDownLatch != null) { countDownLatch.countDown(); } } } } }
運(yùn)行結(jié)果:
CountDownLatch實(shí)現(xiàn)分析
CountDownLatch類的源碼很簡(jiǎn)單,如下:
public class CountDownLatch { private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } /** * 這里繼承隊(duì)列同步器,并重寫tryAcquireShared(int acquires)、tryReleaseShared(int releases)方法 */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; /** * 設(shè)置AQS同步狀態(tài),同步狀態(tài)變量定義在抽象類AbstractQueuedSynchronizer中 * private volatile int state; */ Sync(int count) { setState(count); } int getCount() { return getState(); } /** * 只有當(dāng)CountDownLatch里面的計(jì)數(shù)器為0時(shí),才會(huì)返回1 * 在AbstractQueuedSynchronizer里面tryAcquireShared(int acquires)表示共享式獲取同步狀態(tài), * 只有返回值大于等于0的時(shí)候表示獲取成功,反之則表示獲取失敗 */ protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } /** * 在AbstractQueuedSynchronizer里面tryAcquireShared(int acquires)表示共享式釋放同步狀態(tài) * 每成功調(diào)用一次這個(gè)方法Sync的實(shí)例的status值就會(huì)減一,當(dāng)status的值減為0時(shí),則不會(huì)再減。 */ protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // compareAndSetState方法執(zhí)行不成功就一直循環(huán)執(zhí)行直到成功 for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * countDown:每調(diào)用一次countDown()方法都會(huì)使sync成員變量status減一(直到status為0),status為0,則會(huì)讓調(diào)用await()方法的地方不在阻塞, * 從而達(dá)到可以等待多個(gè)并發(fā)事件完成的目標(biāo) * void */ public void countDown() { sync.releaseShared(1); } public long getCount() { return sync.getCount(); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
CountDownLatch內(nèi)部依賴Sync實(shí)現(xiàn),而Sync繼承AQS。
CountDownLatch主要分析以下三點(diǎn):
- 構(gòu)造方法,創(chuàng)建CountDownLatch對(duì)象時(shí)指定count值,即線程個(gè)數(shù)
- countDown()方法的實(shí)現(xiàn),每執(zhí)行一個(gè)線程方法就將計(jì)數(shù)器減一,當(dāng)計(jì)數(shù)為0時(shí),啟用當(dāng)前線程
- await()方法的實(shí)現(xiàn),當(dāng)前線程在計(jì)數(shù)器為0之前一直等待,除非線程被中斷
countDown()方法實(shí)現(xiàn)
countDown()方法源碼如下:
public void countDown() { //遞減鎖重入次數(shù),當(dāng)state=0時(shí)喚醒所有阻塞線程 sync.releaseShared(1); }
releaseShared()方法定義在抽象類AbstractQueuedSynchronizer中,如下這里使用了模板方法模式,該方法中的tryReleaseShard()共享式釋放鎖方法交給子類去實(shí)現(xiàn),doReleaseShared()方法在抽象類中實(shí)現(xiàn)。
AbstractQueuedSynchronizer中的doReleaseShared()方法如下:
private void doReleaseShared() { //喚醒所有阻塞隊(duì)列里面的線程 for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //節(jié)點(diǎn)是否在等待喚醒狀態(tài) if (ws == Node.SIGNAL) { //修改狀態(tài)為初始 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //成功則喚醒線程 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
CountDownLatch內(nèi)部通過共享鎖實(shí)現(xiàn)。在創(chuàng)建CountDownLatch實(shí)例時(shí),需要傳遞一個(gè)int型的參數(shù):count,該參數(shù)為計(jì)數(shù)器的初始值,也可以理解為該共享鎖可以獲取的總次數(shù)。
await()方法實(shí)現(xiàn)
await()方法源碼如下:
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
acquireSharedInterruptibly()方法定義在抽象類AbstractQueuedSynchronizer中,如下這里使用了模板方法模式,該方法中的tryAcquiredShard()共享式獲取鎖方法交給子類去實(shí)現(xiàn),doAcquiredSharedInterruptibly()方法在抽象類中實(shí)現(xiàn)。
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
CountDownLatch中的內(nèi)部類Sync對(duì)AQS的tryAcquireShared方法進(jìn)行了復(fù)寫。
- 當(dāng)前計(jì)數(shù)器的值為0的時(shí)候返回1,表示獲取鎖成功,此時(shí)acquireSharedInterruptibly()方法直接返回,線程可繼續(xù)操作。
- 當(dāng)前計(jì)數(shù)器的值不為0的時(shí)候返回-1,表示獲取鎖失敗 ,進(jìn)入doAcquiredSharedInterruptibly()方法,進(jìn)入隊(duì)列中排隊(duì)等待。
AQS中doAcquiredSharedInterruptibly()方法實(shí)現(xiàn)如下:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //加入等待隊(duì)列 final Node node = addWaiter(Node.SHARED); boolean failed = true; // 進(jìn)入 CAS 循環(huán) try { for (;;) { //當(dāng)一個(gè)節(jié)點(diǎn)進(jìn)入等待隊(duì)列后, 獲取此節(jié)點(diǎn)的prev節(jié)點(diǎn) final Node p = node.predecessor(); // 如果獲取到的prev是head,也就是隊(duì)列中第一個(gè)等待線程 if (p == head) { //再次嘗試申請(qǐng),反應(yīng)到CountDownLatch就是查看是否還有線程需要等待(state是否為0) int r = tryAcquireShared(arg); // 如果 r >=0 說明 沒有線程需要等待了state==0 if (r >= 0) { //嘗試將第一個(gè)線程關(guān)聯(lián)的節(jié)點(diǎn)設(shè)置為head setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //經(jīng)過自旋tryAcquireShared后,state還不為0,就會(huì)到這里,第一次的時(shí)候,waitStatus是0,那么node的waitStatus就會(huì)被置為SIGNAL,第二次再走到這里,就會(huì)用LockSupport的park方法把當(dāng)前線程阻塞住 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
這個(gè)方法進(jìn)來執(zhí)行的第一個(gè)動(dòng)作就是嘗試將當(dāng)前線程封裝成Node加入同步隊(duì)列 ,即調(diào)用addWaiter()方法。到這里就走到了AQS的核心部分,AQS用內(nèi)部的一個(gè)Node類維護(hù)一個(gè)Node FIFO隊(duì)列。
addWaiter()方法內(nèi)部用CAS實(shí)現(xiàn)隊(duì)列出入不會(huì)發(fā)生阻塞。
LockSupport是JDK中比較底層的類,用來創(chuàng)建鎖和其他同步工具類的基本線程阻塞原語(yǔ)。
setHeadAndPropagate()方法負(fù)責(zé)將自旋等待或被LockSupport阻塞的線程喚醒。
private void setHeadAndPropagate(Node node, int propagate) { //備份現(xiàn)在的head Node h = head; //搶到鎖的線程被喚醒,將這個(gè)節(jié)點(diǎn)設(shè)置為head setHead(node) // propagate 一般都會(huì)大于0 或者存在可被喚醒的線程 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // 只有一個(gè)節(jié)點(diǎn)或者是共享模式 釋放所有等待線程各自嘗試搶占鎖 if (s == null || s.isShared()) doReleaseShared(); } }
線程封裝成Node對(duì)象時(shí),waitStatus是volatile變量,初始值是0,對(duì)其賦值可能有4個(gè)取值
//當(dāng)前節(jié)點(diǎn)線程由于超時(shí)或中斷被取消,這種狀態(tài)的節(jié)點(diǎn)將會(huì)被忽略,并移出隊(duì)列 static final int CANCELLED = 1; //表示當(dāng)前線程已被掛起,其后繼節(jié)點(diǎn)可以嘗試搶占鎖 static final int SIGNAL = -1; //線程在Condition條件隊(duì)列中等待,當(dāng)從同步隊(duì)列中復(fù)制到條件隊(duì)列時(shí)變?yōu)? static final int CONDITION = -2; //共享模式下,釋放共享資源時(shí)通知其他節(jié)點(diǎn) static final int PROPAGATE = -3;
AQS獨(dú)占與共享小結(jié)
AQS的功能可以分為兩類:獨(dú)占與共享;如ReentrantLock利用了其獨(dú)占功能,CountDownLatch利用了其共享功能。AQS的靜態(tài)內(nèi)部類Node里有兩個(gè)變量,獨(dú)占鎖與共享鎖在創(chuàng)建自己的節(jié)點(diǎn)時(shí)(addWaiter方法)用于表明身份,它們會(huì)被賦值給Node的nextWaiter變量。
/** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; final boolean isShared() { return nextWaiter == SHARED; } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; }
獨(dú)占鎖就是每次只允許一個(gè)線程執(zhí)行,當(dāng)前線程執(zhí)行完會(huì)release將同步狀態(tài)歸零,再喚醒后繼節(jié)點(diǎn)。通過自定義tryAcquire()方法來實(shí)現(xiàn)公平與非公平。
獨(dú)占式獲取及釋放資源acquire & release
//成功代表同步狀態(tài)的變更,排斥其他線程;否則加入等待隊(duì)列 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //同步狀態(tài)歸0后,喚醒后繼節(jié)點(diǎn) public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
獨(dú)占式地釋放和獲取資源都是嚴(yán)格按照FIFO的,即通過鏈表的前后驅(qū)指針按順序來的。在獲取資源時(shí),節(jié)點(diǎn)每次都看自身前節(jié)點(diǎn)是否是頭節(jié)點(diǎn),若是就嘗試獲取資源;獲取沒成功不要緊,此時(shí)頭節(jié)點(diǎn)狀態(tài)是SIGNAL,此時(shí)該節(jié)點(diǎn)會(huì)使用LokSupport的park掛起自己,頭節(jié)點(diǎn)釋放資源后就會(huì)unpark該節(jié)點(diǎn)線程,下一輪循環(huán)中該節(jié)點(diǎn)就可以成功獲取資源啦! 如果前節(jié)點(diǎn)不是頭節(jié)點(diǎn),那就繼續(xù)自旋。
不同于ReentrantLock,CountDownLatch調(diào)用的是AQS里的acquireSharedInterruptibly()與releaseShared()方法,這兩個(gè)方法是共享式獲取與釋放資源的實(shí)現(xiàn)。CountDownLatch實(shí)現(xiàn)了自己的tryAcquireShared()與tryReleaseShared()方法。
共享式獲取及釋放資源acquireSharedInterruptibly & releaseShared
//tryAcquireShared()方法返回正數(shù),代表資源獲取成功,只要不為0嘗試獲取資源的線程就一直成功,返回0代表最后一個(gè)資源被獲取了,返回負(fù)數(shù)代表資源已經(jīng)沒有了,加入等待隊(duì)列 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } //tryReleaseShared()方法返回true,代表資源釋放成功,喚醒所有等待資源的阻塞線程 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
獨(dú)占鎖的tryAcquire()及tryRelease()返回boolean代表同步狀態(tài)更改的成功與否;tryReleaseShared()方法也返回boolean值代表資源釋放成功與否,但是AQS中定義的tryAcquireShared()方法返回的卻是int值,這正好體現(xiàn)了獨(dú)占與共享的區(qū)別。
來看tryAcquireShared()方法對(duì)返回值的注釋
* @return a negative value on failure; zero if acquisition in shared * mode succeeded but no subsequent shared-mode acquire can * succeed; and a positive value if acquisition in shared * mode succeeded and subsequent shared-mode acquires might * also succeed, in which case a subsequent waiting thread * must check availability. (Support for three different * return values enables this method to be used in contexts * where acquires only sometimes act exclusively.) Upon * success, this object has been acquired.
翻譯一下,就是tryAcquireShared()返回大于0的正數(shù)代表當(dāng)前線程能夠正常獲取資源,其之后的線程也可能正常獲取資源,返回0代表當(dāng)前線程能夠正常獲取資源,但之后的線程將會(huì)進(jìn)入等待隊(duì)列中。獨(dú)占與共享最大不同就在各自的tryAcquire里,對(duì)于獨(dú)占來說只有true或false,只有一個(gè)線程得以執(zhí)行任務(wù);而對(duì)于共享鎖的tryAcquireShared()來說,線程數(shù)沒達(dá)到限制都可以直接執(zhí)行。 但本質(zhì)上都是對(duì)AQS同步狀態(tài)的修改,一個(gè)是0與1之間,另一個(gè)允許更多而已。
到此這篇關(guān)于Java隊(duì)列同步器之CountDownLatch實(shí)現(xiàn)詳解的文章就介紹到這了,更多相關(guān)Java的CountDownLatch實(shí)現(xiàn)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
關(guān)于Spring?Boot內(nèi)存泄露排查的記錄
這篇文章主要介紹了關(guān)于Spring?Boot內(nèi)存泄露排查的記錄,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06使用開源項(xiàng)目JAVAE2 進(jìn)行視頻格式轉(zhuǎn)換
這篇文章主要介紹了使用開源項(xiàng)目JAVAE 進(jìn)行視頻格式轉(zhuǎn)換,幫助大家更好的利用Java處理視頻,完成自身需求,感興趣的朋友可以了解下2020-11-11SpringBoot使用slf4j日志并輸出到文件中的操作方法
這篇文章主要介紹了SpringBoot使用slf4j日志并輸出到文件中,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08postman測(cè)試post請(qǐng)求參數(shù)為json類型的實(shí)例講解
下面小編就為大家分享一篇postman測(cè)試post請(qǐng)求參數(shù)為json類型的實(shí)例講解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-03-03JDBC操作數(shù)據(jù)庫(kù)的增加、刪除、更新、查找實(shí)例分析
這篇文章主要介紹了JDBC操作數(shù)據(jù)庫(kù)的增加、刪除、更新、查找方法,以完整實(shí)例形式分析了Java基于JDBC連接數(shù)據(jù)庫(kù)及進(jìn)行數(shù)據(jù)的增刪改查等技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-10-10詳解java 中Spring jsonp 跨域請(qǐng)求的實(shí)例
這篇文章主要介紹了詳解java 中Spring jsonp 跨域請(qǐng)求的實(shí)例的相關(guān)資料,jsonp 可用于解決主流瀏覽器的跨域數(shù)據(jù)訪問的問題,需要的朋友可以參考下2017-08-08