java通過信號(hào)量實(shí)現(xiàn)限流的示例
信號(hào)量(Semaphore)是 Java 多線程并發(fā)中的一種 JDK 內(nèi)置同步器,通過它可以實(shí)現(xiàn)多線程對(duì)公共資源的并發(fā)訪問控制。
信號(hào)量由來
限流器
信號(hào)量的主要應(yīng)用場(chǎng)景是控制最多 N 個(gè)線程同時(shí)地訪問資源,其中計(jì)數(shù)器的最大值即是許可的最大值 N。
現(xiàn)在我們需要開發(fā)一個(gè)限流器,同一時(shí)刻最多有10個(gè)請(qǐng)求可以執(zhí)行。對(duì)于這樣的需求,我們實(shí)現(xiàn)的方案有:
- 使用Atomic類
- 使用Lock
- 使用條件變量
- 使用信號(hào)量
使用Atomic類實(shí)現(xiàn)
public class LimitByAtomic { ? ?private static final AtomicInteger COUNTER = new AtomicInteger(10); ? ?public void f() { ? ?int count = COUNTER.decrementAndGet(); ? ?if (count < 0) { ? ? ?COUNTER.incrementAndGet(); ? ? ?System.out.println("拒絕執(zhí)行業(yè)務(wù)邏輯"); ? ? ?return; // 拒絕執(zhí)行業(yè)務(wù)邏輯 ? } ? ? ?try { ? ? ?// 執(zhí)行業(yè)務(wù)邏輯 ? ? ?System.out.println("執(zhí)行業(yè)務(wù)邏輯"); ? } finally { ? ? ?COUNTER.incrementAndGet(); ? } } }
使用Lock實(shí)現(xiàn)
public class LimitByLock { ? ?private int count = 10; ? ?public void f() { ? ?if (count <= 0) { ? ? ?System.out.println("拒絕執(zhí)行業(yè)務(wù)邏輯"); ? ? ?return; ? } ? ? ?synchronized (this) { ? ? ?if (count <= 0) { ? ? ? ?System.out.println("拒絕執(zhí)行業(yè)務(wù)邏輯"); ? ? ? ?return; ? ? } ? ? ?count--; ? } ? ? ?try { ? ? ?// 執(zhí)行業(yè)務(wù)邏輯 ? ? ?System.out.println("執(zhí)行業(yè)務(wù)邏輯"); ? } finally { ? ? ?synchronized (this) { ? ? ? ?count++; ? ? } ? } } }
使用條件變量實(shí)現(xiàn)
對(duì)于使用Atomic類還是Lock這兩種實(shí)現(xiàn)方式,都有一個(gè)缺點(diǎn),如果10個(gè)線程同時(shí)執(zhí)行,當(dāng)?shù)?1個(gè)線程來執(zhí)行的時(shí)候,會(huì)被拒絕掉,這樣就沒有執(zhí)行業(yè)務(wù)邏輯的機(jī)會(huì),造成請(qǐng)求丟失。
所以我們可以通過線程等待-通知機(jī)制來解決上面的問題。如果10個(gè)線程同時(shí)執(zhí)行,當(dāng)?shù)?1個(gè)線程來執(zhí)行的時(shí)候,先阻塞這第11個(gè)線程,等待前面的10個(gè)線程只要執(zhí)行完一個(gè),就通知第11個(gè)線程來執(zhí)行。
public class LimitByCondition { ? ?private int count = 10; ? ?public void f() throws Exception { ? ?synchronized (this) { ? ? ?while (count <= 0) { ? ? ? ?System.out.println("等待執(zhí)行業(yè)務(wù)邏輯"); ? ? ? ?this.wait(); ? ? } ? ? ?count--; ? } ? ? ?try { ? ? ?System.out.println("執(zhí)行業(yè)務(wù)邏輯"); ? } finally { ? ? ?synchronized (this) { ? ? ? ?count++; ? ? ? ?this.notifyAll(); ? ? } ? } } }
使用Semaphore實(shí)現(xiàn)
除了使用條件變量,java sdk中還可以使用Semaphore
來實(shí)現(xiàn)。
public class LimitBySemaphore { ? ?private final Semaphore semaphore = new Semaphore(10); ? ?public void f() throws Exception { ? ?semaphore.acquire(); ? ?try { ? ? ?System.out.println("執(zhí)行業(yè)務(wù)邏輯"); ? } finally { ? ? ?semaphore.release(); ? } } }
接下來我們就來探討一下Semaphore
的實(shí)現(xiàn)原理。
Semaphore實(shí)現(xiàn)原理
信號(hào)量模型
實(shí)際上Semaphore的實(shí)現(xiàn)原理非常簡單,總結(jié)下來就是:一個(gè)計(jì)數(shù)器,一個(gè)等待隊(duì)列,三個(gè)方法。
在信號(hào)量模型里,計(jì)數(shù)器和等待隊(duì)列對(duì)外是透明,所以只能通過信號(hào)量模型提供的三個(gè)方法訪問,init(),down(),up()
--這些方法都是原子性的。
init()
:設(shè)置計(jì)數(shù)器的初始值。
down()
:計(jì)數(shù)器的值減1;如果此時(shí)計(jì)數(shù)器的值小于0,則當(dāng)前線程將被阻塞,否則當(dāng)前線程可以繼續(xù)執(zhí)行。
up()
:計(jì)數(shù)器的值加1;如果此時(shí)計(jì)數(shù)器的值小于等于0,則喚醒等待隊(duì)列中的一個(gè)線程,并將其從等待隊(duì)列中移除。
class MySemaphore{ ?// 計(jì)數(shù)器 ?int count; ?// 等待隊(duì)列 ?Queue queue; ?// 初始化操作 ?MySemaphore(int c){ ? ?this.count=c; } ?// ?void down(){ ? ?this.count--; ? ?if(this.count<0){ ? ? ?// 將當(dāng)前線程插入等待隊(duì)列 ? ? ?// 阻塞當(dāng)前線程 ? } } ?void up(){ ? ?this.count++; ? ?if(this.count<=0) { ? ? ?// 移除等待隊(duì)列中的某個(gè)線程 T ? ? ?// 喚醒線程 T ? } } } ?
使用方法如下:
static int count; // 初始化信號(hào)量 static final MySemaphore s ? ?= new MySemaphore(1); // 用信號(hào)量保證互斥 ? ? static void addOne() { ?s.down(); ?try { ? ?count+=1; } finally { ? ?s.up(); } } ?
實(shí)際上信號(hào)量模型,down()、up() 這兩個(gè)操作歷史上最早稱為 P 操作和 V 操作,所以信號(hào)量模型也被稱為 PV 原語。
Java Semaphore的實(shí)現(xiàn)
public class Semaphore implements java.io.Serializable { public void acquire() throws InterruptedException; ?public void acquireUninterruptibly(); ?public boolean tryAcquire(); ?public boolean tryAcquire(long timeout, TimeUnit unit); ?public void release(); ?public void acquire(int permits) throws InterruptedException; ?public void acquireUninterruptibly(int permits) ; ?public boolean tryAcquire(int permits); ?public boolean tryAcquire(int permits, long timeout, TimeUnit unit) ? ?throws InterruptedException; ?public void release(int permits); }
Java Semaphore的實(shí)現(xiàn),acquire()
對(duì)應(yīng)信號(hào)量模型里的down()
方法,release()
對(duì)應(yīng)信號(hào)量模型里的up()
方法。
Semaphore類提供的常用方法有以下幾個(gè)。我們可以粗略地將以下方法分為兩組。前五個(gè)為一組,默認(rèn)一次獲取或釋放的許可(permit)個(gè)數(shù)為1。后五個(gè)為一組,可以指定一次獲取或釋放的許可個(gè)數(shù)。對(duì)于每組方法來說,都有4個(gè)不同的獲取許可的方法:可中斷獲取、不可中斷獲取、非阻塞獲取、可超時(shí)獲取,這跟Lock提供的各種加鎖方法非常相似。
Java Semaphore的實(shí)現(xiàn)也是基于AQS來實(shí)現(xiàn)的,跟ReentrantLock一樣,Semaphore中的AQS也有公平鎖與非公平鎖這兩種實(shí)現(xiàn)。
public class Semaphore implements java.io.Serializable { abstract static class Sync extends AbstractQueuedSynchronizer { ? } ? ? ?// 非公平鎖 ? ?static final class NonfairSync extends Sync { ? } ? ?// 公平鎖 ? ?static final class FairSync extends Sync { ? } ? ?// 默認(rèn)使用非公平鎖 ? ?public Semaphore(int permits) { ? ? ? ?sync = new NonfairSync(permits); ? } ? ? ?public Semaphore(int permits, boolean fair) { ? ? ? ?sync = fair ? new FairSync(permits) : new NonfairSync(permits); ? } }
Semaphore可以看做是一種共享鎖,因此,F(xiàn)airSync類和NofairSync類實(shí)現(xiàn)了AQS的tryAcquireShared()抽象方法,不過,實(shí)現(xiàn)邏輯并不相同。對(duì)于tryReleaseShared()抽象方法,因?yàn)樵贔airSync和NofairSync中的實(shí)現(xiàn)邏輯相同,因此,它被放置于FairSync和NofairSync的公共父類Sync中。
acquire()
實(shí)現(xiàn)如下:
// java.util.concurrent.Semaphore#acquire() public void acquire() throws InterruptedException { ?sync.acquireSharedInterruptibly(1); } ? // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly public final void acquireSharedInterruptibly(int arg) ? ? ? ? ? ?throws InterruptedException { ?// 先判斷線程有沒有被中斷 ?if (Thread.interrupted()) ? ?throw new InterruptedException(); ?// 嘗試獲取共享鎖,如果獲取許可失敗,返回值<0, 需要進(jìn)入等待隊(duì)列 ?if (tryAcquireShared(arg) < 0) ? ?doAcquireSharedInterruptibly(arg); // 排隊(duì)等待隊(duì)列 } ?
tryAcquireShared()
實(shí)現(xiàn)
// java.util.concurrent.Semaphore.FairSync#tryAcquireShared protected int tryAcquireShared(int acquires) { ?for (;;) { ? ?if (hasQueuedPredecessors()) // 比非公平鎖多了這一行 ? ? ?return -1; ? ?int available = getState(); ? ?int remaining = available - acquires; ? ?if (remaining < 0 || ? ? ? ?compareAndSetState(available, remaining)) ? ? ?return remaining; } } ? // java.util.concurrent.Semaphore.Sync#nonfairTryAcquireShared final int nonfairTryAcquireShared(int acquires) { ?for (;;) { ? ?int available = getState(); ? ?int remaining = available - acquires; ? ?if (remaining < 0 || ? ? ? ?compareAndSetState(available, remaining)) ? ? ?return remaining; } }
以上兩個(gè)tryAcquireShared()
函數(shù)的代碼實(shí)現(xiàn)基本相同。許可個(gè)數(shù)存放在AQS的state變量中,兩個(gè)函數(shù)都是通過自旋+CAS的方式來獲取許可。兩個(gè)函數(shù)唯一的區(qū)別在于,對(duì)于公平模式下的Semaphore,當(dāng)線程調(diào)用tryAcquireShared()函數(shù)時(shí),如果等待隊(duì)列中有等待許可的線程,那么,線程將直接去排隊(duì)等待許可,而不是像非公平模式下的Semaphore那樣,線程可以插隊(duì)直接競爭許可。
release()
實(shí)現(xiàn)
// java.util.concurrent.Semaphore#release() public void release() { ?sync.releaseShared(1); } ? // java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared public final boolean releaseShared(int arg) { ?// 嘗試釋放許可 ?if (tryReleaseShared(arg)) { ? ?// 喚醒等待隊(duì)列其中一個(gè)線程 ? ?doReleaseShared(); ? ?return true; } ?return false; } ? // java.util.concurrent.Semaphore.Sync#tryReleaseShared protected final boolean tryReleaseShared(int releases) { ?// 采用自旋 + CAS來更新state ?for (;;) { ? ?int current = getState(); ? ?int next = current + releases; ? ?if (next < current) // overflow ? ? ?throw new Error("Maximum permit count exceeded"); ? ?if (compareAndSetState(current, next)) ? ? ?return true; } }
總結(jié)
semaphore其中一個(gè)功能是lock不容易實(shí)現(xiàn)的,那就是:semaphore可以允許多個(gè)線程訪問同一個(gè)臨界區(qū)。
比較常見的需求就是我們工作中遇到各種池化資源,例如連接池,對(duì)象池,線程池等等。其中,最熟悉的可能是數(shù)據(jù)庫連接池,在同一時(shí)刻,一定是允許多個(gè)線程同時(shí)使用連接池的,當(dāng)然,每個(gè)鏈接在被釋放前,是不允許其他線程使用的。
對(duì)象池:一次性創(chuàng)建出N個(gè)對(duì)象,之后所有的線程重復(fù)利用這N個(gè)對(duì)象,對(duì)象在被釋放前,也是不允許其他線程使用的。對(duì)象池,可以用List保存實(shí)例對(duì)象。
class ObjPool<T, R> { ?final List<T> pool; ?// 用信號(hào)量實(shí)現(xiàn)限流器 ?final Semaphore sem; ?// 構(gòu)造函數(shù) ?ObjPool(int size, T t){ ? ?pool = new Vector<T>(){}; ? ?for(int i=0; i<size; i++){ ? ? ?pool.add(t); ? } ? ?sem = new Semaphore(size); } ?// 利用對(duì)象池的對(duì)象,調(diào)用 func 限流 ?R exec(Function<T,R> func) { ? ?T t = null; ? ?sem.acquire(); ? ?try { ? ? ?t = pool.remove(0); ? ? ?return func.apply(t); ? } finally { ? ? ?pool.add(t); ? ? ?sem.release(); ? } } } // 創(chuàng)建對(duì)象池 ObjPool<Long, String> pool = ?new ObjPool<Long, String>(10, 2); // 通過對(duì)象池獲取 t,之后執(zhí)行 ? pool.exec(t -> { ? ?System.out.println(t); ? ?return t.toString(); }); ?
到此這篇關(guān)于java通過信號(hào)量實(shí)現(xiàn)限流的示例的文章就介紹到這了,更多相關(guān)java 限流內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- java實(shí)現(xiàn)單機(jī)限流
- Java限流實(shí)現(xiàn)的幾種方法詳解
- 詳解5種Java中常見限流算法
- Java服務(wù)限流算法的6種實(shí)現(xiàn)
- Java中實(shí)現(xiàn)接口限流的方案詳解
- 關(guān)于Java限流功能的簡單實(shí)現(xiàn)
- 使用Java自定義注解實(shí)現(xiàn)一個(gè)簡單的令牌桶限流器
- Java中常見的4種限流算法詳解
- Java實(shí)現(xiàn)限流接口的示例詳解
- Java面試之限流的實(shí)現(xiàn)方式小結(jié)
- Java代碼實(shí)現(xiàn)四種限流算法詳細(xì)介紹
相關(guān)文章
Java使用JMeter進(jìn)行高并發(fā)測(cè)試
軟件的壓力測(cè)試是一種保證軟件質(zhì)量的行為,本文主要介紹了Java使用JMeter進(jìn)行高并發(fā)測(cè)試,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-11-11Java算法之BFS,DFS,動(dòng)態(tài)規(guī)劃和貪心算法的實(shí)現(xiàn)
廣度優(yōu)先搜索(BFS)和深度優(yōu)先搜索(DFS)是圖遍歷算法中最常見的兩種算法,主要用于解決搜索和遍歷問題。動(dòng)態(tài)規(guī)劃和貪心算法則用來解決優(yōu)化問題。本文就來看看這些算法的具體實(shí)現(xiàn)吧2023-04-04MyBatis-Plus如何最優(yōu)雅最簡潔地完成數(shù)據(jù)庫操作
Mybatis-Plus是一個(gè)?Mybatis?的增強(qiáng)工具,在?Mybatis?的基礎(chǔ)上只做增強(qiáng)不做改變,為簡化開發(fā)、提高效率而生,下面這篇文章主要給大家介紹了關(guān)于MyBatis-Plus如何最優(yōu)雅最簡潔地完成數(shù)據(jù)庫操作的相關(guān)資料,需要的朋友可以參考下2022-03-03Spring?Boot?根據(jù)配置決定服務(wù)(集群、單機(jī))是否使用某些主件的操作代碼
這篇文章主要介紹了Spring?Boot根據(jù)配置決定服務(wù)(集群、單機(jī))是否使用某些主件,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2025-04-04