一文搞懂Java并發(fā)AQS的共享鎖模式
概述
這篇文章深入淺出理解Java并發(fā)AQS的獨(dú)占鎖模式講解了AQS的獨(dú)占鎖實(shí)現(xiàn)原理,那么本篇文章在闡述AQS另外一個(gè)重要模式,共享鎖模式,那什么是共享鎖呢?
共享鎖可以由多個(gè)線程同時(shí)獲取, 比較典型的就是讀鎖,讀操作并不會(huì)產(chǎn)生副作用,所以可以允許多個(gè)線程同時(shí)對數(shù)據(jù)進(jìn)行讀操作而不會(huì)有線程安全問題,jdk中的很多并發(fā)工具比如ReadWriteLock和CountdownLatch就是依賴AQS的共享鎖實(shí)現(xiàn)的。
本文重點(diǎn)講解下AQS是如何實(shí)現(xiàn)共享鎖的。
自定義共享鎖例子
首先我們通過AQS實(shí)現(xiàn)一個(gè)非常最最最輕量簡單的共享鎖例子,幫助大家對共享鎖有一個(gè)整體的感知。
@Slf4j public class ShareLock { /** * 共享鎖幫助類 */ private static class ShareSync extends AbstractQueuedSynchronizer { private int lockCount; /** * 創(chuàng)建共享鎖幫助類,最多有count把共享鎖,超過了則阻塞 * * @param count 共享鎖數(shù)量 */ public ShareSync(int count) { this.lockCount = count; } /** * 嘗試獲取共享鎖 * * @param arg 每次獲取鎖的數(shù)量 * @return 返回正數(shù),表示后續(xù)其他線程獲取共享鎖可能成功; 返回0,表示后續(xù)其他線程無法獲取共享鎖;返回負(fù)數(shù),表示當(dāng)前線程獲取共享鎖失敗 */ @Override protected int tryAcquireShared(int arg) { // 自旋 for (;;) { int c = getState(); // 如果持有鎖的數(shù)量大于指定數(shù)量,返回-1,線程進(jìn)入阻塞 if(c >= lockCount) { return -1; } int nextc = c + 1; // cas設(shè)置成功,返回1,獲取到共享鎖 if (compareAndSetState(c, nextc)) { return 1; } } } /** * 嘗試釋放共享鎖 * * @param arg 釋放鎖的數(shù)量 * @return 如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false */ @Override protected boolean tryReleaseShared(int arg) { // 自旋操作 for (; ; ) { int c = getState(); // 如果沒有鎖了 if (c == 0) { return false; } // 否則鎖量-1 int nextc = c - 1; // cas修改狀態(tài) if (compareAndSetState(c, nextc)) { return true; } } } } private final ShareSync sync; public ShareLock(int count) { this.sync = new ShareSync(count); } /** * 加共享鎖 */ public void lockShare() { sync.acquireShared(1); } /** * 釋放共享鎖 */ public void releaseShare() { sync.releaseShared(1); } }
創(chuàng)建內(nèi)部類共享幫助鎖ShareSync
類,繼承自AbstractQueuedSynchronizer
類,實(shí)現(xiàn)了共享鎖相關(guān)的方法tryAcquireShared()
和tryReleaseShared()
。
創(chuàng)建ShareLock
,提供了lockShare()
加鎖和releaseShare()
兩個(gè)API。
驗(yàn)證:
public static void main(String[] args) throws InterruptedException { ShareLock shareLock = new ShareLock(3); for (int i = 0; i < 5; i++) { new Thread(() -> { shareLock.lockShare(); try { log.info("lock success"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { shareLock.releaseShare(); log.info("release success"); } }, "thread-" + i).start(); } Thread.sleep(10000); }
- 一共創(chuàng)建最多共同有3個(gè)線程共享的共享鎖。
- 創(chuàng)建5個(gè)線程去競爭共享鎖。
運(yùn)行結(jié)果:
- 運(yùn)行結(jié)果顯示每次最多只有3個(gè)
lock success
,說明同時(shí)只有3個(gè)線程共享。 - 只有在釋放共享鎖以后,其他線程才能獲取鎖。
下面對它的實(shí)現(xiàn)原理一探究竟。
核心原理機(jī)制
共享模式也是由AQS提供的,首先我們關(guān)注下AQS的數(shù)據(jù)結(jié)構(gòu)。
AQS內(nèi)部維護(hù)了一個(gè)volatile int state(代表共享資源)和一個(gè)FIFO線程等待隊(duì)列(多線程爭用資源被阻塞時(shí)會(huì)進(jìn)入此隊(duì)列)。
AQS作為一個(gè)抽象方法,提供了加鎖、和釋放鎖的框架,這里采用的模板方模式,在上面中提到的tryAcquireShared
、tryReleaseShared
就是和共享模式相關(guān)的模板方法。
方法名 | 描述 |
---|---|
protected int tryAcquireShared(int arg) | 共享方式。arg為獲取鎖的次數(shù),嘗試獲取資源。負(fù)數(shù)表示失??;0表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。 |
protected boolean tryReleaseShared(int arg) | 共享方式。arg為釋放鎖的次數(shù),嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回True,否則返回False。 |
共享模式的入口方法如下:
方法名 | 描述 |
---|---|
void acquireShared(int arg) | 共享模式獲取鎖,不響應(yīng)中斷。 |
void acquireSharedInterruptibly(int arg) | 共享模式獲取鎖,響應(yīng)中斷。 |
tryAcquireSharedNanos(int arg, long nanosTimeout) | 嘗試在共享模式下獲取鎖,如果中斷則中止,如果超過給定超時(shí)則失敗。 |
boolean releaseShared(int arg) | 共享模式下釋放鎖。 |
源碼解析
上圖是AQS的類結(jié)構(gòu)圖,其中標(biāo)紅部分是組成AQS的重要成員變量。
成員變量
1.state共享變量
AQS中里一個(gè)很重要的字段state,表示同步狀態(tài),是由volatile修飾的,用于展示當(dāng)前臨界資源的獲鎖情況。通過getState(),setState(),compareAndSetState()三個(gè)方法進(jìn)行維護(hù)。
關(guān)于state的幾個(gè)要點(diǎn):
- 使用volatile修飾,保證多線程間的可見性。
- getState()、setState()、compareAndSetState()使用final修飾,限制子類不能對其重寫。
- compareAndSetState()采用樂觀鎖思想的CAS算法,保證原子性操作。
2.CLH隊(duì)列(FIFO隊(duì)列)
AQS里另一個(gè)重要的概念就是CLH隊(duì)列,它是一個(gè)雙向鏈表隊(duì)列,其內(nèi)部由head和tail分別記錄頭結(jié)點(diǎn)和尾結(jié)點(diǎn),隊(duì)列的元素類型是Node。
private transient volatile Node head; private transient volatile Node tail;
Node的結(jié)構(gòu)如下:
static final class Node { //共享模式下的等待標(biāo)記 static final Node SHARED = new Node(); //獨(dú)占模式下的等待標(biāo)記 static final Node EXCLUSIVE = null; //表示當(dāng)前結(jié)點(diǎn)已取消調(diào)度。當(dāng)timeout或被中斷(響應(yīng)中斷的情況下),會(huì)觸發(fā)變更為此狀態(tài),進(jìn)入該狀態(tài)后的結(jié)點(diǎn)將不會(huì)再變化。 static final int CANCELLED = 1; //表示后繼結(jié)點(diǎn)在等待當(dāng)前結(jié)點(diǎn)喚醒。后繼結(jié)點(diǎn)入隊(duì)時(shí),會(huì)將前繼結(jié)點(diǎn)的狀態(tài)更新為SIGNAL。 static final int SIGNAL = -1; //表示結(jié)點(diǎn)等待在Condition上,當(dāng)其他線程調(diào)用了Condition的signal()方法后,CONDITION狀態(tài)的結(jié)點(diǎn)將從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列中,等待獲取同步鎖。 static final int CONDITION = -2; //共享模式下,前繼結(jié)點(diǎn)不僅會(huì)喚醒其后繼結(jié)點(diǎn),同時(shí)也可能會(huì)喚醒后繼的后繼結(jié)點(diǎn)。 static final int PROPAGATE = -3; //狀態(tài),包括上面的四種狀態(tài)值,初始值為0,一般是節(jié)點(diǎn)的初始狀態(tài) volatile int waitStatus; //上一個(gè)節(jié)點(diǎn)的引用 volatile Node prev; //下一個(gè)節(jié)點(diǎn)的引用 volatile Node next; //保存在當(dāng)前節(jié)點(diǎn)的線程引用 volatile Thread thread; //condition隊(duì)列的后續(xù)節(jié)點(diǎn) Node nextWaiter; }
注意,waitSstatus負(fù)值表示結(jié)點(diǎn)處于有效等待狀態(tài),而正值表示結(jié)點(diǎn)已被取消。所以源碼中很多地方用>0、<0來判斷結(jié)點(diǎn)的狀態(tài)是否正常。
3.exclusiveOwnerThread
AQS通過繼承AbstractOwnableSynchronizer類,擁有的屬性。表示獨(dú)占模式下同步器持有的線程。
共享鎖獲取acquireShared(int)
acquireShared(int)是共享鎖模式下線程獲取共享資源的入口方法,它會(huì)獲取指定量的資源,獲取成功則直接返回,獲取失敗則進(jìn)入等待隊(duì)列,直到獲取到資源為止,整個(gè)過程無法響應(yīng)中斷。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
方法的整體流程如下:
- tryAcquireShared()嘗試獲取資源,需要自定義同步器去實(shí)現(xiàn),返回負(fù)值代表獲取失敗;0代表獲取成功,但沒有剩余資源;正數(shù)表示獲取成功,還有剩余資源,其他線程還可以去獲取。
- 如果失敗則通過doAcquireShared()進(jìn)入等待隊(duì)列,直到獲取到資源為止才返回。
doAcquireShared(int)
此方法用于將當(dāng)前線程加入等待隊(duì)列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應(yīng)量的資源后才返回。
private void doAcquireShared(int arg) { //封裝線程為共享Node 加入隊(duì)列尾部 final Node node = addWaiter(Node.SHARED); //是否成功標(biāo)志 boolean failed = true; try { //等待過程中是否被中斷過的標(biāo)志 boolean interrupted = false; // 自旋操作 for (;;) { // 獲取前驅(qū)節(jié)點(diǎn) final Node p = node.predecessor(); //如果到head的下一個(gè),因?yàn)閔ead是拿到資源的線程,此時(shí)node被喚醒,很可能是head用完資源來喚醒自己的 if (p == head) { //嘗試獲取資源 int r = tryAcquireShared(arg); //成功 if (r >= 0) { //將head指向自己,還有剩余資源可以再喚醒之后的線程 setHeadAndPropagate(node, r); p.next = null; // help GC //如果等待過程中被打斷過,此時(shí)將中斷補(bǔ)上。 if (interrupted) selfInterrupt(); failed = false; return; } } //判斷狀態(tài),尋找安全點(diǎn),進(jìn)入waiting狀態(tài),等著被unpark()或interrupt() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
doAcquireShared
方法的實(shí)現(xiàn)和獲取獨(dú)占鎖中的acquireQueued
方法很類似,但是主要有一點(diǎn)不同,那就是線程在被喚醒后,若成功獲取到了共享鎖,還需要判斷共享鎖是否還能被其他線程獲取,若可以,則繼續(xù)向后喚醒它的下一個(gè)節(jié)點(diǎn)對應(yīng)的線程。
setHeadAndPropagate(Node, int)
該方法主要將當(dāng)前節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn),同時(shí)判斷條件是否符合(比如還有剩余資源),還會(huì)去喚醒后繼結(jié)點(diǎn),畢竟是共享模式。
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; //head指向自己 setHead(node); //如果還有剩余量,繼續(xù)喚醒下一個(gè)鄰居線程 if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) // 喚醒操作 doReleaseShared(); } }
共享釋放releaseShared(int)
releaseShared(int)
是共享模式下線程釋放共享資源的入口,它會(huì)釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會(huì)喚醒等待隊(duì)列里的其他線程來獲取資源。
public final boolean releaseShared(int arg) { //嘗試釋放資源 if (tryReleaseShared(arg)) { //喚醒后繼結(jié)點(diǎn) doReleaseShared(); return true; } return false; }
方法的整體流程如下:
- tryReleaseShared嘗試釋放鎖,這由自定義同步器去實(shí)現(xiàn), 返回true表示釋放成功。
- doReleaseShared喚醒后續(xù)隊(duì)列中等待的節(jié)點(diǎn),
doReleaseShared()
此方法主要用于喚醒隊(duì)列中等待的共享節(jié)點(diǎn)。
private void doReleaseShared() { // 自旋操作 for (;;) { // 獲取頭節(jié)點(diǎn) Node h = head; if (h != null && h != tail) { // 獲取節(jié)點(diǎn)的等待狀態(tài) int ws = h.waitStatus; // 如果節(jié)點(diǎn)等待狀態(tài)是-1, -1表示有責(zé)任喚醒后續(xù)節(jié)點(diǎn)的狀態(tài) if (ws == Node.SIGNAL) { // cas修改當(dāng)前節(jié)點(diǎn)的等待狀態(tài)為0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //喚醒后續(xù)節(jié)點(diǎn) unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head)// head發(fā)生變化 break; } }
邏輯是一個(gè)死循環(huán),每次循環(huán)中重新讀取一次head,然后保存在局部變量h中,再配合if(h == head) break;
,這樣,循環(huán)檢測到head沒有變化時(shí)就會(huì)退出循環(huán)。注意,head變化一定是因?yàn)椋篴cquire thread被喚醒,之后它成功獲取鎖,然后setHead設(shè)置了新head。而且注意,只有通過if(h == head) break;
即head不變才能退出循環(huán),不然會(huì)執(zhí)行多次循環(huán)。
if (h != null && h != tail)
判斷隊(duì)列是否至少有兩個(gè)node,如果隊(duì)列從來沒有初始化過(head為null),或者h(yuǎn)ead就是tail,那么中間邏輯直接不走,直接判斷head是否變化了。
如果隊(duì)列中有兩個(gè)或以上個(gè)node,那么檢查局部變量h的狀態(tài):
- 如果狀態(tài)為SIGNAL,說明h的后繼是需要被通知的。通過對CAS操作結(jié)果取反,將
compareAndSetWaitStatus(h, Node.SIGNAL, 0)
和unparkSuccessor(h)
綁定在了一起。說明了只要head成功得從SIGNAL修改為0,那么head的后繼的代表線程肯定會(huì)被喚醒了。 - 如果狀態(tài)為0,說明h的后繼所代表的線程已經(jīng)被喚醒或即將被喚醒,并且這個(gè)中間狀態(tài)即將消失,要么由于acquire thread獲取鎖失敗再次設(shè)置head為SIGNAL并再次阻塞,要么由于acquire thread獲取鎖成功而將自己(head后繼)設(shè)置為新head并且只要head后繼不是隊(duì)尾,那么新head肯定為SIGNAL。所以設(shè)置這種中間狀態(tài)的head的status為PROPAGATE,讓其status又變成負(fù)數(shù),這樣可能被被喚醒線程檢測到。
如果狀態(tài)為PROPAGATE,直接判斷head是否變化。
兩個(gè)continue保證了進(jìn)入那兩個(gè)分支后,只有當(dāng)CAS操作成功后,才可能去執(zhí)行if(h == head) break;,才可能退出循環(huán)。
if(h == head) break;保證了,只要在某個(gè)循環(huán)的過程中有線程剛獲取了鎖且設(shè)置了新head,就會(huì)再次循環(huán)。目的當(dāng)然是為了再次執(zhí)行unparkSuccessor(h),即喚醒隊(duì)列中第一個(gè)等待的線程。
以上就是一文搞懂Java并發(fā)AQS的共享鎖模式的詳細(xì)內(nèi)容,更多關(guān)于Java AQS共享鎖模式的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
spring事務(wù)隔離級別、傳播機(jī)制以及簡單配置方式
這篇文章主要介紹了spring事務(wù)隔離級別、傳播機(jī)制以及簡單配置方式,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01如何使用SpringMVC的消息轉(zhuǎn)換器設(shè)置日期格式
這篇文章主要介紹了如何使用SpringMVC的消息轉(zhuǎn)換器設(shè)置日期格式問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-07-07java使用Logback配置輸出日志內(nèi)容到文件示例代碼
這篇文章主要介紹了java?Logback輸出日志內(nèi)容到文件,要將logger.info的信息輸出到文件,您可以使用Logback配置,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2023-09-09java實(shí)現(xiàn)斗地主發(fā)牌系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)斗地主發(fā)牌系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-04-04mybatis?plus框架@TableField注解不生效問題及解決方案
最近遇到一個(gè)mybatis plus的問題,@TableField注解不生效,導(dǎo)致查出來的字段反序列化后為空,今天通過本文給大家介紹下mybatis?plus框架的@TableField注解不生效問題總結(jié),需要的朋友可以參考下2022-03-03mybatis實(shí)現(xiàn)遍歷Map的key和value
這篇文章主要介紹了mybatis實(shí)現(xiàn)遍歷Map的key和value方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01