深入理解Java 線程通信
當線程在系統(tǒng)內運行時,線程的調度具有一定的透明性,程序通常無法準確控制線程的輪換執(zhí)行,但 Java 也提供了一些機制來保證線程協(xié)調運行。
傳統(tǒng)的線程通信
假設現(xiàn)在系統(tǒng)中有兩個線程,這兩個線程分別代表存款者和取錢者——現(xiàn)在假設系統(tǒng)有一種特殊的要求,系統(tǒng)要求存款者和取錢者不斷地重復存款、取錢的動作,而且要求每當存款者將錢存入指定賬戶后,取錢者就立即取出該筆錢。不允許存款者連續(xù)兩次存錢,也不允許取錢者連續(xù)兩次取錢。
為了實現(xiàn)這種功能,可以借助于 Object 類提供的 wait()、 notify() 和 notifyAll() 三個方法,這三個方法并不屬于 Thread 類,而是屬于 Object 類。但這三個方法必須由同步監(jiān)視器對象來調用,這可分成以下兩種情況。
- 對于使用 synchronized 修飾的同步方法,因為該類的默認實例(this)就是同步監(jiān)視器,所以可以在同步方法中直接調用這三個方法。
- 對于使用 synchronized 修飾的同步代碼塊,同步監(jiān)視器是 synchronized 后括號里的對象,所以必須使用該對象調用這三個方法。
關于這三個方法的解釋如下。
- wait():導致當前線程等待,直到其他線程調用該同步監(jiān)視器的 notify() 方法或 notifyAll() 方法來喚醒該線程。該 wait() 方法有三種形式——無時間參數(shù)的 wait (—直等待,直到其他線程通知 )、帶毫秒?yún)?shù)的 wait() 和帶毫秒、毫微秒?yún)?shù)的 wait() (這兩種方法都是等待指定時間后自動蘇醒)。調用 wait() 方法的當前線程會釋放對該同步監(jiān)視器的鎖定。
- notify():喚醒在此同步監(jiān)視器上等待的單個線程。如果所有線程都在此同步監(jiān)視器上等待,則會選擇喚醒其中一個線程。選擇是任意性的。只有當前線程放棄對該同步監(jiān)視器的鎖定后(使用 wait() 方法),才可以執(zhí)行被喚醒的線程。
- notifyAll():喚醒在此同步監(jiān)視器上等待的所有線程。只有當前線程放棄對該同步監(jiān)視器的鎖定后,才可以執(zhí)行被喚醒的線程。
程序中可以通過一個旗標來標識賬戶中是否已有存款,當旗標為 false 時,表明賬戶中沒有存款,存款者線程可以向下執(zhí)行,當存款者把錢存入賬戶后,將旗標設為 true ,并調用 notify() 或 notifyAll() 方法來喚醒其他線程;當存款者線程進入線程體后,如果旗標為 true 就調用 wait() 方法讓該線程等待。
當旗標為 true 時,表明賬戶中已經(jīng)存入了存款,則取錢者線程可以向下執(zhí)行,當取錢者把錢從賬戶中取出后,將旗標設為 false ,并調用 notify() 或 notifyAll() 方法來喚醒其他線程;當取錢者線程進入線程體后,如果旗標為 false 就調用 wait() 方法讓該線程等待。
本程序為 Account 類提供 draw() 和 deposit() 兩個方法,分別對應該賬戶的取錢、存款等操作,因為這兩個方法可能需要并發(fā)修改 Account 類的 balance 成員變量的值,所以這兩個方法都使用 synchronized 修飾成同步方法。除此之外,這兩個方法還使用了 wait() 和 notifyAll() 來控制線程的協(xié)作。
public class Account{ private String accountNo; private double balance; //標識賬戶中是否已有存款的旗標 private boolean flag = false; public Account(){} public Account(String accountNo , double balance){ this.accountNo = accountNo; this.balance = balance; } public void setAccountNo(String accountNo){ this.accountNo = accountNo; } public String getAccountNo(){ return this.accountNo; } public double getBalance(){ return this.balance; } public synchronized void draw(double drawAmount){ try{ //如果flag為假,表明賬戶中還沒有人存錢進去,則取錢方法阻塞 if (!flag){ wait(); }else{ //執(zhí)行取錢 System.out.println(Thread.currentThread().getName() + " 取錢:" + drawAmount); balance -= drawAmount; System.out.println("賬戶余額為:" + balance); //將標識賬戶是否已有存款的旗標設為false。 flag = false; //喚醒其他線程 notifyAll(); } }catch (InterruptedException ex){ ex.printStackTrace(); } } public synchronized void deposit(double depositAmount){ try{ //如果flag為真,表明賬戶中已有人存錢進去,則存錢方法阻塞 if (flag){ // ① wait(); }else{ //執(zhí)行存款 System.out.println(Thread.currentThread().getName() + " 存款:" + depositAmount); balance += depositAmount; System.out.println("賬戶余額為:" + balance); //將表示賬戶是否已有存款的旗標設為true flag = true; //喚醒其他線程 notifyAll(); } }catch (InterruptedException ex){ ex.printStackTrace(); } } public int hashCode(){ return accountNo.hashCode(); } public boolean equals(Object obj){ if (obj != null && obj.getClass() == Account.class){ Account target = (Account)obj; return target.getAccountNo().equals(accountNo); } return false; } }
上面程序中的粗體字代碼使用 wait() 和 notifyAll() 進行了控制,對存款者線程而言,當程序進入 deposit() 方法后,如果 flag 為 true ,則表明賬戶中已有存款,程序調用 wait() 方法阻塞;否則程序向下執(zhí)行存款操作,當存款操作執(zhí)行完成后,系統(tǒng)將 flag 設為 true,然后調用 notifyAll() 來喚醒其他被阻塞的線程——如果系統(tǒng)中有存款者線程,存款者線程也會被喚醒,但該存款者線程執(zhí)行到①號代碼處時再次進入阻塞狀態(tài),只有執(zhí)行 draw() 方法的取錢者線程才可以向下執(zhí)行。同理,取錢者線程的運行流程也是如此。
程序中的存款者線程循環(huán)100次重復存款,而取錢者線程則循環(huán)100次重復取錢,存款者線程和取錢者線程分別調用 Account 對象的 deposit()、 draw() 方法來實現(xiàn)。
public class DrawThread extends Thread{ //模擬用戶賬戶 private Account account; //當前取錢線程所希望取的錢數(shù) private double drawAmount; public DrawThread(String name, Account account, double drawAmount){ super(name); this.account = account; this.drawAmount = drawAmount; } //重復100次執(zhí)行取錢操作 public void run(){ for (int i = 0 ; i < 100 ; i++ ){ account.draw(drawAmount); } } }
public class DepositThread extends Thread{ //模擬用戶賬戶 private Account account; //當前取錢線程所希望存款的錢數(shù) private double depositAmount; public DepositThread(String name, Account account, double depositAmount){ super(name); this.account = account; this.depositAmount = depositAmount; } //重復100次執(zhí)行存款操作 public void run(){ for (int i = 0 ; i < 100 ; i++ ){ account.deposit(depositAmount); } } }
主程序可以啟動任意多個存款線程和取錢線程,可以看到所有的取錢線程必須等存款線程存錢后才可以向下執(zhí)行,而存款線程也必須等取錢線程取錢后才可以向下執(zhí)行。主程序代碼如下。
public class TestDraw{ public static void main(String[] args){ //創(chuàng)建一個賬戶 Account acct = new Account("1234567" , 0); new DrawThread("取錢者" , acct , 800).start(); new DepositThread("存款者甲" , acct , 800).start(); new DepositThread("存款者乙" , acct , 800).start(); new DepositThread("存款者丙" , acct , 800).start(); } }
運行該程序,可以看到存款者線程、取錢者線程交替執(zhí)行的情形,每當存款者向賬戶中存入800元之后,取錢者線程立即從賬戶中取出這筆錢。存款完成后賬戶余額總是800元,取錢結束后賬戶余額總是0元。運行該程序,會看到如下圖所示的結果。
從上圖中可以看出 , 3個存款者線程隨機地向賬戶中存款,只有1個取錢者線程執(zhí)行取錢操作。只有當取錢者取錢后,存款者才可以存款;同理,只有等存款者存款后,取錢者線程才可以取錢。
上圖顯示程序最后被阻塞無法繼續(xù)向下執(zhí)行,這因為3個存款者線程共有300次存款操作,但1個取錢者線程只有100次取錢操作,所以程序最后被阻塞。
注意:上圖所示的阻塞并不是死鎖,對于這種情況,取錢者線程已經(jīng)執(zhí)行結束,而存款者線程只是在等待其他線程來取錢而已,并不是等待其他線程釋放同步監(jiān)視器。不要把死鎖和程序阻塞等同起來!
使用 Condition 控制線程通信
如果程序不使用 synchronized 關鍵字來保證同步,而是直接使用 Lock 對象來保證同步,則系統(tǒng)中不存在隱式的同步監(jiān)視器,也就不能使用 wait()、notify()、notifyAll() 方法進行線程通信了。
當使用 Lock 對象來保證同步時,Java 提供了一個 Condition 類來保持協(xié)調,使用 Condition 可以讓那些已經(jīng)得到 Lock 對象卻無法繼續(xù)執(zhí)行的線程釋放 Lock 對象,Condition 對象也可以喚醒其他處于等待的線程。
Condition 將同步監(jiān)視器方法(wait()、notify() 和 notifyAll() )分解成截然不同的對象,以便通過將這些對象與 Lock 對象組合使用,為每個對象提供多個等待集(wait-set)。在這種情況下,Lock 替代了同步方法或同步代碼塊,Condition 替代了同步監(jiān)視器的功能。
Condition 實例被綁定在一個 Lock 對象上。要獲得特定 Lock 實例的 Condition 實例,調用 Lock 對象的 newCondition() 方法即可。Condition 類提供了如下三個方法。
- await():類似于隱式同步監(jiān)視器上的 wait() 方法,導致當前線程等待,直到其他線程調用該 Condition 的 signal() 方法或 signalAll() 方法來喚醒該線程。該 await() 方法有更多變體,如 long awaitNanos(long nanosTimeout)、 void awaitUninterruptibly() 、 awaitUntil(Date deadline) 等,可以完成更豐富的等待操作。
- signal():喚醒在此 Lock 對象上等待的單個線程。如果所有線程都在該 Lock 對象上等待,則會選擇喚醒其中一個線程。選擇是任意性的。只有當前線程放棄對該 Lock 對象的鎖定后(使用 await() 方法),才可以執(zhí)行被喚醒的線程。
- signalAll():喚醒在此 Lock 對象上等待的所有線程。只有當前線程放棄對該 Lock 對象的鎖定后,才可以執(zhí)行被喚醒的線程。
下面程序中 Account 使用 Lock 對象來控制同步,并使用 Condition 對象來控制線程的協(xié)調運行。
public class Account{ //顯示定義Lock對象 private final Lock lock = new ReentrantLock(); //獲得指定Lock對象對應的條件變量 private final Condition cond = lock.newCondition(); private String accountNo; private double balance; //標識賬戶中是否已經(jīng)存款的旗標 private boolean flag = false; public Account(){} public Account(String accountNo , double balance){ this.accountNo = accountNo; this.balance = balance; } public void setAccountNo(String accountNo){ this.accountNo = accountNo; } public String getAccountNo(){ return this.accountNo; } public double getBalance(){ return this.balance; } public void draw(double drawAmount){ //加鎖 lock.lock(); try{ //如果賬戶中還沒有存入存款,該線程等待 if (!flag){ cond.await(); }else{ //執(zhí)行取錢操作 System.out.println(Thread.currentThread().getName() + " 取錢:" + drawAmount); balance -= drawAmount; System.out.println("賬戶余額為:" + balance); //將標識是否成功存入存款的旗標設為false flag = false; //喚醒該Lock對象對應的其他線程 cond.signalAll(); } }catch (InterruptedException ex){ ex.printStackTrace(); } //使用finally塊來確保釋放鎖 finally{ lock.unlock(); } } public void deposit(double depositAmount){ lock.lock(); try{ //如果賬戶中已經(jīng)存入了存款,該線程等待 if(flag){ cond.await(); }else{ //執(zhí)行存款操作 System.out.println(Thread.currentThread().getName() + " 存款:" + depositAmount); balance += depositAmount; System.out.println("賬戶余額為:" + balance); //將標識是否成功存入存款的旗標設為true flag = true; //喚醒該Lock對象對應的其他線程 cond.signalAll(); } }catch (InterruptedException ex){ ex.printStackTrace(); } //使用finally塊來確保釋放鎖 finally{ lock.unlock(); } } public int hashCode(){ return accountNo.hashCode(); } public boolean equals(Object obj){ if (obj != null && obj.getClass() == Account.class){ Account target = (Account)obj; return target.getAccountNo().equals(accountNo); } return false; } }
顯式地使用 Lock 對象來充當同步監(jiān)視器,則需要使用 Condition 對象來暫停、喚醒指定線程。存取錢的代碼和最上面相同。
使用阻塞隊列(BlockingQueue)控制線程通信
Java5 提供了一個 BlockingQueue 接口,雖然 BlockingQueue 也是 Queue 的子接口,但它的主要用途并不是作為容器,而是作為線程同步的工具。 BlockingQueue 具有一個特征:當生產(chǎn)者線程試圖向 BlockingQueue 中放入元素時,如果該隊列已滿,則該線程被阻塞;當消費者線程試圖從 BlockingQueue 中取出元素時,如果該隊列已空,則該線程被阻塞。
程序的兩個線程通過交替向 BlockingQueue 中放入元素、取出元素,即可很好地控制線程的通信。BlockingQueue 提供如下兩個支持阻塞的方法。
- put(E e):嘗試把 E 元素放入 BlockingQueue 中,如果該隊列的元素己滿,則阻塞該線程。
- take():嘗試從 BlockingQueue 的頭部取出元素,如果該隊列的元素已空,則阻塞該線程。
BlockingQueue 繼承了 Queue 接口,當然也可使用 Queue 接口中的方法。這些方法歸納起來可分為如下三組。
- 在隊列尾部插入元素。包括 add(E e)、offer(E e) 和 put(E e) 方法,當該隊列已滿時,這三個方法分別會拋出異常、返回 false 、阻塞隊列。
- 在隊列頭部刪除并返回刪除的元素。包括 remove()、 poll() 和 take() 方法。當該隊列已空時,這三個方法分別會拋出異常、返回 false 、阻塞隊列。
- 在隊列頭部取出但不刪除元素。包括 element() 和 peek() 方法,當隊列已空時,這兩個方法分別拋出異常、返回 false
BlockingQueue 包含的方法之間的對應關系如下表所示:
BlockingQueue 與其實現(xiàn)類之間的類圖如下圖所示。
上圖中以黑色方框框出的都是 Java7 新增的阻塞隊列??梢钥吹?, BlockingQueue 包含如下5個實現(xiàn)類。
- ArrayBlockingQueue:基于數(shù)組實現(xiàn)的 BlockingQueue 隊列。
- LinkedBlockingQueue:基于鏈表實現(xiàn)的 BlockingQueue 隊列。
- PriorityBlockingQueue:它并不是標準的阻塞隊列。與前面介紹的 PriorityQueue 類似,該隊列調用 remove()、poll()、take() 等方法取出元素時,并不是取出隊列中存在時間最長的元素,而是隊列中最小的元素。 PriorityBlockingQueue 判斷元素的大小即可根據(jù)元素(實現(xiàn) Comparable 接口)的本身大小來自然排序,也可使用 Comparator 進行定制排序。
- SynchronousQueue:同步隊列。對該隊列的存、取操作必須交替進行。
- DelayQueue:它是一個特殊的 BlockingQueue ,底層基于 PriorityBlockingQueue 實現(xiàn)。不過,DelayQueue 要求集合元素都實現(xiàn) Delay 接口(該接口里只有一個 long getDelay() 方法),DelayQueue 根據(jù)集合元素的 getDalay() 方法的返回值進行排序。
下面以 ArrayBlockingQueue 為例介紹阻塞隊列的功能和用法。下面先用一個最簡單的程序來測試 BlockingQueue 的 put() 方法。
public class BlockingQueueTest { public static void main(String[] args) throws Exception { BlockingQueue<String> bq = new ArrayBlockingQueue<>(2); bq.put("Java"); // 與bq.add("Java")、bq.offer("Java") 相同 bq.put("Java"); // 與bq.add("Java")、bq.offer("Java") 相同 bq.put("Java"); // ① 阻塞線程 } }
上面程序先定義一個大小為2的 BlockingQueue,程序先向該隊列中放入兩個元素,此時隊列還沒有滿,兩個元紊都可以放入,因此使用 put()、add() 和 offer() 方法效果完全一樣。當程序試圖放入第三個元素時,如果使用 put() 方法嘗試放入元素將會阻寒線程,如上面程序①號代碼所示。如果使用 add() 方法嘗試放入元素將會引發(fā)異常;如果使用 offer() 方法嘗試放入元素則會返回 false,元素不會被放入。
與此類似的是,在 BlockingQueue 已空的情況下,程序使用 take() 方法嘗試取出元素將會阻塞線程:使用 remove() 方法嘗試取出元素將引發(fā)異常:使用 poll() 方法嘗試取出元素將返回 false,元索不會被刪除。
掌握了 BlodcingQuene 阻塞隊列的特性之后,下面程序就可以利用 BlockingQueue 來實現(xiàn)線程通信了。
public class Producer extends Thread { private BlockingQueue<String> bq; public Producer(BlockingQueue<String> bq) { this.bq = bq; } public void run() { String[] strArr = new String[] { "Java", "Struts", "Spring" }; for(int i=0;i<99999999;i++) { System.out.println(getName()+"生產(chǎn)者準備生產(chǎn)集合元素"); try { Thread.sleep(200); // 嘗試放入元素,如果隊列已滿,則線程被阻塞 bq.put(strArr[i%3]); }catch(Exception ex) { ex.printStackTrace(); } System.out.println(getName()+"生產(chǎn)完成:"+bq); } } } public class Consumer extends Thread { private BlockingQueue<String> bq; public Consumer(BlockingQueue<String> bq) { this.bq = bq; } public void run() { while(true) { System.out.println(getName()+"消費者準備消費集合元素!"); try { Thread.sleep(200); // 嘗試取出元素,如果隊列已空,則線程被阻塞 bq.take(); }catch(Exception ex) { ex.printStackTrace(); } System.out.println(getName()+"消費完成:"+bq); } } } public class BlockingQueueTest2 { public static void main(String[] args) { // 創(chuàng)建一個容量為1的BlockingQueue BlockingQueue<String> bq = new ArrayBlockingQueue<>(1); // 啟動3個生產(chǎn)者線程 new Producer(bq).start(); new Producer(bq).start(); new Producer(bq).start(); // 啟動一個消費者線程 new Consumer(bq).start(); } }
上面程序啟動了 3個生產(chǎn)者線程向 BlockingQueue 集合放入元素,啟動了 1個消費者線程從 BlockingQueue 集合取出元素。本程序的 BlockingQueue 集合容量為1,因此3個生產(chǎn)者線程無法連續(xù)放入元素,必須等待消費者線程取出一個元素后 , 3個生產(chǎn)者線程的其中之一才能放入一個元素。運行該程序,會看到如下圖所示的結果。
從上圖可以看出,3個生產(chǎn)者線程都想向 BlockingQueue 中放入元素,但只要其中一個線程向該隊列中放入元素之后,其他生產(chǎn)者線程就必須等待,等待消費者線程取出 BlockingQueue 隊列里的元素。
以上就是深入理解Java 線程通信的詳細內容,更多關于Java 線程通信的資料請關注腳本之家其它相關文章!
相關文章
Spring AOP之@Around,@AfterReturning使用、切不進去的解決方案
這篇文章主要介紹了Spring AOP之@Around,@AfterReturning使用、切不進去的解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-05-05從Android源碼剖析Intent查詢匹配的實現(xiàn)
這篇文章主要介紹了從Android源碼剖析Intent查詢匹配的實現(xiàn),Intent部分的源碼為Java代碼,需要的朋友可以參考下2015-07-07