欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java隊(duì)列同步器之CountDownLatch實(shí)現(xiàn)詳解

 更新時(shí)間:2023年12月08日 09:22:52   作者:GeorgiaStar  
這篇文章主要介紹了Java隊(duì)列同步器之CountDownLatch實(shí)現(xiàn)詳解,CountDownLatch是一個(gè)同步工具類,它允許一個(gè)或多個(gè)線程一直等待,直到其他線程執(zhí)行完后再執(zhí)行,例如,應(yīng)用程序的主線程希望在負(fù)責(zé)啟動(dòng)框架服務(wù)的線程已經(jīng)啟動(dòng)所有框架服務(wù)之后執(zhí)行,需要的朋友可以參考下

CountDownLatch使用場(chǎng)景

CountDownLatch是一個(gè)同步工具類,它允許一個(gè)或多個(gè)線程一直等待,直到其他線程執(zhí)行完后再執(zhí)行。

例如,應(yīng)用程序的主線程希望在負(fù)責(zé)啟動(dòng)框架服務(wù)的線程已經(jīng)啟動(dòng)所有框架服務(wù)之后執(zhí)行。CountDownLatch在多線程并發(fā)編程中充當(dāng)一個(gè)計(jì)時(shí)器的功能。

典型的使用例子如下:

public class CountDownLatchTest {
    private static CountDownLatch latch = new CountDownLatch(2);
    public static void main(String[] args) throws Exception {
        long now = System.currentTimeMillis();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1,
            TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());
        executor.execute(new QuickTask(latch));
        executor.execute(new SlowTask(latch));
        latch.await();
        System.out.println("Both QuickTask and SlowTask are finished cost: " + (System.currentTimeMillis() - now));
        executor.shutdown();
    }
    static final class QuickTask implements Runnable {
        private CountDownLatch countDownLatch;
        public QuickTask(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
        @Override
        public void run() {
            try {
                System.out.println("QuickTaskThread: " + Thread.currentThread().getName());
                Thread.sleep(3000);
                System.out.println("QuickTaskThread finished");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
            }
        }
    }
    static final class SlowTask implements Runnable {
        private CountDownLatch countDownLatch;
        public SlowTask(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
        @Override
        public void run() {
            try {
                System.out.println("SlowTaskThread: " + Thread.currentThread().getName());
                Thread.sleep(5000);
                System.out.println("SlowTaskThread finished");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
            }
        }
    }
}

運(yùn)行結(jié)果:

CountDownLatch實(shí)現(xiàn)分析

CountDownLatch類的源碼很簡(jiǎn)單,如下:

public class CountDownLatch {
    private final Sync sync;
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    /**
     * 這里繼承隊(duì)列同步器,并重寫tryAcquireShared(int acquires)、tryReleaseShared(int releases)方法
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        /**
         * 設(shè)置AQS同步狀態(tài),同步狀態(tài)變量定義在抽象類AbstractQueuedSynchronizer中
         * private volatile int state;
         */
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }
        /**
         * 只有當(dāng)CountDownLatch里面的計(jì)數(shù)器為0時(shí),才會(huì)返回1
         * 在AbstractQueuedSynchronizer里面tryAcquireShared(int acquires)表示共享式獲取同步狀態(tài),
         * 只有返回值大于等于0的時(shí)候表示獲取成功,反之則表示獲取失敗
         */
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        /**
         * 在AbstractQueuedSynchronizer里面tryAcquireShared(int acquires)表示共享式釋放同步狀態(tài)
         * 每成功調(diào)用一次這個(gè)方法Sync的實(shí)例的status值就會(huì)減一,當(dāng)status的值減為0時(shí),則不會(huì)再減。
         */
        protected boolean tryReleaseShared(int releases) {
         // Decrement count; signal when transition to zero
         // compareAndSetState方法執(zhí)行不成功就一直循環(huán)執(zhí)行直到成功
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    /**
     * countDown:每調(diào)用一次countDown()方法都會(huì)使sync成員變量status減一(直到status為0),status為0,則會(huì)讓調(diào)用await()方法的地方不在阻塞,
     * 從而達(dá)到可以等待多個(gè)并發(fā)事件完成的目標(biāo) 
     * void
     */
    public void countDown() {
        sync.releaseShared(1);
    }
    public long getCount() {
        return sync.getCount();
    }
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

CountDownLatch內(nèi)部依賴Sync實(shí)現(xiàn),而Sync繼承AQS。

CountDownLatch主要分析以下三點(diǎn):

  1. 構(gòu)造方法,創(chuàng)建CountDownLatch對(duì)象時(shí)指定count值,即線程個(gè)數(shù)
  2. countDown()方法的實(shí)現(xiàn),每執(zhí)行一個(gè)線程方法就將計(jì)數(shù)器減一,當(dāng)計(jì)數(shù)為0時(shí),啟用當(dāng)前線程
  3. await()方法的實(shí)現(xiàn),當(dāng)前線程在計(jì)數(shù)器為0之前一直等待,除非線程被中斷

countDown()方法實(shí)現(xiàn)

countDown()方法源碼如下:

public void countDown() {
    //遞減鎖重入次數(shù),當(dāng)state=0時(shí)喚醒所有阻塞線程
    sync.releaseShared(1);
}

releaseShared()方法定義在抽象類AbstractQueuedSynchronizer中,如下這里使用了模板方法模式,該方法中的tryReleaseShard()共享式釋放鎖方法交給子類去實(shí)現(xiàn),doReleaseShared()方法在抽象類中實(shí)現(xiàn)。

AbstractQueuedSynchronizer中的doReleaseShared()方法如下:

private void doReleaseShared() {
	//喚醒所有阻塞隊(duì)列里面的線程
	for (;;) {
	    Node h = head;
	    if (h != null && h != tail) {
	        int ws = h.waitStatus;
	        //節(jié)點(diǎn)是否在等待喚醒狀態(tài)
	        if (ws == Node.SIGNAL) {
	        	//修改狀態(tài)為初始
	            if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
	                continue;
	            //成功則喚醒線程
	            unparkSuccessor(h);
	        }
	        else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
	            continue;                // loop on failed CAS
	    }
	    if (h == head)                   // loop if head changed
	        break;
	}
}

CountDownLatch內(nèi)部通過共享鎖實(shí)現(xiàn)。在創(chuàng)建CountDownLatch實(shí)例時(shí),需要傳遞一個(gè)int型的參數(shù):count,該參數(shù)為計(jì)數(shù)器的初始值,也可以理解為該共享鎖可以獲取的總次數(shù)。

await()方法實(shí)現(xiàn)

await()方法源碼如下:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly()方法定義在抽象類AbstractQueuedSynchronizer中,如下這里使用了模板方法模式,該方法中的tryAcquiredShard()共享式獲取鎖方法交給子類去實(shí)現(xiàn),doAcquiredSharedInterruptibly()方法在抽象類中實(shí)現(xiàn)。

protected int tryAcquireShared(int acquires) {
     return (getState() == 0) ? 1 : -1;
}

CountDownLatch中的內(nèi)部類Sync對(duì)AQS的tryAcquireShared方法進(jìn)行了復(fù)寫。

  • 當(dāng)前計(jì)數(shù)器的值為0的時(shí)候返回1,表示獲取鎖成功,此時(shí)acquireSharedInterruptibly()方法直接返回,線程可繼續(xù)操作。
  • 當(dāng)前計(jì)數(shù)器的值不為0的時(shí)候返回-1,表示獲取鎖失敗 ,進(jìn)入doAcquiredSharedInterruptibly()方法,進(jìn)入隊(duì)列中排隊(duì)等待。

AQS中doAcquiredSharedInterruptibly()方法實(shí)現(xiàn)如下:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //加入等待隊(duì)列                      
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    // 進(jìn)入 CAS 循環(huán)
    try {
        for (;;) {
            //當(dāng)一個(gè)節(jié)點(diǎn)進(jìn)入等待隊(duì)列后, 獲取此節(jié)點(diǎn)的prev節(jié)點(diǎn) 
            final Node p = node.predecessor();
            // 如果獲取到的prev是head,也就是隊(duì)列中第一個(gè)等待線程
            if (p == head) {
                //再次嘗試申請(qǐng),反應(yīng)到CountDownLatch就是查看是否還有線程需要等待(state是否為0)
                int r = tryAcquireShared(arg);
                // 如果 r >=0 說明 沒有線程需要等待了state==0
                if (r >= 0) {
                    //嘗試將第一個(gè)線程關(guān)聯(lián)的節(jié)點(diǎn)設(shè)置為head 
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //經(jīng)過自旋tryAcquireShared后,state還不為0,就會(huì)到這里,第一次的時(shí)候,waitStatus是0,那么node的waitStatus就會(huì)被置為SIGNAL,第二次再走到這里,就會(huì)用LockSupport的park方法把當(dāng)前線程阻塞住
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

這個(gè)方法進(jìn)來執(zhí)行的第一個(gè)動(dòng)作就是嘗試將當(dāng)前線程封裝成Node加入同步隊(duì)列 ,即調(diào)用addWaiter()方法。到這里就走到了AQS的核心部分,AQS用內(nèi)部的一個(gè)Node類維護(hù)一個(gè)Node FIFO隊(duì)列。

addWaiter()方法內(nèi)部用CAS實(shí)現(xiàn)隊(duì)列出入不會(huì)發(fā)生阻塞。

LockSupport是JDK中比較底層的類,用來創(chuàng)建鎖和其他同步工具類的基本線程阻塞原語(yǔ)。

setHeadAndPropagate()方法負(fù)責(zé)將自旋等待或被LockSupport阻塞的線程喚醒。

private void setHeadAndPropagate(Node node, int propagate) {
	//備份現(xiàn)在的head
    Node h = head;  
	//搶到鎖的線程被喚醒,將這個(gè)節(jié)點(diǎn)設(shè)置為head
    setHead(node)
	// propagate 一般都會(huì)大于0 或者存在可被喚醒的線程
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 只有一個(gè)節(jié)點(diǎn)或者是共享模式 釋放所有等待線程各自嘗試搶占鎖
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

線程封裝成Node對(duì)象時(shí),waitStatus是volatile變量,初始值是0,對(duì)其賦值可能有4個(gè)取值

//當(dāng)前節(jié)點(diǎn)線程由于超時(shí)或中斷被取消,這種狀態(tài)的節(jié)點(diǎn)將會(huì)被忽略,并移出隊(duì)列
static final int CANCELLED =  1;
//表示當(dāng)前線程已被掛起,其后繼節(jié)點(diǎn)可以嘗試搶占鎖
static final int SIGNAL    = -1;
//線程在Condition條件隊(duì)列中等待,當(dāng)從同步隊(duì)列中復(fù)制到條件隊(duì)列時(shí)變?yōu)?
static final int CONDITION = -2;
//共享模式下,釋放共享資源時(shí)通知其他節(jié)點(diǎn)
static final int PROPAGATE = -3;

AQS獨(dú)占與共享小結(jié)

AQS的功能可以分為兩類:獨(dú)占與共享;如ReentrantLock利用了其獨(dú)占功能,CountDownLatch利用了其共享功能。AQS的靜態(tài)內(nèi)部類Node里有兩個(gè)變量,獨(dú)占鎖與共享鎖在創(chuàng)建自己的節(jié)點(diǎn)時(shí)(addWaiter方法)用于表明身份,它們會(huì)被賦值給Node的nextWaiter變量。

/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
final boolean isShared() {
    return nextWaiter == SHARED;
}
Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
    this.waitStatus = waitStatus;
    this.thread = thread;
}

獨(dú)占鎖就是每次只允許一個(gè)線程執(zhí)行,當(dāng)前線程執(zhí)行完會(huì)release將同步狀態(tài)歸零,再喚醒后繼節(jié)點(diǎn)。通過自定義tryAcquire()方法來實(shí)現(xiàn)公平與非公平。

獨(dú)占式獲取及釋放資源acquire & release

//成功代表同步狀態(tài)的變更,排斥其他線程;否則加入等待隊(duì)列
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
//同步狀態(tài)歸0后,喚醒后繼節(jié)點(diǎn)
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

獨(dú)占式地釋放和獲取資源都是嚴(yán)格按照FIFO的,即通過鏈表的前后驅(qū)指針按順序來的。在獲取資源時(shí),節(jié)點(diǎn)每次都看自身前節(jié)點(diǎn)是否是頭節(jié)點(diǎn),若是就嘗試獲取資源;獲取沒成功不要緊,此時(shí)頭節(jié)點(diǎn)狀態(tài)是SIGNAL,此時(shí)該節(jié)點(diǎn)會(huì)使用LokSupport的park掛起自己,頭節(jié)點(diǎn)釋放資源后就會(huì)unpark該節(jié)點(diǎn)線程,下一輪循環(huán)中該節(jié)點(diǎn)就可以成功獲取資源啦! 如果前節(jié)點(diǎn)不是頭節(jié)點(diǎn),那就繼續(xù)自旋。

不同于ReentrantLock,CountDownLatch調(diào)用的是AQS里的acquireSharedInterruptibly()與releaseShared()方法,這兩個(gè)方法是共享式獲取與釋放資源的實(shí)現(xiàn)。CountDownLatch實(shí)現(xiàn)了自己的tryAcquireShared()與tryReleaseShared()方法。

共享式獲取及釋放資源acquireSharedInterruptibly & releaseShared

//tryAcquireShared()方法返回正數(shù),代表資源獲取成功,只要不為0嘗試獲取資源的線程就一直成功,返回0代表最后一個(gè)資源被獲取了,返回負(fù)數(shù)代表資源已經(jīng)沒有了,加入等待隊(duì)列
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
//tryReleaseShared()方法返回true,代表資源釋放成功,喚醒所有等待資源的阻塞線程
public final boolean releaseShared(int arg) {
	if (tryReleaseShared(arg)) {
	    doReleaseShared();
	    return true;
	}
	return false;
}

獨(dú)占鎖的tryAcquire()及tryRelease()返回boolean代表同步狀態(tài)更改的成功與否;tryReleaseShared()方法也返回boolean值代表資源釋放成功與否,但是AQS中定義的tryAcquireShared()方法返回的卻是int值,這正好體現(xiàn)了獨(dú)占與共享的區(qū)別。

來看tryAcquireShared()方法對(duì)返回值的注釋

* @return a negative value on failure; zero if acquisition in shared
 *         mode succeeded but no subsequent shared-mode acquire can
 *         succeed; and a positive value if acquisition in shared
 *         mode succeeded and subsequent shared-mode acquires might
 *         also succeed, in which case a subsequent waiting thread
 *         must check availability. (Support for three different
 *         return values enables this method to be used in contexts
 *         where acquires only sometimes act exclusively.)  Upon
 *         success, this object has been acquired.

翻譯一下,就是tryAcquireShared()返回大于0的正數(shù)代表當(dāng)前線程能夠正常獲取資源,其之后的線程也可能正常獲取資源,返回0代表當(dāng)前線程能夠正常獲取資源,但之后的線程將會(huì)進(jìn)入等待隊(duì)列中。獨(dú)占與共享最大不同就在各自的tryAcquire里,對(duì)于獨(dú)占來說只有true或false,只有一個(gè)線程得以執(zhí)行任務(wù);而對(duì)于共享鎖的tryAcquireShared()來說,線程數(shù)沒達(dá)到限制都可以直接執(zhí)行。 但本質(zhì)上都是對(duì)AQS同步狀態(tài)的修改,一個(gè)是0與1之間,另一個(gè)允許更多而已。

到此這篇關(guān)于Java隊(duì)列同步器之CountDownLatch實(shí)現(xiàn)詳解的文章就介紹到這了,更多相關(guān)Java的CountDownLatch實(shí)現(xiàn)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 詳解JAVA SPI機(jī)制和使用方法

    詳解JAVA SPI機(jī)制和使用方法

    這篇文章主要介紹了JAVA SPI機(jī)制的相關(guān)知識(shí)以及使用示例,文中代碼非常詳細(xì),幫助大家更好的學(xué)習(xí),感興趣的朋友可以了解下
    2020-06-06
  • 關(guān)于Spring?Boot內(nèi)存泄露排查的記錄

    關(guān)于Spring?Boot內(nèi)存泄露排查的記錄

    這篇文章主要介紹了關(guān)于Spring?Boot內(nèi)存泄露排查的記錄,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-06-06
  • Maven scala和java混合打包方式

    Maven scala和java混合打包方式

    這篇文章主要介紹了Maven scala和java混合打包方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-06-06
  • 使用開源項(xiàng)目JAVAE2 進(jìn)行視頻格式轉(zhuǎn)換

    使用開源項(xiàng)目JAVAE2 進(jìn)行視頻格式轉(zhuǎn)換

    這篇文章主要介紹了使用開源項(xiàng)目JAVAE 進(jìn)行視頻格式轉(zhuǎn)換,幫助大家更好的利用Java處理視頻,完成自身需求,感興趣的朋友可以了解下
    2020-11-11
  • SpringBoot使用slf4j日志并輸出到文件中的操作方法

    SpringBoot使用slf4j日志并輸出到文件中的操作方法

    這篇文章主要介紹了SpringBoot使用slf4j日志并輸出到文件中,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-08-08
  • postman測(cè)試post請(qǐng)求參數(shù)為json類型的實(shí)例講解

    postman測(cè)試post請(qǐng)求參數(shù)為json類型的實(shí)例講解

    下面小編就為大家分享一篇postman測(cè)試post請(qǐng)求參數(shù)為json類型的實(shí)例講解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2018-03-03
  • JDBC操作數(shù)據(jù)庫(kù)的增加、刪除、更新、查找實(shí)例分析

    JDBC操作數(shù)據(jù)庫(kù)的增加、刪除、更新、查找實(shí)例分析

    這篇文章主要介紹了JDBC操作數(shù)據(jù)庫(kù)的增加、刪除、更新、查找方法,以完整實(shí)例形式分析了Java基于JDBC連接數(shù)據(jù)庫(kù)及進(jìn)行數(shù)據(jù)的增刪改查等技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下
    2015-10-10
  • JUnit5常用注解的使用

    JUnit5常用注解的使用

    注解是JUnit的標(biāo)志性技術(shù),本文就來對(duì)它的20個(gè)注解,以及元注解和組合注解進(jìn)行學(xué)習(xí),感興趣的可以了解一下
    2021-07-07
  • 詳解java 中Spring jsonp 跨域請(qǐng)求的實(shí)例

    詳解java 中Spring jsonp 跨域請(qǐng)求的實(shí)例

    這篇文章主要介紹了詳解java 中Spring jsonp 跨域請(qǐng)求的實(shí)例的相關(guān)資料,jsonp 可用于解決主流瀏覽器的跨域數(shù)據(jù)訪問的問題,需要的朋友可以參考下
    2017-08-08
  • 全面了解servlet中cookie的使用方法

    全面了解servlet中cookie的使用方法

    下面小編就為大家?guī)硪黄媪私鈙ervlet中cookie的使用方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2016-06-06

最新評(píng)論