欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java CountDownLatch的源碼硬核解析

 更新時間:2022年10月27日 17:00:28   作者:JAVA旭陽  
對于并發(fā)執(zhí)行,Java中的CountDownLatch是一個重要的類。為了更好的理解CountDownLatch這個類,本文將通過例子和源碼帶領大家深入解析這個類的原理,感興趣的可以學習一下

前言

對于并發(fā)執(zhí)行,Java中的CountDownLatch是一個重要的類,簡單理解, CountDownLatchcount down是倒數(shù)的意思,latch則是“門閂”的含義。在數(shù)量倒數(shù)到0的時候,打開“門閂”, 一起走,否則都等待在“門閂”的地方。

為了更好的理解CountDownLatch這個類,本文通過例子和源碼帶領大家深入解析這個類的原理。

介紹和使用

例子

我們先通過一個例子快速理解下CountDownLatch的妙處。

最近LOL S12賽如火如荼舉行,比如我們玩王者榮耀的時候,10個萬玩家登入游戲,每個玩家的網(wǎng)速可能不一樣,只有每個人進度條走完,才會一起來到王者峽谷,網(wǎng)速快的要等網(wǎng)速慢的。我們通過例子模擬下這個過程。

@Slf4j(topic = "a.CountDownLatchTest")
public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        // 創(chuàng)建一個倒時器,默認10個數(shù)量
        CountDownLatch latch = new CountDownLatch(10);
        ExecutorService service = Executors.newFixedThreadPool(10);
        // 設置進度數(shù)據(jù)
        String[] personProcess = new String[10];
        Random random = new Random();

        for (int i = 0; i < 10; i++) {
            int finalJ = i;
            service.submit(() -> {
                // 模擬10個人的進度條
                for (int j = 0; j <= 100; j++) {
                    // 模擬網(wǎng)速快慢,隨機生成
                    try {
                        Thread.sleep(random.nextInt(100));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 設置進度數(shù)據(jù)
                    personProcess[finalJ] = j + "%";
                   log.info("{}", Arrays.toString(personProcess));
                }

                // 運行結束,倒時器 - 1
                latch.countDown();
            });
        }
        // 打開"閥門"
        latch.await();
       log.info("王者峽谷到了");
        service.shutdown();
    }
}

運行結果:

概述

CountDownLatch一般用作多線程倒計時計數(shù)器,強制它們等待其他一組(CountDownLatch的初始化決定)任務執(zhí)行完成。

構造器:

public CountDownLatch(int count):設置倒數(shù)器需要倒數(shù)的數(shù)量

常用API:

  • public void await() throws InterruptedException:調用await()方法的線程會被掛起,等待直到count值為0再繼續(xù)執(zhí)行。
  • public boolean await(long timeout, TimeUnit unit) throws InterruptedException:同await(),若等待timeout時長后,count值還是沒有變?yōu)?,不再等待,繼續(xù)執(zhí)行。時間單位如下常用的毫秒、天、小時、微秒、分鐘、納秒、秒。
  • public void countDown(): count值遞減1
  • public long getCount():獲取當前count值

常見使用場景:

一個程序中有N個任務在執(zhí)行,我們可以創(chuàng)建值為N的CountDownLatch,當每個任務完成后,調用一下countDown()方法進行遞減count值,再在主線程中使用await()方法等待任務執(zhí)行完成,主線程繼續(xù)執(zhí)行。

實現(xiàn)思路

通過前面的例子和介紹我們知道CountDownLatch的大致使用流程:

  • 創(chuàng)建CountDownLatch并設置計數(shù)器值。
  • 啟動多線程并且調用CountDownLatch實例的countDown()方法。
  • 主線程調用 await() 方法,這樣主線程的操作就會在這個方法上阻塞,直到其他線程完成各自的任務,count值為0,停止阻塞,主線程繼續(xù)執(zhí)行。

不妨我們先思考下,它是怎么實現(xiàn)的呢?我們可以問自己幾個問題?

  • 如何做到可以讓主線程阻塞等待在那里?是不是可以調用LockSupport.park()方法進行阻塞。
  • 那么什么時候該阻塞呢?我們需要有個變量,比如state, 如果state大于0,就阻塞主線程。
  • 那么什么時候該喚醒呢,又如何喚醒呢?如果任務執(zhí)行完成后,我們讓state 減去1,也就是調用countDown()方法,如果發(fā)現(xiàn)state是0,那么就調用LockSupport.unpark()喚醒此前阻塞的地方,繼續(xù)執(zhí)行。

是不是很熟悉,這就是我們的AQS共享模式的實現(xiàn)原理啊,不了解AQS共享模式的可以參考本篇文章:深入淺出理解Java并發(fā)AQS的共享鎖模式

我們把思路理清楚后,直接看CountDownLatch的源碼。

源碼解析

類結構圖

以上是CountDownLatch的類結構圖,

  • SyncCountDownLatch的內部類,被成員變量sync持有。
  • Sync繼承了AbstractQueuedSynchronizer,也就是我們大名鼎鼎的AQS。

await() 實現(xiàn)原理

1.線程調用 await()會阻塞等待其他線程完成任務

// CountDownLatch#await
public void await() throws InterruptedException {
    // 調用AbstractQueuedSynchronizer的acquireSharedInterruptibly方法
    sync.acquireSharedInterruptibly(1);
}
// AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    // 判斷線程是否被打斷,拋出打斷異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 嘗試獲取共享鎖
    // 條件成立說明 state > 0,此時線程入隊阻塞等待,等待其他線程獲取共享資源
    // 條件不成立說明 state = 0,此時不需要阻塞線程,直接結束函數(shù)調用
    if (tryAcquireShared(arg) < 0)
        // 阻塞當前線程的邏輯
        doAcquireSharedInterruptibly(arg);
}
// CountDownLatch.Sync#tryAcquireShared
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

2.doAcquireSharedInterruptibly()方法是實現(xiàn)線程阻塞的核心邏輯

// AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    // 將調用latch.await()方法的線程 包裝成 SHARED 類型的 node 加入到 AQS 的阻塞隊列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 獲取當前節(jié)點的前驅節(jié)點
            final Node p = node.predecessor();
            // 前驅節(jié)點時頭節(jié)點就可以嘗試獲取鎖
            if (p == head) {
                // 再次嘗試獲取鎖,獲取成功返回 1
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 獲取鎖成功,設置當前節(jié)點為 head 節(jié)點,并且向后傳播
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 阻塞在這里
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // 阻塞線程被中斷后拋出異常,進入取消節(jié)點的邏輯
        if (failed)
            cancelAcquire(node);
    }
}

3.parkAndCheckInterrupt()方法中會進行阻塞操作

private final boolean parkAndCheckInterrupt() {
    	// 阻塞線程
        LockSupport.park(this);
        return Thread.interrupted();
    }

countDown()實現(xiàn)原理

1.任務結束調用 countDown() 完成計數(shù)器減一(釋放鎖)的操作

public void countDown() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    // 嘗試釋放共享鎖
    if (tryReleaseShared(arg)) {
        // 釋放鎖成功開始喚醒阻塞節(jié)點
        doReleaseShared();
        return true;
    }
    return false;
}

2.調用tryReleaseShared()方法嘗試釋放鎖,true表示state等于0,去喚醒阻塞線程。

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        // 條件成立說明前面【已經(jīng)有線程觸發(fā)喚醒操作】了,這里返回 false
        if (c == 0)
            return false;
        // 計數(shù)器減一
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            // 計數(shù)器為 0 時返回 true
            return nextc == 0;
    }
}

3.調用doReleaseShared()喚醒阻塞的節(jié)點

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 判斷隊列是否是空隊列
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 頭節(jié)點的狀態(tài)為 signal,說明后繼節(jié)點沒有被喚醒過
            if (ws == Node.SIGNAL) {
                // cas 設置頭節(jié)點的狀態(tài)為 0,設置失敗繼續(xù)自旋
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 喚醒后繼節(jié)點
                unparkSuccessor(h);
            }
            // 如果有其他線程已經(jīng)設置了頭節(jié)點的狀態(tài),重新設置為 PROPAGATE 傳播屬性
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        // 條件不成立說明被喚醒的節(jié)點非常積極,直接將自己設置為了新的head,
        // 此時喚醒它的節(jié)點(前驅)執(zhí)行 h == head 不成立,所以不會跳出循環(huán),會繼續(xù)喚醒新的 head 節(jié)點的后繼節(jié)點
        if (h == head)
            break;
    }
}

以上就是Java CountDownLatch的源碼硬核解析的詳細內容,更多關于Java CountDownLatch的資料請關注腳本之家其它相關文章!

相關文章

最新評論