Java多線程之條件對象Condition
1 簡介
Condition
中的await()
方法相當于Object
的wait()
方法,Condition
中的signal()
方法相當于Object
的notify()
方法,Condition
中的signalAll()
相當于Object
的notifyAll()
方法。
不同的是,Object
中的wait()
,notify()
,notifyAll()
方法是和"同步鎖"(synchronized
關鍵字)捆綁使用的;而Condition
是需要與"互斥鎖"/"共享鎖"捆綁使用的。
2 Condition的實現(xiàn)分析
Condition
是同步器AbstractQueuedSynchronized
的內部類,因為Condition
的操作需要獲取相關的鎖,所以作為同步器的內部類比較合理。每個Condition
對象都包含著一個隊列(等待隊列),該隊列是Condition
對象實現(xiàn)等待/通知功能的關鍵。
等待隊列
等待隊列是一個FIFO
的隊列,隊列的每一個節(jié)點都包含了一個線程引用,該線程就是在Condition對象上等待的線程,如果一個線程調用了await()
方法,該線程就會釋放鎖、構造成節(jié)點進入等待隊列并進入等待狀態(tài)。
這里的節(jié)點定義也就是AbstractQueuedSynchronizer.Node
的定義。
一個Condition
包含一個等待隊列,Condition
擁有首節(jié)點(firstWaiter
)和尾節(jié)點(lastWaiter
)。當前線程調用Condition.await()
方法時,將會以當前線程構造節(jié)點,并將節(jié)點從尾部加入等待隊列。
在Object
的監(jiān)視器模型上,一個對象擁有一個同步隊列和等待隊列,而Lock
(同步器)擁有一個同步隊列和多個等待隊列。
等待(await):AbstractQueuedLongSynchronizer中實現(xiàn)
調用Condition
的await()
方法,會使當前線程進入等待隊列并釋放鎖,同時線程狀態(tài)變?yōu)榈却隣顟B(tài)。
從隊列的角度來看,相當于同步隊列的首節(jié)點(獲取了鎖的節(jié)點)移動到Condition
的等待隊列中。
當?shù)却犃兄械墓?jié)點被喚醒,則喚醒節(jié)點的線程開始嘗試獲取同步狀態(tài)。如果不是通過Condition.signal()
方法喚醒,而是對等待線程進行中斷,則拋出InterruptedException
。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); long savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
Condition等待通知的本質
總的來說,Condition
的本質就是等待隊列和同步隊列的交互:
當一個持有鎖的線程調用Condition.await()時,它會執(zhí)行以下步驟:
構造一個新的等待隊列節(jié)點加入到等待隊列隊尾
釋放鎖,也就是將它的同步隊列節(jié)點從同步隊列隊首移除
自旋,直到它在等待隊列上的節(jié)點移動到了同步隊列(通過其他線程調用signal()
)或被中斷
阻塞當前節(jié)點,直到它獲取到了鎖,也就是它在同步隊列上的節(jié)點排隊排到了隊首。
當一個持有鎖的線程調用Condition.signal()
時,它會執(zhí)行以下操作:
從等待隊列的隊首開始,嘗試對隊首節(jié)點執(zhí)行喚醒操作;如果節(jié)點CANCELLED
,就嘗試喚醒下一個節(jié)點;如果再CANCELLED
則繼續(xù)迭代。
對每個節(jié)點執(zhí)行喚醒操作時,首先將節(jié)點加入同步隊列,此時await()操作的步驟3的解鎖條件就已經開啟了。
然后分兩種情況討論:
如果先驅節(jié)點的狀態(tài)為CANCELLED(>0)
或設置先驅節(jié)點的狀態(tài)為SIGNAL
失敗,那么就立即喚醒當前節(jié)點對應的線程,此時await()方法就會完成步驟3,進入步驟4.
如果成功把先驅節(jié)點的狀態(tài)設置為了SIGNAL
,那么就不立即喚醒了。等到先驅節(jié)點成為同步隊列首節(jié)點并釋放了同步狀態(tài)后,會自動喚醒當前節(jié)點對應線程的,這時候await()
的步驟3才執(zhí)行完成,而且有很大概率快速完成步驟4.
通知(signal):AbstractQueuedLongSynchronizer中實現(xiàn)
調用Condition
的signal()
方法,將會喚醒在等待隊列中等待時間最長的節(jié)點(首節(jié)點),在喚醒節(jié)點之前,會將節(jié)點移到同步隊列中。
Condition
的signalAll()
方法,相當于對等待隊列中的每個節(jié)點均執(zhí)行一次signal()
方法,將等待隊列中的節(jié)點全部移動到同步隊列中,并喚醒每個節(jié)點的線程。
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }
最后還要注意,Java
中有 signal
和 signalAll
兩種方法,signal
是隨機解除一個等待集中的線程的阻塞狀態(tài),signalAll
是解除所有等待集中的線程的阻塞狀態(tài)。signal
方法的效率會比 signalAll
高,但是它存在危險,因為它一次只解除一個線程的阻塞狀態(tài),因此,如果等待集中有多個線程都滿足了條件,也只能喚醒一個,其他的線程可能會導致死鎖
3 Condition 實例
消費生產者模式:
public class ConditionTest { public static void main(String[] args) { // 倉庫 Depot depot = new Depot(100); // 消費者 Consumer consumer = new Consumer(depot); // 生產者 Produce produce = new Produce(depot); produce.produceThing(5); consumer.consumerThing(5); produce.produceThing(2); consumer.consumerThing(5); produce.produceThing(3); } } class Depot { private int capacity; private int size; private Lock lock; private Condition consumerCond; private Condition produceCond; public Depot(int capacity) { this.capacity = capacity; this.size = 0; this.lock = new ReentrantLock(); this.consumerCond = lock.newCondition(); this.produceCond = lock.newCondition(); } public void produce(int val) { lock.lock(); try { int left = val; while (left > 0) { while (size >= capacity) { produceCond.await(); } int produce = (left+size) > capacity ? (capacity-size) : left; size += produce; left -= produce; System.out.println(Thread.currentThread().getName() + ", ProduceVal=" + val + ", produce=" + produce + ", size=" + size); consumerCond.signalAll(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void consumer(int val) { lock.lock(); try { int left = val; while (left > 0) { while (size <= 0) { consumerCond.await(); } int consumer = (size <= left) ? size : left; size -= consumer; left -= consumer; System.out.println(Thread.currentThread().getName() + ", ConsumerVal=" + val + ", consumer=" + consumer + ", size=" + size); produceCond.signalAll(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } class Consumer { private Depot depot; public Consumer(Depot depot) { this.depot = depot; } public void consumerThing(final int amount) { new Thread(new Runnable() { public void run() { depot.consumer(amount); } }).start(); } } class Produce { private Depot depot; public Produce(Depot depot) { this.depot = depot; } public void produceThing(final int amount) { new Thread(new Runnable() { public void run() { depot.produce(amount); } }).start(); } }
Thread-0, ProduceVal=5, produce=5, size=5
Thread-1, ConsumerVal=5, consumer=5, size=0
Thread-2, ProduceVal=2, produce=2, size=2
Thread-3, ConsumerVal=5, consumer=2, size=0
Thread-4, ProduceVal=3, produce=3, size=3
Thread-3, ConsumerVal=5, consumer=3, size=0
輸出結果中,Thread-3
出現(xiàn)兩次,就是因為要消費5個產品,但倉庫中只有2個產品,所以先將庫存的2個產品全部消費,然后這個線程進入等待隊列,等待生產,隨后生產出了3個產品,生產者生產后又執(zhí)行signalAll
方法將等待隊列中所有的線程都喚醒,Thread-3
繼續(xù)消費還需要的3個產品。
三個線程依次打印ABC
class Business { private Lock lock = new ReentrantLock(); private Condition conditionA = lock.newCondition(); private Condition conditionB = lock.newCondition(); private Condition conditionC = lock.newCondition(); private String type = "A"; //內部狀態(tài) /* * 方法的基本要求為: * 1、該方法必須為原子的。 * 2、當前狀態(tài)必須滿足條件。若不滿足,則等待;滿足,則執(zhí)行業(yè)務代碼。 * 3、業(yè)務執(zhí)行完畢后,修改狀態(tài),并喚醒指定條件下的線程。 */ public void printA() { lock.lock(); //鎖,保證了線程安全。 try { while (type != "A") { //type不為A, try { conditionA.await(); //將當前線程阻塞于conditionA對象上,將被阻塞。 } catch (InterruptedException e) { e.printStackTrace(); } } //type為A,則執(zhí)行。 System.out.println(Thread.currentThread().getName() + " 正在打印A"); type = "B"; //將type設置為B。 conditionB.signal(); //喚醒在等待conditionB對象上的一個線程。將信號傳遞出去。 } finally { lock.unlock(); //解鎖 } } public void printB() { lock.lock(); //鎖 try { while (type != "B") { //type不為B, try { conditionB.await(); //將當前線程阻塞于conditionB對象上,將被阻塞。 } catch (InterruptedException e) { e.printStackTrace(); } } //type為B,則執(zhí)行。 System.out.println(Thread.currentThread().getName() + " 正在打印B"); type = "C"; //將type設置為C。 conditionC.signal(); //喚醒在等待conditionC對象上的一個線程。將信號傳遞出去。 } finally { lock.unlock(); //解鎖 } } public void printC() { lock.lock(); //鎖 try { while (type != "C") { try { conditionC.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + " 正在打印C"); type = "A"; conditionA.signal(); } finally { lock.unlock(); //解鎖 } } } public class ConditionTest{ public static void main(String[] args) { final Business business = new Business();//業(yè)務對象。 //線程1號,打印10次A。 Thread ta = new Thread(new Runnable() { @Override public void run() { for(int i=0;i<10;i++){ business.printA(); } } }); //線程2號,打印10次B。 Thread tb = new Thread(new Runnable() { @Override public void run() { for(int i=0;i<10;i++){ business.printB(); } } }); //線程3號,打印10次C。 Thread tc = new Thread(new Runnable() { @Override public void run() { for(int i=0;i<10;i++){ business.printC(); } } }); //執(zhí)行3條線程。 ta.start(); tb.start(); tc.start(); } }
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
虛假喚醒
所謂"虛假喚醒",即其他地方的代碼觸發(fā)了condition.signal()
,喚醒condition
上等待的線程。但被喚醒的線程仍然不滿足執(zhí)行條件。
condition
通常與條件語句一起使用:
if(!條件){ condition.await(); //不滿足條件,當前線程等待; }
更好的方法是使用while:
while(!條件){ condition.await(); //不滿足條件,當前線程等待; }
在等待Condition
時,允許發(fā)生"虛假喚醒",這通常作為對基礎平臺語義的讓步。若使用"if(!條件)"則被"虛假喚醒"的線程可能繼續(xù)執(zhí)行。所以"while
(!條件)"可以防止"虛假喚醒"。建議總是假定這些"虛假喚醒"可能發(fā)生,因此總是在一個循環(huán)中等待。
4、總結
如果知道Object
的等待通知機制,Condition
的使用是比較容易掌握的,因為和Object
等待通知的使用基本一致。
對Condition
的源碼理解,主要就是理解等待隊列,等待隊列可以類比同步隊列,而且等待隊列比同步隊列要簡單,因為等待隊列是單向隊列,同步隊列是雙向隊列。
以下是筆者對等待隊列是單向隊列、同步隊列是雙向隊列的一些思考,歡迎提出不同意見:
之所以同步隊列要設計成雙向的,是因為在同步隊列中,節(jié)點喚醒是接力式的,由每一個節(jié)點喚醒它的下一個節(jié)點,如果是由next指針獲取下一個節(jié)點,是有可能獲取失敗的,因為虛擬隊列每添加一個節(jié)點,是先用CAS把tail設置為新節(jié)點,然后才修改原tail的next指針到新節(jié)點的。因此用next向后遍歷是不安全的,但是如果在設置新節(jié)點為tail前,為新節(jié)點設置prev,則可以保證從tail往前遍歷是安全的。因此要安全的獲取一個節(jié)點Node的下一個節(jié)點,先要看next是不是null,如果是null,還要從tail往前遍歷看看能不能遍歷到Node。
而等待隊列就簡單多了,等待的線程就是等待者,只負責等待,喚醒的線程就是喚醒者,只負責喚醒,因此每次要執(zhí)行喚醒操作的時候,直接喚醒等待隊列的首節(jié)點就行了。等待隊列的實現(xiàn)中不需要遍歷隊列,因此也不需要prev
指針。
到此這篇關于Java多線程之條件對象Condition的文章就介紹到這了,更多相關Java多線程 Condition內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
java實現(xiàn)微信公眾平臺發(fā)送模板消息的示例代碼
這篇文章主要介紹了java實現(xiàn)微信公眾平臺發(fā)送模板消息的示例代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-09-09Java實現(xiàn)獲取銀行卡所屬銀行,驗證銀行卡號是否正確的方法詳解
這篇文章主要介紹了Java實現(xiàn)獲取銀行卡所屬銀行,驗證銀行卡號是否正確的方法,結合實例形式詳細分析了java判斷銀行卡歸屬地及有效性的原理與相關實現(xiàn)技巧,需要的朋友可以參考下2019-09-09SpringBoot使用異步線程池實現(xiàn)生產環(huán)境批量數(shù)據(jù)推送
本文主要介紹了SpringBoot使用異步線程池實現(xiàn)生產環(huán)境批量數(shù)據(jù)推送,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-02-02Java GZip 基于磁盤實現(xiàn)壓縮和解壓的方法
這篇文章主要介紹了Java GZip 基于磁盤實現(xiàn)壓縮和解壓,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考需要的朋友可以參考下2020-08-08Java實現(xiàn)的properties文件動態(tài)修改并自動保存工具類
這篇文章主要介紹了Java實現(xiàn)的properties文件動態(tài)修改并自動保存工具類,可實現(xiàn)針對properties配置文件的相關修改與保存功能,需要的朋友可以參考下2017-11-11SpringApplicationRunListener監(jiān)聽器源碼詳解
這篇文章主要介紹了SpringApplicationRunListener監(jiān)聽器源碼詳解,springboot提供了兩個類SpringApplicationRunListeners、SpringApplicationRunListener(EventPublishingRunListener),spring框架還提供了一個ApplicationListener接口,需要的朋友可以參考下2023-11-11