Java中的Semaphore源碼分析
(一)概念簡介
Semaphore是一個訪問公共資源的線程數(shù)量如限流、停車等,它是一個基于AQS實(shí)現(xiàn)的共享鎖,主要是通過控制state變量來實(shí)現(xiàn)。
其內(nèi)部結(jié)構(gòu)關(guān)系為:Semaphore內(nèi)部是通過一個內(nèi)部核心成員變量sync去調(diào)用AQS中父類的方法,NoneFairSync/FairSync繼承于內(nèi)部類Sync,Sync繼承于AQS,在使用Semaphore構(gòu)造方法進(jìn)行實(shí)例化時可指定公平或非公平,其內(nèi)部主要是靠acquire和release方法進(jìn)行阻塞或釋放。
(二)使用場景
當(dāng)主線程進(jìn)行執(zhí)行時,利用構(gòu)造方法初始化一個公平或非公平的線程訪問總數(shù),子線程調(diào)用acquire嘗試獲取訪問資源即訪問令牌,待到線程訪問總數(shù)不夠分配即分配出現(xiàn)負(fù)數(shù)時則進(jìn)行阻塞,當(dāng)其他占用資源被釋放時,會調(diào)用release方法進(jìn)行喚醒阻塞中的線程。
(1)經(jīng)典停車位或餐廳,因位置有限無法容納更多。
(2)數(shù)據(jù)庫連接數(shù)限制或控制系統(tǒng)并發(fā)如限流;
(三)特點(diǎn)
(1)子線程調(diào)用acquire方法去資源總數(shù)中分配,如果分配成功則不會阻塞,否則會被阻塞,等待被喚醒;
(2)當(dāng)一個子線程任務(wù)執(zhí)行結(jié)束,會通過release方法去喚醒阻塞中的線程。
Semaphore簡單使用
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < 10; i++){
int num = i;
new Thread(()->{
System.out.println("線程"+num+"初始化!");
try {
semaphore.acquire();
System.out.println("線程"+num+"拿到了鎖執(zhí)行權(quán)!");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------------------線程"+num+"被喚醒執(zhí)行!-----------------");
semaphore.release();
}).start();
}
System.out.println("main 線程執(zhí)行!");
}
(四)Semaphore源碼分析
(1)構(gòu)造函數(shù)
/**
* Semaphore一個參數(shù)值容量,使用非公平NonfairSync類去實(shí)例化Sync繼承的AQS中的原子變量state值
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* Semaphore采用兩個參數(shù),一個容量和是否使用公平或非公平標(biāo)記來初始化Sync繼承AQS中的原子變量state值
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
(2)acquire方法(核心)
/**
* 暴露給外部用于是否阻塞調(diào)用的API
* 其內(nèi)部是Semaphore內(nèi)部類(Sync)變量sync去調(diào)用acquireSharedInterruptibly
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* AQS類中確定獲取共享鎖和是否阻塞
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//判斷執(zhí)行線程是否有中斷標(biāo)記
throw new InterruptedException();
/**
* AQS定義了共享鎖方法并強(qiáng)制子類去重寫該方法(模板模式)
* 由其子類NonfairSync繼承內(nèi)部類Sync,Sync繼承于AQS類,最終由NoneFairSync去重寫
* NoneFairSync又調(diào)用了父類Sync中的nonfairTryAcquireShared方法確定是否需要阻塞
*/
if (tryAcquireShared(arg) < 0)//嘗試獲取共享鎖
doAcquireSharedInterruptibly(arg);//在未標(biāo)記中斷前提下,是否需要真正阻塞
}
NoneFairSync類:繼承于Semaphore中的內(nèi)部類Sync
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
Sync類:
//自旋的模式對state進(jìn)行-1并返回state-1后的值
final int nonfairTryAcquireShared(int acquires) {
/**
* (1)自旋保證本次對state進(jìn)行-1
* (2)state-1后的值小于0的情況下對state進(jìn)行CAS設(shè)置新的值
*/
for (;;) {
int available = getState();//獲取state值
int remaining = available - acquires;//當(dāng)前線程對其-1
//如果state-1后的值小于0代表無容量可用
//如果state-1后的值大于等于0,則代表共享鎖還有位置可用,并設(shè)置新的state值
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;//返回state-1后的值
}
}
(3)doAcquireSharedInterruptibly方法(核心)
/**
* AQS中定義的是否阻塞方法
* 如果在自旋過程中獲取到了共享鎖,則不進(jìn)行阻塞,這也是非公平的原因之一
* 如果在自旋中未獲取到共享鎖則shouldParkAfterFailedAcquire進(jìn)行更改等待狀態(tài)
* 在成功改變等待線程信號量后再調(diào)用parkAndCheckInterrupt是否阻塞
* 上述的parkAndCheckInterrupt在被喚醒之前的一段時間內(nèi),如果存在中斷標(biāo)記則會拋出異常,否則正常執(zhí)行
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);//以共享鎖模式創(chuàng)建隊(duì)列
boolean failed = true;
try {
for (;;) {//自旋獲取共享鎖或阻塞
final Node p = node.predecessor();//獲取當(dāng)前線程的前繼節(jié)點(diǎn)
if (p == head) {//前繼節(jié)點(diǎn)是否為頭節(jié)點(diǎn)
int r = tryAcquireShared(arg);//嘗試再次獲取共享鎖
if (r >= 0) {//獲取到了共享鎖
setHeadAndPropagate(node, r);//設(shè)置新的頭節(jié)點(diǎn)和喚醒后繼節(jié)點(diǎn)
p.next = null; // help GC
failed = false;//不需要執(zhí)行婁底的cancelAcquire方法
return;
}
}
/**
* shouldParkAfterFailedAcquire改變前繼節(jié)點(diǎn)的等待狀態(tài)信號量
* parkAndCheckInterrupt真正阻塞該線程,使用LockSupport的park方法
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//改變等前繼節(jié)點(diǎn)的等待狀態(tài)信號量和阻塞該線程
throw new InterruptedException();
}
} finally {
if (failed)//是否需要執(zhí)行保底方法
//當(dāng)該線程有中斷標(biāo)記或不需要阻塞時發(fā)生異常取消獲取鎖和過濾超時節(jié)點(diǎn)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/**
* propagate > 0表示可以嘗試喚醒node結(jié)點(diǎn)的后繼結(jié)點(diǎn),可能是-3那個線程進(jìn)行導(dǎo)致其大于0
* (h = head) == null || h.waitStatus < 0(此時的h被重新賦值)可能會引起沒必要的喚醒操作
* 比如線程A任務(wù)結(jié)束后釋放許可,但是線程B任務(wù)還沒結(jié)束,此時線程C獲取到許可走到這里
* 執(zhí)行完上面的setHead后,然后 h = head 即h執(zhí)行線程C的結(jié)點(diǎn),
* 而線程C對應(yīng)的結(jié)點(diǎn)的 waitStatus = SIGNAL,所以也會執(zhí)行doReleaseShared喚醒線程D
* 線程D喚醒后接著去執(zhí)行 doAcquireSharedInterruptibly 中的for循環(huán),
* 執(zhí)行tryAcquireShared去拿許可證的時候發(fā)現(xiàn)是小于0,后面繼續(xù)走掛起方法去掛起線程D
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();//喚醒等待中的線程
}
}
(4)release方法(核心)
/**
* 暴露給外部調(diào)用釋放API
* Semaphore內(nèi)部核心成員變量sync調(diào)用AQS中的releaseShared方法進(jìn)行釋放喚醒
*/
public void release() {
sync.releaseShared(1);//調(diào)用AQS的釋放并喚醒方法
}
AQS類:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//強(qiáng)制子類Sync重寫該方法,模板模式
doReleaseShared();//調(diào)用AQS自身釋放共享鎖
return true;
}
return false;
}
Sync類:強(qiáng)制重寫父類AQS中的嘗試釋放鎖方法
protected final boolean tryReleaseShared(int releases) {
for (;;) {//自旋操作來喚醒
int current = getState();//獲取state值
int next = current + releases;//將state值+1
if (next < current) //Integer.MAX是否達(dá)到最大,防止溢出如最大數(shù)+1變?yōu)樨?fù)數(shù)
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))//CAS改變state值
return true;
}
}
(5)doReleaseShared方法(核心)
/**
* 嘗試喚醒等待線程
* (1)正常喚醒頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)線程
* (2)可能伴隨著多個共享鎖被釋放,為了防止空閑許可浪費(fèi),會喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)的后繼節(jié)點(diǎn)
*/
private void doReleaseShared() {
for (;;) {
Node h = head;//獲取隊(duì)列中的頭結(jié)點(diǎn)
if (h != null && h != tail) {//判斷隊(duì)列中是否還有等待線程
int ws = h.waitStatus;//獲取頭結(jié)點(diǎn)的等待狀態(tài)
if (ws == Node.SIGNAL) {//可喚醒狀態(tài)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//將頭結(jié)點(diǎn)等待狀態(tài)設(shè)置為0
continue;//改變頭結(jié)點(diǎn)等待狀態(tài)失敗,則跳過本次操作,自旋再次設(shè)置
unparkSuccessor(h);//在可喚醒狀態(tài)下且改變頭節(jié)點(diǎn)等待狀態(tài)成功的前提下進(jìn)行喚醒后繼節(jié)點(diǎn)
}
/**
* ws等于0,然后設(shè)置waitStatus為Node.PROPAGATE,表示在自旋過程同時有多個鎖都在被釋放
* 線程A執(zhí)行上面的cas操作將頭結(jié)點(diǎn)等待狀態(tài)設(shè)置為0
* 此時的線程B剛好在執(zhí)行上面if不滿足時則執(zhí)行else if邏輯,將頭結(jié)點(diǎn)狀態(tài)設(shè)置為-3,
* 主要是方便被喚醒線程自旋執(zhí)行doAcquireSharedInterruptibly中的setHeadAndPropagate方法
* 即表示要喚醒head后繼結(jié)點(diǎn)的后繼結(jié)點(diǎn)
* waitStatus=PROPAGATE就表示要喚醒head后繼結(jié)點(diǎn)的后繼結(jié)點(diǎn)
* 線程A和B都釋放許可了,如果有多個等待線程在等待喚醒,在setHeadAndPropagate方法中會有邏輯判斷
* 防止已經(jīng)有2張?jiān)S可卻只有線程C拿到許可,線程D還在傻乎乎的等線程C釋放許可來喚醒線程D
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;//CAS設(shè)置失敗,自旋重新設(shè)置
}
if (h == head)//結(jié)束死循環(huán)
break;
}
}
到此這篇關(guān)于Java中的Semaphore源碼分析的文章就介紹到這了,更多相關(guān)Semaphore源碼分析內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用 Redis 緩存實(shí)現(xiàn)點(diǎn)贊和取消點(diǎn)贊的示例代碼
這篇文章主要介紹了使用 Redis 緩存實(shí)現(xiàn)點(diǎn)贊和取消點(diǎn)贊的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03
SpringBoot?Http遠(yuǎn)程調(diào)用的方法
這篇文章主要為大家詳細(xì)介紹了SpringBoot?Http遠(yuǎn)程調(diào)用的方法,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-08-08
Spring-boot JMS 發(fā)送消息慢的解決方法
這篇文章主要為大家詳細(xì)介紹了Spring-boot JMS 發(fā)送消息慢的解決方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-08-08
基于javassist進(jìn)行動態(tài)編程過程解析
這篇文章主要介紹了基于javassist進(jìn)行動態(tài)編程過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-05-05
Java實(shí)現(xiàn)滑動驗(yàn)證碼的示例代碼
這篇文章主要為大家介紹了如何用Java語言實(shí)現(xiàn)滑動驗(yàn)證碼的生成,項(xiàng)目采用了springboot,maven等技術(shù),感興趣的小伙伴可以跟隨小編學(xué)習(xí)一下2022-02-02
mybatis攔截器無法注入spring bean的問題解決
本文主要介紹了mybatis攔截器無法注入spring bean的問題解決,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-02-02
Java?MyBatis實(shí)戰(zhàn)之QueryWrapper中and和or拼接技巧大全
在Java中QueryWrapper是MyBatis-Plus框架中的一個查詢構(gòu)造器,它提供了豐富的查詢方法,其中包括and和or方法,可以用于構(gòu)建復(fù)雜的查詢條件,這篇文章主要給大家介紹了關(guān)于Java?MyBatis實(shí)戰(zhàn)之QueryWrapper中and和or拼接技巧的相關(guān)資料,需要的朋友可以參考下2024-07-07

