基于ReentrantLock的實(shí)現(xiàn)原理講解
java.util.concurrent包中的工具實(shí)現(xiàn)核心都是AQS,了解ReentrantLock的實(shí)現(xiàn)原理,需要先分析AQS以及AQS與ReentrantLock的關(guān)系。
這篇文章中分析了ReentrantLock#lock與ReentrantLock#unlock的實(shí)現(xiàn),對(duì)于Condition的實(shí)現(xiàn)分析,另外文章再講,基本上大同小異。
ReentrantLock實(shí)現(xiàn)核心–AQS(AbstractQueuedSynchronizer)
AQS,隊(duì)列同步器,在juc包中的工具類都是依賴于AQS來(lái)實(shí)現(xiàn)同步控制,看一張AQS的結(jié)構(gòu)圖。
同步控制中主要使用到的信息如上圖所示。AQS可以被當(dāng)做是一個(gè)同步監(jiān)視器的實(shí)現(xiàn),并且具有排隊(duì)功能。當(dāng)線程嘗試獲取AQS的鎖時(shí),如果AQS已經(jīng)被別的線程獲取鎖,那么將會(huì)新建一個(gè)Node節(jié)點(diǎn),并且加入到AQS的等待隊(duì)列中,這個(gè)隊(duì)列也由AQS本身自己維護(hù)。當(dāng)鎖被釋放時(shí),喚醒下一個(gè)節(jié)點(diǎn)嘗試獲取鎖。
- 變量exclusiveOwnerThread在互斥模式下,表示當(dāng)前持有鎖的線程。
- 變量tail指向等待獲取AQS的鎖的節(jié)點(diǎn)隊(duì)列的最后一個(gè)
- 變量head指向隊(duì)列中head節(jié)點(diǎn),head節(jié)點(diǎn)信息為空,不表示任何正在等待的線程。
- 變量state表示AQS同步器的狀態(tài),在不同情況下含義可能不太一樣,例如以下幾種
- 在ReentrantLock中,表示AQS的鎖是否已經(jīng)被占用獲取,0:沒(méi)有,>=1:已被獲取,當(dāng)大于1時(shí)表示被同一線程多次重入鎖。
- 在CountDownLatch中,表示計(jì)數(shù)還剩的次數(shù),當(dāng)?shù)竭_(dá)0時(shí),喚醒等待線程。
- 在Semaphore中,表示AQS還可以被獲取鎖的次數(shù),獲取一次就減1,當(dāng)?shù)竭_(dá)0時(shí),嘗試獲取的線程將會(huì)阻塞。
Node結(jié)構(gòu)
Node節(jié)點(diǎn)是AQS管理的等待隊(duì)列的節(jié)點(diǎn)元素,除了head節(jié)點(diǎn)之外,其他一個(gè)節(jié)點(diǎn)就代表一個(gè)正在等待線程的隊(duì)列。Node一般的重要參數(shù)有幾個(gè)。
- prev 前置節(jié)點(diǎn)
- next后置節(jié)點(diǎn)
- thread 代表的線程
- waitStatus節(jié)點(diǎn)的等待狀態(tài)
- 1表示節(jié)點(diǎn)已經(jīng)取消,也就是線程可能已經(jīng)中斷,不需要再等待獲取鎖了,在后續(xù)代碼中會(huì)處理跳過(guò)waitStatus等于1的節(jié)點(diǎn)
- -1表示當(dāng)前節(jié)點(diǎn)的后置節(jié)點(diǎn)代表的線程需要被掛起
- -2表示當(dāng)前線程正在等待的是Condition鎖
ReentrantLock實(shí)現(xiàn)分析
二者關(guān)聯(lián)
ReentrantLock實(shí)現(xiàn)核心是基于AQS,看下面一張圖,分析AQS與ReentrantLock的關(guān)系。
從圖中可以看出,ReentrantLock里面有最終兩個(gè)內(nèi)部類,F(xiàn)airSync和NonfairSync通過(guò)繼承AbstractQueuedSynchronizer的功能,來(lái)實(shí)現(xiàn)兩種同步互斥方案:公平鎖和非公平鎖。
在ReentrantLock中最終lock和unlock操作,都由FairSync和NonfairSync實(shí)際完成。
NonfairSync分析
下面看一個(gè)最簡(jiǎn)單利用ReentrantLock實(shí)現(xiàn)互斥的例子。
ReentrantLock lock = new ReentrantLock(); //嘗試獲取鎖 lock.lock(); //獲得鎖后執(zhí)行邏輯...... //...... //解鎖 lock.unlock();
在ReentrantLock的默認(rèn)無(wú)參構(gòu)造方法中,創(chuàng)建的是非公平鎖
public ReentrantLock() { sync = new NonfairSync(); }
下面分析lock.lock();這句代碼是如何實(shí)現(xiàn)同步互斥的。
NonfairSync#lock
點(diǎn)開(kāi)lock.lock();源碼,可以看到最終實(shí)際調(diào)用的是NonfairSync#lock,這是分析的入口。
NonfairSync#lock源碼如下。
final void lock() { if (compareAndSetState(0, 1))//【step1】 setExclusiveOwnerThread(Thread.currentThread());//【step2】 else acquire(1);//【step3】 }
【step1】上面有提到,NonfairSync繼承自AbstractQueuedSynchronizer,NonfairSync就是一個(gè)AQS,因此在步驟【1】其實(shí)就是利用CAS(一個(gè)原子性的比較并設(shè)置操作)嘗試設(shè)置AQS的state為1。如果設(shè)置成功,表示獲取鎖成功;如果設(shè)置失敗,表示state之前已經(jīng)是>=1,已經(jīng)被別的線程占用了AQS的鎖,所示無(wú)法設(shè)置state為1,稍后會(huì)把線程加入到等待隊(duì)列。
**非公平鎖與公平鎖:**對(duì)于NonfairSync非公平鎖來(lái)說(shuō),線程只要執(zhí)行l(wèi)ock請(qǐng)求,就會(huì)立馬嘗試獲取鎖,不會(huì)管AQS當(dāng)前管理的等待隊(duì)列中有沒(méi)有正在等待的線程,這種操作是不公平的,沒(méi)有先來(lái)后到;而稍后介紹的FairSync公平鎖,則會(huì)在lock請(qǐng)求進(jìn)行時(shí),先判斷AQS管理的等待隊(duì)列中是否已經(jīng)有正在等待的線程,有的話就是不嘗試獲取鎖,直接進(jìn)入等待隊(duì)列,保證了公平性。
這一步需要熟悉的是CAS操作,分析一下compareAndSetState源碼,如下。這一步利用unsafe包的cas操作,unsafe包類似一種java留給開(kāi)發(fā)者的后門,可以用來(lái)直接操作內(nèi)存數(shù)據(jù),并且保證這個(gè)操作的原子性。在下面的代碼中,stateOffset表示state比變量的內(nèi)存偏移地址,用來(lái)尋找state變量?jī)?nèi)存位置。整個(gè)cas操作就是找到內(nèi)存中當(dāng)前的state變量值,并且與expect期待值比較,如果跟期待值相同,那么表示這個(gè)值是可以修改的,此時(shí)就會(huì)對(duì)state值進(jìn)行更新;如果與期待值不一樣,那么將不能進(jìn)行更新操作。unsafe保證了比較與設(shè)置值的過(guò)程是原子性的,在這一步不會(huì)出現(xiàn)線程安全問(wèn)題。
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
【step2】操作是在設(shè)置state成功之后,表示當(dāng)前線程獲取AQS鎖成功,需要設(shè)置AQS中的變量exclusiveOwnerThread為當(dāng)前持有鎖的線程,做保存記錄。
【step3】當(dāng)嘗試獲取鎖失敗的時(shí)候,就需要進(jìn)行步驟3,執(zhí)行acquire,進(jìn)行再次嘗試或者線程進(jìn)入等待隊(duì)列。
AbstractQueuedSynchronizer#acquire
當(dāng)?shù)谝淮螄L試獲取鎖失敗之后,執(zhí)行【step3】acquire方法,這個(gè)方法可以講一個(gè)嘗試獲取鎖失敗的線程放入AQS管理的等待隊(duì)列進(jìn)行等待,并且將線程掛起。實(shí)現(xiàn)邏輯在AbstractQueuedSynchronizer實(shí)現(xiàn),NonfairSync通過(guò)繼承AbstractQueuedSynchronizer獲得等待隊(duì)列管理的功能。
下面看源碼。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
NonfairSync#tryAcquire–鎖重入實(shí)現(xiàn)
首先,執(zhí)行tryAcquire再次嘗試一次獲取lock,tryAcquire是由子類實(shí)現(xiàn),也就是這里調(diào)用NonfairSync#tryAcquire方法,跟蹤調(diào)用,最終執(zhí)行代碼如下。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //如果此時(shí)state已經(jīng)變?yōu)?,再次嘗試一次獲取鎖 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //否則判斷當(dāng)前持有AQS的鎖的線程是不是當(dāng)前請(qǐng)求獲取鎖的線程,是的話就進(jìn)行鎖重入。 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
對(duì)于NonfairSync#tryAcquire的實(shí)現(xiàn),首先是重新嘗試一次獲取鎖,如果還是獲取不到,就嘗試判斷當(dāng)前的是不是屬于重入鎖的情形。
對(duì)于重入鎖的情形,就需要對(duì)state進(jìn)行累加1,意思就是重入一次就對(duì)state加1。這樣子,在解鎖的時(shí)候,每次unlock就對(duì)state減一,等到state的值為0的時(shí)候,才能喚醒下一個(gè)等待線程。
如果獲取成功,返回true;否則返回false,繼續(xù)執(zhí)行下一步acquireQueued(addWaiter(Node.EXCLUSIVE), arg)),添加一個(gè)Node節(jié)點(diǎn)進(jìn)入AQS管理的等待隊(duì)列。
AbstractQueuedSynchronizer#addWaiter–添加Node到等待隊(duì)列
這個(gè)方法屬于隊(duì)列管理,也是由AbstractQueuedSynchronizer默認(rèn)實(shí)現(xiàn),NonfairSync繼承獲得。
查看addWaiter源碼實(shí)現(xiàn)。
private Node addWaiter(Node mode) { //新建一個(gè)Node節(jié)點(diǎn),mode傳入表示當(dāng)前線程之間競(jìng)爭(zhēng)是互斥模式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //嘗試添加當(dāng)前新建Node到鏈表隊(duì)列末尾 Node pred = tail; if (pred != null) { node.prev = pred; //利用cas設(shè)置尾指針指向的節(jié)點(diǎn)為當(dāng)前線新建節(jié)點(diǎn) if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //當(dāng)前是空的沒(méi)有任何正在等待的線程N(yùn)ode的時(shí)候,執(zhí)行enq(node),初始化等待隊(duì)列 enq(node); return node; }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize //新建一個(gè)空的頭節(jié)點(diǎn) if (compareAndSetHead(new Node())) tail = head; } else { //跟之前一樣,利用cas這只當(dāng)前新建節(jié)點(diǎn)為尾節(jié)點(diǎn) node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
以上操作,完成將一個(gè)節(jié)點(diǎn)加入隊(duì)列操作。加入完成之后,返回這個(gè)新加入的節(jié)點(diǎn)Node給acquireQueued方法繼續(xù)執(zhí)行。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))–鎖競(jìng)爭(zhēng)優(yōu)化
上面既然都完成了等待節(jié)點(diǎn)如隊(duì)列的操作,為什么不直接掛起線程進(jìn)入等待呢?因此這里的設(shè)計(jì)者做了一個(gè)優(yōu)化操作。acquireQueued方法其實(shí)就是為了減少線程掛起、喚醒次數(shù)而作的優(yōu)化操作。
下面看看acquireQueued的代碼實(shí)現(xiàn)。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //獲取當(dāng)前競(jìng)爭(zhēng)鎖的線程N(yùn)ode的前置節(jié)點(diǎn) final Node p = node.predecessor(); //【step1】 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //【step2】 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
【step1】假如前置節(jié)點(diǎn)是head,那么表示當(dāng)前線程是等待隊(duì)列中最大優(yōu)先級(jí)的等待線程,可以繼續(xù)最后的嘗試獲取鎖,因?yàn)楹苡锌赡軙?huì)獲取到鎖,從而避免線程掛起、喚醒,耗費(fèi)資源,這里也算是最終一次嘗試獲取。
【step2】shouldParkAfterFailedAcquire(p, node)是檢查當(dāng)前是否有必要掛起,前面我們說(shuō)過(guò),只有當(dāng)前置節(jié)點(diǎn)的waitStatus是-1的時(shí)候才會(huì)掛起當(dāng)前節(jié)點(diǎn)代表的線程。當(dāng)shouldParkAfterFailedAcquire(p, node)返回true的時(shí)候,就可以執(zhí)行parkAndCheckInterrupt()來(lái)掛起線程。
shouldParkAfterFailedAcquire(p, node)源碼如下。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //前置節(jié)點(diǎn)是-1,返回true表示線程可掛起 return true; if (ws > 0) { //前置節(jié)點(diǎn)大于0表示前置節(jié)點(diǎn)已經(jīng)取消,那么進(jìn)行跳過(guò)前置節(jié)點(diǎn)的操作,做鏈表的基本刪除節(jié)點(diǎn)操作 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //如果前置節(jié)點(diǎn)還是0,表示前置節(jié)點(diǎn)Node的waitStatus是初始值,需要設(shè)置為-1,然后外層循環(huán)重新執(zhí)行shouldParkAfterFailedAcquire方法,即可掛起當(dāng)前線程。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private final boolean parkAndCheckInterrupt() { //阻塞掛起線程,等待喚醒 LockSupport.park(this); //喚醒后,重置中斷標(biāo)記,線程中斷標(biāo)記位為不中斷 return Thread.interrupted(); }
FairSync分析
之前提到
**非公平鎖與公平鎖:**對(duì)于NonfairSync非公平鎖來(lái)說(shuō),線程只要執(zhí)行l(wèi)ock請(qǐng)求,就會(huì)立馬嘗試獲取鎖,不會(huì)管AQS當(dāng)前管理的等待隊(duì)列中有沒(méi)有正在等待的線程,這種操作是不公平的,沒(méi)有先來(lái)后到;而稍后介紹的FairSync公平鎖,則會(huì)在lock請(qǐng)求進(jìn)行時(shí),先判斷AQS管理的等待隊(duì)列中是否已經(jīng)有正在等待的線程,有的話就是不嘗試獲取鎖,直接進(jìn)入等待隊(duì)列,保證了公平性。
所以兩者的實(shí)現(xiàn)區(qū)別在于第一次嘗試lock的動(dòng)作不一樣。
FairSync#lock
final void lock() { acquire(1); }
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
最終差異體現(xiàn)在FairSync#tryAcquire的實(shí)現(xiàn)(第一次嘗試獲取lock)
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //hasQueuedPredecessors判斷隊(duì)列是否還有別的線程在等待鎖,沒(méi)有的話就嘗試獲取lock //如果有別的線程在等待鎖,就不會(huì)嘗試獲取lock;下面如果也不是重入的情況的話就直接進(jìn)入等待隊(duì)列 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
AbstractQueuedSynchronizer#release --AQS解鎖操作
AQS中定義了解鎖操作的模板方法,tryRelease(arg)是不同AQS子類實(shí)現(xiàn),對(duì)state的多樣化操作。例如ReentrantLock中的tryRelease(arg)操作比較明顯的就是對(duì)state減一。
tryRelease(arg)返回結(jié)果表示本次操作后是否需要喚醒下一個(gè)等待節(jié)點(diǎn)。對(duì)于ReentrantLock就是state減一之后是否變?yōu)?了。如果需要喚醒下一個(gè)節(jié)點(diǎn)的線程,那么判斷一下Head有沒(méi)有下一個(gè)要喚醒的節(jié)點(diǎn)線程,有的話就進(jìn)行喚醒操作unparkSuccessor(h);
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
ReentrantLock.Sync#tryRelease–解鎖實(shí)現(xiàn)
看一下ReentrantLock.Sync的tryRelease實(shí)現(xiàn).是如何為state減一 的。。
protected final boolean tryRelease(int releases) { //獲取state減掉realease,對(duì)于ReentrantLock就是默認(rèn)減一 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //如果減到0了,那么久釋放鎖 free = true; //設(shè)置持有線程為null setExclusiveOwnerThread(null); } //設(shè)置state為新的 setState(c); return free; }
至于這里設(shè)置state為啥不同cas操作,因?yàn)?/p>
if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException();
所以永遠(yuǎn)只有持有鎖的線程會(huì)做解鎖減一的操作,state設(shè)置是線程安全的。
注意一下
其實(shí)這里還沒(méi)分析Condition的實(shí)現(xiàn)原理,篇幅太長(zhǎng),下次另開(kāi)文章分析。以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
使用@Value為靜態(tài)變量導(dǎo)入并使用導(dǎo)入的靜態(tài)變量進(jìn)行初始化方式
這篇文章主要介紹了使用@Value為靜態(tài)變量導(dǎo)入并使用導(dǎo)入的靜態(tài)變量進(jìn)行初始化方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-02-02聊聊springboot2.2.3升級(jí)到2.4.0單元測(cè)試的區(qū)別
這篇文章主要介紹了springboot 2.2.3 升級(jí)到 2.4.0單元測(cè)試的區(qū)別,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10SpringBoot 在測(cè)試時(shí)如何指定包的掃描范圍
這篇文章主要介紹了SpringBoot 在測(cè)試時(shí)如何指定包的掃描范圍,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11SpringCloud @RefreshScope注解源碼層面深入分析
@RefreshScope注解能幫助我們做局部的參數(shù)刷新,但侵入性較強(qiáng),需要開(kāi)發(fā)階段提前預(yù)知可能的刷新點(diǎn),并且該注解底層是依賴于cglib進(jìn)行代理的,所以不要掉入cglib的坑,出現(xiàn)刷了也不更新情況2023-04-04SpringCloud通過(guò)Feign傳遞List類型參數(shù)方式
這篇文章主要介紹了SpringCloud通過(guò)Feign傳遞List類型參數(shù)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03Spring Boot基礎(chǔ)入門之基于注解的Mybatis
這篇文章主要給大家介紹了關(guān)于Spring Boot基礎(chǔ)入門之基于注解的Mybatis的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2018-07-07