Java多線程之并發(fā)編程的核心AQS詳解
前言:Java并發(fā)包很多的同步工具類底層都是基于AQS來實現(xiàn)的,比如我們工作中經(jīng)常用的Lock工具ReentrantLock、柵欄CountDownLatch、信號量Semaphore等。如果你想深入研究Java并發(fā)編程的話,那么AQS一定是繞不開的一塊知識點,而且關于AQS的知識點也是面試中經(jīng)常考察的內(nèi)容,所以深入學習AQS很有必要。
學習AQS之前,我們有必要了解一下AQS底層中大量使用的CAS:Java多線程10:并發(fā)編程的基石CAS機制
一、AQS簡介
1.1、AOS概念
AQS,全名AbstractQueuedSynchronizer,是一個抽象類的隊列式同步器,它的內(nèi)部通過維護一個狀態(tài)volatile int state(共享資源),一個FIFO線程等待隊列來實現(xiàn)同步功能。AQS類是整個 JUC包的核心類,JUC 中的ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore和LimitLatch等同步工具都是基于AQS實現(xiàn)的。
state用關鍵字volatile修飾,代表著該共享資源的狀態(tài)一更改就能被所有線程可見,而AQS的加鎖方式本質上就是多個線程在競爭state,當state為0時代表線程可以競爭鎖,不為0時代表當前對象鎖已經(jīng)被占有,其他線程來加鎖時則會失敗,加鎖失敗的線程會被放入一個FIFO的等待隊列中,這些線程會被UNSAFE.park()操作掛起,等待其他獲取鎖的線程釋放鎖才能夠被喚醒。
而這個等待隊列其實就相當于一個CLH隊列,用一張原理圖來表示大致如下:

1.2、AQS的核心思想
如果被請求的共享資源空閑,則將當前請求資源的線程設置為有效的工作線程,并將共享資源設置為鎖定狀態(tài),如果被請求的共享資源被占用,那么就需要一套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制AQS是用CLH隊列鎖實現(xiàn)的,即將暫時獲取不到鎖的線程加入到隊列中。CLH(Craig,Landin,and Hagersten)隊列是一個虛擬的雙向隊列,虛擬的雙向隊列即不存在隊列實例,僅存在節(jié)點之間的關聯(lián)關系。
AQS是將每一條請求共享資源的線程封裝成一個CLH鎖隊列的一個結點(Node),來實現(xiàn)鎖的分配。
用大白話來說,AQS就是基于CLH隊列,用volatile修飾共享變量state,線程通過CAS去改變狀態(tài)符,成功則獲取鎖成功,失敗則進入等待隊列,等待被喚醒。

1.3、AQS是自旋鎖
AQS是自旋鎖:在等待喚醒的時候,經(jīng)常會使用自旋(while(!cas()))的方式,不停地嘗試獲取鎖,直到被其他線程獲取成功
實現(xiàn)了AQS的鎖有:自旋鎖、互斥鎖、讀鎖寫鎖、條件產(chǎn)量、信號量、柵欄都是AQS的衍生物
1.4、AQS支持兩種資源分享的方式
Exclusive(獨占,只有一個線程能執(zhí)行,如ReentrantLock)和Share(共享,多個線程可同時執(zhí)行,如Semaphore/CountDownLatch)。
自定義的同步器繼承AQS后,只需要實現(xiàn)共享資源state的獲取和釋放方式即可,其他如線程隊列的維護(如獲取資源失敗入隊/喚醒出隊等)等操作,AQS在底層已經(jīng)實現(xiàn)了。
線程的阻塞和喚醒
在JDK1.5之前,除了內(nèi)置的監(jiān)視器機制外,沒有其它方法可以安全且便捷得阻塞和喚醒當前線程。
JDK1.5以后,java.util.concurrent.locks包提供了LockSupport類來作為線程阻塞和喚醒的工具。
二、AQS原理
2.1、同步狀態(tài)的管理
同步狀態(tài),其實就是資源。AQS使用單個int(32位)來保存同步狀態(tài),并暴露出getState、setState以及compareAndSetState操作來讀取和更新這個狀態(tài)。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
//省略展示其它代碼...
}
這幾個方法都是Final修飾的,說明子類中無法重寫它們。我們可以通過修改State字段表示的同步狀態(tài)來實現(xiàn)多線程的獨占模式和共享模式(加鎖過程)。

2.2、等待隊列
等待隊列,是AQS框架的核心,整個框架的關鍵其實就是如何在并發(fā)狀態(tài)下管理被阻塞的線程。
等待隊列是嚴格的FIFO隊列,是Craig,Landin和Hagersten鎖(CLH鎖)的一種變種,采用雙向循環(huán)鏈表實現(xiàn),因此也叫CLH隊列。
2.3、CLH隊列中的結點
AQS內(nèi)部還定義了一個靜態(tài)類Node,表示CLH隊列的每一個結點,該結點的作用是對每一個等待獲取資源做了封裝,包含了需要同步的線程本身、線程等待狀態(tài)....
LH隊列中的結點是對線程的包裝,結點一共有兩種類型:獨占(EXCLUSIVE)和共享(SHARED)。
每種類型的結點都有一些狀態(tài),其中獨占結點使用其中的CANCELLED(1)、SIGNAL(-1)、CONDITION(-2),共享結點使用其中的CANCELLED(1)、SIGNAL(-1)、PROPAGATE(-3)。
| 結點狀態(tài) | 值 | 描述 |
|---|---|---|
| CANCELLED | 1 | 取消。表示后驅結點被中斷或超時,需要移出隊列 |
| SIGNAL | -1 | 發(fā)信號。表示后驅結點被阻塞了(當前結點在入隊后、阻塞前,應確保將其prev結點類型改為SIGNAL,以便prev結點取消或釋放時將當前結點喚醒。) |
| CONDITION | -2 | Condition專用。表示當前結點在Condition隊列中,因為等待某個條件而被阻塞了 |
| PROPAGATE | -3 | 傳播。適用于共享模式(比如連續(xù)的讀操作結點可以依次進入臨界區(qū),設為PROPAGATE有助于實現(xiàn)這種迭代操作。) |
| INITIAL | 0 | 默認。新結點會處于這種狀態(tài) |
2.4、隊列定義
對于CLH隊列,當線程請求資源時,如果請求不到,會將線程包裝成結點,將其掛載在隊列尾部。
下面結合代碼一起看下節(jié)點進入隊列的過程。
private Node enq(final Node node) {
for (;;) {
Node t = tail; // 1
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) // 2
tail = head;
} else {
node.prev = t; // 3
if (compareAndSetTail(t, node)) { // 4
t.next = node;
return t;
}
}
}
}
2.5、AQS底層的CAS機制
在研究JDK中AQS時,會發(fā)現(xiàn)這個類很多地方都使用了CAS操作,在并發(fā)實現(xiàn)中CAS操作必須具備原子性,而且是硬件級別的原子性,Java被隔離在硬件之上,明顯力不從心,這時為了能直接操作操作系統(tǒng)層面,肯定要通過用C++編寫的native本地方法來擴展實現(xiàn)。JDK提供了一個類來滿足CAS的要求,sun.misc.Unsafe,從名字上可以大概知道它用于執(zhí)行低級別、不安全的操作,AQS就是使用此類完成硬件級別的原子操作。UnSafe通過JNI調(diào)用本地C++代碼,C++代碼調(diào)用CPU硬件指令集。
Unsafe是一個很強大的類,它可以分配內(nèi)存、釋放內(nèi)存、可以定位對象某字段的位置、可以修改對象的字段值、可以使線程掛起、使線程恢復、可進行硬件級別原子的CAS操作等等。
2.6、通過ReentrantLock理解AQS
ReentrantLock中公平鎖和非公平鎖在底層是相同的,這里以非公平鎖為例進行分析。
在非公平鎖中,有一段這樣的代碼:
// java.util.concurrent.locks.ReentrantLock
static final class NonfairSync extends Sync {
...
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
...
}
看一下這個Acquire是怎么寫的:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
再看一下tryAcquire方法:
// java.util.concurrent.locks.AbstractQueuedSynchronizer
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
可以看出,這里只是AQS的簡單實現(xiàn),具體獲取鎖的實現(xiàn)方法是由各自的公平鎖和非公平鎖單獨實現(xiàn)的(以ReentrantLock為例)。如果該方法返回了True,則說明當前線程獲取鎖成功,就不用往后執(zhí)行了;如果獲取失敗,就需要加入到等待隊列中。
三、AQS方法
AQS代碼內(nèi)部提供了一系列操作鎖和線程隊列的方法,主要操作鎖的方法包含以下幾個:
compareAndSetState():利用CAS的操作來設置state的值
tryAcquire(int):獨占方式獲取鎖。成功則返回true,失敗則返回false。
tryRelease(int):獨占方式釋放鎖。成功則返回true,失敗則返回false。
tryReleaseShared(int):共享方式釋放鎖。如果釋放后允許喚醒后續(xù)等待結點返回true,否則返回false。
像ReentrantLock就是實現(xiàn)了自定義的tryAcquire-tryRelease,從而操作state的值來實現(xiàn)同步效果。
3.1、用戶需要自己重寫的方法
上面介紹到 AQS 已經(jīng)幫用戶解決了同步器定義過程中的大部分問題,只將下面兩個問題丟給用戶解決:
- 什么是資源
- 什么情況下資源是可以被訪問的
具體的,AQS 是通過暴露以下 API 來讓用戶解決上面的問題的。
| 鉤子方法 | 描述 |
|---|---|
| tryAcquire | 獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。 |
| tryRelease | 獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。 |
| tryAcquireShared | 共享方式。嘗試獲取資源。負數(shù)表示失?。?表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。 |
| tryReleaseShared | 共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結點返回true,否則返回false。 |
| isHeldExclusively | 該線程是否正在獨占資源。只有用到condition才需要去實現(xiàn)它。 |
如果你需要實現(xiàn)一個自己的同步器,一般情況下只要繼承 AQS ,并重寫 AQS 中的這個幾個方法就行了。至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經(jīng)在頂層實現(xiàn)好了。要不怎么說Doug Lea貼心呢。
需要注意的是:如果你沒在子類中重寫這幾個方法就直接調(diào)用了,會直接拋出異常。所以,在你調(diào)用這些方法之前必須重寫他們。不使用的話可以不重寫。
3.2、AQS 提供的一系列模板方法
查看 AQS 的源碼我們就可以發(fā)現(xiàn)這個類提供了很多方法,看起來讓人“眼花繚亂”的。但是最主要的兩類方法就是獲取資源的方法和釋放資源的方法。因此我們抓住主要矛盾就行了:
- public final void acquire(int arg) // 獨占模式的獲取資源
- public final boolean release(int arg) // 獨占模式的釋放資源
- public final void acquireShared(int arg) // 共享模式的獲取資源
- public final boolean releaseShared(int arg) // 共享模式的釋放資源
3.3、acquire(int)方法
該方法以獨占方式獲取資源,如果獲取到資源,線程繼續(xù)往下執(zhí)行,否則進入等待隊列,直到獲取到資源為止,且整個過程忽略中斷的影響。該方法是獨占模式下線程獲取共享資源的頂層入口。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
下面分析下這個acquire方法的具體執(zhí)行流程:
step1:首先這個方法調(diào)用了用戶自己實現(xiàn)的方法tryAcquire方法嘗試獲取資源,如果這個方法返回true,也就是表示獲取資源成功,那么整個acquire方法就執(zhí)行結束了,線程繼續(xù)往下執(zhí)行;
step2:如果tryAcquir方法返回false,也就表示嘗試獲取資源失敗。這時acquire方法會先調(diào)用addWaiter方法將當前線程封裝成Node類并加入一個FIFO的雙向隊列的尾部。
step3:再看acquireQueued這個關鍵方法。首先要注意的是這個方法中哪個無條件的for循環(huán),這個for循環(huán)說明acquireQueued方法一直在自旋嘗試獲取資源。進入for循環(huán)后,首先判斷了當前節(jié)點的前繼節(jié)點是不是頭節(jié)點,如果是的話就再次嘗試獲取資源,獲取資源成功的話就直接返回false(表示未被中斷過)
假如還是沒有獲取資源成功,判斷是否需要讓當前節(jié)點進入waiting狀態(tài),經(jīng)過 shouldParkAfterFailedAcquire這個方法判斷,如果需要讓線程進入waiting狀態(tài)的話,就調(diào)用LockSupport的park方法讓線程進入waiting狀態(tài)。進入waiting狀態(tài)后,這線程等待被interupt或者unpark(在release操作中會進行這樣的操作,可以參見后面的代碼)。這個線程被喚醒后繼續(xù)執(zhí)行for循環(huán)來嘗試獲取資源。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//首先判斷了當前節(jié)點的前繼節(jié)點是不是頭節(jié)點,如果是的話就再次嘗試獲取資源,
//獲取資源成功的話就直接返回false(表示未被中斷過)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//判斷是否需要讓當前節(jié)點進入waiting狀態(tài)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果在整個等待過程中被中斷過,則返回true,否則返回false。
// 如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源后才再進行自我中斷selfInterrupt(),將中斷補上。
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
以上就是acquire方法的簡單分析。
單獨看這個方法的話可能會不太清晰,結合ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore和LimitLatch等同步工具看這個代碼的話就會好理解很多。
3.4、release(int)方法
release(int)方法是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//上面已經(jīng)講過了,需要用戶自定義實現(xiàn)
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
與acquire()方法中的tryAcquire()類似,tryRelease()方法也是需要獨占模式的自定義同步器去實現(xiàn)的。正常來說,tryRelease()都會成功的,因為這是獨占模式,該線程來釋放資源,那么它肯定已經(jīng)拿到獨占資源了,直接減掉相應量的資源即可(state-=arg),也不需要考慮線程安全的問題。
但要注意它的返回值,上面已經(jīng)提到了,release()是根據(jù)tryRelease()的返回值來判斷該線程是否已經(jīng)完成釋放掉資源了!所以自義定同步器在實現(xiàn)時,如果已經(jīng)徹底釋放資源(state=0),要返回true,否則返回false。
unparkSuccessor(Node)方法用于喚醒等待隊列中下一個線程。這里要注意的是,下一個線程并不一定是當前節(jié)點的next節(jié)點,而是下一個可以用來喚醒的線程,如果這個節(jié)點存在,調(diào)用unpark()方法喚醒。
總之,release()是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。(需要注意的是隊列中被喚醒的線程不一定能立馬獲取資源,因為資源在釋放后可能立馬被其他線程(不是在隊列中等待的線程)搶掉了)
3.5、acquireShared(int)方法
acquireShared(int)方法是共享模式下線程獲取共享資源的頂層入口。它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源為止,整個過程忽略中斷。
public final void acquireShared(int arg) {
//tryAcquireShared需要用戶自定義實現(xiàn)
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
可以發(fā)現(xiàn),這個方法的關鍵實現(xiàn)其實是獲取資源失敗后,怎么管理線程。也就是doAcquireShared的邏輯。
//不響應中斷
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看出,doAcquireShared的邏輯和acquireQueued的邏輯差不多。將當前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應量的資源后才返回。
簡單總結下acquireShared的流程:
step1:tryAcquireShared()嘗試獲取資源,成功則直接返回;
step2:失敗則通過doAcquireShared()進入等待隊列park(),直到被unpark()/interrupt()并成功獲取到資源才返回。整個等待過程也是忽略中斷的。
3.6、releaseShared(int)方法
releaseShared(int)方法是共享模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊列里的其他線程來獲取資源。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
釋放掉資源后,喚醒后繼。跟獨占模式下的release()相似,但有一點稍微需要注意:獨占模式下的tryRelease()在完全釋放掉資源(state=0)后,才會返回true去喚醒其他線程,這主要是基于獨占下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質就是控制一定量的線程并發(fā)執(zhí)行,那么擁有資源的線程在釋放掉部分資源時就可以喚醒后繼等待結點。
參考鏈接:
從ReentrantLock的實現(xiàn)看AQS的原理及應用
總結
本篇文章就到這里了,希望能夠給你帶來幫助,也希望您能夠多多關注腳本之家的更多內(nèi)容!
相關文章
Java xml出現(xiàn)錯誤 javax.xml.transform.TransformerException: java.
這篇文章主要介紹了Java xml出現(xiàn)錯誤 javax.xml.transform.TransformerException: java.lang.NullPointerException的相關資料,需要的朋友可以參考下2016-11-11
Java創(chuàng)建對象之顯示創(chuàng)建與隱式創(chuàng)建
在本篇文章中,小編會帶大家學習面向對象中關于對象的創(chuàng)建之顯示創(chuàng)建和隱式創(chuàng)建,其實類和對象作為面向對象中最基本的,也是最重要的,需要的朋友可以參考下2023-05-05
JavaWeb頁面中防止點擊Backspace網(wǎng)頁后退情況
當鍵盤敲下后退鍵(Backspace)后怎么防止網(wǎng)頁后退情況呢?今天小編通過本文給大家詳細介紹下,感興趣的朋友一起看看吧2016-11-11
SpringBoot整合Mybatis自定義攔截器不起作用的處理方案
這篇文章主要介紹了SpringBoot整合Mybatis自定義攔截器不起作用的處理方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09

