CountDownLatch基于AQS阻塞工具用法詳解
CountDownLatch解決了什么問題
CountDownLatch是基于AQS的阻塞工具,阻塞一個或者多個線程,直到所有的線程都執(zhí)行完成。
當(dāng)一個任務(wù)運算量比較大的時候,需要拆分為各種子任務(wù),必須要所有子任務(wù)完成后才能匯總為總?cè)蝿?wù)。
使用并發(fā)模擬的時候可以使用CountDownLatch.也可以設(shè)置超時等待時間,
CountDownLatch 用法
package com.conrrentcy.juc; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class CountDownLatchExample { private static final Logger log = LoggerFactory.getLogger(CountDownLatchExample.class); //線程數(shù)量 private static final int THREAD_NUM = 10; // CountdownLatch阻塞模擬 public static void main(String[] args) throws InterruptedException { // 創(chuàng)建線程池 用于執(zhí)行線程 ExecutorService executorService = Executors.newCachedThreadPool(); //創(chuàng)建countDownLatch final CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM); long startTime = System.currentTimeMillis(); //循環(huán)創(chuàng)建線程 for (int i = 0; i < THREAD_NUM; i++) { final int a = i; executorService.execute(() -> { try { test(a); } catch (Exception e) { log.error("Exception", e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(); long endTime = System.currentTimeMillis(); log.info("執(zhí)行完畢,{}-{}",startTime,endTime); executorService.shutdown(); } private static void test(int num) throws InterruptedException { Thread.sleep(100); log.info("{}-{}", num,System.currentTimeMillis()); Thread.sleep(100); } }
阻塞所有線程執(zhí)行完成后再執(zhí)行
CountDownLatch源碼解析
CountDownLatch源碼中的方法和屬性并不多,下面我們來一一解析。
1.AQS框架以及構(gòu)造方法
//當(dāng)前對象中私有阻塞工具 private final Sync sync; // 模板方法模式重寫AQS工具 private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; // 共享阻塞AQS Sync(int count) { setState(count); } // 獲取當(dāng)前還剩多少資源可以使用 int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } //構(gòu)造方法創(chuàng)建一個鎖對象 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
2. countDown()方法解析
該方法用于線程執(zhí)行完畢后減計統(tǒng)計數(shù)量,
// 該方法時釋放一個共享鎖。當(dāng)所有鎖都被釋放完成后主線程就能繼續(xù)執(zhí)行了。 public void countDown() { sync.releaseShared(1); }
3.await()方法解析
//攔截主線程的方法。主線程在這里等待條件達成后繼續(xù)執(zhí)行。 public void await() throws InterruptedException { //在這里阻塞線程的執(zhí)行 sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //這里判斷是否還有可以共享的資源 // 如果有則返回-1 否則返回 1,重寫AQS的方法參見(1.AQS框架以及構(gòu)造方法) if (tryAcquireShared(arg) < 0) // 有資源則運行阻塞自旋等待所有線程執(zhí)行完畢 doAcquireSharedInterruptibly(arg); // 無資源可用就讓線程繼續(xù)執(zhí)行 } // 帶延遲的減少數(shù)據(jù)攔截方法 // 返回的結(jié)果是沒有跑完全部線程就繼續(xù)執(zhí)行下一步了。 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { //線程如果被中斷則拋出異常 if (Thread.interrupted()) throw new InterruptedException(); // 表示如果線程被執(zhí)行完了直接返回成功,如果沒有執(zhí)行完則看等待時間來決定是否要繼續(xù)執(zhí)行。 return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
再看doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
下面是具體的流程
CountDownLatch 總結(jié)
CountDownLatch這個類能夠使一個線程等待其他線程完成各自的工作后再執(zhí)行。 在分散計算統(tǒng)一合成結(jié)果,按某個流程加載資源的方面有著非誠好用的效果。CountDownLatch是不能夠重用的,如果需要重新計數(shù),可以考慮使用CyclicBarrier或者創(chuàng)建新的CountDownLatch實例
下一篇我們講解像蓄水池一樣功能的Semphore,更多關(guān)于CountDownLatch AQS用法的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Springboot?2.x?RabbitTemplate默認(rèn)消息持久化的原因解析
這篇文章主要介紹了Springboot?2.x?RabbitTemplate默認(rèn)消息持久化的原因解析,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-03-03Java中AIO、BIO、NIO應(yīng)用場景及區(qū)別
本文主要介紹了Java中AIO、BIO、NIO應(yīng)用場景及區(qū)別,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-06-06Java內(nèi)存之happens-before和重排序
在JMM(Java內(nèi)存模型)中,如果一個操作執(zhí)行的結(jié)果需要對另一個操作可見,那么這兩個操作之間必須存在happens-before關(guān)系。下面小編來簡單介紹一下2019-05-05