Java AQS信號量Semaphore的使用
一.什么是Semaphore
Semaphore,俗稱信號量,它是操作系統中PV操作的原語在java的實現,它也是基于AbstractQueuedSynchronizer實現的。
Semaphore的功能非常強大,大小為1的信號量就類似于互斥鎖,通過同時只能有一個線程獲取信號量實現。大小為n(n>0)的信號量可以實現限流的功能,它可以實現只能有n個線程同時獲取信號量。
PV操作是操作系統一種實現進程互斥與同步的有效方法。PV操作與信號量(S)的處理相關,P表示通過的意思,V表示釋放的意思。用PV操作來管理共享資源時,首先要確保PV操作自身執(zhí)行的正確性。
P操作的主要動作是:
①S減1;
②若S減1后仍大于或等于0,則進程繼續(xù)執(zhí)行;
③若S減1后小于0,則該進程被阻塞后放入等待該信號量的等待隊列中,然后轉進程調度。
V操作的主要動作是:
①S加1;
②若相加后結果大于0,則進程繼續(xù)執(zhí)行;
③若相加后結果小于或等于0,則從該信號的等待隊列中釋放一個等待進程,然后再返回原進程繼續(xù)執(zhí)行或轉進程調度。
二.Semaphore的使用
構造器
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
permits 表示許可證的數量(資源數)
fair 表示公平性,如果這個設為 true 的話,下次執(zhí)行的線程會是等待最久的線程
常用方法
public void acquire() throws InterruptedException
public boolean tryAcquire()
public void release()
public int availablePermits()
public final int getQueueLength()
public final boolean hasQueuedThreads()
protected void reducePermits(int reduction)
- acquire() 表示阻塞并獲取許可
- tryAcquire() 方法在沒有許可的情況下會立即返回 false,要獲取許可的線程不會阻塞
- release() 表示釋放許可
- int availablePermits():返回此信號量中當前可用的許可證數。
- int getQueueLength():返回正在等待獲取許可證的線程數。
- boolean hasQueuedThreads():是否有線程正在等待獲取許可證。
- void reducePermit(int reduction):減少 reduction 個許可證
- Collection getQueuedThreads():返回所有等待獲取許可證的線程集合
應用場景
可以用于做流量控制,特別是公用資源有限的應用場景
限流
/** * 實現一個同時只能處理5個請求的限流器 */ private static Semaphore semaphore = new Semaphore(5); /** * 定義一個線程池 */ private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(200)); public static void exec() { try { semaphore.acquire(1); // 模擬真實方法執(zhí)行 System.out.println("執(zhí)行exec方法" ); Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } finally { semaphore.release(1); } } public static void main(String[] args) throws InterruptedException { { for (; ; ) { Thread.sleep(100); // 模擬請求以10個/s的速度 executor.execute(() -> exec()); } } }
三.Semaphore源碼分析
主要關注 Semaphore的加鎖解鎖(共享鎖)邏輯實現,線程競爭鎖失敗入隊阻塞邏輯和獲取鎖的線程釋放鎖喚醒阻塞線程競爭鎖的邏輯實現
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //嘗試獲取共享鎖,大于等于0則直接獲取鎖成功,小于0時則共享鎖阻塞 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
tryAcquireShared的實現
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; // 當減一之后的值小于0 或者 // compareAndSetState成功,把state變?yōu)閞emaining,即將狀態(tài)減一 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
入隊阻塞
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //入隊,創(chuàng)建節(jié)點 使用共享模式 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //獲取當前節(jié)點的前軀節(jié)點 final Node p = node.predecessor(); //如果節(jié)點為head節(jié)點 if (p == head) { //阻塞動作比較重,通常會再嘗試獲取資源,沒有獲取到返回負數 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //判斷是否可以阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
入隊操作
private Node addWaiter(Node mode) { //構建節(jié)點,模式是共享模式 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { //設置前一節(jié)點為tail node.prev = pred; //設置當前節(jié)點為尾節(jié)點 if (compareAndSetTail(pred, node)) { // 前一節(jié)點的next為當前節(jié)點 pred.next = node; return node; } } //創(chuàng)建隊列 enq(node); return node; }
創(chuàng)建隊列,經典的for循環(huán)創(chuàng)建雙向鏈表
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { //節(jié)點為空 則new一個節(jié)點 設置頭節(jié)點 if (compareAndSetHead(new Node())) //把這個節(jié)點的頭節(jié)點賦值給尾節(jié)點 tail = head; } else { // 如果尾節(jié)點存在 就將該節(jié)點的前節(jié)點指向tail node.prev = t; //設置當前節(jié)點為tail if (compareAndSetTail(t, node)) { //前一個節(jié)點的next指向當前節(jié)點,入隊操作就完成了 t.next = node; return t; } } } }
設置waitStatus狀態(tài)及獲取waitStatus狀態(tài),waitStatus為-1時可以喚醒后續(xù)節(jié)點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 狀態(tài)是-1 就可以喚醒后續(xù)節(jié)點 * */ return true; if (ws > 0) { /* * 前置任務已取消,刪掉節(jié)點 * */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * cas設置waitstatus狀態(tài),設置為-1 * */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
阻塞 調用LockSupport.park
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
釋放鎖的邏輯
public void release() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { //cas成功則進行釋放共享鎖 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
state狀態(tài)+1操作,cas成功,返回true
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
private void doReleaseShared() { for (;;) { Node h = head; //頭節(jié)點不為空并且不是尾節(jié)點 if (h != null && h != tail) { int ws = h.waitStatus; //waitstatus為-1 if (ws == Node.SIGNAL) { //將SIGNAL置為0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //喚醒 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
喚醒操作
private void unparkSuccessor(Node node) { int ws = node.waitStatus; //ws小于0就將其設置為0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //當前節(jié)點的下一個節(jié)點為空或者ws大于0 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //s不為空 則進行喚醒 if (s != null) LockSupport.unpark(s.thread); }
喚醒下一個線程之后,要把上一個節(jié)點移除隊列
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); //下一個線程進來,如果前置節(jié)點是頭節(jié)點,則將前置節(jié)點出隊 if (p == head) { int r = tryAcquireShared(arg); //cas獲取資源成功 if (r >= 0) { //出隊操作 setHeadAndPropagate(node, r); //將p.next移除 p.next = null; failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
出隊操作
private void setHeadAndPropagate(Node node, int propagate) { //設置當前節(jié)點為head節(jié)點,前一節(jié)點的head屬性被刪除 Node h = head; setHead(node); //如果是傳播屬性 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { //并且是共享模式,可以持續(xù)喚醒下一個, //只要資源數充足 就可以一直往下喚醒,提高并發(fā)量 Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
至此,線程的阻塞喚醒核心邏輯就這么多,共享鎖與獨占鎖的區(qū)別是可以喚醒后續(xù)的線程,如果資源數充足的話,可以一直往下喚醒,提高了并發(fā)量。
到此這篇關于Java AQS信號量Semaphore的使用的文章就介紹到這了,更多相關Java信號量Semaphore內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!