欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java多線程之ReentrantReadWriteLock源碼解析

 更新時間:2021年05月06日 12:01:55   作者:Java與大數(shù)據進階  
這篇文章主要介紹了Java多線程之ReentrantReadWriteLock源碼解析,文中有非常詳細的代碼示例,對正在學習java基礎的小伙伴們有非常好的幫助,需要的朋友可以參考下

一、介紹

1.1 ReentrantReadWriteLock

ReentrantReadWriteLock 是一個讀寫鎖,允許多個讀或者一個寫線程在執(zhí)行。

內部的 Sync 繼承自 AQS,這個 Sync 包含一個共享讀鎖 ReadLock 和一個獨占寫鎖 WriteLock。

該鎖可以設置公平和非公平,默認非公平。

一個持有寫鎖的線程可以獲取讀鎖。如果該線程先持有寫鎖,再持有讀鎖并釋放寫鎖,稱為鎖降級。

WriteLock支持Condition并且與ReentrantLock語義一致,而ReadLock則不能使用Condition,否則拋出UnsupportedOperationException異常。

public class ReentrantReadWriteLock implements ReadWriteLock {
    /** 讀鎖 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 寫鎖 */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** 持有的AQS子類對象 */
    final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {}

    static final class NonfairSync extends Sync {}

    static final class FairSync extends Sync {}

    public static class ReadLock implements Lock {}

    public static class WriteLock implements Lock {}
  
    //默認非公平
    public ReentrantReadWriteLock() {
        this(false);
    }

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    public static class ReadLock implements Lock {
    	private final Sync sync;
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    }

    public static class WriteLock implements Lock {
    	private final Sync sync;
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    }

}

1.2 state

Sync 繼承了 AQS,其中有一個 int 的成員變量 state,int 共32位,這里將其視為兩部分,高16位表示讀的數(shù)量,低16位表示寫的數(shù)量,這里的數(shù)量表示線程重入后的總數(shù)量。

abstract static class Sync extends AbstractQueuedSynchronizer {
  	//繼承的一個int的成員變量,將其拆分為高16位和低16位
    //private volatile int state;
    static final int SHARED_SHIFT   = 16;
  	//讀一次,鎖增加的值
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    //讀的數(shù)量
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    //寫的數(shù)量
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}

1.3 HoldCounter

讀鎖使用了一個 ThreadLocal<HoldCounter> 讓每個線程有一個線程私有的 HoldCounter,HoldCounter包含一個線程 id 以及讀重入的次數(shù)。

查找對應線程的HoldCounter 其實只用一個 ThreadLocalHoldCounter 也足夠了。這里為了加快查詢,用了兩個額外的緩存,即 cachedHoldCounter、firstReaderfirstReaderHoldCount(后兩個組合起來相當于一個 HoldCounter)。

在讀鎖的相關操作中,先檢查 firstReader 是否為當前線程,否則檢查 cachedHoldCounter 內部的線程是否為當前線程,如果失敗最后會通過 readHolds 來獲取當前線程的 HoldCounter。

static final class HoldCounter {
    int count = 0;
    // 使用線程id,而不是線程的引用。這樣可以防止垃圾不被回收
    final long tid = getThreadId(Thread.currentThread());
}

static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}
//使用的ThreadLocal
private transient ThreadLocalHoldCounter readHolds;
//一個緩存
private transient HoldCounter cachedHoldCounter;
//組合起來相當于一個緩存
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

二、讀鎖

2.1 讀鎖的獲取

下面講解 tryAcquireSharedtryReadLock,tryReadLock 是一種直接搶占的非公平獲取,和 tryAcquireShared 中的非公平獲取有所不同。

2.1.1 tryAcquireShared

根據注釋

1.檢查是否存在其他線程持有的寫鎖,是的話失敗,返回 -1;

2.判斷在當前公平狀態(tài)下能否讀,以及是否超過讀的最大數(shù)量,滿足條件則嘗試 CAS 修改狀態(tài),讓 state 加一個單位的讀 SHARED_UNIT;修改成功后會根據三種情況,即首次讀、firstReader 是當前線程,以及其他情況分別進行處理,成功,返回1;

3.前面未返回結果,會執(zhí)行 fullTryAcquireShared

可以將該方法視為 fullTryAcquireShared 的一次快速嘗試,如果嘗試失敗,會在 fullTryAcquireShared 的自旋中一直執(zhí)行,直到返回成功或者失敗。

//ReadLock
public void lock() {
    sync.acquireShared(1);
}  
//AQS
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
} 
//Sync
protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();
  	// 如果寫的數(shù)量不是0,且寫線程不是當前線程,失敗
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
  	// 獲取讀的個數(shù)
    int r = sharedCount(c);
  	// 如果當前線程想要讀,沒有被堵塞
  	// 當前讀的數(shù)量未超過最大允許的讀的個數(shù)
  	// CAS執(zhí)行成功
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
      	// 第一次讀,修改firstReader和firstReaderHoldCount 
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
          // 如果當前線程正好是firstReader
        } else if (firstReader == current) {
            firstReaderHoldCount++;
          // 其他情況,經過一系列處理后,使得rh為當前線程的HoldCounter
          // 對rh的記數(shù)加一
        } else {
            HoldCounter rh = cachedHoldCounter;
          	// 如果cached為null或者不是當前線程
            if (rh == null || rh.tid != getThreadId(current))
              	// 從readHolds中get,并修改cached
                cachedHoldCounter = rh = readHolds.get();
          	// 如果cached不是null,但記數(shù)為null
          	// 這種情況表示當前線程的HoldCounter已經被刪除,即為null,
          	// 但cached仍然保留著null之前的那個HoldCounter,
          	// 為了方便,直接將cached設置給ThreadLocal即可
            else if (rh.count == 0)
                readHolds.set(rh);
          	//執(zhí)行到這里,rh表示當前線程的HoldCounter,記數(shù)加1
            rh.count++;
        }
        return 1;
    }
  	// 前面未返回結果,執(zhí)行第三步
    return fullTryAcquireShared(current);
}

2.1.2 fullTryAcquireShared

在上述的簡單嘗試 tryAcquireShared 未能確定結果后,執(zhí)行第三步 fullTryAcquireShared 自旋來不斷嘗試獲取讀鎖,直到成功獲取鎖返回1,或者滿足相應條件認定失敗返回-1。

1.其他線程持有寫鎖,失敗

2.當前線程讀的嘗試滿足堵塞條件表示當前線程排在其他線程后面,且當前線程沒有持有鎖即非重入的情況,失敗

3.其他情況則不斷自旋CAS,達到最大讀的數(shù)量會拋出異常,其他情況在成功后返回1。

final int fullTryAcquireShared(Thread current) {
    /*
     * This code is in part redundant with that in
     * tryAcquireShared but is simpler overall by not
     * complicating tryAcquireShared with interactions between
     * retries and lazily reading hold counts.
     */
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
          	// 存在其他線程持有寫鎖,返回-1
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
          //沒有寫鎖,且該線程排在其他線程后面,應該被堵塞
          //如果已經持有讀鎖,此次獲取是重入,可以執(zhí)行else if 之后的操作;
          //否則,會被堵塞,返回-1。
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
          	//檢查firstReader
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                      	//執(zhí)行到下一步rh是cached或者readHolds.get(),檢查rh
                        rh = readHolds.get();
                      	//在get時,如果不存在,會產生一個新的HoldCounter
                      	//記數(shù)為0表示不是重入鎖,會刪除讓其重新為null
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
              	//返回失敗
                if (rh.count == 0)
                    return -1;
            }
        }
      	//達到最大值,不允許繼續(xù)增加
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
      	//和2.1.1中相似
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

2.1.3 readerShouldBlock

該方法返回當前線程請求獲得讀鎖是否應該被堵塞,在公平鎖和非公平鎖中的實現(xiàn)不同

在公平鎖中,返回在排隊的隊列中當前線程之前是否存在其他線程,是的話返回 true,當前線程在隊列頭部或者隊列為空返回 false。

// FairSync
final boolean readerShouldBlock() {
    return hasQueuedPredecessors();
}
// AQS
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

在非公平鎖中,隊列中存在兩個節(jié)點,且第二個節(jié)點是獨占的寫節(jié)點,會返回 true,使得新來的讀線程堵塞。

這種方式只能在第二個節(jié)點是請求寫鎖的情況下返回 true,避免寫鎖的無限等待;如果寫鎖的請求節(jié)點在隊列的其他位置,返回 false,不影響新來的讀線程獲取讀鎖。

如果不按照這種方式處理,而按照隊列中的順序進行處理,則只要存在其他線程在讀,每次來一個新的線程請求讀鎖,總是成功,寫鎖會一直等待下去。

// NonfairSync
final boolean readerShouldBlock() {
    /* As a heuristic to avoid indefinite writer starvation,
     * block if the thread that momentarily appears to be head
     * of queue, if one exists, is a waiting writer.  This is
     * only a probabilistic effect since a new reader will not
     * block if there is a waiting writer behind other enabled
     * readers that have not yet drained from the queue.
     */
    return apparentlyFirstQueuedIsExclusive();
}
// AQS
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}

2.1.4 tryReadLock

fullTryAcquireShared 有相似之處,該方法總是直接去搶占鎖,直到其他線程獲取寫鎖返回失敗,或者當前當前線程獲取讀鎖返回成功。

//ReadLock
public boolean tryLock() {
    return sync.tryReadLock();
}
//Sync
final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

2.2 讀鎖的釋放

tryReleaseShared 在 if/else 中實現(xiàn)了通過 first/cached/readHolds 獲取相應的 HoldCounter,并修改其中的記數(shù),記數(shù)為0則刪除;在 for 中,不斷自旋實現(xiàn) CAS 修改狀態(tài) c,如果修改后的狀態(tài)為0,表示讀寫鎖全部釋放,返回 true,否則是 false。

// ReadLockpublic void unlock() {    sync.releaseShared(1);}// AQSpublic final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) {        doReleaseShared();        return true;    }    return false;}// Syncprotected final boolean tryReleaseShared(int unused) {    Thread current = Thread.currentThread();  	// 先檢查 firstReader是否是當前線程    if (firstReader == current) {        // assert firstReaderHoldCount > 0;        if (firstReaderHoldCount == 1)            firstReader = null;        else            firstReaderHoldCount--;      //否則,處理 cached/readHolds中的HoldCounter    } else {        HoldCounter rh = cachedHoldCounter;        if (rh == null || rh.tid != getThreadId(current))            rh = readHolds.get();        int count = rh.count;        if (count <= 1) {            readHolds.remove();            if (count <= 0)                throw unmatchedUnlockException();        }        --rh.count;    }  	//自旋修改 state    for (;;) {        int c = getState();        int nextc = c - SHARED_UNIT;        if (compareAndSetState(c, nextc))            // Releasing the read lock has no effect on readers,            // but it may allow waiting writers to proceed if            // both read and write locks are now free.          	//只有讀寫鎖均釋放干凈,才返回true            return nextc == 0;    }}

三、寫鎖

3.1 寫鎖的獲取

下面講解 tryAcquiretryWriteLocktryWriteLock 是一種非公平的獲取。

3.1.1 tryAcquire

根據注釋,tryAcquire 分為三步

1.如果讀記數(shù)非0,或者寫記數(shù)非0且寫線程不是當前線程,失敗

2.寫鎖的獲取應該被堵塞或者CAS失敗,失敗

3.其他情況,寫重入和新來的寫線程,均成功

//WriteLockpublic void lock() {    sync.acquire(1);}//AQSpublic final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}//Syncprotected final boolean tryAcquire(int acquires) {    /*     * Walkthrough:     * 1. If read count nonzero or write count nonzero     *    and owner is a different thread, fail.     * 2. If count would saturate, fail. (This can only     *    happen if count is already nonzero.)     * 3. Otherwise, this thread is eligible for lock if     *    it is either a reentrant acquire or     *    queue policy allows it. If so, update state     *    and set owner.     */    Thread current = Thread.currentThread();    int c = getState();    int w = exclusiveCount(c);  	//c分為兩部分,寫和讀    if (c != 0) {        // (Note: if c != 0 and w == 0 then shared count != 0)      	// c非0,w是0,則讀記數(shù)非0 || 獨占的寫線程不是當前線程      	// 返回 false        if (w == 0 || current != getExclusiveOwnerThread())            return false;        if (w + exclusiveCount(acquires) > MAX_COUNT)            throw new Error("Maximum lock count exceeded");        // Reentrant acquire      	// 重入的情況        setState(c + acquires);        return true;    }  	// 寫應該被堵塞或者CAS失敗,返回false    if (writerShouldBlock() ||        !compareAndSetState(c, c + acquires))        return false;  	// 非重入,在CAS成功后,設定獨占寫線程為當前線程,返回true    setExclusiveOwnerThread(current);    return true;}

3.1.2 writerShouldBlock

在公平鎖中,檢查隊列前面是否有其他線程在排隊,在非公平鎖中,總是返回false,即總是不堵塞。

//FairSyncfinal boolean writerShouldBlock() {    return hasQueuedPredecessors();}//NonfairSyncfinal boolean writerShouldBlock() {    return false; // writers can always barge}

3.1.3 tryWriteLock

tryAcquire 在非公平鎖的寫法基本一樣。

final boolean tryWriteLock() {    Thread current = Thread.currentThread();    int c = getState();    if (c != 0) {        int w = exclusiveCount(c);        if (w == 0 || current != getExclusiveOwnerThread())            return false;        if (w == MAX_COUNT)            throw new Error("Maximum lock count exceeded");    }    if (!compareAndSetState(c, c + 1))        return false;    setExclusiveOwnerThread(current);    return true;}

3.2 寫鎖的釋放

tryRelease 中,修改相應的狀態(tài),如果修改后寫鎖記數(shù)為0,則返回 true。

//WriteLockpublic void unlock() {    sync.release(1);}//AQSpublic final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}//Syncprotected final boolean tryRelease(int releases) {  	// 首先檢查當前線程是否持有寫鎖    if (!isHeldExclusively())        throw new IllegalMonitorStateException();    int nextc = getState() - releases;  	// 根據修改后的寫記數(shù)來確定free    boolean free = exclusiveCount(nextc) == 0;  	// 此時,寫鎖完全釋放,設定寫獨占線程為null    if (free)        setExclusiveOwnerThread(null);    setState(nextc);  	// 返回 free    return free;}

四、鎖降級

如果一個線程已經持有寫鎖,再去獲取讀鎖并釋放寫鎖,這個過程稱為鎖降級。

持有寫鎖的時候去獲取讀鎖,只有該持有寫鎖的線程能夠成功獲取讀鎖,然后再釋放寫鎖,保證此時當前線程是有讀鎖的;如果有寫鎖,先釋放寫鎖,再獲取讀鎖,可能暫時不能獲取讀鎖,會在隊列中排隊等待。

到此這篇關于Java基礎之ReentrantReadWriteLock源碼解析的文章就介紹到這了,更多相關Java ReentrantReadWriteLock源碼解析內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Java 集合的Contains和Remove方法

    Java 集合的Contains和Remove方法

    這篇文章主要介紹了Java 集合Contains和Remove方法的相關資料,幫助大家更好的理解和學習使用Java,感興趣的朋友可以了解下
    2021-02-02
  • 淺談Java Fork/Join并行框架

    淺談Java Fork/Join并行框架

    這篇文章主要介紹了淺談Java Fork/Join并行框架,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-09-09
  • java 對象輸入輸出流讀寫文件的操作實例

    java 對象輸入輸出流讀寫文件的操作實例

    這篇文章主要介紹了java 對象輸入輸出流讀寫文件的操作實例的相關資料,這里使用實現(xiàn)Serializable接口,需要的朋友可以參考下
    2017-07-07
  • 使用MAT進行JVM內存分析實例

    使用MAT進行JVM內存分析實例

    這篇文章主要介紹了使用MAT進行JVM內存分析實例,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-04-04
  • Java使用OpenOffice將office文件轉換為PDF的示例方法

    Java使用OpenOffice將office文件轉換為PDF的示例方法

    OpenOffice是一個開源的辦公套件,它包含了文檔處理、電子表格、演示文稿以及繪圖等多種功能,類似于Microsoft Office,本文將給大家介紹Java使用OpenOffice將office文件轉換為PDF的示例方法,需要的朋友可以參考下
    2024-09-09
  • Java實戰(zhàn)之實現(xiàn)物流配送系統(tǒng)示例詳解

    Java實戰(zhàn)之實現(xiàn)物流配送系統(tǒng)示例詳解

    這篇文章主要介紹了一個java實戰(zhàn)項目:通過java、SSM、JSP、mysql和redis實現(xiàn)一個物流配送系統(tǒng)。文中的示例代碼非常詳細,需要的朋友可以參考一下
    2021-12-12
  • Eolink上傳文件到Java后臺進行處理的示例代碼

    Eolink上傳文件到Java后臺進行處理的示例代碼

    這篇文章主要介紹了Eolink上傳文件到Java后臺進行處理,這里是上傳的excel表格數(shù)據并轉換為java集合對象、然后進行業(yè)務邏輯處理判斷最后保存到數(shù)據庫?,需要的朋友可以參考下
    2022-12-12
  • Java并發(fā)Map面試線程安全數(shù)據結構全面分析

    Java并發(fā)Map面試線程安全數(shù)據結構全面分析

    本文將探討如何在Java中有效地應對這些挑戰(zhàn),介紹一種強大的工具并發(fā)Map,它能夠幫助您管理多線程環(huán)境下的共享數(shù)據,確保數(shù)據的一致性和高性能,深入了解Java中的并發(fā)Map實現(xiàn),包括ConcurrentHashMap和ConcurrentSkipListMap,及相關知識點
    2023-09-09
  • Java 基礎:string中的compareTo方法

    Java 基礎:string中的compareTo方法

    這篇文章主要介紹了Java 基礎:string中的compareTo方法,文章圍繞string中的compareTo方法的相關資料展開文章詳細內容,希望對待大家有所幫助
    2021-12-12
  • Kotlin中l(wèi)et、run、with、apply及also的用法和差別

    Kotlin中l(wèi)et、run、with、apply及also的用法和差別

    作用域函數(shù)是Kotlin比較重要的一個特性,分為5種let、run、with、apply及also,這五個函數(shù)的工作方式非常相似,但是我們需要了解這5種函數(shù)的差異,以便在不同的場景更好的利用它,這篇文章主要介紹了Kotlin中l(wèi)et、run、with、apply及also的差別,需要的朋友可以參考下
    2023-11-11

最新評論