Java AQS的實現(xiàn)原理詳解
使用
我們這里借助ReentrantLock
來搞清楚AQS的實現(xiàn)原理。
lock
這個方法就是開始獲取鎖運行的入口,在這個方法的實現(xiàn)中,交給了sync
對象來獲取鎖。
public void lock() { sync.acquire(1); } private final Sync sync; // Sync對象是一個ReentrantLock實現(xiàn)的內(nèi)部抽象類,具體的實現(xiàn)又分為了公平版本與非公平兩種 abstract static class Sync extends AbstractQueuedSynchronizer {} // 在ReentrantLock的無參構(gòu)造器中,默認使用的實現(xiàn)就是非公平鎖的實現(xiàn) public ReentrantLock() { sync = new NonfairSync(); } // 也可以通過帶參數(shù)的構(gòu)造器來使用公平鎖 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
Sync
由于公平鎖FairSync
和NonfairSync
的差別主要在tryAcquire
方法上,別的邏輯都是相同的,因此我們就直接看Sync
和AQS中的實現(xiàn)。
acquire
方法實現(xiàn)如下,來自AQS的實現(xiàn):
// 首先會調(diào)用 tryAcquire 和 acquireQueued 方法,如果2個方法都返回true的話, // 那么才會調(diào)用自行中斷的邏輯 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();
tryAcquire
方法就會因為公平鎖和非公平鎖的差異,有2種不同的實現(xiàn),首先來看看非公平鎖的實現(xiàn),也就是ReentrantLock
的默認策略。
NonfairSync.tryAcquire
這個方法會直接調(diào)用并返回 Sync
實現(xiàn)的 nonfairTryAcquire(acquires)
方法。
Sync類中的實現(xiàn)
// 這里的參數(shù) acquires = 1 final boolean nonfairTryAcquire(int acquires) { // 獲取當前調(diào)用者的線程對象 final Thread current = Thread.currentThread(); // 獲取AQS中定義的state值,這個state值是AQS的核心之一 int c = getState(); // 在ReentrantLock的實現(xiàn)中,state就表示當前是否有線程持有鎖,0代表沒有線程持有鎖, // 當前訪問的線程就可以繼續(xù)執(zhí)行代碼,如果大于0則表示當前持有鎖的線程的數(shù)量。 // 由于ReentrantLock屬于可重入鎖,因此,這個值會>=1 if (c == 0) { // 能進來就表示當前沒有線程持有鎖,那么嘗試用CAS獲取鎖 if (compareAndSetState(0, acquires)) { // 獲取鎖成功,那么將當前線程設置到AQS中的當前線程中 setExclusiveOwnerThread(current); return true; } } // 如果當前持有鎖的就是自己,那么就代表是鎖的重入 else if (current == getExclusiveOwnerThread()) { // 累計持有鎖的次數(shù) int nextc = c + acquires; // 這里就說明了,state能夠設置的最大值就是Int.MAX_VALUE, // 當處于MAX_VALUE的時候再加1,那么Int數(shù)字的最高位就會變成1,符號位為負 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 更新新的state值 setState(nextc); return true; } return false; }
AQS中的實現(xiàn)
private volatile int state; // 當前持有鎖的線程對象 private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
小結(jié)一下,非公平鎖tryAcquire
方法就是先看看有沒有線程持有鎖,沒有的話自己就通過CAS的方式嘗試獲取一下鎖,如果獲取鎖成功或者是自己重入,那么tryAcquire
方法就會返回true,acquire
方法中的條件判斷就會直接返回false,lock方法結(jié)束,線程繼續(xù)支持下面的代碼。
FairSync.tryAcquire
下面來看看公平鎖的實現(xiàn),大體的邏輯跟非公平的是相同的。
FairSync中的實現(xiàn)
// 這里的參數(shù) acquire = 1 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 判斷當前是不是有線程持有鎖 if (c == 0) { // 當前沒有線程持有鎖就進來 // 由于是公平鎖,那么就要保證只有在當前等待隊列為空或者隊列中等待的線程 // 都沒有到運行的條件的時候,才嘗試通過CAS來獲取鎖。否則就去乖乖排隊 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 同非公平鎖,鎖重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
AQS中的實現(xiàn)
// 檢查當前AQS等待隊列中是否有正在等待的有效線程節(jié)點 public final boolean hasQueuedPredecessors() { Node h, s; // 首先讓參數(shù)h指向當前隊列的頭部 if ((h = head) != null) { // 隊列不為空 // 將臨時變量賦值為當前第二個節(jié)點 // 這里需要簡單說明一下AQS的等待隊列的構(gòu)成,第一個節(jié)點是沒有業(yè)務含義的, // 只是用作喚醒下一個待執(zhí)行的線程節(jié)點 if ((s = h.next) == null || s.waitStatus > 0) { // 能進來就表示當前隊列只有一個頭結(jié)點,或者第二個節(jié)點的狀態(tài)是已取消 // - > 參考說明1 // 如果第二個節(jié)點不為空,那么就釋放這個引用 s = null; // traverse in case of concurrent cancellation // 從后往前遍歷,找到距離隊列頭最近的有效節(jié)點 for (Node p = tail; p != h && p != null; p = p.prev) { if (p.waitStatus <= 0) s = p; } } // 如果找到了正在隊列中的排隊的有效節(jié)點并且不是當前訪問的線程,那么就返回true if (s != null && s.thread != Thread.currentThread()) return true; } // 頭結(jié)點指向NULL,那么說明隊列是空的,直接返回false return false; }
說明1:這里就引入了隊列節(jié)點中的等待狀態(tài)這個重要的概念,在ReentrantLock
中,我們只需要關(guān)注CANCELLED
和SIGNAL
即可。只有CANCELLED
是大于0的,新節(jié)點的默認值為0。因此只要等待狀態(tài)大于0就代表該節(jié)點被取消了。
// 等待隊列中節(jié)點的等待狀態(tài) volatile int waitStatus; // 當前節(jié)點因為等待超時或者被中斷了被取消 static final int CANCELLED = 1; // 接下來有資格被喚醒獲得鎖的標記,只有獲得了這個標記的節(jié)點才能被執(zhí)行完的線程喚醒 static final int SIGNAL = -1;
小結(jié)一下,公平鎖對比非公平鎖,在最開始有機會獲取鎖的時候,會先檢查一下當前隊列中是否已經(jīng)有線程在排隊等待執(zhí)行了,如果等待隊列中是空的或者沒有有效的排隊節(jié)點,才會獲取鎖。如果獲取鎖成功,或者鎖重入成功,那么同樣會結(jié)束AQS的邏輯,繼續(xù)執(zhí)行業(yè)務代碼。
acquireQueued
上面分析完在tryAcquire
方法中如果成功獲得鎖的情況,就會結(jié)束AQS的邏輯,接下來就來分析未能成功獲得鎖的邏輯,即:acquire
方法中條件判斷的第二個條件判斷。
在AQS中實現(xiàn)
// 這個方法就會將當前線程添加到等待隊列中,并且返回操作是否成功,arg就是傳入的acquire值,為1 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
首先通過addWaiter
方法構(gòu)建一個包含線程對象的節(jié)點并且添加到隊列中
// 這里的參數(shù)為 Node.EXCLUSIVE,表示這是一個排它鎖的實現(xiàn),這里的值為NULL private Node addWaiter(Node mode) { // 構(gòu)建一個線程節(jié)點 Node node = new Node(mode); // 這里就是AQS的核心理念了,通過不斷的自旋,將線程節(jié)點插入到隊列中 for (;;) { // 獲取原來的隊列的隊尾,因為AQS才去的尾插方式 Node oldTail = tail; if (oldTail != null) { // 將新插入的節(jié)點指向原來的尾結(jié)點 node.setPrevRelaxed(oldTail); // 通過CAS的方式將當前節(jié)點設置到線程共享隊列的尾部去,這里要注意, // 凡是涉及到多線程操作的屬性,都需要通過CAS保證操作的原子性 if (compareAndSetTail(oldTail, node)) { oldTail.next = node; // 設置成功,就返回插入的節(jié)點對象;如若不成功, // 就表示有別的線程也修改了尾結(jié)點,那么就要等下一次循環(huán)重試 return node; } } else { // 沒有尾結(jié)點說明隊列不存在,那么就進行初始化 initializeSyncQueue(); } } } // 初始化等待隊列 private final void initializeSyncQueue() { Node h; // 還是通過CAS的方式,給隊列初始化一個默認的Node節(jié)點,幾個重要的屬性的初始值如下 // waitStatus = 0; thread = null if (HEAD.compareAndSet(this, null, (h = new Node()))) tail = h; }
小結(jié)一下,addWaiter
方法通過尾插的方式將沒有搶到鎖的線程封裝為Node
節(jié)點,插入到AQS的等待隊列中。如果隊列還未初始化,那么就先初始化隊列,隊列自帶一個無實際含義的頭結(jié)點。
acquireQueued
在AQS中實現(xiàn)
// arg就是傳入的acquire參數(shù),為1;該方法返回值的含義為,線程在等待過程中是否被中斷 final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { // 還是不斷的自旋,等待機會進行操作 for (;;) { // 獲取新插入節(jié)點的上一個節(jié)點 final Node p = node.predecessor(); // 如果上一個節(jié)點已經(jīng)是頭結(jié)點,那么說明當前就已經(jīng)輪到當前線程獲取鎖執(zhí)行業(yè)務了 // 那么就再次嘗試搶一次鎖 if (p == head && tryAcquire(arg)) { // 能進來就說明獲取鎖成功了 // 那么就將當前線程的節(jié)點設置為頭結(jié)點 // 在這個方法中,會去除節(jié)點中的信息,做一個純粹的頭結(jié)點 setHead(node); // 將已經(jīng)沒有指向的原頭結(jié)點的next指為空,等待回收 p.next = null; // help GC // 這里返回的值為false,因為當前線程并未被阻塞就獲得了鎖 return interrupted; } // 走到這里說明當前線程并沒有獲取到鎖,那么就要考慮是否要將線程阻塞了 if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; } } private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } // 線程獲取鎖失敗之后,是否需要將線程阻塞,這里2個參數(shù), // pred是新插入節(jié)點的上一個節(jié)點,node是新插入的節(jié)點 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 獲取上一個節(jié)點的等待狀態(tài) int ws = pred.waitStatus; // 判斷上一個節(jié)點的狀態(tài)是不是SIGNAL,只有狀態(tài)為SIGNAL的才是有效的可指向節(jié)點 if (ws == Node.SIGNAL) // 如果上一個節(jié)點是SIGNAL狀態(tài),那就說明當前線程可以連接上該節(jié)點,然后被掛起了 return true; // 上面我們提到過,只有節(jié)點被取消了,等待狀態(tài)才會>0 if (ws > 0) { // 一直往前找,直到找到等待狀態(tài)<=0的,數(shù)據(jù)規(guī)范的話即找到最近的一個等待狀態(tài) // 為SIGNAL的節(jié)點,跳過全部被取消的節(jié)點 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 此時pred指向的即第一個合法的線程節(jié)點,指向當前新插入的節(jié)點 pred.next = node; } else { // 這里的意思就是上一個節(jié)點就是有效節(jié)點,那么就將上一個節(jié)點的等待狀態(tài)強制 // 更新為SIGNAL,即-1。毫無意義的那個頭結(jié)點也會被設置為SIGNAL狀態(tài) pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } return false; }
小結(jié)一下,這個方法的含義就是,由于當前線程沒有獲取到鎖資源,因此就需要被阻塞。同時在這個方法中,也會整理等待隊列,將那些已經(jīng)被取消的節(jié)點從隊列中移除。接著就是調(diào)用條件判斷中的執(zhí)行方法,將線程掛起阻塞起來。
private final boolean parkAndCheckInterrupt() { // 調(diào)用了park方法,底層是調(diào)用UnSafe類的park方法來實現(xiàn) LockSupport.park(this); return Thread.interrupted(); }
至此,lock
方法的全部情況都清楚了,如果線程能拿到鎖,那就直接結(jié)束lock階段,要是搶不到鎖,那么就進入等待隊列中,在進入隊列之前,如果發(fā)現(xiàn)還有機會獲取鎖,那么會再次嘗試獲取一次,如若還是獲取不到,那么就以尾插的方式進入等待隊列,通過調(diào)用LockSupport.park方法,將線程阻塞,等待被喚醒。
unlock
主動調(diào)用這個方法以后,就代表對于鎖的占用結(jié)束。
在ReentrantLock中實現(xiàn)
public void unlock() { sync.release(1); }
在AQS中實現(xiàn)
public final boolean release(int arg) { // 調(diào)用tryRelease方法真正實現(xiàn)解除鎖 // 只有state加的所有鎖被解除了,那么才會喚醒下一個線程 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
在Sync中實現(xiàn)
// 這里的releases就是傳入的參數(shù)1,即如果是重入鎖,那么這里需要解鎖多次 protected final boolean tryRelease(int releases) { // 計算state的值,這里的含義就是state-1 int c = getState() - releases; // 如果當前線程不是持有鎖的線程,那么就報錯 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 鎖是否完全釋放完的標記 boolean free = false; // state減到0說明鎖已經(jīng)完全釋放完了 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 更新state的值 setState(c); // 只有鎖被完全釋放完,才返回true return free; }
在AQS中實現(xiàn)
// 喚醒下一個需要鎖的線程,這里的node是頭結(jié)點 private void unparkSuccessor(Node node) { // 獲得頭結(jié)點的等待狀態(tài) int ws = node.waitStatus; // 如果頭結(jié)點是SIGNAL,那么重置為0,因為這個節(jié)點已經(jīng)沒有意義了,會被移除 if (ws < 0) node.compareAndSetWaitStatus(ws, 0); // 獲得頭結(jié)點后面的待喚醒的節(jié)點 Node s = node.next; // 如果這個待喚醒的節(jié)點為空或者等待狀態(tài)不正確,在這里就是不等于SIGNAL if (s == null || s.waitStatus > 0) { s = null; // 從尾部開始查詢,找到合法的待喚醒節(jié)點 for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } if (s != null) // 喚醒線程 LockSupport.unpark(s.thread); }
cancelAcquire
這個方法提供了取消線程等待獲取鎖的功能
在AQS中被實現(xiàn)
private void cancelAcquire(Node node) { // 健壯性判斷,節(jié)點非空才可以被取消 if (node == null) return; // 將節(jié)點中的線程指向去掉 node.thread = null; // 獲取到當前節(jié)點的上一個節(jié)點 Node pred = node.prev; // 通過循環(huán),跳過所有當前被取消節(jié)點之前的也已經(jīng)被標記取消的節(jié)點 while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 通過上面的循環(huán)以后pred的值就為等待隊列中隊尾的一個合法的未被取消的節(jié)點 // 獲取到該合法節(jié)點下一個指向的節(jié)點 Node predNext = pred.next; // 標記當前被取消的節(jié)點的等待狀態(tài)為被取消 node.waitStatus = Node.CANCELLED; // 如果被取消的節(jié)點就是隊尾的節(jié)點,那么就通過CAS將pred設置為尾結(jié)點 // 就可以拋棄中間那些同樣被標記為被取消的節(jié)點,如果有的是 if (node == tail && compareAndSetTail(node, pred)) { // 將pred的下一個節(jié)點指向NULL,因為pred現(xiàn)在就是隊尾 pred.compareAndSetNext(predNext, null); } else { // 這說明取消的節(jié)點不是尾結(jié)點,而是中間的節(jié)點 // 這個值會在下面的條件判斷被賦值為上一個節(jié)點的等待狀態(tài) int ws; // 如果找到的上一個合法節(jié)點不是頭結(jié)點 // 并且上一個節(jié)點的等待狀態(tài)是SIGNAL,并且將不是SIGNAL狀態(tài)的負數(shù)狀態(tài)轉(zhuǎn)換為SIGNAL // 并且線程不為空 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && pred.thread != null) { // 能進來就說明被取消的節(jié)點處于中間,那么就要將這個node從隊列中跳過 Node next = node.next; // 如果被取消的節(jié)點是一個有效的節(jié)點,不為空并且狀態(tài)也是對的 if (next != null && next.waitStatus <= 0) // 那么就將上一個節(jié)點指向被取消節(jié)點的下一個節(jié)點 pred.compareAndSetNext(predNext, next); } else { // 能進入這里說明上一個合法節(jié)點已經(jīng)是頭結(jié)點了, // 那么就說明被取消的這個節(jié)點已經(jīng)是原本除了頭結(jié)點以外的最靠前面的節(jié)點, // 那么被取消的這個節(jié)點其實就等價于頭結(jié)點了,應該喚醒后面還在等待的線程節(jié)點 // 喚醒下一個被掛起的線程,具體已經(jīng)分析過了,這里就省略了 unparkSuccessor(node); } node.next = node; } }
以上就是Java AQS的實現(xiàn)原理詳解的詳細內(nèi)容,更多關(guān)于Java AQS的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java自帶命令行工具jmap、jhat與jinfo的使用實例代碼詳解
本篇文章主要通過代碼實例對java自帶命令行工具jmap、jhat與jinfo的使用做出了詳解,需要的朋友可以參考下2017-04-04Java圖形化界面設計之布局管理器之BorderLayout案例詳解
這篇文章主要介紹了Java圖形化界面設計之布局管理器之BorderLayout案例詳解,本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細內(nèi)容,需要的朋友可以參考下2021-08-08