Java并發(fā)讀寫鎖ReentrantReadWriteLock 使用場景
ReentrantReadWriteLock使用場景
ReentrantReadWriteLock 是 Java 的一種讀寫鎖,它允許多個讀線程同時訪問,但只允許一個寫線程訪問(會阻塞所有的讀寫線程)。這種鎖的設計可以提高性能,特別是在讀操作的數(shù)量遠遠超過寫操作的情況下。
在并發(fā)場景中,為了解決線程安全問題,我們通常會使用關鍵字 synchronized 或者 JUC 包中實現(xiàn)了 Lock 接口的 ReentrantLock。但它們都是獨占式獲取鎖,也就是在同一時刻只有一個線程能夠獲取鎖。
而在一些業(yè)務場景中,大部分只是讀數(shù)據(jù),寫數(shù)據(jù)很少,如果僅僅是讀數(shù)據(jù)的話并不會影響數(shù)據(jù)正確性,而如果在這種業(yè)務場景下,依然使用獨占鎖的話,很顯然會出現(xiàn)性能瓶頸。針對這種讀多寫少的情況,Java 提供了另外一個實現(xiàn) Lock 接口的 ReentrantReadWriteLock——讀寫鎖。
ReentrantReadWriteLock
其實就是 讀讀并發(fā)、讀寫互斥、寫寫互斥。如果一個對象并發(fā)讀的場景大于并發(fā)寫的場景,那就可以使用 ReentrantReadWriteLock
來達到保證線程安全的前提下提高并發(fā)效率。首先,我們先了解一下Doug Lea為我們準備的兩個demo。
CachedData
一個緩存對象的使用案例,緩存對象在使用時,一般并發(fā)讀的場景遠遠大于并發(fā)寫的場景,所以緩存對象是非常適合使用ReentrantReadWriteLock
來做控制的
class CachedData { //被緩存的具體對象 Object data; //當前對象是否可用,使用volatile來保證可見性 volatile boolean cacheValid; //ReentrantReadWriteLock final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); //業(yè)務處理邏輯 void processCachedData() { //要讀取數(shù)據(jù)時,先加讀鎖,如果加成功,說明此時沒有人在并發(fā)寫 rwl.readLock().lock(); //拿到讀鎖后,判斷當前對象是否有效 if (!cacheValid) { // Must release read lock before acquiring write lock //這里的處理非常經(jīng)典,當你持有讀鎖之后,不能直接獲取寫鎖, //因為寫鎖是獨占鎖,如果直接獲取寫鎖,那代碼就在這里死鎖了 //所以必須要先釋放讀鎖,然后手動獲取寫鎖 rwl.readLock().unlock(); rwl.writeLock().lock(); try { // Recheck state because another thread might have // acquired write lock and changed state before we did. //經(jīng)典處理之二,在獨占鎖內(nèi)部要處理數(shù)據(jù)時,一定要做二次校驗 //因為可能同時有多個線程全都在獲取寫鎖, //當時線程1釋放寫鎖之后,線程2馬上獲取到寫鎖,此時如果不做二次校驗那可能就導致某些操作做了多次 if (!cacheValid) { data = ... //當緩存對象更新成功后,重置標記為true cacheValid = true; } // Downgrade by acquiring read lock before releasing write lock //這里有一個非常神奇的鎖降級操作,所謂降級是說當你持有寫鎖后,可以再次獲取讀鎖 //這里之所以要獲取一次寫鎖是為了防止當前線程釋放寫鎖之后,其他線程馬上獲取到寫鎖,改變緩存對象 //因為讀寫互斥,所以有了這個讀鎖之后,在讀鎖釋放之前,別的線程是無法修改緩存對象的 rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); // Unlock write, still hold read } } try { use(data); } finally { rwl.readLock().unlock(); } } }
RWDictionary
Doug Lea給出的第二個demo,一個并發(fā)容器的demo。并發(fā)容器我們一般都是直接使用ConcurrentHashMap的,但是我們可以使用非并發(fā)安全的容器+ReentrantReadWriteLock來組合出一個并發(fā)容器。如果這個并發(fā)容器的讀的頻率>寫的頻率,那這個效率還是不錯的
class RWDictionary { //原來非并發(fā)安全的容器 private final Map<String, Data> m = new TreeMap<String, Data>(); private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); public Data get(String key) { //讀數(shù)據(jù),上讀鎖 r.lock(); try { return m.get(key); } finally { r.unlock(); } } public String[] allKeys() { //讀數(shù)據(jù),上讀鎖 r.lock(); try { return m.keySet().toArray(); } finally { r.unlock(); } } public Data put(String key, Data value) { //寫數(shù)據(jù),上寫鎖 w.lock(); try { return m.put(key, value); } finally { w.unlock(); } } public void clear() { //寫數(shù)據(jù),上寫鎖 w.lock(); try { m.clear(); } finally { w.unlock(); } } }
ReentrantReadWriteLock的特性
讀寫鎖允許同一時刻被多個讀線程訪問,但是在寫線程訪問時,所有的讀線程和其他的寫線程都會被阻塞。
在分析 WirteLock 和 ReadLock 的互斥性時,我們可以按照 WriteLock 與 WriteLock,WriteLock 與 ReadLock 以及 ReadLock 與 ReadLock 進行對比分析。
這里總結一下讀寫鎖的特性:
- 公平性選擇:支持非公平性(默認)和公平的鎖獲取方式,非公平的吞吐量優(yōu)于公平;
- 重入性:支持重入,讀鎖獲取后能再次獲取,寫鎖獲取之后能夠再次獲取寫鎖,同時也能夠獲取讀鎖;
- 鎖降級:寫鎖降級是一種允許寫鎖轉(zhuǎn)換為讀鎖的過程。通常的順序是:
- 獲取寫鎖:線程首先獲取寫鎖,確保在修改數(shù)據(jù)時排它訪問。
- 獲取讀鎖:在寫鎖保持的同時,線程可以再次獲取讀鎖。
- 釋放寫鎖:線程保持讀鎖的同時釋放寫鎖。
- 釋放讀鎖:最后線程釋放讀鎖。
這樣,寫鎖就降級為讀鎖,允許其他線程進行并發(fā)讀取,但仍然排除其他線程的寫操作。
接下來額外說一下鎖降級
- 鎖降級
鎖降級指的是寫鎖降級成為讀鎖。如果當前線程擁有寫鎖,然后將其釋放,最后再獲取讀鎖,這種分段完成的過程不能稱之為鎖降級。鎖降級是指把持住(當前擁有的)寫鎖,再獲取到讀鎖,隨后釋放(先前擁有的)寫鎖的過程。
接下來看一個鎖降級的示例。因為數(shù)據(jù)不常變化,所以多個線程可以并發(fā)地進行數(shù)據(jù)處理,當數(shù)據(jù)變更后,如果當前線程感知到數(shù)據(jù)變化,則進行數(shù)據(jù)的準備工作,同時其他處理線程被阻塞,直到當前線程完成數(shù)據(jù)的準備工作,如代碼如下所示:
public void processData() { readLock.lock(); if (!update) { // 必須先釋放讀鎖 readLock.unlock(); // 鎖降級從寫鎖獲取到開始 writeLock.lock(); try { if (!update) { // 準備數(shù)據(jù)的流程(略) update = true; } readLock.lock(); } finally { writeLock.unlock(); } // 鎖降級完成,寫鎖降級為讀鎖 } try { // 使用數(shù)據(jù)的流程(略) } finally { readLock.unlock(); } }
上述示例中,當數(shù)據(jù)發(fā)生變更后,update變量(布爾類型且volatile修飾)被設置為false,此時所有訪問processData()方法的線程都能夠感知到變化,但只有一個線程能夠獲取到寫鎖,其他線程會被阻塞在讀鎖和寫鎖的lock()方法上。當前線程獲取寫鎖完成數(shù)據(jù)準備之后,再獲取讀鎖,隨后釋放寫鎖,完成鎖降級。
鎖降級中讀鎖的獲取是否必要呢? 答案是必要的。主要是為了保證數(shù)據(jù)的可見性,如果當前線程不獲取讀鎖而是直接釋放寫鎖,假設此刻另一個線程(記作線程T)獲取了寫鎖并修改了數(shù)據(jù),那么當前線程無法感知線程T的數(shù)據(jù)更新。如果當前線程獲取讀鎖,即遵循鎖降級的步驟,則線程T將會被阻塞,直到當前線程使用數(shù)據(jù)并釋放讀鎖之后,線程T才能獲取寫鎖進行數(shù)據(jù)更新。
RentrantReadWriteLock不支持鎖升級(把持讀鎖、獲取寫鎖,最后釋放讀鎖的過程)。目的也是保證數(shù)據(jù)可見性,如果讀鎖已被多個線程獲取,其中任意線程成功獲取了寫鎖并更新了數(shù)據(jù),則其更新對其他獲取到讀鎖的線程是不可見的。
ReentrantReadWriteLock源碼分析
類的繼承關系
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {}
說明: 可以看到,ReentrantReadWriteLock實現(xiàn)了ReadWriteLock接口,ReadWriteLock接口定義了獲取讀鎖和寫鎖的規(guī)范,具體需要實現(xiàn)類去實現(xiàn);同時其還實現(xiàn)了Serializable接口,表示可以進行序列化,在源代碼中可以看到ReentrantReadWriteLock實現(xiàn)了自己的序列化邏輯。
類的內(nèi)部類
ReentrantReadWriteLock有五個內(nèi)部類,五個內(nèi)部類之間也是相互關聯(lián)的。內(nèi)部類的關系如下圖所示。
說明: 如上圖所示,Sync繼承自AQS、NonfairSync繼承自Sync類、FairSync繼承自Sync類;ReadLock實現(xiàn)了Lock接口、WriteLock也實現(xiàn)了Lock接口。
內(nèi)部類 -類Sync
- Sync類的繼承關系
abstract static class Sync extends AbstractQueuedSynchronizer {}
說明: Sync抽象類繼承自AQS抽象類,Sync類提供了對ReentrantReadWriteLock的支持。
- Sync類的內(nèi)部類
Sync類內(nèi)部存在兩個內(nèi)部類,分別為HoldCounter和ThreadLocalHoldCounter,其中HoldCounter主要與讀鎖配套使用,其中,HoldCounter源碼如下。
// 計數(shù)器 static final class HoldCounter { // 計數(shù) int count = 0; // Use id, not reference, to avoid garbage retention // 獲取當前線程的TID屬性的值 final long tid = getThreadId(Thread.currentThread()); }
說明: HoldCounter主要有兩個屬性,count和tid,其中count表示某個讀線程重入的次數(shù),tid表示該線程的tid字段的值,該字段可以用來唯一標識一個線程。ThreadLocalHoldCounter的源碼如下
// 本地線程計數(shù)器 static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { // 重寫初始化方法,在沒有進行set的情況下,獲取的都是該HoldCounter值 public HoldCounter initialValue() { return new HoldCounter(); } }
說明: ThreadLocalHoldCounter重寫了ThreadLocal的initialValue方法,ThreadLocal類可以將線程與對象相關聯(lián)。在沒有進行set的情況下,get到的均是initialValue方法里面生成的那個HolderCounter對象。
- Sync類的屬性
abstract static class Sync extends AbstractQueuedSynchronizer { // 版本序列號 private static final long serialVersionUID = 6317671515068378041L; // 高16位為讀鎖,低16位為寫鎖 static final int SHARED_SHIFT = 16; // 讀鎖單位 static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 讀鎖最大數(shù)量 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 寫鎖最大數(shù)量 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 本地線程計數(shù)器 private transient ThreadLocalHoldCounter readHolds; // 緩存的計數(shù)器 private transient HoldCounter cachedHoldCounter; // 第一個讀線程 private transient Thread firstReader = null; // 第一個讀線程的計數(shù) private transient int firstReaderHoldCount; }
說明: 該屬性中包括了讀鎖、寫鎖線程的最大量。本地線程計數(shù)器等。
- Sync類的構造函數(shù)
// 構造函數(shù) Sync() { // 本地線程計數(shù)器 readHolds = new ThreadLocalHoldCounter(); // 設置AQS的狀態(tài) setState(getState()); // ensures visibility of readHolds }
說明:在Sync的構造函數(shù)中設置了本地線程計數(shù)器和AQS的狀態(tài)state。
類的屬性
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { // 版本序列號 private static final long serialVersionUID = -6992448646407690164L; // 讀鎖 private final ReentrantReadWriteLock.ReadLock readerLock; // 寫鎖 private final ReentrantReadWriteLock.WriteLock writerLock; // 同步隊列 final Sync sync; private static final sun.misc.Unsafe UNSAFE; // 線程ID的偏移地址 private static final long TID_OFFSET; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> tk = Thread.class; // 獲取線程的tid字段的內(nèi)存地址 TID_OFFSET = UNSAFE.objectFieldOffset (tk.getDeclaredField("tid")); } catch (Exception e) { throw new Error(e); } } }
說明: 可以看到ReentrantReadWriteLock屬性包括了一個ReentrantReadWriteLock.ReadLock對象,表示讀鎖;一個ReentrantReadWriteLock.WriteLock對象,表示寫鎖;一個Sync對象,表示同步隊列。
類的構造函數(shù)
- ReentrantReadWriteLock()型構造函數(shù)
public ReentrantReadWriteLock() { this(false); }
說明: 此構造函數(shù)會調(diào)用另外一個有參構造函數(shù)。
- ReentrantReadWriteLock(boolean)型構造函數(shù)
public ReentrantReadWriteLock(boolean fair) { // 公平策略或者是非公平策略 sync = fair ? new FairSync() : new NonfairSync(); // 讀鎖 readerLock = new ReadLock(this); // 寫鎖 writerLock = new WriteLock(this); }
說明: 可以指定設置公平策略或者非公平策略,并且該構造函數(shù)中生成了讀鎖與寫鎖兩個對象。
內(nèi)部類 - Sync核心函數(shù)分析
對ReentrantReadWriteLock對象的操作絕大多數(shù)都轉(zhuǎn)發(fā)至Sync對象進行處理。下面對Sync類中的重點函數(shù)進行分析
- sharedCount函數(shù)
表示占有讀鎖的線程數(shù)量,源碼如下
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
說明::直接將state右移16位,就可以得到讀鎖的線程數(shù)量,因為state的高16位表示讀鎖,對應的低十六位表示寫鎖數(shù)量。
- exclusiveCount函數(shù)
表示占有寫鎖的線程數(shù)量,源碼如下
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
說明:
EXCLUSIVE_MASK為:
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
EXCLUSIVE_MASK 為 1 左移 16 位然后減 1,即為 0x0000FFFF。而 exclusiveCount 方法是將同步狀態(tài)(state 為 int 類型)與 0x0000FFFF 相與,即取同步狀態(tài)的低 16 位。
那么低 16 位代表什么呢?根據(jù) exclusiveCount 方法的注釋為獨占式獲取的次數(shù)即寫鎖被獲取的次數(shù),現(xiàn)在就可以得出來一個結論同步狀態(tài)的低 16 位用來表示寫鎖的獲取次數(shù)。
寫鎖的獲取
同一時刻,ReentrantReadWriteLock 的寫鎖是不能被多個線程獲取的,很顯然 ReentrantReadWriteLock 的寫鎖是獨占式鎖,而實現(xiàn)寫鎖的同步語義是通過重寫 AQS 中的 tryAcquire 方法實現(xiàn)的。
- tryAcquire函數(shù)
protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ // 獲取當前線程 Thread current = Thread.currentThread(); // 獲取狀態(tài) int c = getState(); // 寫線程數(shù)量 int w = exclusiveCount(c); if (c != 0) { // 狀態(tài)不為0 // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) // 寫線程數(shù)量為0或者當前線程沒有占有獨占資源 return false; if (w + exclusiveCount(acquires) > MAX_COUNT) // 判斷是否超過最高寫線程數(shù)量 throw new Error("Maximum lock count exceeded"); // Reentrant acquire // 設置AQS狀態(tài) setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) // 寫線程是否應該被阻塞 return false; // 設置獨占線程 setExclusiveOwnerThread(current); return true; }
說明: 此函數(shù)用于獲取寫鎖:首先會獲取state,判斷是否為0;
1. 若為0,表示此時沒有讀鎖線程,再判斷寫線程是否應該被阻塞,而在非公平策略下總是不會被阻塞,在公平策略下會進行判斷(判斷同步隊列中是否有等待時間更長的線程;若存在,則需要被阻塞,否則,無需阻塞),之后在設置狀態(tài)state,然后返回true。
2. 若state不為0,則表示此時存在讀鎖或?qū)戞i線程,若寫鎖線程數(shù)量為0或者當前線程為獨占鎖線程,則返回false,表示不成功,否則,判斷寫鎖線程的重入次數(shù)是否大于了最大值,若是,則拋出異常,否則,設置狀態(tài)state,返回true,表示成功。其函數(shù)流程圖如下
其主要邏輯為:當讀鎖已經(jīng)被讀線程獲取或者寫鎖已經(jīng)被其他寫線程獲取,則寫鎖獲取失?。环駝t,獲取成功并支持重入,增加寫狀態(tài)。
寫鎖的釋放
寫鎖釋放通過重寫 AQS 的 tryRelease 方法,源碼為:
- tryRelease函數(shù)
/* * Note that tryRelease and tryAcquire can be called by * Conditions. So it is possible that their arguments contain * both read and write holds that are all released during a * condition wait and re-established in tryAcquire. */ protected final boolean tryRelease(int releases) { // 判斷是否偽獨占線程 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 計算釋放資源后的寫鎖的數(shù)量 int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; // 是否釋放成功 if (free) setExclusiveOwnerThread(null); // 設置獨占線程為空 setState(nextc); // 設置狀態(tài) return free; }
說明: 此函數(shù)用于釋放寫鎖資源,首先會判斷該線程是否為獨占線程,若不為獨占線程,則拋出異常,否則,計算釋放資源后的寫鎖的數(shù)量,若為0,表示成功釋放,資源不將被占用,否則,表示資源還被占用。其函數(shù)流程圖如下。
讀鎖的獲取
看完了寫鎖,再來看看讀鎖,讀鎖不是獨占式鎖,即同一時刻該鎖可以被多個讀線程獲取,也就是一種共享式鎖。按照之前對 AQS 的介紹,實現(xiàn)共享式同步組件的同步語義需要通過重寫 AQS 的 tryAcquireShared 方法和 tryReleaseShared 方法。讀鎖的獲取實現(xiàn)方法為:
- tryAcquireShared函數(shù)
private IllegalMonitorStateException unmatchedUnlockException() { return new IllegalMonitorStateException( "attempt to unlock read lock, not locked by current thread"); } // 共享模式下獲取資源 protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ // 獲取當前線程 Thread current = Thread.currentThread(); // 獲取狀態(tài) int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) // 寫線程數(shù)不為0并且占有資源的不是當前線程 return -1; // 讀鎖數(shù)量 int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 讀線程是否應該被阻塞、并且小于最大值、并且比較設置成功 if (r == 0) { // 讀鎖數(shù)量為0 // 設置第一個讀線程 firstReader = current; // 讀線程占用的資源數(shù)為1 firstReaderHoldCount = 1; } else if (firstReader == current) { // 當前線程為第一個讀線程 // 占用資源數(shù)加1 firstReaderHoldCount++; } else { // 讀鎖數(shù)量不為0并且不為當前線程 // 獲取計數(shù)器 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) // 計數(shù)器為空或者計數(shù)器的tid不為當前正在運行的線程的tid // 獲取當前線程對應的計數(shù)器 cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) // 計數(shù)為0 // 設置 readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
說明: 此函數(shù)表示讀鎖線程獲取讀鎖。首先判斷寫鎖是否為0并且當前線程不占有獨占鎖,直接返回;否則,判斷讀線程是否需要被阻塞并且讀鎖數(shù)量是否小于最大值并且比較設置狀態(tài)成功,若當前沒有讀鎖,則設置第一個讀線程firstReader和firstReaderHoldCount;若當前線程線程為第一個讀線程,則增加firstReaderHoldCount;否則,將設置當前線程對應的HoldCounter對象的值。流程圖如下。
當寫鎖被其他線程獲取后,讀鎖獲取失敗,否則獲取成功,會利用 CAS 更新同步狀態(tài)。
另外,當前同步狀態(tài)需要加上 SHARED_UNIT((1 << SHARED_SHIFT)
,即 0x00010000)的原因,我們在上面也說過了,同步狀態(tài)的高 16 位用來表示讀鎖被獲取的次數(shù)。
如果 CAS 失敗或者已經(jīng)獲取讀鎖的線程再次獲取讀鎖時,是靠 fullTryAcquireShared 方法實現(xiàn)的。
- fullTryAcquireShared函數(shù)
final int fullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ HoldCounter rh = null; for (;;) { // 無限循環(huán) // 獲取狀態(tài) int c = getState(); if (exclusiveCount(c) != 0) { // 寫線程數(shù)量不為0 if (getExclusiveOwnerThread() != current) // 不為當前線程 return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // 寫線程數(shù)量為0并且讀線程被阻塞 // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // 當前線程為第一個讀線程 // assert firstReaderHoldCount > 0; } else { // 當前線程不為第一個讀線程 if (rh == null) { // 計數(shù)器不為空 // rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { // 計數(shù)器為空或者計數(shù)器的tid不為當前正在運行的線程的tid rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) // 讀鎖數(shù)量為最大值,拋出異常 throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { // 比較并且設置成功 if (sharedCount(c) == 0) { // 讀線程數(shù)量為0 // 設置第一個讀線程 firstReader = current; // firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
說明: 在tryAcquireShared函數(shù)中,如果下列三個條件不滿足(讀線程是否應該被阻塞、小于最大值、比較設置成功)則會進行fullTryAcquireShared函數(shù)中,它用來保證相關操作可以成功。其邏輯與tryAcquireShared邏輯類似,可以繼續(xù)再往后看。
讀鎖的釋放
讀鎖釋放的實現(xiàn)主要通過方法 tryReleaseShared,源碼如下,主要邏輯請看注釋:
- tryReleaseShared函數(shù)
protected final boolean tryReleaseShared(int unused) { // 獲取當前線程 Thread current = Thread.currentThread(); if (firstReader == current) { // 當前線程為第一個讀線程 // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) // 讀線程占用的資源數(shù)為1 firstReader = null; else // 減少占用的資源 firstReaderHoldCount--; } else { // 當前線程不為第一個讀線程 // 獲取緩存的計數(shù)器 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) // 計數(shù)器為空或者計數(shù)器的tid不為當前正在運行的線程的tid // 獲取當前線程對應的計數(shù)器 rh = readHolds.get(); // 獲取計數(shù) int count = rh.count; if (count <= 1) { // 計數(shù)小于等于1 // 移除 readHolds.remove(); if (count <= 0) // 計數(shù)小于等于0,拋出異常 throw unmatchedUnlockException(); } // 減少計數(shù) --rh.count; } for (;;) { // 無限循環(huán) // 獲取狀態(tài) int c = getState(); // 獲取狀態(tài) int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // 比較并進行設置 // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
說明: 此函數(shù)表示讀鎖線程釋放鎖。首先判斷當前線程是否為第一個讀線程firstReader,若是,則判斷第一個讀線程占有的資源數(shù)firstReaderHoldCount是否為1,若是,則設置第一個讀線程firstReader為空,否則,將第一個讀線程占有的資源數(shù)firstReaderHoldCount減1;若當前線程不是第一個讀線程,那么首先會獲取緩存計數(shù)器(上一個讀鎖線程對應的計數(shù)器 ),若計數(shù)器為空或者tid不等于當前線程的tid值,則獲取當前線程的計數(shù)器,如果計數(shù)器的計數(shù)count小于等于1,則移除當前線程對應的計數(shù)器,如果計數(shù)器的計數(shù)count小于等于0,則拋出異常,之后再減少計數(shù)即可。無論何種情況,都會進入無限循環(huán),該循環(huán)可以確保成功設置狀態(tài)state。其流程圖如下
鎖降級
讀寫鎖支持鎖降級,遵循按照獲取寫鎖,獲取讀鎖再釋放寫鎖的次序,寫鎖能夠降級成為讀鎖,不支持鎖升級,關于鎖降級,下面的示例代碼摘自 ReentrantWriteReadLock 源碼:
void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // Must release read lock before acquiring write lock rwl.readLock().unlock(); rwl.writeLock().lock(); try { // Recheck state because another thread might have // acquired write lock and changed state before we did. if (!cacheValid) { data = ... cacheValid = true; } // Downgrade by acquiring read lock before releasing write lock rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); // Unlock write, still hold read } } try { use(data); } finally { rwl.readLock().unlock(); } }
這里的流程可以解釋如下:
- 獲取讀鎖:首先嘗試獲取讀鎖來檢查某個緩存是否有效。
- 檢查緩存:如果緩存無效,則需要釋放讀鎖,因為在獲取寫鎖之前必須釋放讀鎖。
- 獲取寫鎖:獲取寫鎖以便更新緩存。此時,可能還需要重新檢查緩存狀態(tài),因為在釋放讀鎖和獲取寫鎖之間可能有其他線程修改了狀態(tài)。
- 更新緩存:如果確認緩存無效,更新緩存并將其標記為有效。
- 寫鎖降級為讀鎖:在釋放寫鎖之前,獲取讀鎖,從而實現(xiàn)寫鎖到讀鎖的降級。這樣,在釋放寫鎖后,其他線程可以并發(fā)讀取,但不能寫入。
- 使用數(shù)據(jù):現(xiàn)在可以安全地使用緩存數(shù)據(jù)了。
- 釋放讀鎖:完成操作后釋放讀鎖。
這個流程結合了讀鎖和寫鎖的優(yōu)點,確保了數(shù)據(jù)的一致性和可用性,同時允許在可能的情況下進行并發(fā)讀取。使用讀寫鎖的代碼可能看起來比使用簡單的互斥鎖更復雜,但它提供了更精細的并發(fā)控制,可能會提高多線程應用程序的性能
ReadWriteLock和StampedLock
ReadWriteLock
ReadWriteLock 是Java提供的一個接口,全類名:java.util.concurrent.locks.ReadWriteLock,上面的ReentrantReadWriteLock就是繼承自這個接口。它允許多個線程同時讀取共享資源,但只允許一個線程寫入共享資源。這種機制可以提高讀取操作的并發(fā)性,但寫入操作需要獨占資源。
特性
- 多個線程可以同時獲取讀鎖,但只有一個線程可以獲取寫鎖。
- 當一個線程持有寫鎖時,其他線程無法獲取讀鎖和寫鎖,讀寫互斥。
- 當一個線程持有讀鎖時,其他線程可以同時獲取讀鎖,讀讀共享。但無法獲取寫鎖,讀寫互斥
使用場景
ReadWriteLock 適用于讀多寫少的場景,例如緩存系統(tǒng)、數(shù)據(jù)庫連接池等。在這些場景中,讀取操作占據(jù)大部分時間,而寫入操作較少。
使用示例
使用 ReadWriteLock 的示例,實現(xiàn)了一個簡單的緩存系統(tǒng):
public class Cache { private Map<String, Object> data = new HashMap<>(); private ReadWriteLock lock = new ReentrantReadWriteLock(); public Object get(String key) { lock.readLock().lock(); try { return data.get(key); } finally { lock.readLock().unlock(); } } public void put(String key, Object value) { lock.writeLock().lock(); try { data.put(key, value); } finally { lock.writeLock().unlock(); } } }
在上述示例中,Cache 類使用 ReadWriteLock 來實現(xiàn)對 data 的并發(fā)訪問控制。get 方法獲取讀鎖并讀取數(shù)據(jù),put 方法獲取寫鎖并寫入數(shù)據(jù)。
StampedLock
StampedLock 是Java 8 中引入的一種新的鎖機制,全類名:java.util.concurrent.locks.StampedLock,它提供了一種樂觀讀的機制,可以進一步提升讀取操作的并發(fā)性能。
特性
- 與 ReadWriteLock 類似,StampedLock 也支持多個線程同時獲取讀鎖,但只允許一個線程獲取寫鎖。
- 與 ReadWriteLock 不同的是,StampedLock 還提供了一個樂觀讀鎖(Optimistic Read Lock),即不阻塞其他線程的寫操作,但在讀取完成后需要驗證數(shù)據(jù)的一致性。
使用場景
StampedLock 適用于讀遠遠大于寫的場景,并且對數(shù)據(jù)的一致性要求不高,例如統(tǒng)計數(shù)據(jù)、監(jiān)控系統(tǒng)等。
使用示例
使用 StampedLock 的示例,實現(xiàn)了一個計數(shù)器:
public class Counter { private int count = 0; private StampedLock lock = new StampedLock(); public int getCount() { long stamp = lock.tryOptimisticRead(); int value = count; if (!lock.validate(stamp)) { stamp = lock.readLock(); try { value = count; } finally { lock.unlockRead(stamp); } } return value; } public void increment() { long stamp = lock.writeLock(); try { count++; } finally { lock.unlockWrite(stamp); } } }
在上述示例中,Counter 類使用 StampedLock 來實現(xiàn)對計數(shù)器的并發(fā)訪問控制。getCount 方法首先嘗試獲取樂觀讀鎖,并讀取計數(shù)器的值,然后通過 validate 方法驗證數(shù)據(jù)的一致性。如果驗證失敗,則獲取悲觀讀鎖,并重新讀取計數(shù)器的值。increment 方法獲取寫鎖,并對計數(shù)器進行遞增操作。
小結
ReadWriteLock 和 StampedLock 都是Java中用于并發(fā)控制的重要機制。
- ReadWriteLock 適用于讀多寫少的場景;
- StampedLock 則適用于讀遠遠大于寫的場景,并且對數(shù)據(jù)的一致性要求不高;
在實際應用中,我們需要根據(jù)具體場景來選擇合適的鎖機制。通過合理使用這些鎖機制,我們可以提高并發(fā)程序的性能和可靠性。
到此這篇關于Java并發(fā)讀寫鎖ReentrantReadWriteLock 的文章就介紹到這了,更多相關Java并發(fā)讀寫鎖ReentrantReadWriteLock 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
java.net.MalformedURLException異常的解決方法
下面小編就為大家?guī)硪黄猨ava.net.MalformedURLException異常的解決方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-05-05SpringCloudGateway網(wǎng)關處攔截并修改請求的操作方法
這篇文章主要介紹了SpringCloudGateway網(wǎng)關處攔截并修改請求的操作方法,本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧2023-12-12一個applicationContext 加載錯誤導致的阻塞問題及解決方法
這篇文章主要介紹了一個applicationContext 加載錯誤導致的阻塞問題及解決方法,需要的朋友可以參考下2018-11-11基于HttpServletResponse 相關常用方法的應用
本篇文章小編為大家介紹,基于HttpServletResponse 相關常用方法的應用,需要的朋友參考下2013-04-04阿里的Easyexcel讀取Excel文件的方法(最新版本)
這篇文章主要介紹了阿里的Easyexcel讀取Excel文件(最新版本)的方法,本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-12-12在IDEA中 實現(xiàn)給main方法附帶參數(shù)的操作
這篇文章主要介紹了在IDEA中 實現(xiàn)給main方法附帶參數(shù)的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01