Java并發(fā)編程之Semaphore詳解
1 概念
Semaphore(信號(hào)量,發(fā)音:三馬佛兒),可以用來(lái)控制同時(shí)訪問(wèn)特定資源的線程數(shù)量,常用于限流場(chǎng)景。
Semaphore接收一個(gè)int整型值,表示 許可證數(shù)量。
線程通過(guò)調(diào)用acquire()獲取許可證,執(zhí)行完成之后通過(guò)調(diào)用release()歸還許可證。只有獲取到許可證的線程才能運(yùn)行,獲取不到許可證的線程將會(huì)阻塞。
Semaphore支持公平鎖和非公平鎖。
2 方法
Semaphore提供了一些方法,如下:
| 方法 | 說(shuō)明 |
| acquire() | 獲取一個(gè)許可證,在獲取到許可證、或者被其他線程調(diào)用中斷之前線程一直處于阻塞狀態(tài)。 |
| acquire(int permits) | 一次性獲取多個(gè)許可證,在獲取到多個(gè)許可證、或者被其他線程調(diào)用中斷、或超時(shí)之前線程一直處于阻塞狀態(tài)。 |
| acquireUninterruptibly() | 獲取一個(gè)許可證,在獲取到許可證之前線程一直處于阻塞狀態(tài)(忽略中斷)。 |
| tryAcquire() | 嘗試獲取許可證,返回獲取許可證成功或失敗,不阻塞線程。 |
| tryAcquire(long timeout, TimeUnit unit) | 嘗試獲取許可證,在超時(shí)時(shí)間內(nèi)循環(huán)嘗試獲取,直到嘗試獲取成功或超時(shí)返回,不阻塞線程。 |
| release() | 釋放一個(gè)許可證,喚醒等待獲取許可證的阻塞線程。 |
| release(int permits) | 一次性釋放多個(gè)許可證。 |
| drainPermits() | 清空許可證,把可用許可證數(shù)置為0,返回清空許可證的數(shù)量。 |
3 例子
public class SemaphoreTest {
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(5);
System.out.println("初始總許可數(shù) 5");
WorkerThread workerThread1 = new WorkerThread("worker-thread-1", semaphore);
WorkerThread workerThread2 = new WorkerThread("worker-thread-2", semaphore);
workerThread1.start();
Thread.sleep(20);
workerThread2.start();
}
}
/**
* 工作線程
*/
class WorkerThread extends Thread {
private String name;
private Semaphore semaphore;
public WorkerThread(String name, Semaphore semaphore) {
this. name = name;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
System.out.println(this.name + " 嘗試獲取許可.");
// 獲取許可證
semaphore.acquire();
System.out.println(this.name + " 獲取許可成功,當(dāng)前許可還剩 " + semaphore.availablePermits());
Thread.sleep(3000);
System.out.println(this.name + " 嘗試釋放許可.");
// 釋放許可證
semaphore.release();
System.out.println(this.name + " 釋放許可成功,當(dāng)前許可還剩 " + semaphore.availablePermits());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
運(yùn)行結(jié)果:
初始總許可數(shù) 5
worker-thread-1 嘗試獲取許可.
worker-thread-2 嘗試獲取許可.
worker-thread-1 獲取許可成功,當(dāng)前許可還剩 4
worker-thread-2 獲取許可成功,當(dāng)前許可還剩 3
worker-thread-1 嘗試釋放許可.
worker-thread-1 釋放許可成功,當(dāng)前許可還剩 4
worker-thread-2 嘗試釋放許可.
worker-thread-2 釋放許可成功,當(dāng)前許可還剩 5
Process finished with exit code 0
4 源碼解析
4.1 構(gòu)造函數(shù)
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Semaphore有兩個(gè)構(gòu)造函數(shù):
- 第一個(gè)構(gòu)造函數(shù)接收一個(gè)int型參數(shù)permits,表示初始化許可證的數(shù)量,并且默認(rèn)使用非公平鎖。
- 第二個(gè)構(gòu)造函數(shù)接收兩個(gè)參數(shù),第二個(gè)boolean型參數(shù)fair可以用來(lái)選擇是使用公平鎖還是非公平鎖。
4.2 Sync、FairSync、NonfairSync
公平鎖FairSync和非公平鎖NonfairSync都繼承了抽象類Sync。
Sync源碼:
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 設(shè)置同步狀態(tài)的值為初始化的許可證數(shù)量
Sync(int permits) {
setState(permits);
}
// 獲取同步狀態(tài)的值,也就是剩余可使用的許可證的數(shù)量
final int getPermits() {
return getState();
}
// 非公平鎖嘗試獲取許可證
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 嘗試釋放許可證
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// 嘗試減少許可證的數(shù)量
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// 將許可證數(shù)量清0
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
Sync繼承AQS抽象類,構(gòu)造函數(shù)調(diào)用的是AQS的setState(int newState)方法,將同步狀態(tài)變量state的值設(shè)置為指定的初始化許可證的數(shù)量。
公平鎖源碼:
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
// 公平鎖獲取許可證
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
非公平鎖源碼:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
// 非公平鎖獲取許可證
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
從源碼中看出,公平鎖和非公平鎖在獲取許可證的時(shí)候,邏輯是不一樣的。
4.3 acquire獲取許可證
調(diào)用Semaphore的acquire()函數(shù)可以獲取許可證,源碼如下:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
實(shí)際上,調(diào)用的是Sync對(duì)象的acquireSharedInterruptibly(int arg)方法,而Sync繼承了AQS,并且沒(méi)有重寫這個(gè)方法,因此調(diào)用的是AQS的acquireSharedInterruptibly(int arg)方法,源碼如下:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果中斷,拋出異常
if (Thread.interrupted())
throw new InterruptedException();
// 嘗試獲取同步變量
if (tryAcquireShared(arg) < 0)
// 如果獲取失敗,則將當(dāng)前線程加入同步隊(duì)列中去排隊(duì)
doAcquireSharedInterruptibly(arg);
}
可以看出,調(diào)用tryAcquireShared(arg)來(lái)嘗試獲取同步變量,在這里也就是獲取許可證。這個(gè)方法在AQS和Sync中都沒(méi)有實(shí)現(xiàn),但是被FairSync和NonfairSync分別實(shí)現(xiàn)了。 如果獲取同步變量失敗,則將當(dāng)前線程放入同步隊(duì)列中排隊(duì)。
4.3.1 公平鎖獲取許可證
如果是FairSync公平鎖,則實(shí)現(xiàn)如下:
protected int tryAcquireShared(int acquires) {
// 自旋
for (;;) {
// 判斷有沒(méi)有等待獲取同步狀態(tài)的線程,有則直接返回-1
if (hasQueuedPredecessors())
return -1;
// 沒(méi)有線程在等待獲取同步狀態(tài),那么當(dāng)前線程去獲取同步狀態(tài)
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
// 通過(guò)CAS更新同步狀態(tài)的值
compareAndSetState(available, remaining))
return remaining;
}
}
公平鎖獲取許可證的原理大致如下:
- 首先查看有沒(méi)有線程在同步隊(duì)列中排隊(duì)等待獲取許可證,如果有排隊(duì)的,那么直接返回-1,這樣將會(huì)執(zhí)行doAcquireSharedInterruptibly(arg),將當(dāng)前線程加入同步隊(duì)列中去排隊(duì);
- 如果沒(méi)有線程在排隊(duì),那么當(dāng)前線程獲取同步狀態(tài)的值available,減掉想要獲取的資源值acquires,也就是想要獲取的許可證的數(shù)量,得到剩余的資源量remaining。如果remaining < 0,說(shuō)明資源不夠,本次獲取失敗,返回remaining值(這個(gè)時(shí)候返回的是< 0的值),外層代碼會(huì)調(diào)用doAcquireSharedInterruptibly(arg)將當(dāng)前線程排隊(duì);如果remaining > 0,說(shuō)明資源是夠用的,那么直接通過(guò)CAS原理更新同步狀態(tài)的值。
4.3.2 非公平鎖獲取許可證
如果是NonfairSync非公平鎖,則實(shí)現(xiàn)如下:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
調(diào)用的是父類Sync的nonfairTryAcquireShared(int acquires)方法:
final int nonfairTryAcquireShared(int acquires) {
// 自旋
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
非公平鎖相對(duì)來(lái)說(shuō)去掉了查看同步隊(duì)列的邏輯。也就是說(shuō),在非公平鎖的實(shí)現(xiàn)中,當(dāng)前線程獲取許可證的時(shí)候,不用去查看同步隊(duì)列是否有線程在等待獲取同步狀態(tài),而是直接去嘗試獲取許可證(改變同步狀態(tài)的值)。
當(dāng)然,如果remaining < 0,說(shuō)明當(dāng)前線程沒(méi)能獲取到期望數(shù)量的許可證,獲取失敗,返回< 0的值,在外部邏輯中,將會(huì)調(diào)用doAcquireSharedInterruptibly(arg)使當(dāng)前線程進(jìn)入同步隊(duì)列中進(jìn)行等待;如果remaining > 0,則通過(guò)CAS原理更新同步狀態(tài)的值。
4.4 release釋放許可證
通過(guò)調(diào)用Semaphore的release()方法可以釋放許可證。
public void release() {
sync.releaseShared(1);
}
實(shí)際上,調(diào)用的是Sync的releaseShared(int arg),而Sync并沒(méi)有重寫這個(gè)方法,因此調(diào)用的是AQS的releaseShared(int arg)方法:
public final boolean releaseShared(int arg) {
// 嘗試釋放同步變量
if (tryReleaseShared(arg)) {
// 如果成功,則喚醒后繼節(jié)點(diǎn)
doReleaseShared();
return true;
}
return false;
}
通過(guò)tryReleaseShared(arg)嘗試釋放同步變量,如果成功,則通過(guò)doReleaseShared()喚醒后繼節(jié)點(diǎn)。
AQS并沒(méi)有實(shí)現(xiàn)tryReleaseShared(arg)方法,而是被Semaphore的Sync實(shí)現(xiàn)了:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
這里通過(guò)CAS改變同步狀態(tài)的值,釋放了許可證。
下面來(lái)看看doReleaseShared()是如何喚醒后繼節(jié)點(diǎn)的:
private void doReleaseShared() {
for (;;) {
// 首先獲取頭節(jié)點(diǎn)
Node h = head;
// 如果頭節(jié)點(diǎn)存在
if (h != null && h != tail) {
// 獲取頭節(jié)點(diǎn)的狀態(tài)
int ws = h.waitStatus;
// 如果頭節(jié)點(diǎn)的狀態(tài)是Node.SIGNAL,說(shuō)明頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)正等待被喚醒
if (ws == Node.SIGNAL) {
// 將頭節(jié)點(diǎn)的狀態(tài)設(shè)置為初始狀態(tài)
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
// 喚醒后繼節(jié)點(diǎn)
unparkSuccessor(h);
}
// 如果頭節(jié)點(diǎn)的狀態(tài)已經(jīng)是0了,則設(shè)置頭節(jié)點(diǎn)狀態(tài)為Node.PROPAGATE
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
嘗試喚醒后繼節(jié)點(diǎn)的邏輯比較簡(jiǎn)單:
- 首先獲取同步隊(duì)列中的頭節(jié)點(diǎn);
- 如果頭節(jié)點(diǎn)存在,并且不是尾節(jié)點(diǎn),接著獲取頭節(jié)點(diǎn)的狀態(tài);
- 如果頭節(jié)點(diǎn)的狀態(tài)是Node.SIGNAL,說(shuō)明他的后繼節(jié)點(diǎn)正等待著被他喚醒。這個(gè)時(shí)候通過(guò)CAS原理將頭節(jié)點(diǎn)的狀態(tài)置為0,如果成功了,則通過(guò)調(diào)用unparkSuccessor(h)喚醒后繼節(jié)點(diǎn),最后實(shí)際上調(diào)用的是LockSupport.unpark(Thread thread)方法喚醒線程的。
到此這篇關(guān)于Java并發(fā)編程之Semaphore詳解的文章就介紹到這了,更多相關(guān)Semaphore詳細(xì)解析內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實(shí)現(xiàn)將TXT文本文件轉(zhuǎn)換為PDF文件
與TXT文本文件,PDF文件更加專業(yè)也更適合傳輸,所以這篇文章小編主要為大家詳細(xì)介紹了如何使用Java實(shí)現(xiàn)將TXT文本文件轉(zhuǎn)換為PDF文件 ,需要的可以參考下2024-02-02
SpringBoot創(chuàng)建定時(shí)任務(wù)的示例詳解
在Spring Boot中創(chuàng)建定時(shí)任務(wù),通常使用@Scheduled注解,這是Spring框架提供的一個(gè)功能,允許你按照固定的頻率(如每天、每小時(shí)、每分鐘等)執(zhí)行某個(gè)方法,本文給大家介紹了SpringBoot創(chuàng)建定時(shí)任務(wù)的示例,需要的朋友可以參考下2024-04-04
如何使用axis調(diào)用WebService及Java?WebService調(diào)用工具類
Axis是一個(gè)基于Java的Web服務(wù)框架,可以用來(lái)調(diào)用Web服務(wù)接口,下面這篇文章主要給大家介紹了關(guān)于如何使用axis調(diào)用WebService及Java?WebService調(diào)用工具類的相關(guān)資料,需要的朋友可以參考下2023-04-04
使用MyEclipse 開發(fā)struts2框架實(shí)現(xiàn)登錄功能(結(jié)構(gòu)教程)
這篇文章主要介紹了使用MyEclipse 開發(fā)struts2框架實(shí)現(xiàn)登錄功能(結(jié)構(gòu)教程)的相關(guān)資料,需要的朋友可以參考下2016-03-03
Java中finally和return的關(guān)系實(shí)例解析
這篇文章主要介紹了Java中finally和return的關(guān)系實(shí)例解析,總結(jié)了二者的關(guān)系,然后分享了相關(guān)代碼示例,小編覺得還是挺不錯(cuò)的,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-02-02
RestTemplate如何使用JSON發(fā)送Post請(qǐng)求
這篇文章主要介紹了RestTemplate如何使用JSON發(fā)送Post請(qǐng)求問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09

