Java并發(fā)之AQS與自旋鎖詳解
一、概述
談到并發(fā),不得不談ReentrantLock;而談到ReentrantLock,不得不談AbstractQueuedSynchronizer(AQS)!
類如其名,抽象的隊(duì)列式的同步器,AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實(shí)現(xiàn)都依賴于它,如常用的ReentrantLock/Semaphore/CountDownLatch...。
二、框架
它維護(hù)了一個(gè)volatile int state(代表共享資源)和一個(gè)FIFO線程等待隊(duì)列(多線程爭(zhēng)用資源被阻塞時(shí)會(huì)進(jìn)入此隊(duì)列)。這里volatile是核心關(guān)鍵詞,具體volatile的語(yǔ)義,在此不述。
state的訪問方式有三種:
- getState()
- setState()
- compareAndSetState()
AQS定義兩種資源共享方式:Exclusive(獨(dú)占,只有一個(gè)線程能執(zhí)行,如ReentrantLock)和Share(共享,多個(gè)線程可同時(shí)執(zhí)行,如Semaphore/CountDownLatch)。
不同的自定義同步器爭(zhēng)用共享資源的方式也不同。
自定義同步器在實(shí)現(xiàn)時(shí)只需要實(shí)現(xiàn)共享資源state的獲取與釋放方式即可,至于具體線程等待隊(duì)列的維護(hù)(如獲取資源失敗入隊(duì)/喚醒出隊(duì)等),AQS已經(jīng)在頂層實(shí)現(xiàn)好了。
自定義同步器實(shí)現(xiàn)時(shí)主要實(shí)現(xiàn)以下幾種方法:
- isHeldExclusively():該線程是否正在獨(dú)占資源。只有用到condition才需要去實(shí)現(xiàn)它。
- tryAcquire(int):獨(dú)占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
- tryRelease(int):獨(dú)占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
- tryAcquireShared(int):共享方式。嘗試獲取資源。負(fù)數(shù)表示失敗;0表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。
- tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false。
以ReentrantLock為例,state初始化為0,表示未鎖定狀態(tài)。A線程lock()時(shí),會(huì)調(diào)用tryAcquire()獨(dú)占該鎖并將state+1。此后,其他線程再tryAcquire()時(shí)就會(huì)失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其它線程才有機(jī)會(huì)獲取該鎖。當(dāng)然,釋放鎖之前,A線程自己是可以重復(fù)獲取此鎖的(state會(huì)累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多么次,這樣才能保證state是能回到零態(tài)的。
再以CountDownLatch以例,任務(wù)分為N個(gè)子線程去執(zhí)行,state也初始化為N(注意N要與線程個(gè)數(shù)一致)。這N個(gè)子線程是并行執(zhí)行的,每個(gè)子線程執(zhí)行完后countDown()一次,state會(huì)CAS減1。等到所有子線程都執(zhí)行完后(即state=0),會(huì)unpark()主調(diào)用線程,然后主調(diào)用線程就會(huì)從await()函數(shù)返回,繼續(xù)后余動(dòng)作。
一般來說,自定義同步器要么是獨(dú)占方法,要么是共享方式,他們也只需實(shí)現(xiàn)tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支持自定義同步器同時(shí)實(shí)現(xiàn)獨(dú)占和共享兩種方式,如ReentrantReadWriteLock。
三、源碼詳解
本節(jié)開始講解AQS的源碼實(shí)現(xiàn)。依照acquire-release、acquireShared-releaseShared的次序來。
3.0 結(jié)點(diǎn)狀態(tài)waitStatus
這里我們說下Node。Node結(jié)點(diǎn)是對(duì)每一個(gè)等待獲取資源的線程的封裝,其包含了需要同步的線程本身及其等待狀態(tài),如是否被阻塞、是否等待喚醒、是否已經(jīng)被取消等。變量waitStatus則表示當(dāng)前Node結(jié)點(diǎn)的等待狀態(tài),共有5種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。
- CANCELLED(1):表示當(dāng)前結(jié)點(diǎn)已取消調(diào)度。當(dāng)timeout或被中斷(響應(yīng)中斷的情況下),會(huì)觸發(fā)變更為此狀態(tài),進(jìn)入該狀態(tài)后的結(jié)點(diǎn)將不會(huì)再變化。
- SIGNAL(-1):表示后繼結(jié)點(diǎn)在等待當(dāng)前結(jié)點(diǎn)喚醒。后繼結(jié)點(diǎn)入隊(duì)時(shí),會(huì)將前繼結(jié)點(diǎn)的狀態(tài)更新為SIGNAL。
- CONDITION(-2):表示結(jié)點(diǎn)等待在Condition上,當(dāng)其他線程調(diào)用了Condition的signal()方法后,CONDITION狀態(tài)的結(jié)點(diǎn)將從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列中,等待獲取同步鎖。
- PROPAGATE(-3):共享模式下,前繼結(jié)點(diǎn)不僅會(huì)喚醒其后繼結(jié)點(diǎn),同時(shí)也可能會(huì)喚醒后繼的后繼結(jié)點(diǎn)。
- 0:新結(jié)點(diǎn)入隊(duì)時(shí)的默認(rèn)狀態(tài)。
注意,負(fù)值表示結(jié)點(diǎn)處于有效等待狀態(tài),而正值表示結(jié)點(diǎn)已被取消。所以源碼中很多地方用>0、<0來判斷結(jié)點(diǎn)的狀態(tài)是否正常。
3.1 acquire(int)
此方法是獨(dú)占模式下線程獲取共享資源的頂層入口。如果獲取到資源,線程直接返回,否則進(jìn)入等待隊(duì)列,直到獲取到資源為止,且整個(gè)過程忽略中斷的影響。這也正是lock()的語(yǔ)義,當(dāng)然不僅僅只限于lock()。獲取到資源后,線程就可以去執(zhí)行其臨界區(qū)代碼了。下面是acquire()的源碼:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
函數(shù)流程如下:
- tryAcquire()嘗試直接去獲取資源,如果成功則直接返回(這里體現(xiàn)了非公平鎖,每個(gè)線程獲取鎖時(shí)會(huì)嘗試直接搶占加塞一次,而CLH隊(duì)列中可能還有別的線程在等待);
- addWaiter()將該線程加入等待隊(duì)列的尾部,并標(biāo)記為獨(dú)占模式;
- acquireQueued()使線程阻塞在等待隊(duì)列中獲取資源,一直獲取到資源后才返回。如果在整個(gè)等待過程中被中斷過,則返回true,否則返回false。
- 如果線程在等待過程中被中斷過,它是不響應(yīng)的。只是獲取資源后才再進(jìn)行自我中斷selfInterrupt(),將中斷補(bǔ)上。
這時(shí)單憑這4個(gè)抽象的函數(shù)來看流程還有點(diǎn)朦朧,不要緊,看完接下來的分析后,你就會(huì)明白了。就像《大話西游》里唐僧說的:等你明白了舍生取義的道理,你自然會(huì)回來和我唱這首歌的。
3.1.1 tryAcquire(int)
此方法嘗試去獲取獨(dú)占資源。如果獲取成功,則直接返回true,否則直接返回false。這也正是tryLock()的語(yǔ)義,還是那句話,當(dāng)然不僅僅只限于tryLock()。如下是tryAcquire()的源碼:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
什么?直接throw異常?說好的功能呢?好吧,還記得概述里講的AQS只是一個(gè)框架,具體資源的獲取/釋放方式交由自定義同步器去實(shí)現(xiàn)嗎?就是這里了?。?!AQS這里只定義了一個(gè)接口,具體資源的獲取交由自定義同步器去實(shí)現(xiàn)了(通過state的get/set/CAS)!?。≈劣谀懿荒苤厝?,能不能加塞,那就看具體的自定義同步器怎么去設(shè)計(jì)了?。。‘?dāng)然,自定義同步器在進(jìn)行資源訪問時(shí)要考慮線程安全的影響。
這里之所以沒有定義成abstract,是因?yàn)楠?dú)占模式下只用實(shí)現(xiàn)tryAcquire-tryRelease,而共享模式下只用實(shí)現(xiàn)tryAcquireShared-tryReleaseShared。如果都定義成abstract,那么每個(gè)模式也要去實(shí)現(xiàn)另一模式下的接口。說到底,Doug Lea還是站在咱們開發(fā)者的角度,盡量減少不必要的工作量。
3.1.2 addWaiter(Node)
此方法用于將當(dāng)前線程加入到等待隊(duì)列的隊(duì)尾,并返回當(dāng)前線程所在的結(jié)點(diǎn)。還是上源碼吧:
private Node addWaiter(Node mode) { //以給定模式構(gòu)造結(jié)點(diǎn)。mode有兩種:EXCLUSIVE(獨(dú)占)和SHARED(共享) Node node = new Node(Thread.currentThread(), mode); //嘗試快速方式直接放到隊(duì)尾。 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //上一步失敗則通過enq入隊(duì)。 enq(node); return node; }
不用再說了,直接看注釋吧。
enq(Node)
此方法用于將node加入隊(duì)尾。源碼如下:
private Node enq(final Node node) { //CAS"自旋",直到成功加入隊(duì)尾 for (;;) { Node t = tail; if (t == null) { // 隊(duì)列為空,創(chuàng)建一個(gè)空的標(biāo)志結(jié)點(diǎn)作為head結(jié)點(diǎn),并將tail也指向它。 if (compareAndSetHead(new Node())) tail = head; } else {//正常流程,放入隊(duì)尾 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
如果你看過AtomicInteger.getAndIncrement()函數(shù)源碼,那么相信你一眼便看出這段代碼的精華。CAS自旋volatile變量,是一種很經(jīng)典的用法。還不太了解的,自己去百度一下吧。
3.1.3 acquireQueued(Node, int)
OK,通過tryAcquire()和addWaiter(),該線程獲取資源失敗,已經(jīng)被放入等待隊(duì)列尾部了。
聰明的你立刻應(yīng)該能想到該線程下一部該干什么了吧:進(jìn)入等待狀態(tài)休息,直到其他線程徹底釋放資源后喚醒自己,自己再拿到資源,然后就可以去干自己想干的事了。
沒錯(cuò),就是這樣!是不是跟醫(yī)院排隊(duì)拿號(hào)有點(diǎn)相似~~acquireQueued()就是干這件事:在等待隊(duì)列中排隊(duì)拿號(hào)(中間沒其它事干可以休息),直到拿到號(hào)后再返回。
這個(gè)函數(shù)非常關(guān)鍵,還是上源碼吧:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true;//標(biāo)記是否成功拿到資源 try { boolean interrupted = false;//標(biāo)記等待過程中是否被中斷過 //又是一個(gè)“自旋”! for (;;) { final Node p = node.predecessor();//拿到前驅(qū) //如果前驅(qū)是head,即該結(jié)點(diǎn)已成老二,那么便有資格去嘗試獲取資源(可能是老大釋放完資源喚醒自己的,當(dāng)然也可能被interrupt了)。 if (p == head && tryAcquire(arg)) { setHead(node);//拿到資源后,將head指向該結(jié)點(diǎn)。所以head所指的標(biāo)桿結(jié)點(diǎn),就是當(dāng)前獲取到資源的那個(gè)結(jié)點(diǎn)或null。 p.next = null; // setHead中node.prev已置為null,此處再將head.next置為null,就是為了方便GC回收以前的head結(jié)點(diǎn)。也就意味著之前拿完資源的結(jié)點(diǎn)出隊(duì)了! failed = false; // 成功獲取資源 return interrupted;//返回等待過程中是否被中斷過 } //如果自己可以休息了,就通過park()進(jìn)入waiting狀態(tài),直到被unpark()。如果不可中斷的情況下被中斷了,那么會(huì)從park()中醒過來,發(fā)現(xiàn)拿不到資源,從而繼續(xù)進(jìn)入park()等待。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;//如果等待過程中被中斷過,哪怕只有那么一次,就將interrupted標(biāo)記為true } } finally { if (failed) // 如果等待過程中沒有成功獲取資源(如timeout,或者可中斷的情況下被中斷了),那么取消結(jié)點(diǎn)在隊(duì)列中的等待。 cancelAcquire(node); } }
到這里了,我們先不急著總結(jié)acquireQueued()的函數(shù)流程,先看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具體干些什么。
shouldParkAfterFailedAcquire(Node, Node)
此方法主要用于檢查狀態(tài),看看自己是否真的可以去休息了,萬一隊(duì)列前邊的線程都放棄了只是瞎站著,那也說不定,對(duì)吧!
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;//拿到前驅(qū)的狀態(tài) if (ws == Node.SIGNAL) //如果已經(jīng)告訴前驅(qū)拿完號(hào)后通知自己一下,那就可以安心休息了 return true; if (ws > 0) { /* * 如果前驅(qū)放棄了,那就一直往前找,直到找到最近一個(gè)正常等待的狀態(tài),并排在它的后邊。 * 注意:那些放棄的結(jié)點(diǎn),由于被自己“加塞”到它們前邊,它們相當(dāng)于形成一個(gè)無引用鏈,稍后就會(huì)被保安大叔趕走了(GC回收)! */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //如果前驅(qū)正常,那就把前驅(qū)的狀態(tài)設(shè)置成SIGNAL,告訴它拿完號(hào)后通知自己一下。有可能失敗,人家說不定剛剛釋放完呢! compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
整個(gè)流程中,如果前驅(qū)結(jié)點(diǎn)的狀態(tài)不是SIGNAL,那么自己就不能安心去休息,需要去找個(gè)安心的休息點(diǎn),同時(shí)可以再嘗試下看有沒有機(jī)會(huì)輪到自己拿號(hào)。
parkAndCheckInterrupt()
如果線程找好安全休息點(diǎn)后,那就可以安心去休息了。此方法就是讓線程去休息,真正進(jìn)入等待狀態(tài)。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//調(diào)用park()使線程進(jìn)入waiting狀態(tài) return Thread.interrupted();//如果被喚醒,查看自己是不是被中斷的。 }
park()會(huì)讓當(dāng)前線程進(jìn)入waiting狀態(tài)。在此狀態(tài)下,有兩種途徑可以喚醒該線程:
1)被unpark();
2)被interrupt()。
需要注意的是,Thread.interrupted()會(huì)清除當(dāng)前線程的中斷標(biāo)記位。
OK,看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),現(xiàn)在讓我們?cè)倩氐絘cquireQueued(),總結(jié)下該函數(shù)的具體流程:
- 結(jié)點(diǎn)進(jìn)入隊(duì)尾后,檢查狀態(tài),找到安全休息點(diǎn);
- 調(diào)用park()進(jìn)入waiting狀態(tài),等待unpark()或interrupt()喚醒自己;
- 被喚醒后,看自己是不是有資格能拿到號(hào)。如果拿到,head指向當(dāng)前結(jié)點(diǎn),并返回從入隊(duì)到拿到號(hào)的整個(gè)過程中是否被中斷過;如果沒拿到,繼續(xù)流程1。
3.1.4 小結(jié)
OKOK,acquireQueued()分析完之后,我們接下來再回到acquire()!再貼上它的源碼吧:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
再來總結(jié)下它的流程吧:
- 調(diào)用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
- 沒成功,則addWaiter()將該線程加入等待隊(duì)列的尾部,并標(biāo)記為獨(dú)占模式;
- acquireQueued()使線程在等待隊(duì)列中休息,有機(jī)會(huì)時(shí)(輪到自己,會(huì)被unpark())會(huì)去嘗試獲取資源。獲取到資源后才返回。如果在整個(gè)等待過程中被中斷過,則返回true,否則返回false。
- 如果線程在等待過程中被中斷過,它是不響應(yīng)的。只是獲取資源后才再進(jìn)行自我中斷selfInterrupt(),將中斷補(bǔ)上。
由于此函數(shù)是重中之重,我再用流程圖總結(jié)一下:
至此,acquire()的流程終于算是告一段落了。這也就是ReentrantLock.lock()的流程,不信你去看其lock()源碼吧,整個(gè)函數(shù)就是一條acquire(1)?。?!
3.2 release(int)
上一小節(jié)已經(jīng)把a(bǔ)cquire()說完了,這一小節(jié)就來講講它的反操作release()吧。此方法是獨(dú)占模式下線程釋放共享資源的頂層入口。它會(huì)釋放指定量的資源,如果徹底釋放了(即state=0),它會(huì)喚醒等待隊(duì)列里的其他線程來獲取資源。這也正是unlock()的語(yǔ)義,當(dāng)然不僅僅只限于unlock()。下面是release()的源碼:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head;//找到頭結(jié)點(diǎn) if (h != null && h.waitStatus != 0) unparkSuccessor(h);//喚醒等待隊(duì)列里的下一個(gè)線程 return true; } return false; }
邏輯并不復(fù)雜。它調(diào)用tryRelease()來釋放資源。有一點(diǎn)需要注意的是,它是根據(jù)tryRelease()的返回值來判斷該線程是否已經(jīng)完成釋放掉資源了!所以自定義同步器在設(shè)計(jì)tryRelease()的時(shí)候要明確這一點(diǎn)?。?/strong>
3.2.1 tryRelease(int)
此方法嘗試去釋放指定量的資源。下面是tryRelease()的源碼:
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
跟tryAcquire()一樣,這個(gè)方法是需要獨(dú)占模式的自定義同步器去實(shí)現(xiàn)的。正常來說,tryRelease()都會(huì)成功的,因?yàn)檫@是獨(dú)占模式,該線程來釋放資源,那么它肯定已經(jīng)拿到獨(dú)占資源了,直接減掉相應(yīng)量的資源即可(state-=arg),也不需要考慮線程安全的問題。但要注意它的返回值,上面已經(jīng)提到了,release()是根據(jù)tryRelease()的返回值來判斷該線程是否已經(jīng)完成釋放掉資源了!所以自義定同步器在實(shí)現(xiàn)時(shí),如果已經(jīng)徹底釋放資源(state=0),要返回true,否則返回false。
3.2.2 unparkSuccessor(Node)
此方法用于喚醒等待隊(duì)列中下一個(gè)線程。下面是源碼:
private void unparkSuccessor(Node node) { //這里,node一般為當(dāng)前線程所在的結(jié)點(diǎn)。 int ws = node.waitStatus; if (ws < 0)//置零當(dāng)前線程所在的結(jié)點(diǎn)狀態(tài),允許失敗。 compareAndSetWaitStatus(node, ws, 0); Node s = node.next;//找到下一個(gè)需要喚醒的結(jié)點(diǎn)s if (s == null || s.waitStatus > 0) {//如果為空或已取消 s = null; for (Node t = tail; t != null && t != node; t = t.prev) // 從后向前找。 if (t.waitStatus <= 0)//從這里可以看出,<=0的結(jié)點(diǎn),都是還有效的結(jié)點(diǎn)。 s = t; } if (s != null) LockSupport.unpark(s.thread);//喚醒 }
這個(gè)函數(shù)并不復(fù)雜。一句話概括:用unpark()喚醒等待隊(duì)列中最前邊的那個(gè)未放棄線程,這里我們也用s來表示吧。此時(shí),再和acquireQueued()聯(lián)系起來,s被喚醒后,進(jìn)入if (p == head && tryAcquire(arg))的判斷(即使p!=head也沒關(guān)系,它會(huì)再進(jìn)入shouldParkAfterFailedAcquire()尋找一個(gè)安全點(diǎn)。這里既然s已經(jīng)是等待隊(duì)列中最前邊的那個(gè)未放棄線程了,那么通過shouldParkAfterFailedAcquire()的調(diào)整,s也必然會(huì)跑到head的next結(jié)點(diǎn),下一次自旋p==head就成立啦),然后s把自己設(shè)置成head標(biāo)桿結(jié)點(diǎn),表示自己已經(jīng)獲取到資源了,acquire()也返回了?。nd then, DO what you WANT!
3.2.3 小結(jié)
release()是獨(dú)占模式下線程釋放共享資源的頂層入口。它會(huì)釋放指定量的資源,如果徹底釋放了(即state=0),它會(huì)喚醒等待隊(duì)列里的其他線程來獲取資源。
如果獲取鎖的線程在release時(shí)異常了,沒有unpark隊(duì)列中的其他結(jié)點(diǎn),這時(shí)隊(duì)列中的其他結(jié)點(diǎn)會(huì)怎么辦?是不是沒法再被喚醒了?
答案是YES!?。∵@時(shí),隊(duì)列中等待鎖的線程將永遠(yuǎn)處于park狀態(tài),無法再被喚醒!?。〉俏覀?cè)倩仡^想想,獲取鎖的線程在什么情形下會(huì)release拋出異常呢??
- 線程突然死掉了?可以通過thread.stop來停止線程的執(zhí)行,但該函數(shù)的執(zhí)行條件要嚴(yán)苛的多,而且函數(shù)注明是非線程安全的,已經(jīng)標(biāo)明Deprecated;
- 線程被interupt了?線程在運(yùn)行態(tài)是不響應(yīng)中斷的,所以也不會(huì)拋出異常;
- release代碼有bug,拋出異常了?目前來看,Doug Lea的release方法還是比較健壯的,沒有看出能引發(fā)異常的情形(如果有,恐怕早被用戶吐槽了)。除非自己寫的tryRelease()有bug,那就沒啥說的,自己寫的bug只能自己含著淚去承受了。
3.3 acquireShared(int)
此方法是共享模式下線程獲取共享資源的頂層入口。它會(huì)獲取指定量的資源,獲取成功則直接返回,獲取失敗則進(jìn)入等待隊(duì)列,直到獲取到資源為止,整個(gè)過程忽略中斷。下面是acquireShared()的源碼:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
這里tryAcquireShared()依然需要自定義同步器去實(shí)現(xiàn)。但是AQS已經(jīng)把其返回值的語(yǔ)義定義好了:負(fù)值代表獲取失敗;0代表獲取成功,但沒有剩余資源;正數(shù)表示獲取成功,還有剩余資源,其他線程還可以去獲取。所以這里acquireShared()的流程就是:
tryAcquireShared()嘗試獲取資源,成功則直接返回;失敗則通過doAcquireShared()進(jìn)入等待隊(duì)列,直到獲取到資源為止才返回。
3.3.1 doAcquireShared(int)
此方法用于將當(dāng)前線程加入等待隊(duì)列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應(yīng)量的資源后才返回。下面是doAcquireShared()的源碼:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED);//加入隊(duì)列尾部 boolean failed = true;//是否成功標(biāo)志 try { boolean interrupted = false;//等待過程中是否被中斷過的標(biāo)志 for (;;) { final Node p = node.predecessor();//前驅(qū) if (p == head) {//如果到head的下一個(gè),因?yàn)閔ead是拿到資源的線程,此時(shí)node被喚醒,很可能是head用完資源來喚醒自己的 int r = tryAcquireShared(arg);//嘗試獲取資源 if (r >= 0) {//成功 setHeadAndPropagate(node, r);//將head指向自己,還有剩余資源可以再喚醒之后的線程 p.next = null; // help GC if (interrupted)//如果等待過程中被打斷過,此時(shí)將中斷補(bǔ)上。 selfInterrupt(); failed = false; return; } } //判斷狀態(tài),尋找安全點(diǎn),進(jìn)入waiting狀態(tài),等著被unpark()或interrupt() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
有木有覺得跟acquireQueued()很相似?對(duì),其實(shí)流程并沒有太大區(qū)別。只不過這里將補(bǔ)中斷的selfInterrupt()放到doAcquireShared()里了,而獨(dú)占模式是放到acquireQueued()之外,其實(shí)都一樣,不知道Doug Lea是怎么想的。
跟獨(dú)占模式比,還有一點(diǎn)需要注意的是,這里只有線程是head.next時(shí)(“老二”),才會(huì)去嘗試獲取資源,有剩余的話還會(huì)喚醒之后的隊(duì)友。那么問題就來了,假如老大用完后釋放了5個(gè)資源,而老二需要6個(gè),老三需要1個(gè),老四需要2個(gè)。老大先喚醒老二,老二一看資源不夠,他是把資源讓給老三呢,還是不讓?答案是否定的!老二會(huì)繼續(xù)park()等待其他線程釋放資源,也更不會(huì)去喚醒老三和老四了。獨(dú)占模式,同一時(shí)刻只有一個(gè)線程去執(zhí)行,這樣做未嘗不可;但共享模式下,多個(gè)線程是可以同時(shí)執(zhí)行的,現(xiàn)在因?yàn)槔隙馁Y源需求量大,而把后面量小的老三和老四也都卡住了。當(dāng)然,這并不是問題,只是AQS保證嚴(yán)格按照入隊(duì)順序喚醒罷了(保證公平,但降低了并發(fā))。
setHeadAndPropagate(Node, int)
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node);//head指向自己 //如果還有剩余量,繼續(xù)喚醒下一個(gè)鄰居線程 if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
此方法在setHead()的基礎(chǔ)上多了一步,就是自己蘇醒的同時(shí),如果條件符合(比如還有剩余資源),還會(huì)去喚醒后繼結(jié)點(diǎn),畢竟是共享模式!
doReleaseShared()我們留著下一小節(jié)的releaseShared()里來講。
3.3.2 小結(jié)
OK,至此,acquireShared()也要告一段落了。讓我們?cè)偈崂硪幌滤牧鞒蹋?/p>
- tryAcquireShared()嘗試獲取資源,成功則直接返回;
- 失敗則通過doAcquireShared()進(jìn)入等待隊(duì)列park(),直到被unpark()/interrupt()并成功獲取到資源才返回。整個(gè)等待過程也是忽略中斷的。
其實(shí)跟acquire()的流程大同小異,只不過多了個(gè)自己拿到資源后,還會(huì)去喚醒后繼隊(duì)友的操作(這才是共享嘛)。
3.4 releaseShared()
上一小節(jié)已經(jīng)把a(bǔ)cquireShared()說完了,這一小節(jié)就來講講它的反操作releaseShared()吧。此方法是共享模式下線程釋放共享資源的頂層入口。它會(huì)釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會(huì)喚醒等待隊(duì)列里的其他線程來獲取資源。下面是releaseShared()的源碼:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//嘗試釋放資源 doReleaseShared();//喚醒后繼結(jié)點(diǎn) return true; } return false; }
此方法的流程也比較簡(jiǎn)單,一句話:釋放掉資源后,喚醒后繼。跟獨(dú)占模式下的release()相似,但有一點(diǎn)稍微需要注意:獨(dú)占模式下的tryRelease()在完全釋放掉資源(state=0)后,才會(huì)返回true去喚醒其他線程,這主要是基于獨(dú)占下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實(shí)質(zhì)就是控制一定量的線程并發(fā)執(zhí)行,那么擁有資源的線程在釋放掉部分資源時(shí)就可以喚醒后繼等待結(jié)點(diǎn)。例如,資源總量是13,A(5)和B(7)分別獲取到資源并發(fā)運(yùn)行,C(4)來時(shí)只剩1個(gè)資源就需要等待。A在運(yùn)行過程中釋放掉2個(gè)資源量,然后tryReleaseShared(2)返回true喚醒C,C一看只有3個(gè)仍不夠繼續(xù)等待;隨后B又釋放2個(gè),tryReleaseShared(2)返回true喚醒C,C一看有5個(gè)夠自己用了,然后C就可以跟A和B一起運(yùn)行。而ReentrantReadWriteLock讀鎖的tryReleaseShared()只有在完全釋放掉資源(state=0)才返回true,所以自定義同步器可以根據(jù)需要決定tryReleaseShared()的返回值。
3.4.1 doReleaseShared()
此方法主要用于喚醒后繼。下面是它的源碼:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h);//喚醒后繼 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head)// head發(fā)生變化 break; } }
3.5 小結(jié)
本節(jié)我們?cè)斀饬霜?dú)占和共享兩種模式下獲取-釋放資源(acquire-release、acquireShared-releaseShared)的源碼,相信大家都有一定認(rèn)識(shí)了。值得注意的是,acquire()和acquireShared()兩種方法下,線程在等待隊(duì)列中都是忽略中斷的。AQS也支持響應(yīng)中斷的,acquireInterruptibly()/acquireSharedInterruptibly()即是,相應(yīng)的源碼跟acquire()和acquireShared()差不多,這里就不再詳解了。
四、簡(jiǎn)單應(yīng)用
通過前邊幾個(gè)章節(jié)的學(xué)習(xí),相信大家已經(jīng)基本理解AQS的原理了。這里再將“框架”一節(jié)中的一段話復(fù)制過來:
不同的自定義同步器爭(zhēng)用共享資源的方式也不同。自定義同步器在實(shí)現(xiàn)時(shí)只需要實(shí)現(xiàn)共享資源state的獲取與釋放方式即可,至于具體線程等待隊(duì)列的維護(hù)(如獲取資源失敗入隊(duì)/喚醒出隊(duì)等),AQS已經(jīng)在頂層實(shí)現(xiàn)好了。自定義同步器實(shí)現(xiàn)時(shí)主要實(shí)現(xiàn)以下幾種方法:
- isHeldExclusively():該線程是否正在獨(dú)占資源。只有用到condition才需要去實(shí)現(xiàn)它。
- tryAcquire(int):獨(dú)占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
- tryRelease(int):獨(dú)占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
- tryAcquireShared(int):共享方式。嘗試獲取資源。負(fù)數(shù)表示失?。?表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。
- tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false。
OK,下面我們就以AQS源碼里的Mutex為例,講一下AQS的簡(jiǎn)單應(yīng)用。
4.1 Mutex(互斥鎖)
Mutex是一個(gè)不可重入的互斥鎖實(shí)現(xiàn)。鎖資源(AQS里的state)只有兩種狀態(tài):0表示未鎖定,1表示鎖定。下邊是Mutex的核心源碼:
class Mutex implements Lock, java.io.Serializable { // 自定義同步器 private static class Sync extends AbstractQueuedSynchronizer { // 判斷是否鎖定狀態(tài) protected boolean isHeldExclusively() { return getState() == 1; } // 嘗試獲取資源,立即返回。成功則返回true,否則false。 public boolean tryAcquire(int acquires) { assert acquires == 1; // 這里限定只能為1個(gè)量 if (compareAndSetState(0, 1)) {//state為0才設(shè)置為1,不可重入! setExclusiveOwnerThread(Thread.currentThread());//設(shè)置為當(dāng)前線程獨(dú)占資源 return true; } return false; } // 嘗試釋放資源,立即返回。成功則為true,否則false。 protected boolean tryRelease(int releases) { assert releases == 1; // 限定為1個(gè)量 if (getState() == 0)//既然來釋放,那肯定就是已占有狀態(tài)了。只是為了保險(xiǎn),多層判斷! throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0);//釋放資源,放棄占有狀態(tài) return true; } } // 真正同步類的實(shí)現(xiàn)都依賴?yán)^承于AQS的自定義同步器! private final Sync sync = new Sync(); //lock<-->acquire。兩者語(yǔ)義一樣:獲取資源,即便等待,直到成功才返回。 public void lock() { sync.acquire(1); } //tryLock<-->tryAcquire。兩者語(yǔ)義一樣:嘗試獲取資源,要求立即返回。成功則為true,失敗則為false。 public boolean tryLock() { return sync.tryAcquire(1); } //unlock<-->release。兩者語(yǔ)文一樣:釋放資源。 public void unlock() { sync.release(1); } //鎖是否占有狀態(tài) public boolean isLocked() { return sync.isHeldExclusively(); } }
同步類在實(shí)現(xiàn)時(shí)一般都將自定義同步器(sync)定義為內(nèi)部類,供自己使用;而同步類自己(Mutex)則實(shí)現(xiàn)某個(gè)接口,對(duì)外服務(wù)。當(dāng)然,接口的實(shí)現(xiàn)要直接依賴sync,它們?cè)谡Z(yǔ)義上也存在某種對(duì)應(yīng)關(guān)系?。《鴖ync只用實(shí)現(xiàn)資源state的獲取-釋放方式tryAcquire-tryRelelase,至于線程的排隊(duì)、等待、喚醒等,上層的AQS都已經(jīng)實(shí)現(xiàn)好了,我們不用關(guān)心。
除了Mutex,ReentrantLock/CountDownLatch/Semphore這些同步類的實(shí)現(xiàn)方式都差不多,不同的地方就在獲取-釋放資源的方式tryAcquire-tryRelelase。掌握了這點(diǎn),AQS的核心便被攻破了!
到此這篇關(guān)于Java并發(fā)之AQS與自旋鎖詳解的文章就介紹到這了,更多相關(guān)Java的AQS與自旋鎖內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java后臺(tái)基于POST獲取JSON格式數(shù)據(jù)
這篇文章主要介紹了Java后臺(tái)基于POST獲取JSON格式數(shù)據(jù),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03Java如何比較兩個(gè)對(duì)象并獲取不相等的字段詳解
這篇文章主要給大家介紹了關(guān)于Java如何比較兩個(gè)對(duì)象并獲取不相等的字段以及JAVA判斷(獲取)兩個(gè)相同對(duì)象不同的數(shù)據(jù)的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2021-11-11SpringBoot如何取消內(nèi)置Tomcat啟動(dòng)并改用外接Tomcat
這篇文章主要介紹了SpringBoot如何取消內(nèi)置Tomcat啟動(dòng)并改用外接Tomcat,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11Spring Cloud Eureka 服務(wù)上下線監(jiān)控的實(shí)現(xiàn)
這篇文章主要介紹了Spring Cloud Eureka 服務(wù)上下線監(jiān)控的實(shí)現(xiàn),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-09-09Java實(shí)現(xiàn)評(píng)論回復(fù)功能的完整步驟
這篇文章主要給大家介紹了關(guān)于Java實(shí)現(xiàn)評(píng)論回復(fù)功能的完整步驟,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11Java中Aspose組件進(jìn)行多文檔間的轉(zhuǎn)換方法總結(jié)
在本篇文章里我們給大家分享了關(guān)于Java中Aspose組件進(jìn)行多文檔間的轉(zhuǎn)換方法內(nèi)容,需要的朋友們學(xué)習(xí)下吧。2019-02-02SpringBoot使用Editor.md構(gòu)建Markdown富文本編輯器示例
這篇文章主要介紹了SpringBoot使用Editor.md構(gòu)建Markdown富文本編輯器示例,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-03-03Java項(xiàng)目部署的完整流程(超詳細(xì)!)
我相信很多Java新手都會(huì)遇到這樣一個(gè)問題,跟著教材敲代碼,很容易,但是讓他完整的實(shí)現(xiàn)一個(gè)應(yīng)用項(xiàng)目卻不會(huì),下面這篇文章主要給大家介紹了關(guān)于Java項(xiàng)目部署的完整流程,需要的朋友可以參考下2022-07-07