java線程安全鎖ReentrantReadWriteLock原理分析readLock
前言
很多時候,我們?yōu)榱吮WC線程安全,會對一段代碼加鎖,但是加鎖就意味著程序效率的下降,所以,我們經(jīng)常會對鎖進行一些優(yōu)化,例如嚴格控制加鎖的粒度,利用cas來代替加鎖等。而今天我們介紹的讀寫鎖,也是對鎖的一種優(yōu)化方案的實現(xiàn)。試想一下,如果我們的線程大部分時候都是讀操作,那么讀操作與讀操作直接有必要互斥嗎?答案是沒有必要的,只有讀寫操作,寫寫操作才需要通過互斥來保證線程安全。今天我們通過ReentrantReadWriteLock來看看讀寫鎖是如何實現(xiàn)的。
ReentrantReadWriteLock的簡單使用
public void test1(){
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
new Thread(()->{
Lock readLock = readWriteLock.readLock();
readLock.lock();
try{
//讀操作
}catch (Exception e){
e.printStackTrace();
}finally {
readLock.unlock();
}
}).start();
new Thread(()->{
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
try{
//寫操作
}catch (Exception e){
e.printStackTrace();
}finally {
writeLock.unlock();
}
}).start();
}
readLock源碼分析
首先是lock()方法。
lock()
readLock是通過共享鎖來實現(xiàn)的。lock()方法會調(diào)用acquireShared()方法。所以我們直接分析acquireShared()方法。
public void lock() {
sync.acquireShared(1);
}
acquireShared()
acquireShared()主要的邏輯就在tryAcquireShared()方法和doAcquireShared()方法,首先調(diào)用tryAcquireShared()方法,如果返回的值小于0,那么就需要調(diào)用doAcquireShared()方法進行阻塞,否則就會直接去執(zhí)行業(yè)務代碼。接下來我們重點分析tryAcquireShared()方法和doAcquireShared()方法。
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
tryAcquireShared()
這個方法的邏輯其實比較簡單??梢苑譃槿齻€部分來看
1.首先通過state(state的高16位表示讀鎖的獲取次數(shù),低16位表示寫鎖的次數(shù))獲取到寫鎖的count,會判斷寫鎖的count是否大于0(大于0意味著有線程獲取了寫鎖)并且獲取寫鎖的線程不是當前線程,那么返回-1,需要執(zhí)行doAcquireShared()方法去阻塞當前線程。
2.通過cas修改讀鎖的count(保證線程安全)。如果當前線程是第一個獲取讀鎖的線程,那么修改firstReader(第一個獲取讀鎖的線程),firstReaderHoldCount(第一個獲取讀鎖的線程的重入次數(shù)),否則通過當前線程獲取讀鎖的次數(shù)構(gòu)建成一個HoldCounter對象,并且放入到readHolds中(readHolds是一個ThreadLocal),同時維護一個cachedHoldCounter的緩存(當前線程獲取讀鎖的重入次數(shù)的緩存)。
3.cas失敗,讀鎖的數(shù)量達到最大值或者readerShouldBlock()方法判斷需要等待的話,就調(diào)用fullTryAcquireShared()方法。
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//獲取寫鎖的數(shù)量,如果有線程獲取了寫鎖,
//并且獲取寫鎖的線程不是當前線程 直接返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//獲取讀鎖的數(shù)量
int r = sharedCount(c);
//readerShouldBlock()判斷是否需要等待,如果不需要等待并且當前獲取讀鎖的數(shù)量小于最大值的限制,cas也成功替換了讀鎖數(shù)量
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//如果當前讀鎖數(shù)量為0 那么當前線程就是第一個獲取讀鎖的線程
if (r == 0) {
//將當前線程賦值給firstReader
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//如果當前線程是第一個獲取讀鎖的線程,那么只需要將firstReaderHoldCount +1
//從這里我們就可以知道讀鎖是支持重入的
firstReaderHoldCount++;
} else {
//HoldCounter用來保存當前線程獲取讀鎖的次數(shù),因為讀鎖是支持重入的,
//readHolds是一個ThreadLocal,用來保存當前線程的HoldCounter
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//readerShouldBlock()返回true cas失敗 或者已經(jīng)達到最大數(shù)量
return fullTryAcquireShared(current);
}
fullTryAcquireShared()
tryAcquireShared()在cas失敗的話,讀鎖數(shù)量達到最大值或者readerShouldBlock()方法判斷需要等待的話就會進入fullTryAcquireShared()方法,而fullTryAcquireShared()方法方法就會分別針對這三種情況進行處理。
1.首先依舊會先判斷是否有線程獲取寫鎖,如果有,直接返回-1.
2.如果readerShouldBlock()方法返回true,如果返回true,表示應該阻塞等待,就將當前線程獲取讀鎖的數(shù)量置為0,并且返回-1(返回-1就會調(diào)用doAcquireShared()方法去阻塞線程)。
3.如果讀鎖的數(shù)量已經(jīng)達到最大值,那么就直接拋出異常
4.如果是因為cas失敗,那么再進行cas一次,并且修改firstReader(第一個獲取讀鎖的線程),firstReaderHoldCount(第一個獲取讀鎖的線程的重入次數(shù)),cachedHoldCounter(當前線程獲取讀鎖的重入次數(shù)的緩存)等變量的值。注意cas失敗不會跳出for循環(huán),所以這里是會自旋重試的。
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
接下來我們需要看一下readerShouldBlock()方法,readerShouldBlock()方法如果返回true那么代表線程獲取讀鎖的時候需要阻塞,那我們就分析一下readerShouldBlock()方法,看什么時候獲取讀鎖需要阻塞線程。
readerShouldBlock()
readerShouldBlock()方法有兩個實現(xiàn),一個是公平鎖的實現(xiàn),一個是非公平鎖的實現(xiàn)。
公平鎖實現(xiàn):
判斷阻塞隊列中是否有節(jié)點,并且第一個節(jié)點不是當前線程。那么作為公平鎖,這就代表需要等待。
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
非公平鎖實現(xiàn):
判斷阻塞隊列中的第一個節(jié)點是否不是共享節(jié)點,如果不是,那么就需要等待,否則就代表可以插隊,也就不需要阻塞。
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
那么接下來我們分析doAcquireShared()方法,看doAcquireShared()方法是如何阻塞線程的。
doAcquireShared()
1.調(diào)用addWaiter()方法將當前線程封裝成Node并且加入到阻塞隊列中。
2.如果發(fā)現(xiàn)當前節(jié)點的前節(jié)點是head節(jié)點(代表當前節(jié)點是隊列中第一個節(jié)點,因為head節(jié)點始終是空的,所以head的next就是實際上的第一個節(jié)點),那么就再次調(diào)用tryAcquireShared()方法嘗試獲取鎖。如果調(diào)用tryAcquireShared()方法獲取鎖成功,那么就喚醒阻塞隊列中所有的共享節(jié)點。
3.將阻塞隊列中前一個節(jié)點的狀態(tài)修改為SIGNAL,并且調(diào)用LockSupport.park()方法阻塞當前線程。
private void doAcquireShared(int arg) {
//將線程封裝成狀態(tài)為SHARED的Node節(jié)點,并且加入到阻塞隊列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//獲取當前節(jié)點的前一個節(jié)點
final Node p = node.predecessor();
if (p == head) {
//再次調(diào)用tryAcquireShared()方法嘗試獲取鎖
int r = tryAcquireShared(arg);
if (r >= 0) {
//獲取鎖成功 因為是共享鎖,那么需要喚醒所有的共享節(jié)點
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//將前節(jié)點設(shè)置為SIGNAL狀態(tài) 并且調(diào)用LockSupport.park()阻塞當前線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//如果失敗 那么就將節(jié)點狀態(tài)修改為
cancelAcquire(node);
}
}
接下來我們重點分析setHeadAndPropagate(),shouldParkAfterFailedAcquire(), parkAndCheckInterrupt()三個方法。
注:如果你看過我的另一篇文章# CountDownLatch源碼分析,那么相信你對這三個方法已經(jīng)很熟悉了,因為CountDownLatch也是通過AQS共享鎖來實現(xiàn)的。
setHeadAndPropagate()
1.重新設(shè)置頭節(jié)點,并且如果阻塞隊列中的下一個節(jié)點是共享節(jié)點,那么就需要調(diào)用doReleaseShared()方法嘗試去喚醒阻塞隊列中其他的共享節(jié)點。doReleaseShared()方法我們在分析unlock()的時候再詳細分析。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
shouldParkAfterFailedAcquire()
shouldParkAfterFailedAcquire()方法的作用就是將前面一個節(jié)點的狀態(tài)修改為SIGNAL狀態(tài),并且將CANCELLED狀態(tài)的節(jié)點去除(waitStatus大于0,只能是CANCELLED狀態(tài))。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
//如果前節(jié)點的狀態(tài)是CANCELLED狀態(tài),那么嘗試去除阻塞隊列中的其他的CANCELLED狀態(tài)的節(jié)點(注意這里只會從后往前遍歷嗎,去除連續(xù)的CANCELLED狀態(tài)的節(jié)點)
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//cas修改前一個節(jié)點的狀態(tài)為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt()
parkAndCheckInterrupt()方法會調(diào)用LockSupport.park()方法,是真正阻塞線程的地方。線程被喚醒后,會從這個地方繼續(xù)執(zhí)行代碼。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
至此readLock的lock()方法,我們就分析完了。接下來我們繼續(xù)分析readLock的unlock()方法。
unlock()
unlock()方法的邏輯都是在releaseShared()方法中完成的,所以我們具體看releaseShared()方法。
public void unlock() {
sync.releaseShared(1);
}
releaseShared()
releaseShared()方法先調(diào)用tryReleaseShared()方法嘗試是否鎖,如果返回true,那么調(diào)用doReleaseShared()方法釋放鎖。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared()
tryReleaseShared()方法的邏輯主要分為以下幾步。
1.首先判斷當前線程是否是第一個獲取讀鎖的線程,如果是,那么就維護firstReader和firstReaderHoldCount變量,否則讀取緩存cachedHoldCounter中的值,如果緩存的不是當前線程的值,那么就需要從readHolds中獲取到當前線程的HoldCounter對象(保存了當前線程的重入次數(shù)),將當前線程的重入次數(shù)-1。
2.通過cas和自旋將獲取總的讀鎖的數(shù)量-1,減完之后當前占有讀鎖的數(shù)量為0,那么就返回true。
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
tryReleaseShared()方法返回true之后,需要調(diào)用doReleaseShared()方法喚醒被阻塞的線程。
doReleaseShared()
doReleaseShared()方法需要喚醒阻塞隊列中所有的共享節(jié)點,通過自旋和unparkSuccessor()方法不斷嘗試喚醒阻塞隊列中的節(jié)點。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//如果頭結(jié)點的狀態(tài)是SIGNAL
if (ws == Node.SIGNAL) {
//cas修改節(jié)點的狀態(tài)為0 失敗的話繼續(xù)自旋
// 成功的話調(diào)用unparkSuccessor喚醒頭結(jié)點的下一個正常節(jié)點
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//如果節(jié)點狀態(tài)為0 那么cas替換為PROPAGATE 失敗進入下一次自旋
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
unparkSuccessor()
unparkSuccessor()方法的作用是喚醒頭節(jié)點后第一個不為null且狀態(tài)不為cancelled的節(jié)點。通過LockSupport.unpark()方法喚醒阻塞的線程。
private void unparkSuccessor(Node node) {
//獲取頭結(jié)點的狀態(tài) 將頭結(jié)點狀態(tài)設(shè)置為0 代表現(xiàn)在正在有線程被喚醒 如果head狀態(tài)為0 就不會進入這個方法了
int ws = node.waitStatus;
if (ws < 0)
//將頭結(jié)點狀態(tài)設(shè)置為0
compareAndSetWaitStatus(node, ws, 0);
//喚醒頭結(jié)點的下一個狀態(tài)不是cancelled的節(jié)點 (因為頭結(jié)點是不存儲阻塞線程的)
Node s = node.next;
//當前節(jié)點是null 或者是cancelled狀態(tài)
if (s == null || s.waitStatus > 0) {
s = null;
//從aqs鏈表的尾部開始遍歷 找到離頭結(jié)點最近的 不為空的 狀態(tài)不是cancelled的節(jié)點 賦值給s
//這里為什么從尾結(jié)點開始遍歷而不是頭結(jié)點 是因為添加結(jié)點的時候是先初始化結(jié)點的prev的, 從尾結(jié)點開始遍歷 不會出現(xiàn)prve沒有賦值的情況
//如果從頭結(jié)點進行遍歷 next為null 并不能保證鏈表遍歷完了
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//調(diào)用LockSupport.unpark()喚醒指定的線程
LockSupport.unpark(s.thread);
}
至此readLock的unlock()方法也就分析完了。
小結(jié)
1.ReentrantReadWriteLock的readLock是支持重入的。
2.ReentrantReadWriteLock的readLock是通過AQS的共享鎖來實現(xiàn)的。
3.readLock中提供了firstReader,firstReaderHoldCount,cachedHoldCounter等變量來提供效率,當前線程的讀鎖重入次數(shù)一般都是存放在readHolds中(readHolds是一個TheadLocal),只有一個獲取讀鎖的線程是通過firstReader,firstReaderHoldCount兩個變量來維護的,而cachedHoldCounter則是一個緩存,這樣通過這三個變量就可以減少從readHolds獲取值的次數(shù)。(因為大部分情況下并發(fā)不高或許只有一個線程獲取讀鎖)。
4.讀鎖與讀鎖之間不是互斥的,讀鎖和寫鎖是互斥的,但是如果獲取寫鎖的線程是當前線程,那么當前線程是可以獲取讀鎖的。
以上就是java線程安全鎖ReentrantReadWriteLock原理分析readLock的詳細內(nèi)容,更多關(guān)于java鎖ReentrantReadWriteLock readLock的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java 其中翻轉(zhuǎn)字符串的實現(xiàn)方法
這篇文章主要介紹了Java 其中翻轉(zhuǎn)字符串的實現(xiàn)方法,需要的朋友可以參考下2014-02-02
運行Springboot測試類查詢數(shù)據(jù)庫數(shù)據(jù)顯示白網(wǎng)頁問題及解決方法
Spring Boot應用未能啟動的原因是它沒有找到合適的數(shù)據(jù)庫配置具體來說,它需要一個數(shù)據(jù)源(DataSource),但未能在你的配置中找出,也沒有找到任何嵌入式數(shù)據(jù)庫(H2, HSQL 或 Derby),本文給大家分享運行Springboot測試類查詢數(shù)據(jù)庫數(shù)據(jù)顯示白網(wǎng)頁問題及解決方法,一起看看吧2023-11-11
Spring Boot 整合 TKMybatis 二次簡化持久層代碼的實現(xiàn)
這篇文章主要介紹了Spring Boot 整合 TKMybatis 二次簡化持久層代碼的實現(xiàn),本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01
Java報錯Non-terminating?decimal?expansion解決分析
這篇文章主要為大家介紹了Java報錯Non-terminating?decimal?expansion解決方案及原理分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-09-09
java web開發(fā)中大量數(shù)據(jù)導出Excel超時(504)問題解決
開發(fā)測試時候?qū)霐?shù)據(jù)遇到大數(shù)據(jù)導入的問題,整理了下,需要的朋友可以參考下2017-04-04
Spring中ApplicationContextAware的使用方法詳解
ApplicationContextAware?通過它Spring容器會自動把上下文環(huán)境對象調(diào)用ApplicationContextAware接口中的setApplicationContext方法,這篇文章主要介紹了Spring中ApplicationContextAware的作用,需要的朋友可以參考下2023-03-03

