Java并發(fā)編程之Semaphore詳解
1 概念
Semaphore(信號(hào)量,發(fā)音:三馬佛兒),可以用來(lái)控制同時(shí)訪問(wèn)特定資源的線程數(shù)量,常用于限流場(chǎng)景。
Semaphore接收一個(gè)int整型值,表示 許可證數(shù)量。
線程通過(guò)調(diào)用acquire()獲取許可證,執(zhí)行完成之后通過(guò)調(diào)用release()歸還許可證。只有獲取到許可證的線程才能運(yùn)行,獲取不到許可證的線程將會(huì)阻塞。
Semaphore支持公平鎖和非公平鎖。
2 方法
Semaphore提供了一些方法,如下:
方法 | 說(shuō)明 |
acquire() | 獲取一個(gè)許可證,在獲取到許可證、或者被其他線程調(diào)用中斷之前線程一直處于阻塞狀態(tài)。 |
acquire(int permits) | 一次性獲取多個(gè)許可證,在獲取到多個(gè)許可證、或者被其他線程調(diào)用中斷、或超時(shí)之前線程一直處于阻塞狀態(tài)。 |
acquireUninterruptibly() | 獲取一個(gè)許可證,在獲取到許可證之前線程一直處于阻塞狀態(tài)(忽略中斷)。 |
tryAcquire() | 嘗試獲取許可證,返回獲取許可證成功或失敗,不阻塞線程。 |
tryAcquire(long timeout, TimeUnit unit) | 嘗試獲取許可證,在超時(shí)時(shí)間內(nèi)循環(huán)嘗試獲取,直到嘗試獲取成功或超時(shí)返回,不阻塞線程。 |
release() | 釋放一個(gè)許可證,喚醒等待獲取許可證的阻塞線程。 |
release(int permits) | 一次性釋放多個(gè)許可證。 |
drainPermits() | 清空許可證,把可用許可證數(shù)置為0,返回清空許可證的數(shù)量。 |
3 例子
public class SemaphoreTest { public static void main(String[] args) throws InterruptedException { Semaphore semaphore = new Semaphore(5); System.out.println("初始總許可數(shù) 5"); WorkerThread workerThread1 = new WorkerThread("worker-thread-1", semaphore); WorkerThread workerThread2 = new WorkerThread("worker-thread-2", semaphore); workerThread1.start(); Thread.sleep(20); workerThread2.start(); } } /** * 工作線程 */ class WorkerThread extends Thread { private String name; private Semaphore semaphore; public WorkerThread(String name, Semaphore semaphore) { this. name = name; this.semaphore = semaphore; } @Override public void run() { try { System.out.println(this.name + " 嘗試獲取許可."); // 獲取許可證 semaphore.acquire(); System.out.println(this.name + " 獲取許可成功,當(dāng)前許可還剩 " + semaphore.availablePermits()); Thread.sleep(3000); System.out.println(this.name + " 嘗試釋放許可."); // 釋放許可證 semaphore.release(); System.out.println(this.name + " 釋放許可成功,當(dāng)前許可還剩 " + semaphore.availablePermits()); } catch (InterruptedException e) { e.printStackTrace(); } } }
運(yùn)行結(jié)果:
初始總許可數(shù) 5
worker-thread-1 嘗試獲取許可.
worker-thread-2 嘗試獲取許可.
worker-thread-1 獲取許可成功,當(dāng)前許可還剩 4
worker-thread-2 獲取許可成功,當(dāng)前許可還剩 3
worker-thread-1 嘗試釋放許可.
worker-thread-1 釋放許可成功,當(dāng)前許可還剩 4
worker-thread-2 嘗試釋放許可.
worker-thread-2 釋放許可成功,當(dāng)前許可還剩 5
Process finished with exit code 0
4 源碼解析
4.1 構(gòu)造函數(shù)
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
Semaphore有兩個(gè)構(gòu)造函數(shù):
- 第一個(gè)構(gòu)造函數(shù)接收一個(gè)int型參數(shù)permits,表示初始化許可證的數(shù)量,并且默認(rèn)使用非公平鎖。
- 第二個(gè)構(gòu)造函數(shù)接收兩個(gè)參數(shù),第二個(gè)boolean型參數(shù)fair可以用來(lái)選擇是使用公平鎖還是非公平鎖。
4.2 Sync、FairSync、NonfairSync
公平鎖FairSync和非公平鎖NonfairSync都繼承了抽象類(lèi)Sync。
Sync源碼:
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; // 設(shè)置同步狀態(tài)的值為初始化的許可證數(shù)量 Sync(int permits) { setState(permits); } // 獲取同步狀態(tài)的值,也就是剩余可使用的許可證的數(shù)量 final int getPermits() { return getState(); } // 非公平鎖嘗試獲取許可證 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // 嘗試釋放許可證 protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } // 嘗試減少許可證的數(shù)量 final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } // 將許可證數(shù)量清0 final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } }
Sync繼承AQS抽象類(lèi),構(gòu)造函數(shù)調(diào)用的是AQS的setState(int newState)方法,將同步狀態(tài)變量state的值設(shè)置為指定的初始化許可證的數(shù)量。
公平鎖源碼:
static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } // 公平鎖獲取許可證 protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
非公平鎖源碼:
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } // 非公平鎖獲取許可證 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
從源碼中看出,公平鎖和非公平鎖在獲取許可證的時(shí)候,邏輯是不一樣的。
4.3 acquire獲取許可證
調(diào)用Semaphore的acquire()函數(shù)可以獲取許可證,源碼如下:
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
實(shí)際上,調(diào)用的是Sync對(duì)象的acquireSharedInterruptibly(int arg)方法,而Sync繼承了AQS,并且沒(méi)有重寫(xiě)這個(gè)方法,因此調(diào)用的是AQS的acquireSharedInterruptibly(int arg)方法,源碼如下:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 如果中斷,拋出異常 if (Thread.interrupted()) throw new InterruptedException(); // 嘗試獲取同步變量 if (tryAcquireShared(arg) < 0) // 如果獲取失敗,則將當(dāng)前線程加入同步隊(duì)列中去排隊(duì) doAcquireSharedInterruptibly(arg); }
可以看出,調(diào)用tryAcquireShared(arg)來(lái)嘗試獲取同步變量,在這里也就是獲取許可證。這個(gè)方法在AQS和Sync中都沒(méi)有實(shí)現(xiàn),但是被FairSync和NonfairSync分別實(shí)現(xiàn)了。 如果獲取同步變量失敗,則將當(dāng)前線程放入同步隊(duì)列中排隊(duì)。
4.3.1 公平鎖獲取許可證
如果是FairSync公平鎖,則實(shí)現(xiàn)如下:
protected int tryAcquireShared(int acquires) { // 自旋 for (;;) { // 判斷有沒(méi)有等待獲取同步狀態(tài)的線程,有則直接返回-1 if (hasQueuedPredecessors()) return -1; // 沒(méi)有線程在等待獲取同步狀態(tài),那么當(dāng)前線程去獲取同步狀態(tài) int available = getState(); int remaining = available - acquires; if (remaining < 0 || // 通過(guò)CAS更新同步狀態(tài)的值 compareAndSetState(available, remaining)) return remaining; } }
公平鎖獲取許可證的原理大致如下:
- 首先查看有沒(méi)有線程在同步隊(duì)列中排隊(duì)等待獲取許可證,如果有排隊(duì)的,那么直接返回-1,這樣將會(huì)執(zhí)行doAcquireSharedInterruptibly(arg),將當(dāng)前線程加入同步隊(duì)列中去排隊(duì);
- 如果沒(méi)有線程在排隊(duì),那么當(dāng)前線程獲取同步狀態(tài)的值available,減掉想要獲取的資源值acquires,也就是想要獲取的許可證的數(shù)量,得到剩余的資源量remaining。如果remaining < 0,說(shuō)明資源不夠,本次獲取失敗,返回remaining值(這個(gè)時(shí)候返回的是< 0的值),外層代碼會(huì)調(diào)用doAcquireSharedInterruptibly(arg)將當(dāng)前線程排隊(duì);如果remaining > 0,說(shuō)明資源是夠用的,那么直接通過(guò)CAS原理更新同步狀態(tài)的值。
4.3.2 非公平鎖獲取許可證
如果是NonfairSync非公平鎖,則實(shí)現(xiàn)如下:
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
調(diào)用的是父類(lèi)Sync的nonfairTryAcquireShared(int acquires)方法:
final int nonfairTryAcquireShared(int acquires) { // 自旋 for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
非公平鎖相對(duì)來(lái)說(shuō)去掉了查看同步隊(duì)列的邏輯。也就是說(shuō),在非公平鎖的實(shí)現(xiàn)中,當(dāng)前線程獲取許可證的時(shí)候,不用去查看同步隊(duì)列是否有線程在等待獲取同步狀態(tài),而是直接去嘗試獲取許可證(改變同步狀態(tài)的值)。
當(dāng)然,如果remaining < 0,說(shuō)明當(dāng)前線程沒(méi)能獲取到期望數(shù)量的許可證,獲取失敗,返回< 0的值,在外部邏輯中,將會(huì)調(diào)用doAcquireSharedInterruptibly(arg)使當(dāng)前線程進(jìn)入同步隊(duì)列中進(jìn)行等待;如果remaining > 0,則通過(guò)CAS原理更新同步狀態(tài)的值。
4.4 release釋放許可證
通過(guò)調(diào)用Semaphore的release()方法可以釋放許可證。
public void release() { sync.releaseShared(1); }
實(shí)際上,調(diào)用的是Sync的releaseShared(int arg),而Sync并沒(méi)有重寫(xiě)這個(gè)方法,因此調(diào)用的是AQS的releaseShared(int arg)方法:
public final boolean releaseShared(int arg) { // 嘗試釋放同步變量 if (tryReleaseShared(arg)) { // 如果成功,則喚醒后繼節(jié)點(diǎn) doReleaseShared(); return true; } return false; }
通過(guò)tryReleaseShared(arg)嘗試釋放同步變量,如果成功,則通過(guò)doReleaseShared()喚醒后繼節(jié)點(diǎn)。
AQS并沒(méi)有實(shí)現(xiàn)tryReleaseShared(arg)方法,而是被Semaphore的Sync實(shí)現(xiàn)了:
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
這里通過(guò)CAS改變同步狀態(tài)的值,釋放了許可證。
下面來(lái)看看doReleaseShared()是如何喚醒后繼節(jié)點(diǎn)的:
private void doReleaseShared() { for (;;) { // 首先獲取頭節(jié)點(diǎn) Node h = head; // 如果頭節(jié)點(diǎn)存在 if (h != null && h != tail) { // 獲取頭節(jié)點(diǎn)的狀態(tài) int ws = h.waitStatus; // 如果頭節(jié)點(diǎn)的狀態(tài)是Node.SIGNAL,說(shuō)明頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)正等待被喚醒 if (ws == Node.SIGNAL) { // 將頭節(jié)點(diǎn)的狀態(tài)設(shè)置為初始狀態(tài) if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // loop to recheck cases // 喚醒后繼節(jié)點(diǎn) unparkSuccessor(h); } // 如果頭節(jié)點(diǎn)的狀態(tài)已經(jīng)是0了,則設(shè)置頭節(jié)點(diǎn)狀態(tài)為Node.PROPAGATE else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
嘗試喚醒后繼節(jié)點(diǎn)的邏輯比較簡(jiǎn)單:
- 首先獲取同步隊(duì)列中的頭節(jié)點(diǎn);
- 如果頭節(jié)點(diǎn)存在,并且不是尾節(jié)點(diǎn),接著獲取頭節(jié)點(diǎn)的狀態(tài);
- 如果頭節(jié)點(diǎn)的狀態(tài)是Node.SIGNAL,說(shuō)明他的后繼節(jié)點(diǎn)正等待著被他喚醒。這個(gè)時(shí)候通過(guò)CAS原理將頭節(jié)點(diǎn)的狀態(tài)置為0,如果成功了,則通過(guò)調(diào)用unparkSuccessor(h)喚醒后繼節(jié)點(diǎn),最后實(shí)際上調(diào)用的是LockSupport.unpark(Thread thread)方法喚醒線程的。
到此這篇關(guān)于Java并發(fā)編程之Semaphore詳解的文章就介紹到這了,更多相關(guān)Semaphore詳細(xì)解析內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實(shí)現(xiàn)將TXT文本文件轉(zhuǎn)換為PDF文件
與TXT文本文件,PDF文件更加專(zhuān)業(yè)也更適合傳輸,所以這篇文章小編主要為大家詳細(xì)介紹了如何使用Java實(shí)現(xiàn)將TXT文本文件轉(zhuǎn)換為PDF文件 ,需要的可以參考下2024-02-02SpringBoot創(chuàng)建定時(shí)任務(wù)的示例詳解
在Spring Boot中創(chuàng)建定時(shí)任務(wù),通常使用@Scheduled注解,這是Spring框架提供的一個(gè)功能,允許你按照固定的頻率(如每天、每小時(shí)、每分鐘等)執(zhí)行某個(gè)方法,本文給大家介紹了SpringBoot創(chuàng)建定時(shí)任務(wù)的示例,需要的朋友可以參考下2024-04-04如何使用axis調(diào)用WebService及Java?WebService調(diào)用工具類(lèi)
Axis是一個(gè)基于Java的Web服務(wù)框架,可以用來(lái)調(diào)用Web服務(wù)接口,下面這篇文章主要給大家介紹了關(guān)于如何使用axis調(diào)用WebService及Java?WebService調(diào)用工具類(lèi)的相關(guān)資料,需要的朋友可以參考下2023-04-04使用MyEclipse 開(kāi)發(fā)struts2框架實(shí)現(xiàn)登錄功能(結(jié)構(gòu)教程)
這篇文章主要介紹了使用MyEclipse 開(kāi)發(fā)struts2框架實(shí)現(xiàn)登錄功能(結(jié)構(gòu)教程)的相關(guān)資料,需要的朋友可以參考下2016-03-03Java中finally和return的關(guān)系實(shí)例解析
這篇文章主要介紹了Java中finally和return的關(guān)系實(shí)例解析,總結(jié)了二者的關(guān)系,然后分享了相關(guān)代碼示例,小編覺(jué)得還是挺不錯(cuò)的,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-02-02RestTemplate如何使用JSON發(fā)送Post請(qǐng)求
這篇文章主要介紹了RestTemplate如何使用JSON發(fā)送Post請(qǐng)求問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09