詳解Java并發(fā)工具類之CountDownLatch和CyclicBarrier
在JDK的并發(fā)包中,有幾個(gè)非常有用的并發(fā)工具類,它們分別是:CountDownLatch、CyclicBarrier、Semaphore和Exchanger。
CountDownLatch(倒計(jì)時(shí)門閂):它允許一個(gè)或多個(gè)線程等待其他線程完成操作后再繼續(xù)執(zhí)行。它通過一個(gè)計(jì)數(shù)器來實(shí)現(xiàn),線程通過調(diào)用countDown()方法來減少計(jì)數(shù)器的值,await()方法進(jìn)行阻塞等待計(jì)數(shù)器減少,當(dāng)計(jì)數(shù)器達(dá)到零時(shí),等待的線程將被釋放。CyclicBarrier(循環(huán)屏障):它允許一組線程互相等待,直到到達(dá)一個(gè)共同的屏障點(diǎn),然后繼續(xù)執(zhí)行后續(xù)操作。與CountDownLatch不同的是,CyclicBarrier的計(jì)數(shù)器可以重復(fù)使用(reset()方法),當(dāng)所有等待線程都到達(dá)屏障點(diǎn)后,計(jì)數(shù)器會(huì)重置,線程可以繼續(xù)下一次等待。Semaphore(信號(hào)量):它用于控制對(duì)某個(gè)資源的訪問權(quán)限。Semaphore維護(hù)了一組許可證,線程在訪問資源前需要獲取許可證,如果許可證不可用,則線程必須等待,直到有可用的許可證。Exchanger(交換器):它提供了一種線程間交換數(shù)據(jù)的機(jī)制。兩個(gè)線程可以通過Exchanger交換數(shù)據(jù),當(dāng)兩個(gè)線程都調(diào)用exchange()方法后,他們會(huì)彼此交換數(shù)據(jù),并繼續(xù)執(zhí)行后續(xù)操作。
CountDownLatch
Latch(門閂)設(shè)計(jì)模式
當(dāng)多個(gè)線程并發(fā)執(zhí)行任務(wù),然后只有等待所有子任務(wù)全部完成進(jìn)行匯總,程序的門閂才能打開讓程序繼續(xù)往下執(zhí)行。它指定了一個(gè)屏障,只有所有條件都滿足的時(shí)候,門閥才能打開。
比如小明和小紅相約周末去爬山,約定在人民廣場(chǎng)碰頭,然后一同出發(fā)去爬山,他們各自從家里出發(fā),無論是其中某一個(gè)先到達(dá)了人民廣場(chǎng)都要等待另一個(gè)到達(dá)之后才可以繼續(xù)進(jìn)行下去,這里的人民廣場(chǎng)碰頭就相當(dāng)于上述的門閂。

示例
還是使用上面的例子,我們模擬小明和小紅從家出發(fā),設(shè)定不同的等待時(shí)間模擬到達(dá)人民廣場(chǎng)的路程耗時(shí)。代碼如下
public static void main(String[] args) throws InterruptedException, ExecutionException {
? ? ? ?final int threadNum = 2;
? ? ? ?ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
? ? ? ?CountDownLatch countDownLatch = new CountDownLatch(threadNum);
?
? ? ? ?executorService.execute(() -> {
?
? ? ? ? ? ?System.out.println("小明開始出發(fā)");
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(2);
? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? }
? ? ? ? ? ?countDownLatch.countDown(); ?// 計(jì)數(shù)器 -1
? ? ? ? ? ?System.out.println("小明到達(dá)人民廣場(chǎng)");
?
?
? ? ? });
? ? ? ?executorService.execute(() -> {
?
? ? ? ? ? ?System.out.println("小紅開始出發(fā)");
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(3);
? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? }
? ? ? ? ? ?countDownLatch.countDown(); ?// 計(jì)數(shù)器 -1
? ? ? ? ? ?System.out.println("小紅到達(dá)人民廣場(chǎng)");
?
?
? ? ? });
? ? ? ?countDownLatch.await();
? ? ? ?System.out.println("小明和小紅都到達(dá)了人民廣場(chǎng),開始一起出發(fā)去爬山");
? ? ? ?executorService.shutdown();
?
? }
?結(jié)果
小明開始出發(fā)
小紅開始出發(fā)
小明到達(dá)人民廣場(chǎng) // 2s后打印
小紅到達(dá)人民廣場(chǎng) // 3s后打印
小明和小紅都到達(dá)了人民廣場(chǎng),開始一起出發(fā)去爬山 //3s后打印
?
Process finished with exit code 0
與Join()的區(qū)別
可能這里會(huì)有疑問,使用Thread.join()也可以實(shí)現(xiàn)相同的功能,這與使用CountDownLatch有什么區(qū)別呢?
join()的實(shí)現(xiàn)
public static void main(String[] args) throws InterruptedException {
? ?Thread thread1 = new Thread(() -> {
? ? ? ?System.out.println("小明開始出發(fā)");
? ? ? ?try {
? ? ? ? ? ?TimeUnit.SECONDS.sleep(2);
? ? ? } catch (InterruptedException e) {
? ? ? ? ? ?e.printStackTrace();
? ? ? }
? ? ? ?System.out.println("小明到達(dá)人民廣場(chǎng)");
?
? }, "thread1");
? ?Thread thread2 = new Thread(() -> {
? ? ? ?System.out.println("小紅開始出發(fā)");
? ? ? ?try {
? ? ? ? ? ?TimeUnit.SECONDS.sleep(3);
? ? ? } catch (InterruptedException e) {
? ? ? ? ? ?e.printStackTrace();
? ? ? }
? ? ? ?System.out.println("小紅到達(dá)人民廣場(chǎng)");
?
? }, "thread2");
?
? ?thread1.start();
? ?thread2.start();
? ?thread1.join();
? ?thread2.join();
? ?System.out.println("小明和小紅都到達(dá)了人民廣場(chǎng),開始一起出發(fā)去爬山");
?
}結(jié)果
小明開始出發(fā)
小紅開始出發(fā)
小明到達(dá)人民廣場(chǎng) // 2s后打印
小紅到達(dá)人民廣場(chǎng) // 3s后打印
小明和小紅都到達(dá)了人民廣場(chǎng),開始一起出發(fā)去爬山 //3s后打印
?
Process finished with exit code 0
發(fā)現(xiàn)使用join()實(shí)現(xiàn)和countDownCatch實(shí)現(xiàn)好像在代碼上的體現(xiàn)并沒有太大差異,不急,我們接著往下看
join()實(shí)現(xiàn)原理
我們點(diǎn)進(jìn)去join的jdk源碼查看它的實(shí)現(xiàn)邏輯
public final void join() throws InterruptedException {
? ?join(0);
}
?
public final synchronized void join(long millis)
? ?throws InterruptedException {
? ? ? ?long base = System.currentTimeMillis();
? ? ? ?long now = 0;
?
? ? ? ?if (millis < 0) {
? ? ? ? ? ?throw new IllegalArgumentException("timeout value is negative");
? ? ? }
// 調(diào)用join真正執(zhí)行的方法
? ? ? ?if (millis == 0) {
? ? ? ? ? ?while (isAlive()) {
? ? ? ? ? ? ? ?wait(0);
? ? ? ? ? }
? ? ? } else {
? ? ? ? ? ?while (isAlive()) {
? ? ? ? ? ? ? ?long delay = millis - now;
? ? ? ? ? ? ? ?if (delay <= 0) {
? ? ? ? ? ? ? ? ? ?break;
? ? ? ? ? ? ? }
? ? ? ? ? ? ? ?wait(delay);
? ? ? ? ? ? ? ?now = System.currentTimeMillis() - base;
? ? ? ? ? }
? ? ? }
? }
?我們看到他的核心代碼就幾行
while (isAlive()) {
? ? wait(0);
}這幾行代碼不難理解,通過不停的檢查join線程是否存活,如果線程狀態(tài)是活動(dòng)的,那么就一直等待下去(wait(0)表示永久等待),直到j(luò)oin線程中止后,線程的this.notifyAll()方法會(huì)被調(diào)用,不過調(diào)用notifyAll()方法是在JVM里 實(shí)現(xiàn)的,所以在JDK里看不到。
Join()與countDownLatch比較
回到上一個(gè)問題,join到底和countDownLatch有什么區(qū)別,countDownLatch底層使用了計(jì)數(shù)器來控制線程的喚醒,提供了更細(xì)粒度的線程控制,比如我們運(yùn)行了100個(gè)線程,但是只需要80個(gè)線程執(zhí)行結(jié)束就可以繼續(xù)下去,那么使用join就不合適了。
綜上所述 CountDownLatch相對(duì)于Join的優(yōu)勢(shì):
CountDownLatch可以等待多個(gè)線程的完成,而Join只能等待一個(gè)線程。CountDownLatch可以靈活地設(shè)置計(jì)數(shù)器的值,不僅僅限于線程數(shù),可以根據(jù)需要自由控制。CountDownLatch提供了更細(xì)粒度的線程間協(xié)作和控制,可以在任意位置進(jìn)行countDown()和await()的調(diào)用,更靈活地控制線程的流程。
CountDownLatch程序?qū)崿F(xiàn)
上面說了很多CountDownLatch的示例和與join比較,也提了一下CountDownLatch底層的原理,下面就看一下如何實(shí)現(xiàn)一個(gè)簡(jiǎn)單的CountDownLatch
程序
我們先新建一個(gè)抽象類,包含countDownLatch需要的參數(shù)和方法
public abstract class Latch {
? ?// 控制了多少線程完成后門閥才能打開
? ?protected int limit;
?
? ?// 構(gòu)造函數(shù)
? ?public Latch(int limit){
? ? ? ?this.limit = limit;
? }
?
? ?// 方法使得線程一直等待
? ?public abstract void await() throws InterruptedException;
?
? ?// 當(dāng)前任務(wù)線程完成工作之后調(diào)用該方法使得計(jì)數(shù)器減一
? ?public abstract void countDown();
?
? ?// 獲取當(dāng)前還有多少個(gè)線程沒有完成任務(wù)
? ?public abstract int getUnArrived();
?
}然后實(shí)現(xiàn)這個(gè)抽象類,并寫入具體邏輯代碼
public class CountDownLatch extends Latch {
?
? ?private final Lock lock = new ReentrantLock();
? ?private final Condition condition = lock.newCondition();
?
?
? ?public CountDownLatch(int limit) {
? ? ? ?super(limit);
? }
?
? ?@Override
? ?public void await() throws InterruptedException {
? ? ? ?lock.lock();
? ? ? ?while (limit > 0){
? ? ? ? ? ?condition.await();
? ? ? }
? ? ? ?lock.unlock();
? }
?
? ?@Override
? ?public void countDown() {
?
? ? ? ?lock.lock();
? ? ? ?if(limit < 0){
? ? ? ? ? ?throw new IllegalStateException();
? ? ? }
? ? ? ?limit--;
? ? ? ?condition.signalAll();
?
? ? ? ?lock.unlock();
?
? }
?
? ?@Override
? ?public int getUnArrived() {
? ? ? ?return limit;
? }
}測(cè)試
public class LatchDemo {
? ?public static void main(String[] args) throws InterruptedException {
? ? ? ?Latch latch = new CountDownLatch(2);
?
? ? ? ?ExecutorService executorService = Executors.newFixedThreadPool(2);
?
? ? ? ?executorService.execute(()->{
? ? ? ? ? ?System.out.println("小明開始出發(fā)");
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(2);
? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? }
? ? ? ? ? ?latch.countDown(); ?// 計(jì)數(shù)器 -1
? ? ? ? ? ?System.out.println("小明到達(dá)人民廣場(chǎng)");
? ? ? });
? ? ? ?executorService.execute(()->{
? ? ? ? ? ?System.out.println("小紅開始出發(fā)");
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(3);
? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? }
? ? ? ? ? ?latch.countDown(); ?// 計(jì)數(shù)器 -1
? ? ? ? ? ?System.out.println("小紅到達(dá)人民廣場(chǎng)");
? ? ? });
?
?
? ? ? ?latch.await();
? ? ? ?System.out.println("小明和小紅都到達(dá)了人民廣場(chǎng),開始一起出發(fā)去爬山");
?
? ? ? ?executorService.shutdown();
?
? }
}結(jié)果
小明開始出發(fā)
小紅開始出發(fā)
小明到達(dá)人民廣場(chǎng)
小紅到達(dá)人民廣場(chǎng)
小明和小紅都到達(dá)了人民廣場(chǎng),開始一起出發(fā)去爬山
?
Process finished with exit code 0
可以看到結(jié)果如前文一致,這就實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的CountDownLatch,當(dāng)然具體實(shí)現(xiàn)還有更多的細(xì)節(jié),如有需要,請(qǐng)翻閱源碼。
總結(jié)
通過上面的簡(jiǎn)單實(shí)現(xiàn),我們可以看到CountDownLatch基于計(jì)數(shù)器實(shí)現(xiàn)了多線程之間的門閥攔截,底層還是通過線程之間的通訊、鎖和計(jì)數(shù)器控制。
CyclicBarrier
除了使用CountDownLatch來實(shí)現(xiàn)多線程之間的阻塞同步,也可以使用CyclicBarrier來實(shí)現(xiàn),并且CyclicBarrier提供了比CountDownLatch更強(qiáng)大的功能。
CyclicBarrier的字面意思是可循環(huán)使用的屏障。它提供了一種同步機(jī)制,使一組線程能夠在達(dá)到屏障時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí),屏障才會(huì)開啟,所有被阻塞的線程才能繼續(xù)執(zhí)行。
網(wǎng)上找的一張示意圖

示例
還是用之前的例子,模擬小明和小紅去爬山,代碼如下,結(jié)果就不贅述了。
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
?
? ?CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
?
? ?ExecutorService executorService = Executors.newFixedThreadPool(2);
?
? ?executorService.execute(()->{
? ? ? ?System.out.println("小明開始出發(fā)");
? ? ? ?try {
? ? ? ? ? ?TimeUnit.SECONDS.sleep(2);
? ? ? ? ? ?System.out.println("小明到達(dá)人民廣場(chǎng)");
? ? ? ? ? ?cyclicBarrier.await(); // 計(jì)數(shù)器 -1
? ? ? } catch (Exception e) {
? ? ? ? ? ?e.printStackTrace();
? ? ? }
?
? });
? ?executorService.execute(()->{
? ? ? ?System.out.println("小紅開始出發(fā)");
? ? ? ?try {
? ? ? ? ? ?TimeUnit.SECONDS.sleep(3);
? ? ? ? ? ?System.out.println("小紅到達(dá)人民廣場(chǎng)");
? ? ? ? ? ?cyclicBarrier.await(); // 計(jì)數(shù)器 -1
? ? ? } catch (Exception e) {
? ? ? ? ? ?e.printStackTrace();
? ? ? }
? });
? ?cyclicBarrier.await();
? ?System.out.println("小明和小紅都到達(dá)了人民廣場(chǎng),開始一起出發(fā)去爬山");
?
? ?executorService.shutdown();
?
}不同的是,這里我設(shè)置了三個(gè)屏障點(diǎn) cyclicBarrier.await();,而使用CountDownLatch只用了兩個(gè)計(jì)數(shù)器減一操作 + 一個(gè)wait()方法,使用起來很相似,我們說cyclicBarrier 比 CountDownLatch 功能更強(qiáng)大,那么強(qiáng)大在哪里呢?
重置計(jì)數(shù)器和獲取狀態(tài)
重置計(jì)數(shù)器
CountDownLatch的計(jì)數(shù)器只能使用一次,而CyclicBarrier的計(jì)數(shù)器可以使用reset()方法重置。所以CyclicBarrier能處理更為復(fù)雜的業(yè)務(wù)場(chǎng)景。例如,如果計(jì)算發(fā)生錯(cuò)誤,可以重置計(jì)數(shù)器,并讓線程重新執(zhí)行一次。
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
?
? ?final int threadNum = 3;
? ?ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
? ?CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum, () -> {
? ? ? ?System.out.println("所有線程都到達(dá)屏障");
? });
?
? ?for (int i = 0; i < threadNum; i++) {
? ? ? ?executorService.execute(() -> {
? ? ? ? ? ?System.out.println(Thread.currentThread().getName() + " 到達(dá)屏障");
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?cyclicBarrier.await();
? ? ? ? ? } catch (InterruptedException | BrokenBarrierException e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? }
? ? ? });
? }
?
? ?Thread.sleep(2000); // 等待一段時(shí)間,確保所有線程都到達(dá)屏障
?
? ?cyclicBarrier.reset(); // 重置屏障
?
? ?System.out.println("屏障已重置");
?
? ?for (int i = 0; i < threadNum-1; i++) {
? ? ? ?executorService.execute(() -> {
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(1);
? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? }
? ? ? ? ? ?System.out.println(Thread.currentThread().getName() + " 到達(dá)屏障");
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?cyclicBarrier.await();
? ? ? ? ? } catch (InterruptedException | BrokenBarrierException e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? }
? ? ? });
? }
?
? ?System.out.println("第二次進(jìn)入 循環(huán)屏障");
? ?cyclicBarrier.await();
? ?System.out.println("第二次循環(huán) 邁過屏障");
?
?
? ?executorService.shutdown();
?
?
}結(jié)果
pool-1-thread-2 到達(dá)屏障
pool-1-thread-3 到達(dá)屏障
pool-1-thread-1 到達(dá)屏障
所有線程都到達(dá)屏障
屏障已重置
第二次進(jìn)入 循環(huán)屏障
pool-1-thread-2 到達(dá)屏障
pool-1-thread-1 到達(dá)屏障
所有線程都到達(dá)屏障
第二次循環(huán) 邁過屏障
?
Process finished with exit code 0
先說一下CyclicBarrier提供的另一個(gè)構(gòu)造函數(shù)CyclicBarrier(int parties,Runnable barrierAction),用于在線程到達(dá)屏障時(shí),優(yōu)先執(zhí)行barrierAction,也就是上方代碼中用到的這幾段
CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum, () -> {
? ?System.out.println("所有線程都到達(dá)屏障");
});這里用于提示所有的線程到達(dá)屏障。緊接著是比較常規(guī)的代碼,循環(huán)構(gòu)造線程并在線程中執(zhí)行了 cyclicBarrier.await();到達(dá)屏障。重點(diǎn)是 cyclicBarrier.reset();重置屏障后,我留下一個(gè)屏障給主線程測(cè)試使用,而在新構(gòu)造的線程中停留1s, System.out.println("第二次循環(huán) 邁過屏障");打印在 System.out.println(Thread.currentThread().getName() + " 到達(dá)屏障");之后,說明屏障計(jì)數(shù)器已經(jīng)重置并且生效了。
獲取狀態(tài)
除了上述的基本功能外,CyclicBarrier也提供了以下API用來查看狀態(tài),
getNumberWaiting() // 顧名思義,獲取目前正在屏障處阻塞等待的線程數(shù)量。getParties() // 獲取屏障數(shù)量 也就是我們傳入構(gòu)造函數(shù)中的parties參數(shù)isBroken() // 查詢阻塞的線程是否被中斷
以上就是詳解Java并發(fā)工具類之CountDownLatch和CyclicBarrier的詳細(xì)內(nèi)容,更多關(guān)于Java CountDownLatch CyclicBarrier的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java 守護(hù)線程_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
Java語言機(jī)制是構(gòu)建在JVM的基礎(chǔ)之上的,意思是Java平臺(tái)把操作系統(tǒng)的底層給屏蔽起來,所以它可以在它自己的虛擬的平臺(tái)里面構(gòu)造出對(duì)自己有利的機(jī)制,而語言或者說平臺(tái)的設(shè)計(jì)者多多少少是收到Unix思想的影響,而守護(hù)線程機(jī)制又是對(duì)JVM這樣的平臺(tái)湊合,于是守護(hù)線程應(yīng)運(yùn)而生2017-05-05
線程池滿Thread?pool?exhausted排查和解決方案
這篇文章主要介紹了線程池滿Thread?pool?exhausted排查和解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-11-11
Mybatis結(jié)果集映射一對(duì)多簡(jiǎn)單入門教程
本文給大家介紹Mybatis結(jié)果集映射一對(duì)多簡(jiǎn)單入門教程,包括搭建數(shù)據(jù)庫環(huán)境的過程,idea搭建maven項(xiàng)目的代碼詳解,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友參考下吧2021-06-06
Spring裝配Bean之用Java代碼安裝配置bean詳解
這篇文章主要給大家介紹了關(guān)于Spring裝配Bean之用Java代碼安裝配置bean的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用spring具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧。2017-10-10
Java Map如何根據(jù)key取value以及不指定key取出所有的value
這篇文章主要介紹了Java Map如何根據(jù)key取value以及不指定key取出所有的value,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09
關(guān)于springcloud報(bào)錯(cuò)報(bào)UnsatisfiedDependencyException的問題
這篇文章主要介紹了關(guān)于springcloud報(bào)錯(cuò)報(bào)UnsatisfiedDependencyException的問題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-11-11
java 使用URLDecoder和URLEncoder對(duì)中文進(jìn)行處理
這篇文章主要介紹了java 使用URLDecoder和URLEncoder對(duì)中文進(jìn)行處理的相關(guān)資料,需要的朋友可以參考下2017-02-02

