Java并發(fā) 結(jié)合源碼分析AQS原理
前言:
如果說(shuō)J.U.C包下的核心是什么?那我想答案只有一個(gè)就是AQS。那么AQS是什么呢?接下來(lái)讓我們一起揭開(kāi)AQS的神秘面紗
AQS是什么?
AQS是AbstractQueuedSynchronizer的簡(jiǎn)稱(chēng)。為什么說(shuō)它是核心呢?是因?yàn)樗峁┝艘粋€(gè)基于FIFO的隊(duì)列和state變量來(lái)構(gòu)建鎖和其他同步裝置的基礎(chǔ)框架。下面是其底層的數(shù)據(jù)結(jié)構(gòu)。
AQS的特點(diǎn)
1、其內(nèi)使用Node實(shí)現(xiàn)FIFO(FirstInFirstOut)隊(duì)列。可用于構(gòu)建鎖或者其他同步裝置的基礎(chǔ)框架
2、且利用了一個(gè)int類(lèi)表示狀態(tài)。在AQS中維護(hù)了一個(gè)volatile int state,通常表示有線程訪問(wèn)資源的狀態(tài),當(dāng)state>1的時(shí)候表示線程重入的數(shù)量,主要有三個(gè)方法控制:getState(),setState(),CompareAndSetState()。后面的源碼分析多用到這幾個(gè)方法
3、使用方法是繼承,子類(lèi)通過(guò)繼承并通過(guò)實(shí)現(xiàn)它的方法管理其狀態(tài)(acquire和release)的方法操縱狀態(tài)。
4、同時(shí)實(shí)現(xiàn)排它鎖和共享鎖模式。實(shí)際上AQS功能主要分為兩類(lèi):獨(dú)占(只有一個(gè)線程能執(zhí)行)和共享(多個(gè)線程同時(shí)執(zhí)行),它的子類(lèi)要么使用獨(dú)占功能要么使用共享功能,而ReentrantLock是通過(guò)兩個(gè)內(nèi)部類(lèi)來(lái)實(shí)現(xiàn)獨(dú)占和共享
CountDownLatch如何借助AQS實(shí)現(xiàn)計(jì)數(shù)功能?
先來(lái)說(shuō)一下CountDownLatch,CountDownLatch是一個(gè)同步輔助類(lèi),通過(guò)它可以來(lái)完成類(lèi)似阻塞當(dāng)前線程的功能,即一個(gè)或多個(gè)線程一起等待,直到其他線程執(zhí)行的操作完成。要實(shí)現(xiàn)上面的功能,CountDownLatch是通過(guò)一個(gè)給定的原子操作的計(jì)數(shù)器來(lái)實(shí)現(xiàn)。調(diào)用該類(lèi)的await()方法的線程會(huì)一直處于阻塞狀態(tài),直到其他線程調(diào)用countDown()方法使得計(jì)數(shù)器的值變?yōu)?之后線程才會(huì)執(zhí)行,這個(gè)計(jì)數(shù)器是不能被重置的。通常這個(gè)類(lèi)會(huì)用在程序執(zhí)行需要等待某個(gè)條件完成的場(chǎng)景,比如說(shuō)并行計(jì)算,可將一個(gè)數(shù)據(jù)量很大的計(jì)算拆分成一個(gè)個(gè)子任務(wù),當(dāng)子任務(wù)完成之后,再將最終的結(jié)果匯總。每次訪問(wèn)CountDownLatch只能有一個(gè)線程,但是這個(gè)線程在使用完countDown()方法之后能多個(gè)線程能繼續(xù)運(yùn)行,而調(diào)用await()方法的線程就一定要計(jì)數(shù)器為0才會(huì)運(yùn)行
下面來(lái)分析CountDownLatch的源碼以及如何使用AQS框架
public class CountDownLatch { /** * CountDownLatch 實(shí)現(xiàn)同步控制 * 底層是使用AQS的state來(lái)代表count */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; //初始化內(nèi)部類(lèi)實(shí)際上是設(shè)置AQS的state Sync(int count) { setState(count); } int getCount() { return getState(); } //嘗試獲取共享是看當(dāng)前的state是否為0 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } /*嘗試釋放共享鎖則是遞減計(jì)數(shù)直到state==0就返回false代表資源已經(jīng)釋放完全否則就會(huì)使用CAS來(lái)讓state減一*/ protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; /** * 初始化CountDownLatch,實(shí)際上是初始化內(nèi)部類(lèi),實(shí)際上是設(shè)置AQS的state,count不能小于0 */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } /** * 這里實(shí)際上是調(diào)用了AQS里的acquireSharedInterruptibly方法,完成的功能就是先去查看線程是否被中斷,中斷則拋出異常,沒(méi)有被中斷就會(huì)嘗試獲取共享資源。 * 注意在syn內(nèi)部類(lèi)中重寫(xiě)了tryAcquireShared,也就是當(dāng)state為0就返回1,這時(shí)候就會(huì)將當(dāng)前線程放入AQS的隊(duì)列中去,也就是這時(shí)候線程可以不再阻塞而是嘗試去獲取鎖 */ public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** * 原理同上面方法,但是加了一個(gè)時(shí)間參數(shù)來(lái)設(shè)置等待的時(shí)間 */ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * 這里傳入?yún)?shù)為1,同樣上面內(nèi)部類(lèi)一樣重寫(xiě)了AQS的tryReleaseShared方法,使用這個(gè)重寫(xiě)的方法來(lái)讓計(jì)數(shù)器原子操作的減一 */ public void countDown() { sync.releaseShared(1); } /** * 就是獲取AQS的state */ public long getCount() { return sync.getCount(); } /** * 轉(zhuǎn)換成字符串的方法 */ public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
由上面代碼可看見(jiàn)CountDownLatch實(shí)現(xiàn)了AQS的共享鎖,原理是操作state來(lái)實(shí)現(xiàn)計(jì)數(shù),并且重寫(xiě)了tryAcquireShared(),tryReleaseShared()等方法
Semaphore是如何借助AQS實(shí)現(xiàn)控制并發(fā)訪問(wèn)線程個(gè)數(shù)?
Semaphore的功能類(lèi)似于操作系統(tǒng)的信號(hào)量,可以很方便的控制某個(gè)資源同時(shí)被幾個(gè)線程訪問(wèn),即做并發(fā)訪問(wèn)控制,與CountDownLatch類(lèi)似,同樣是實(shí)現(xiàn)獲取和釋放兩個(gè)方法。Semaphore的使用場(chǎng)景:常用于僅能提供訪問(wèn)的資源,比如數(shù)據(jù)庫(kù)的連接數(shù)最大只有30,而應(yīng)用程序的并發(fā)數(shù)可能遠(yuǎn)遠(yuǎn)大于30,這時(shí)候就可以使用Semaphore來(lái)控制同時(shí)訪問(wèn)的線程數(shù)。當(dāng)Semaphore控制線程數(shù)到1的時(shí)候就和我們單線程一樣了。同樣Semaphore說(shuō)是信號(hào)量的意思,我們這里就可以把它理解為十字路口的紅綠燈,可以控制車(chē)流量(這里是控制線程數(shù))
下面來(lái)分析Semaphore的源碼以及如何使用AQS框
public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210L; /** 所有機(jī)制都通過(guò)AbstractQueuedSynchronizer子類(lèi)實(shí)現(xiàn) */ private final Sync sync; /** * 同樣是通過(guò)內(nèi)部類(lèi)來(lái)實(shí)現(xiàn)AQS主要功能,使用state來(lái)表示許可證數(shù)量 */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } /* * 不公平的獲取方式,會(huì)有一個(gè)搶占鎖的情況,即線程執(zhí)行順序會(huì)亂 */ final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } /* * 釋放資源 */ protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } } /** * 不公平的sync版本,使用的就是sync定義的不公平鎖 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } /** * 公平版本,獲取鎖的線程順序就是線程啟動(dòng)的順序。具體是使用hasQueuedPredecessors()方法判斷“當(dāng)前線程”是不是CLH隊(duì)列中的第一個(gè)線程。 * 若不是的話,則返回-1,是就設(shè)置獲取許可證,并檢查許可證數(shù)量是否足夠 */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } /** * 默認(rèn)使用不公平的版本,如果需要公平的,則需要兩個(gè)參數(shù) */ public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } /** * 分析同CountDownLatch中的類(lèi)似方法,具體的實(shí)現(xiàn)都是內(nèi)部類(lèi)中的獲取方法,這里是獲取一個(gè)許可 */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** *功能同上,但是這里不會(huì)檢測(cè)線程是否被中斷 */ public void acquireUninterruptibly() { sync.acquireShared(1); } /** * 嘗試獲取 */ public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } /** * 在一段時(shí)間內(nèi)一直嘗試獲取許可 */ public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } /** * 當(dāng)前線程釋放一個(gè)許可證 */ public void release() { sync.releaseShared(1); } /** * 可以規(guī)定一個(gè)線程獲得許可證的數(shù)量 */ public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); } public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; } public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); } /** * 同樣可以規(guī)定一個(gè)線程釋放許可證的數(shù)量 */ public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } /** * 當(dāng)前的許可還剩幾個(gè) */ public int availablePermits() { return sync.getPermits(); } /** * 銷(xiāo)毀所有許可 */ public int drainPermits() { return sync.drainPermits(); } protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); } public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public final int getQueueLength() { return sync.getQueueLength(); } protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } public String toString() { return super.toString() + "[Permits = " + sync.getPermits() + "]"; }}
上面對(duì)于Semaphore的一些重要內(nèi)部類(lèi)和常用方法進(jìn)行了解釋?zhuān)cCountDownLatch很類(lèi)似,實(shí)現(xiàn)的都是共享的功能,即Semaphore允許得到許可證的線程同時(shí)執(zhí)行,而CountDownLatch允許調(diào)用countDown()方法的線程同時(shí)執(zhí)行。并且都是通過(guò)內(nèi)部類(lèi)實(shí)現(xiàn)的。相信看到這里,你能越來(lái)越看見(jiàn)AQS為什么被稱(chēng)作JUC包的核心。下面就來(lái)介紹一下ReentrantLock
ReentrantLock是如何借助AQS實(shí)現(xiàn)鎖機(jī)制
ReentrantLock是可重入鎖,前面博客中寫(xiě)到synchronized實(shí)現(xiàn)的鎖也是可重入的。不過(guò)synchronized是基于JVM指令實(shí)現(xiàn),而ReentrantLock是使用Java代碼實(shí)現(xiàn)的。ReentrantLock重點(diǎn)就是需要我們手動(dòng)聲明加鎖和釋放鎖,如果手工忘記釋放鎖,很有可能就會(huì)導(dǎo)致死鎖,即資源永遠(yuǎn)都被鎖住,其他線程無(wú)法得到,當(dāng)前線程也釋放不出去。ReentrantLock實(shí)現(xiàn)的是自旋鎖,通過(guò)循環(huán)調(diào)用CAS操作實(shí)現(xiàn)加鎖,避免了線程進(jìn)入內(nèi)核態(tài)的阻塞狀態(tài),所以性能較好。ReentrantLock內(nèi)部同樣實(shí)現(xiàn)了公平鎖和非公平鎖。事實(shí)上Synchronized能做的ReentrantLock都能做,但是反過(guò)來(lái)就不一樣了、
經(jīng)過(guò)前面的源碼分析我們發(fā)現(xiàn)核心的都在當(dāng)前類(lèi)的內(nèi)部類(lèi)里,而當(dāng)前類(lèi)的一些方法不過(guò)是使用的內(nèi)部類(lèi)以及AQS的方法罷了,所以下面我們就來(lái)分析ReentrantLock中的三個(gè)內(nèi)部類(lèi)。
public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; /** 同步的機(jī)制都是通過(guò)內(nèi)部類(lèi)來(lái)實(shí)現(xiàn)的 */ private final Sync sync; /** * 在ReentrantLock中state表示的是線程重入鎖的次數(shù),當(dāng)state為0時(shí)才能釋放鎖 */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * 這個(gè)抽象方法提供給公平鎖和不公平鎖來(lái)單獨(dú)實(shí)現(xiàn),父類(lèi)不實(shí)現(xiàn) */ abstract void lock(); /** * 首先得到當(dāng)前線程,而后獲取state,如果state為0,也就是沒(méi)有線程獲得當(dāng)前鎖,那么就設(shè)置當(dāng)前線程擁有當(dāng)前鎖的獨(dú)占訪問(wèn)權(quán),并且返回true。 * 如果state不為0,那么就看當(dāng)前線程是否是已經(jīng)獲得過(guò)鎖的線程,如果是就讓state+=acquire,acquire一般是1,即表示線程重入并且返回true。 * 上面兩個(gè)條件都不滿足就代表是鎖被其他線程獲取了,當(dāng)前線程獲取不到,所以返回false */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } /** * 先判斷當(dāng)前線程等不等于擁有鎖的線程,不等于就會(huì)拋異常,也就是釋放不了。 * 等于之后就看state-releases是否為0,當(dāng)為0的時(shí)候就代表釋放完全。 * 可以設(shè)置鎖的狀態(tài)為沒(méi)有線程擁有,從而讓鎖能被其他線程競(jìng)爭(zhēng),否則就設(shè)置state,代表線程重入該鎖,并且線程還沒(méi)釋放完全。 */ protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } /* *該方法檢驗(yàn)當(dāng)前線程是否是鎖的獨(dú)占者 */ protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } /* *該方法是創(chuàng)建一個(gè)條件鎖,本文不做具體分析 */ final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** * 使得該類(lèi)從流中能重構(gòu)實(shí)例,并且會(huì)重置為解鎖狀態(tài) */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); } } /** * Sync 的不公平版本 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * 將state從0更新到1成功的話就讓當(dāng)前線程獲取鎖,否則就會(huì)嘗試獲得鎖和獲取當(dāng)前節(jié)點(diǎn)的前一節(jié)點(diǎn),并判斷這一個(gè)節(jié)點(diǎn)是否為頭節(jié)點(diǎn),即當(dāng)前線程是不是頭節(jié)點(diǎn)的直接后繼。 * 如果兩個(gè)中有一個(gè)失敗則線程中斷,進(jìn)入阻塞狀態(tài) */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * Sync 的公平版本 */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; /** * 嘗試獲得鎖和獲取當(dāng)前節(jié)點(diǎn)的前一節(jié)點(diǎn),并判斷這一個(gè)節(jié)點(diǎn)是否為頭節(jié)點(diǎn),即當(dāng)前線程是不是頭節(jié)點(diǎn)的直接后繼,如果兩個(gè)中有一個(gè)失敗則線程中斷,進(jìn)入阻塞狀態(tài)。 * 也就是一定按照隊(duì)列中線程的順序來(lái)實(shí)現(xiàn) */ final void lock() { acquire(1); } /** * 跟不公平的版本相比其實(shí)是在state為0的時(shí)候檢查當(dāng)前線程是不是在隊(duì)列的頭部節(jié)點(diǎn)的直接后繼,來(lái)達(dá)到公平的概念 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } }
ReentrantLock和上面兩個(gè)類(lèi)最不同的莫過(guò)于ReentrantLock使用的是獨(dú)占功能,即每次只能有一個(gè)線程來(lái)獲取ReentrantLock類(lèi)。ReentrantLock類(lèi)下還有很多方法,這里就不一一介紹,但是本質(zhì)都是內(nèi)部類(lèi)中的實(shí)現(xiàn)以及AQS的一些調(diào)用
總結(jié):
AQS只是一個(gè)基礎(chǔ)的框架,里面最核心的就是維護(hù)了state變量和CHL隊(duì)列,而其他的類(lèi)全部都是通過(guò)繼承的方法進(jìn)行擴(kuò)展,雖然沒(méi)有直接說(shuō)源碼,但是通過(guò)上面三個(gè)主要類(lèi)的源碼分析再去看AQS已經(jīng)不是難事。繼承主要改變的就是獲取和釋放的方法,通過(guò)這兩個(gè)方法來(lái)對(duì)state和隊(duì)列進(jìn)行操作達(dá)到我們能夠進(jìn)行的并發(fā)控制的功能,事實(shí)上J.U.C包下的類(lèi)和能夠?qū)崿F(xiàn)的功能遠(yuǎn)不止這三個(gè),后面會(huì)選擇重點(diǎn)的來(lái)介紹。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
基于Java反射技術(shù)實(shí)現(xiàn)簡(jiǎn)單IOC容器
這篇文章主要介紹了基于Java反射技術(shù)實(shí)現(xiàn)簡(jiǎn)單IOC容器,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07SpringBoot MainApplication類(lèi)文件的位置詳解
這篇文章主要介紹了SpringBoot MainApplication類(lèi)文件的位置詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01Java Benchmark 基準(zhǔn)測(cè)試的實(shí)例詳解
這篇文章主要介紹了Java Benchmark 基準(zhǔn)測(cè)試的實(shí)例詳解的相關(guān)資料,這里提供實(shí)例幫助大家學(xué)習(xí)理解這部分內(nèi)容,需要的朋友可以參考下2017-08-08Spring Boot 2.0快速構(gòu)建服務(wù)組件全步驟
這篇文章主要給大家介紹了關(guān)于Spring Boot 2.0快速構(gòu)建服務(wù)組件的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Spring Boot 2.0具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04maven利用tomcat插件部署遠(yuǎn)程Linux服務(wù)器的步驟詳解
Maven已經(jīng)是Java的項(xiàng)目管理常用方式,下面這篇文章主要給大家介紹了關(guān)于maven利用tomcat插件部署遠(yuǎn)程Linux服務(wù)器的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-11-11