Java concurrency之CountDownLatch原理和示例_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
CountDownLatch簡(jiǎn)介
CountDownLatch是一個(gè)同步輔助類(lèi),在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線程一直等待。
CountDownLatch和CyclicBarrier的區(qū)別
(01) CountDownLatch的作用是允許1或N個(gè)線程等待其他線程完成執(zhí)行;而CyclicBarrier則是允許N個(gè)線程相互等待。
(02) CountDownLatch的計(jì)數(shù)器無(wú)法被重置;CyclicBarrier的計(jì)數(shù)器可以被重置后使用,因此它被稱(chēng)為是循環(huán)的barrier。
關(guān)于CyclicBarrier的原理,后面一章再來(lái)學(xué)習(xí)。
CountDownLatch函數(shù)列表
CountDownLatch(int count)
構(gòu)造一個(gè)用給定計(jì)數(shù)初始化的 CountDownLatch。
// 使當(dāng)前線程在鎖存器倒計(jì)數(shù)至零之前一直等待,除非線程被中斷。 void await() // 使當(dāng)前線程在鎖存器倒計(jì)數(shù)至零之前一直等待,除非線程被中斷或超出了指定的等待時(shí)間。 boolean await(long timeout, TimeUnit unit) // 遞減鎖存器的計(jì)數(shù),如果計(jì)數(shù)到達(dá)零,則釋放所有等待的線程。 void countDown() // 返回當(dāng)前計(jì)數(shù)。 long getCount() // 返回標(biāo)識(shí)此鎖存器及其狀態(tài)的字符串。 String toString()
CountDownLatch數(shù)據(jù)結(jié)構(gòu)
CountDownLatch的UML類(lèi)圖如下:
CountDownLatch的數(shù)據(jù)結(jié)構(gòu)很簡(jiǎn)單,它是通過(guò)"共享鎖"實(shí)現(xiàn)的。它包含了sync對(duì)象,sync是Sync類(lèi)型。Sync是實(shí)例類(lèi),它繼承于AQS。
1. CountDownLatch(int count)
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
說(shuō)明:該函數(shù)是創(chuàng)建一個(gè)Sync對(duì)象,而Sync是繼承于AQS類(lèi)。Sync構(gòu)造函數(shù)如下:
Sync(int count) { setState(count); }
setState()在AQS中實(shí)現(xiàn),源碼如下:
protected final void setState(long newState) { state = newState; }
說(shuō)明:在AQS中,state是一個(gè)private volatile long類(lèi)型的對(duì)象。對(duì)于CountDownLatch而言,state表示的”鎖計(jì)數(shù)器“。CountDownLatch中的getCount()最終是調(diào)用AQS中的getState(),返回的state對(duì)象,即”鎖計(jì)數(shù)器“。
2. await()
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
說(shuō)明:該函數(shù)實(shí)際上是調(diào)用的AQS的acquireSharedInterruptibly(1);
AQS中的acquireSharedInterruptibly()的源碼如下:
public final void acquireSharedInterruptibly(long arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
說(shuō)明:acquireSharedInterruptibly()的作用是獲取共享鎖。
如果當(dāng)前線程是中斷狀態(tài),則拋出異常InterruptedException。否則,調(diào)用tryAcquireShared(arg)嘗試獲取共享鎖;嘗試成功則返回,否則就調(diào)用doAcquireSharedInterruptibly()。doAcquireSharedInterruptibly()會(huì)使當(dāng)前線程一直等待,直到當(dāng)前線程獲取到共享鎖(或被中斷)才返回。
tryAcquireShared()在CountDownLatch.java中被重寫(xiě),它的源碼如下:
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
說(shuō)明:tryAcquireShared()的作用是嘗試獲取共享鎖。
如果"鎖計(jì)數(shù)器=0",即鎖是可獲取狀態(tài),則返回1;否則,鎖是不可獲取狀態(tài),則返回-1。
private void doAcquireSharedInterruptibly(long arg) throws InterruptedException { // 創(chuàng)建"當(dāng)前線程"的Node節(jié)點(diǎn),且Node中記錄的鎖是"共享鎖"類(lèi)型;并將該節(jié)點(diǎn)添加到CLH隊(duì)列末尾。 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 獲取上一個(gè)節(jié)點(diǎn)。 // 如果上一節(jié)點(diǎn)是CLH隊(duì)列的表頭,則"嘗試獲取共享鎖"。 final Node p = node.predecessor(); if (p == head) { long r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // (上一節(jié)點(diǎn)不是CLH隊(duì)列的表頭) 當(dāng)前線程一直等待,直到獲取到共享鎖。 // 如果線程在等待過(guò)程中被中斷過(guò),則再次中斷該線程(還原之前的中斷狀態(tài))。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
說(shuō)明:
(01) addWaiter(Node.SHARED)的作用是,創(chuàng)建”當(dāng)前線程“的Node節(jié)點(diǎn),且Node中記錄的鎖的類(lèi)型是”共享鎖“(Node.SHARED);并將該節(jié)點(diǎn)添加到CLH隊(duì)列末尾。
(02) node.predecessor()的作用是,獲取上一個(gè)節(jié)點(diǎn)。如果上一節(jié)點(diǎn)是CLH隊(duì)列的表頭,則”嘗試獲取共享鎖“。
(03) shouldParkAfterFailedAcquire()的作用和它的名稱(chēng)一樣,如果在嘗試獲取鎖失敗之后,線程應(yīng)該等待,則返回true;否則,返回false。
(04) 當(dāng)shouldParkAfterFailedAcquire()返回ture時(shí),則調(diào)用parkAndCheckInterrupt(),當(dāng)前線程會(huì)進(jìn)入等待狀態(tài),直到獲取到共享鎖才繼續(xù)運(yùn)行。
3. countDown()
public void countDown() { sync.releaseShared(1); }
說(shuō)明:該函數(shù)實(shí)際上調(diào)用releaseShared(1)釋放共享鎖。
releaseShared()在AQS中實(shí)現(xiàn),源碼如下:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
說(shuō)明:releaseShared()的目的是讓當(dāng)前線程釋放它所持有的共享鎖。
它首先會(huì)通過(guò)tryReleaseShared()去嘗試釋放共享鎖。嘗試成功,則直接返回;嘗試失敗,則通過(guò)doReleaseShared()去釋放共享鎖。
tryReleaseShared()在CountDownLatch.java中被重寫(xiě),源碼如下:
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { // 獲取“鎖計(jì)數(shù)器”的狀態(tài) int c = getState(); if (c == 0) return false; // “鎖計(jì)數(shù)器”-1 int nextc = c-1; // 通過(guò)CAS函數(shù)進(jìn)行賦值。 if (compareAndSetState(c, nextc)) return nextc == 0; } }
說(shuō)明:tryReleaseShared()的作用是釋放共享鎖,將“鎖計(jì)數(shù)器”的值-1。
總結(jié):CountDownLatch是通過(guò)“共享鎖”實(shí)現(xiàn)的。在創(chuàng)建CountDownLatch中時(shí),會(huì)傳遞一個(gè)int類(lèi)型參數(shù)count,該參數(shù)是“鎖計(jì)數(shù)器”的初始狀態(tài),表示該“共享鎖”最多能被count給線程同時(shí)獲取。當(dāng)某線程調(diào)用該CountDownLatch對(duì)象的await()方法時(shí),該線程會(huì)等待“共享鎖”可用時(shí),才能獲取“共享鎖”進(jìn)而繼續(xù)運(yùn)行。而“共享鎖”可用的條件,就是“鎖計(jì)數(shù)器”的值為0!而“鎖計(jì)數(shù)器”的初始值為count,每當(dāng)一個(gè)線程調(diào)用該CountDownLatch對(duì)象的countDown()方法時(shí),才將“鎖計(jì)數(shù)器”-1;通過(guò)這種方式,必須有count個(gè)線程調(diào)用countDown()之后,“鎖計(jì)數(shù)器”才為0,而前面提到的等待線程才能繼續(xù)運(yùn)行!
以上,就是CountDownLatch的實(shí)現(xiàn)原理。
CountDownLatch的使用示例
下面通過(guò)CountDownLatch實(shí)現(xiàn):"主線程"等待"5個(gè)子線程"全部都完成"指定的工作(休眠1000ms)"之后,再繼續(xù)運(yùn)行。
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; public class CountDownLatchTest1 { private static int LATCH_SIZE = 5; private static CountDownLatch doneSignal; public static void main(String[] args) { try { doneSignal = new CountDownLatch(LATCH_SIZE); // 新建5個(gè)任務(wù) for(int i=0; i<LATCH_SIZE; i++) new InnerThread().start(); System.out.println("main await begin."); // "主線程"等待線程池中5個(gè)任務(wù)的完成 doneSignal.await(); System.out.println("main await finished."); } catch (InterruptedException e) { e.printStackTrace(); } } static class InnerThread extends Thread{ public void run() { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " sleep 1000ms."); // 將CountDownLatch的數(shù)值減1 doneSignal.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
運(yùn)行結(jié)果:
main await begin. Thread-0 sleep 1000ms. Thread-2 sleep 1000ms. Thread-1 sleep 1000ms. Thread-4 sleep 1000ms. Thread-3 sleep 1000ms. main await finished.
結(jié)果說(shuō)明:主線程通過(guò)doneSignal.await()等待其它線程將doneSignal遞減至0。其它的5個(gè)InnerThread線程,每一個(gè)都通過(guò)doneSignal.countDown()將doneSignal的值減1;當(dāng)doneSignal為0時(shí),main被喚醒后繼續(xù)執(zhí)行。
- 詳解java CountDownLatch和CyclicBarrier在內(nèi)部實(shí)現(xiàn)和場(chǎng)景上的區(qū)別
- Java線程并發(fā)工具類(lèi)CountDownLatch原理及用法
- JAVA CountDownLatch(倒計(jì)時(shí)計(jì)數(shù)器)用法實(shí)例
- 淺談java并發(fā)之計(jì)數(shù)器CountDownLatch
- java使用CountDownLatch等待多線程全部執(zhí)行完成
- Java并發(fā)系列之CountDownLatch源碼分析
- JAVA多線程CountDownLatch使用詳解
- Java CountDownLatch應(yīng)用場(chǎng)景代碼實(shí)例
相關(guān)文章
Dwr3.0純注解(純Java Code配置)配置與應(yīng)用淺析一之零配置文件化
Dwr對(duì)我來(lái)說(shuō)最重要的功能點(diǎn)就是反向Ajax調(diào)用,通俗來(lái)將就是后端可以直接調(diào)用前端的JS方法(只要在所能訪問(wèn)的范圍內(nèi)),這也就是Dwr的真正來(lái)由,當(dāng)然它也有最基本的前端直接調(diào)用后端的特性,省去了我們經(jīng)常的一般Ajax調(diào)用2016-04-04Java實(shí)現(xiàn)Excel導(dǎo)入導(dǎo)出數(shù)據(jù)庫(kù)的方法示例
這篇文章主要介紹了Java實(shí)現(xiàn)Excel導(dǎo)入導(dǎo)出數(shù)據(jù)庫(kù)的方法,結(jié)合實(shí)例形式分析了java針對(duì)Excel的讀寫(xiě)及數(shù)據(jù)庫(kù)操作相關(guān)實(shí)現(xiàn)技巧,需要的朋友可以參考下2017-08-08Jmeter post上傳文件實(shí)現(xiàn)過(guò)程詳解
這篇文章主要介紹了Jmeter post上傳文件實(shí)現(xiàn)過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-08-08Java實(shí)現(xiàn)5種限流算法及7種限流方式
本文主要介紹了Java實(shí)現(xiàn)5種限流算法及7種限流方式,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08java并發(fā)編程專(zhuān)題(八)----(JUC)實(shí)例講解CountDownLatch
這篇文章主要介紹了java CountDownLatch的相關(guān)資料,文中示例代碼非常詳細(xì),幫助大家理解和學(xué)習(xí),感興趣的朋友可以了解下2020-07-07詳解Java多線程編程中線程的啟動(dòng)、中斷或終止操作
在Java中start和tun方法可用被用來(lái)啟動(dòng)線程,而用interrupt方法來(lái)中斷或終止線程,以下我們就來(lái)詳解Java多線程編程中線程的啟動(dòng)、中斷或終止操作2016-07-07