Java中的Semaphore原理解析
1. Semaphore是什么?
Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個線程,以保證合理的使用公共資源。
Semaphore一般用于流量的控制,特別是公共資源有限的應(yīng)用場景。例如數(shù)據(jù)庫的連接,假設(shè)數(shù)據(jù)庫的連接數(shù)上線為10個,多個線程并發(fā)操作數(shù)據(jù)庫可以使用Semaphore來控制并發(fā)操作數(shù)據(jù)庫的線程個數(shù)最多為10個。
2. 類圖
通過類圖可以看到,Semaphore與ReentrantLock的內(nèi)部類的結(jié)構(gòu)相同,類內(nèi)部總共存在Sync、NonfairSync、FairSync三個類,NonfairSync與FairSync類繼承自Sync類,Sync類繼承自AbstractQueuedSynchronizer抽象類。
3. 實現(xiàn)原理
3.1 使用示例
// 定義一個資源池類 class Pool { // 可用資源數(shù)100 private static final int MAX_AVAILABLE = 100; // 定義信號量100 private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); // 獲取資源 public Object getItem() throws InterruptedException { // 嘗試獲取 available.acquire(); // 返回可用資源 return getNextAvailableItem(); } // 釋放資源 public void putItem(Object x) { // 如果資源標(biāo)記為未被使用 if (markAsUnused(x)) // 釋放資源 available.release(); } // Not a particularly efficient data structure; just for demo // 定義資源類型,可以是滿足業(yè)務(wù)的任何類型 protected Object[] items = new Object[MAX_AVAILABLE] ... whatever kinds of items being managed // 是否被使用標(biāo)記 protected boolean[] used = new boolean[MAX_AVAILABLE]; // 獲取下一個可用資源 protected synchronized Object getNextAvailableItem() { // 循環(huán)遍歷 for (int i = 0; i < MAX_AVAILABLE; ++i) { // 如果未被使用 if (!used[i]) { // 使用標(biāo)記設(shè)置為true used[i] = true; // 返回當(dāng)前的資源 return items[i]; } } return null; // not reached } // 標(biāo)記資源為未被使用 protected synchronized boolean markAsUnused(Object item) { // 循環(huán)遍歷 for (int i = 0; i < MAX_AVAILABLE; ++i) { // 找到需要釋放的資源 if (item == items[i]) { // 如果是被使用中 if (used[i]) { // 使用標(biāo)記設(shè)置為false used[i] = false; // 返回true表示標(biāo)記成功 return true; } else // 返回false表示標(biāo)記失敗 return false; } } return false; } }
3.2 Sync
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; // 構(gòu)造方法,調(diào)用父類AQS的setState方法,給共享變量state賦值 // 即通過構(gòu)造方法給鎖的數(shù)量附初始值 Sync(int permits) { setState(permits); } // 獲取鎖,也叫許可 final int getPermits() { return getState(); } // 共享模式下的非公平獲取 // 此方法也體現(xiàn)出與ReentrantLock中Sync的實現(xiàn)不同 // ReentrantLock中Sync是獨占模式下的獲取 // 具體實現(xiàn)的不同體現(xiàn)在int remaining = available - acquires; final int nonfairTryAcquireShared(int acquires) { for (;;) { // 獲取鎖的可用數(shù)量 int available = getState(); // 可用數(shù)量 - 請求的數(shù)量(acquires默認值為1) = 剩余量 int remaining = available - acquires; // 如果remaining < 0即請求的鎖大于可用的數(shù)量,馬上返回負數(shù),表示獲取鎖失敗 if (remaining < 0 || // 否則通過CAS的方式將可用數(shù)量換成剩余量,并返回剩余量 // 自旋 + CAS 保證線程安全,線程不用排隊體現(xiàn)出非公平性 compareAndSetState(available, remaining)) return remaining; } } // 共享模式下釋放鎖 protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // CAS修改鎖數(shù)量,成功則返回,失敗則繼續(xù)自旋 if (compareAndSetState(current, next)) return true; } } // 根據(jù)指定的縮減量減小可用鎖的數(shù)目 final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } // 獲取并返回立即可用的所有鎖 final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } }
3.3 NonfairSync
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; // 構(gòu)造方法初始化鎖數(shù)量 NonfairSync(int permits) { super(permits); } // 直接調(diào)用nonfairTryAcquireShared方法,走非公平策略 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
3.4 FairSync
static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; // 構(gòu)造方法初始化鎖數(shù)量 FairSync(int permits) { super(permits); } // 共享模式下的公平策略獲取 // 與非公平策略唯一的不同體現(xiàn)在線程是否需要排隊 // 即是否調(diào)用hasQueuedPredecessors()方法進行判斷 // 如果需要排隊則立即返回繼續(xù)排隊 // 否則通過CAS方式獲取鎖并返貨鎖的剩余量,結(jié)束自旋 protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
通過分析代碼發(fā)現(xiàn),Semaphore與ReentrantLock的內(nèi)部類的結(jié)構(gòu)相同,具體實現(xiàn)的不同體現(xiàn)在 int remaining = available - acquires這行代碼上。
ReentrantLock對于鎖的控制是 int c = getState(); if (c == 0){....}。體現(xiàn)為一種獨占的控制。
Semaphore對鎖的控制是 for (;;) { int available = getState(); int remaining = available - acquires;......}。即所有線程都可以進入自旋,只要鎖有剩余量都可以嘗試獲取鎖,體現(xiàn)為一種共享的控制。
3.5 Semaphore
public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210L; /** All mechanics via AbstractQueuedSynchronizer subclass */ // 同步隊列 private final Sync sync; // 構(gòu)造方法初始話鎖數(shù)量 // 默認采用非公平策略 public Semaphore(int permits) { sync = new NonfairSync(permits); } // 構(gòu)造方法,帶一個布爾參數(shù),true表示采用公平策略,false表示采用非公平策略 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } }
3.5.1 acquire() 方法解析
// Semaphore public void acquire() throws InterruptedException { // 調(diào)用sync的acquireSharedInterruptibly,即響應(yīng)中斷的獲取 // 因為sync繼承AbstractQueuedSynchronizer // 即調(diào)用AQS的acquireSharedInterruptibly sync.acquireSharedInterruptibly(1); } // 進入AQS public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 如果線程被中斷,則響應(yīng)中斷 if (Thread.interrupted()) throw new InterruptedException(); // 否則調(diào)用tryAcquireShared,如果獲取的鎖小于0即獲取鎖失敗則調(diào)用doAcquireSharedInterruptibly方法,進入同步隊列排隊 // 如果獲取鎖成功則不排隊,走業(yè)務(wù)邏輯 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // Semaphore 中tryAcquireShared的實現(xiàn) // 公平策略 protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // 非公平策略 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } // 前面已經(jīng)解析過,不在贅述
3.5.2 release() 方法解析
// Semaphore public void release() { sync.releaseShared(1); } // 進入AQS public final boolean releaseShared(int arg) { // 嘗試釋放鎖, 如果釋放鎖成功 if (tryReleaseShared(arg)) { // 線程出同步隊列,返回true doReleaseShared(); return true; } // 否則返回false return false; } // Semaphore 中tryReleaseShared實現(xiàn) protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
3.5.3 其他方法
方法 | 說明 | 調(diào)用 |
acquire(int permits) | 獲取信號量,指定獲取許可的個數(shù),響應(yīng)中斷 | sync.acquireSharedInterruptibly(permits) |
acquireUninterruptibly() | 獲取信號量,默認獲取1個許可,不響應(yīng)中斷 | sync.acquireShared(1) |
acquireUninterruptibly(int permits) | 獲取信號量,指定獲取許可的個數(shù),不響應(yīng)中斷 | sync.acquireShared(permits) |
release(int permits) | 釋放信號量,指定釋放許可的個數(shù) | sync.releaseShared(permits); |
tryAcquire() | 嘗試獲取許可,如果獲取成功返回true,否則返回false,不會阻塞線程,而且不響應(yīng)中斷 | sync.nonfairTryAcquireShared(1) |
tryAcquire(int permits) | 同上,可以指定獲取許可的個數(shù) | sync.nonfairTryAcquireShared(permits) |
tryAcquire(long timeout, TimeUnit unit) | 共享式超時獲取 | sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)) |
tryAcquire(int permits, long timeout, TimeUnit unit) | 同上,可以指定獲取許可的個數(shù) | sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)) |
availablePermits() | 獲取可用許可數(shù) | sync.getPermits() |
drainPermits() | 將剩下的信號量一次性消耗光,并且返回所消耗的信號量 | sync.drainPermits() |
reducePermits(int reduction) | 減少信號量的總數(shù),不會導(dǎo)致任何線程阻塞,調(diào)用該方法可能會導(dǎo)致信號量最終為負數(shù) | sync.reducePermits(reduction) |
isFair() | 是否采用公平策略 | |
hasQueuedThreads() | 是否是已排隊的線程 | |
getQueueLength() | 獲取排隊線程的長度 | |
getQueuedThreads() | 獲取排隊線程 |
4. 總結(jié)
Semaphore是一個有效的流量控制工具,它基于AQS共享鎖實現(xiàn)。我們常常用它來控制對有限資源的訪問。使用步驟
每次使用資源前,先申請一個信號量,如果資源數(shù)不夠,就會阻塞等待;每次釋放資源后,就釋放一個信號量。
到此這篇關(guān)于Java中的Semaphore原理解析的文章就介紹到這了,更多相關(guān)Semaphore原理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Boot使用profile如何配置不同環(huán)境的配置文件
,springboot支持通過不同的profile來配置不同環(huán)境的配置,下面就大致介紹一下yml配置文件跟properties配置文件怎么使用profile配置不同環(huán)境的配置文件2018-01-01JAXB命名空間及前綴_動力節(jié)點Java學(xué)院整理
這篇文章主要給大家介紹了關(guān)于JAXB命名空間及前綴的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2017-08-08java微信公眾號開發(fā)第一步 公眾號接入和access_token管理
這篇文章主要為大家介紹了java微信公眾號開發(fā),主要內(nèi)容包括公眾號接入和access_token管理,感興趣的小伙伴們可以參考一下2016-01-01java中為什么要謹慎使用Arrays.asList、ArrayList的subList
這篇文章主要介紹了java中為什么要謹慎使用Arrays.asList、ArrayList的subList,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02