一文搞懂Java并發(fā)AQS的共享鎖模式
概述
這篇文章深入淺出理解Java并發(fā)AQS的獨(dú)占鎖模式講解了AQS的獨(dú)占鎖實現(xiàn)原理,那么本篇文章在闡述AQS另外一個重要模式,共享鎖模式,那什么是共享鎖呢?
共享鎖可以由多個線程同時獲取, 比較典型的就是讀鎖,讀操作并不會產(chǎn)生副作用,所以可以允許多個線程同時對數(shù)據(jù)進(jìn)行讀操作而不會有線程安全問題,jdk中的很多并發(fā)工具比如ReadWriteLock和CountdownLatch就是依賴AQS的共享鎖實現(xiàn)的。
本文重點(diǎn)講解下AQS是如何實現(xiàn)共享鎖的。
自定義共享鎖例子
首先我們通過AQS實現(xiàn)一個非常最最最輕量簡單的共享鎖例子,幫助大家對共享鎖有一個整體的感知。
@Slf4j
public class ShareLock {
/**
* 共享鎖幫助類
*/
private static class ShareSync extends AbstractQueuedSynchronizer {
private int lockCount;
/**
* 創(chuàng)建共享鎖幫助類,最多有count把共享鎖,超過了則阻塞
*
* @param count 共享鎖數(shù)量
*/
public ShareSync(int count) {
this.lockCount = count;
}
/**
* 嘗試獲取共享鎖
*
* @param arg 每次獲取鎖的數(shù)量
* @return 返回正數(shù),表示后續(xù)其他線程獲取共享鎖可能成功; 返回0,表示后續(xù)其他線程無法獲取共享鎖;返回負(fù)數(shù),表示當(dāng)前線程獲取共享鎖失敗
*/
@Override
protected int tryAcquireShared(int arg) {
// 自旋
for (;;) {
int c = getState();
// 如果持有鎖的數(shù)量大于指定數(shù)量,返回-1,線程進(jìn)入阻塞
if(c >= lockCount) {
return -1;
}
int nextc = c + 1;
// cas設(shè)置成功,返回1,獲取到共享鎖
if (compareAndSetState(c, nextc)) {
return 1;
}
}
}
/**
* 嘗試釋放共享鎖
*
* @param arg 釋放鎖的數(shù)量
* @return 如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false
*/
@Override
protected boolean tryReleaseShared(int arg) {
// 自旋操作
for (; ; ) {
int c = getState();
// 如果沒有鎖了
if (c == 0) {
return false;
}
// 否則鎖量-1
int nextc = c - 1;
// cas修改狀態(tài)
if (compareAndSetState(c, nextc)) {
return true;
}
}
}
}
private final ShareSync sync;
public ShareLock(int count) {
this.sync = new ShareSync(count);
}
/**
* 加共享鎖
*/
public void lockShare() {
sync.acquireShared(1);
}
/**
* 釋放共享鎖
*/
public void releaseShare() {
sync.releaseShared(1);
}
}
創(chuàng)建內(nèi)部類共享幫助鎖ShareSync類,繼承自AbstractQueuedSynchronizer類,實現(xiàn)了共享鎖相關(guān)的方法tryAcquireShared()和tryReleaseShared()。
創(chuàng)建ShareLock,提供了lockShare()加鎖和releaseShare()兩個API。
驗證:
public static void main(String[] args) throws InterruptedException {
ShareLock shareLock = new ShareLock(3);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
shareLock.lockShare();
try {
log.info("lock success");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
shareLock.releaseShare();
log.info("release success");
}
}, "thread-" + i).start();
}
Thread.sleep(10000);
}
- 一共創(chuàng)建最多共同有3個線程共享的共享鎖。
- 創(chuàng)建5個線程去競爭共享鎖。
運(yùn)行結(jié)果:

- 運(yùn)行結(jié)果顯示每次最多只有3個
lock success,說明同時只有3個線程共享。 - 只有在釋放共享鎖以后,其他線程才能獲取鎖。
下面對它的實現(xiàn)原理一探究竟。
核心原理機(jī)制
共享模式也是由AQS提供的,首先我們關(guān)注下AQS的數(shù)據(jù)結(jié)構(gòu)。

AQS內(nèi)部維護(hù)了一個volatile int state(代表共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進(jìn)入此隊列)。
AQS作為一個抽象方法,提供了加鎖、和釋放鎖的框架,這里采用的模板方模式,在上面中提到的tryAcquireShared、tryReleaseShared就是和共享模式相關(guān)的模板方法。
| 方法名 | 描述 |
|---|---|
| protected int tryAcquireShared(int arg) | 共享方式。arg為獲取鎖的次數(shù),嘗試獲取資源。負(fù)數(shù)表示失??;0表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。 |
| protected boolean tryReleaseShared(int arg) | 共享方式。arg為釋放鎖的次數(shù),嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回True,否則返回False。 |
共享模式的入口方法如下:
| 方法名 | 描述 |
|---|---|
| void acquireShared(int arg) | 共享模式獲取鎖,不響應(yīng)中斷。 |
| void acquireSharedInterruptibly(int arg) | 共享模式獲取鎖,響應(yīng)中斷。 |
| tryAcquireSharedNanos(int arg, long nanosTimeout) | 嘗試在共享模式下獲取鎖,如果中斷則中止,如果超過給定超時則失敗。 |
| boolean releaseShared(int arg) | 共享模式下釋放鎖。 |
源碼解析

上圖是AQS的類結(jié)構(gòu)圖,其中標(biāo)紅部分是組成AQS的重要成員變量。
成員變量
1.state共享變量
AQS中里一個很重要的字段state,表示同步狀態(tài),是由volatile修飾的,用于展示當(dāng)前臨界資源的獲鎖情況。通過getState(),setState(),compareAndSetState()三個方法進(jìn)行維護(hù)。
關(guān)于state的幾個要點(diǎn):
- 使用volatile修飾,保證多線程間的可見性。
- getState()、setState()、compareAndSetState()使用final修飾,限制子類不能對其重寫。
- compareAndSetState()采用樂觀鎖思想的CAS算法,保證原子性操作。
2.CLH隊列(FIFO隊列)
AQS里另一個重要的概念就是CLH隊列,它是一個雙向鏈表隊列,其內(nèi)部由head和tail分別記錄頭結(jié)點(diǎn)和尾結(jié)點(diǎn),隊列的元素類型是Node。
private transient volatile Node head; private transient volatile Node tail;
Node的結(jié)構(gòu)如下:
static final class Node {
//共享模式下的等待標(biāo)記
static final Node SHARED = new Node();
//獨(dú)占模式下的等待標(biāo)記
static final Node EXCLUSIVE = null;
//表示當(dāng)前結(jié)點(diǎn)已取消調(diào)度。當(dāng)timeout或被中斷(響應(yīng)中斷的情況下),會觸發(fā)變更為此狀態(tài),進(jìn)入該狀態(tài)后的結(jié)點(diǎn)將不會再變化。
static final int CANCELLED = 1;
//表示后繼結(jié)點(diǎn)在等待當(dāng)前結(jié)點(diǎn)喚醒。后繼結(jié)點(diǎn)入隊時,會將前繼結(jié)點(diǎn)的狀態(tài)更新為SIGNAL。
static final int SIGNAL = -1;
//表示結(jié)點(diǎn)等待在Condition上,當(dāng)其他線程調(diào)用了Condition的signal()方法后,CONDITION狀態(tài)的結(jié)點(diǎn)將從等待隊列轉(zhuǎn)移到同步隊列中,等待獲取同步鎖。
static final int CONDITION = -2;
//共享模式下,前繼結(jié)點(diǎn)不僅會喚醒其后繼結(jié)點(diǎn),同時也可能會喚醒后繼的后繼結(jié)點(diǎn)。
static final int PROPAGATE = -3;
//狀態(tài),包括上面的四種狀態(tài)值,初始值為0,一般是節(jié)點(diǎn)的初始狀態(tài)
volatile int waitStatus;
//上一個節(jié)點(diǎn)的引用
volatile Node prev;
//下一個節(jié)點(diǎn)的引用
volatile Node next;
//保存在當(dāng)前節(jié)點(diǎn)的線程引用
volatile Thread thread;
//condition隊列的后續(xù)節(jié)點(diǎn)
Node nextWaiter;
}
注意,waitSstatus負(fù)值表示結(jié)點(diǎn)處于有效等待狀態(tài),而正值表示結(jié)點(diǎn)已被取消。所以源碼中很多地方用>0、<0來判斷結(jié)點(diǎn)的狀態(tài)是否正常。
3.exclusiveOwnerThread
AQS通過繼承AbstractOwnableSynchronizer類,擁有的屬性。表示獨(dú)占模式下同步器持有的線程。
共享鎖獲取acquireShared(int)
acquireShared(int)是共享鎖模式下線程獲取共享資源的入口方法,它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進(jìn)入等待隊列,直到獲取到資源為止,整個過程無法響應(yīng)中斷。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
方法的整體流程如下:
- tryAcquireShared()嘗試獲取資源,需要自定義同步器去實現(xiàn),返回負(fù)值代表獲取失?。?代表獲取成功,但沒有剩余資源;正數(shù)表示獲取成功,還有剩余資源,其他線程還可以去獲取。
- 如果失敗則通過doAcquireShared()進(jìn)入等待隊列,直到獲取到資源為止才返回。
doAcquireShared(int)
此方法用于將當(dāng)前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應(yīng)量的資源后才返回。
private void doAcquireShared(int arg) {
//封裝線程為共享Node 加入隊列尾部
final Node node = addWaiter(Node.SHARED);
//是否成功標(biāo)志
boolean failed = true;
try {
//等待過程中是否被中斷過的標(biāo)志
boolean interrupted = false;
// 自旋操作
for (;;) {
// 獲取前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
//如果到head的下一個,因為head是拿到資源的線程,此時node被喚醒,很可能是head用完資源來喚醒自己的
if (p == head) {
//嘗試獲取資源
int r = tryAcquireShared(arg);
//成功
if (r >= 0) {
//將head指向自己,還有剩余資源可以再喚醒之后的線程
setHeadAndPropagate(node, r);
p.next = null; // help GC
//如果等待過程中被打斷過,此時將中斷補(bǔ)上。
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//判斷狀態(tài),尋找安全點(diǎn),進(jìn)入waiting狀態(tài),等著被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireShared方法的實現(xiàn)和獲取獨(dú)占鎖中的acquireQueued方法很類似,但是主要有一點(diǎn)不同,那就是線程在被喚醒后,若成功獲取到了共享鎖,還需要判斷共享鎖是否還能被其他線程獲取,若可以,則繼續(xù)向后喚醒它的下一個節(jié)點(diǎn)對應(yīng)的線程。
setHeadAndPropagate(Node, int)
該方法主要將當(dāng)前節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn),同時判斷條件是否符合(比如還有剩余資源),還會去喚醒后繼結(jié)點(diǎn),畢竟是共享模式。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
//head指向自己
setHead(node);
//如果還有剩余量,繼續(xù)喚醒下一個鄰居線程
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 喚醒操作
doReleaseShared();
}
}
共享釋放releaseShared(int)
releaseShared(int)是共享模式下線程釋放共享資源的入口,它會釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊列里的其他線程來獲取資源。
public final boolean releaseShared(int arg) {
//嘗試釋放資源
if (tryReleaseShared(arg)) {
//喚醒后繼結(jié)點(diǎn)
doReleaseShared();
return true;
}
return false;
}
方法的整體流程如下:
- tryReleaseShared嘗試釋放鎖,這由自定義同步器去實現(xiàn), 返回true表示釋放成功。
- doReleaseShared喚醒后續(xù)隊列中等待的節(jié)點(diǎn),
doReleaseShared()
此方法主要用于喚醒隊列中等待的共享節(jié)點(diǎn)。
private void doReleaseShared() {
// 自旋操作
for (;;) {
// 獲取頭節(jié)點(diǎn)
Node h = head;
if (h != null && h != tail) {
// 獲取節(jié)點(diǎn)的等待狀態(tài)
int ws = h.waitStatus;
// 如果節(jié)點(diǎn)等待狀態(tài)是-1, -1表示有責(zé)任喚醒后續(xù)節(jié)點(diǎn)的狀態(tài)
if (ws == Node.SIGNAL) {
// cas修改當(dāng)前節(jié)點(diǎn)的等待狀態(tài)為0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//喚醒后續(xù)節(jié)點(diǎn)
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)// head發(fā)生變化
break;
}
}
邏輯是一個死循環(huán),每次循環(huán)中重新讀取一次head,然后保存在局部變量h中,再配合if(h == head) break;,這樣,循環(huán)檢測到head沒有變化時就會退出循環(huán)。注意,head變化一定是因為:acquire thread被喚醒,之后它成功獲取鎖,然后setHead設(shè)置了新head。而且注意,只有通過if(h == head) break;即head不變才能退出循環(huán),不然會執(zhí)行多次循環(huán)。
if (h != null && h != tail)判斷隊列是否至少有兩個node,如果隊列從來沒有初始化過(head為null),或者h(yuǎn)ead就是tail,那么中間邏輯直接不走,直接判斷head是否變化了。
如果隊列中有兩個或以上個node,那么檢查局部變量h的狀態(tài):
- 如果狀態(tài)為SIGNAL,說明h的后繼是需要被通知的。通過對CAS操作結(jié)果取反,將
compareAndSetWaitStatus(h, Node.SIGNAL, 0)和unparkSuccessor(h)綁定在了一起。說明了只要head成功得從SIGNAL修改為0,那么head的后繼的代表線程肯定會被喚醒了。 - 如果狀態(tài)為0,說明h的后繼所代表的線程已經(jīng)被喚醒或即將被喚醒,并且這個中間狀態(tài)即將消失,要么由于acquire thread獲取鎖失敗再次設(shè)置head為SIGNAL并再次阻塞,要么由于acquire thread獲取鎖成功而將自己(head后繼)設(shè)置為新head并且只要head后繼不是隊尾,那么新head肯定為SIGNAL。所以設(shè)置這種中間狀態(tài)的head的status為PROPAGATE,讓其status又變成負(fù)數(shù),這樣可能被被喚醒線程檢測到。
如果狀態(tài)為PROPAGATE,直接判斷head是否變化。
兩個continue保證了進(jìn)入那兩個分支后,只有當(dāng)CAS操作成功后,才可能去執(zhí)行if(h == head) break;,才可能退出循環(huán)。
if(h == head) break;保證了,只要在某個循環(huán)的過程中有線程剛獲取了鎖且設(shè)置了新head,就會再次循環(huán)。目的當(dāng)然是為了再次執(zhí)行unparkSuccessor(h),即喚醒隊列中第一個等待的線程。
以上就是一文搞懂Java并發(fā)AQS的共享鎖模式的詳細(xì)內(nèi)容,更多關(guān)于Java AQS共享鎖模式的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
spring事務(wù)隔離級別、傳播機(jī)制以及簡單配置方式
這篇文章主要介紹了spring事務(wù)隔離級別、傳播機(jī)制以及簡單配置方式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01
如何使用SpringMVC的消息轉(zhuǎn)換器設(shè)置日期格式
這篇文章主要介紹了如何使用SpringMVC的消息轉(zhuǎn)換器設(shè)置日期格式問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-07-07
java使用Logback配置輸出日志內(nèi)容到文件示例代碼
這篇文章主要介紹了java?Logback輸出日志內(nèi)容到文件,要將logger.info的信息輸出到文件,您可以使用Logback配置,本文通過實例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2023-09-09
mybatis?plus框架@TableField注解不生效問題及解決方案
最近遇到一個mybatis plus的問題,@TableField注解不生效,導(dǎo)致查出來的字段反序列化后為空,今天通過本文給大家介紹下mybatis?plus框架的@TableField注解不生效問題總結(jié),需要的朋友可以參考下2022-03-03
mybatis實現(xiàn)遍歷Map的key和value
這篇文章主要介紹了mybatis實現(xiàn)遍歷Map的key和value方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01

