java?Semaphore共享鎖實現(xiàn)原理解析
正文
在線程間通信方式中,我們了解到可以使用Semaphore信號量來實現(xiàn)線程間通信,Semaphore支持公平鎖和非公平鎖,Semaphore底層是通過共享鎖來實現(xiàn)的,其支持兩種構造函數(shù),如下所示:
// 默認使用非公平鎖實現(xiàn) public Semaphore(int permits) { sync = new NonfairSync(permits); } ? public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
Semaphore提供的常用函數(shù)如下所示:
函數(shù)名 | 說明 | 備注 |
---|---|---|
acquire | 獲取鎖 | / |
release | 釋放鎖 | / |
下面我們來看下Semaphore內(nèi)部的實現(xiàn)原理
Semaphore內(nèi)部類及繼承關系
可以看出Semaphore和ReentrantLock實現(xiàn)原理基本一致,包含NonfairSync和FairSync兩個內(nèi)部類,這兩個內(nèi)部類的父類均為AQS,不妨大膽猜測Semaphore也是依賴AQS實現(xiàn)的,接下來我們一起來看下Semaphore獲取和釋放鎖的流程。
Semaphore.acquire流程分析(以非公平鎖為例)
從上圖可以看出,針對阻塞線程的部分實現(xiàn),和ReentrantLock基本一致,我們不做贅述,主要來看下前半部分的源碼實現(xiàn):
// Semaphore.java public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
// AbstractQueuedSynchronizer.java public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 如果線程是中斷狀態(tài),拋出異常 if (Thread.interrupted()) throw new InterruptedException(); // 嘗試獲取共享資源 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
從源碼可以看出acquire主要依賴于tryAcquireShared和doAcquireSharedInterruptibly,接下來我們來分別看下這兩塊的代碼
tryAcquireShared
// NonfairSync protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
// Sync final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
// AbstractQueuedSynchronizer.java protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
從代碼可以看出這里主要是根據(jù)申請的許可證數(shù)量,比較時否有許可證數(shù)量,如果可用許可證數(shù)量小于0,則直接返回,如果大于0,則通過CAS將state設置為可用許可證數(shù)量。
doAcquireSharedInterruptibly
當tryAcquireShared中返回的可用許可證數(shù)量小于0時,執(zhí)行doAcquireSharedInterruptibly流程,代碼如下:
// AbstractQueuedSynchronizer.java // 在隊尾新建Node對象并添加 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
// AbstractQueuedSynchronizer.java private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 將當前線程添加到等待隊列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { // for循環(huán)自旋 for (;;) { // 獲取node的前一個節(jié)點 final Node p = node.predecessor(); // 如果前一個節(jié)點是頭節(jié)點 if (p == head) { // 嘗試獲取鎖 int r = tryAcquireShared(arg); if (r >= 0) { // 獲取鎖成功,更新node信息設置為頭節(jié)點,并通知其他節(jié)點 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 判斷是否需要阻塞線程,設置waitStatus并阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
執(zhí)行setHeadAndPropagate的主要目的在于,這里能獲取到說明在該線程自旋過程中有線程釋放了許可證,釋放的許可證數(shù)量有可能還有剩余,所以傳遞給其他節(jié)點的線程,喚醒其他阻塞狀態(tài)的線程也嘗試去獲取許可證。
Semaphore.release流程分析(以非公平鎖為例)
Semaphore.release流程相對而言,就比較簡單,將release傳遞到AQS內(nèi)部通過CAS更新許可證數(shù)量信息,更新完成后,遍歷隊列中Node節(jié)點,將Node waitStatus設置為0,并對對應線程執(zhí)行unpark,相關代碼如下:
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // 通過CAS更新許可證數(shù)量 if (compareAndSetState(current, next)) return true; } }
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } // 許可證數(shù)量更新完成后,調(diào)用該方法喚醒線程 private void doReleaseShared() { // 自旋 for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 喚醒后繼節(jié)點線程搶占許可證 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
綜上,我們分析了Smaphore非公平鎖的實現(xiàn),感興趣的可以分析下公平鎖的實現(xiàn),其本質(zhì)區(qū)別在于在tryAcquireShared中只有當?shù)却犃袨榭諘r,才會去嘗試更新剩余許可證數(shù)量。
以上就是Semaphore共享鎖實現(xiàn)原理解析的詳細內(nèi)容,更多關于Semaphore共享鎖的資料請關注腳本之家其它相關文章!
相關文章
Java Chassis3過載狀態(tài)下的快速失敗解決分析
本文解密了Java Chassis 3快速失敗相關的機制和背后故事,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2024-01-01springboot vue組件開發(fā)實現(xiàn)接口斷言功能
這篇文章主要為大家介紹了springboot+vue組件開發(fā)實現(xiàn)接口斷言功能,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-05-05Myeclipse鏈接Oracle等數(shù)據(jù)庫時lo exception: The Network Adapter coul
今天小編就為大家分享一篇關于Myeclipse鏈接Oracle等數(shù)據(jù)庫時lo exception: The Network Adapter could not establish the connection,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2019-03-03