java線程安全鎖ReentrantReadWriteLock原理分析readLock
前言
很多時候,我們?yōu)榱吮WC線程安全,會對一段代碼加鎖,但是加鎖就意味著程序效率的下降,所以,我們經(jīng)常會對鎖進(jìn)行一些優(yōu)化,例如嚴(yán)格控制加鎖的粒度,利用cas來代替加鎖等。而今天我們介紹的讀寫鎖,也是對鎖的一種優(yōu)化方案的實現(xiàn)。試想一下,如果我們的線程大部分時候都是讀操作,那么讀操作與讀操作直接有必要互斥嗎?答案是沒有必要的,只有讀寫操作,寫寫操作才需要通過互斥來保證線程安全。今天我們通過ReentrantReadWriteLock來看看讀寫鎖是如何實現(xiàn)的。
ReentrantReadWriteLock的簡單使用
public void test1(){ ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); new Thread(()->{ Lock readLock = readWriteLock.readLock(); readLock.lock(); try{ //讀操作 }catch (Exception e){ e.printStackTrace(); }finally { readLock.unlock(); } }).start(); new Thread(()->{ Lock writeLock = readWriteLock.writeLock(); writeLock.lock(); try{ //寫操作 }catch (Exception e){ e.printStackTrace(); }finally { writeLock.unlock(); } }).start(); }
readLock源碼分析
首先是lock()方法。
lock()
readLock是通過共享鎖來實現(xiàn)的。lock()方法會調(diào)用acquireShared()方法。所以我們直接分析acquireShared()方法。
public void lock() { sync.acquireShared(1); }
acquireShared()
acquireShared()主要的邏輯就在tryAcquireShared()方法和doAcquireShared()方法,首先調(diào)用tryAcquireShared()方法,如果返回的值小于0,那么就需要調(diào)用doAcquireShared()方法進(jìn)行阻塞,否則就會直接去執(zhí)行業(yè)務(wù)代碼。接下來我們重點分析tryAcquireShared()方法和doAcquireShared()方法。
if (tryAcquireShared(arg) < 0) doAcquireShared(arg);
tryAcquireShared()
這個方法的邏輯其實比較簡單??梢苑譃槿齻€部分來看
1.首先通過state(state的高16位表示讀鎖的獲取次數(shù),低16位表示寫鎖的次數(shù))獲取到寫鎖的count,會判斷寫鎖的count是否大于0(大于0意味著有線程獲取了寫鎖)并且獲取寫鎖的線程不是當(dāng)前線程,那么返回-1,需要執(zhí)行doAcquireShared()方法去阻塞當(dāng)前線程。
2.通過cas修改讀鎖的count(保證線程安全)。如果當(dāng)前線程是第一個獲取讀鎖的線程,那么修改firstReader(第一個獲取讀鎖的線程),firstReaderHoldCount(第一個獲取讀鎖的線程的重入次數(shù)),否則通過當(dāng)前線程獲取讀鎖的次數(shù)構(gòu)建成一個HoldCounter對象,并且放入到readHolds中(readHolds是一個ThreadLocal),同時維護(hù)一個cachedHoldCounter的緩存(當(dāng)前線程獲取讀鎖的重入次數(shù)的緩存)。
3.cas失敗,讀鎖的數(shù)量達(dá)到最大值或者readerShouldBlock()方法判斷需要等待的話,就調(diào)用fullTryAcquireShared()方法。
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //獲取寫鎖的數(shù)量,如果有線程獲取了寫鎖, //并且獲取寫鎖的線程不是當(dāng)前線程 直接返回-1 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //獲取讀鎖的數(shù)量 int r = sharedCount(c); //readerShouldBlock()判斷是否需要等待,如果不需要等待并且當(dāng)前獲取讀鎖的數(shù)量小于最大值的限制,cas也成功替換了讀鎖數(shù)量 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //如果當(dāng)前讀鎖數(shù)量為0 那么當(dāng)前線程就是第一個獲取讀鎖的線程 if (r == 0) { //將當(dāng)前線程賦值給firstReader firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { //如果當(dāng)前線程是第一個獲取讀鎖的線程,那么只需要將firstReaderHoldCount +1 //從這里我們就可以知道讀鎖是支持重入的 firstReaderHoldCount++; } else { //HoldCounter用來保存當(dāng)前線程獲取讀鎖的次數(shù),因為讀鎖是支持重入的, //readHolds是一個ThreadLocal,用來保存當(dāng)前線程的HoldCounter HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } //readerShouldBlock()返回true cas失敗 或者已經(jīng)達(dá)到最大數(shù)量 return fullTryAcquireShared(current); }
fullTryAcquireShared()
tryAcquireShared()在cas失敗的話,讀鎖數(shù)量達(dá)到最大值或者readerShouldBlock()方法判斷需要等待的話就會進(jìn)入fullTryAcquireShared()方法,而fullTryAcquireShared()方法方法就會分別針對這三種情況進(jìn)行處理。
1.首先依舊會先判斷是否有線程獲取寫鎖,如果有,直接返回-1.
2.如果readerShouldBlock()方法返回true,如果返回true,表示應(yīng)該阻塞等待,就將當(dāng)前線程獲取讀鎖的數(shù)量置為0,并且返回-1(返回-1就會調(diào)用doAcquireShared()方法去阻塞線程)。
3.如果讀鎖的數(shù)量已經(jīng)達(dá)到最大值,那么就直接拋出異常
4.如果是因為cas失敗,那么再進(jìn)行cas一次,并且修改firstReader(第一個獲取讀鎖的線程),firstReaderHoldCount(第一個獲取讀鎖的線程的重入次數(shù)),cachedHoldCounter(當(dāng)前線程獲取讀鎖的重入次數(shù)的緩存)等變量的值。注意cas失敗不會跳出for循環(huán),所以這里是會自旋重試的。
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
接下來我們需要看一下readerShouldBlock()方法,readerShouldBlock()方法如果返回true那么代表線程獲取讀鎖的時候需要阻塞,那我們就分析一下readerShouldBlock()方法,看什么時候獲取讀鎖需要阻塞線程。
readerShouldBlock()
readerShouldBlock()方法有兩個實現(xiàn),一個是公平鎖的實現(xiàn),一個是非公平鎖的實現(xiàn)。
公平鎖實現(xiàn):
判斷阻塞隊列中是否有節(jié)點,并且第一個節(jié)點不是當(dāng)前線程。那么作為公平鎖,這就代表需要等待。
final boolean readerShouldBlock() { return hasQueuedPredecessors(); } public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
非公平鎖實現(xiàn):
判斷阻塞隊列中的第一個節(jié)點是否不是共享節(jié)點,如果不是,那么就需要等待,否則就代表可以插隊,也就不需要阻塞。
final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
那么接下來我們分析doAcquireShared()方法,看doAcquireShared()方法是如何阻塞線程的。
doAcquireShared()
1.調(diào)用addWaiter()方法將當(dāng)前線程封裝成Node并且加入到阻塞隊列中。
2.如果發(fā)現(xiàn)當(dāng)前節(jié)點的前節(jié)點是head節(jié)點(代表當(dāng)前節(jié)點是隊列中第一個節(jié)點,因為head節(jié)點始終是空的,所以head的next就是實際上的第一個節(jié)點),那么就再次調(diào)用tryAcquireShared()方法嘗試獲取鎖。如果調(diào)用tryAcquireShared()方法獲取鎖成功,那么就喚醒阻塞隊列中所有的共享節(jié)點。
3.將阻塞隊列中前一個節(jié)點的狀態(tài)修改為SIGNAL,并且調(diào)用LockSupport.park()方法阻塞當(dāng)前線程。
private void doAcquireShared(int arg) { //將線程封裝成狀態(tài)為SHARED的Node節(jié)點,并且加入到阻塞隊列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { //獲取當(dāng)前節(jié)點的前一個節(jié)點 final Node p = node.predecessor(); if (p == head) { //再次調(diào)用tryAcquireShared()方法嘗試獲取鎖 int r = tryAcquireShared(arg); if (r >= 0) { //獲取鎖成功 因為是共享鎖,那么需要喚醒所有的共享節(jié)點 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //將前節(jié)點設(shè)置為SIGNAL狀態(tài) 并且調(diào)用LockSupport.park()阻塞當(dāng)前線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) //如果失敗 那么就將節(jié)點狀態(tài)修改為 cancelAcquire(node); } }
接下來我們重點分析setHeadAndPropagate(),shouldParkAfterFailedAcquire(), parkAndCheckInterrupt()三個方法。
注:如果你看過我的另一篇文章# CountDownLatch源碼分析,那么相信你對這三個方法已經(jīng)很熟悉了,因為CountDownLatch也是通過AQS共享鎖來實現(xiàn)的。
setHeadAndPropagate()
1.重新設(shè)置頭節(jié)點,并且如果阻塞隊列中的下一個節(jié)點是共享節(jié)點,那么就需要調(diào)用doReleaseShared()方法嘗試去喚醒阻塞隊列中其他的共享節(jié)點。doReleaseShared()方法我們在分析unlock()的時候再詳細(xì)分析。
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
shouldParkAfterFailedAcquire()
shouldParkAfterFailedAcquire()方法的作用就是將前面一個節(jié)點的狀態(tài)修改為SIGNAL狀態(tài),并且將CANCELLED狀態(tài)的節(jié)點去除(waitStatus大于0,只能是CANCELLED狀態(tài))。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; //如果前節(jié)點的狀態(tài)是CANCELLED狀態(tài),那么嘗試去除阻塞隊列中的其他的CANCELLED狀態(tài)的節(jié)點(注意這里只會從后往前遍歷嗎,去除連續(xù)的CANCELLED狀態(tài)的節(jié)點) if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //cas修改前一個節(jié)點的狀態(tài)為SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt()
parkAndCheckInterrupt()方法會調(diào)用LockSupport.park()方法,是真正阻塞線程的地方。線程被喚醒后,會從這個地方繼續(xù)執(zhí)行代碼。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
至此readLock的lock()方法,我們就分析完了。接下來我們繼續(xù)分析readLock的unlock()方法。
unlock()
unlock()方法的邏輯都是在releaseShared()方法中完成的,所以我們具體看releaseShared()方法。
public void unlock() { sync.releaseShared(1); }
releaseShared()
releaseShared()方法先調(diào)用tryReleaseShared()方法嘗試是否鎖,如果返回true,那么調(diào)用doReleaseShared()方法釋放鎖。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
tryReleaseShared()
tryReleaseShared()方法的邏輯主要分為以下幾步。
1.首先判斷當(dāng)前線程是否是第一個獲取讀鎖的線程,如果是,那么就維護(hù)firstReader和firstReaderHoldCount變量,否則讀取緩存cachedHoldCounter中的值,如果緩存的不是當(dāng)前線程的值,那么就需要從readHolds中獲取到當(dāng)前線程的HoldCounter對象(保存了當(dāng)前線程的重入次數(shù)),將當(dāng)前線程的重入次數(shù)-1。
2.通過cas和自旋將獲取總的讀鎖的數(shù)量-1,減完之后當(dāng)前占有讀鎖的數(shù)量為0,那么就返回true。
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }
tryReleaseShared()方法返回true之后,需要調(diào)用doReleaseShared()方法喚醒被阻塞的線程。
doReleaseShared()
doReleaseShared()方法需要喚醒阻塞隊列中所有的共享節(jié)點,通過自旋和unparkSuccessor()方法不斷嘗試喚醒阻塞隊列中的節(jié)點。
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //如果頭結(jié)點的狀態(tài)是SIGNAL if (ws == Node.SIGNAL) { //cas修改節(jié)點的狀態(tài)為0 失敗的話繼續(xù)自旋 // 成功的話調(diào)用unparkSuccessor喚醒頭結(jié)點的下一個正常節(jié)點 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } //如果節(jié)點狀態(tài)為0 那么cas替換為PROPAGATE 失敗進(jìn)入下一次自旋 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
unparkSuccessor()
unparkSuccessor()方法的作用是喚醒頭節(jié)點后第一個不為null且狀態(tài)不為cancelled的節(jié)點。通過LockSupport.unpark()方法喚醒阻塞的線程。
private void unparkSuccessor(Node node) { //獲取頭結(jié)點的狀態(tài) 將頭結(jié)點狀態(tài)設(shè)置為0 代表現(xiàn)在正在有線程被喚醒 如果head狀態(tài)為0 就不會進(jìn)入這個方法了 int ws = node.waitStatus; if (ws < 0) //將頭結(jié)點狀態(tài)設(shè)置為0 compareAndSetWaitStatus(node, ws, 0); //喚醒頭結(jié)點的下一個狀態(tài)不是cancelled的節(jié)點 (因為頭結(jié)點是不存儲阻塞線程的) Node s = node.next; //當(dāng)前節(jié)點是null 或者是cancelled狀態(tài) if (s == null || s.waitStatus > 0) { s = null; //從aqs鏈表的尾部開始遍歷 找到離頭結(jié)點最近的 不為空的 狀態(tài)不是cancelled的節(jié)點 賦值給s //這里為什么從尾結(jié)點開始遍歷而不是頭結(jié)點 是因為添加結(jié)點的時候是先初始化結(jié)點的prev的, 從尾結(jié)點開始遍歷 不會出現(xiàn)prve沒有賦值的情況 //如果從頭結(jié)點進(jìn)行遍歷 next為null 并不能保證鏈表遍歷完了 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //調(diào)用LockSupport.unpark()喚醒指定的線程 LockSupport.unpark(s.thread); }
至此readLock的unlock()方法也就分析完了。
小結(jié)
1.ReentrantReadWriteLock的readLock是支持重入的。
2.ReentrantReadWriteLock的readLock是通過AQS的共享鎖來實現(xiàn)的。
3.readLock中提供了firstReader,firstReaderHoldCount,cachedHoldCounter等變量來提供效率,當(dāng)前線程的讀鎖重入次數(shù)一般都是存放在readHolds中(readHolds是一個TheadLocal),只有一個獲取讀鎖的線程是通過firstReader,firstReaderHoldCount兩個變量來維護(hù)的,而cachedHoldCounter則是一個緩存,這樣通過這三個變量就可以減少從readHolds獲取值的次數(shù)。(因為大部分情況下并發(fā)不高或許只有一個線程獲取讀鎖)。
4.讀鎖與讀鎖之間不是互斥的,讀鎖和寫鎖是互斥的,但是如果獲取寫鎖的線程是當(dāng)前線程,那么當(dāng)前線程是可以獲取讀鎖的。
以上就是java線程安全鎖ReentrantReadWriteLock原理分析readLock的詳細(xì)內(nèi)容,更多關(guān)于java鎖ReentrantReadWriteLock readLock的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java 其中翻轉(zhuǎn)字符串的實現(xiàn)方法
這篇文章主要介紹了Java 其中翻轉(zhuǎn)字符串的實現(xiàn)方法,需要的朋友可以參考下2014-02-02運行Springboot測試類查詢數(shù)據(jù)庫數(shù)據(jù)顯示白網(wǎng)頁問題及解決方法
Spring Boot應(yīng)用未能啟動的原因是它沒有找到合適的數(shù)據(jù)庫配置具體來說,它需要一個數(shù)據(jù)源(DataSource),但未能在你的配置中找出,也沒有找到任何嵌入式數(shù)據(jù)庫(H2, HSQL 或 Derby),本文給大家分享運行Springboot測試類查詢數(shù)據(jù)庫數(shù)據(jù)顯示白網(wǎng)頁問題及解決方法,一起看看吧2023-11-11Spring Boot 整合 TKMybatis 二次簡化持久層代碼的實現(xiàn)
這篇文章主要介紹了Spring Boot 整合 TKMybatis 二次簡化持久層代碼的實現(xiàn),本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01Java報錯Non-terminating?decimal?expansion解決分析
這篇文章主要為大家介紹了Java報錯Non-terminating?decimal?expansion解決方案及原理分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-09-09java web開發(fā)中大量數(shù)據(jù)導(dǎo)出Excel超時(504)問題解決
開發(fā)測試時候?qū)霐?shù)據(jù)遇到大數(shù)據(jù)導(dǎo)入的問題,整理了下,需要的朋友可以參考下2017-04-04Spring中ApplicationContextAware的使用方法詳解
ApplicationContextAware?通過它Spring容器會自動把上下文環(huán)境對象調(diào)用ApplicationContextAware接口中的setApplicationContext方法,這篇文章主要介紹了Spring中ApplicationContextAware的作用,需要的朋友可以參考下2023-03-03