Java AQS(AbstractQueuedSynchronizer)源碼解析
AbstractQueuedSynchronizer被稱為隊(duì)列同步器,簡(jiǎn)稱為大家熟知的AQS,這個(gè)類可以稱作concurrent包的基礎(chǔ),該類提供了同步的基本功能。該類包括如下幾個(gè)核心要素:
- AQS內(nèi)部維護(hù)一個(gè)volatile修飾的state變量,state用于標(biāo)記鎖的狀態(tài);
- AQS通過(guò)內(nèi)部類Node記錄當(dāng)前是哪個(gè)線程持有鎖;
- AQS通過(guò)LockSupport的park和unPark方法來(lái)阻塞和喚醒線程;
- AQS通過(guò)node來(lái)維護(hù)一個(gè)隊(duì)列,用于保存所有阻塞的線程。
下面通過(guò)剖析源碼來(lái)看看AQS是如何工作的。
AQS概要
AQS通過(guò)內(nèi)部類Node記錄當(dāng)前是哪個(gè)線程持有鎖,Node中有一個(gè)前驅(qū)節(jié)點(diǎn)和一個(gè)后繼節(jié)點(diǎn),形成一個(gè)雙向鏈表,這個(gè)鏈表是一種CLH隊(duì)列,其中waitStatus表示當(dāng)前線程的狀態(tài),其可能的取值包括以下幾種:
- SIGNAL(-1),表示后繼線程已經(jīng)或者即將被阻塞,當(dāng)前線程釋放鎖或者獲取鎖失敗后需要喚醒后繼線程;
- CANCELLED(1),表示當(dāng)前線程因?yàn)槌瑫r(shí)或者中斷被取消,這個(gè)狀態(tài)不可以被修改;
- CONDITION(-2),當(dāng)前線程為條件等待,其狀態(tài)設(shè)置0之后才能去競(jìng)爭(zhēng)鎖;
- PROPAGATE(-3),表示共享鎖釋放之后需要傳遞給后繼節(jié)點(diǎn),只有頭結(jié)點(diǎn)的才會(huì)有該狀態(tài);
- 0,該狀態(tài)為初始值,不屬于上面任意一種狀態(tài)。
Node對(duì)象中還有一個(gè)nextWaiter變量,指向下一個(gè)條件等待節(jié)點(diǎn),相當(dāng)于在CLH隊(duì)列的基礎(chǔ)上維護(hù)了一個(gè)簡(jiǎn)單的單鏈表來(lái)關(guān)聯(lián)條件等待的節(jié)點(diǎn)。
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } ... 構(gòu)造方法 ... }
Node提供了兩種入隊(duì)列的方法,即enq和addWaiter,enq方法如下所示,當(dāng)尾節(jié)點(diǎn)tail為null時(shí),表明阻塞隊(duì)列還沒(méi)有被初始化,通過(guò)CAS操作來(lái)設(shè)置頭結(jié)點(diǎn),頭結(jié)點(diǎn)為new Node(),實(shí)際上頭結(jié)點(diǎn)中沒(méi)有阻塞的線程,算得上是一個(gè)空的節(jié)點(diǎn)(注意空節(jié)點(diǎn)和null是不一樣的),然后進(jìn)行tail=head操作,這也說(shuō)明當(dāng)head=tail的時(shí)候,隊(duì)列中實(shí)際上是不存在阻塞線程的,然后將需要入隊(duì)列的node放入隊(duì)列尾部,將tail指向node。
private Node enq(final Node node) { for (;;) { Node t = tail; //如果tail為空,說(shuō)明CLH隊(duì)列沒(méi)有被初始化, if (t == null) { //初始化CLH隊(duì)列,將head和tail指向一個(gè)new Node(), //此時(shí)雖然CLH有一個(gè)節(jié)點(diǎn),但是并沒(méi)有真正意義的阻塞線程 if (compareAndSetHead(new Node())) tail = head; } else { //將node放入隊(duì)列尾部,并通過(guò)cas將tail指向node node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
addWaiter通常表示添加一個(gè)條件等待的節(jié)點(diǎn)入隊(duì)列,該方法首先嘗試通過(guò)CAS操作快速入隊(duì)列,如果失敗則通過(guò)調(diào)用enq來(lái)入隊(duì)列。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //嘗試快速入隊(duì)列 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //快速入隊(duì)列失敗則采用enq方入隊(duì)列 enq(node); return node; }
Node還提供了喚醒后繼節(jié)點(diǎn)線程的功能,主要是通過(guò)LockSupport來(lái)實(shí)現(xiàn)的,源碼如下所示,
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
排他獲取鎖
不支持中斷的獲取鎖\color{green}{不支持中斷的獲取鎖}不支持中斷的獲取鎖
//不可中斷的獲取鎖 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //對(duì)中斷做補(bǔ)償,中斷當(dāng)前線程 selfInterrupt(); }
acquire方法首先會(huì)調(diào)用tryAcquire方法嘗試獲取鎖,如果獲取鎖失敗,首先通過(guò)addWaiter將當(dāng)前線程放入CLH隊(duì)列中,然后通過(guò)acquireQueued方法獲取鎖,acquireQueued方法源碼如下所示:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //記錄中斷狀態(tài) boolean interrupted = false; //自旋式的獲取鎖 for (;;) { final Node p = node.predecessor(); //當(dāng)前線程為CLH中的第一個(gè)阻塞線程才會(huì)嘗試去獲取鎖 if (p == head && tryAcquire(arg)) { //獲取成功則更新head setHead(node); p.next = null; // help GC failed = false; //返回中斷狀態(tài) return interrupted; } //判斷中斷信息 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //如果獲取鎖失敗,則取消獲取鎖的操作 if (failed) cancelAcquire(node); } }
acquireQueued方法是無(wú)中斷的獲取鎖,該方法有一個(gè)布爾類型的返回值,該值不是表示是否成功獲取鎖,而是標(biāo)示當(dāng)前線程的中斷狀態(tài),因?yàn)閍cquireQueued方法是無(wú)法響應(yīng)中斷的,需要對(duì)中斷進(jìn)行補(bǔ)償,這個(gè)補(bǔ)償體現(xiàn)在acquire方法中。
//模板方法tryAcquire需要子類進(jìn)行具體實(shí)現(xiàn) protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
支持中斷的獲取鎖
//可中斷的獲取鎖 public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
acquireInterruptibly方法獲取鎖的過(guò)程中能夠響應(yīng)中斷,主要體現(xiàn)在獲取鎖之前會(huì)判斷一下當(dāng)前線程的中斷中斷狀態(tài),若中斷則拋出InterruptedException,然后通過(guò)tryAcquire獲取鎖,獲取成功直接返回,獲取失敗則通過(guò)doAcquireInterruptibly獲取鎖,該方法和acquireQueued最大的區(qū)別就是在判斷parkAndCheckInterrupt后,acquireQueued僅僅記錄中斷狀態(tài),parkAndCheckInterrupt則會(huì)拋出異常。
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //拋出異常,響應(yīng)中斷 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
支持超時(shí)時(shí)間的獲取鎖功能
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { //響應(yīng)中斷 if (Thread.interrupted()) throw new InterruptedException(); //首先通過(guò)tryAcquire快速獲取鎖,若失敗則調(diào)用doAcquireNanos方法 return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
從方法tryAcquireNanos的源碼可以看出,該方法也是響應(yīng)中斷的,該方法首先調(diào)用模板方法tryAcquire快速的獲取鎖,如果失敗則通過(guò)doAcquireNanos獲取鎖,doAcquireNanos中支持超時(shí)機(jī)制,其源碼如下所示:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); //判斷如果超時(shí)則直接返回false,代表獲取鎖失敗 if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
doAcquireNanos方法與acquireQueued方法的區(qū)別是每次循環(huán)獲取鎖過(guò)程中都會(huì)計(jì)算deadline和當(dāng)前時(shí)間的差值,如果這個(gè)差值小于0,則表示獲取鎖的操作已經(jīng)超時(shí),則直接返回false表示獲取鎖失敗。
共享鎖獲取
AQS中不僅支持排他鎖的獲取,即acquire、acquireInterruptibly和tryAcquireNanos,還提供了共享鎖的獲取操作方法,包括acquireShared、acquireSharedInterruptibly和tryAcquireSharedNanos,這三個(gè)方法源碼如下所示:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
共享鎖的獲取和排他鎖的獲取方法類似,共享鎖調(diào)用了不同的模板方法tryAcquireShared,這里介紹一下doAcquireShared方法,其他方法變化的套路和共享鎖的使用套路一樣,doAcquireShared方法源碼如下所示:
private void doAcquireShared(int arg) { //當(dāng)前線程入隊(duì)列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; //自旋式的獲取鎖 for (;;) { final Node p = node.predecessor(); //只有隊(duì)列中的第一個(gè)阻塞線程才能獲取鎖 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC //獲取鎖成功,補(bǔ)償中斷 if (interrupted) selfInterrupt(); failed = false; return; } } //通過(guò)interrupted記錄中斷信息 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
doAcquireShared方法沒(méi)有返回值,與acquireQueued不同的是:
- doAcquireShared沒(méi)有返回值,該方法的中斷補(bǔ)償是在方法內(nèi)完成的,獲取鎖成功之后,會(huì)判斷中斷信息interrupted的狀態(tài),如果為true則調(diào)用selfInterrupt()方法中斷當(dāng)前線程;
- 獲取鎖成功之后不是簡(jiǎn)單的設(shè)置head,而是通過(guò)setHeadAndPropagate方法來(lái)設(shè)置頭結(jié)點(diǎn)和并且判斷后繼節(jié)點(diǎn)的信息,對(duì)后繼節(jié)點(diǎn)中的線程進(jìn)行喚醒操作等,setHeadAndPropagate方法源碼如下所示:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; //設(shè)置新的頭結(jié)點(diǎn) setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //如果后繼節(jié)點(diǎn)為空或者為SHARED類型的節(jié)點(diǎn),執(zhí)行doReleaseShared方法 if (s == null || s.isShared()) doReleaseShared(); } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //狀態(tài)為SIGNAL,則喚醒后繼節(jié)點(diǎn)中的線程 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } //若狀態(tài)為0,則設(shè)置狀態(tài)為PROPAGATE else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
鎖的釋放
鎖的釋放也分為釋放排他鎖和釋放共享鎖,分別為release方法和releaseShared方法,源碼如下所示,
//釋放排他鎖 public final boolean release(int arg) { //釋放鎖,然后喚醒后繼節(jié)點(diǎn)的線程 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } //釋放共享鎖 public final boolean releaseShared(int arg) { //釋放鎖,然后調(diào)用doReleaseShared方法 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
release方法和releaseShared方法分別調(diào)用模板方法tryRelease和tryReleaseShared來(lái)釋放鎖,release方法中直接通過(guò)調(diào)用unparkSuccessor喚醒后繼線程,而releaseShared的喚醒操作在doReleaseShared方法中進(jìn)行。
取消獲取鎖
當(dāng)獲取鎖失敗時(shí),需要進(jìn)行一些狀態(tài)清理和變化,cancelAcquire方法就是用來(lái)實(shí)現(xiàn)這些功能的,其源碼如下所示,
private void cancelAcquire(Node node) { if (node == null) return; //節(jié)點(diǎn)線程置為null node.thread = null; //從CLH隊(duì)列中清除已經(jīng)取消的節(jié)點(diǎn)(CANCELLED) Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; //判斷如果node是尾部節(jié)點(diǎn),則設(shè)置尾部節(jié)點(diǎn) if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; //若不是頭節(jié)點(diǎn)則直接從CLH隊(duì)列中清除當(dāng)前節(jié)點(diǎn) if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); //若為頭結(jié)點(diǎn),則喚醒后繼節(jié)點(diǎn)中的線程 } else { unparkSuccessor(node); } node.next = node; // help GC } }
取消獲取鎖的操作首先將隊(duì)列中處于CANCELLED狀態(tài)的節(jié)點(diǎn)剔除,然后根據(jù)當(dāng)前節(jié)點(diǎn)在CLH隊(duì)列中的位置進(jìn)行不同的操作:
- node在隊(duì)列尾部,則重新設(shè)置CLH隊(duì)列的尾部節(jié)點(diǎn);
- node為頭結(jié)點(diǎn),喚醒后繼節(jié)點(diǎn)中的線程;
- node既不是頭結(jié)點(diǎn)也不是尾節(jié)點(diǎn),則在CLH中剔除node。
總結(jié)
AQS是整個(gè)concurrent包的基礎(chǔ),可重入鎖、線程池、信號(hào)量(Semaphore)等同步工具類都需要借助AQS來(lái)完成,了解AQS是深入學(xué)習(xí)concurrent包的前提。
以上就是Java AQS(AbstractQueuedSynchronizer)源碼解析的詳細(xì)內(nèi)容,更多關(guān)于Java AQS源碼的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
springboot讀取resource配置文件生成容器對(duì)象的示例代碼
這篇文章主要介紹了springboot讀取resource配置文件生成容器對(duì)象的示例代碼,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-07-07一文詳解Spring?Boot可以同時(shí)處理多少請(qǐng)求
SpringBoot是一個(gè)流行的Java開(kāi)發(fā)框架,它被廣泛用于構(gòu)建Web應(yīng)用程序,但是,開(kāi)發(fā)人員通常會(huì)擔(dān)心它的性能問(wèn)題,特別是在高負(fù)載條件下,Spring?Boot能夠同時(shí)處理多少請(qǐng)求是一個(gè)重要的問(wèn)題,在本文中,我們將討論SpringBoot的請(qǐng)求處理能力,并介紹如何提高性能2023-10-10基于java類路徑classpath和包的實(shí)例講解
下面小編就為大家分享一篇基于java類路徑classpath和包的實(shí)例講解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-01-01解決因缺少Log4j依賴導(dǎo)致應(yīng)用啟動(dòng)失敗的問(wèn)題
日志是應(yīng)用軟件中不可缺少的部分,Apache的開(kāi)源項(xiàng)目log4j是一個(gè)功能強(qiáng)大的日志組件,提供方便的日志記錄。但這篇文章不是介紹Log4j,這篇文章主要介紹了關(guān)于因缺少Log4j依賴導(dǎo)致應(yīng)用啟動(dòng)失敗問(wèn)題的相關(guān)資料,需要的朋友可以參考下。2017-04-04Springboot之idea之pom文件圖標(biāo)不對(duì)問(wèn)題
這篇文章主要介紹了Springboot之idea之pom文件圖標(biāo)不對(duì)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-04-04mybatis plus代碼生成工具的實(shí)現(xiàn)代碼
這篇文章主要介紹了mybatis plus代碼生成工具的實(shí)現(xiàn)代碼,需要的朋友可以參考下2021-04-04