Java并發(fā)編程之Condition源碼分析(推薦)
Condition介紹
上篇文章講了ReentrantLock的加鎖和釋放鎖的使用,這篇文章是對ReentrantLock的補充。ReentrantLock#newCondition()可以創(chuàng)建Condition,在ReentrantLock加鎖過程中可以利用Condition阻塞當前線程并臨時釋放鎖,待另外線程獲取到鎖并在邏輯后通知阻塞線程"激活"。Condition常用在基于異步通信的同步機制實現(xiàn)中,比如dubbo中的請求和獲取應答結果的實現(xiàn)。
常用方法
Condition中主要的方法有2個
- (1)await()方法可以阻塞當前線程,并釋放鎖。
- (2)在獲取鎖后可以調用signal()通知被await()阻塞的線程"激活"。
這里的await(),signal()必須在ReentrantLock#lock()和ReentrantLock#unlock()之間調用。
Condition實現(xiàn)分析
Condition的實現(xiàn)也是利用AbstractQueuedSynchronizer隊列來實現(xiàn),await()在被調用后先將當前線程加入到等待隊列中,然后釋放鎖,最后阻塞當前線程。signal()在被調用后會先獲取等待隊列中第一個節(jié)點,并將這個節(jié)點轉化成ReentrantLock中的節(jié)點并加入到同步阻塞隊列的結尾,這樣此節(jié)點的上個節(jié)點線程釋放鎖后會激活此節(jié)點線程取來獲取鎖。
await()方法源碼分析
await()源碼如下
public final void await() throws InterruptedException { //判斷是否當前線程是否被中斷中斷則拋出中斷異常 if (Thread.interrupted()) throw new InterruptedException(); //加入等待隊列 Node node = addConditionWaiter(); //釋放當前線程鎖 int savedState = fullyRelease(node); int interruptMode = 0; //判斷是否在同步阻塞隊列,如果不在一直循環(huán)到被加入 while (!isOnSyncQueue(node)) { //阻塞當前線程 LockSupport.park(this); //判斷是否被中斷 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //獲取鎖,如果獲取中被中斷則設置中斷狀態(tài) if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //清除等待隊列中被"激活"的節(jié)點 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); //如果當前線程被中斷,處理中斷邏輯 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
主要分以下幾步
- (1)先判斷是否當前線程是否被中斷中斷則拋出中斷異常如果未中斷調用addConditionWaiter()加入等待隊列
- (2)調用fullyRelease(node)釋放鎖使同步阻塞隊列的下個節(jié)點線程能獲取鎖。
- (3)調用isOnSyncQueue(node)判斷是否在同步阻塞隊列,這里的加入同步阻塞隊列操作是在另一個線程調用signal()后加入,如果不在同步阻塞隊列會進行阻塞直到被激活。
- (4)如果被激活然后調用checkInterruptWhileWaiting(node)判斷是否被中斷并獲取中斷模式。
- (5)繼續(xù)調用isOnSyncQueue(node)判斷是否在同步阻塞隊列。
- (6)是則調用acquireQueued(node, savedState) 獲取鎖,這里如果獲取不到也會被阻塞,獲取不到原因是在第一次調用isOnSyncQueue(node)前,可能另一個線程已經(jīng)調用signal()后加入到同步阻塞隊列,然后調用acquireQueued(node, savedState) 獲取不到鎖并阻塞。acquireQueued(node, savedState)也會返回當前線程是否被中斷,如果被中斷設置中斷模式。
- (7)在激活后調用unlinkCancelledWaiters()清理等待隊列的已經(jīng)被激活的節(jié)點。
- (8)最后判斷當前線程是否被中斷,如果被中斷則對中斷線程做處理。
下面來看下addConditionWaiter()實現(xiàn)
private Node addConditionWaiter() { //獲取等待隊列尾部節(jié)點 Node t = lastWaiter; //如果尾部狀態(tài)不為CONDITION,如果已經(jīng)被"激活",清理之,然后重新獲取尾部節(jié)點 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //創(chuàng)建以當前線程為基礎的節(jié)點,并將節(jié)點模式設置成CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); //如果尾節(jié)點不存在,說明隊列為空,將頭節(jié)點設置成當前節(jié)點 if (t == null) firstWaiter = node; //如果尾節(jié)點存在,將此節(jié)點設置成尾節(jié)點的下個節(jié)點 else t.nextWaiter = node; //將尾節(jié)點設置成當前節(jié)點 lastWaiter = node; return node; }
addConditionWaiter()的邏輯很簡單,就是創(chuàng)建以當前線程為基礎的節(jié)點并把節(jié)點加入等待隊列的尾部待其他線程處理。
下面來看下fullyRelease(Node node)實現(xiàn)
final int fullyRelease(Node node) { boolean failed = true; try { //獲取阻塞隊列中當前線程節(jié)點的鎖狀態(tài)值 int savedState = getState(); //釋放當前線程節(jié)點鎖 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { //釋放失敗講節(jié)點等待狀態(tài)設置成關閉 if (failed) node.waitStatus = Node.CANCELLED; } }
調用getState()先獲取阻塞隊列中當前線程節(jié)點的鎖狀態(tài)值,這個值可能大于1表示多次重入,然后調用release(savedState)釋放所有鎖,如果釋放成功返回鎖狀態(tài)值。
下面來看下isOnSyncQueue(Node node)實現(xiàn)
final boolean isOnSyncQueue(Node node) { //判斷當前節(jié)點是否是CONDITION或者前置節(jié)點是否為空如果為空直接返回false if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //如果下個節(jié)點存在,則在同步阻塞隊列中返回true if (node.next != null) // If has successor, it must be on queue return true; //遍歷查找當前節(jié)點是否在同步阻塞隊列中 return findNodeFromTail(node); } private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
此方法的功能是查找當前節(jié)點是否在同步阻塞隊列中,方法先是快速判斷,判斷不了再進行遍歷查找。
- (1)第一步先判斷次節(jié)點是否CONDITION狀態(tài)或者前置節(jié)點是否存在,如果是表明不在隊列中返回false,阻塞隊列中的狀態(tài)一般是0或者SIGNAL狀態(tài)而且如果當前如果當前節(jié)點在隊列阻塞中且未被激活前置節(jié)點一定不為空。
- (2)第二步判斷節(jié)點的下個節(jié)點是否存在,如果存在則表明當前當前節(jié)點已加入到阻塞隊列中。
- (3)如果以上2點都沒法判斷,也有可能剛剛加入到同步阻塞隊列中,所以調用findNodeFromTail(Node node)做最后的遍歷查找。查找從隊列尾部開始查,從尾部開始查的原因是可能剛剛加入到同步阻塞隊列中,從尾部能快速定位。
下面看下checkInterruptWhileWaiting(Node node)實現(xiàn)
private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } while (!isOnSyncQueue(node)) Thread.yield(); return false; }
此方法在線程被激活后被調用,主要功能就是判斷被激活的線程是否被中斷。此方法會返回2種中斷狀態(tài)THROW_IE和REINTERRUPT,THROW_IE是調用signal()前被中斷返回,REINTERRUPT在調用signal()后被中斷返回。 此方法先判斷是否被標記中斷,是的話再調用transferAfterCancelledWait(node)取判斷是那種中斷狀態(tài),transferAfterCancelledWait(node)方法分2步
- (1)用CAS方式將節(jié)點狀態(tài)改錯等待狀態(tài)改成CONDITION,并加入到同步阻塞隊列中返回true
- (2)如果不能加入到同步阻塞隊列就自旋一直等待加入
如果使用await()方法上面2步其實是沒什么作用其最后一定會返回false,因為await()被激活只能調用 signal()方法,而signal()方法肯定已經(jīng)將節(jié)點加入到同步阻塞隊列中。所以以上邏輯是給await(long time, TimeUnit unit)等帶超時激活方法用的。
acquireQueued(node, savedState)方法再上一章節(jié)已經(jīng)講過這邊就不重復了,下面分析下unlinkCancelledWaiters()方法
private void unlinkCancelledWaiters() { //獲取等待隊列頭節(jié)點 Node t = firstWaiter; Node trail = null; while (t != null) { //獲取下個節(jié)點 Node next = t.nextWaiter; //如果狀態(tài)不為CONDITION說明已經(jīng)加入阻塞隊列需要清理掉 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else //獲取下個節(jié)點 trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
此方法就是從頭開始查找狀態(tài)不為CONDITION的節(jié)點并清理,狀態(tài)不為CONDITION節(jié)點說明此節(jié)點已經(jīng)加入到阻塞隊列,已經(jīng)不需要維護。
下面來看下reportInterruptAfterWait(interruptMode)方法
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { //如果是THROW_IE模式直接拋出異常 if (interruptMode == THROW_IE) throw new InterruptedException(); //如果是REINTERRUPT模式標記線程中斷由上層處理中斷 else if (interruptMode == REINTERRUPT) selfInterrupt(); }
此方法處理中斷邏輯。如果是THROW_IE模式直接拋出異常,如果是REINTERRUPT模式標記線程中斷由上層處理中斷。
signal()方法源碼分析
signal()源碼如下
public final void signal() { //是否當前線程持有鎖 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; //通知"激活"頭節(jié)點線程 if (first != null) doSignal(first); }
先調用isHeldExclusively()判斷鎖是否被當前線程持有,然后檢查等待隊列是否為空,不為空就是可以取第一個節(jié)點調用doSignal(first)去"激活",這里激活不是真正的激活而只是將節(jié)點加入到同步阻塞隊列尾部,所以上下文中帶""的激活都是這種解釋。
下面看下isHeldExclusively()實現(xiàn)
protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); }
實現(xiàn)就是比較下當前線程和持有鎖的線程是否同一個
下面看下doSignal(first)的實現(xiàn)
private void doSignal(Node first) { do { //頭指頭后移一位,如果后面的節(jié)點為空,則將尾指頭也指向空,說明隊列為空了 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; //清空頭節(jié)點的下個節(jié)點 first.nextWaiter = null; //如果"激活"失敗者取下個繼續(xù),直到成功或者遍歷完 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
此方法就是取當前頭節(jié)點一直去嘗試"激活",直到成功或者遍歷完。
下面來看下transferForSignal(first)方法
final boolean transferForSignal(Node node) { //將CONDITION狀態(tài)設置成0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //加入到同步阻塞隊列 Node p = enq(node); int ws = p.waitStatus; //狀態(tài)異常直接激活 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
(1)此方法先先將CONDITION狀態(tài)設置成0,因為如果是CONDITION狀態(tài)加入到同步阻塞隊列,激活的時候是不識別的。
(2)加入到同步阻塞隊列的尾部。所以同步阻塞隊列中前面如果有多個在排隊,調用unlock()不會馬上激活此節(jié)點。
(3)狀態(tài)異常直接調用unpark激活,這邊按理說如果狀態(tài)異常情況下激活,await()在調用unlock()被激活后會進行相應的異常處理,但看await()代碼沒有處理則是正常執(zhí)行。
這個方法主要就是把節(jié)點加入到同步阻塞隊列的,真正的激活則是調用unlock()去處理。
以上所述是小編給大家介紹的Java并發(fā)編程之Condition源碼分析詳解整合,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
相關文章
Java基于Guava Retrying實現(xiàn)重試功能
這篇文章主要介紹了Java基于Guava Retrying實現(xiàn)重試功能,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-07-07詳解Java編譯優(yōu)化之循環(huán)展開和粗化鎖
之前在講JIT的時候,有提到在編譯過程中的兩種優(yōu)化循環(huán)展開和粗化鎖,今天從Assembly的角度來驗證一下這兩種編譯優(yōu)化方法,快來看看吧。2021-06-06Spring攔截器之HandlerInterceptor使用方式
這篇文章主要介紹了Spring攔截器之HandlerInterceptor使用方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-08-08Spring?Boot如何配置yml配置文件定義集合、數(shù)組和Map
這篇文章主要介紹了Spring?Boot?優(yōu)雅配置yml配置文件定義集合、數(shù)組和Map,包括Spring?Boot?yml配置文件定義基本數(shù)據(jù)類型和引用數(shù)據(jù)類型的方式,需要的朋友可以參考下2023-10-10SpringBoot中使用Jsoup爬取網(wǎng)站數(shù)據(jù)的方法
這篇文章主要介紹了SpringBoot中使用Jsoup爬取網(wǎng)站數(shù)據(jù)的方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-06-06Java Web監(jiān)聽器如何實現(xiàn)定時發(fā)送郵件
這篇文章主要介紹了Java Web監(jiān)聽器如何實現(xiàn)定時發(fā)送郵件,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-12-12SpringBoot實現(xiàn)設置動態(tài)定時任務的方法詳解
這篇文章主要介紹了SpringBoot實現(xiàn)設置動態(tài)定時任務的方法詳解,SpringBoot是一個快速開發(fā)的Java框架,而動態(tài)定時任務是指可以在運行時動態(tài)添加、修改和刪除定時任務的功能,需要的朋友可以參考下2023-10-10