Java?AQS中ReentrantLock條件鎖的使用
一.什么是AQS
1.定義
java.util.concurrent包中的大多數(shù)同步器實現(xiàn)都是圍繞著共同的基礎(chǔ)行為,比如等待隊列、條件隊列、獨占獲取、共享獲取等,而這些行為的抽象就是基于AbstractQueuedSynchronizer(簡稱AQS)實現(xiàn)的,AQS是一個抽象同步框架,可以用來實現(xiàn)一個依賴狀態(tài)的同步器。
JDK中提供的大多數(shù)的同步器如Lock, Latch, Barrier等,都是基于AQS框架來實現(xiàn)的。

- 一般都是通過一個內(nèi)部類sync繼承AQS
- 將同步器所有調(diào)用都同步到Sync對應(yīng)的方法
2.特性
- 阻塞等待隊列
- 共享/獨占
- 公平/非公平
- 可重入
- 允許中斷
3.屬性
內(nèi)部維護屬性volatile int state,表示資源的可用狀態(tài)
- getState()
- setState()
- compareAndSetState()
4.資源共享方式
- Exclusive-獨占,只有一個線程能執(zhí)行,如ReentrantLock
- Share-共享,多個線程可以同時執(zhí)行,如Semaphore/CountDownLatch
5.兩種隊列
- 同步等待隊列: 主要用于維護獲取鎖失敗時入隊的線程
- 條件等待隊列: 調(diào)用await()的時候會釋放鎖,然后線程會加入到條件隊列,調(diào)用signal()喚醒的時候會把條件隊列中的線程節(jié)點移動到同步隊列中,等待再次獲得鎖
6.隊列節(jié)點狀態(tài)
- 值為0,初始化狀態(tài),表示當(dāng)前節(jié)點在sync隊列中,等待著獲取鎖。
- CANCELLED,值為1,表示當(dāng)前的線程被取消;
- SIGNAL,值為-1,表示當(dāng)前節(jié)點的后繼節(jié)點包含的線程需要運行,也就是unpark;
- CONDITION,值為-2,表示當(dāng)前節(jié)點在等待condition,也就是在condition隊列中;
- PROPAGATE,值為-3,表示當(dāng)前場景下后續(xù)的acquireShared能夠得以執(zhí)行;
7.實現(xiàn)方法
自定義同步器實現(xiàn)時主要實現(xiàn)以下幾種方法:
- isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現(xiàn)它。
- tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
- tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
- tryAcquireShared(int):共享方式。嘗試獲取資源。負數(shù)表示失?。?表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。
- tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點返回true,否則返回false。
二.等待隊列
1.同步等待隊列
AQS當(dāng)中的同步等待隊列也稱CLH隊列,CLH隊列是Craig、Landin、Hagersten三人發(fā)明的一種基于雙向鏈表數(shù)據(jù)結(jié)構(gòu)的隊列,是FIFO先進先出線程等待隊列,Java中的CLH隊列是原CLH隊列的一個變種,線程由原自旋機制改為阻塞機制。
AQS 依賴CLH同步隊列來完成同步狀態(tài)的管理:
- 當(dāng)前線程如果獲取同步狀態(tài)失敗時,AQS則會將當(dāng)前線程已經(jīng)等待狀態(tài)等信息構(gòu)造成一個節(jié)點(Node)并將其加入到CLH同步隊列,同時會阻塞當(dāng)前線程
- 當(dāng)同步狀態(tài)釋放時,會把首節(jié)點喚醒(公平鎖),使其再次嘗試獲取同步狀態(tài)。
- 通過signal或signalAll將條件隊列中的節(jié)點轉(zhuǎn)移到同步隊列。(由條件隊列轉(zhuǎn)化為同步隊列)
2.條件等待隊列
AQS中條件隊列是使用單向列表保存的,用nextWaiter來連接:
- 調(diào)用await方法阻塞線程;
- 當(dāng)前線程存在于同步隊列的頭結(jié)點,調(diào)用await方法進行阻塞(從同步隊列轉(zhuǎn)化到條件隊列)
三.condition接口

- 調(diào)用Condition#await方法會釋放當(dāng)前持有的鎖,然后阻塞當(dāng)前線程,同時向Condition隊列尾部添加一個節(jié)點,所以調(diào)用Condition#await方法的時候必須持有鎖。
- 調(diào)用Condition#signal方法會將Condition隊列的首節(jié)點移動到阻塞隊列尾部,然后喚醒因調(diào)用Condition#await方法而阻塞的線程(喚醒之后這個線程就可以去競爭鎖了),所以調(diào)用Condition#signal方法的時候必須持有鎖,持有鎖的線程喚醒被因調(diào)用Condition#await方法而阻塞的線程。
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
try {
log.debug(Thread.currentThread().getName() + " 開始處理任務(wù)");
//會釋放當(dāng)前持有的鎖,然后阻塞當(dāng)前線程
condition.await();
log.debug(Thread.currentThread().getName() + " 結(jié)束處理任務(wù)");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
lock.lock();
try {
log.debug(Thread.currentThread().getName() + " 開始處理任務(wù)");
Thread.sleep(2000);
//喚醒因調(diào)用Condition#await方法而阻塞的線程
condition.signal();
log.debug(Thread.currentThread().getName() + " 結(jié)束處理任務(wù)");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
}
Thread-0 開始處理任務(wù)
Thread-1 開始處理任務(wù)
Thread-1 結(jié)束處理任務(wù)
Thread-0 結(jié)束處理任務(wù)
四.ReentrantLock
1.ReentrantLock是什么
ReentrantLock是一種基于AQS框架的應(yīng)用實現(xiàn),是JDK中的一種線程并發(fā)訪問的同步手段,它的功能類似于synchronized是一種互斥鎖,可以保證線程安全。
2.特點
- 可中斷
- 可以設(shè)置超時時間
- 可以設(shè)置為公平鎖
- 支持多個條件變量
- 與 synchronized 一樣,都支持可重入
3. ReentrantLock和synchronized的區(qū)別
- synchronized是JVM層次的鎖實現(xiàn),ReentrantLock是JDK層次的鎖實現(xiàn);
- synchronized的鎖狀態(tài)是無法在代碼中直接判斷的,但是ReentrantLock可以通過ReentrantLock#isLocked判斷;
- synchronized是非公平鎖,ReentrantLock可以是公平也可以是非公平的,默認是非公平的;
- synchronized是不可以被中斷的,而ReentrantLock#lockInterruptibly方法是可以被中斷的;
- 在發(fā)生異常時synchronized會自動釋放鎖,而ReentrantLock需要開發(fā)者在finally塊中顯示釋放鎖;
- ReentrantLock獲取鎖的形式有多種:如立即返回是否成功的tryLock(),以及等待指定時長的獲取,更加靈活;
- synchronized在特定的情況下對于已經(jīng)在等待的線程是后來的線程先獲得鎖(回顧一下sychronized的喚醒策略),而ReentrantLock對于已經(jīng)在等待的線程是先來的線程先獲得鎖;
4. ReentrantLock的使用
偽代碼:
ReentrantLock lock = new ReentrantLock(); //參數(shù)默認false,不公平鎖
ReentrantLock lock = new ReentrantLock(true); //公平鎖
//加鎖
lock.lock();
try {
//臨界區(qū)
} finally {
// 解鎖
lock.unlock();
例子:基本使用
private static int sum = 0;
private static Lock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(()->{
//加鎖 一般寫在try前面
lock.lock();
try {
// 臨界區(qū)代碼 業(yè)務(wù)邏輯
for (int j = 0; j < 10000; j++) {
sum++;
}
} finally {
// 解鎖
lock.unlock();
}
});
thread.start();
}
Thread.sleep(2000);
System.out.println(sum);
}
30000
可重入
public static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
method1();
}
public static void method1() {
lock.lock();
try {
log.debug("execute method1");
method2();
} finally {
lock.unlock();
}
}
public static void method2() {
lock.lock();
try {
log.debug("execute method2");
method3();
} finally {
lock.unlock();
}
}
public static void method3() {
lock.lock();
try {
log.debug("execute method3");
} finally {
lock.unlock();
}
}
execute method1
execute method2
execute method3
可中斷
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("t1啟動...");
try {
lock.lockInterruptibly();
try {
log.debug("t1獲得了鎖");
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("t1等鎖的過程中被中斷");
}
}, "t1");
lock.lock();
try {
log.debug("main線程獲得了鎖");
t1.start();
//先讓線程t1執(zhí)行
Thread.sleep(1000);
t1.interrupt();
log.debug("線程t1執(zhí)行中斷");
} finally {
lock.unlock();
}
}
main線程獲得了鎖
t1啟動…
線程t1執(zhí)行中斷
t1等鎖的過程中被中斷
鎖超時
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("t1啟動...");
try {
//if (!lock.tryLock()) {
// log.debug("t1獲取鎖失敗,立即返回false");
// return;
//}
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("等待 1s 后獲取鎖失敗,返回");
return;
}
} catch (Exception e) {
e.printStackTrace();
return;
}
try {
log.debug("t1獲得了鎖");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
try {
log.debug("main線程獲得了鎖");
t1.start();
//先讓線程t1執(zhí)行
Thread.sleep(2000);
} finally {
lock.unlock();
}
}
main線程獲得了鎖
t1啟動…
等待 1s 后獲取鎖失敗,返回
公平鎖和非公平鎖
public static void main(String[] args) throws InterruptedException {
// ReentrantLock lock = new ReentrantLock(true); //公平鎖
ReentrantLock lock = new ReentrantLock(); //非公平鎖
for (int i = 0; i < 500; i++) {
new Thread(() -> {
lock.lock();
try {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "t" + i).start();
}
// 1s 之后去爭搶鎖
Thread.sleep(1000);
for (int i = 0; i < 500; i++) {
new Thread(() -> {
lock.lock();
try {
log.debug(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "強行插入" + i).start();
}
}
條件變量
private static ReentrantLock lock = new ReentrantLock();
private static Condition cigCon = lock.newCondition();
private static Condition takeCon = lock.newCondition();
private static boolean hashcig = false;
private static boolean hastakeout = false;
//送煙
public void cigratee(){
lock.lock();
try {
while(!hashcig){
try {
log.debug("沒有煙,歇一會");
cigCon.await();
}catch (Exception e){
e.printStackTrace();
}
}
log.debug("有煙了,干活");
}finally {
lock.unlock();
}
}
//送外賣
public void takeout(){
lock.lock();
try {
while(!hastakeout){
try {
log.debug("沒有飯,歇一會");
takeCon.await();
}catch (Exception e){
e.printStackTrace();
}
}
log.debug("有飯了,干活");
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
ReentrantLockDemo6 test = new ReentrantLockDemo6();
new Thread(() ->{
test.cigratee();
}).start();
new Thread(() -> {
test.takeout();
}).start();
new Thread(() ->{
lock.lock();
try {
hashcig = true;
log.debug("喚醒送煙的等待線程");
cigCon.signal();
}finally {
lock.unlock();
}
},"t1").start();
new Thread(() ->{
lock.lock();
try {
hastakeout = true;
log.debug("喚醒送飯的等待線程");
takeCon.signal();
}finally {
lock.unlock();
}
},"t2").start();
}
沒有煙,歇一會
沒有飯,歇一會
喚醒送煙的等待線程
喚醒送飯的等待線程
有煙了,干活
有飯了,干活
五.源碼解析
首先會調(diào)用lock方法
public void lock() {
sync.lock();
}
lock會調(diào)用公平方法或者非公平的方法,默認是非公平鎖方法,非公平鎖則會cas嘗試加鎖,state是不是0,是0的話就把它改為1,并設(shè)置當(dāng)前線程為獨占線程,加鎖成功,待下個線程進來時已經(jīng)變成1,則失敗阻塞。
加鎖
final void lock() {
// 看狀態(tài)是不是0,如果是0 則改為1,加鎖成功
if (compareAndSetState(0, 1))
// 并設(shè)置當(dāng)前線程為獨占線程
setExclusiveOwnerThread(Thread.currentThread());
else
//不是0則失敗阻塞
acquire(1);
}
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
加鎖失敗(入隊 阻塞)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//恢復(fù)中斷標識位
selfInterrupt();
}
首先tryAcquire 又進行了一次判斷,看是否能獲取鎖,
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//其他線程進來,狀態(tài)值是1
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 重入,將狀態(tài)值+1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
添加進隊列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//第一次tail為空
if (pred != null) {
//尾插法
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//tail為空則在這里創(chuàng)建隊列
enq(node);
return node;
}
創(chuàng)建隊列并且入隊
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//創(chuàng)建隊列
if (compareAndSetHead(new Node()))
// 將頭節(jié)點指向前一節(jié)點的尾節(jié)點,這時候tail不為空了
tail = head;
} else {
//雙向接口,前一節(jié)點的尾節(jié)點也指向當(dāng)前節(jié)點的頭節(jié)點
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
阻塞
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { //保證一定獲取鎖
//獲取head節(jié)點
final Node p = node.predecessor();
//是頭節(jié)點則嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
//設(shè)置頭節(jié)點
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//獲取鎖失敗的情況,阻塞,在for循環(huán)里,第一次shouldParkAfterFailedAcquire為false,會將其設(shè)置為-1,第二次就可以阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
是否需要阻塞,把狀態(tài)設(shè)置為SIGNAL,可以被喚醒了
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//是-1了就可以去阻塞
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do { //把節(jié)點去掉
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//把狀態(tài)設(shè)置為SIGNAL,可以被喚醒了
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
真正的阻塞方法
private final boolean parkAndCheckInterrupt() {
//阻塞
LockSupport.park(this);
//清除中斷標識位,在加鎖失敗方法的后面恢復(fù)中斷標識位,可能其他地方還用到這個鎖標識位
return Thread.interrupted();
}
喚醒 unlock()
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 嘗試喚醒
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//喚醒阻塞的線程
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//當(dāng)前狀態(tài)-1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//設(shè)置狀態(tài)
setState(c);
return free;
}
在這里喚醒
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//后面一個節(jié)點不為空 則直接喚醒當(dāng)前線程
if (s != null)
LockSupport.unpark(s.thread);
}
線程取消獲取鎖
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
//將前一個節(jié)點干掉
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
至此加鎖、解鎖、阻塞、喚醒的底層源碼都梳理完了。
到此這篇關(guān)于Java AQS中ReentrantLock條件鎖的使用的文章就介紹到這了,更多相關(guān)Java ReentrantLock條件鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot URL帶有特殊字符([]/{}等),報400錯誤的解決
這篇文章主要介紹了SpringBoot URL帶有特殊字符([]/{}等),報400錯誤的解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02
java異步編程CompletableFuture使用示例詳解
這篇文章主要為大家介紹了java異步編程CompletableFuture使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-11-11
Springboot詳細講解RocketMQ實現(xiàn)順序消息的發(fā)送與消費流程
RocketMQ作為一款純java、分布式、隊列模型的開源消息中間件,支持事務(wù)消息、順序消息、批量消息、定時消息、消息回溯等,本篇我們了解如何實現(xiàn)順序消息的發(fā)送與消費2022-06-06
Java的web開發(fā)中SSH框架的協(xié)作處理應(yīng)用筆記
這篇文章主要介紹了Java的web開發(fā)中SSH框架的協(xié)作處理應(yīng)用筆記,SSH是指Struts和Spring以及Hibernate的框架搭配,需要的朋友可以參考下2015-12-12

