Java并發(fā)讀寫鎖ReentrantReadWriteLock 使用場景
ReentrantReadWriteLock使用場景
ReentrantReadWriteLock 是 Java 的一種讀寫鎖,它允許多個讀線程同時訪問,但只允許一個寫線程訪問(會阻塞所有的讀寫線程)。這種鎖的設(shè)計可以提高性能,特別是在讀操作的數(shù)量遠(yuǎn)遠(yuǎn)超過寫操作的情況下。
在并發(fā)場景中,為了解決線程安全問題,我們通常會使用關(guān)鍵字 synchronized 或者 JUC 包中實現(xiàn)了 Lock 接口的 ReentrantLock。但它們都是獨(dú)占式獲取鎖,也就是在同一時刻只有一個線程能夠獲取鎖。
而在一些業(yè)務(wù)場景中,大部分只是讀數(shù)據(jù),寫數(shù)據(jù)很少,如果僅僅是讀數(shù)據(jù)的話并不會影響數(shù)據(jù)正確性,而如果在這種業(yè)務(wù)場景下,依然使用獨(dú)占鎖的話,很顯然會出現(xiàn)性能瓶頸。針對這種讀多寫少的情況,Java 提供了另外一個實現(xiàn) Lock 接口的 ReentrantReadWriteLock——讀寫鎖。
ReentrantReadWriteLock其實就是 讀讀并發(fā)、讀寫互斥、寫寫互斥。如果一個對象并發(fā)讀的場景大于并發(fā)寫的場景,那就可以使用 ReentrantReadWriteLock來達(dá)到保證線程安全的前提下提高并發(fā)效率。首先,我們先了解一下Doug Lea為我們準(zhǔn)備的兩個demo。
CachedData
一個緩存對象的使用案例,緩存對象在使用時,一般并發(fā)讀的場景遠(yuǎn)遠(yuǎn)大于并發(fā)寫的場景,所以緩存對象是非常適合使用ReentrantReadWriteLock來做控制的
class CachedData {
//被緩存的具體對象
Object data;
//當(dāng)前對象是否可用,使用volatile來保證可見性
volatile boolean cacheValid;
//ReentrantReadWriteLock
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
//業(yè)務(wù)處理邏輯
void processCachedData() {
//要讀取數(shù)據(jù)時,先加讀鎖,如果加成功,說明此時沒有人在并發(fā)寫
rwl.readLock().lock();
//拿到讀鎖后,判斷當(dāng)前對象是否有效
if (!cacheValid) {
// Must release read lock before acquiring write lock
//這里的處理非常經(jīng)典,當(dāng)你持有讀鎖之后,不能直接獲取寫鎖,
//因為寫鎖是獨(dú)占鎖,如果直接獲取寫鎖,那代碼就在這里死鎖了
//所以必須要先釋放讀鎖,然后手動獲取寫鎖
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
//經(jīng)典處理之二,在獨(dú)占鎖內(nèi)部要處理數(shù)據(jù)時,一定要做二次校驗
//因為可能同時有多個線程全都在獲取寫鎖,
//當(dāng)時線程1釋放寫鎖之后,線程2馬上獲取到寫鎖,此時如果不做二次校驗?zāi)强赡芫蛯?dǎo)致某些操作做了多次
if (!cacheValid) {
data = ...
//當(dāng)緩存對象更新成功后,重置標(biāo)記為true
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
//這里有一個非常神奇的鎖降級操作,所謂降級是說當(dāng)你持有寫鎖后,可以再次獲取讀鎖
//這里之所以要獲取一次寫鎖是為了防止當(dāng)前線程釋放寫鎖之后,其他線程馬上獲取到寫鎖,改變緩存對象
//因為讀寫互斥,所以有了這個讀鎖之后,在讀鎖釋放之前,別的線程是無法修改緩存對象的
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}RWDictionary
Doug Lea給出的第二個demo,一個并發(fā)容器的demo。并發(fā)容器我們一般都是直接使用ConcurrentHashMap的,但是我們可以使用非并發(fā)安全的容器+ReentrantReadWriteLock來組合出一個并發(fā)容器。如果這個并發(fā)容器的讀的頻率>寫的頻率,那這個效率還是不錯的
class RWDictionary {
//原來非并發(fā)安全的容器
private final Map<String, Data> m = new TreeMap<String, Data>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public Data get(String key) {
//讀數(shù)據(jù),上讀鎖
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
public String[] allKeys() {
//讀數(shù)據(jù),上讀鎖
r.lock();
try { return m.keySet().toArray(); }
finally { r.unlock(); }
}
public Data put(String key, Data value) {
//寫數(shù)據(jù),上寫鎖
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}
public void clear() {
//寫數(shù)據(jù),上寫鎖
w.lock();
try { m.clear(); }
finally { w.unlock(); }
}
}ReentrantReadWriteLock的特性
讀寫鎖允許同一時刻被多個讀線程訪問,但是在寫線程訪問時,所有的讀線程和其他的寫線程都會被阻塞。
在分析 WirteLock 和 ReadLock 的互斥性時,我們可以按照 WriteLock 與 WriteLock,WriteLock 與 ReadLock 以及 ReadLock 與 ReadLock 進(jìn)行對比分析。
這里總結(jié)一下讀寫鎖的特性:
- 公平性選擇:支持非公平性(默認(rèn))和公平的鎖獲取方式,非公平的吞吐量優(yōu)于公平;
- 重入性:支持重入,讀鎖獲取后能再次獲取,寫鎖獲取之后能夠再次獲取寫鎖,同時也能夠獲取讀鎖;
- 鎖降級:寫鎖降級是一種允許寫鎖轉(zhuǎn)換為讀鎖的過程。通常的順序是:
- 獲取寫鎖:線程首先獲取寫鎖,確保在修改數(shù)據(jù)時排它訪問。
- 獲取讀鎖:在寫鎖保持的同時,線程可以再次獲取讀鎖。
- 釋放寫鎖:線程保持讀鎖的同時釋放寫鎖。
- 釋放讀鎖:最后線程釋放讀鎖。
這樣,寫鎖就降級為讀鎖,允許其他線程進(jìn)行并發(fā)讀取,但仍然排除其他線程的寫操作。
接下來額外說一下鎖降級
- 鎖降級
鎖降級指的是寫鎖降級成為讀鎖。如果當(dāng)前線程擁有寫鎖,然后將其釋放,最后再獲取讀鎖,這種分段完成的過程不能稱之為鎖降級。鎖降級是指把持住(當(dāng)前擁有的)寫鎖,再獲取到讀鎖,隨后釋放(先前擁有的)寫鎖的過程。
接下來看一個鎖降級的示例。因為數(shù)據(jù)不常變化,所以多個線程可以并發(fā)地進(jìn)行數(shù)據(jù)處理,當(dāng)數(shù)據(jù)變更后,如果當(dāng)前線程感知到數(shù)據(jù)變化,則進(jìn)行數(shù)據(jù)的準(zhǔn)備工作,同時其他處理線程被阻塞,直到當(dāng)前線程完成數(shù)據(jù)的準(zhǔn)備工作,如代碼如下所示:
public void processData() {
readLock.lock();
if (!update) {
// 必須先釋放讀鎖
readLock.unlock();
// 鎖降級從寫鎖獲取到開始
writeLock.lock();
try {
if (!update) {
// 準(zhǔn)備數(shù)據(jù)的流程(略)
update = true;
}
readLock.lock();
} finally {
writeLock.unlock();
}
// 鎖降級完成,寫鎖降級為讀鎖
}
try {
// 使用數(shù)據(jù)的流程(略)
} finally {
readLock.unlock();
}
}上述示例中,當(dāng)數(shù)據(jù)發(fā)生變更后,update變量(布爾類型且volatile修飾)被設(shè)置為false,此時所有訪問processData()方法的線程都能夠感知到變化,但只有一個線程能夠獲取到寫鎖,其他線程會被阻塞在讀鎖和寫鎖的lock()方法上。當(dāng)前線程獲取寫鎖完成數(shù)據(jù)準(zhǔn)備之后,再獲取讀鎖,隨后釋放寫鎖,完成鎖降級。
鎖降級中讀鎖的獲取是否必要呢? 答案是必要的。主要是為了保證數(shù)據(jù)的可見性,如果當(dāng)前線程不獲取讀鎖而是直接釋放寫鎖,假設(shè)此刻另一個線程(記作線程T)獲取了寫鎖并修改了數(shù)據(jù),那么當(dāng)前線程無法感知線程T的數(shù)據(jù)更新。如果當(dāng)前線程獲取讀鎖,即遵循鎖降級的步驟,則線程T將會被阻塞,直到當(dāng)前線程使用數(shù)據(jù)并釋放讀鎖之后,線程T才能獲取寫鎖進(jìn)行數(shù)據(jù)更新。
RentrantReadWriteLock不支持鎖升級(把持讀鎖、獲取寫鎖,最后釋放讀鎖的過程)。目的也是保證數(shù)據(jù)可見性,如果讀鎖已被多個線程獲取,其中任意線程成功獲取了寫鎖并更新了數(shù)據(jù),則其更新對其他獲取到讀鎖的線程是不可見的。
ReentrantReadWriteLock源碼分析
類的繼承關(guān)系
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {}說明: 可以看到,ReentrantReadWriteLock實現(xiàn)了ReadWriteLock接口,ReadWriteLock接口定義了獲取讀鎖和寫鎖的規(guī)范,具體需要實現(xiàn)類去實現(xiàn);同時其還實現(xiàn)了Serializable接口,表示可以進(jìn)行序列化,在源代碼中可以看到ReentrantReadWriteLock實現(xiàn)了自己的序列化邏輯。
類的內(nèi)部類
ReentrantReadWriteLock有五個內(nèi)部類,五個內(nèi)部類之間也是相互關(guān)聯(lián)的。內(nèi)部類的關(guān)系如下圖所示。

說明: 如上圖所示,Sync繼承自AQS、NonfairSync繼承自Sync類、FairSync繼承自Sync類;ReadLock實現(xiàn)了Lock接口、WriteLock也實現(xiàn)了Lock接口。
內(nèi)部類 -類Sync
- Sync類的繼承關(guān)系
abstract static class Sync extends AbstractQueuedSynchronizer {}說明: Sync抽象類繼承自AQS抽象類,Sync類提供了對ReentrantReadWriteLock的支持。
- Sync類的內(nèi)部類
Sync類內(nèi)部存在兩個內(nèi)部類,分別為HoldCounter和ThreadLocalHoldCounter,其中HoldCounter主要與讀鎖配套使用,其中,HoldCounter源碼如下。
// 計數(shù)器
static final class HoldCounter {
// 計數(shù)
int count = 0;
// Use id, not reference, to avoid garbage retention
// 獲取當(dāng)前線程的TID屬性的值
final long tid = getThreadId(Thread.currentThread());
}說明: HoldCounter主要有兩個屬性,count和tid,其中count表示某個讀線程重入的次數(shù),tid表示該線程的tid字段的值,該字段可以用來唯一標(biāo)識一個線程。ThreadLocalHoldCounter的源碼如下
// 本地線程計數(shù)器
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
// 重寫初始化方法,在沒有進(jìn)行set的情況下,獲取的都是該HoldCounter值
public HoldCounter initialValue() {
return new HoldCounter();
}
}說明: ThreadLocalHoldCounter重寫了ThreadLocal的initialValue方法,ThreadLocal類可以將線程與對象相關(guān)聯(lián)。在沒有進(jìn)行set的情況下,get到的均是initialValue方法里面生成的那個HolderCounter對象。
- Sync類的屬性
abstract static class Sync extends AbstractQueuedSynchronizer {
// 版本序列號
private static final long serialVersionUID = 6317671515068378041L;
// 高16位為讀鎖,低16位為寫鎖
static final int SHARED_SHIFT = 16;
// 讀鎖單位
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 讀鎖最大數(shù)量
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 寫鎖最大數(shù)量
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 本地線程計數(shù)器
private transient ThreadLocalHoldCounter readHolds;
// 緩存的計數(shù)器
private transient HoldCounter cachedHoldCounter;
// 第一個讀線程
private transient Thread firstReader = null;
// 第一個讀線程的計數(shù)
private transient int firstReaderHoldCount;
}說明: 該屬性中包括了讀鎖、寫鎖線程的最大量。本地線程計數(shù)器等。
- Sync類的構(gòu)造函數(shù)
// 構(gòu)造函數(shù)
Sync() {
// 本地線程計數(shù)器
readHolds = new ThreadLocalHoldCounter();
// 設(shè)置AQS的狀態(tài)
setState(getState()); // ensures visibility of readHolds
}說明:在Sync的構(gòu)造函數(shù)中設(shè)置了本地線程計數(shù)器和AQS的狀態(tài)state。
類的屬性
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
// 版本序列號
private static final long serialVersionUID = -6992448646407690164L;
// 讀鎖
private final ReentrantReadWriteLock.ReadLock readerLock;
// 寫鎖
private final ReentrantReadWriteLock.WriteLock writerLock;
// 同步隊列
final Sync sync;
private static final sun.misc.Unsafe UNSAFE;
// 線程ID的偏移地址
private static final long TID_OFFSET;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
// 獲取線程的tid字段的內(nèi)存地址
TID_OFFSET = UNSAFE.objectFieldOffset
(tk.getDeclaredField("tid"));
} catch (Exception e) {
throw new Error(e);
}
}
}說明: 可以看到ReentrantReadWriteLock屬性包括了一個ReentrantReadWriteLock.ReadLock對象,表示讀鎖;一個ReentrantReadWriteLock.WriteLock對象,表示寫鎖;一個Sync對象,表示同步隊列。
類的構(gòu)造函數(shù)
- ReentrantReadWriteLock()型構(gòu)造函數(shù)
public ReentrantReadWriteLock() {
this(false);
}說明: 此構(gòu)造函數(shù)會調(diào)用另外一個有參構(gòu)造函數(shù)。
- ReentrantReadWriteLock(boolean)型構(gòu)造函數(shù)
public ReentrantReadWriteLock(boolean fair) {
// 公平策略或者是非公平策略
sync = fair ? new FairSync() : new NonfairSync();
// 讀鎖
readerLock = new ReadLock(this);
// 寫鎖
writerLock = new WriteLock(this);
}說明: 可以指定設(shè)置公平策略或者非公平策略,并且該構(gòu)造函數(shù)中生成了讀鎖與寫鎖兩個對象。
內(nèi)部類 - Sync核心函數(shù)分析
對ReentrantReadWriteLock對象的操作絕大多數(shù)都轉(zhuǎn)發(fā)至Sync對象進(jìn)行處理。下面對Sync類中的重點函數(shù)進(jìn)行分析
- sharedCount函數(shù)
表示占有讀鎖的線程數(shù)量,源碼如下
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }說明::直接將state右移16位,就可以得到讀鎖的線程數(shù)量,因為state的高16位表示讀鎖,對應(yīng)的低十六位表示寫鎖數(shù)量。
- exclusiveCount函數(shù)
表示占有寫鎖的線程數(shù)量,源碼如下
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }說明:
EXCLUSIVE_MASK為:
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
EXCLUSIVE_MASK 為 1 左移 16 位然后減 1,即為 0x0000FFFF。而 exclusiveCount 方法是將同步狀態(tài)(state 為 int 類型)與 0x0000FFFF 相與,即取同步狀態(tài)的低 16 位。
那么低 16 位代表什么呢?根據(jù) exclusiveCount 方法的注釋為獨(dú)占式獲取的次數(shù)即寫鎖被獲取的次數(shù),現(xiàn)在就可以得出來一個結(jié)論同步狀態(tài)的低 16 位用來表示寫鎖的獲取次數(shù)。
寫鎖的獲取
同一時刻,ReentrantReadWriteLock 的寫鎖是不能被多個線程獲取的,很顯然 ReentrantReadWriteLock 的寫鎖是獨(dú)占式鎖,而實現(xiàn)寫鎖的同步語義是通過重寫 AQS 中的 tryAcquire 方法實現(xiàn)的。
- tryAcquire函數(shù)
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
// 獲取當(dāng)前線程
Thread current = Thread.currentThread();
// 獲取狀態(tài)
int c = getState();
// 寫線程數(shù)量
int w = exclusiveCount(c);
if (c != 0) { // 狀態(tài)不為0
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread()) // 寫線程數(shù)量為0或者當(dāng)前線程沒有占有獨(dú)占資源
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) // 判斷是否超過最高寫線程數(shù)量
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 設(shè)置AQS狀態(tài)
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires)) // 寫線程是否應(yīng)該被阻塞
return false;
// 設(shè)置獨(dú)占線程
setExclusiveOwnerThread(current);
return true;
}說明: 此函數(shù)用于獲取寫鎖:首先會獲取state,判斷是否為0;
1. 若為0,表示此時沒有讀鎖線程,再判斷寫線程是否應(yīng)該被阻塞,而在非公平策略下總是不會被阻塞,在公平策略下會進(jìn)行判斷(判斷同步隊列中是否有等待時間更長的線程;若存在,則需要被阻塞,否則,無需阻塞),之后在設(shè)置狀態(tài)state,然后返回true。
2. 若state不為0,則表示此時存在讀鎖或?qū)戞i線程,若寫鎖線程數(shù)量為0或者當(dāng)前線程為獨(dú)占鎖線程,則返回false,表示不成功,否則,判斷寫鎖線程的重入次數(shù)是否大于了最大值,若是,則拋出異常,否則,設(shè)置狀態(tài)state,返回true,表示成功。其函數(shù)流程圖如下

其主要邏輯為:當(dāng)讀鎖已經(jīng)被讀線程獲取或者寫鎖已經(jīng)被其他寫線程獲取,則寫鎖獲取失?。环駝t,獲取成功并支持重入,增加寫狀態(tài)。
寫鎖的釋放
寫鎖釋放通過重寫 AQS 的 tryRelease 方法,源碼為:
- tryRelease函數(shù)
/*
* Note that tryRelease and tryAcquire can be called by
* Conditions. So it is possible that their arguments contain
* both read and write holds that are all released during a
* condition wait and re-established in tryAcquire.
*/
protected final boolean tryRelease(int releases) {
// 判斷是否偽獨(dú)占線程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 計算釋放資源后的寫鎖的數(shù)量
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0; // 是否釋放成功
if (free)
setExclusiveOwnerThread(null); // 設(shè)置獨(dú)占線程為空
setState(nextc); // 設(shè)置狀態(tài)
return free;
}說明: 此函數(shù)用于釋放寫鎖資源,首先會判斷該線程是否為獨(dú)占線程,若不為獨(dú)占線程,則拋出異常,否則,計算釋放資源后的寫鎖的數(shù)量,若為0,表示成功釋放,資源不將被占用,否則,表示資源還被占用。其函數(shù)流程圖如下。

讀鎖的獲取
看完了寫鎖,再來看看讀鎖,讀鎖不是獨(dú)占式鎖,即同一時刻該鎖可以被多個讀線程獲取,也就是一種共享式鎖。按照之前對 AQS 的介紹,實現(xiàn)共享式同步組件的同步語義需要通過重寫 AQS 的 tryAcquireShared 方法和 tryReleaseShared 方法。讀鎖的獲取實現(xiàn)方法為:
- tryAcquireShared函數(shù)
private IllegalMonitorStateException unmatchedUnlockException() {
return new IllegalMonitorStateException(
"attempt to unlock read lock, not locked by current thread");
}
// 共享模式下獲取資源
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
// 獲取當(dāng)前線程
Thread current = Thread.currentThread();
// 獲取狀態(tài)
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current) // 寫線程數(shù)不為0并且占有資源的不是當(dāng)前線程
return -1;
// 讀鎖數(shù)量
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { // 讀線程是否應(yīng)該被阻塞、并且小于最大值、并且比較設(shè)置成功
if (r == 0) { // 讀鎖數(shù)量為0
// 設(shè)置第一個讀線程
firstReader = current;
// 讀線程占用的資源數(shù)為1
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 當(dāng)前線程為第一個讀線程
// 占用資源數(shù)加1
firstReaderHoldCount++;
} else { // 讀鎖數(shù)量不為0并且不為當(dāng)前線程
// 獲取計數(shù)器
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) // 計數(shù)器為空或者計數(shù)器的tid不為當(dāng)前正在運(yùn)行的線程的tid
// 獲取當(dāng)前線程對應(yīng)的計數(shù)器
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0) // 計數(shù)為0
// 設(shè)置
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}說明: 此函數(shù)表示讀鎖線程獲取讀鎖。首先判斷寫鎖是否為0并且當(dāng)前線程不占有獨(dú)占鎖,直接返回;否則,判斷讀線程是否需要被阻塞并且讀鎖數(shù)量是否小于最大值并且比較設(shè)置狀態(tài)成功,若當(dāng)前沒有讀鎖,則設(shè)置第一個讀線程firstReader和firstReaderHoldCount;若當(dāng)前線程線程為第一個讀線程,則增加firstReaderHoldCount;否則,將設(shè)置當(dāng)前線程對應(yīng)的HoldCounter對象的值。流程圖如下。

當(dāng)寫鎖被其他線程獲取后,讀鎖獲取失敗,否則獲取成功,會利用 CAS 更新同步狀態(tài)。
另外,當(dāng)前同步狀態(tài)需要加上 SHARED_UNIT((1 << SHARED_SHIFT),即 0x00010000)的原因,我們在上面也說過了,同步狀態(tài)的高 16 位用來表示讀鎖被獲取的次數(shù)。
如果 CAS 失敗或者已經(jīng)獲取讀鎖的線程再次獲取讀鎖時,是靠 fullTryAcquireShared 方法實現(xiàn)的。
- fullTryAcquireShared函數(shù)
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) { // 無限循環(huán)
// 獲取狀態(tài)
int c = getState();
if (exclusiveCount(c) != 0) { // 寫線程數(shù)量不為0
if (getExclusiveOwnerThread() != current) // 不為當(dāng)前線程
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) { // 寫線程數(shù)量為0并且讀線程被阻塞
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) { // 當(dāng)前線程為第一個讀線程
// assert firstReaderHoldCount > 0;
} else { // 當(dāng)前線程不為第一個讀線程
if (rh == null) { // 計數(shù)器不為空
//
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) { // 計數(shù)器為空或者計數(shù)器的tid不為當(dāng)前正在運(yùn)行的線程的tid
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT) // 讀鎖數(shù)量為最大值,拋出異常
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) { // 比較并且設(shè)置成功
if (sharedCount(c) == 0) { // 讀線程數(shù)量為0
// 設(shè)置第一個讀線程
firstReader = current;
//
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}說明: 在tryAcquireShared函數(shù)中,如果下列三個條件不滿足(讀線程是否應(yīng)該被阻塞、小于最大值、比較設(shè)置成功)則會進(jìn)行fullTryAcquireShared函數(shù)中,它用來保證相關(guān)操作可以成功。其邏輯與tryAcquireShared邏輯類似,可以繼續(xù)再往后看。
讀鎖的釋放
讀鎖釋放的實現(xiàn)主要通過方法 tryReleaseShared,源碼如下,主要邏輯請看注釋:
- tryReleaseShared函數(shù)
protected final boolean tryReleaseShared(int unused) {
// 獲取當(dāng)前線程
Thread current = Thread.currentThread();
if (firstReader == current) { // 當(dāng)前線程為第一個讀線程
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1) // 讀線程占用的資源數(shù)為1
firstReader = null;
else // 減少占用的資源
firstReaderHoldCount--;
} else { // 當(dāng)前線程不為第一個讀線程
// 獲取緩存的計數(shù)器
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) // 計數(shù)器為空或者計數(shù)器的tid不為當(dāng)前正在運(yùn)行的線程的tid
// 獲取當(dāng)前線程對應(yīng)的計數(shù)器
rh = readHolds.get();
// 獲取計數(shù)
int count = rh.count;
if (count <= 1) { // 計數(shù)小于等于1
// 移除
readHolds.remove();
if (count <= 0) // 計數(shù)小于等于0,拋出異常
throw unmatchedUnlockException();
}
// 減少計數(shù)
--rh.count;
}
for (;;) { // 無限循環(huán)
// 獲取狀態(tài)
int c = getState();
// 獲取狀態(tài)
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) // 比較并進(jìn)行設(shè)置
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}說明: 此函數(shù)表示讀鎖線程釋放鎖。首先判斷當(dāng)前線程是否為第一個讀線程firstReader,若是,則判斷第一個讀線程占有的資源數(shù)firstReaderHoldCount是否為1,若是,則設(shè)置第一個讀線程firstReader為空,否則,將第一個讀線程占有的資源數(shù)firstReaderHoldCount減1;若當(dāng)前線程不是第一個讀線程,那么首先會獲取緩存計數(shù)器(上一個讀鎖線程對應(yīng)的計數(shù)器 ),若計數(shù)器為空或者tid不等于當(dāng)前線程的tid值,則獲取當(dāng)前線程的計數(shù)器,如果計數(shù)器的計數(shù)count小于等于1,則移除當(dāng)前線程對應(yīng)的計數(shù)器,如果計數(shù)器的計數(shù)count小于等于0,則拋出異常,之后再減少計數(shù)即可。無論何種情況,都會進(jìn)入無限循環(huán),該循環(huán)可以確保成功設(shè)置狀態(tài)state。其流程圖如下

鎖降級
讀寫鎖支持鎖降級,遵循按照獲取寫鎖,獲取讀鎖再釋放寫鎖的次序,寫鎖能夠降級成為讀鎖,不支持鎖升級,關(guān)于鎖降級,下面的示例代碼摘自 ReentrantWriteReadLock 源碼:
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}這里的流程可以解釋如下:
- 獲取讀鎖:首先嘗試獲取讀鎖來檢查某個緩存是否有效。
- 檢查緩存:如果緩存無效,則需要釋放讀鎖,因為在獲取寫鎖之前必須釋放讀鎖。
- 獲取寫鎖:獲取寫鎖以便更新緩存。此時,可能還需要重新檢查緩存狀態(tài),因為在釋放讀鎖和獲取寫鎖之間可能有其他線程修改了狀態(tài)。
- 更新緩存:如果確認(rèn)緩存無效,更新緩存并將其標(biāo)記為有效。
- 寫鎖降級為讀鎖:在釋放寫鎖之前,獲取讀鎖,從而實現(xiàn)寫鎖到讀鎖的降級。這樣,在釋放寫鎖后,其他線程可以并發(fā)讀取,但不能寫入。
- 使用數(shù)據(jù):現(xiàn)在可以安全地使用緩存數(shù)據(jù)了。
- 釋放讀鎖:完成操作后釋放讀鎖。
這個流程結(jié)合了讀鎖和寫鎖的優(yōu)點,確保了數(shù)據(jù)的一致性和可用性,同時允許在可能的情況下進(jìn)行并發(fā)讀取。使用讀寫鎖的代碼可能看起來比使用簡單的互斥鎖更復(fù)雜,但它提供了更精細(xì)的并發(fā)控制,可能會提高多線程應(yīng)用程序的性能
ReadWriteLock和StampedLock
ReadWriteLock
ReadWriteLock 是Java提供的一個接口,全類名:java.util.concurrent.locks.ReadWriteLock,上面的ReentrantReadWriteLock就是繼承自這個接口。它允許多個線程同時讀取共享資源,但只允許一個線程寫入共享資源。這種機(jī)制可以提高讀取操作的并發(fā)性,但寫入操作需要獨(dú)占資源。
特性
- 多個線程可以同時獲取讀鎖,但只有一個線程可以獲取寫鎖。
- 當(dāng)一個線程持有寫鎖時,其他線程無法獲取讀鎖和寫鎖,讀寫互斥。
- 當(dāng)一個線程持有讀鎖時,其他線程可以同時獲取讀鎖,讀讀共享。但無法獲取寫鎖,讀寫互斥
使用場景
ReadWriteLock 適用于讀多寫少的場景,例如緩存系統(tǒng)、數(shù)據(jù)庫連接池等。在這些場景中,讀取操作占據(jù)大部分時間,而寫入操作較少。
使用示例
使用 ReadWriteLock 的示例,實現(xiàn)了一個簡單的緩存系統(tǒng):
public class Cache {
private Map<String, Object> data = new HashMap<>();
private ReadWriteLock lock = new ReentrantReadWriteLock();
public Object get(String key) {
lock.readLock().lock();
try {
return data.get(key);
} finally {
lock.readLock().unlock();
}
}
public void put(String key, Object value) {
lock.writeLock().lock();
try {
data.put(key, value);
} finally {
lock.writeLock().unlock();
}
}
}在上述示例中,Cache 類使用 ReadWriteLock 來實現(xiàn)對 data 的并發(fā)訪問控制。get 方法獲取讀鎖并讀取數(shù)據(jù),put 方法獲取寫鎖并寫入數(shù)據(jù)。
StampedLock
StampedLock 是Java 8 中引入的一種新的鎖機(jī)制,全類名:java.util.concurrent.locks.StampedLock,它提供了一種樂觀讀的機(jī)制,可以進(jìn)一步提升讀取操作的并發(fā)性能。
特性
- 與 ReadWriteLock 類似,StampedLock 也支持多個線程同時獲取讀鎖,但只允許一個線程獲取寫鎖。
- 與 ReadWriteLock 不同的是,StampedLock 還提供了一個樂觀讀鎖(Optimistic Read Lock),即不阻塞其他線程的寫操作,但在讀取完成后需要驗證數(shù)據(jù)的一致性。
使用場景
StampedLock 適用于讀遠(yuǎn)遠(yuǎn)大于寫的場景,并且對數(shù)據(jù)的一致性要求不高,例如統(tǒng)計數(shù)據(jù)、監(jiān)控系統(tǒng)等。
使用示例
使用 StampedLock 的示例,實現(xiàn)了一個計數(shù)器:
public class Counter {
private int count = 0;
private StampedLock lock = new StampedLock();
public int getCount() {
long stamp = lock.tryOptimisticRead();
int value = count;
if (!lock.validate(stamp)) {
stamp = lock.readLock();
try {
value = count;
} finally {
lock.unlockRead(stamp);
}
}
return value;
}
public void increment() {
long stamp = lock.writeLock();
try {
count++;
} finally {
lock.unlockWrite(stamp);
}
}
}在上述示例中,Counter 類使用 StampedLock 來實現(xiàn)對計數(shù)器的并發(fā)訪問控制。getCount 方法首先嘗試獲取樂觀讀鎖,并讀取計數(shù)器的值,然后通過 validate 方法驗證數(shù)據(jù)的一致性。如果驗證失敗,則獲取悲觀讀鎖,并重新讀取計數(shù)器的值。increment 方法獲取寫鎖,并對計數(shù)器進(jìn)行遞增操作。
小結(jié)
ReadWriteLock 和 StampedLock 都是Java中用于并發(fā)控制的重要機(jī)制。
- ReadWriteLock 適用于讀多寫少的場景;
- StampedLock 則適用于讀遠(yuǎn)遠(yuǎn)大于寫的場景,并且對數(shù)據(jù)的一致性要求不高;
在實際應(yīng)用中,我們需要根據(jù)具體場景來選擇合適的鎖機(jī)制。通過合理使用這些鎖機(jī)制,我們可以提高并發(fā)程序的性能和可靠性。
到此這篇關(guān)于Java并發(fā)讀寫鎖ReentrantReadWriteLock 的文章就介紹到這了,更多相關(guān)Java并發(fā)讀寫鎖ReentrantReadWriteLock 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java.net.MalformedURLException異常的解決方法
下面小編就為大家?guī)硪黄猨ava.net.MalformedURLException異常的解決方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-05-05
SpringCloudGateway網(wǎng)關(guān)處攔截并修改請求的操作方法
這篇文章主要介紹了SpringCloudGateway網(wǎng)關(guān)處攔截并修改請求的操作方法,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2023-12-12
一個applicationContext 加載錯誤導(dǎo)致的阻塞問題及解決方法
這篇文章主要介紹了一個applicationContext 加載錯誤導(dǎo)致的阻塞問題及解決方法,需要的朋友可以參考下2018-11-11
基于HttpServletResponse 相關(guān)常用方法的應(yīng)用
本篇文章小編為大家介紹,基于HttpServletResponse 相關(guān)常用方法的應(yīng)用,需要的朋友參考下2013-04-04
阿里的Easyexcel讀取Excel文件的方法(最新版本)
這篇文章主要介紹了阿里的Easyexcel讀取Excel文件(最新版本)的方法,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-12-12
EntityWrapper如何在and條件中嵌套o(hù)r語句
這篇文章主要介紹了EntityWrapper如何在and條件中嵌套o(hù)r語句,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03
在IDEA中 實現(xiàn)給main方法附帶參數(shù)的操作
這篇文章主要介紹了在IDEA中 實現(xiàn)給main方法附帶參數(shù)的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01

