詳解Java信號(hào)量Semaphore的原理及使用
1.Semaphore的概述
public class Semaphore extends Object implements Serializable
Semaphore來自于JDK1.5的JUC包,直譯過來就是信號(hào)量,被作為一種多線程并發(fā)控制工具來使用。
Semaphore可以控制同時(shí)訪問共享資源的線程個(gè)數(shù),線程通過 acquire方法獲取一個(gè)信號(hào)量,信號(hào)量減一,如果沒有就等待;通過release方法釋放一個(gè)信號(hào)量,信號(hào)量加一。它通過控制信號(hào)量的總數(shù)量,以及每個(gè)線程所需獲取的信號(hào)量數(shù)量,進(jìn)而控制多個(gè)線程對(duì)共享資源訪問的并發(fā)度,以保證合理的使用共享資源。相比synchronized和獨(dú)占鎖一次只能允許一個(gè)線程訪問共享資源,功能更加強(qiáng)大,有點(diǎn)類似于共享鎖!
2.Semaphore的原理
2.1 基本結(jié)構(gòu)

根據(jù)uml類圖,可以很明顯的看出來Semaphore和CountDownLatch一樣都是直接使用AQS實(shí)現(xiàn)的。區(qū)別就是Semaphore還分別實(shí)現(xiàn)了公平模式FairSync和非公平模式NonfairSync兩個(gè)內(nèi)部類。
實(shí)際上公平與非公平只是在獲取信號(hào)量的時(shí)候得到體現(xiàn),它們的釋放信號(hào)量的方法都是一樣的,這就類似于ReentrantLock:公平與非公平只是在獲取鎖的時(shí)候得到體現(xiàn),它們的釋放鎖的方法都是一樣的!或許這里有人在想,信號(hào)量是不是可以看作鎖資源呢?某些時(shí)候這么看是沒問題的,比如都是獲取了只有獲取了“信號(hào)量”或者“鎖”才能訪問共享資源,但是它們又有區(qū)別,鎖資源會(huì)和線程綁定,而信號(hào)量則不會(huì)和線程綁定。
在構(gòu)造器部分,如同CountDownLatch 構(gòu)造函數(shù)傳遞的初始化計(jì)數(shù)個(gè)數(shù)count被賦給了AQS 的state 狀態(tài)變量一樣,Semaphore的信號(hào)量個(gè)數(shù)permits同樣賦給了AQS 的state 值。
在創(chuàng)建Semaphore時(shí)可以使用一個(gè)fair變量指定是否使用公平策略,默認(rèn)是非公平的模式。公平模式會(huì)確保所有等待的獲取信號(hào)量的線程按照先進(jìn)先出的順序獲取信號(hào)量,而非公平模式則沒有這個(gè)保證。非公平模式的吞吐量比公平模式的吞吐量要高,而公平模式則可以避免線程饑餓。
/**
* 保存某個(gè)AQS子類實(shí)例
*/
private final Sync sync;
/**
* 創(chuàng)建具有給定的信號(hào)量數(shù)和非公平的公平模式的 Semaphore。
*
* @param permits 初始的可用信號(hào)量數(shù)目。此值可能為負(fù)數(shù),在這種情況下,必須在授予任何獲取信號(hào)量前進(jìn)行釋放信號(hào)量。
*/
public Semaphore(int permits) {
//默認(rèn)初始化NonfairSync實(shí)例
sync = new NonfairSync(permits);
}
/**
* 創(chuàng)建具有給定的信號(hào)量數(shù)和給定的公平設(shè)置的 Semaphore。
*
* @param permits 初始的可用信號(hào)量數(shù)目。此值可能為負(fù)數(shù),在這種情況下,必須在授予任何獲取信號(hào)量前進(jìn)行釋放信號(hào)量。
* @param fair 如果此信號(hào)量保證在爭(zhēng)用時(shí)按先進(jìn)先出的順序授予信號(hào)量,則為 true;否則為 false。
*/
public Semaphore(int permits, boolean fair) {
//根據(jù)fair參數(shù)選擇初始化一個(gè)公平FairSync類或者非公平NonfairSync類的實(shí)例
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
* 非公平模式的實(shí)現(xiàn)
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
//…………其他方法后面再講
}
/**
* 公平模式的實(shí)現(xiàn)
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
//…………其他方法后面再講
}
/**
* 信號(hào)量的同步實(shí)現(xiàn)。 使用 AQS 的state狀態(tài)表示信號(hào)量。子分類為公平和非公平模式。
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
/**
* 構(gòu)造器
*
* @param permits 初始的可用信號(hào)量數(shù)目。
*/
Sync(int permits) {
//被設(shè)置為state值
setState(permits);
}
//…………其他方法后面再講
}
2.2 可中斷獲取信號(hào)量
public void acquire()
可中斷的獲取一個(gè)信號(hào)量,沒有則一直阻塞,直到在其他線程提供信號(hào)量并喚醒該線程或者線程被中斷。獲取一個(gè)信號(hào)量就立即返回,將可用的信號(hào)量數(shù)減 1。 如果調(diào)用此方法時(shí)已被中斷或者等待時(shí)被中斷,則拋出 InterruptedException,并且清除當(dāng)前線程的已中斷狀態(tài)。
public void acquire(int permits)
可中斷的獲取permits 個(gè)信號(hào)量。
內(nèi)部調(diào)用AQS的acquireSharedInterruptibly方法,這實(shí)際上就是共享式可中斷獲取資源的模版方法,因此Semaphore和CountDownLatch一樣都是基于共享資源模式。
/**
* Semaphore的acquire方法
* 從信號(hào)量獲取一個(gè)信號(hào)量,沒有則一直阻塞,直到在其他線程提供信號(hào)量并喚醒或者線程被中斷。
*
* @throws InterruptedException 如果調(diào)用此方法時(shí)已被中斷或者等待時(shí)被中斷
*/
public void acquire() throws InterruptedException {
//內(nèi)部調(diào)用AQS的acquireSharedInterruptibly方法
//這實(shí)際上就是共享式可中斷獲取資源模版方法
sync.acquireSharedInterruptibly(1);
}
/**
* 從信號(hào)量獲取permits個(gè)信號(hào)量,沒有則一直阻塞,直到在其他線程提供信號(hào)量并喚醒或者線程被中斷。
*
* @param permits 需要獲取的信號(hào)量數(shù)量
* @throws InterruptedException 如果調(diào)用此方法時(shí)已被中斷或者等待時(shí)被中斷
*/
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
//參數(shù)就是permits
sync.acquireSharedInterruptibly(permits);
}
/**
1. AQS的acquireSharedInterruptibly方法
2. 共享式可中斷獲取信號(hào)量資源的模版方法
3. 4. @param arg 需要獲取的信號(hào)量資源數(shù)量
5. @throws InterruptedException 如果調(diào)用此方法時(shí)已被中斷或者等待時(shí)被中斷
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//最開始就檢查一次,如果當(dāng)前線程是被中斷狀態(tài),直接拋出異常
if (Thread.interrupted())
throw new InterruptedException();
//調(diào)用tryAcquireShared嘗試獲取共享信號(hào)量資源,這個(gè)方法是子類自己重寫的
//如果返回值小于0,表示當(dāng)前線程共享信號(hào)量資源失敗,否則表示成功
//Semaphore的FairSync和NonfairSync對(duì)tryAcquireShared分別做出了公平和不公平的實(shí)現(xiàn)
if (tryAcquireShared(arg) < 0)
//獲取不到就執(zhí)行doAcquireSharedInterruptibly方法
doAcquireSharedInterruptibly(arg);
}
在獲取共享信號(hào)量資源的時(shí)候,Semaphore還實(shí)現(xiàn)了公平模式和非公平模式!它們的實(shí)現(xiàn)實(shí)際上和lock鎖的實(shí)現(xiàn)中鎖資源的公平、非公平獲取非常類似!
2.1.1 公平模式
公平模式調(diào)用FairSync的tryAcquireShared方法!
如果我們學(xué)習(xí)了AQS、ReentrantLock、ReadWriteLock的源碼,我們第一個(gè)就會(huì)發(fā)現(xiàn)hasQueuedPredecessors方法,這個(gè)方法是AQS為實(shí)現(xiàn)公平模式的預(yù)定義的方法,AQS幫我們實(shí)現(xiàn)好了,該方法用于查詢是否有任何線程等待獲取信號(hào)量資源的時(shí)間超過當(dāng)前線程。
大概步驟為:
- 開啟一個(gè)死循環(huán):
- 調(diào)用hasQueuedPredecessors方法,判斷是否有線程比當(dāng)前線程更早地請(qǐng)求獲取信號(hào)量資源。如果該方法返回true,則表示有線程比當(dāng)前線程更早地請(qǐng)求獲取信號(hào)量資源,由于是公平的的,因此當(dāng)前線程不應(yīng)該獲取信號(hào)量資源,直接返回-1,表示獲取信號(hào)量資源失敗。
- 到這里還沒有返回,表示當(dāng)前線程就是最早請(qǐng)求獲取信號(hào)量資源,可以嘗試獲取。
- 獲取state的值available,我們知道state代表信號(hào)量資源數(shù)量。remaining為available減去需要獲取的信號(hào)量資源數(shù)量之后的差值。
- 如果remaining小于0,那么返回remaining值,由于是負(fù)數(shù),因此獲取失敗,如果大于等于0,那么表示可以獲取成功,嘗試CAS的更新state,更新成功之后同樣返回remaining,由于是大于等于0的數(shù),因此獲取成功。
- 如果remaining大于等于0,但是CAS更新state失敗,那么循環(huán)重試。
原理還是很簡(jiǎn)單的,就是判斷目前的信號(hào)量資源數(shù)量—state的值,是否滿足要獲取的信號(hào)量資源數(shù)量,acquire()方法默認(rèn)獲取1個(gè)資源。獲取到了就是CAS的原子性的將state遞減,否則表示獲取資源失敗,那么可能會(huì)阻塞。但是我們也會(huì)發(fā)現(xiàn):如果remaining大于等于0,但是CAS更新state失敗,那么會(huì)循環(huán)重試,這里為什么要重試呢?
實(shí)際上我們的在AQS文章的“可重入共享鎖的實(shí)現(xiàn)” 部分已經(jīng)講過:因?yàn)榭赡軙?huì)有多個(gè)線程同時(shí)獲取信號(hào)量資源,但是由于CAS只能保證一次只有一個(gè)線程成功,因此其他線程必定失敗,但此時(shí),實(shí)際上還是存在剩余的信號(hào)量資源沒有被獲取完畢的,因此讓其他線程重試,相比于直接加入到同步隊(duì)列中,對(duì)于信號(hào)量資源的利用率更高!
/**
* 公平模式
*/
static final class FairSync extends Sync {
/**
* 嘗試公平的獲取共享信號(hào)量資源
*
* @param acquires 獲取信號(hào)量資源數(shù)量
* @return 如果返回值小于0,表示當(dāng)前線程共享信號(hào)量資源失敗,否則表示成功
*/
protected int tryAcquireShared(int acquires) {
/*開啟一個(gè)循環(huán)嘗試獲取共享信號(hào)量資源*/
for (; ; ) {
//這是AQS實(shí)現(xiàn)公平模式的預(yù)定義的方法,AQS幫我們實(shí)現(xiàn)好了。該方法用于查詢是否有任何線程等待獲取信號(hào)量資源的時(shí)間超過當(dāng)前線程
//如果該方法返回true,則表示有線程比當(dāng)前線程更早地請(qǐng)求獲取信號(hào)量資源。由于是公平的的,因此當(dāng)前線程不應(yīng)該獲取信號(hào)量資源,直接返回-1,表示獲取信號(hào)量資源失敗
if (hasQueuedPredecessors())
return -1;
//到這里,表示當(dāng)前線程就是最早請(qǐng)求獲取信號(hào)量資源,可以嘗試獲取
//獲取state的值available,我們知道state代表信號(hào)量資源數(shù)量
int available = getState();
//remaining為available減去需要獲取的信號(hào)量資源數(shù)量之后的差值
int remaining = available - acquires;
//如果remaining小于0,那么返回remaining值,由于是負(fù)數(shù),因此獲取失敗
//如果大于等于0,那么表示可以獲取成功,嘗試CAS的更新state,更新成功之后同樣返回remaining,由于是大于等于0的數(shù),因此獲取成功
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
//如果remaining大于等于0,但是CAS更新state失敗,那么循環(huán)重試
}
}
}
2.1.2 非公平模式
非公平模式調(diào)用NonfairSync的tryAcquireShared方法!
相比于公平模式的實(shí)現(xiàn),少了hasQueuedPredecessors的判斷。可以想象:如果某線程A 先調(diào)用了aquire()方法獲取信號(hào)量,但是如果當(dāng)前信號(hào)量個(gè)數(shù)為0,那么線程A 會(huì)被放入AQS 的同步隊(duì)列阻塞。
過一段時(shí)間后線程B調(diào)用了release()方法釋放了一個(gè)信號(hào)量,他它會(huì)喚醒隊(duì)列中等待的線程A,但是這時(shí)線程C又調(diào)用了aquire()方法。如果采用非公平策略,那么線程C就會(huì)和線程A 去競(jìng)爭(zhēng)這個(gè)信號(hào)量資源。由nonfairTryAcquireShared的代碼可知,線程C完全可以在線程A 被激活前,或者激活后先于線程A 獲取到該信號(hào)量,也就是在這種模式下阻塞線程和當(dāng)前請(qǐng)求的線程是競(jìng)爭(zhēng)關(guān)系,而不遵循先來先得的策略。
另外,非公平模式的具體實(shí)現(xiàn)是在父類Sync中的nonfairTryAcquireShared方方法,為什么該方法要實(shí)現(xiàn)在父類中的,因?yàn)闊o論是指定的公平模式還是非公平模式,它們的tryAcquire方法都是調(diào)用的nonfairTryAcquireShared方法,即非公平的,因此實(shí)現(xiàn)在父類中!
/**
* 非公平模式
*/
static final class NonfairSync extends Sync {
/**
* 嘗試非公平的獲取共享信號(hào)量資源
*
* @param acquires 獲取信號(hào)量資源數(shù)量
* @return 如果返回值小于0,表示當(dāng)前線程共享信號(hào)量資源失敗,否則表示成功
*/
protected int tryAcquireShared(int acquires) {
//調(diào)用父類Sync的nonfairTryAcquireShared方法
//為什么該方法要實(shí)現(xiàn)在父類中的,因?yàn)闊o論是指定的公平模式還是非公平模式,
//它們的tryAcquire方法都是調(diào)用的nonfairTryAcquireShared方法,即非公平的,因此實(shí)現(xiàn)在父類中
return nonfairTryAcquireShared(acquires);
}
}
/**
* AQS的實(shí)現(xiàn),作為公平和非公平模式的父類,有一些共享方法
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* 嘗試非公平的獲取共享信號(hào)量資源
*
* @param acquires 獲取信號(hào)量資源數(shù)量
* @return 如果返回值小于0,表示當(dāng)前線程共享信號(hào)量資源失敗,否則表示成功
*/
final int nonfairTryAcquireShared(int acquires) {
/*開啟一個(gè)循環(huán)嘗試獲取共享信號(hào)量資源*/
for (; ; ) {
//相比于公平模式,少了hasQueuedPredecessors的實(shí)現(xiàn)
//獲取state的值available,我們知道state代表信號(hào)量資源數(shù)量
int available = getState();
//remaining為available減去需要獲取的信號(hào)量資源數(shù)量之后的差值
int remaining = available - acquires;
//如果remaining小于0,那么返回remaining值,由于是負(fù)數(shù),因此獲取失敗
//如果大于等于0,那么表示可以獲取成功,嘗試CAS的更新state,更新成功之后同樣返回remaining,由于是大于等于0的數(shù),因此獲取成功
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
//如果remaining大于等于0,但是CAS更新state失敗,那么循環(huán)重試
}
}
}
2.3 不可中斷獲取信號(hào)量
public void acquireUninterruptibly()
不可中斷的獲取一個(gè)信號(hào)量,沒有則一直阻塞,直到在其他線程提供信號(hào)量并喚醒該線程。獲取一個(gè)信號(hào)量就立即返回,將可用的信號(hào)量數(shù)減 1。
相比于acquire()方法,該方法不響應(yīng)中斷,不會(huì)拋出InterruptedException
public void acquireUninterruptibly(int permits)
不可中斷的獲取permits個(gè)信號(hào)量。
相比于acquire方法,acquireUninterruptibly方法不響應(yīng)中斷,不會(huì)拋出InterruptedException。實(shí)際上內(nèi)部調(diào)用AQS的acquireShared方法,這實(shí)際上就是共享式獲取資源的模版方法式。
/**
* 獲取一個(gè)信號(hào)量,沒有則一直阻塞,直到在其他線程提供信號(hào)量并喚醒該線程。
* 獲取一個(gè)信號(hào)量就立即返回,將可用的信號(hào)量數(shù)減 1。
*/
public void acquireUninterruptibly() {
//內(nèi)部調(diào)用AQS的acquireShared方法
//這實(shí)際上就是共享式不可中斷獲取資源模版方法
sync.acquireShared(1);
}
/**
* AQS的acquireShared方法
* 共享式不可中斷獲取資源模版方法
*
* @param arg 獲取的資源數(shù)量
*/
public final void acquireShared(int arg) {
//并沒有檢查中斷
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* 獲取permits個(gè)信號(hào)量,沒有則一直阻塞,直到在其他線程提供信號(hào)量并喚醒該線程。
*
* @param permits 獲取的信號(hào)量數(shù)量
*/
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
//參數(shù)就是permits
sync.acquireShared(permits);
}
2.4 超時(shí)可中斷獲取信號(hào)量
public boolean tryAcquire(long timeout, TimeUnit unit)
超時(shí)可中斷的獲取一個(gè)信號(hào)量,沒有則一直阻塞,直到在其他線程提供信號(hào)量并喚醒該線程或者線程被中斷或者阻塞超時(shí)。獲取一個(gè)信號(hào)量就立即返回,將可用的信號(hào)量數(shù)減 1。
如果調(diào)用此方法時(shí)已被中斷或者等待時(shí)被中斷,則拋出 InterruptedException,并且清除當(dāng)前線程的已中斷狀態(tài)。
public boolean tryAcquire(int permits,long timeout,TimeUnit unit)
超時(shí)可中斷的獲取permits 個(gè)信號(hào)量。
實(shí)際上內(nèi)部調(diào)用AQS的tryAcquireSharedNanos方法,這實(shí)際上就是共享式超時(shí)可中斷獲取資源的模版方法。
/**
* @param timeout 超時(shí)時(shí)間
* @param unit 時(shí)間單位
* @return 是否獲取資源成功
* @throws InterruptedException 如果調(diào)用此方法時(shí)已被中斷或者等待時(shí)被中斷
*/
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
//實(shí)際上就是調(diào)用的AQS的共享式超時(shí)獲取資源的方法,獲取1個(gè)資源
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* @param permits 獲取的資源數(shù)量
* @param timeout 超時(shí)時(shí)間
* @param unit 時(shí)間單位
* @return 是否獲取資源成功
* @throws InterruptedException 如果調(diào)用此方法時(shí)已被中斷或者等待時(shí)被中斷
*/
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
//實(shí)際上就是調(diào)用的AQS的共享式超時(shí)獲取資源的方法,獲取permits個(gè)資源
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
/**
* AQS的共享式超時(shí)獲取資源的模版方法,支持中斷
*
* @param arg 參數(shù)
* @param nanosTimeout 超時(shí)時(shí)間,納秒
* @return 是否獲取資源成功
* @throws InterruptedException 如果調(diào)用此方法時(shí)已被中斷或者等待時(shí)被中斷
*/
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
//最開始就檢查一次,如果當(dāng)前線程是被中斷狀態(tài),直接拋出異常
if (Thread.interrupted())
throw new InterruptedException();
//下面是一個(gè)||運(yùn)算進(jìn)行短路連接的代碼,同樣左邊是調(diào)用子類實(shí)現(xiàn)的tryAcquireShared嘗試獲取資源,獲取到了直接返回true
//獲取不到資源就執(zhí)行doAcquireSharedNanos方法,這個(gè)方法是AQS的方法,因此超時(shí)機(jī)制是AQS幫我們實(shí)現(xiàn)的!
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
2.5 嘗試獲取信號(hào)量
public boolean tryAcquire()
僅在調(diào)用時(shí)至少存在至少一個(gè)可用信號(hào)量,才嘗試獲取一個(gè)信號(hào)量。
public boolean tryAcquire(int permits)
僅在調(diào)用時(shí)至少存在permits個(gè)的信號(hào)量,才嘗試獲取permits個(gè)信號(hào)量。
實(shí)際上內(nèi)部就是直接調(diào)用的nonfairTryAcquireShared方法,即公平模式和非公平模式的tryAcquire實(shí)現(xiàn)是一樣的!并且該方法不會(huì)阻塞線程,獲取成功返回true,獲取失敗返回false!
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
//調(diào)用nonfairTryAcquireShared方法
return sync.nonfairTryAcquireShared(permits) >= 0;
}
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
//調(diào)用nonfairTryAcquireShared方法
return sync.nonfairTryAcquireShared(permits) >= 0;
}
2.6 釋放信號(hào)量
public void release()
釋放一個(gè)信號(hào)量,信號(hào)量總數(shù)加1。釋放成功后,將喚醒在同步隊(duì)列中等待獲取信號(hào)量的結(jié)點(diǎn)(線程)!
public void release(int permits)
釋放permits個(gè)信號(hào)量,信號(hào)量總數(shù)加permits。釋放成功后,將喚醒在同步隊(duì)列中等待獲取信號(hào)量的結(jié)點(diǎn)(線程)!
公平模式和非公平模式的信號(hào)量的釋放都是一樣的。實(shí)際上內(nèi)部調(diào)用AQS的releaseShared方法,這實(shí)際上就是共享式釋放資源的模版方法。
/**
* 釋放一個(gè)信號(hào)量,信號(hào)量總數(shù)加1。
*/
public void release() {
//內(nèi)部調(diào)用AQS的releaseShared方法
//這實(shí)際上就是共享式釋放資源的模版方法
sync.releaseShared(1);
}
/**
* 釋放permits個(gè)信號(hào)量,信號(hào)量總數(shù)加permits。
*
* @param permits 釋放的信號(hào)量個(gè)數(shù)
*/
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
//參數(shù)就是permits
sync.releaseShared(permits);
}
/**
* AQS的共享模式下釋放資源的模版方法。
* 如果成功釋放則會(huì)調(diào)用doReleaseShared
*/
public final boolean releaseShared(int arg) {
//tryReleaseShared釋放信號(hào)量資源,該方法由子類自己實(shí)現(xiàn)
if (tryReleaseShared(arg)) {
//釋放成功,必定調(diào)用doReleaseShared嘗試喚醒后繼結(jié)點(diǎn),即阻塞的線程
doReleaseShared();
return true;
}
return false;
}
/**
* Sync的tryReleaseShared實(shí)現(xiàn)
*
* @param releases 要釋放的資源數(shù)量
* @return true 成功 false 失敗
*/
protected final boolean tryReleaseShared(int releases) {
for (; ; ) {
//很簡(jiǎn)單,就是嘗試CAS的增加state值,增加releases
int current = getState();
int next = current + releases;
//這里可以知道,信號(hào)量資源數(shù)量不可超過int的最大值
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS的增加state值,CAS成功之后返回true,否則循環(huán)重試
if (compareAndSetState(current, next))
return true;
}
}
3.Semaphore的使用
Semaphore可以用來控制多線程對(duì)于共享資源訪問的并發(fā)量!
案例:若一個(gè)工廠有5臺(tái)機(jī)器,但是有8個(gè)工人,一臺(tái)機(jī)器同時(shí)只能被一個(gè)工人使用,只有使用完了,其他工人才能繼續(xù)使用,每個(gè)工人之多工作10秒,最后統(tǒng)計(jì)工作量。
我們可以通過Semaphore與之前的CountDownLatch搭配線程池來輕松實(shí)現(xiàn)。我們能發(fā)現(xiàn),采用非公平模式的Semaphore時(shí)工人的總工作量大部分情況下要高于采用公平模式的工人總工作量,即非公平模式的執(zhí)行效率更高(這是不一定的)。我們還能發(fā)現(xiàn),在非公平模式工人的總工作量高于公平模式的工人總工作量時(shí),非公平模式下總會(huì)有某些工人工(特別是工人0、1、2)作量更多,而另一些工人工作量更少,這就是線程饑餓!
/**
* @author lx
*/
public class SemaphoreTest {
/**
* 機(jī)器數(shù)目,實(shí)際上就是信號(hào)量為5,非公平模式
*/
private static Semaphore semaphore = new Semaphore(5, false);
/**
* 機(jī)器數(shù)目,實(shí)際上就是信號(hào)量為5,公平模式
*/
//private static Semaphore semaphore = new Semaphore(5, true);
/**
* 當(dāng)所有工人都完成任務(wù),那么統(tǒng)計(jì)工作量
*/
private static CountDownLatch countDownLatch = new CountDownLatch(10);
/**
* 工人數(shù)目,8
*/
private static final int NUM = 10;
/**
* 當(dāng)前時(shí)間
*/
private static final long NOW = System.nanoTime();
/**
* 納秒單位
*/
private static final long NANOUNIT = 1000000000;
/**
* 工作量
*/
private static final LongAdder WORKLOAD = new LongAdder();
static class Worker implements Runnable {
public Worker(int num) {
this.num = num;
}
private int num;
private long timed = 20 * NANOUNIT;
@Override
public void run() {
while (true) {
//獲取信號(hào)量
try {
if (semaphore.tryAcquire(timed, TimeUnit.NANOSECONDS)) {
System.out.println("工人" + this.num + "占用一個(gè)機(jī)器在生產(chǎn)...");
//占用一定時(shí)間
LockSupport.parkNanos((long) (NANOUNIT * num * 0.5));
//統(tǒng)一調(diào)整為2秒,將會(huì)看到更明顯的Semaphore效果
//LockSupport.parkNanos((long) (NANOUNIT * 2));
System.out.println("工人" + this.num + "生產(chǎn)完畢,釋放出機(jī)器");
//釋放信號(hào)量
//每個(gè)工人最多執(zhí)行20秒
WORKLOAD.increment();
if ((timed = timed - (System.nanoTime() - NOW)) <= 0) {
semaphore.release();
countDownLatch.countDown();
break;
}
semaphore.release();
} else {
countDownLatch.countDown();
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < NUM; i++) {
executorService.execute(new Worker(i));
}
executorService.shutdown();
countDownLatch.await();
System.out.println("工作完畢,空閑機(jī)器為:" + semaphore.availablePermits());
System.out.println("總工作量為:" + WORKLOAD.sum());
}
}
4.Semaphore的總結(jié)
Semaphore和CountDownLatch的原理都差不多,都是直接使用AQS的共享模式實(shí)現(xiàn)自己的邏輯,都是對(duì)于AQS的state資源的利用,但是它們卻實(shí)現(xiàn)了不同的功能,CountDownLatch中state被看作一個(gè)倒計(jì)數(shù)器,當(dāng)state變?yōu)?時(shí),表示線程可以放開執(zhí)行。而Semaphore中的state被看作信號(hào)量資源,獲取不到資源則可能會(huì)阻塞,獲取到資源則可以訪問共享區(qū)域,共享區(qū)域使用完畢要記得還回信號(hào)量。
很明顯Semaphore的信號(hào)量資源很像鎖資源,但是我們前面就說過他們的不同,那就是鎖資源是和獲得鎖的線程綁定的,而這里的信號(hào)量資源并沒有和線程綁定,也就是說你可以讓一些線程不停的“釋放信號(hào)量”,而另一些線程只是不停的“獲取信號(hào)量”,這在AQS內(nèi)部實(shí)際上就是對(duì)state狀態(tài)的值的改變而已,與線程無關(guān)!
通常Semaphore可以用來控制多線程對(duì)于共享資源訪問的并發(fā)量,在上面的案例中我們就見過!另外還需要注意的是,如果在AQS的同步隊(duì)列中隊(duì)頭結(jié)點(diǎn)線程需要獲取n個(gè)資源,目前有m個(gè)資源,如果m小于n,那么這個(gè)隊(duì)列中的頭結(jié)點(diǎn)線程以及后面的所有結(jié)點(diǎn)線程都會(huì)因?yàn)椴荒塬@取到資源而繼續(xù)阻塞,即使頭結(jié)點(diǎn)后面的結(jié)點(diǎn)中的線程所需的資源數(shù)量小于m也不行。即已經(jīng)在AQS同步隊(duì)列中阻塞的線程,只能按照先進(jìn)先出的順序去獲取資源,如果頭部線程因?yàn)樗栀Y源數(shù)量不夠而一直阻塞,那么隊(duì)列后面的線程必定不能獲取資源!
和CountDownLatch一樣,Semaphore的源碼看起來非常簡(jiǎn)單,那是因?yàn)閺?fù)雜的線程等待、喚醒機(jī)制都被AQS實(shí)現(xiàn)了,如果想要真正了解Semaphore的原理,那么AQS是必須要了解的。實(shí)際上如果學(xué)會(huì)了AQS,那么JUC中的鎖或者其他同步組件就很簡(jiǎn)單了!
以上就是詳解Java信號(hào)量Semaphore的原理及使用的詳細(xì)內(nèi)容,更多關(guān)于Java信號(hào)量Semaphore的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring Boot 如何將 Word 轉(zhuǎn)換為 PDF
這篇文章主要介紹了Spring Boot將Word轉(zhuǎn)換為 PDF,本文通過示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08
Java使用Sftp和Ftp實(shí)現(xiàn)對(duì)文件的上傳和下載
這篇文章主要介紹了Java使用Sftp和Ftp實(shí)現(xiàn)對(duì)文件的上傳和下載,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03
Spring MVC+FastJson+Swagger集成的完整實(shí)例教程
這篇文章主要給大家分享介紹了關(guān)于Spring MVC+FastJson+Swagger集成的完整實(shí)例教程,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2018-04-04
阿里云主機(jī)上安裝jdk 某庫出現(xiàn)問題的解決方法
今天安裝jdk到阿里云服務(wù)上,首先看下阿里云是32位還是64位的,如果是32位下載32位的包,如果是64位的下載64位的包,下面與大家分享下安裝過程中遇到問題的解決方法2013-06-06
springcloud?gateway高級(jí)功能之集成apollo后動(dòng)態(tài)刷新路由方式
這篇文章主要介紹了springcloud?gateway高級(jí)功能之集成apollo后動(dòng)態(tài)刷新路由方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-08-08
關(guān)于Java限流功能的簡(jiǎn)單實(shí)現(xiàn)
這篇文章主要介紹了關(guān)于Java限流功能的簡(jiǎn)單實(shí)現(xiàn),在Java中,限流是一種常見的技術(shù)手段,用于控制系統(tǒng)的訪問速率,以保護(hù)系統(tǒng)免受過載和濫用,需要的朋友可以參考下2023-07-07
Spring security密碼加密實(shí)現(xiàn)代碼實(shí)例
這篇文章主要介紹了Spring security密碼加密實(shí)現(xiàn)代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04

