一文帶你深入理解Java?AbstractQueuedSynchronizer
前言
在并發(fā)編程中,鎖是一種保證線程安全的方式,Java 主要有兩種鎖機制,一種是 synchronized
關(guān)鍵字修飾的鎖,通過 JVM 層面的指令碼來控制鎖(依賴于底層的操作系統(tǒng));而另一種則是 JUC 包下的各類同步器如 ReentrantLock
可重入鎖,那么這類同步器是怎么樣實現(xiàn)鎖機制的呢?
其實 ReentrantLock
這類的鎖是基于 AbstractQueuedSynchronizer
(文章后面都稱為 AQS)實現(xiàn)的。那么為什么 AQS 能夠?qū)崿F(xiàn)鎖機制呢?
本篇文章將會深入講解 AQS 的數(shù)據(jù)結(jié)構(gòu)及實現(xiàn)原理。
AQS 概述
AQS 是什么?
AQS 直譯為抽象隊列同步器,是用來構(gòu)建鎖和同步器的重量級基礎(chǔ)框架。JUC包下的鎖和同步器如 ReentrantLock
、Semaphore
、ReentrantReadWriteLock
、CountDownLatch
等都是基于 AQS 實現(xiàn)的。
AQS 的原理
在并發(fā)場景下,多線程存在搶占共享資源的情況,那么必定存在搶占不到資源的線程需要進行排隊等待,并且當資源釋放時也需要喚醒這些線程進行資源爭搶,所以 AQS 提供了一套線程等待阻塞以及線程喚醒的機制來實現(xiàn)多線程下線程安全。
AQS 通過 維護一個 int 類型的狀態(tài)變量和一個 FIFO 的虛擬雙向隊列(CLH 隊列鎖的變體) 來實現(xiàn)線程等待和喚醒機制的。
原理大致為:當線程請求共享資源空閑時,AQS 會將當前線程設(shè)置為有效的工作線程并通過 CAS 的方式將狀態(tài)變量設(shè)置為鎖定狀態(tài);當線程獲取共享資源失敗時,AQS 會將線程及等待狀態(tài)封裝成一個 Node
節(jié)點,將其加入隊列中;當共享資源被釋放時,AQS 會喚醒隊列中的下一個節(jié)點再次嘗試獲取共享資源。
下文將通過源碼分析具體展示 AQS 是如何實現(xiàn)線程等待阻塞以及喚醒的。
如何自定義同步器
上面提到 ReentrantLock
、Semaphore
、ReentrantReadWriteLock
、CountDownLatch
等同步器都是基于 AQS 實現(xiàn)的(其實就是繼承 AQS),那么是不是只要繼承 AQS 也可以實現(xiàn)自定義同步器呢?
淺看 AQS 源碼,可以看到 AQS 是抽象類。其實 AQS 是基于模板模式設(shè)計的,也就是說 AQS 已經(jīng)提供了一套線程等待阻塞以喚醒的實現(xiàn),不同的同步器只需要繼承 AQS 類并重寫 AQS 指定的方法來定制自己的獲取資源和釋放資源邏輯即可。
那么有哪些方法是可以進行重寫的呢?
通過源碼會發(fā)現(xiàn) AQS 的所有方法只有 5 個方法是可以重寫的,其余要不是 private
就是 final
修飾的方法。
下面列出這五個方法:
// (獨占式)嘗試獲取資源,成功則返回true,失敗則返回false。 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } // (獨占式)嘗試釋放資源,成功則返回true,失敗則返回false。 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } // (共享式)嘗試獲取資源,負數(shù)表示失?。?表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } // (共享式)嘗試釋放資源,成功則返回true,失敗則返回false。 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } // 判斷當前線程是否正在獨占式,只有用到condition才需要去實現(xiàn)它。 protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
上面代碼可以看到 AQS 分為獨占式和共享式兩種獲取共享資源的方式:
- 獨占式(Exclusive):只有一個線程能獲取共享資源,如
ReentrantLock
可重入鎖。 - 共享式(Share):多個線程能同時獲取共享資源,如
Semaphore
、CountDownLatCh
、CyclicBarrier
等 JUC 工具類。
需要實現(xiàn)獨占式的同步器就去重寫?yīng)氄际降姆椒?,需要實現(xiàn)共享式的同步器就去重寫共享式的方法(注意這里重寫的方法必須都是內(nèi)部線程安全的,并且盡可能地簡短)。
下面以獨占式的 ReentrantLock
為例,看看 ReentrantLock
源碼中是如何繼承 AQS 并重寫方法實現(xiàn)同步器的。
可以看到 ReentrantLock
通過內(nèi)部類 Sync
繼承 AQS 并在 Sync
中重寫了獨占式的 tryRelease()
方法,然后將獨占式的 tryAcquire()
交給 Sync
的兩個子類(也就是公平與非公平)去按照各自的邏輯實現(xiàn)。
基本上面提到同步器都是通過依賴內(nèi)部類 Sync
繼承 AQS 實現(xiàn)地同步器,所以需要自定義同步器的也可以仿照這樣的方式創(chuàng)建。
AQS 的核心數(shù)據(jù)結(jié)構(gòu)
前文說道 AQS 是基于 CLH 隊列鎖的變體 實現(xiàn)的,所以 AQS 的核心數(shù)據(jù)結(jié)構(gòu)就是這個 CLH 隊列鎖的變體,在了解 AQS 的核心數(shù)據(jù)結(jié)構(gòu)前,還需要先介紹一下 CLH 隊列鎖是什么?
CLH 隊列鎖是什么?
相信大家都聽說過自旋鎖吧,自旋鎖是互斥鎖的一種實現(xiàn)方式,通過 CAS 的方式獲取鎖和釋放鎖來實現(xiàn)互斥鎖。但是自旋鎖存在鎖饑餓問題和鎖競爭激烈下性能較差問題。
CLH 隊列鎖是對自旋鎖的改進,有效地解決了自旋鎖上面的兩個問題,通過隊列的方式可以防止鎖饑餓問題,同時實現(xiàn)了鎖狀態(tài)去中心化,讓每個線程可以在不同的狀態(tài)變量下自旋,從而來減少 CPU 的性能開銷。
CLH 隊列鎖可以看成是一個單向鏈表隊列,將所有請求共享資源的線程封裝成 節(jié)點(包含 線程標識 和 被鎖定狀態(tài)) 排列在隊列中,如下圖。
原理流程:
- CLH 隊列鎖會維護一個
tail
用于指向隊列的末尾節(jié)點,初始化時tail
會指向一個被鎖定狀態(tài)為false
的空節(jié)點。 - 當有新的線程需要獲取共享資源時,會先將被鎖定狀態(tài)置為
true
,然后通過getAndSet
的原子操作方式獲取tail
所指向的節(jié)點并判斷節(jié)點的被鎖定狀態(tài)是否為false
,狀態(tài)為false
則獲得共享資源,為true
則加入隊列,并將tail
指向當前節(jié)點,使得當前節(jié)點稱為新的末尾節(jié)點。 - 入隊的節(jié)點會以輪詢的方式訪問上一節(jié)點的被鎖定狀態(tài),也就是說只有上一節(jié)點釋放共享資源(被鎖定狀態(tài)為
false
)后,當前節(jié)點才能獲取到共享資源(上述的共享資源都能同于鎖)。
可以看到 CLH 隊列鎖通過隊列實現(xiàn)了公平鎖,先入隊的線程會先獲得共享資源,解決了鎖饑餓問題;并且每個節(jié)點的被鎖定狀態(tài)只會影響到其后一個節(jié)點,實現(xiàn)了鎖去中心化從而減少 CPU 的開銷。
AQS 對 CLH 隊列鎖的改進
雖然 CLH 隊列鎖已經(jīng)具有良好的性能了,但是因為存在自旋所以依舊存在 CPU 開銷問題,并且 CLH 隊列鎖本身的功能單一,不能支持復(fù)雜的功能。
所以 AQS 對 CLH 隊列鎖進行了改進,使用 LockSupport
類將自旋改為阻塞線程操作(后續(xù)源碼會具體介紹)來減少 CPU 的開銷,擴展節(jié)點的狀態(tài)以及顯示的維護前驅(qū)和后續(xù)節(jié)點。
AQS 使用內(nèi)部類 Node
來實現(xiàn) CLH 隊列鎖的變體,也就是 AQS 的核心數(shù)據(jù)結(jié)構(gòu)。
下面看看 AQS 的內(nèi)部類 Node
的源碼:
static final class Node { // 共享模式 static final Node SHARED = new Node(); // 獨占模式 static final Node EXCLUSIVE = null; // 由于超時、中斷或其他原因,線程被取消 static final int CANCELLED = 1; // 當前節(jié)點的后繼節(jié)點阻塞等待共享資源 static final int SIGNAL = -1; // 當前節(jié)點在條件隊列 static final int CONDITION = -2; // 當前節(jié)點的下一個acquireShared應(yīng)無條件傳播 static final int PROPAGATE = -3; // 節(jié)點狀態(tài) volatile int waitStatus; // 前驅(qū)節(jié)點 volatile Node prev; // 后繼節(jié)點 volatile Node next; // 節(jié)點的線程 volatile Thread thread; // 下一個等待者(這個用于 Condition,這里不做過多說明) Node nextWaiter; // 是否是共享模式 final boolean isShared() { return nextWaiter == SHARED; } // 獲取前驅(qū)節(jié)點,為空則拋出異常 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } // 各種構(gòu)造器 Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
(說明:AQS 實際上有兩種隊列,一個是使用雙向鏈表實現(xiàn)(利用 prev
和 next
)的隊列,一個是使用單向鏈表實現(xiàn)(利用 nextWaiter
)的 Condition 隊列,文章主要講解 AQS 最為核心的雙向鏈表隊列,關(guān)于 Condition 的內(nèi)容本文不做講解,有興趣的可以通過閱讀源碼)
從上面的源碼中可以看到內(nèi)部類 Node
中三個比較重要的屬性:
waitStatus
節(jié)點狀態(tài)
CANCELLED=1
:由于超時、中斷或其他原因,當前節(jié)點的線程被取消。SIGNAL=-1
:當前節(jié)點的后繼節(jié)點的線程阻塞等待共享資源(也就是線程已經(jīng)準備好了就等共享資源釋放了)。CONDITION=-2
:當前節(jié)點在條件隊列中。PROPAGATE=-3
:當前節(jié)點的下一個acquireShared
應(yīng)無條件傳播。
前驅(qū)節(jié)點和后繼節(jié)點
prev
:前驅(qū)節(jié)點next
:后繼節(jié)點
除此之外還有一個常用的方法 predecessor()
用于獲取當前節(jié)點的前驅(qū)節(jié)點,如果前驅(qū)節(jié)點為空則拋出異常。
AQS 的 CLH 隊列鎖變體如下圖:
圖中有 head
和 tail
兩個變量分別指向隊列頭部和末尾節(jié)點,這是 AQS 類的屬性(后續(xù)源碼分析中介紹)。并且隊列的頭節(jié)點是哨兵節(jié)點(只用來占位,沒有線程),其實就是 new Node()
初始化一個空節(jié)點(后續(xù)源碼中介紹)。
AQS 源碼分析
前面已經(jīng)說明了 AQS 的核心數(shù)據(jù)結(jié)構(gòu),接下來將會通過源碼去進一步的了解 AQS 是如何憑借 同步狀態(tài)變量 和 CLH 隊列鎖的變體 實現(xiàn)線程等待喚醒機制的。
1. 繼承 AOS 類
先了解一下 AQS 繼承的父類 AOS 為 AQS 提供了什么方法。
AQS 繼承 AbstractOwnableSynchronizer
抽象類,AbstractOwnableSynchronizer
類中有exclusiveOwnerThread
屬性,表示獨占式下資源的所有者(持有共享資源的線程)。同時包含該屬性的 get/set
方法用于設(shè)置當前獨占資源的線程和獲取當前獨占資源的線程。因為繼承關(guān)系也就意味著 AQS 可以調(diào)用這兩個方法。
2. AQS 的屬性
接下來了解一下 AQS 的屬性,更好地去認識 AQS 的數(shù)據(jù)結(jié)構(gòu)以及如何獲取和設(shè)置同步狀態(tài)變量。
查看源碼:
// 序列號 private static final long serialVersionUID = 7373984972572414691L; // 頭結(jié)點 private transient volatile Node head; // 尾結(jié)點 private transient volatile Node tail; // 同步狀態(tài) private volatile int state; // 自旋時間 static final long spinForTimeoutThreshold = 1000L; // Unsafe實例以及各種內(nèi)存偏移量 private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset;
這里主要關(guān)注 head
頭結(jié)點、tail
尾結(jié)點、state
同步狀態(tài)即可。
AQS 中通過 Getter/Setter
方法以及 CAS 的方式設(shè)置和獲取 state
屬性。
// 獲取同步狀態(tài) protected final int getState() { return state; } // 設(shè)置同步狀態(tài) protected final void setState(int newState) { state = newState; } // CAS方式設(shè)置同步狀態(tài)(當期待值expect等于當前同步狀態(tài)時,將同步狀態(tài)設(shè)置為update值) protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
3. AQS 如何實現(xiàn)線程等待機制
通過前文可以了解到 ReentrantLock
是如何繼承 AQS 的,那么下面以 ReentrantLock
的非公平鎖為例深入 AQS 源碼中分析 AQS 的 核心方法 以及 AQS 如何結(jié)合 同步狀態(tài) 和 CLH 隊列鎖的變體 實現(xiàn)線程的等待機制的。
首先創(chuàng)建非公平鎖的可重入鎖 Lock lock = new ReentrantLock();
,此時 AQS 的 state
同步狀態(tài)為0,表示沒有線程占用共享資源。
從 lock.lock()
開始看 AQS 如何實現(xiàn)多線程搶占式下的線程等待阻塞
進入非公平鎖的 lock()
方法,查看源碼:
可以看到調(diào)用 compareAndSetState()
用于以 CAS 的方式(原子操作)設(shè)置同步狀態(tài) state
。
- 如果
state
為 0 時,將其設(shè)置為 1,并返回 true 執(zhí)行setExclusiveOwnerThread()
方法將當前線程設(shè)置為共享資源所有者; - 如果
state
為 1 時,返回 false 進入acquire(1)
方法中。
AQS 核心方法 acquire()
AQS 實現(xiàn)線程獲取資源以及線程等待阻塞的核心方法,查看源碼:
可以看到這里共有三個 AQS 核心方法:tryAcquire()
、addWaiter()
、acquireQueued()
。
下面根據(jù) if
語句從左到右執(zhí)行的順序依次進入這三個方法。
tryAcquire()
方法:自定義獲取共享資源的邏輯
這個方法如果前文看的仔細的話,會發(fā)現(xiàn)它是需要子類重寫的方法,也就是由子類自己定義搶占式獲取資源的邏輯。
進入非公平鎖實現(xiàn)的 tryAcquire()
方法
發(fā)現(xiàn)該方法調(diào)用了 nonfairTryAcquire()
方法,查看源碼:
// 這里調(diào)用的是 nonfairTryAcquire(1),也就是acquires為 1 final boolean nonfairTryAcquire(int acquires) { // 獲取當前線程 final Thread current = Thread.currentThread(); // 獲取同步狀態(tài) int c = getState(); // 同步狀態(tài)為0,說明共享資源空閑可以直接搶占 if (c == 0) { // 再次嘗試搶占 if (compareAndSetState(0, acquires)) { // 成功則將當前線程設(shè)置為共享資源所有者 setExclusiveOwnerThread(current); // 返回 true return true; } } // 當前共享資源還在被占用,判斷當前線程是不是共享資源所有者 else if (current == getExclusiveOwnerThread()) { // 增加同步狀態(tài)(這里其實也就是 ReentrantLock 是可重入鎖的原因,如果是自己線程持有的鎖,可以再次加鎖,也就是可重入) int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 更新同步狀態(tài) setState(nextc); // 返回 true return true; } // 如果上面情況都不是則返回 false return false; }
這個方法其實體現(xiàn)的主要是 ReentrantLock 可重入鎖的邏輯,執(zhí)行流程如下:
- 先判斷當前共享資源是否空閑,空閑說明沒有線程占用,當前線程可以再次嘗試搶占,搶占成功則將當前線程設(shè)置為共享資源所有者并返回 true,這樣后續(xù)的
addWaiter()
、acquireQueued()
就無需執(zhí)行了。 - 當前共享資源被占用中,判斷當前線程是不是共享資源所有者,是則可以再次加鎖,這也就是可重入鎖的意義(持有鎖的線程可以重復(fù)加鎖,但是在解鎖的時候也要確保加多少鎖解多少鎖,要使同步狀態(tài)恢復(fù)為 0,否則會一直阻塞),更新同步狀態(tài)后返回 true,這樣后續(xù)的
addWaiter()
、acquireQueued()
也無需執(zhí)行了。 - 如果上面兩種情況都不是,則返回 false,進入
acquire()
的下一個addWaiter()
方法。
addWaiter()
方法:將當前線程結(jié)合模式封裝成節(jié)點加入隊列
該方法是 AQS 中的方法,查看源碼:
private Node addWaiter(Node mode) { // 以當前線程以及 mode(這里是獨占式)為參數(shù)構(gòu)建節(jié)點 Node node = new Node(Thread.currentThread(), mode); // 獲取隊列末尾節(jié)點 Node pred = tail; // 判斷末尾節(jié)點是否為空 if (pred != null) { // 末尾不為空時,將當前節(jié)點鏈接到莫尾結(jié)點后面,并且將當前節(jié)點設(shè)置為末尾節(jié)點 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; // 返回當前節(jié)點 return node; } } // 末尾節(jié)點為空,則進入 enq 方法 enq(node); return node; }
執(zhí)行流程:
- 根據(jù)傳入的參數(shù)構(gòu)造節(jié)點,此時節(jié)點的狀態(tài)
waitStatus
為 0。 - 判斷末尾節(jié)點是否為空,這里其實就是看看之前有沒有創(chuàng)建過隊列,有則直接加入末尾節(jié)點后面,并通過CAS 將當前節(jié)點設(shè)置為末尾節(jié)點。
- 沒有則進入
enq()
方法創(chuàng)建隊列并加入節(jié)點。
enq()
方法:初始化隊列(加入哨兵節(jié)點為頭結(jié)點)并加入當前節(jié)點
該方法是 AQS 中的方法,查看源碼:
private Node enq(final Node node) { // 循環(huán)直到將 node 加入隊列為止 for (;;) { Node t = tail; // 判斷末尾節(jié)點是否為空 if (t == null) { // 為空,說明需要設(shè)置哨兵節(jié)點 if (compareAndSetHead(new Node())) tail = head; } else { // 不為空,直接加入末尾,并設(shè)置當前節(jié)點為末尾節(jié)點 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
執(zhí)行流程:
- 根據(jù)末尾節(jié)點是否為空,判斷是否有哨兵節(jié)點,沒有則使用 CAS 的方式將
new Node()
加入頭節(jié)點。 - 因為是循環(huán),所以最終會直到 node 加入末尾節(jié)點為止才會終止,并且將當前節(jié)點返回(通過循環(huán)確保節(jié)點一定加入隊列)。
這里也就是前文數(shù)據(jù)結(jié)構(gòu)圖中為什么頭節(jié)點是哨兵節(jié)點的原因,如圖:
acquireQueued()
方法:隊列中輪詢搶占共享資源,搶占不成功則進入阻塞等待喚醒
addWaiter()
執(zhí)行完加入隊列后會將節(jié)點返回作為參數(shù)執(zhí)行 acquireQueued()
,查看源碼:
// node 為當前加入隊列的節(jié)點,arg 為 1 final boolean acquireQueued(final Node node, int arg) { // 標志,用來判斷是否執(zhí)行后面的cancelAcquire方法 // 也就是中途是否存在異常中斷,有則使用cancelAcquire整理隊列 boolean failed = true; try { // 中斷標志 boolean interrupted = false; // 循環(huán) for (;;) { // 獲取當前節(jié)點的前置節(jié)點 final Node p = node.predecessor(); // 判斷前置節(jié)點是否是頭結(jié)點(當前節(jié)點是否是隊列第一個等待線程)以及再次嘗試獲取資源 if (p == head && tryAcquire(arg)) { // 滿足條件說明當前節(jié)點已經(jīng)搶占到資源,可以從隊列中移除 // 所以這里會將當前節(jié)點設(shè)置為頭結(jié)點(setHead方法操作是先將節(jié)點線程以及前驅(qū)節(jié)點置空) setHead(node); // 并且將原本的頭結(jié)點置空,通過輔助GC回收 p.next = null; // help GC // 標志置為false,表示沒有異常 failed = false; // 返回中斷標志 return interrupted; } // 上述條件不滿足,則執(zhí)行shouldParkAfterFailedAcquire和parkAndCheckInterrupt if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 整理隊列,將隊列中中斷的進程移除 cancelAcquire(node); } }
這里需要分情況討論:
- 假設(shè)當前的線程是隊列的第一個線程(也就是前驅(qū)節(jié)點是頭節(jié)點),并且執(zhí)行該方法時,共享資源已經(jīng)空閑,此時該線程符合前驅(qū)節(jié)點是頭節(jié)點并且該線程搶占到資源,那么就會將該線程移除隊列(AQS 的做法是將該線程的節(jié)點的線程以及前驅(qū)節(jié)點置空,并設(shè)置該節(jié)點為頭結(jié)點;然后將原本的頭結(jié)點置空交給輔助GC回收即可),移除隊列后會將
failed
標志置為 false,表示無需整理隊列并返回中斷標志(因為并沒有中斷請求所以這里中斷標志是 false,表示無需執(zhí)行中斷)。 - 假設(shè)當前的線程是隊列的第一個線程但是執(zhí)行該方法時共享資源仍被占用中或爭搶失敗(因為是非公平鎖)或者當前線程不是隊列第一個線程時,因為前面的條件無法滿足,便會去執(zhí)行
shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()
方法,下面查看這兩個方法的源碼。
shouldParkAfterFailedAcquire()
方法:更改前置節(jié)點的狀態(tài)為 SIGNAL
該方法是 AQS 中的方法,查看源碼:
// 傳參 pred 為當前節(jié)點的前置節(jié)點;node 為 當前節(jié)點 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 獲取前置節(jié)點的狀態(tài) int ws = pred.waitStatus; // 判斷是不是 Node.SIGNAL(-1) if (ws == Node.SIGNAL) // 狀態(tài)為-1表示線程已經(jīng)準備好了(阻塞等待了),就等資源釋放進行爭搶了 // 返回 true 執(zhí)行 parkAndCheckInterrupt() 方法(這玩意就是阻塞線程) return true; // 判斷狀態(tài)是否大于0,表示被中斷 if (ws > 0) { //前置節(jié)點為被中斷,意味著這個節(jié)點沒用了,需要再往前找,找到不是被中斷的狀態(tài)為止 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 找到后將新的前置節(jié)點與當前節(jié)點鏈接起來 pred.next = node; } else { // CAS 方式將前置節(jié)點的狀態(tài)設(shè)置為 Node.SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 返回 false 直接跳出 return false; }
這個方法就是當循環(huán)無法搶占到資源時,便會去更新當前線程節(jié)點的前置節(jié)點狀態(tài),更新完狀態(tài)之后等待下一次循環(huán)如果還是搶占不到資源,就會去執(zhí)行 parkAndCheckInterrupt()
,使用 LockSupport
把當前線程阻塞等待喚醒。
parkAndCheckInterrupt()
方法:阻塞當前線程
該方法是 AQS 中的方法,查看源碼:
private final boolean parkAndCheckInterrupt() { // 調(diào)用 LockSupport 類進行阻塞當前線程 LockSupport.park(this); // 返回當前線程是否中斷,同時清除中斷標志位 return Thread.interrupted(); }
這里也就是前文提到的 AQS 對 CLH 隊列鎖的改進,不再通過自旋鎖的方式輪詢前一個節(jié)點的狀態(tài),而是嘗試兩次搶占后還是搶占不到就進入阻塞狀態(tài),等待資源釋放后喚醒,這樣做可以減少 CPU 開銷。
該方法返回的是當前線程是否被中斷的結(jié)果,被中斷則返回 true 使得前面的 acquireQueued()
方法將中斷標志置為 true,在搶占到資源后會根據(jù)中斷標志最終進入語句執(zhí)行 selfInterrupt()
方法將當前線程中斷。
cancelAcquire
方法:當前線程出現(xiàn)異常時整理隊列移除當前線程節(jié)點
該方法是 AQS 中的方法,查看源碼:
// 參數(shù)為當前節(jié)點 private void cancelAcquire(Node node) { // 當前節(jié)點為空直接返回 if (node == null) return; // 將當前節(jié)點的線程置空 node.thread = null; // 這一步是找到當前節(jié)點的前驅(qū)節(jié)點 Node pred = node.prev; // 如果前驅(qū)節(jié)點狀態(tài)也為 CANCELLED 則循環(huán)找前驅(qū)節(jié)點的前驅(qū)節(jié)點直到狀態(tài)不為 CANCELLED while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 獲取前驅(qū)節(jié)點的下一個節(jié)點 Node predNext = pred.next; // 將當前節(jié)點設(shè)置為 CANCELLED node.waitStatus = Node.CANCELLED; // 判斷當前節(jié)點是否是末尾節(jié)點,是則只需要將末尾節(jié)點設(shè)置成獲取到的前驅(qū)節(jié)點即可 // 相當于移除掉了前驅(qū)節(jié)點后的所有節(jié)點 if (node == tail && compareAndSetTail(node, pred)) { // 并將前驅(qū)節(jié)點的下一節(jié)點置空 compareAndSetNext(pred, predNext, null); } else { int ws; // 滿足三個條件 // 1. 前驅(qū)節(jié)點不是頭結(jié)點 // 2. 前驅(qū)節(jié)點的狀態(tài)為 SIGNAL 或者 前驅(qū)節(jié)點狀態(tài)小于0并且可以設(shè)置為 SIGNAL // 3. 前驅(qū)節(jié)點的線程不為空 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // 保存當前節(jié)點的后繼節(jié)點 Node next = node.next; // 后繼節(jié)點不為空并且狀態(tài)不為 CANCELLED if (next != null && next.waitStatus <= 0) // 將前驅(qū)節(jié)點與后繼節(jié)點鏈接起來,移除中間的節(jié)點 compareAndSetNext(pred, predNext, next); } else { // 以上條件不滿足則喚醒當前線程的下一個節(jié)點 unparkSuccessor(node); } // 輔助GC回收 node.next = node; // help GC } }
執(zhí)行過程:
當前線程不能正常退出時執(zhí)行該方法,也就是出現(xiàn)異常時執(zhí)行該方法
先判斷該線程節(jié)點是否為空,為空則直接返回;不為空則將當前節(jié)點的線程置空
通過循環(huán)的方式找到當前節(jié)點的前置節(jié)點(這個節(jié)點狀態(tài)不能為 CANCELLED),并記錄前置節(jié)點的下一節(jié)點
將當前節(jié)點狀態(tài)置為 CANCELLED,分情況討論當前節(jié)點的位置
- 第一種:當前節(jié)點為末尾節(jié)點,這種情況只需要將當前節(jié)點的前置節(jié)點設(shè)置為末尾節(jié)點,并將前置節(jié)點的下一節(jié)點置空即可移除異常節(jié)點,異常節(jié)點最后通過輔助GC回收
- 第二種:當前節(jié)點位于頭結(jié)點和尾結(jié)點之間,這種情況需要獲取當前節(jié)點的后繼節(jié)點,再當前節(jié)點后繼節(jié)點不為空時,將前置節(jié)點與后繼節(jié)點鏈接起來即可移除異常節(jié)點,最后通過輔助GC回收
- 第三種:當前節(jié)點為頭結(jié)點,此時調(diào)用
unparkSuccessor()
方法喚醒當前節(jié)點的后繼節(jié)點即可,這樣在資源被搶占后會將頭結(jié)點設(shè)置為后繼節(jié)點,當前節(jié)點也就會被置空并通過輔助GC回收
執(zhí)行流程圖
4. AQS 如何實現(xiàn)線程喚醒機制
假設(shè)當前 AQS 的 state
同步狀態(tài)為 1,表示當前有線程占用共享資源。
從 lock.unlock()
開始看 AQS 如何實現(xiàn)多線程搶占式下的線程喚醒
深入源碼,調(diào)用了 ReentrantLock
的 unlock()
方法,查看源碼:
進入 release()
方法,也就是 AQS 的 release()
方法,查看源碼:
可以看到 AQS 的 release()
先調(diào)用 ReentrantLock
重寫的 tryRelease()
進行判斷。
tryRelease()
方法:自定義釋放資源邏輯
查看 ReentrantLock
中的源碼:
protected final boolean tryRelease(int releases) { // 傳入releases 為 1, getState() 為 1 // 所以 c = 1 - 1 = 0 int c = getState() - releases; // 判斷當前線程是否是共享資源所有者,不是則拋出異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 判斷c是否等于0,共享資源是否空閑 if (c == 0) { // 更新free標志 free = true; // 將共享資源所有者置空 setExclusiveOwnerThread(null); } // 更新state setState(c); // 返回free標志 return free; }
這里是可重入鎖 ReentrantLock
自定義的釋放資源邏輯,過程較為易懂,就是將 當前的同步狀態(tài)變量−參數(shù)releases當前的同步狀態(tài)變量 - 參數(shù)releases當前的同步狀態(tài)變量−參數(shù)releases 得到新的同步狀態(tài)變量。如果為 0 說明共享資源已被當前線程釋放返回 true。
返回 release
方法體,進入 if
語句,判斷頭結(jié)點不為空并且狀態(tài)不為 0,表示隊列存在并且頭結(jié)點的后繼節(jié)點已經(jīng)準備好獲取共享資源了,進入 unparkSuccessor()
方法。
unparkSuccessor
方法:喚醒后繼節(jié)點的線程
private void unparkSuccessor(Node node) { // 獲取當前結(jié)點的狀態(tài) int ws = node.waitStatus; // 判斷是否小于0,重新將狀態(tài)置為0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 獲取當前結(jié)點的后繼節(jié)點 Node s = node.next; // 如果后繼節(jié)點為空或者后繼節(jié)點狀態(tài)為 CANCELLED if (s == null || s.waitStatus > 0) { // 后繼節(jié)點置空 s = null; // 通過從后向前遍歷找到離當前節(jié)點最近的后一個節(jié)點并且該節(jié)點狀態(tài)小于0 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) // 如果狀態(tài)小于0說明已經(jīng)準備好獲取資源了,替換掉原先的后繼節(jié)點 s = t; } // 后繼節(jié)點不為空 if (s != null) // 喚醒該后繼節(jié)點的線程 LockSupport.unpark(s.thread); }
該方法就是獲取傳入節(jié)點的后繼節(jié)點,并且保證該節(jié)點的狀態(tài)小于0即已經(jīng)準備好獲取共享資源了,通過 LockSupport
的 unpark()
方法喚醒后繼節(jié)點的線程。
到這里就是 AQS 實現(xiàn)線程喚醒的全部過程了,AQS 的線程喚醒機制是通過上一個節(jié)點來喚醒當前節(jié)點的線程。
以上就是一文帶你深入理解Java AbstractQueuedSynchronizer的詳細內(nèi)容,更多關(guān)于Java AbstractQueuedSynchronizer的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
mybatisplus實現(xiàn)自動創(chuàng)建/更新時間的項目實踐
Mybatis-Plus提供了自動填充功能,可以通過實現(xiàn)MetaObjectHandler接口來實現(xiàn)自動更新時間的功能,本文就來介紹一下mybatisplus實現(xiàn)自動創(chuàng)建/更新時間的項目實踐,感興趣的可以了解下2024-01-01Java中關(guān)于ThreadLocal的隱式引用詳解
這篇文章主要介紹了Java中關(guān)于ThreadLocal的隱式引用,從線程的角度看,每個線程都保持一個對其線程局部變量副本的隱式引用,只要線程是活動的,ThreadLocal實例就是可訪問的,下面我們來具體看看2024-03-03Java中的HashMap和ConcurrentHashMap區(qū)別和適用場景
HashMap和ConcurrentHashMap在對null值的處理、線程安全性、性能等方面存在顯著的區(qū)別,HashMap允許鍵和值為null,適用于單線程環(huán)境下的數(shù)據(jù)存儲和查詢場景;而ConcurrentHashMap不允許鍵和值為null,適用多線程環(huán)境下的數(shù)據(jù)存儲和查詢場景,具有線程安全性和較高的并發(fā)性能2025-01-01spring中的注解@@Transactional失效的場景代碼演示
這篇文章主要介紹了spring中的注解@@Transactional失效的場景代碼演示,@Transactional注解是Spring框架提供的用于聲明事務(wù)的注解,作用于類和方法上,需要的朋友可以參考下2024-01-01基于SpringBoot解決CORS跨域的問題(@CrossOrigin)
這篇文章主要介紹了基于SpringBoot解決CORS跨域的問題(@CrossOrigin),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01IDEA解決src和resource下創(chuàng)建多級目錄的操作
這篇文章主要介紹了IDEA解決src和resource下創(chuàng)建多級目錄的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02