Java?AQS?線程安全同步隊列的實現(xiàn)
AQS 同步隊列是很多的 Java 線程安全對象的實現(xiàn),例如 ReentrantLock, Semaphore, CountDownLatch, ReentrantReadWriteLock 等等。
AQS 是 AbstractQueuedSynchronizer 的簡稱,它是一個抽象類,我們需要實現(xiàn)其中的一些關鍵方法來完成他的基本功能。
這里簡單介紹一下它的實現(xiàn)方式,當一個線程想要獲取該對象的鎖的時候,會通過方法檢查該線程是否能夠獲取鎖,如果能夠獲取鎖就結束了,完事兒;如果不能夠獲取鎖,就加入同步隊列等待,同時掛起該線程,如果這個時候還有別的線程在競爭該對象的鎖接著加入同步隊列,掛起,當占有這個鎖的線程完事兒后會釋放鎖,釋放時會去檢查同步隊列,取出最先進入隊列的線程,然后把它喚醒,它就獲得了鎖,當它也完事兒釋放后,又喚醒下一個,直到隊列中的等待線程全部喚醒。
網上已經有很多的源碼分析的文章了,所以我想盡可能的簡化分析,很多的細節(jié)我就不說了。
1 自定義 AQS 的重要方法
val qs = object : AbstractQueuedSynchronizer() { /** * 嘗試獲取互斥鎖,如果返回,如果獲取失敗,后續(xù)就會進入同步隊列,同時掛起線程 */ override fun tryAcquire(arg: Int): Boolean { return super.tryAcquire(arg) } /** * 嘗試釋放互斥鎖 */ override fun tryRelease(arg: Int): Boolean { return super.tryRelease(arg) } /** * 嘗試獲取共享鎖,和同步鎖一樣,失敗就進入隊列 */ override fun tryAcquireShared(arg: Int): Int { return super.tryAcquireShared(arg) } /** * 嘗試釋放同步鎖 */ override fun tryReleaseShared(arg: Int): Boolean { return super.tryReleaseShared(arg) } /** * 當前線程是否獲得鎖 */ override fun isHeldExclusively(): Boolean { return super.isHeldExclusively() } }
下面是 JDK 中 ReentrantLock 中不公平鎖的實現(xiàn):
? ? class Sync extends AbstractQueuedSynchronizer { ? ? ? ? private static final long serialVersionUID = -5179523762034025860L; ? ? ? ? /** ? ? ? ? ?* Performs non-fair tryLock. ?tryAcquire is implemented in ? ? ? ? ?* subclasses, but both need nonfair try for trylock method. ? ? ? ? ?*/ ? ? ? ? @ReservedStackAccess ? ? ? ? final boolean nonfairTryAcquire(int acquires) { ? ? ? ? ? ? final Thread current = Thread.currentThread(); ? ? ? ? ? ? int c = getState(); ? ? ? ? ? ? if (c == 0) { ? ? ? ? ? ? ? ? if (compareAndSetState(0, acquires)) { ? ? ? ? ? ? ? ? ? ? setExclusiveOwnerThread(current); ? ? ? ? ? ? ? ? ? ? return true; ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? ? ? else if (current == getExclusiveOwnerThread()) { ? ? ? ? ? ? ? ? int nextc = c + acquires; ? ? ? ? ? ? ? ? if (nextc < 0) // overflow ? ? ? ? ? ? ? ? ? ? throw new Error("Maximum lock count exceeded"); ? ? ? ? ? ? ? ? setState(nextc); ? ? ? ? ? ? ? ? return true; ? ? ? ? ? ? } ? ? ? ? ? ? return false; ? ? ? ? } ?? ? ? ? protected final boolean tryAcquire(int acquires) { ? ? ? ? ? ? return nonfairTryAcquire(acquires); ? ? ? ? } ? ? ? ? @ReservedStackAccess ? ? ? ? protected final boolean tryRelease(int releases) { ? ? ? ? ? ? int c = getState() - releases; ? ? ? ? ? ? if (Thread.currentThread() != getExclusiveOwnerThread()) ? ? ? ? ? ? ? ? throw new IllegalMonitorStateException(); ? ? ? ? ? ? boolean free = false; ? ? ? ? ? ? if (c == 0) { ? ? ? ? ? ? ? ? free = true; ? ? ? ? ? ? ? ? setExclusiveOwnerThread(null); ? ? ? ? ? ? } ? ? ? ? ? ? setState(c); ? ? ? ? ? ? return free; ? ? ? ? } ? ? ? ? protected final boolean isHeldExclusively() { ? ? ? ? ? ? // While we must in general read state before owner, ? ? ? ? ? ? // we don't need to do so to check if current thread is owner ? ? ? ? ? ? return getExclusiveOwnerThread() == Thread.currentThread(); ? ? ? ? } ? ? ? ? final ConditionObject newCondition() { ? ? ? ? ? ? return new ConditionObject(); ? ? ? ? } ? ? ? ? // Methods relayed from outer class ? ? ? ? final Thread getOwner() { ? ? ? ? ? ? return getState() == 0 ? null : getExclusiveOwnerThread(); ? ? ? ? } ? ? ? ? final int getHoldCount() { ? ? ? ? ? ? return isHeldExclusively() ? getState() : 0; ? ? ? ? } ? ? ? ? final boolean isLocked() { ? ? ? ? ? ? return getState() != 0; ? ? ? ? } ? ? }
ReentrantLock 只實現(xiàn)了互斥鎖。
- 嘗試獲取互斥鎖
acquires 這個值外部每次都是傳的 1,首先通過 getState() 方法獲取 state。
如果 state 為 0 就表示鎖可以使用,這時通過 CAS 的方式設置新的狀態(tài),如果 CAS 競爭失敗,說明有其他線程同時也在競爭這個鎖,這時就直接返回失敗,如果競爭成功就會通過setExclusiveOwnerThread 方法設置當前線程為擁有者,返回成功。
如果 state 不為 0 但是 owner 就是為當前線程,就表示當前線程調用了多次 lock() 方法,這次就是簡單的在原有的 state 上再加 1,同時返回獲取鎖成功。
其他的情況也就是返回失敗了,失敗了就會和前面說到的就會進入同步隊列,同時掛起當前線程。
- 嘗試釋放互斥鎖
同樣的 releases 每次傳的值也是 1。
首先判斷當前線程是否擁有鎖,如果不擁有鎖直接拋出異常,通過 getState() 獲取上次的狀態(tài),上次的狀態(tài)再減去 releases,就是新的狀態(tài),如果新的狀態(tài)為 0 就表示應該釋放鎖,同時通過 setExclusiveOwnerThread 方法把擁有線程設置為空,同時返回 true,其他情況返回 false。
根據(jù)前面介紹的,如果成功釋放,這時 AQS 還得去檢查同步隊列,拿到最近一個等待鎖的線程,并喚醒。
上面鎖的代碼很簡單,相信你已經看懂了,這也解釋了為什么叫可重入鎖,也就是同一個線程可以多次調用 lock() 方法,同樣的也要對等的調用 unlock() 方法完成解鎖,lock() 和 unlock() 必須成對出現(xiàn)。共享鎖的實現(xiàn)我就不貼了,大同小異。
2 互斥鎖
2.1 沒有出現(xiàn)鎖競爭
前面的 Reentrant 源碼也提到了,在沒有鎖競爭的時候和被其他線程占有鎖的情況下,只是簡單的設置 state 為 1 和 設置 owner 的線程。
這個過程的性能消耗是非常小的,幾乎可以忽略不計。
2.2 鎖競爭失敗或者鎖已經被占用
如果嘗試獲取鎖失敗,就會新建一個 Node 對象(AQS 的隊列實現(xiàn)是雙向鏈表,Node 就是他的節(jié)點實現(xiàn)),其中包含了節(jié)點狀態(tài)和關聯(lián)線程等關鍵信息,創(chuàng)建后將其加入到等待隊列的隊尾,同時將其線程掛起(掛起是使用的 LockSupport.park() 方法,其內部的實現(xiàn)是 C++ 代碼,使用的是 Mutex 和 synchronized 用的是一樣的方法),需要等待占用鎖的線程釋放鎖后,根據(jù)同步隊列的順序把下一個同步隊列的線程喚醒(喚醒使用的是 LockSupport.unpark() 方法)。
這種情況和沒有鎖競爭的情況性能消耗就要大一些了。在進入隊列和釋放鎖的過程中可能會有多次的 CAS 自旋(也就是 CAS 失敗后通過 while 循環(huán)重試,直到成功,這時 CPU 是在空轉);還有關鍵一點是線程的掛起和喚醒是需要操作系統(tǒng)來操作的,也就是會涉及到用戶態(tài)向內核態(tài)的轉換,這個過程是比較消耗性能的。
3 共享鎖
共享鎖如果在理解了互斥鎖的前提下是比較簡單的。
在沒有被互斥鎖占用的情況下(tryAcquireShared() 方法返回 true),共享鎖是每一個線程都不會被掛起。
在互斥鎖被占用的情況下(tryAcquireShared() 方法返回 false),也會創(chuàng)建一個 Node 對象加入到隊列中,不過添加了一個 waiter 對象來標記這是一個共享的節(jié)點,同樣的這個線程也會被掛起,等待互斥鎖被釋放后,按照先后會喚醒該線程,當該線程被喚醒后如果他的下一個節(jié)點也是共享的節(jié)點也會被喚醒,就像多米諾骨牌一樣挨個喚醒所有的共享節(jié)點,直到又被下一個互斥結點把互斥鎖給占用。
4 Condition
AQS 的 Condition 的 await/signal 和 synchronized 的 wait/notify 有異曲同工之妙。在獲取到互斥鎖后可以通過 Condition#await 方法把鎖釋放出去,同時自己被掛起,當獲取到鎖的線程調用 Condition#signal 方法又可以喚醒之前 await 掛起的線程。
在每個 Condition 對象中也會維護一個隊列(和 AQS 中的隊列是分開的,但是都是 Node 對象),每次有獲取鎖的線程調用 await 方法后都會在其中添加一個 Node,會用 CONDITION 標注狀態(tài),同時釋放當前占用的鎖喚醒同步隊列中的下一個線程,并把自己掛起。當有線程調用 signal 方法后,會把 Condition 對象中的頭節(jié)點(如果是 signalAll 就是把全部的節(jié)點都加入到 AQS 隊列中)加入到 AQS 的同步隊列中,同時觸發(fā) await 方法重新去獲取鎖,這里也和前面說的獲取同步鎖一樣,就相當于 signal 后,await 需要重新排隊去獲取互斥鎖。
5 最后
最后再簡單聊一下 synchronized 和 AQS, 在 Java 早期的版本,每次 synchronized 都會去請求 mutex,導致沒有鎖競爭的時候性能不好,在 1.6 版本后加入了多級鎖,性能得到了不錯的提升。
在 Java 對象中定義了一個 Mark Word 空間來記錄對象的一些信息,其中就包括了重要的鎖信息,在對象沒有鎖的時候,一個線程需獲取鎖默認就是偏向鎖, 只需要在對象的 Mark Word 中通過 CAS 設置鎖的類型和鎖屬于的線程 ID,當沒有別的線程競爭那就皆大歡喜,完事兒了;如果在 偏向鎖競爭失敗或者占有偏向鎖的線程還沒有完事兒,那么鎖就會升級成輕量鎖,當然升級后還是之前持有偏向鎖的線程繼續(xù)持有,其中輕量鎖需要在持有的線程中添加一個 Lock Record 來指向對應的對象,對象的 Mark Work 也會添加指向 Thread 對應的 Lock Record,在等待獲取鎖的線程也會通過 CAS 自旋的方式去修改這些值,來嘗試獲取輕量鎖,當自旋超過一定次數(shù)了或者有別的線程來競爭,這時就會升級成 重量鎖,重量鎖也是用了 monitor 鎖,內部也是用 mutex 實現(xiàn)。
synchronized 和 AQS 目前在性能上差距不大,當有多個線程競爭是都會升級成 mutex,不同的是 synchronized 使用起來非常簡單,但是功能沒有那么多,AQS 使用起來比較復雜,但是包含互斥鎖和共享鎖,他們之間的組合能夠完成很多復雜的功能,JDK 中很多的線程安全對象也用到了 AQS。
到此這篇關于Java AQS 線程安全同步隊列的實現(xiàn)的文章就介紹到這了,更多相關Java AQS 線程安全同步隊列內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
java.util.Collections類—emptyList()方法的使用
這篇文章主要介紹了java.util.Collections類—emptyList()方法的使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11SpringBoot整合Druid實現(xiàn)數(shù)據(jù)庫連接池和監(jiān)控
Druid是Java語言中使用的比較多的數(shù)據(jù)庫連接池。Druid還提供了強大的監(jiān)控和擴展功能。面將介紹SpringBoot整合Druid實現(xiàn)數(shù)據(jù)庫連接池和監(jiān)控功能,感興趣的可以了解一下2021-08-08使用java實現(xiàn)百萬級別數(shù)據(jù)導出excel的三種方式
這篇文章主要介紹了使用java實現(xiàn)百萬級別數(shù)據(jù)導出excel的三種方式,有些業(yè)務系統(tǒng)可能動輒涉及到百萬上千萬的數(shù)據(jù),用正常的方法效率就變得很低,今天我們來看看這幾種實現(xiàn)思路2023-03-03Java中的InputStreamReader和OutputStreamWriter源碼分析_動力節(jié)點Java學院整理
本文通過示例代碼給大家解析了Java中的InputStreamReader和OutputStreamWriter知識,需要的的朋友參考下吧2017-05-05Java實現(xiàn)Excel文件轉PDF(無水印無限制)
這篇文章主要為大家詳細介紹了如何利用Java語言實現(xiàn)Excel文件轉PDF的效果,并可以無水印、無限制。文中的示例代碼講解詳細,需要的可以參考一下2022-06-06重學SpringBoot3之如何發(fā)送Email郵件功能
這篇文章主要給大家介紹了重學SpringBoot3之如何發(fā)送Email郵件功能的相關資料,文中包括環(huán)境準備、項目配置、代碼實現(xiàn)、最佳實踐和安全性建議,通過采用異步發(fā)送、重試機制、限流等最佳實踐,可以構建一個健壯的郵件發(fā)送系統(tǒng),需要的朋友可以參考下2024-11-11