欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java多線程中的CountDownLatch詳細解讀

 更新時間:2023年11月20日 10:30:54   作者:?薄情痞子?  
這篇文章主要介紹了Java多線程中的CountDownLatch詳細解讀,一個同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個或多個線程一直等待,用給定的計數(shù) 初始化 CountDownLatch,需要的朋友可以參考下

簡介

一個同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個或多個線程一直等待。用給定的計數(shù) 初始化 CountDownLatch。

由于調(diào)用了 countDown() 方法會使初始化的計數(shù)減一,所以在當前計數(shù)到達零之前,await()方法會一直受阻塞。

之后,會釋放所有等待的線程,await 的所有后續(xù)調(diào)用都將立即返回。這種現(xiàn)象只出現(xiàn)一次——計數(shù)無法被重置。 一個線程(或者多個), 等待另外N個線程完成某個事情之后才能執(zhí)行

CountDownLatch和CyclicBarrier的區(qū)別

  1. CountDownLatch的作用是允許1或N個線程等待其他線程完成執(zhí)行;而CyclicBarrier則是允許N個線程相互等待。
  2.  CountDownLatch的計數(shù)器無法被重置;CyclicBarrier的計數(shù)器可以被重置后使用,因此它被稱為是循環(huán)的barrier。 CountDownLatch類的方法

 源碼:

package java.util.concurrent;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class CountDownLatch {
  
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
 
        Sync(int count) {
            setState(count);
        }
 
        int getCount() {
            return getState();
        }
 
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
 
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
 
    private final Sync sync;
 
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
 
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
 
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
 
    public void countDown() {
        sync.releaseShared(1);
    }
 
    public long getCount() {
        return sync.getCount();
    }
 
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

CountDownLatch的數(shù)據(jù)結(jié)構(gòu)很簡單,它是通過共享鎖實現(xiàn)的。它包含了sync對象,sync是Sync類型。Sync是實例類,它繼承于AQS。

核心方法

下面,我們分析CountDownLatch中3個核心方法: CountDownLatch(int count), await(), countDown()。

CountDownLatch(int count)

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

此方法創(chuàng)建了一個Sync對象,而Sync是繼承于AQS類。Sync構(gòu)造函數(shù)如下:

Sync(int count) {
  setState(count);
 }

setState()在AQS中實現(xiàn),源碼如下:

private volatile int state; //state在AQS中的聲明
 
protected final void setState(int newState) {
        state = newState;
    }

說明:在AQS中,state是一個private volatile long類型的對象。對于CountDownLatch而言,state表示的”鎖計數(shù)器“。CountDownLatch中的getCount()最終是調(diào)用AQS中的getState(),返回的state對象,即”鎖計數(shù)器“。

//CountDownLatch類的getCount方法
public long getCount() {
        return sync.getCount();
    }
//Sync類中的方法
int getCount() {
   return getState();//調(diào)用AQS中的getState()方法
 }
//AQS中的方法
protected final int getState() {
        return state;
    }

countDown()

//CountDownLatch類中的countDown()方法
public void countDown() {
        sync.releaseShared(1);//調(diào)用的是AQS中的releaseShared()方法
    }
 
//AQS類中的releaseShared()方法
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

說明:releaseShared()的目的是讓當前線程釋放它所持有的共享鎖。 它首先會通過tryReleaseShared()去嘗試釋放共享鎖。嘗試成功,則直接返回;嘗試失敗,則通過doReleaseShared()去釋放共享鎖。

tryReleaseShared()在CountDownLatch.java中被重寫,源碼如下:

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        // 獲取“鎖計數(shù)器”的狀態(tài)
        int c = getState();
        if (c == 0)
            return false;
        // “鎖計數(shù)器”-1
        int nextc = c-1;
        // 通過CAS函數(shù)進行賦值。
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

說明:tryReleaseShared()的作用是釋放共享鎖,將“鎖計數(shù)器”的值-1。

await()、await(long timeout, TimeUnit unit)

當計數(shù)值不為零,調(diào)用await(),會一直阻塞當前線程,直到計數(shù)值為0,才會喚醒執(zhí)行。

有參的awaitawait(long timeout, TimeUnit unit)方法,設(shè)置一個阻塞的超時 時間timeout。最多阻塞這個時間

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
 
public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

接下來說無參構(gòu)造方法await():

說明:該函數(shù)實際上是調(diào)用的AQS的acquireSharedInterruptibly(1);

AQS中的acquireSharedInterruptibly()的源碼如下:

public final void acquireSharedInterruptibly(long arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

說明:acquireSharedInterruptibly()的作用是獲取共享鎖。 如果當前線程是中斷狀態(tài),則拋出異常InterruptedException。否則,調(diào)用tryAcquireShared(arg)嘗試獲取共享鎖;嘗試成功則返回,否則就調(diào)用doAcquireSharedInterruptibly()。doAcquireSharedInterruptibly()會使當前線程一直等待,直到當前線程獲取到共享鎖(或被中斷)才返回。

tryAcquireShared()在CountDownLatch.java中被重寫,它的源碼如下:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

說明:tryAcquireShared()的作用是嘗試獲取共享鎖。 如果"鎖計數(shù)器=0",即鎖是可獲取狀態(tài),則返回1;否則,鎖是不可獲取狀態(tài),則返回-1。

AQS中的doAcquireSharedInterruptibly(long arg)方法

private void doAcquireSharedInterruptibly(long arg)
    throws InterruptedException {
    // 創(chuàng)建"當前線程"的Node節(jié)點,且Node中記錄的鎖是"共享鎖"類型;并將該節(jié)點添加到CLH隊列末尾。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 獲取上一個節(jié)點。
            // 如果上一節(jié)點是CLH隊列的表頭,則"嘗試獲取共享鎖"。
            final Node p = node.predecessor();
            if (p == head) {
                long r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // (上一節(jié)點不是CLH隊列的表頭) 當前線程一直等待,直到獲取到共享鎖。
            // 如果線程在等待過程中被中斷過,則再次中斷該線程(還原之前的中斷狀態(tài))。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

說明:

(1) addWaiter(Node.SHARED)的作用是,創(chuàng)建”當前線程“的Node節(jié)點,且Node中記錄的鎖的類型是”共享鎖“(Node.SHARED);并將該節(jié)點添加到CLH隊列末尾。

(2) node.predecessor()的作用是,獲取上一個節(jié)點。如果上一節(jié)點是CLH隊列的表頭,則”嘗試獲取共享鎖“。

(3) shouldParkAfterFailedAcquire()的作用和它的名稱一樣,如果在嘗試獲取鎖失敗之后,線程應(yīng)該等待,則返回true;否則,返回false。

(4) 當shouldParkAfterFailedAcquire()返回ture時,則調(diào)用parkAndCheckInterrupt(),當前線程會進入等待狀態(tài),直到獲取到共享鎖才繼續(xù)運行。

總結(jié):

CountDownLatch是通過“共享鎖”實現(xiàn)的。在創(chuàng)建CountDownLatch中時,會傳遞一個int類型參數(shù)count,該參數(shù)是“鎖計數(shù)器”的初始狀態(tài),表示該“共享鎖”最多能被count給線程同時獲取。當某線程調(diào)用該CountDownLatch對象的await()方法時,該線程會等待“共享鎖”可用時,才能獲取“共享鎖”進而繼續(xù)運行。而“共享鎖”可用的條件,就是“鎖計數(shù)器”的值為0!而“鎖計數(shù)器”的初始值為count,每當一個線程調(diào)用該CountDownLatch對象的countDown()方法時,才將“鎖計數(shù)器”-1;通過這種方式,必須有count個線程調(diào)用countDown()之后,“鎖計數(shù)器”才為0,而前面提到的等待線程才能繼續(xù)運行!

以上,就是CountDownLatch的實現(xiàn)原理。

CountDownLatch示例

/**
 * 主線程等待子線程執(zhí)行完成再執(zhí)行
 */
public class CountdownLatchTest1 {
    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(3);
        final CountDownLatch latch = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("子線程" + Thread.currentThread().getName() + "開始執(zhí)行");
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("子線程"+Thread.currentThread().getName()+"執(zhí)行完成");
                        latch.countDown();//當前線程調(diào)用此方法,則計數(shù)減一
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            service.execute(runnable);
        }
 
        try {
            System.out.println("主線程"+Thread.currentThread().getName()+"等待子線程執(zhí)行完成...");
            latch.await();//阻塞當前線程,直到計數(shù)器的值為0
            System.out.println("主線程"+Thread.currentThread().getName()+"開始執(zhí)行...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 執(zhí)行結(jié)果:

public class CountdownLatchTest2 {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final CountDownLatch cdOrder = new CountDownLatch(1);
        final CountDownLatch cdAnswer = new CountDownLatch(4);
        for (int i = 0; i < 4; i++) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("選手" + Thread.currentThread().getName() + "正在等待裁判發(fā)布口令");
                        cdOrder.await();
                        System.out.println("選手" + Thread.currentThread().getName() + "已接受裁判口令");
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("選手" + Thread.currentThread().getName() + "到達終點");
                        cdAnswer.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            service.execute(runnable);
        }
        try {
            Thread.sleep((long) (Math.random() * 10000));
            System.out.println("裁判"+Thread.currentThread().getName()+"即將發(fā)布口令");
            cdOrder.countDown();
            System.out.println("裁判"+Thread.currentThread().getName()+"已發(fā)送口令,正在等待所有選手到達終點");
            cdAnswer.await();
            System.out.println("所有選手都到達終點");
            System.out.println("裁判"+Thread.currentThread().getName()+"匯總成績排名");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        service.shutdown();
    }
}

執(zhí)行結(jié)果:

到此這篇關(guān)于Java多線程中的CountDownLatch詳細解讀的文章就介紹到這了,更多相關(guān)CountDownLatch詳細解讀內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論