Java AQS(AbstractQueuedSynchronizer)源碼解析
AbstractQueuedSynchronizer被稱為隊列同步器,簡稱為大家熟知的AQS,這個類可以稱作concurrent包的基礎,該類提供了同步的基本功能。該類包括如下幾個核心要素:
- AQS內(nèi)部維護一個volatile修飾的state變量,state用于標記鎖的狀態(tài);
- AQS通過內(nèi)部類Node記錄當前是哪個線程持有鎖;
- AQS通過LockSupport的park和unPark方法來阻塞和喚醒線程;
- AQS通過node來維護一個隊列,用于保存所有阻塞的線程。
下面通過剖析源碼來看看AQS是如何工作的。
AQS概要
AQS通過內(nèi)部類Node記錄當前是哪個線程持有鎖,Node中有一個前驅(qū)節(jié)點和一個后繼節(jié)點,形成一個雙向鏈表,這個鏈表是一種CLH隊列,其中waitStatus表示當前線程的狀態(tài),其可能的取值包括以下幾種:
- SIGNAL(-1),表示后繼線程已經(jīng)或者即將被阻塞,當前線程釋放鎖或者獲取鎖失敗后需要喚醒后繼線程;
- CANCELLED(1),表示當前線程因為超時或者中斷被取消,這個狀態(tài)不可以被修改;
- CONDITION(-2),當前線程為條件等待,其狀態(tài)設置0之后才能去競爭鎖;
- PROPAGATE(-3),表示共享鎖釋放之后需要傳遞給后繼節(jié)點,只有頭結點的才會有該狀態(tài);
- 0,該狀態(tài)為初始值,不屬于上面任意一種狀態(tài)。
Node對象中還有一個nextWaiter變量,指向下一個條件等待節(jié)點,相當于在CLH隊列的基礎上維護了一個簡單的單鏈表來關聯(lián)條件等待的節(jié)點。
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
...
構造方法
...
}Node提供了兩種入隊列的方法,即enq和addWaiter,enq方法如下所示,當尾節(jié)點tail為null時,表明阻塞隊列還沒有被初始化,通過CAS操作來設置頭結點,頭結點為new Node(),實際上頭結點中沒有阻塞的線程,算得上是一個空的節(jié)點(注意空節(jié)點和null是不一樣的),然后進行tail=head操作,這也說明當head=tail的時候,隊列中實際上是不存在阻塞線程的,然后將需要入隊列的node放入隊列尾部,將tail指向node。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//如果tail為空,說明CLH隊列沒有被初始化,
if (t == null) {
//初始化CLH隊列,將head和tail指向一個new Node(),
//此時雖然CLH有一個節(jié)點,但是并沒有真正意義的阻塞線程
if (compareAndSetHead(new Node()))
tail = head;
} else {
//將node放入隊列尾部,并通過cas將tail指向node
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}addWaiter通常表示添加一個條件等待的節(jié)點入隊列,該方法首先嘗試通過CAS操作快速入隊列,如果失敗則通過調(diào)用enq來入隊列。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
//嘗試快速入隊列
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//快速入隊列失敗則采用enq方入隊列
enq(node);
return node;
}Node還提供了喚醒后繼節(jié)點線程的功能,主要是通過LockSupport來實現(xiàn)的,源碼如下所示,
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}排他獲取鎖
不支持中斷的獲取鎖\color{green}{不支持中斷的獲取鎖}不支持中斷的獲取鎖
//不可中斷的獲取鎖
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//對中斷做補償,中斷當前線程
selfInterrupt();
}acquire方法首先會調(diào)用tryAcquire方法嘗試獲取鎖,如果獲取鎖失敗,首先通過addWaiter將當前線程放入CLH隊列中,然后通過acquireQueued方法獲取鎖,acquireQueued方法源碼如下所示:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//記錄中斷狀態(tài)
boolean interrupted = false;
//自旋式的獲取鎖
for (;;) {
final Node p = node.predecessor();
//當前線程為CLH中的第一個阻塞線程才會嘗試去獲取鎖
if (p == head && tryAcquire(arg)) {
//獲取成功則更新head
setHead(node);
p.next = null; // help GC
failed = false;
//返回中斷狀態(tài)
return interrupted;
}
//判斷中斷信息
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//如果獲取鎖失敗,則取消獲取鎖的操作
if (failed)
cancelAcquire(node);
}
}acquireQueued方法是無中斷的獲取鎖,該方法有一個布爾類型的返回值,該值不是表示是否成功獲取鎖,而是標示當前線程的中斷狀態(tài),因為acquireQueued方法是無法響應中斷的,需要對中斷進行補償,這個補償體現(xiàn)在acquire方法中。
//模板方法tryAcquire需要子類進行具體實現(xiàn)
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}支持中斷的獲取鎖
//可中斷的獲取鎖
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
acquireInterruptibly方法獲取鎖的過程中能夠響應中斷,主要體現(xiàn)在獲取鎖之前會判斷一下當前線程的中斷中斷狀態(tài),若中斷則拋出InterruptedException,然后通過tryAcquire獲取鎖,獲取成功直接返回,獲取失敗則通過doAcquireInterruptibly獲取鎖,該方法和acquireQueued最大的區(qū)別就是在判斷parkAndCheckInterrupt后,acquireQueued僅僅記錄中斷狀態(tài),parkAndCheckInterrupt則會拋出異常。
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//拋出異常,響應中斷
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}支持超時時間的獲取鎖功能
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//響應中斷
if (Thread.interrupted())
throw new InterruptedException();
//首先通過tryAcquire快速獲取鎖,若失敗則調(diào)用doAcquireNanos方法
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}從方法tryAcquireNanos的源碼可以看出,該方法也是響應中斷的,該方法首先調(diào)用模板方法tryAcquire快速的獲取鎖,如果失敗則通過doAcquireNanos獲取鎖,doAcquireNanos中支持超時機制,其源碼如下所示:
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
//判斷如果超時則直接返回false,代表獲取鎖失敗
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}doAcquireNanos方法與acquireQueued方法的區(qū)別是每次循環(huán)獲取鎖過程中都會計算deadline和當前時間的差值,如果這個差值小于0,則表示獲取鎖的操作已經(jīng)超時,則直接返回false表示獲取鎖失敗。
共享鎖獲取
AQS中不僅支持排他鎖的獲取,即acquire、acquireInterruptibly和tryAcquireNanos,還提供了共享鎖的獲取操作方法,包括acquireShared、acquireSharedInterruptibly和tryAcquireSharedNanos,這三個方法源碼如下所示:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}共享鎖的獲取和排他鎖的獲取方法類似,共享鎖調(diào)用了不同的模板方法tryAcquireShared,這里介紹一下doAcquireShared方法,其他方法變化的套路和共享鎖的使用套路一樣,doAcquireShared方法源碼如下所示:
private void doAcquireShared(int arg) {
//當前線程入隊列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
//自旋式的獲取鎖
for (;;) {
final Node p = node.predecessor();
//只有隊列中的第一個阻塞線程才能獲取鎖
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
//獲取鎖成功,補償中斷
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//通過interrupted記錄中斷信息
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}doAcquireShared方法沒有返回值,與acquireQueued不同的是:
- doAcquireShared沒有返回值,該方法的中斷補償是在方法內(nèi)完成的,獲取鎖成功之后,會判斷中斷信息interrupted的狀態(tài),如果為true則調(diào)用selfInterrupt()方法中斷當前線程;
- 獲取鎖成功之后不是簡單的設置head,而是通過setHeadAndPropagate方法來設置頭結點和并且判斷后繼節(jié)點的信息,對后繼節(jié)點中的線程進行喚醒操作等,setHeadAndPropagate方法源碼如下所示:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
//設置新的頭結點
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//如果后繼節(jié)點為空或者為SHARED類型的節(jié)點,執(zhí)行doReleaseShared方法
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
//狀態(tài)為SIGNAL,則喚醒后繼節(jié)點中的線程
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
//若狀態(tài)為0,則設置狀態(tài)為PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}鎖的釋放
鎖的釋放也分為釋放排他鎖和釋放共享鎖,分別為release方法和releaseShared方法,源碼如下所示,
//釋放排他鎖
public final boolean release(int arg) {
//釋放鎖,然后喚醒后繼節(jié)點的線程
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//釋放共享鎖
public final boolean releaseShared(int arg) {
//釋放鎖,然后調(diào)用doReleaseShared方法
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}release方法和releaseShared方法分別調(diào)用模板方法tryRelease和tryReleaseShared來釋放鎖,release方法中直接通過調(diào)用unparkSuccessor喚醒后繼線程,而releaseShared的喚醒操作在doReleaseShared方法中進行。
取消獲取鎖
當獲取鎖失敗時,需要進行一些狀態(tài)清理和變化,cancelAcquire方法就是用來實現(xiàn)這些功能的,其源碼如下所示,
private void cancelAcquire(Node node) {
if (node == null)
return;
//節(jié)點線程置為null
node.thread = null;
//從CLH隊列中清除已經(jīng)取消的節(jié)點(CANCELLED)
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
//判斷如果node是尾部節(jié)點,則設置尾部節(jié)點
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
//若不是頭節(jié)點則直接從CLH隊列中清除當前節(jié)點
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
//若為頭結點,則喚醒后繼節(jié)點中的線程
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
取消獲取鎖的操作首先將隊列中處于CANCELLED狀態(tài)的節(jié)點剔除,然后根據(jù)當前節(jié)點在CLH隊列中的位置進行不同的操作:
- node在隊列尾部,則重新設置CLH隊列的尾部節(jié)點;
- node為頭結點,喚醒后繼節(jié)點中的線程;
- node既不是頭結點也不是尾節(jié)點,則在CLH中剔除node。
總結
AQS是整個concurrent包的基礎,可重入鎖、線程池、信號量(Semaphore)等同步工具類都需要借助AQS來完成,了解AQS是深入學習concurrent包的前提。
以上就是Java AQS(AbstractQueuedSynchronizer)源碼解析的詳細內(nèi)容,更多關于Java AQS源碼的資料請關注腳本之家其它相關文章!
相關文章
springboot讀取resource配置文件生成容器對象的示例代碼
這篇文章主要介紹了springboot讀取resource配置文件生成容器對象的示例代碼,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-07-07

