Java多線程之條件對(duì)象Condition
1 簡(jiǎn)介
Condition
中的await()
方法相當(dāng)于Object
的wait()
方法,Condition
中的signal()
方法相當(dāng)于Object
的notify()
方法,Condition
中的signalAll()
相當(dāng)于Object
的notifyAll()
方法。
不同的是,Object
中的wait()
,notify()
,notifyAll()
方法是和"同步鎖"(synchronized
關(guān)鍵字)捆綁使用的;而Condition
是需要與"互斥鎖"/"共享鎖"捆綁使用的。
2 Condition的實(shí)現(xiàn)分析
Condition
是同步器AbstractQueuedSynchronized
的內(nèi)部類,因?yàn)?code>Condition的操作需要獲取相關(guān)的鎖,所以作為同步器的內(nèi)部類比較合理。每個(gè)Condition
對(duì)象都包含著一個(gè)隊(duì)列(等待隊(duì)列),該隊(duì)列是Condition
對(duì)象實(shí)現(xiàn)等待/通知功能的關(guān)鍵。
等待隊(duì)列
等待隊(duì)列是一個(gè)FIFO
的隊(duì)列,隊(duì)列的每一個(gè)節(jié)點(diǎn)都包含了一個(gè)線程引用,該線程就是在Condition對(duì)象上等待的線程,如果一個(gè)線程調(diào)用了await()
方法,該線程就會(huì)釋放鎖、構(gòu)造成節(jié)點(diǎn)進(jìn)入等待隊(duì)列并進(jìn)入等待狀態(tài)。
這里的節(jié)點(diǎn)定義也就是AbstractQueuedSynchronizer.Node
的定義。
一個(gè)Condition
包含一個(gè)等待隊(duì)列,Condition
擁有首節(jié)點(diǎn)(firstWaiter
)和尾節(jié)點(diǎn)(lastWaiter
)。當(dāng)前線程調(diào)用Condition.await()
方法時(shí),將會(huì)以當(dāng)前線程構(gòu)造節(jié)點(diǎn),并將節(jié)點(diǎn)從尾部加入等待隊(duì)列。
在Object
的監(jiān)視器模型上,一個(gè)對(duì)象擁有一個(gè)同步隊(duì)列和等待隊(duì)列,而Lock
(同步器)擁有一個(gè)同步隊(duì)列和多個(gè)等待隊(duì)列。
等待(await):AbstractQueuedLongSynchronizer中實(shí)現(xiàn)
調(diào)用Condition
的await()
方法,會(huì)使當(dāng)前線程進(jìn)入等待隊(duì)列并釋放鎖,同時(shí)線程狀態(tài)變?yōu)榈却隣顟B(tài)。
從隊(duì)列的角度來(lái)看,相當(dāng)于同步隊(duì)列的首節(jié)點(diǎn)(獲取了鎖的節(jié)點(diǎn))移動(dòng)到Condition
的等待隊(duì)列中。
當(dāng)?shù)却?duì)列中的節(jié)點(diǎn)被喚醒,則喚醒節(jié)點(diǎn)的線程開(kāi)始嘗試獲取同步狀態(tài)。如果不是通過(guò)Condition.signal()
方法喚醒,而是對(duì)等待線程進(jìn)行中斷,則拋出InterruptedException
。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); long savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
Condition等待通知的本質(zhì)
總的來(lái)說(shuō),Condition
的本質(zhì)就是等待隊(duì)列和同步隊(duì)列的交互:
當(dāng)一個(gè)持有鎖的線程調(diào)用Condition.await()時(shí),它會(huì)執(zhí)行以下步驟:
構(gòu)造一個(gè)新的等待隊(duì)列節(jié)點(diǎn)加入到等待隊(duì)列隊(duì)尾
釋放鎖,也就是將它的同步隊(duì)列節(jié)點(diǎn)從同步隊(duì)列隊(duì)首移除
自旋,直到它在等待隊(duì)列上的節(jié)點(diǎn)移動(dòng)到了同步隊(duì)列(通過(guò)其他線程調(diào)用signal()
)或被中斷
阻塞當(dāng)前節(jié)點(diǎn),直到它獲取到了鎖,也就是它在同步隊(duì)列上的節(jié)點(diǎn)排隊(duì)排到了隊(duì)首。
當(dāng)一個(gè)持有鎖的線程調(diào)用Condition.signal()
時(shí),它會(huì)執(zhí)行以下操作:
從等待隊(duì)列的隊(duì)首開(kāi)始,嘗試對(duì)隊(duì)首節(jié)點(diǎn)執(zhí)行喚醒操作;如果節(jié)點(diǎn)CANCELLED
,就嘗試喚醒下一個(gè)節(jié)點(diǎn);如果再CANCELLED
則繼續(xù)迭代。
對(duì)每個(gè)節(jié)點(diǎn)執(zhí)行喚醒操作時(shí),首先將節(jié)點(diǎn)加入同步隊(duì)列,此時(shí)await()操作的步驟3的解鎖條件就已經(jīng)開(kāi)啟了。
然后分兩種情況討論:
如果先驅(qū)節(jié)點(diǎn)的狀態(tài)為CANCELLED(>0)
或設(shè)置先驅(qū)節(jié)點(diǎn)的狀態(tài)為SIGNAL
失敗,那么就立即喚醒當(dāng)前節(jié)點(diǎn)對(duì)應(yīng)的線程,此時(shí)await()方法就會(huì)完成步驟3,進(jìn)入步驟4.
如果成功把先驅(qū)節(jié)點(diǎn)的狀態(tài)設(shè)置為了SIGNAL
,那么就不立即喚醒了。等到先驅(qū)節(jié)點(diǎn)成為同步隊(duì)列首節(jié)點(diǎn)并釋放了同步狀態(tài)后,會(huì)自動(dòng)喚醒當(dāng)前節(jié)點(diǎn)對(duì)應(yīng)線程的,這時(shí)候await()
的步驟3才執(zhí)行完成,而且有很大概率快速完成步驟4.
通知(signal):AbstractQueuedLongSynchronizer中實(shí)現(xiàn)
調(diào)用Condition
的signal()
方法,將會(huì)喚醒在等待隊(duì)列中等待時(shí)間最長(zhǎng)的節(jié)點(diǎn)(首節(jié)點(diǎn)),在喚醒節(jié)點(diǎn)之前,會(huì)將節(jié)點(diǎn)移到同步隊(duì)列中。
Condition
的signalAll()
方法,相當(dāng)于對(duì)等待隊(duì)列中的每個(gè)節(jié)點(diǎn)均執(zhí)行一次signal()
方法,將等待隊(duì)列中的節(jié)點(diǎn)全部移動(dòng)到同步隊(duì)列中,并喚醒每個(gè)節(jié)點(diǎn)的線程。
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }
最后還要注意,Java
中有 signal
和 signalAll
兩種方法,signal
是隨機(jī)解除一個(gè)等待集中的線程的阻塞狀態(tài),signalAll
是解除所有等待集中的線程的阻塞狀態(tài)。signal
方法的效率會(huì)比 signalAll
高,但是它存在危險(xiǎn),因?yàn)樗淮沃唤獬粋€(gè)線程的阻塞狀態(tài),因此,如果等待集中有多個(gè)線程都滿足了條件,也只能喚醒一個(gè),其他的線程可能會(huì)導(dǎo)致死鎖
3 Condition 實(shí)例
消費(fèi)生產(chǎn)者模式:
public class ConditionTest { public static void main(String[] args) { // 倉(cāng)庫(kù) Depot depot = new Depot(100); // 消費(fèi)者 Consumer consumer = new Consumer(depot); // 生產(chǎn)者 Produce produce = new Produce(depot); produce.produceThing(5); consumer.consumerThing(5); produce.produceThing(2); consumer.consumerThing(5); produce.produceThing(3); } } class Depot { private int capacity; private int size; private Lock lock; private Condition consumerCond; private Condition produceCond; public Depot(int capacity) { this.capacity = capacity; this.size = 0; this.lock = new ReentrantLock(); this.consumerCond = lock.newCondition(); this.produceCond = lock.newCondition(); } public void produce(int val) { lock.lock(); try { int left = val; while (left > 0) { while (size >= capacity) { produceCond.await(); } int produce = (left+size) > capacity ? (capacity-size) : left; size += produce; left -= produce; System.out.println(Thread.currentThread().getName() + ", ProduceVal=" + val + ", produce=" + produce + ", size=" + size); consumerCond.signalAll(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void consumer(int val) { lock.lock(); try { int left = val; while (left > 0) { while (size <= 0) { consumerCond.await(); } int consumer = (size <= left) ? size : left; size -= consumer; left -= consumer; System.out.println(Thread.currentThread().getName() + ", ConsumerVal=" + val + ", consumer=" + consumer + ", size=" + size); produceCond.signalAll(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } class Consumer { private Depot depot; public Consumer(Depot depot) { this.depot = depot; } public void consumerThing(final int amount) { new Thread(new Runnable() { public void run() { depot.consumer(amount); } }).start(); } } class Produce { private Depot depot; public Produce(Depot depot) { this.depot = depot; } public void produceThing(final int amount) { new Thread(new Runnable() { public void run() { depot.produce(amount); } }).start(); } }
Thread-0, ProduceVal=5, produce=5, size=5
Thread-1, ConsumerVal=5, consumer=5, size=0
Thread-2, ProduceVal=2, produce=2, size=2
Thread-3, ConsumerVal=5, consumer=2, size=0
Thread-4, ProduceVal=3, produce=3, size=3
Thread-3, ConsumerVal=5, consumer=3, size=0
輸出結(jié)果中,Thread-3
出現(xiàn)兩次,就是因?yàn)橐M(fèi)5個(gè)產(chǎn)品,但倉(cāng)庫(kù)中只有2個(gè)產(chǎn)品,所以先將庫(kù)存的2個(gè)產(chǎn)品全部消費(fèi),然后這個(gè)線程進(jìn)入等待隊(duì)列,等待生產(chǎn),隨后生產(chǎn)出了3個(gè)產(chǎn)品,生產(chǎn)者生產(chǎn)后又執(zhí)行signalAll
方法將等待隊(duì)列中所有的線程都喚醒,Thread-3
繼續(xù)消費(fèi)還需要的3個(gè)產(chǎn)品。
三個(gè)線程依次打印ABC
class Business { private Lock lock = new ReentrantLock(); private Condition conditionA = lock.newCondition(); private Condition conditionB = lock.newCondition(); private Condition conditionC = lock.newCondition(); private String type = "A"; //內(nèi)部狀態(tài) /* * 方法的基本要求為: * 1、該方法必須為原子的。 * 2、當(dāng)前狀態(tài)必須滿足條件。若不滿足,則等待;滿足,則執(zhí)行業(yè)務(wù)代碼。 * 3、業(yè)務(wù)執(zhí)行完畢后,修改狀態(tài),并喚醒指定條件下的線程。 */ public void printA() { lock.lock(); //鎖,保證了線程安全。 try { while (type != "A") { //type不為A, try { conditionA.await(); //將當(dāng)前線程阻塞于conditionA對(duì)象上,將被阻塞。 } catch (InterruptedException e) { e.printStackTrace(); } } //type為A,則執(zhí)行。 System.out.println(Thread.currentThread().getName() + " 正在打印A"); type = "B"; //將type設(shè)置為B。 conditionB.signal(); //喚醒在等待conditionB對(duì)象上的一個(gè)線程。將信號(hào)傳遞出去。 } finally { lock.unlock(); //解鎖 } } public void printB() { lock.lock(); //鎖 try { while (type != "B") { //type不為B, try { conditionB.await(); //將當(dāng)前線程阻塞于conditionB對(duì)象上,將被阻塞。 } catch (InterruptedException e) { e.printStackTrace(); } } //type為B,則執(zhí)行。 System.out.println(Thread.currentThread().getName() + " 正在打印B"); type = "C"; //將type設(shè)置為C。 conditionC.signal(); //喚醒在等待conditionC對(duì)象上的一個(gè)線程。將信號(hào)傳遞出去。 } finally { lock.unlock(); //解鎖 } } public void printC() { lock.lock(); //鎖 try { while (type != "C") { try { conditionC.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + " 正在打印C"); type = "A"; conditionA.signal(); } finally { lock.unlock(); //解鎖 } } } public class ConditionTest{ public static void main(String[] args) { final Business business = new Business();//業(yè)務(wù)對(duì)象。 //線程1號(hào),打印10次A。 Thread ta = new Thread(new Runnable() { @Override public void run() { for(int i=0;i<10;i++){ business.printA(); } } }); //線程2號(hào),打印10次B。 Thread tb = new Thread(new Runnable() { @Override public void run() { for(int i=0;i<10;i++){ business.printB(); } } }); //線程3號(hào),打印10次C。 Thread tc = new Thread(new Runnable() { @Override public void run() { for(int i=0;i<10;i++){ business.printC(); } } }); //執(zhí)行3條線程。 ta.start(); tb.start(); tc.start(); } }
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
虛假喚醒
所謂"虛假喚醒",即其他地方的代碼觸發(fā)了condition.signal()
,喚醒condition
上等待的線程。但被喚醒的線程仍然不滿足執(zhí)行條件。
condition
通常與條件語(yǔ)句一起使用:
if(!條件){ condition.await(); //不滿足條件,當(dāng)前線程等待; }
更好的方法是使用while:
while(!條件){ condition.await(); //不滿足條件,當(dāng)前線程等待; }
在等待Condition
時(shí),允許發(fā)生"虛假喚醒",這通常作為對(duì)基礎(chǔ)平臺(tái)語(yǔ)義的讓步。若使用"if(!條件)"則被"虛假喚醒"的線程可能繼續(xù)執(zhí)行。所以"while
(!條件)"可以防止"虛假喚醒"。建議總是假定這些"虛假喚醒"可能發(fā)生,因此總是在一個(gè)循環(huán)中等待。
4、總結(jié)
如果知道Object
的等待通知機(jī)制,Condition
的使用是比較容易掌握的,因?yàn)楹?code>Object等待通知的使用基本一致。
對(duì)Condition
的源碼理解,主要就是理解等待隊(duì)列,等待隊(duì)列可以類比同步隊(duì)列,而且等待隊(duì)列比同步隊(duì)列要簡(jiǎn)單,因?yàn)榈却?duì)列是單向隊(duì)列,同步隊(duì)列是雙向隊(duì)列。
以下是筆者對(duì)等待隊(duì)列是單向隊(duì)列、同步隊(duì)列是雙向隊(duì)列的一些思考,歡迎提出不同意見(jiàn):
之所以同步隊(duì)列要設(shè)計(jì)成雙向的,是因?yàn)樵谕疥?duì)列中,節(jié)點(diǎn)喚醒是接力式的,由每一個(gè)節(jié)點(diǎn)喚醒它的下一個(gè)節(jié)點(diǎn),如果是由next指針獲取下一個(gè)節(jié)點(diǎn),是有可能獲取失敗的,因?yàn)樘摂M隊(duì)列每添加一個(gè)節(jié)點(diǎn),是先用CAS把tail設(shè)置為新節(jié)點(diǎn),然后才修改原tail的next指針到新節(jié)點(diǎn)的。因此用next向后遍歷是不安全的,但是如果在設(shè)置新節(jié)點(diǎn)為tail前,為新節(jié)點(diǎn)設(shè)置prev,則可以保證從tail往前遍歷是安全的。因此要安全的獲取一個(gè)節(jié)點(diǎn)Node的下一個(gè)節(jié)點(diǎn),先要看next是不是null,如果是null,還要從tail往前遍歷看看能不能遍歷到Node。
而等待隊(duì)列就簡(jiǎn)單多了,等待的線程就是等待者,只負(fù)責(zé)等待,喚醒的線程就是喚醒者,只負(fù)責(zé)喚醒,因此每次要執(zhí)行喚醒操作的時(shí)候,直接喚醒等待隊(duì)列的首節(jié)點(diǎn)就行了。等待隊(duì)列的實(shí)現(xiàn)中不需要遍歷隊(duì)列,因此也不需要prev
指針。
到此這篇關(guān)于Java多線程之條件對(duì)象Condition的文章就介紹到這了,更多相關(guān)Java多線程 Condition內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java實(shí)現(xiàn)微信公眾平臺(tái)發(fā)送模板消息的示例代碼
這篇文章主要介紹了java實(shí)現(xiàn)微信公眾平臺(tái)發(fā)送模板消息的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09Java實(shí)現(xiàn)獲取銀行卡所屬銀行,驗(yàn)證銀行卡號(hào)是否正確的方法詳解
這篇文章主要介紹了Java實(shí)現(xiàn)獲取銀行卡所屬銀行,驗(yàn)證銀行卡號(hào)是否正確的方法,結(jié)合實(shí)例形式詳細(xì)分析了java判斷銀行卡歸屬地及有效性的原理與相關(guān)實(shí)現(xiàn)技巧,需要的朋友可以參考下2019-09-09SpringBoot使用異步線程池實(shí)現(xiàn)生產(chǎn)環(huán)境批量數(shù)據(jù)推送
本文主要介紹了SpringBoot使用異步線程池實(shí)現(xiàn)生產(chǎn)環(huán)境批量數(shù)據(jù)推送,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-02-02Java GZip 基于磁盤(pán)實(shí)現(xiàn)壓縮和解壓的方法
這篇文章主要介紹了Java GZip 基于磁盤(pán)實(shí)現(xiàn)壓縮和解壓,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考需要的朋友可以參考下2020-08-08Java實(shí)現(xiàn)的properties文件動(dòng)態(tài)修改并自動(dòng)保存工具類
這篇文章主要介紹了Java實(shí)現(xiàn)的properties文件動(dòng)態(tài)修改并自動(dòng)保存工具類,可實(shí)現(xiàn)針對(duì)properties配置文件的相關(guān)修改與保存功能,需要的朋友可以參考下2017-11-11SpringApplicationRunListener監(jiān)聽(tīng)器源碼詳解
這篇文章主要介紹了SpringApplicationRunListener監(jiān)聽(tīng)器源碼詳解,springboot提供了兩個(gè)類SpringApplicationRunListeners、SpringApplicationRunListener(EventPublishingRunListener),spring框架還提供了一個(gè)ApplicationListener接口,需要的朋友可以參考下2023-11-11