欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

一文帶你深入理解Java?AbstractQueuedSynchronizer

 更新時(shí)間:2023年07月17日 08:32:12   作者:單程車票  
在并發(fā)編程中,鎖是一種保證線程安全的方式,這篇文章主要為大家介紹了AbstractQueuedSynchronizer(AQS)的數(shù)據(jù)結(jié)構(gòu)及實(shí)現(xiàn)原理,感興趣的小伙伴可以了解一下

前言

在并發(fā)編程中,鎖是一種保證線程安全的方式,Java 主要有兩種鎖機(jī)制,一種是 synchronized 關(guān)鍵字修飾的鎖,通過 JVM 層面的指令碼來控制鎖(依賴于底層的操作系統(tǒng));而另一種則是 JUC 包下的各類同步器如 ReentrantLock 可重入鎖,那么這類同步器是怎么樣實(shí)現(xiàn)鎖機(jī)制的呢?

其實(shí) ReentrantLock 這類的鎖是基于 AbstractQueuedSynchronizer(文章后面都稱為 AQS)實(shí)現(xiàn)的。那么為什么 AQS 能夠?qū)崿F(xiàn)鎖機(jī)制呢?

本篇文章將會深入講解 AQS 的數(shù)據(jù)結(jié)構(gòu)及實(shí)現(xiàn)原理。

AQS 概述

AQS 是什么?

AQS 直譯為抽象隊(duì)列同步器,是用來構(gòu)建鎖和同步器的重量級基礎(chǔ)框架。JUC包下的鎖和同步器如 ReentrantLock、Semaphore、ReentrantReadWriteLockCountDownLatch等都是基于 AQS 實(shí)現(xiàn)的。

AQS 的原理

在并發(fā)場景下,多線程存在搶占共享資源的情況,那么必定存在搶占不到資源的線程需要進(jìn)行排隊(duì)等待,并且當(dāng)資源釋放時(shí)也需要喚醒這些線程進(jìn)行資源爭搶,所以 AQS 提供了一套線程等待阻塞以及線程喚醒的機(jī)制來實(shí)現(xiàn)多線程下線程安全。

AQS 通過 維護(hù)一個(gè) int 類型的狀態(tài)變量和一個(gè) FIFO 的虛擬雙向隊(duì)列(CLH 隊(duì)列鎖的變體) 來實(shí)現(xiàn)線程等待和喚醒機(jī)制的。

原理大致為:當(dāng)線程請求共享資源空閑時(shí),AQS 會將當(dāng)前線程設(shè)置為有效的工作線程并通過 CAS 的方式將狀態(tài)變量設(shè)置為鎖定狀態(tài);當(dāng)線程獲取共享資源失敗時(shí),AQS 會將線程及等待狀態(tài)封裝成一個(gè) Node 節(jié)點(diǎn),將其加入隊(duì)列中;當(dāng)共享資源被釋放時(shí),AQS 會喚醒隊(duì)列中的下一個(gè)節(jié)點(diǎn)再次嘗試獲取共享資源。

下文將通過源碼分析具體展示 AQS 是如何實(shí)現(xiàn)線程等待阻塞以及喚醒的。

如何自定義同步器

上面提到 ReentrantLock、SemaphoreReentrantReadWriteLock、CountDownLatch等同步器都是基于 AQS 實(shí)現(xiàn)的(其實(shí)就是繼承 AQS),那么是不是只要繼承 AQS 也可以實(shí)現(xiàn)自定義同步器呢?

淺看 AQS 源碼,可以看到 AQS 是抽象類。其實(shí) AQS 是基于模板模式設(shè)計(jì)的,也就是說 AQS 已經(jīng)提供了一套線程等待阻塞以喚醒的實(shí)現(xiàn),不同的同步器只需要繼承 AQS 類并重寫 AQS 指定的方法來定制自己的獲取資源和釋放資源邏輯即可。

那么有哪些方法是可以進(jìn)行重寫的呢?

通過源碼會發(fā)現(xiàn) AQS 的所有方法只有 5 個(gè)方法是可以重寫的,其余要不是 private 就是 final 修飾的方法。

下面列出這五個(gè)方法:

// (獨(dú)占式)嘗試獲取資源,成功則返回true,失敗則返回false。
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
// (獨(dú)占式)嘗試釋放資源,成功則返回true,失敗則返回false。
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
// (共享式)嘗試獲取資源,負(fù)數(shù)表示失敗;0表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
// (共享式)嘗試釋放資源,成功則返回true,失敗則返回false。
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
// 判斷當(dāng)前線程是否正在獨(dú)占式,只有用到condition才需要去實(shí)現(xiàn)它。
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

上面代碼可以看到 AQS 分為獨(dú)占式和共享式兩種獲取共享資源的方式:

  • 獨(dú)占式(Exclusive):只有一個(gè)線程能獲取共享資源,如 ReentrantLock 可重入鎖。
  • 共享式(Share):多個(gè)線程能同時(shí)獲取共享資源,如Semaphore、CountDownLatChCyclicBarrier等 JUC 工具類。

需要實(shí)現(xiàn)獨(dú)占式的同步器就去重寫?yīng)氄际降姆椒?,需要?shí)現(xiàn)共享式的同步器就去重寫共享式的方法(注意這里重寫的方法必須都是內(nèi)部線程安全的,并且盡可能地簡短)。

下面以獨(dú)占式的 ReentrantLock 為例,看看 ReentrantLock 源碼中是如何繼承 AQS 并重寫方法實(shí)現(xiàn)同步器的。

可以看到 ReentrantLock 通過內(nèi)部類 Sync 繼承 AQS 并在 Sync 中重寫了獨(dú)占式的 tryRelease() 方法,然后將獨(dú)占式的 tryAcquire() 交給 Sync 的兩個(gè)子類(也就是公平與非公平)去按照各自的邏輯實(shí)現(xiàn)。

基本上面提到同步器都是通過依賴內(nèi)部類 Sync 繼承 AQS 實(shí)現(xiàn)地同步器,所以需要自定義同步器的也可以仿照這樣的方式創(chuàng)建。

AQS 的核心數(shù)據(jù)結(jié)構(gòu)

前文說道 AQS 是基于 CLH 隊(duì)列鎖的變體 實(shí)現(xiàn)的,所以 AQS 的核心數(shù)據(jù)結(jié)構(gòu)就是這個(gè) CLH 隊(duì)列鎖的變體,在了解 AQS 的核心數(shù)據(jù)結(jié)構(gòu)前,還需要先介紹一下 CLH 隊(duì)列鎖是什么?

CLH 隊(duì)列鎖是什么?

相信大家都聽說過自旋鎖吧,自旋鎖是互斥鎖的一種實(shí)現(xiàn)方式,通過 CAS 的方式獲取鎖和釋放鎖來實(shí)現(xiàn)互斥鎖。但是自旋鎖存在鎖饑餓問題鎖競爭激烈下性能較差問題。

CLH 隊(duì)列鎖是對自旋鎖的改進(jìn),有效地解決了自旋鎖上面的兩個(gè)問題,通過隊(duì)列的方式可以防止鎖饑餓問題,同時(shí)實(shí)現(xiàn)了鎖狀態(tài)去中心化,讓每個(gè)線程可以在不同的狀態(tài)變量下自旋,從而來減少 CPU 的性能開銷。

CLH 隊(duì)列鎖可以看成是一個(gè)單向鏈表隊(duì)列,將所有請求共享資源的線程封裝成 節(jié)點(diǎn)(包含 線程標(biāo)識 和 被鎖定狀態(tài)) 排列在隊(duì)列中,如下圖。

原理流程:

  • CLH 隊(duì)列鎖會維護(hù)一個(gè) tail 用于指向隊(duì)列的末尾節(jié)點(diǎn),初始化時(shí) tail 會指向一個(gè)被鎖定狀態(tài)為 false 的空節(jié)點(diǎn)。
  • 當(dāng)有新的線程需要獲取共享資源時(shí),會先將被鎖定狀態(tài)置為 true,然后通過 getAndSet 的原子操作方式獲取 tail 所指向的節(jié)點(diǎn)并判斷節(jié)點(diǎn)的被鎖定狀態(tài)是否為 false,狀態(tài)為 false 則獲得共享資源,為 true 則加入隊(duì)列,并將 tail 指向當(dāng)前節(jié)點(diǎn),使得當(dāng)前節(jié)點(diǎn)稱為新的末尾節(jié)點(diǎn)。
  • 入隊(duì)的節(jié)點(diǎn)會以輪詢的方式訪問上一節(jié)點(diǎn)的被鎖定狀態(tài),也就是說只有上一節(jié)點(diǎn)釋放共享資源(被鎖定狀態(tài)為 false)后,當(dāng)前節(jié)點(diǎn)才能獲取到共享資源(上述的共享資源都能同于鎖)。

可以看到 CLH 隊(duì)列鎖通過隊(duì)列實(shí)現(xiàn)了公平鎖,先入隊(duì)的線程會先獲得共享資源,解決了鎖饑餓問題;并且每個(gè)節(jié)點(diǎn)的被鎖定狀態(tài)只會影響到其后一個(gè)節(jié)點(diǎn),實(shí)現(xiàn)了鎖去中心化從而減少 CPU 的開銷。

AQS 對 CLH 隊(duì)列鎖的改進(jìn)

雖然 CLH 隊(duì)列鎖已經(jīng)具有良好的性能了,但是因?yàn)榇嬖谧孕砸琅f存在 CPU 開銷問題,并且 CLH 隊(duì)列鎖本身的功能單一,不能支持復(fù)雜的功能。

所以 AQS 對 CLH 隊(duì)列鎖進(jìn)行了改進(jìn),使用 LockSupport 類將自旋改為阻塞線程操作(后續(xù)源碼會具體介紹)來減少 CPU 的開銷,擴(kuò)展節(jié)點(diǎn)的狀態(tài)以及顯示的維護(hù)前驅(qū)和后續(xù)節(jié)點(diǎn)。

AQS 使用內(nèi)部類 Node 來實(shí)現(xiàn) CLH 隊(duì)列鎖的變體,也就是 AQS 的核心數(shù)據(jù)結(jié)構(gòu)。

下面看看 AQS 的內(nèi)部類 Node 的源碼:

static final class Node {
    // 共享模式
    static final Node SHARED = new Node();
    // 獨(dú)占模式
    static final Node EXCLUSIVE = null;
    // 由于超時(shí)、中斷或其他原因,線程被取消
    static final int CANCELLED =  1;
    // 當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)阻塞等待共享資源
    static final int SIGNAL    = -1;
    // 當(dāng)前節(jié)點(diǎn)在條件隊(duì)列
    static final int CONDITION = -2;
    // 當(dāng)前節(jié)點(diǎn)的下一個(gè)acquireShared應(yīng)無條件傳播
    static final int PROPAGATE = -3;
    // 節(jié)點(diǎn)狀態(tài)
    volatile int waitStatus;
    // 前驅(qū)節(jié)點(diǎn)
    volatile Node prev;
    // 后繼節(jié)點(diǎn)
    volatile Node next;
    // 節(jié)點(diǎn)的線程
    volatile Thread thread;
    // 下一個(gè)等待者(這個(gè)用于 Condition,這里不做過多說明)
    Node nextWaiter;
    // 是否是共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    // 獲取前驅(qū)節(jié)點(diǎn),為空則拋出異常
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    // 各種構(gòu)造器
    Node() {
    }
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

(說明:AQS 實(shí)際上有兩種隊(duì)列,一個(gè)是使用雙向鏈表實(shí)現(xiàn)(利用 prevnext)的隊(duì)列,一個(gè)是使用單向鏈表實(shí)現(xiàn)(利用 nextWaiter)的 Condition 隊(duì)列,文章主要講解 AQS 最為核心的雙向鏈表隊(duì)列,關(guān)于 Condition 的內(nèi)容本文不做講解,有興趣的可以通過閱讀源碼)

從上面的源碼中可以看到內(nèi)部類 Node 中三個(gè)比較重要的屬性:

waitStatus 節(jié)點(diǎn)狀態(tài)

  • CANCELLED=1:由于超時(shí)、中斷或其他原因,當(dāng)前節(jié)點(diǎn)的線程被取消。
  • SIGNAL=-1:當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)的線程阻塞等待共享資源(也就是線程已經(jīng)準(zhǔn)備好了就等共享資源釋放了)。
  • CONDITION=-2當(dāng)前節(jié)點(diǎn)在條件隊(duì)列中。
  • PROPAGATE=-3當(dāng)前節(jié)點(diǎn)的下一個(gè) acquireShared 應(yīng)無條件傳播。

前驅(qū)節(jié)點(diǎn)和后繼節(jié)點(diǎn)

  • prev:前驅(qū)節(jié)點(diǎn)
  • next:后繼節(jié)點(diǎn)

除此之外還有一個(gè)常用的方法 predecessor() 用于獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),如果前驅(qū)節(jié)點(diǎn)為空則拋出異常。

AQS 的 CLH 隊(duì)列鎖變體如下圖

圖中有 headtail 兩個(gè)變量分別指向隊(duì)列頭部和末尾節(jié)點(diǎn),這是 AQS 類的屬性(后續(xù)源碼分析中介紹)。并且隊(duì)列的頭節(jié)點(diǎn)是哨兵節(jié)點(diǎn)(只用來占位,沒有線程),其實(shí)就是 new Node() 初始化一個(gè)空節(jié)點(diǎn)(后續(xù)源碼中介紹)。

AQS 源碼分析

前面已經(jīng)說明了 AQS 的核心數(shù)據(jù)結(jié)構(gòu),接下來將會通過源碼去進(jìn)一步的了解 AQS 是如何憑借 同步狀態(tài)變量CLH 隊(duì)列鎖的變體 實(shí)現(xiàn)線程等待喚醒機(jī)制的。

1. 繼承 AOS 類

先了解一下 AQS 繼承的父類 AOS 為 AQS 提供了什么方法。

AQS 繼承 AbstractOwnableSynchronizer 抽象類,AbstractOwnableSynchronizer 類中有exclusiveOwnerThread 屬性,表示獨(dú)占式下資源的所有者(持有共享資源的線程)。同時(shí)包含該屬性的 get/set 方法用于設(shè)置當(dāng)前獨(dú)占資源的線程和獲取當(dāng)前獨(dú)占資源的線程。因?yàn)槔^承關(guān)系也就意味著 AQS 可以調(diào)用這兩個(gè)方法。

2. AQS 的屬性

接下來了解一下 AQS 的屬性,更好地去認(rèn)識 AQS 的數(shù)據(jù)結(jié)構(gòu)以及如何獲取和設(shè)置同步狀態(tài)變量。

查看源碼:

// 序列號
private static final long serialVersionUID = 7373984972572414691L;
// 頭結(jié)點(diǎn)
private transient volatile Node head;
// 尾結(jié)點(diǎn)
private transient volatile Node tail;
// 同步狀態(tài)
private volatile int state;
// 自旋時(shí)間
static final long spinForTimeoutThreshold = 1000L;
// Unsafe實(shí)例以及各種內(nèi)存偏移量
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

這里主要關(guān)注 head 頭結(jié)點(diǎn)、tail 尾結(jié)點(diǎn)、state 同步狀態(tài)即可。

AQS 中通過 Getter/Setter 方法以及 CAS 的方式設(shè)置和獲取 state 屬性。

// 獲取同步狀態(tài)
protected final int getState() {
    return state;
}
// 設(shè)置同步狀態(tài)
protected final void setState(int newState) {
    state = newState;
}
// CAS方式設(shè)置同步狀態(tài)(當(dāng)期待值expect等于當(dāng)前同步狀態(tài)時(shí),將同步狀態(tài)設(shè)置為update值)
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

3. AQS 如何實(shí)現(xiàn)線程等待機(jī)制

通過前文可以了解到 ReentrantLock 是如何繼承 AQS 的,那么下面以 ReentrantLock 的非公平鎖為例深入 AQS 源碼中分析 AQS 的 核心方法 以及 AQS 如何結(jié)合 同步狀態(tài)CLH 隊(duì)列鎖的變體 實(shí)現(xiàn)線程的等待機(jī)制的。

首先創(chuàng)建非公平鎖的可重入鎖 Lock lock = new ReentrantLock();,此時(shí) AQS 的 state 同步狀態(tài)為0,表示沒有線程占用共享資源。

lock.lock() 開始看 AQS 如何實(shí)現(xiàn)多線程搶占式下的線程等待阻塞

進(jìn)入非公平鎖的 lock()方法,查看源碼:

可以看到調(diào)用 compareAndSetState() 用于以 CAS 的方式(原子操作)設(shè)置同步狀態(tài) state。

  • 如果 state 為 0 時(shí),將其設(shè)置為 1,并返回 true 執(zhí)行 setExclusiveOwnerThread() 方法將當(dāng)前線程設(shè)置為共享資源所有者;
  • 如果 state 為 1 時(shí),返回 false 進(jìn)入 acquire(1) 方法中。

AQS 核心方法 acquire()

AQS 實(shí)現(xiàn)線程獲取資源以及線程等待阻塞的核心方法,查看源碼:

可以看到這里共有三個(gè) AQS 核心方法:tryAcquire()、addWaiter()、acquireQueued()。

下面根據(jù) if 語句從左到右執(zhí)行的順序依次進(jìn)入這三個(gè)方法。

tryAcquire() 方法:自定義獲取共享資源的邏輯

這個(gè)方法如果前文看的仔細(xì)的話,會發(fā)現(xiàn)它是需要子類重寫的方法,也就是由子類自己定義搶占式獲取資源的邏輯。

進(jìn)入非公平鎖實(shí)現(xiàn)的 tryAcquire() 方法

發(fā)現(xiàn)該方法調(diào)用了 nonfairTryAcquire() 方法,查看源碼:

// 這里調(diào)用的是 nonfairTryAcquire(1),也就是acquires為 1
final boolean nonfairTryAcquire(int acquires) {
    // 獲取當(dāng)前線程
    final Thread current = Thread.currentThread();
    // 獲取同步狀態(tài)
    int c = getState();
    // 同步狀態(tài)為0,說明共享資源空閑可以直接搶占
    if (c == 0) {
        // 再次嘗試搶占
        if (compareAndSetState(0, acquires)) {
            // 成功則將當(dāng)前線程設(shè)置為共享資源所有者
            setExclusiveOwnerThread(current);
            // 返回 true
            return true;
        }
    }
    // 當(dāng)前共享資源還在被占用,判斷當(dāng)前線程是不是共享資源所有者
    else if (current == getExclusiveOwnerThread()) {
        // 增加同步狀態(tài)(這里其實(shí)也就是 ReentrantLock 是可重入鎖的原因,如果是自己線程持有的鎖,可以再次加鎖,也就是可重入)
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 更新同步狀態(tài)
        setState(nextc);
        // 返回 true
        return true;
    }
    // 如果上面情況都不是則返回 false
    return false;
}

這個(gè)方法其實(shí)體現(xiàn)的主要是 ReentrantLock 可重入鎖的邏輯,執(zhí)行流程如下:

  • 先判斷當(dāng)前共享資源是否空閑,空閑說明沒有線程占用,當(dāng)前線程可以再次嘗試搶占,搶占成功則將當(dāng)前線程設(shè)置為共享資源所有者并返回 true,這樣后續(xù)的 addWaiter()、acquireQueued() 就無需執(zhí)行了。
  • 當(dāng)前共享資源被占用中,判斷當(dāng)前線程是不是共享資源所有者,是則可以再次加鎖,這也就是可重入鎖的意義(持有鎖的線程可以重復(fù)加鎖,但是在解鎖的時(shí)候也要確保加多少鎖解多少鎖,要使同步狀態(tài)恢復(fù)為 0,否則會一直阻塞),更新同步狀態(tài)后返回 true,這樣后續(xù)的 addWaiter()acquireQueued() 也無需執(zhí)行了。
  • 如果上面兩種情況都不是,則返回 false,進(jìn)入 acquire() 的下一個(gè) addWaiter() 方法。

addWaiter() 方法:將當(dāng)前線程結(jié)合模式封裝成節(jié)點(diǎn)加入隊(duì)列

該方法是 AQS 中的方法,查看源碼:

private Node addWaiter(Node mode) {
    // 以當(dāng)前線程以及 mode(這里是獨(dú)占式)為參數(shù)構(gòu)建節(jié)點(diǎn)
    Node node = new Node(Thread.currentThread(), mode);
    // 獲取隊(duì)列末尾節(jié)點(diǎn)
    Node pred = tail;
    // 判斷末尾節(jié)點(diǎn)是否為空
    if (pred != null) {
        // 末尾不為空時(shí),將當(dāng)前節(jié)點(diǎn)鏈接到莫尾結(jié)點(diǎn)后面,并且將當(dāng)前節(jié)點(diǎn)設(shè)置為末尾節(jié)點(diǎn)
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            // 返回當(dāng)前節(jié)點(diǎn)
            return node;
        }
    }
    // 末尾節(jié)點(diǎn)為空,則進(jìn)入 enq 方法
    enq(node);
    return node;
}

執(zhí)行流程:

  • 根據(jù)傳入的參數(shù)構(gòu)造節(jié)點(diǎn),此時(shí)節(jié)點(diǎn)的狀態(tài) waitStatus 為 0。
  • 判斷末尾節(jié)點(diǎn)是否為空,這里其實(shí)就是看看之前有沒有創(chuàng)建過隊(duì)列,有則直接加入末尾節(jié)點(diǎn)后面,并通過CAS 將當(dāng)前節(jié)點(diǎn)設(shè)置為末尾節(jié)點(diǎn)。
  • 沒有則進(jìn)入 enq() 方法創(chuàng)建隊(duì)列并加入節(jié)點(diǎn)。

enq() 方法:初始化隊(duì)列(加入哨兵節(jié)點(diǎn)為頭結(jié)點(diǎn))并加入當(dāng)前節(jié)點(diǎn)

該方法是 AQS 中的方法,查看源碼:

private Node enq(final Node node) {
    // 循環(huán)直到將 node 加入隊(duì)列為止
    for (;;) {
        Node t = tail;
        // 判斷末尾節(jié)點(diǎn)是否為空
        if (t == null) {
            // 為空,說明需要設(shè)置哨兵節(jié)點(diǎn)
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 不為空,直接加入末尾,并設(shè)置當(dāng)前節(jié)點(diǎn)為末尾節(jié)點(diǎn)
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

執(zhí)行流程:

  • 根據(jù)末尾節(jié)點(diǎn)是否為空,判斷是否有哨兵節(jié)點(diǎn),沒有則使用 CAS 的方式將 new Node() 加入頭節(jié)點(diǎn)。
  • 因?yàn)槭茄h(huán),所以最終會直到 node 加入末尾節(jié)點(diǎn)為止才會終止,并且將當(dāng)前節(jié)點(diǎn)返回(通過循環(huán)確保節(jié)點(diǎn)一定加入隊(duì)列)。

這里也就是前文數(shù)據(jù)結(jié)構(gòu)圖中為什么頭節(jié)點(diǎn)是哨兵節(jié)點(diǎn)的原因,如圖:

acquireQueued() 方法:隊(duì)列中輪詢搶占共享資源,搶占不成功則進(jìn)入阻塞等待喚醒

addWaiter() 執(zhí)行完加入隊(duì)列后會將節(jié)點(diǎn)返回作為參數(shù)執(zhí)行 acquireQueued(),查看源碼:

// node 為當(dāng)前加入隊(duì)列的節(jié)點(diǎn),arg 為 1
final boolean acquireQueued(final Node node, int arg) {
    // 標(biāo)志,用來判斷是否執(zhí)行后面的cancelAcquire方法
    // 也就是中途是否存在異常中斷,有則使用cancelAcquire整理隊(duì)列
    boolean failed = true;
    try {
        // 中斷標(biāo)志
        boolean interrupted = false;
        // 循環(huán)
        for (;;) {
            // 獲取當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)
            final Node p = node.predecessor();
            // 判斷前置節(jié)點(diǎn)是否是頭結(jié)點(diǎn)(當(dāng)前節(jié)點(diǎn)是否是隊(duì)列第一個(gè)等待線程)以及再次嘗試獲取資源
            if (p == head && tryAcquire(arg)) {
                // 滿足條件說明當(dāng)前節(jié)點(diǎn)已經(jīng)搶占到資源,可以從隊(duì)列中移除
                // 所以這里會將當(dāng)前節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn)(setHead方法操作是先將節(jié)點(diǎn)線程以及前驅(qū)節(jié)點(diǎn)置空)
                setHead(node);
                // 并且將原本的頭結(jié)點(diǎn)置空,通過輔助GC回收
                p.next = null; // help GC
                // 標(biāo)志置為false,表示沒有異常
                failed = false;
                // 返回中斷標(biāo)志
                return interrupted;
            }
            // 上述條件不滿足,則執(zhí)行shouldParkAfterFailedAcquire和parkAndCheckInterrupt
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // 整理隊(duì)列,將隊(duì)列中中斷的進(jìn)程移除
            cancelAcquire(node);
    }
}

這里需要分情況討論:

  • 假設(shè)當(dāng)前的線程是隊(duì)列的第一個(gè)線程(也就是前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)),并且執(zhí)行該方法時(shí),共享資源已經(jīng)空閑,此時(shí)該線程符合前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)并且該線程搶占到資源,那么就會將該線程移除隊(duì)列(AQS 的做法是將該線程的節(jié)點(diǎn)的線程以及前驅(qū)節(jié)點(diǎn)置空,并設(shè)置該節(jié)點(diǎn)為頭結(jié)點(diǎn);然后將原本的頭結(jié)點(diǎn)置空交給輔助GC回收即可),移除隊(duì)列后會將 failed 標(biāo)志置為 false,表示無需整理隊(duì)列并返回中斷標(biāo)志(因?yàn)椴]有中斷請求所以這里中斷標(biāo)志是 false,表示無需執(zhí)行中斷)。
  • 假設(shè)當(dāng)前的線程是隊(duì)列的第一個(gè)線程但是執(zhí)行該方法時(shí)共享資源仍被占用中或爭搶失?。ㄒ?yàn)槭欠枪芥i)或者當(dāng)前線程不是隊(duì)列第一個(gè)線程時(shí),因?yàn)榍懊娴臈l件無法滿足,便會去執(zhí)行 shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() 方法,下面查看這兩個(gè)方法的源碼。

shouldParkAfterFailedAcquire() 方法:更改前置節(jié)點(diǎn)的狀態(tài)為 SIGNAL

該方法是 AQS 中的方法,查看源碼:

// 傳參 pred 為當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn);node 為 當(dāng)前節(jié)點(diǎn)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 獲取前置節(jié)點(diǎn)的狀態(tài)
    int ws = pred.waitStatus;
    // 判斷是不是 Node.SIGNAL(-1)
    if (ws == Node.SIGNAL)
        // 狀態(tài)為-1表示線程已經(jīng)準(zhǔn)備好了(阻塞等待了),就等資源釋放進(jìn)行爭搶了
        // 返回 true 執(zhí)行 parkAndCheckInterrupt() 方法(這玩意就是阻塞線程)
        return true;
    // 判斷狀態(tài)是否大于0,表示被中斷
    if (ws > 0) {
        //前置節(jié)點(diǎn)為被中斷,意味著這個(gè)節(jié)點(diǎn)沒用了,需要再往前找,找到不是被中斷的狀態(tài)為止
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 找到后將新的前置節(jié)點(diǎn)與當(dāng)前節(jié)點(diǎn)鏈接起來
        pred.next = node;
    } else {
        // CAS 方式將前置節(jié)點(diǎn)的狀態(tài)設(shè)置為 Node.SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 返回 false 直接跳出
    return false;
}

這個(gè)方法就是當(dāng)循環(huán)無法搶占到資源時(shí),便會去更新當(dāng)前線程節(jié)點(diǎn)的前置節(jié)點(diǎn)狀態(tài),更新完?duì)顟B(tài)之后等待下一次循環(huán)如果還是搶占不到資源,就會去執(zhí)行 parkAndCheckInterrupt() ,使用 LockSupport 把當(dāng)前線程阻塞等待喚醒。

parkAndCheckInterrupt() 方法:阻塞當(dāng)前線程

該方法是 AQS 中的方法,查看源碼:

private final boolean parkAndCheckInterrupt() {
    // 調(diào)用 LockSupport 類進(jìn)行阻塞當(dāng)前線程
    LockSupport.park(this);
    // 返回當(dāng)前線程是否中斷,同時(shí)清除中斷標(biāo)志位
    return Thread.interrupted();
}

這里也就是前文提到的 AQS 對 CLH 隊(duì)列鎖的改進(jìn),不再通過自旋鎖的方式輪詢前一個(gè)節(jié)點(diǎn)的狀態(tài),而是嘗試兩次搶占后還是搶占不到就進(jìn)入阻塞狀態(tài),等待資源釋放后喚醒,這樣做可以減少 CPU 開銷。

該方法返回的是當(dāng)前線程是否被中斷的結(jié)果,被中斷則返回 true 使得前面的 acquireQueued() 方法將中斷標(biāo)志置為 true,在搶占到資源后會根據(jù)中斷標(biāo)志最終進(jìn)入語句執(zhí)行 selfInterrupt() 方法將當(dāng)前線程中斷。

cancelAcquire 方法:當(dāng)前線程出現(xiàn)異常時(shí)整理隊(duì)列移除當(dāng)前線程節(jié)點(diǎn)

該方法是 AQS 中的方法,查看源碼:

// 參數(shù)為當(dāng)前節(jié)點(diǎn)
private void cancelAcquire(Node node) {
    // 當(dāng)前節(jié)點(diǎn)為空直接返回
    if (node == null)
        return;
    // 將當(dāng)前節(jié)點(diǎn)的線程置空
    node.thread = null;
    // 這一步是找到當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
    Node pred = node.prev;
    // 如果前驅(qū)節(jié)點(diǎn)狀態(tài)也為 CANCELLED 則循環(huán)找前驅(qū)節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)直到狀態(tài)不為 CANCELLED
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // 獲取前驅(qū)節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)
    Node predNext = pred.next;
    // 將當(dāng)前節(jié)點(diǎn)設(shè)置為 CANCELLED
    node.waitStatus = Node.CANCELLED;
    // 判斷當(dāng)前節(jié)點(diǎn)是否是末尾節(jié)點(diǎn),是則只需要將末尾節(jié)點(diǎn)設(shè)置成獲取到的前驅(qū)節(jié)點(diǎn)即可
    // 相當(dāng)于移除掉了前驅(qū)節(jié)點(diǎn)后的所有節(jié)點(diǎn)
    if (node == tail && compareAndSetTail(node, pred)) {
        // 并將前驅(qū)節(jié)點(diǎn)的下一節(jié)點(diǎn)置空
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // 滿足三個(gè)條件
        // 1. 前驅(qū)節(jié)點(diǎn)不是頭結(jié)點(diǎn)
        // 2. 前驅(qū)節(jié)點(diǎn)的狀態(tài)為 SIGNAL 或者 前驅(qū)節(jié)點(diǎn)狀態(tài)小于0并且可以設(shè)置為 SIGNAL
        // 3. 前驅(qū)節(jié)點(diǎn)的線程不為空
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            // 保存當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)
            Node next = node.next;
            // 后繼節(jié)點(diǎn)不為空并且狀態(tài)不為 CANCELLED
            if (next != null && next.waitStatus <= 0)
                // 將前驅(qū)節(jié)點(diǎn)與后繼節(jié)點(diǎn)鏈接起來,移除中間的節(jié)點(diǎn)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 以上條件不滿足則喚醒當(dāng)前線程的下一個(gè)節(jié)點(diǎn)
            unparkSuccessor(node);
        }
        // 輔助GC回收
        node.next = node; // help GC
    }
}

執(zhí)行過程:

當(dāng)前線程不能正常退出時(shí)執(zhí)行該方法,也就是出現(xiàn)異常時(shí)執(zhí)行該方法

先判斷該線程節(jié)點(diǎn)是否為空,為空則直接返回;不為空則將當(dāng)前節(jié)點(diǎn)的線程置空

通過循環(huán)的方式找到當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)(這個(gè)節(jié)點(diǎn)狀態(tài)不能為 CANCELLED),并記錄前置節(jié)點(diǎn)的下一節(jié)點(diǎn)

將當(dāng)前節(jié)點(diǎn)狀態(tài)置為 CANCELLED,分情況討論當(dāng)前節(jié)點(diǎn)的位置

  • 第一種:當(dāng)前節(jié)點(diǎn)為末尾節(jié)點(diǎn),這種情況只需要將當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)設(shè)置為末尾節(jié)點(diǎn),并將前置節(jié)點(diǎn)的下一節(jié)點(diǎn)置空即可移除異常節(jié)點(diǎn),異常節(jié)點(diǎn)最后通過輔助GC回收
  • 第二種:當(dāng)前節(jié)點(diǎn)位于頭結(jié)點(diǎn)和尾結(jié)點(diǎn)之間,這種情況需要獲取當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn),再當(dāng)前節(jié)點(diǎn)后繼節(jié)點(diǎn)不為空時(shí),將前置節(jié)點(diǎn)與后繼節(jié)點(diǎn)鏈接起來即可移除異常節(jié)點(diǎn),最后通過輔助GC回收
  • 第三種:當(dāng)前節(jié)點(diǎn)為頭結(jié)點(diǎn),此時(shí)調(diào)用 unparkSuccessor() 方法喚醒當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)即可,這樣在資源被搶占后會將頭結(jié)點(diǎn)設(shè)置為后繼節(jié)點(diǎn),當(dāng)前節(jié)點(diǎn)也就會被置空并通過輔助GC回收

執(zhí)行流程圖

4. AQS 如何實(shí)現(xiàn)線程喚醒機(jī)制

假設(shè)當(dāng)前 AQS 的 state 同步狀態(tài)為 1,表示當(dāng)前有線程占用共享資源。

lock.unlock() 開始看 AQS 如何實(shí)現(xiàn)多線程搶占式下的線程喚醒

深入源碼,調(diào)用了 ReentrantLockunlock() 方法,查看源碼:

進(jìn)入 release() 方法,也就是 AQS 的 release() 方法,查看源碼:

可以看到 AQS 的 release() 先調(diào)用 ReentrantLock 重寫的 tryRelease() 進(jìn)行判斷。

tryRelease() 方法:自定義釋放資源邏輯

查看 ReentrantLock 中的源碼:

protected final boolean tryRelease(int releases) {
    // 傳入releases 為 1, getState() 為 1
    // 所以 c = 1 - 1 = 0
    int c = getState() - releases;
    // 判斷當(dāng)前線程是否是共享資源所有者,不是則拋出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 判斷c是否等于0,共享資源是否空閑
    if (c == 0) {
        // 更新free標(biāo)志
        free = true;
        // 將共享資源所有者置空
        setExclusiveOwnerThread(null);
    }
    // 更新state
    setState(c);
    // 返回free標(biāo)志
    return free;
}

這里是可重入鎖 ReentrantLock 自定義的釋放資源邏輯,過程較為易懂,就是將 當(dāng)前的同步狀態(tài)變量−參數(shù)releases當(dāng)前的同步狀態(tài)變量 - 參數(shù)releases當(dāng)前的同步狀態(tài)變量−參數(shù)releases 得到新的同步狀態(tài)變量。如果為 0 說明共享資源已被當(dāng)前線程釋放返回 true。

返回 release 方法體,進(jìn)入 if 語句,判斷頭結(jié)點(diǎn)不為空并且狀態(tài)不為 0,表示隊(duì)列存在并且頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)已經(jīng)準(zhǔn)備好獲取共享資源了,進(jìn)入 unparkSuccessor() 方法。

unparkSuccessor 方法:喚醒后繼節(jié)點(diǎn)的線程

private void unparkSuccessor(Node node) {
    // 獲取當(dāng)前結(jié)點(diǎn)的狀態(tài)
    int ws = node.waitStatus;
    // 判斷是否小于0,重新將狀態(tài)置為0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 獲取當(dāng)前結(jié)點(diǎn)的后繼節(jié)點(diǎn)
    Node s = node.next;
    // 如果后繼節(jié)點(diǎn)為空或者后繼節(jié)點(diǎn)狀態(tài)為 CANCELLED
    if (s == null || s.waitStatus > 0) {
        // 后繼節(jié)點(diǎn)置空
        s = null;
        // 通過從后向前遍歷找到離當(dāng)前節(jié)點(diǎn)最近的后一個(gè)節(jié)點(diǎn)并且該節(jié)點(diǎn)狀態(tài)小于0
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                // 如果狀態(tài)小于0說明已經(jīng)準(zhǔn)備好獲取資源了,替換掉原先的后繼節(jié)點(diǎn)
                s = t;
    }
    // 后繼節(jié)點(diǎn)不為空
    if (s != null)
        // 喚醒該后繼節(jié)點(diǎn)的線程
        LockSupport.unpark(s.thread);
}

該方法就是獲取傳入節(jié)點(diǎn)的后繼節(jié)點(diǎn),并且保證該節(jié)點(diǎn)的狀態(tài)小于0即已經(jīng)準(zhǔn)備好獲取共享資源了,通過 LockSupportunpark() 方法喚醒后繼節(jié)點(diǎn)的線程。

到這里就是 AQS 實(shí)現(xiàn)線程喚醒的全部過程了,AQS 的線程喚醒機(jī)制是通過上一個(gè)節(jié)點(diǎn)來喚醒當(dāng)前節(jié)點(diǎn)的線程。

以上就是一文帶你深入理解Java AbstractQueuedSynchronizer的詳細(xì)內(nèi)容,更多關(guān)于Java AbstractQueuedSynchronizer的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • mybatisplus實(shí)現(xiàn)自動創(chuàng)建/更新時(shí)間的項(xiàng)目實(shí)踐

    mybatisplus實(shí)現(xiàn)自動創(chuàng)建/更新時(shí)間的項(xiàng)目實(shí)踐

    Mybatis-Plus提供了自動填充功能,可以通過實(shí)現(xiàn)MetaObjectHandler接口來實(shí)現(xiàn)自動更新時(shí)間的功能,本文就來介紹一下mybatisplus實(shí)現(xiàn)自動創(chuàng)建/更新時(shí)間的項(xiàng)目實(shí)踐,感興趣的可以了解下
    2024-01-01
  • Java中關(guān)于ThreadLocal的隱式引用詳解

    Java中關(guān)于ThreadLocal的隱式引用詳解

    這篇文章主要介紹了Java中關(guān)于ThreadLocal的隱式引用,從線程的角度看,每個(gè)線程都保持一個(gè)對其線程局部變量副本的隱式引用,只要線程是活動的,ThreadLocal實(shí)例就是可訪問的,下面我們來具體看看
    2024-03-03
  • Java中的HashMap和ConcurrentHashMap區(qū)別和適用場景

    Java中的HashMap和ConcurrentHashMap區(qū)別和適用場景

    HashMap和ConcurrentHashMap在對null值的處理、線程安全性、性能等方面存在顯著的區(qū)別,HashMap允許鍵和值為null,適用于單線程環(huán)境下的數(shù)據(jù)存儲和查詢場景;而ConcurrentHashMap不允許鍵和值為null,適用多線程環(huán)境下的數(shù)據(jù)存儲和查詢場景,具有線程安全性和較高的并發(fā)性能
    2025-01-01
  • spring中的注解@@Transactional失效的場景代碼演示

    spring中的注解@@Transactional失效的場景代碼演示

    這篇文章主要介紹了spring中的注解@@Transactional失效的場景代碼演示,@Transactional注解是Spring框架提供的用于聲明事務(wù)的注解,作用于類和方法上,需要的朋友可以參考下
    2024-01-01
  • Java IO字符流緩沖區(qū)實(shí)現(xiàn)原理解析

    Java IO字符流緩沖區(qū)實(shí)現(xiàn)原理解析

    這篇文章主要介紹了Java IO字符流緩沖區(qū)實(shí)現(xiàn)原理解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-05-05
  • Java實(shí)現(xiàn)人機(jī)對戰(zhàn)猜拳游戲

    Java實(shí)現(xiàn)人機(jī)對戰(zhàn)猜拳游戲

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)人機(jī)對戰(zhàn)猜拳游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-07-07
  • JFreeChart插件實(shí)現(xiàn)的折線圖效果實(shí)例

    JFreeChart插件實(shí)現(xiàn)的折線圖效果實(shí)例

    這篇文章主要介紹了JFreeChart插件實(shí)現(xiàn)的折線圖效果,結(jié)合實(shí)例形式分析了基于JFreeChart繪制折線圖的相關(guān)實(shí)現(xiàn)技巧,需要的朋友可以參考下
    2016-08-08
  • 基于SpringBoot解決CORS跨域的問題(@CrossOrigin)

    基于SpringBoot解決CORS跨域的問題(@CrossOrigin)

    這篇文章主要介紹了基于SpringBoot解決CORS跨域的問題(@CrossOrigin),具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-01-01
  • IDEA解決src和resource下創(chuàng)建多級目錄的操作

    IDEA解決src和resource下創(chuàng)建多級目錄的操作

    這篇文章主要介紹了IDEA解決src和resource下創(chuàng)建多級目錄的操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • 查看import的類是出自哪個(gè)jar包的方法

    查看import的類是出自哪個(gè)jar包的方法

    下面小編就為大家?guī)硪黄榭磇mport的類是出自哪個(gè)jar包的方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-03-03

最新評論