深度解析Java中CountDownLatch的原理
在高并發(fā)編程中,AbstractQueuedSynchronizer(簡稱AQS)抽象的隊列同步器是我們必須掌握的,AQS底層提供了二種鎖模式
- 獨占鎖:ReentrantLock就是基于獨占鎖模式實現(xiàn)的
- 共享鎖:CountDownLatch,ReadWriteLock,Semplere都是基于共享鎖模式實現(xiàn)的
接下來我們通過CountDownLatch底層實現(xiàn)原理來了解AQS共享鎖模式的實現(xiàn)原理
CountDownLatch用法
CountDownLatch一般是在需要等待多個線程全部執(zhí)行完畢之后才繼續(xù)執(zhí)行剩下的業(yè)務邏輯,舉個例子,比如你現(xiàn)在去餐廳吃飯點了份辣子雞。
這時候餐廳有處理雞塊的,有配置調(diào)料的,還有燒菜的等多個廚師一起協(xié)作最后才能完成一道辣子雞,而且這幾個步驟可以是一起執(zhí)行的。一個廚師在配置調(diào)料的同時,另外一個廚師正在處理雞塊,還有一個廚師正在熱油等。
但是作為顧客的我們來說,我們必須等到這幾個廚師全部執(zhí)行完畢之后我們才能吃到辣子雞
public static void main(String[] args) throws Exception{ CountDownLatch countDownLatch = new CountDownLatch(3); new Thread(() -> { System.out.println("處理雞塊"); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown(); }).start(); new Thread(() -> { System.out.println("配置調(diào)料"); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown(); }).start(); new Thread(() -> { System.out.println("起鍋熱油"); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown(); }).start(); //會阻塞,等待所有的線程執(zhí)行結(jié)束之后才會繼續(xù)執(zhí)行剩下的邏輯 countDownLatch.await(); //執(zhí)行剩下業(yè)務邏輯 }
首先我們看 countDownLatch.await(); 這段阻塞的代碼,看下底層是如何讓線程進入阻塞等待的
進入之后到CountDownLatch類中,然后繼續(xù)這個方法
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
此時就會進去AQS的內(nèi)部實現(xiàn)中
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
首先我們看下 tryAcquireShared(arg) < 0 這個判斷是干嘛的,他是進入到CountDownLatch的類中,這里判斷 state的值是否等于0,在初始化 CountDownLatch 的時候,我們將state的值初始化成了3,只有當執(zhí)行一次 countDownLatch.countDown(); 的時候,這個值才會減1,但是此時我們的線程還沒有執(zhí)行結(jié)束,所以這個值不會等于0,那么這時候就會返回 -1
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
返回-1以后,就會執(zhí)行 doAcquireSharedInterruptibly(arg); 這個業(yè)務邏輯了
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //創(chuàng)建一個新的共享的Node節(jié)點 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { //嘗試判斷state是否已經(jīng)等于0了,如果是,那么主線程就不用阻塞了, //可以繼續(xù)執(zhí)行了,以此來提高程序性能 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //這里才是真正讓主線程阻塞的核心方法 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
我們看一下這里的 addWaiter(Node.SHARED)方法
private Node addWaiter(Node mode) { //把當前線程,也就是main線程封裝成一個Node,并設(shè)置成共享模式 Node node = new Node(Thread.currentThread(), mode); //在第一次的時候,這個tail節(jié)點是為null的 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //初始化鏈表 enq(node); return node; }
分析下初始化雙向鏈表邏輯
private Node enq(final Node node) { for (;;) { //注意:這里是死循環(huán) Node t = tail; //第一次進來,因為tail=null,所以會進入到if里面去 if (t == null) { // Must initialize //這里新創(chuàng)建一個空的Node節(jié)點 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
第一次進來:因為第一次進來的時候tail=null,所以會進入到if中去,然后創(chuàng)建一個新的空的節(jié)點,然后將頭節(jié)點和尾節(jié)點都指向這個節(jié)點
然后進入第二次循環(huán):這時候tail已經(jīng)不為空了,所以會進入到else分支里面去,所以的操作就是將當前線程封裝成的Node設(shè)置尾巴節(jié)點,然后設(shè)置前置節(jié)點和后置節(jié)點的關(guān)系
現(xiàn)在回頭addWaiter()方法已經(jīng)清楚了,繼續(xù)分析剩下的邏輯
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //創(chuàng)建一個新的共享的Node節(jié)點 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //這里還是個死循環(huán) //拿到頭節(jié)點 final Node p = node.predecessor(); if (p == head) { //繼續(xù)判斷state的值是否等于0,如果已經(jīng)等于0了,那么主線程就不需要阻塞等待了,可以繼續(xù)執(zhí)行了 int r = tryAcquireShared(arg); //如果state的值等于0,這里r=1,不等于0,r=-1 //我們假設(shè)現(xiàn)在就是不等于0,也就是其它線程還沒有執(zhí)行結(jié)束,所以不會進入到if if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
進入 shouldParkAfterFailedAcquire()方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //這里獲取Node的waitStatus,在Node初始化之后,默認是是0, //所以會進入到 else 分支里面去,將Node的waitStatus的值修改成Node.SIGNAL //但是在上一步中是一個死循環(huán),所以會再次進入到這個方法中,這時候waitStatus的值是Node.SIGNAL //所以會進入到第一個if分支里面去,最后返回true int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
這時候shouldParkAfterFailedAcquire()方法返回了true,就會執(zhí)行 parkAndCheckInterrupt()方法了
private final boolean parkAndCheckInterrupt() { //真正讓線程阻塞的核心方法 LockSupport.park(this); return Thread.interrupted(); }
當主線程掛起之后,只有全部線程執(zhí)行結(jié)束了,才會繼續(xù)執(zhí)行,所以我們來分析下 countDownLatch.countDown();
public void countDown() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
進入tryReleaseShared(arg)方法,是判斷state是否等于0的,
protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); //第一次進來,因為state=3,所以不會進入if,只有在初始化的時候?qū)tate設(shè)置成0, //或者你有10個資源,但是有11個線程來獲取資源,最后一個線程進來的時候也會等于0 if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
一直到第三次進來之后,nextc就會等于0,因為一共減了三次1,也就是最后一個線程執(zhí)行到這里來了,最后返回true,返回true以后就會執(zhí)行doReleaseShared();方法了
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 核心方法,喚醒阻塞線程,這里傳入的是頭節(jié)點 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //拿到真正封裝了當前線程的Node Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 執(zhí)行喚醒操作 LockSupport.unpark(s.thread); }
以上就是深度解析Java中CountDownLatch的原理的詳細內(nèi)容,更多關(guān)于Java CountDownLatch的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java并發(fā)(Runnable+Thread)實現(xiàn)硬盤文件搜索功能
這篇文章主要介紹了Java并發(fā)(Runnable+Thread)實現(xiàn)硬盤文件搜索,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01java中利用List的subList方法實現(xiàn)對List分頁(簡單易學)
本篇文章主要介紹了java中l(wèi)ist數(shù)據(jù)拆分為sublist實現(xiàn)頁面分頁的簡單代碼,具有一定的參考價值,有需要的可以了解一下。2016-11-11Java 梳理總結(jié)關(guān)于static關(guān)鍵字常見問題
static關(guān)鍵字基本概念我們可以一句話來概括:方便在沒有創(chuàng)建對象的情況下來進行調(diào)用。也就是說:被static關(guān)鍵字修飾的不需要創(chuàng)建對象去調(diào)用,直接根據(jù)類名就可以去訪問,讓我們來了解一下你可能還不知道情況2022-04-04java中map和對象互轉(zhuǎn)工具類的實現(xiàn)示例
這篇文章主要介紹了java中map和對象互轉(zhuǎn)工具類的實現(xiàn)示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-08-08