Java并發(fā)系列之Semaphore源碼分析
Semaphore(信號(hào)量)是JUC包中比較常用到的一個(gè)類(lèi),它是AQS共享模式的一個(gè)應(yīng)用,可以允許多個(gè)線程同時(shí)對(duì)共享資源進(jìn)行操作,并且可以有效的控制并發(fā)數(shù),利用它可以很好的實(shí)現(xiàn)流量控制。Semaphore提供了一個(gè)許可證的概念,可以把這個(gè)許可證看作公共汽車(chē)車(chē)票,只有成功獲取車(chē)票的人才能夠上車(chē),并且車(chē)票是有一定數(shù)量的,不可能毫無(wú)限制的發(fā)下去,這樣就會(huì)導(dǎo)致公交車(chē)超載。所以當(dāng)車(chē)票發(fā)完的時(shí)候(公交車(chē)以滿(mǎn)載),其他人就只能等下一趟車(chē)了。如果中途有人下車(chē),那么他的位置將會(huì)空閑出來(lái),因此如果這時(shí)其他人想要上車(chē)的話就又可以獲得車(chē)票了。利用Semaphore可以實(shí)現(xiàn)各種池,我們?cè)诒酒┪矊?huì)動(dòng)手寫(xiě)一個(gè)簡(jiǎn)易的數(shù)據(jù)庫(kù)連接池。首先我們來(lái)看一下Semaphore的構(gòu)造器。
//構(gòu)造器1 public Semaphore(int permits) { sync = new NonfairSync(permits); } //構(gòu)造器2 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
Semaphore提供了兩個(gè)帶參構(gòu)造器,沒(méi)有提供無(wú)參構(gòu)造器。這兩個(gè)構(gòu)造器都必須傳入一個(gè)初始的許可證數(shù)量,使用構(gòu)造器1構(gòu)造出來(lái)的信號(hào)量在獲取許可證時(shí)會(huì)采用非公平方式獲取,使用構(gòu)造器2可以通過(guò)參數(shù)指定獲取許可證的方式(公平or非公平)。Semaphore主要對(duì)外提供了兩類(lèi)API,獲取許可證和釋放許可證,默認(rèn)的是獲取和釋放一個(gè)許可證,也可以傳入?yún)?shù)來(lái)同時(shí)獲取和釋放多個(gè)許可證。在本篇中我們只講每次獲取和釋放一個(gè)許可證的情況。
1.獲取許可證
//獲取一個(gè)許可證(響應(yīng)中斷) public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //獲取一個(gè)許可證(不響應(yīng)中斷) public void acquireUninterruptibly() { sync.acquireShared(1); } //嘗試獲取許可證(非公平獲取) public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } //嘗試獲取許可證(定時(shí)獲取) public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
上面的API是Semaphore提供的默認(rèn)獲取許可證操作。每次只獲取一個(gè)許可證,這也是現(xiàn)實(shí)生活中較常遇到的情況。除了直接獲取還提供了嘗試獲取,直接獲取操作在失敗之后可能會(huì)阻塞線程,而嘗試獲取則不會(huì)。另外還需注意的是tryAcquire方法是使用非公平方式嘗試獲取的。在平時(shí)我們比較常用到的是acquire方法去獲取許可證。下面我們就來(lái)看看它是怎樣獲取的??梢钥吹絘cquire方法里面直接就是調(diào)用sync.acquireSharedInterruptibly(1),這個(gè)方法是AQS里面的方法,我們?cè)谥vAQS源碼系列文章的時(shí)候曾經(jīng)講過(guò),現(xiàn)在我們?cè)賮?lái)回顧一下。
//以可中斷模式獲取鎖(共享模式) public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //首先判斷線程是否中斷, 如果是則拋出異常 if (Thread.interrupted()) { throw new InterruptedException(); } //1.嘗試去獲取鎖 if (tryAcquireShared(arg) < 0) { //2. 如果獲取失敗則進(jìn)人該方法 doAcquireSharedInterruptibly(arg); } }
acquireSharedInterruptibly方法首先就是去調(diào)用tryAcquireShared方法去嘗試獲取,tryAcquireShared在AQS里面是抽象方法,F(xiàn)airSync和NonfairSync這兩個(gè)派生類(lèi)實(shí)現(xiàn)了該方法的邏輯。FairSync實(shí)現(xiàn)的是公平獲取的邏輯,而NonfairSync實(shí)現(xiàn)的非公平獲取的邏輯。
abstract static class Sync extends AbstractQueuedSynchronizer { //非公平方式嘗試獲取 final int nonfairTryAcquireShared(int acquires) { for (;;) { //獲取可用許可證 int available = getState(); //獲取剩余許可證 int remaining = available - acquires; //1.如果remaining小于0則直接返回remaining //2.如果remaining大于0則先更新同步狀態(tài)再返回remaining if (remaining < 0 || compareAndSetState(available, remaining)) { return remaining; } } } } //非公平同步器 static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } //嘗試獲取許可證 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } //公平同步器 static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } //嘗試獲取許可證 protected int tryAcquireShared(int acquires) { for (;;) { //判斷同步隊(duì)列前面有沒(méi)有人排隊(duì) if (hasQueuedPredecessors()) { //如果有的話就直接返回-1,表示嘗試獲取失敗 return -1; } //獲取可用許可證 int available = getState(); //獲取剩余許可證 int remaining = available - acquires; //1.如果remaining小于0則直接返回remaining //2.如果remaining大于0則先更新同步狀態(tài)再返回remaining if (remaining < 0 || compareAndSetState(available, remaining)) { return remaining; } } } }
這里需要注意的是NonfairSync的tryAcquireShared方法直接調(diào)用的是nonfairTryAcquireShared方法,這個(gè)方法是在父類(lèi)Sync里面的。非公平獲取鎖的邏輯是先取出當(dāng)前同步狀態(tài)(同步狀態(tài)表示許可證個(gè)數(shù)),將當(dāng)前同步狀態(tài)減去參入的參數(shù),如果結(jié)果不小于0的話證明還有可用的許可證,那么就直接使用CAS操作更新同步狀態(tài)的值,最后不管結(jié)果是否小于0都會(huì)返回該結(jié)果值。這里我們要了解tryAcquireShared方法返回值的含義,返回負(fù)數(shù)表示獲取失敗,零表示當(dāng)前線程獲取成功但后續(xù)線程不能再獲取,正數(shù)表示當(dāng)前線程獲取成功并且后續(xù)線程也能夠獲取。我們?cè)賮?lái)看acquireSharedInterruptibly方法的代碼。
//以可中斷模式獲取鎖(共享模式) public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //首先判斷線程是否中斷, 如果是則拋出異常 if (Thread.interrupted()) { throw new InterruptedException(); } //1.嘗試去獲取鎖 //負(fù)數(shù):表示獲取失敗 //零值:表示當(dāng)前線程獲取成功, 但是后繼線程不能再獲取了 //正數(shù):表示當(dāng)前線程獲取成功, 并且后繼線程同樣可以獲取成功 if (tryAcquireShared(arg) < 0) { //2. 如果獲取失敗則進(jìn)人該方法 doAcquireSharedInterruptibly(arg); } }
如果返回的remaining小于0的話就代表獲取失敗,因此tryAcquireShared(arg) < 0就為true,所以接下來(lái)就會(huì)調(diào)用doAcquireSharedInterruptibly方法,這個(gè)方法我們?cè)谥vAQS的時(shí)候講過(guò),它會(huì)將當(dāng)前線程包裝成結(jié)點(diǎn)放入同步隊(duì)列尾部,并且有可能掛起線程。這也是當(dāng)remaining小于0時(shí)線程會(huì)排隊(duì)阻塞的原因。而如果返回的remaining>=0的話就代表當(dāng)前線程獲取成功,因此tryAcquireShared(arg) < 0就為flase,所以就不會(huì)再去調(diào)用doAcquireSharedInterruptibly方法阻塞當(dāng)前線程了。以上是非公平獲取的整個(gè)邏輯,而公平獲取時(shí)僅僅是在此之前先去調(diào)用hasQueuedPredecessors方法判斷同步隊(duì)列是否有人在排隊(duì),如果有的話就直接return -1表示獲取失敗,否則才繼續(xù)執(zhí)行下面和非公平獲取一樣的步驟。
2.釋放許可證
//釋放一個(gè)許可證 public void release() { sync.releaseShared(1); }
調(diào)用release方法是釋放一個(gè)許可證,它的操作很簡(jiǎn)單,就調(diào)用了AQS的releaseShared方法,我們來(lái)看看這個(gè)方法。
//釋放鎖的操作(共享模式) public final boolean releaseShared(int arg) { //1.嘗試去釋放鎖 if (tryReleaseShared(arg)) { //2.如果釋放成功就喚醒其他線程 doReleaseShared(); return true; } return false; }
AQS的releaseShared方法首先調(diào)用tryReleaseShared方法嘗試釋放鎖,這個(gè)方法的實(shí)現(xiàn)邏輯在子類(lèi)Sync里面。
abstract static class Sync extends AbstractQueuedSynchronizer { ... //嘗試釋放操作 protected final boolean tryReleaseShared(int releases) { for (;;) { //獲取當(dāng)前同步狀態(tài) int current = getState(); //將當(dāng)前同步狀態(tài)加上傳入的參數(shù) int next = current + releases; //如果相加結(jié)果小于當(dāng)前同步狀態(tài)的話就報(bào)錯(cuò) if (next < current) { throw new Error("Maximum permit count exceeded"); } //以CAS方式更新同步狀態(tài)的值, 更新成功則返回true, 否則繼續(xù)循環(huán) if (compareAndSetState(current, next)) { return true; } } } ... }
可以看到tryReleaseShared方法里面采用for循環(huán)進(jìn)行自旋,首先獲取同步狀態(tài),將同步狀態(tài)加上傳入的參數(shù),然后以CAS方式更新同步狀態(tài),更新成功就返回true并跳出方法,否則就繼續(xù)循環(huán)直到成功為止,這就是Semaphore釋放許可證的流程。
3.動(dòng)手寫(xiě)個(gè)連接池
Semaphore代碼并沒(méi)有很復(fù)雜,常用的操作就是獲取和釋放一個(gè)許可證,這些操作的實(shí)現(xiàn)邏輯也都比較簡(jiǎn)單,但這并不妨礙Semaphore的廣泛應(yīng)用。下面我們就來(lái)利用Semaphore實(shí)現(xiàn)一個(gè)簡(jiǎn)單的數(shù)據(jù)庫(kù)連接池,通過(guò)這個(gè)例子希望讀者們能更加深入的掌握Semaphore的運(yùn)用。
public class ConnectPool { //連接池大小 private int size; //數(shù)據(jù)庫(kù)連接集合 private Connect[] connects; //連接狀態(tài)標(biāo)志 private boolean[] connectFlag; //剩余可用連接數(shù) private volatile int available; //信號(hào)量 private Semaphore semaphore; //構(gòu)造器 public ConnectPool(int size) { this.size = size; this.available = size; semaphore = new Semaphore(size, true); connects = new Connect[size]; connectFlag = new boolean[size]; initConnects(); } //初始化連接 private void initConnects() { //生成指定數(shù)量的數(shù)據(jù)庫(kù)連接 for(int i = 0; i < this.size; i++) { connects[i] = new Connect(); } } //獲取數(shù)據(jù)庫(kù)連接 private synchronized Connect getConnect(){ for(int i = 0; i < connectFlag.length; i++) { //遍歷集合找到未使用的連接 if(!connectFlag[i]) { //將連接設(shè)置為使用中 connectFlag[i] = true; //可用連接數(shù)減1 available--; System.out.println("【"+Thread.currentThread().getName()+"】以獲取連接 剩余連接數(shù):" + available); //返回連接引用 return connects[i]; } } return null; } //獲取一個(gè)連接 public Connect openConnect() throws InterruptedException { //獲取許可證 semaphore.acquire(); //獲取數(shù)據(jù)庫(kù)連接 return getConnect(); } //釋放一個(gè)連接 public synchronized void release(Connect connect) { for(int i = 0; i < this.size; i++) { if(connect == connects[i]){ //將連接設(shè)置為未使用 connectFlag[i] = false; //可用連接數(shù)加1 available++; System.out.println("【"+Thread.currentThread().getName()+"】以釋放連接 剩余連接數(shù):" + available); //釋放許可證 semaphore.release(); } } } //剩余可用連接數(shù) public int available() { return available; } }
測(cè)試代碼:
public class TestThread extends Thread { private static ConnectPool pool = new ConnectPool(3); @Override public void run() { try { Connect connect = pool.openConnect(); Thread.sleep(100); //休息一下 pool.release(connect); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { for(int i = 0; i < 10; i++) { new TestThread().start(); } } }
測(cè)試結(jié)果:
我們使用一個(gè)數(shù)組來(lái)存放數(shù)據(jù)庫(kù)連接的引用,在初始化連接池的時(shí)候會(huì)調(diào)用initConnects方法創(chuàng)建指定數(shù)量的數(shù)據(jù)庫(kù)連接,并將它們的引用存放到數(shù)組中,此外還有一個(gè)相同大小的數(shù)組來(lái)記錄連接是否可用。每當(dāng)外部線程請(qǐng)求獲取一個(gè)連接時(shí),首先調(diào)用semaphore.acquire()方法獲取一個(gè)許可證,然后將連接狀態(tài)設(shè)置為使用中,最后返回該連接的引用。許可證的數(shù)量由構(gòu)造時(shí)傳入的參數(shù)決定,每調(diào)用一次semaphore.acquire()方法許可證數(shù)量減1,當(dāng)數(shù)量減為0時(shí)說(shuō)明已經(jīng)沒(méi)有連接可以使用了,這時(shí)如果其他線程再來(lái)獲取就會(huì)被阻塞。每當(dāng)線程釋放一個(gè)連接的時(shí)候會(huì)調(diào)用semaphore.release()將許可證釋放,此時(shí)許可證的總量又會(huì)增加,代表可用的連接數(shù)增加了,那么之前被阻塞的線程將會(huì)醒來(lái)繼續(xù)獲取連接,這時(shí)再次獲取就能夠成功獲取連接了。測(cè)試示例中初始化了一個(gè)3個(gè)連接的連接池,我們從測(cè)試結(jié)果中可以看到,每當(dāng)線程獲取一個(gè)連接剩余的連接數(shù)將會(huì)減1,等到減為0時(shí)其他線程就不能再獲取了,此時(shí)必須等待一個(gè)線程將連接釋放之后才能繼續(xù)獲取??梢钥吹绞S噙B接數(shù)總是在0到3之間變動(dòng),說(shuō)明我們這次的測(cè)試是成功的。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- Java并發(fā)編程Semaphore計(jì)數(shù)信號(hào)量詳解
- Java并發(fā)編程之Semaphore(信號(hào)量)詳解及實(shí)例
- Java信號(hào)量Semaphore原理及代碼實(shí)例
- Java基于Semaphore構(gòu)建阻塞對(duì)象池
- Java 信號(hào)量Semaphore的實(shí)現(xiàn)
- Java中Semaphore(信號(hào)量)的使用方法
- Java并發(fā)編程:CountDownLatch與CyclicBarrier和Semaphore的實(shí)例詳解
- JAVA 多線程之信號(hào)量(Semaphore)實(shí)例詳解
- java線程并發(fā)semaphore類(lèi)示例
- 分析Java并發(fā)編程之信號(hào)量Semaphore
相關(guān)文章
java如何獲取用戶(hù)登錄ip、瀏覽器信息、SessionId
這篇文章主要介紹了java如何獲取用戶(hù)登錄ip、瀏覽器信息、SessionId,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11Java elasticSearch-api的具體操作步驟講解
這篇文章主要介紹了elasticSearch-api的具體操作步驟講解,本文通過(guò)詳細(xì)的步驟介紹和圖文代碼展示講解了該項(xiàng)技術(shù),需要的朋友可以參考下2021-06-06idea中javaweb的jsp頁(yè)面圖片加載不出來(lái)問(wèn)題及解決
這篇文章主要介紹了idea中javaweb的jsp頁(yè)面圖片加載不出來(lái)問(wèn)題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07解決Unable to start embedded container&nbs
這篇文章主要介紹了解決Unable to start embedded container SpringBoot啟動(dòng)報(bào)錯(cuò)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07使用eclipse快速新建spirngboot項(xiàng)目的方法
本篇文章主要介紹了使用eclipse快速新建spirngboot項(xiàng)目的方法,具有一定的參考價(jià)值,有興趣的可以了解一下2017-04-04Java map 優(yōu)雅的元素遍歷方式說(shuō)明
這篇文章主要介紹了Java map 優(yōu)雅的元素遍歷方式說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-10-10java實(shí)現(xiàn)利用String類(lèi)的簡(jiǎn)單方法讀取xml文件中某個(gè)標(biāo)簽中的內(nèi)容
下面小編就為大家?guī)?lái)一篇java實(shí)現(xiàn)利用String類(lèi)的簡(jiǎn)單方法讀取xml文件中某個(gè)標(biāo)簽中的內(nèi)容。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-12-12Java Socket使用加密協(xié)議進(jìn)行傳輸對(duì)象的方法
這篇文章主要介紹了Java Socket使用加密協(xié)議進(jìn)行傳輸對(duì)象的方法,結(jié)合實(shí)例形式分析了java socket加密協(xié)議相關(guān)接口與類(lèi)的調(diào)用方法,以及服務(wù)器、客戶(hù)端實(shí)現(xiàn)技巧,需要的朋友可以參考下2017-06-06mybatis通過(guò)TypeHandler?list轉(zhuǎn)換string類(lèi)型轉(zhuǎn)換方式
這篇文章主要介紹了mybatis通過(guò)TypeHandler?list轉(zhuǎn)換string類(lèi)型轉(zhuǎn)換方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07