Java并發(fā)系列之AbstractQueuedSynchronizer源碼分析(獨(dú)占模式)
在上一篇《Java并發(fā)系列[1]----AbstractQueuedSynchronizer源碼分析之概要分析》中我們介紹了AbstractQueuedSynchronizer基本的一些概念,主要講了AQS的排隊(duì)區(qū)是怎樣實(shí)現(xiàn)的,什么是獨(dú)占模式和共享模式以及如何理解結(jié)點(diǎn)的等待狀態(tài)。理解并掌握這些內(nèi)容是后續(xù)閱讀AQS源碼的關(guān)鍵,所以建議讀者先看完我的上一篇文章再回過頭來看這篇就比較容易理解。在本篇中會(huì)介紹在獨(dú)占模式下結(jié)點(diǎn)是怎樣進(jìn)入同步隊(duì)列排隊(duì)的,以及離開同步隊(duì)列之前會(huì)進(jìn)行哪些操作。AQS為在獨(dú)占模式和共享模式下獲取鎖分別提供三種獲取方式:不響應(yīng)線程中斷獲取,響應(yīng)線程中斷獲取,設(shè)置超時(shí)時(shí)間獲取。這三種方式整體步驟大致是相同的,只有少部分不同的地方,所以理解了一種方式再看其他方式的實(shí)現(xiàn)都是大同小異。在本篇中我會(huì)著重講不響應(yīng)線程中斷的獲取方式,其他兩種方式也會(huì)順帶講一下不一致的地方。
1. 怎樣以不響應(yīng)線程中斷獲取鎖?
//不響應(yīng)中斷方式獲取(獨(dú)占模式)
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
上面代碼中雖然看起來簡單,但是它按照順序執(zhí)行了下圖所示的4個(gè)步驟。下面我們會(huì)逐個(gè)步驟進(jìn)行演示分析。

第一步:!tryAcquire(arg)
//嘗試去獲取鎖(獨(dú)占模式)
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
這時(shí)候來了一個(gè)人,他首先嘗試著去敲了敲門,如果發(fā)現(xiàn)門沒鎖(tryAcquire(arg)=true),那就直接進(jìn)去了。如果發(fā)現(xiàn)門鎖了(tryAcquire(arg)=false),就執(zhí)行下一步。這個(gè)tryAcquire方法決定了什么時(shí)候鎖是開著的,什么時(shí)候鎖是關(guān)閉的。這個(gè)方法必須要讓子類去覆蓋,重寫里面的判斷邏輯。
第二步:addWaiter(Node.EXCLUSIVE)
//將當(dāng)前線程包裝成結(jié)點(diǎn)并添加到同步隊(duì)列尾部
private Node addWaiter(Node mode) {
//指定持有鎖的模式
Node node = new Node(Thread.currentThread(), mode);
//獲取同步隊(duì)列尾結(jié)點(diǎn)引用
Node pred = tail;
//如果尾結(jié)點(diǎn)不為空, 表明同步隊(duì)列已存在結(jié)點(diǎn)
if (pred != null) {
//1.指向當(dāng)前尾結(jié)點(diǎn)
node.prev = pred;
//2.設(shè)置當(dāng)前結(jié)點(diǎn)為尾結(jié)點(diǎn)
if (compareAndSetTail(pred, node)) {
//3.將舊的尾結(jié)點(diǎn)的后繼指向新的尾結(jié)點(diǎn)
pred.next = node;
return node;
}
}
//否則表明同步隊(duì)列還沒有進(jìn)行初始化
enq(node);
return node;
}
//結(jié)點(diǎn)入隊(duì)操作
private Node enq(final Node node) {
for (;;) {
//獲取同步隊(duì)列尾結(jié)點(diǎn)引用
Node t = tail;
//如果尾結(jié)點(diǎn)為空說明同步隊(duì)列還沒有初始化
if (t == null) {
//初始化同步隊(duì)列
if (compareAndSetHead(new Node())) {
tail = head;
}
} else {
//1.指向當(dāng)前尾結(jié)點(diǎn)
node.prev = t;
//2.設(shè)置當(dāng)前結(jié)點(diǎn)為尾結(jié)點(diǎn)
if (compareAndSetTail(t, node)) {
//3.將舊的尾結(jié)點(diǎn)的后繼指向新的尾結(jié)點(diǎn)
t.next = node;
return t;
}
}
}
}
執(zhí)行到這一步表明第一次獲取鎖失敗,那么這個(gè)人就給自己領(lǐng)了塊號(hào)碼牌進(jìn)入排隊(duì)區(qū)去排隊(duì)了,在領(lǐng)號(hào)碼牌的時(shí)候會(huì)聲明自己想要以什么樣的方式來占用房間(獨(dú)占模式or共享模式)。注意,這時(shí)候他并沒有坐下來休息(將自己掛起)哦。
第三步:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
//以不可中斷方式獲取鎖(獨(dú)占模式)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//獲取給定結(jié)點(diǎn)的前繼結(jié)點(diǎn)的引用
final Node p = node.predecessor();
//如果當(dāng)前結(jié)點(diǎn)是同步隊(duì)列的第一個(gè)結(jié)點(diǎn), 就嘗試去獲取鎖
if (p == head && tryAcquire(arg)) {
//將給定結(jié)點(diǎn)設(shè)置為head結(jié)點(diǎn)
setHead(node);
//為了幫助垃圾收集, 將上一個(gè)head結(jié)點(diǎn)的后繼清空
p.next = null;
//設(shè)置獲取成功狀態(tài)
failed = false;
//返回中斷的狀態(tài), 整個(gè)循環(huán)執(zhí)行到這里才是出口
return interrupted;
}
//否則說明鎖的狀態(tài)還是不可獲取, 這時(shí)判斷是否可以掛起當(dāng)前線程
//如果判斷結(jié)果為真則掛起當(dāng)前線程, 否則繼續(xù)循環(huán), 在這期間線程不響應(yīng)中斷
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
} finally {
//在最后確保如果獲取失敗就取消獲取
if (failed) {
cancelAcquire(node);
}
}
}
//判斷是否可以將當(dāng)前結(jié)點(diǎn)掛起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//獲取前繼結(jié)點(diǎn)的等待狀態(tài)
int ws = pred.waitStatus;
//如果前繼結(jié)點(diǎn)狀態(tài)為SIGNAL, 表明前繼結(jié)點(diǎn)會(huì)喚醒當(dāng)前結(jié)點(diǎn), 所以當(dāng)前結(jié)點(diǎn)可以安心的掛起了
if (ws == Node.SIGNAL) {
return true;
}
if (ws > 0) {
//下面的操作是清理同步隊(duì)列中所有已取消的前繼結(jié)點(diǎn)
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//到這里表示前繼結(jié)點(diǎn)狀態(tài)不是SIGNAL, 很可能還是等于0, 這樣的話前繼結(jié)點(diǎn)就不會(huì)去喚醒當(dāng)前結(jié)點(diǎn)了
//所以當(dāng)前結(jié)點(diǎn)必須要確保前繼結(jié)點(diǎn)的狀態(tài)為SIGNAL才能安心的掛起自己
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//掛起當(dāng)前線程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
領(lǐng)完號(hào)碼牌進(jìn)入排隊(duì)區(qū)后就會(huì)立馬執(zhí)行這個(gè)方法,當(dāng)一個(gè)結(jié)點(diǎn)首次進(jìn)入排隊(duì)區(qū)后有兩種情況,一種是發(fā)現(xiàn)他前面的那個(gè)人已經(jīng)離開座位進(jìn)入房間了,那他就不坐下來休息了,會(huì)再次去敲一敲門看看那小子有沒有完事。如果里面的人剛好完事出來了,都不用他叫自己就直接沖進(jìn)去了。否則,就要考慮坐下來休息一會(huì)兒了,但是他還是不放心,如果他坐下來睡著后沒人提醒他怎么辦?他就在前面那人的座位上留一個(gè)小紙條,好讓從里面出來的人看到紙條后能夠喚醒他。還有一種情況是,當(dāng)他進(jìn)入排隊(duì)區(qū)后發(fā)現(xiàn)前面還有好幾個(gè)人在座位上排隊(duì)呢,那他就可以安心的坐下來咪一會(huì)兒了,但在此之前他還是會(huì)在前面那人(此時(shí)已經(jīng)睡著了)的座位上留一個(gè)紙條,好讓這個(gè)人在走之前能夠去喚醒自己。當(dāng)一切事情辦妥了之后,他就安安心心的睡覺了,注意,我們看到整個(gè)for循環(huán)就只有一個(gè)出口,那就是等線程成功的獲取到鎖之后才能出去,在沒有獲取到鎖之前就一直是掛在for循環(huán)的parkAndCheckInterrupt()方法里頭。線程被喚醒后也是從這個(gè)地方繼續(xù)執(zhí)行for循環(huán)。
第四步:selfInterrupt()
//當(dāng)前線程將自己中斷
private static void selfInterrupt() {
Thread.currentThread().interrupt();
}
由于上面整個(gè)線程一直是掛在for循環(huán)的parkAndCheckInterrupt()方法里頭,沒有成功獲取到鎖之前不響應(yīng)任何形式的線程中斷,只有當(dāng)線程成功獲取到鎖并從for循環(huán)出來后,他才會(huì)查看在這期間是否有人要求中斷線程,如果是的話再去調(diào)用selfInterrupt()方法將自己掛起。
2. 怎樣以響應(yīng)線程中斷獲取鎖?
//以可中斷模式獲取鎖(獨(dú)占模式)
private void doAcquireInterruptibly(int arg) throws InterruptedException {
//將當(dāng)前線程包裝成結(jié)點(diǎn)添加到同步隊(duì)列中
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
//獲取當(dāng)前結(jié)點(diǎn)的前繼結(jié)點(diǎn)
final Node p = node.predecessor();
//如果p是head結(jié)點(diǎn), 那么當(dāng)前線程就再次嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
//獲取鎖成功后返回
return;
}
//如果滿足條件就掛起當(dāng)前線程, 此時(shí)響應(yīng)中斷并拋出異常
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
//線程被喚醒后如果發(fā)現(xiàn)中斷請(qǐng)求就拋出異常
throw new InterruptedException();
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}
響應(yīng)線程中斷方式和不響應(yīng)線程中斷方式獲取鎖流程上大致上是相同的。唯一的一點(diǎn)區(qū)別就是線程從parkAndCheckInterrupt方法中醒來后會(huì)檢查線程是否中斷,如果是的話就拋出InterruptedException異常,而不響應(yīng)線程中斷獲取鎖是在收到中斷請(qǐng)求后只是設(shè)置一下中斷狀態(tài),并不會(huì)立馬結(jié)束當(dāng)前獲取鎖的方法,一直到結(jié)點(diǎn)成功獲取到鎖之后才會(huì)根據(jù)中斷狀態(tài)決定是否將自己掛起。
3. 怎樣設(shè)置超時(shí)時(shí)間獲取鎖?
//以限定超時(shí)時(shí)間獲取鎖(獨(dú)占模式)
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
//獲取系統(tǒng)當(dāng)前時(shí)間
long lastTime = System.nanoTime();
//將當(dāng)前線程包裝成結(jié)點(diǎn)添加到同步隊(duì)列中
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
//獲取當(dāng)前結(jié)點(diǎn)的前繼結(jié)點(diǎn)
final Node p = node.predecessor();
//如果前繼是head結(jié)點(diǎn), 那么當(dāng)前線程就再次嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
//更新head結(jié)點(diǎn)
setHead(node);
p.next = null;
failed = false;
return true;
}
//超時(shí)時(shí)間用完了就直接退出循環(huán)
if (nanosTimeout <= 0) {
return false;
}
//如果超時(shí)時(shí)間大于自旋時(shí)間, 那么等判斷可以掛起線程之后就會(huì)將線程掛起一段時(shí)間
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) {
//將當(dāng)前線程掛起一段時(shí)間, 之后再自己醒來
LockSupport.parkNanos(this, nanosTimeout);
}
//獲取系統(tǒng)當(dāng)前時(shí)間
long now = System.nanoTime();
//超時(shí)時(shí)間每次都減去獲取鎖的時(shí)間間隔
nanosTimeout -= now - lastTime;
//再次更新lastTime
lastTime = now;
//在獲取鎖的期間收到中斷請(qǐng)求就拋出異常
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}
設(shè)置超時(shí)時(shí)間獲取首先會(huì)去獲取一下鎖,第一次獲取鎖失敗后會(huì)根據(jù)情況,如果傳入的超時(shí)時(shí)間大于自旋時(shí)間那么就會(huì)將線程掛起一段時(shí)間,否則的話就會(huì)進(jìn)行自旋,每次獲取鎖之后都會(huì)將超時(shí)時(shí)間減去獲取一次鎖所用的時(shí)間。一直到超時(shí)時(shí)間小于0也就說明超時(shí)時(shí)間用完了,那么這時(shí)就會(huì)結(jié)束獲取鎖的操作然后返回獲取失敗標(biāo)志。注意在以超時(shí)時(shí)間獲取鎖的過程中是可以響應(yīng)線程中斷請(qǐng)求的。
4. 線程釋放鎖并離開同步隊(duì)列是怎樣進(jìn)行的?
//釋放鎖的操作(獨(dú)占模式)
public final boolean release(int arg) {
//撥動(dòng)密碼鎖, 看看是否能夠開鎖
if (tryRelease(arg)) {
//獲取head結(jié)點(diǎn)
Node h = head;
//如果head結(jié)點(diǎn)不為空并且等待狀態(tài)不等于0就去喚醒后繼結(jié)點(diǎn)
if (h != null && h.waitStatus != 0) {
//喚醒后繼結(jié)點(diǎn)
unparkSuccessor(h);
}
return true;
}
return false;
}
//喚醒后繼結(jié)點(diǎn)
private void unparkSuccessor(Node node) {
//獲取給定結(jié)點(diǎn)的等待狀態(tài)
int ws = node.waitStatus;
//將等待狀態(tài)更新為0
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
//獲取給定結(jié)點(diǎn)的后繼結(jié)點(diǎn)
Node s = node.next;
//后繼結(jié)點(diǎn)為空或者等待狀態(tài)為取消狀態(tài)
if (s == null || s.waitStatus > 0) {
s = null;
//從后向前遍歷隊(duì)列找到第一個(gè)不是取消狀態(tài)的結(jié)點(diǎn)
for (Node t = tail; t != null && t != node; t = t.prev) {
if (t.waitStatus <= 0) {
s = t;
}
}
}
//喚醒給定結(jié)點(diǎn)后面首個(gè)不是取消狀態(tài)的結(jié)點(diǎn)
if (s != null) {
LockSupport.unpark(s.thread);
}
}
線程持有鎖進(jìn)入房間后就會(huì)去辦自己的事情,等事情辦完后它就會(huì)釋放鎖并離開房間。通過tryRelease方法可以撥動(dòng)密碼鎖進(jìn)行解鎖,我們知道tryRelease方法是需要讓子類去覆蓋的,不同的子類實(shí)現(xiàn)的規(guī)則不一樣,也就是說不同的子類設(shè)置的密碼不一樣。像在ReentrantLock當(dāng)中,房間里面的人每調(diào)用tryRelease方法一次,state就減1,直到state減到0的時(shí)候密碼鎖就開了。大家想想這個(gè)過程像不像我們?cè)诓煌5霓D(zhuǎn)動(dòng)密碼鎖的轉(zhuǎn)輪,而每次轉(zhuǎn)動(dòng)轉(zhuǎn)輪數(shù)字只是減少1。CountDownLatch和這個(gè)也有點(diǎn)類似,只不過它不是一個(gè)人在轉(zhuǎn),而是多個(gè)人每人都去轉(zhuǎn)一下,集中大家的力量把鎖給開了。線程出了房間后它會(huì)找到自己原先的座位,也就是找到head結(jié)點(diǎn)??纯醋簧嫌袥]有人給它留了小紙條,如果有的話它就知道有人睡著了需要讓它幫忙喚醒,那么它就會(huì)去喚醒那個(gè)線程。如果沒有的話就表明同步隊(duì)列中暫時(shí)還沒有人在等待,也沒有人需要它喚醒,所以它就可以安心的離去了。以上過程就是在獨(dú)占模式下釋放鎖的過程。
注:以上全部分析基于JDK1.7,不同版本間會(huì)有差異,讀者需要注意。
以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
IntelliJ IDEA(或者JetBrains PyCharm)中彈出"IntelliJ IDEA License
今天小編就為大家分享一篇關(guān)于IntelliJ IDEA(或者JetBrains PyCharm)中彈出"IntelliJ IDEA License Activation"的解決辦法,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2018-10-10
java中超過long范圍的超大整數(shù)相加算法詳解(面試高頻)
這篇文章主要介紹了java中超過long范圍的超大整數(shù)相加算法(面試高頻),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08
教你如何測(cè)試Spring Data JPA的Repository
Spring Data JPA 提供了一些便捷的方式來測(cè)試這種持久層的代碼,常見的兩種測(cè)試類型是集成測(cè)試和單元測(cè)試,本文通過示例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-08-08
SpringBoot實(shí)現(xiàn)服務(wù)接入nacos注冊(cè)中心流程詳解
這篇文章主要介紹了SpringBoot實(shí)現(xiàn)服務(wù)接入nacos注冊(cè)中心流程,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2023-01-01

