AQS同步組件Semaphore信號量案例剖析
基本概念
Semaphore也是一個線程同步的輔助類,可以維護(hù)當(dāng)前訪問自身的線程個數(shù),并提供了同步機(jī)制。使用Semaphore可以控制并發(fā)訪問資源的線程個數(shù)。
例如排隊買票的情況,如果只有三個窗口,那么同一時間最多也只能有三個人買票。第四個人來了之后就必須在后面等著,只有其他人買好了,才可以去相應(yīng)的窗口進(jìn)行買票 。
作用和使用場景
- 用于保證同一時間并發(fā)訪問線程的數(shù)目。
- 信號量在操作系統(tǒng)中是很重要的概念,Java并發(fā)庫里的Semaphore就可以很輕松的完成類似操作系統(tǒng)信號量的控制。Semaphore可以很容易控制系統(tǒng)中某個資源被同時訪問的線程個數(shù)。
- 在數(shù)據(jù)結(jié)構(gòu)中我們學(xué)過鏈表,鏈表正常是可以保存無限個節(jié)點的,而Semaphore可以實現(xiàn)有限大小的列表。
使用場景:僅能提供有限訪問的資源。比如數(shù)據(jù)庫連接。
源碼分析
構(gòu)造函數(shù)
/** *接受一個整型的數(shù)字,表示可用的許可證數(shù)量。Semaphore(10)表*示允許10個線程獲取許可證, *也就是最大并發(fā)數(shù)是10。 * * @param permits 可用許可證的初始數(shù)量。 **/ public Semaphore(int permits) { sync = new NonfairSync(permits); } /** * 使用給定的許可數(shù)量和給定的公平性設(shè)置 * * @param permits 可用許可證的初始數(shù)量。 * * @param fair 指定是公平模式還是非公平模式,默認(rèn)非公平模式 . 公平模式:先啟動的線程優(yōu)先得到 * 許可。 非公平模式:先啟動的線程并不一定先獲得許可,誰搶到誰就獲得許可。 */ public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
常用方法
acquire() 獲取一個許可
acquire(int permits) 獲取指定個數(shù)的許可
tryAcquire()方法嘗試獲取1個許可證
tryAcquire(long timeout, TimeUnit unit) 最大等待許可的時間
tryAcquire(int permits) 獲取指定個數(shù)的許可
tryAcquire(int permits, long timeout, TimeUnit unit) 最大等待許可的時間
availablePermits() : 返回此信號量中當(dāng)前可用的許可證數(shù)
release() 釋放許可
release(int permits) 釋放指定個數(shù)的許可
int getQueueLength() 返回正在等待獲取許可證的線程數(shù)。
boolean hasQueuedThreads() 是否有線程正在等待獲取許可證。
void reducePermits(int reduction) 減少reduction個許可證。是個protected方法。
Collection getQueuedThreads() 返回所有等待獲取許可證的線程集合。是個protected方法。
使用案例
acquire()獲取單個許可
/** * 線程數(shù)量 */ private final static int threadCount = 15; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { //獲取一個許可 semaphore.acquire(); test(threadNum); //釋放一個許可 semaphore.release(); } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { // 模擬請求的耗時操作 Thread.sleep(1000); log.info("{}", threadNum); }
輸出結(jié)果:
根據(jù)輸出結(jié)果的時間可以看出來同一時間最多只能3個線程執(zhí)行,符合預(yù)期
acquire(int permits)獲取多個許可
/** * 線程數(shù)量 */ private final static int threadCount = 15; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); //信號量設(shè)置為3,也就是最大并發(fā)量為3,同時只允許3個線程獲得許可 final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { //獲取多個許可 semaphore.acquire(3); test(threadNum); //釋放多個許可 semaphore.release(3); } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { // 模擬請求的耗時操作 Thread.sleep(1000); log.info("{}", threadNum); }
輸出結(jié)果:
設(shè)置了3個許可,每個線程每次獲取3個許可,因此同一時間只能有1個線程執(zhí)行 。
tryAcquire()獲取許可
tryAcquire()嘗試獲取一個許可,如果未獲取到,不等待,將直接丟棄該線程不執(zhí)行
/** * 線程數(shù)量 */ private final static int threadCount = 15; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); //信號量設(shè)置為3,也就是最大并發(fā)量為3,同時只允許3個線程獲得許可 final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { //嘗試獲取一個許可,如果未獲取到,不等待,將直接丟棄該線程不執(zhí)行 if(semaphore.tryAcquire()) { test(threadNum); //釋放許可 semaphore.release(); } } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { // 模擬請求的耗時操作 Thread.sleep(1000); log.info("{}", threadNum); }
輸出結(jié)果:
從輸出可以看到,在3個線程獲取到3個許可后,因為每個線程調(diào)用的方法要執(zhí)行1秒中,最早的一個許可也要在1S后釋放,剩下的17個線程未獲取到許可,使用了semaphore.tryAcquire()方法,沒有設(shè)置等待時間,所以便直接被丟棄,不執(zhí)行了。
tryAcquire(long timeout, TimeUnit unit)
tryAcquire(long timeout, TimeUnit unit)未獲取到許可,設(shè)置等待時長
/** * 線程數(shù)量 */ private final static int threadCount = 15; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); //信號量設(shè)置為3,也就是最大并發(fā)量為3,同時只允許3個線程獲得許可 final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { //設(shè)置了獲取許可等待時間為2秒,如果兩秒后還是未獲得許可的線程便得不到執(zhí)行 if(semaphore.tryAcquire(2000, TimeUnit.MILLISECONDS)) { test(threadNum); //釋放許可 semaphore.release(); } } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { // 模擬請求的耗時操作 Thread.sleep(1000); log.info("{}", threadNum); }
輸出結(jié)果:
tryAcquire通過參數(shù)指定了2秒的等待時間。 上述代碼中同一時間最多執(zhí)行3個。第4個線程因前3個線程執(zhí)行需要耗時一秒未釋放許可,因此需要等待。
但是由于設(shè)置了2秒的等待時間,所以在5秒內(nèi)等待到了釋放的許可,繼續(xù)執(zhí)行,循環(huán)往復(fù)。
但是15個線程 ,每秒并發(fā)3個,2S是執(zhí)行不完的。所以上面執(zhí)行到第6個(0開始,顯示是5)就結(jié)束了,【每次執(zhí)行結(jié)果會有差異,取決于CPU】,并沒有全部執(zhí)行完15個線程。
以上就是AQS同步組件Semaphore信號量案例剖析的詳細(xì)內(nèi)容,更多關(guān)于AQS同步組件Semaphore的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java中Timer的schedule()方法參數(shù)詳解
今天小編就為大家分享一篇關(guān)于Java中Timer的schedule()方法參數(shù)詳解,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2019-03-03spring boot開發(fā)遇到坑之spring-boot-starter-web配置文件使用教程
Spring Boot支持容器的自動配置,默認(rèn)是Tomcat,當(dāng)然我們也是可以進(jìn)行修改的。這篇文章給大家介紹了spring boot開發(fā)遇到坑之spring-boot-starter-web配置文件使用教程,需要的朋友參考下吧2018-01-01Java 中HashCode作用_動力節(jié)點Java學(xué)院整理
這篇文章主要介紹了Java 中HashCode作用以及hashcode對于一個對象的重要性,對java中hashcode的作用相關(guān)知識感興趣的朋友一起學(xué)習(xí)吧2017-05-05java多線程CountDownLatch與線程池ThreadPoolExecutor/ExecutorService案
這篇文章主要介紹了java多線程CountDownLatch與線程池ThreadPoolExecutor/ExecutorService案例,2021-02-02java設(shè)計模式原型模式與享元模式調(diào)優(yōu)系統(tǒng)性能詳解
這篇文章主要為大家介紹了java設(shè)計模式原型模式與享元模式調(diào)優(yōu)系統(tǒng)性能方法詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05