Java中的Semaphore源碼分析
(一)概念簡介
Semaphore是一個訪問公共資源的線程數(shù)量如限流、停車等,它是一個基于AQS實現(xiàn)的共享鎖,主要是通過控制state變量來實現(xiàn)。
其內部結構關系為:Semaphore內部是通過一個內部核心成員變量sync去調用AQS中父類的方法,NoneFairSync/FairSync繼承于內部類Sync,Sync繼承于AQS,在使用Semaphore構造方法進行實例化時可指定公平或非公平,其內部主要是靠acquire和release方法進行阻塞或釋放。
(二)使用場景
當主線程進行執(zhí)行時,利用構造方法初始化一個公平或非公平的線程訪問總數(shù),子線程調用acquire嘗試獲取訪問資源即訪問令牌,待到線程訪問總數(shù)不夠分配即分配出現(xiàn)負數(shù)時則進行阻塞,當其他占用資源被釋放時,會調用release方法進行喚醒阻塞中的線程。
(1)經(jīng)典停車位或餐廳,因位置有限無法容納更多。
(2)數(shù)據(jù)庫連接數(shù)限制或控制系統(tǒng)并發(fā)如限流;
(三)特點
(1)子線程調用acquire方法去資源總數(shù)中分配,如果分配成功則不會阻塞,否則會被阻塞,等待被喚醒;
(2)當一個子線程任務執(zhí)行結束,會通過release方法去喚醒阻塞中的線程。
Semaphore簡單使用
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < 10; i++){
int num = i;
new Thread(()->{
System.out.println("線程"+num+"初始化!");
try {
semaphore.acquire();
System.out.println("線程"+num+"拿到了鎖執(zhí)行權!");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------------------線程"+num+"被喚醒執(zhí)行!-----------------");
semaphore.release();
}).start();
}
System.out.println("main 線程執(zhí)行!");
}
(四)Semaphore源碼分析
(1)構造函數(shù)
/**
* Semaphore一個參數(shù)值容量,使用非公平NonfairSync類去實例化Sync繼承的AQS中的原子變量state值
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* Semaphore采用兩個參數(shù),一個容量和是否使用公平或非公平標記來初始化Sync繼承AQS中的原子變量state值
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
(2)acquire方法(核心)
/**
* 暴露給外部用于是否阻塞調用的API
* 其內部是Semaphore內部類(Sync)變量sync去調用acquireSharedInterruptibly
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* AQS類中確定獲取共享鎖和是否阻塞
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//判斷執(zhí)行線程是否有中斷標記
throw new InterruptedException();
/**
* AQS定義了共享鎖方法并強制子類去重寫該方法(模板模式)
* 由其子類NonfairSync繼承內部類Sync,Sync繼承于AQS類,最終由NoneFairSync去重寫
* NoneFairSync又調用了父類Sync中的nonfairTryAcquireShared方法確定是否需要阻塞
*/
if (tryAcquireShared(arg) < 0)//嘗試獲取共享鎖
doAcquireSharedInterruptibly(arg);//在未標記中斷前提下,是否需要真正阻塞
}
NoneFairSync類:繼承于Semaphore中的內部類Sync
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
Sync類:
//自旋的模式對state進行-1并返回state-1后的值
final int nonfairTryAcquireShared(int acquires) {
/**
* (1)自旋保證本次對state進行-1
* (2)state-1后的值小于0的情況下對state進行CAS設置新的值
*/
for (;;) {
int available = getState();//獲取state值
int remaining = available - acquires;//當前線程對其-1
//如果state-1后的值小于0代表無容量可用
//如果state-1后的值大于等于0,則代表共享鎖還有位置可用,并設置新的state值
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;//返回state-1后的值
}
}
(3)doAcquireSharedInterruptibly方法(核心)
/**
* AQS中定義的是否阻塞方法
* 如果在自旋過程中獲取到了共享鎖,則不進行阻塞,這也是非公平的原因之一
* 如果在自旋中未獲取到共享鎖則shouldParkAfterFailedAcquire進行更改等待狀態(tài)
* 在成功改變等待線程信號量后再調用parkAndCheckInterrupt是否阻塞
* 上述的parkAndCheckInterrupt在被喚醒之前的一段時間內,如果存在中斷標記則會拋出異常,否則正常執(zhí)行
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);//以共享鎖模式創(chuàng)建隊列
boolean failed = true;
try {
for (;;) {//自旋獲取共享鎖或阻塞
final Node p = node.predecessor();//獲取當前線程的前繼節(jié)點
if (p == head) {//前繼節(jié)點是否為頭節(jié)點
int r = tryAcquireShared(arg);//嘗試再次獲取共享鎖
if (r >= 0) {//獲取到了共享鎖
setHeadAndPropagate(node, r);//設置新的頭節(jié)點和喚醒后繼節(jié)點
p.next = null; // help GC
failed = false;//不需要執(zhí)行婁底的cancelAcquire方法
return;
}
}
/**
* shouldParkAfterFailedAcquire改變前繼節(jié)點的等待狀態(tài)信號量
* parkAndCheckInterrupt真正阻塞該線程,使用LockSupport的park方法
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//改變等前繼節(jié)點的等待狀態(tài)信號量和阻塞該線程
throw new InterruptedException();
}
} finally {
if (failed)//是否需要執(zhí)行保底方法
//當該線程有中斷標記或不需要阻塞時發(fā)生異常取消獲取鎖和過濾超時節(jié)點
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/**
* propagate > 0表示可以嘗試喚醒node結點的后繼結點,可能是-3那個線程進行導致其大于0
* (h = head) == null || h.waitStatus < 0(此時的h被重新賦值)可能會引起沒必要的喚醒操作
* 比如線程A任務結束后釋放許可,但是線程B任務還沒結束,此時線程C獲取到許可走到這里
* 執(zhí)行完上面的setHead后,然后 h = head 即h執(zhí)行線程C的結點,
* 而線程C對應的結點的 waitStatus = SIGNAL,所以也會執(zhí)行doReleaseShared喚醒線程D
* 線程D喚醒后接著去執(zhí)行 doAcquireSharedInterruptibly 中的for循環(huán),
* 執(zhí)行tryAcquireShared去拿許可證的時候發(fā)現(xiàn)是小于0,后面繼續(xù)走掛起方法去掛起線程D
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();//喚醒等待中的線程
}
}
(4)release方法(核心)
/**
* 暴露給外部調用釋放API
* Semaphore內部核心成員變量sync調用AQS中的releaseShared方法進行釋放喚醒
*/
public void release() {
sync.releaseShared(1);//調用AQS的釋放并喚醒方法
}
AQS類:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//強制子類Sync重寫該方法,模板模式
doReleaseShared();//調用AQS自身釋放共享鎖
return true;
}
return false;
}
Sync類:強制重寫父類AQS中的嘗試釋放鎖方法
protected final boolean tryReleaseShared(int releases) {
for (;;) {//自旋操作來喚醒
int current = getState();//獲取state值
int next = current + releases;//將state值+1
if (next < current) //Integer.MAX是否達到最大,防止溢出如最大數(shù)+1變?yōu)樨摂?shù)
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))//CAS改變state值
return true;
}
}
(5)doReleaseShared方法(核心)
/**
* 嘗試喚醒等待線程
* (1)正常喚醒頭結點的后繼節(jié)點線程
* (2)可能伴隨著多個共享鎖被釋放,為了防止空閑許可浪費,會喚醒頭節(jié)點的后繼節(jié)點的后繼節(jié)點
*/
private void doReleaseShared() {
for (;;) {
Node h = head;//獲取隊列中的頭結點
if (h != null && h != tail) {//判斷隊列中是否還有等待線程
int ws = h.waitStatus;//獲取頭結點的等待狀態(tài)
if (ws == Node.SIGNAL) {//可喚醒狀態(tài)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//將頭結點等待狀態(tài)設置為0
continue;//改變頭結點等待狀態(tài)失敗,則跳過本次操作,自旋再次設置
unparkSuccessor(h);//在可喚醒狀態(tài)下且改變頭節(jié)點等待狀態(tài)成功的前提下進行喚醒后繼節(jié)點
}
/**
* ws等于0,然后設置waitStatus為Node.PROPAGATE,表示在自旋過程同時有多個鎖都在被釋放
* 線程A執(zhí)行上面的cas操作將頭結點等待狀態(tài)設置為0
* 此時的線程B剛好在執(zhí)行上面if不滿足時則執(zhí)行else if邏輯,將頭結點狀態(tài)設置為-3,
* 主要是方便被喚醒線程自旋執(zhí)行doAcquireSharedInterruptibly中的setHeadAndPropagate方法
* 即表示要喚醒head后繼結點的后繼結點
* waitStatus=PROPAGATE就表示要喚醒head后繼結點的后繼結點
* 線程A和B都釋放許可了,如果有多個等待線程在等待喚醒,在setHeadAndPropagate方法中會有邏輯判斷
* 防止已經(jīng)有2張許可卻只有線程C拿到許可,線程D還在傻乎乎的等線程C釋放許可來喚醒線程D
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;//CAS設置失敗,自旋重新設置
}
if (h == head)//結束死循環(huán)
break;
}
}
到此這篇關于Java中的Semaphore源碼分析的文章就介紹到這了,更多相關Semaphore源碼分析內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
使用 Redis 緩存實現(xiàn)點贊和取消點贊的示例代碼
這篇文章主要介紹了使用 Redis 緩存實現(xiàn)點贊和取消點贊的實現(xiàn)示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-03-03
Spring-boot JMS 發(fā)送消息慢的解決方法
這篇文章主要為大家詳細介紹了Spring-boot JMS 發(fā)送消息慢的解決方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-08-08
mybatis攔截器無法注入spring bean的問題解決
本文主要介紹了mybatis攔截器無法注入spring bean的問題解決,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-02-02
Java?MyBatis實戰(zhàn)之QueryWrapper中and和or拼接技巧大全
在Java中QueryWrapper是MyBatis-Plus框架中的一個查詢構造器,它提供了豐富的查詢方法,其中包括and和or方法,可以用于構建復雜的查詢條件,這篇文章主要給大家介紹了關于Java?MyBatis實戰(zhàn)之QueryWrapper中and和or拼接技巧的相關資料,需要的朋友可以參考下2024-07-07

