java?Semaphore共享鎖實現(xiàn)原理解析
正文
在線程間通信方式中,我們了解到可以使用Semaphore信號量來實現(xiàn)線程間通信,Semaphore支持公平鎖和非公平鎖,Semaphore底層是通過共享鎖來實現(xiàn)的,其支持兩種構(gòu)造函數(shù),如下所示:
// 默認使用非公平鎖實現(xiàn)
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
?
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Semaphore提供的常用函數(shù)如下所示:
| 函數(shù)名 | 說明 | 備注 |
|---|---|---|
| acquire | 獲取鎖 | / |
| release | 釋放鎖 | / |
下面我們來看下Semaphore內(nèi)部的實現(xiàn)原理
Semaphore內(nèi)部類及繼承關(guān)系

可以看出Semaphore和ReentrantLock實現(xiàn)原理基本一致,包含NonfairSync和FairSync兩個內(nèi)部類,這兩個內(nèi)部類的父類均為AQS,不妨大膽猜測Semaphore也是依賴AQS實現(xiàn)的,接下來我們一起來看下Semaphore獲取和釋放鎖的流程。
Semaphore.acquire流程分析(以非公平鎖為例)

從上圖可以看出,針對阻塞線程的部分實現(xiàn),和ReentrantLock基本一致,我們不做贅述,主要來看下前半部分的源碼實現(xiàn):
// Semaphore.java
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AbstractQueuedSynchronizer.java
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果線程是中斷狀態(tài),拋出異常
if (Thread.interrupted())
throw new InterruptedException();
// 嘗試獲取共享資源
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
從源碼可以看出acquire主要依賴于tryAcquireShared和doAcquireSharedInterruptibly,接下來我們來分別看下這兩塊的代碼
tryAcquireShared
// NonfairSync
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
// Sync
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// AbstractQueuedSynchronizer.java
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
從代碼可以看出這里主要是根據(jù)申請的許可證數(shù)量,比較時否有許可證數(shù)量,如果可用許可證數(shù)量小于0,則直接返回,如果大于0,則通過CAS將state設(shè)置為可用許可證數(shù)量。
doAcquireSharedInterruptibly
當(dāng)tryAcquireShared中返回的可用許可證數(shù)量小于0時,執(zhí)行doAcquireSharedInterruptibly流程,代碼如下:
// AbstractQueuedSynchronizer.java
// 在隊尾新建Node對象并添加
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// AbstractQueuedSynchronizer.java
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 將當(dāng)前線程添加到等待隊列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// for循環(huán)自旋
for (;;) {
// 獲取node的前一個節(jié)點
final Node p = node.predecessor();
// 如果前一個節(jié)點是頭節(jié)點
if (p == head) {
// 嘗試獲取鎖
int r = tryAcquireShared(arg);
if (r >= 0) {
// 獲取鎖成功,更新node信息設(shè)置為頭節(jié)點,并通知其他節(jié)點
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 判斷是否需要阻塞線程,設(shè)置waitStatus并阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
執(zhí)行setHeadAndPropagate的主要目的在于,這里能獲取到說明在該線程自旋過程中有線程釋放了許可證,釋放的許可證數(shù)量有可能還有剩余,所以傳遞給其他節(jié)點的線程,喚醒其他阻塞狀態(tài)的線程也嘗試去獲取許可證。
Semaphore.release流程分析(以非公平鎖為例)

Semaphore.release流程相對而言,就比較簡單,將release傳遞到AQS內(nèi)部通過CAS更新許可證數(shù)量信息,更新完成后,遍歷隊列中Node節(jié)點,將Node waitStatus設(shè)置為0,并對對應(yīng)線程執(zhí)行unpark,相關(guān)代碼如下:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 通過CAS更新許可證數(shù)量
if (compareAndSetState(current, next))
return true;
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
// 許可證數(shù)量更新完成后,調(diào)用該方法喚醒線程
private void doReleaseShared() {
// 自旋
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 喚醒后繼節(jié)點線程搶占許可證
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
綜上,我們分析了Smaphore非公平鎖的實現(xiàn),感興趣的可以分析下公平鎖的實現(xiàn),其本質(zhì)區(qū)別在于在tryAcquireShared中只有當(dāng)?shù)却犃袨榭諘r,才會去嘗試更新剩余許可證數(shù)量。
以上就是Semaphore共享鎖實現(xiàn)原理解析的詳細內(nèi)容,更多關(guān)于Semaphore共享鎖的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java Chassis3過載狀態(tài)下的快速失敗解決分析
本文解密了Java Chassis 3快速失敗相關(guān)的機制和背后故事,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2024-01-01
springboot vue組件開發(fā)實現(xiàn)接口斷言功能
這篇文章主要為大家介紹了springboot+vue組件開發(fā)實現(xiàn)接口斷言功能,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-05-05
Myeclipse鏈接Oracle等數(shù)據(jù)庫時lo exception: The Network Adapter coul
今天小編就為大家分享一篇關(guān)于Myeclipse鏈接Oracle等數(shù)據(jù)庫時lo exception: The Network Adapter could not establish the connection,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2019-03-03

