Java中實現(xiàn)線程間通信的實例教程
前言
雖然通常每個子線程只需要完成自己的任務(wù),但是有時我們希望多個線程一起工作來完成一個任務(wù),這就涉及到線程間通信。
關(guān)于線程間通信本文涉及到的方法和類包括:thread.join()、object.wait()、object.notify()、CountdownLatch、CyclicBarrier、FutureTask、Callable。
接下來將用幾個例子來介紹如何在Java中實現(xiàn)線程間通信:
- 如何讓兩個線程依次執(zhí)行,即一個線程等待另一個線程執(zhí)行完成后再執(zhí)行?
- 如何讓兩個線程以指定的方式有序相交執(zhí)行?
- 有四個線程:A、B、C、D,如何實現(xiàn) D 在 A、B、C 都同步執(zhí)行完畢后執(zhí)行?
- 三個運動員分開準備,然后在每個人準備好后同時開始跑步。
- 子線程完成任務(wù)后,將結(jié)果返回給主線程。
1. 如何讓兩個線程依次執(zhí)行?
假設(shè)有兩個線程:A 和 B,這兩個線程都可以按照順序打印數(shù)字,代碼如下:
public class Test01 { public static void main(String[] args) throws InterruptedException { demo1(); } public static void demo1() { Thread a = new Thread(() -> { printNumber("A"); }); Thread b = new Thread(() -> { printNumber("B"); }); a.start(); b.start(); } public static void printNumber(String threadName) { int i = 0; while (i++ < 3) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(threadName + " print: " + i); } } }
得到的結(jié)果如下:
A print: 1
B print: 1
B print: 2
A print: 2
A print: 3
B print: 3
可以看到 A 和 B 同時打印數(shù)字,如果我們希望 B 在 A 執(zhí)行完成之后開始執(zhí)行,那么可以使用 thread.join() 方法實現(xiàn),代碼如下:
public static void demo2() { Thread a = new Thread(() -> { printNumber("A"); }); Thread b = new Thread(() -> { System.out.println("B 等待 A 執(zhí)行"); try { a.join(); } catch (InterruptedException e) { e.printStackTrace(); } printNumber("B"); }); a.start(); b.start(); }
得到的結(jié)果如下:
B 等待 A 執(zhí)行
A print: 1
A print: 2
A print: 3
B print: 1
B print: 2
B print: 3
我們可以看到該 a.join() 方法會讓 B 等待 A 完成打印。
thread.join() 方法的作用就是阻塞當前線程,等待調(diào)用 join() 方法的線程執(zhí)行完畢后再執(zhí)行后面的代碼。
查看 join() 方法的源碼,內(nèi)部是調(diào)用了 join(0) ,如下:
public final void join() throws InterruptedException { join(0); }
查看 join(0) 的源碼如下:
// 注意這里使用了 sychronized 加鎖,鎖對象是線程的實例對象 public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } // 調(diào)用 join(0) 執(zhí)行下面的代碼 if (millis == 0) { // 這里使用 while 循環(huán)的目的是為了避免虛假喚醒 // 如果當前線程存活則調(diào)用 wait(0), 0 表示永久等待,直到調(diào)用 notifyAll() 或者 notify() 方法 // 當線程結(jié)束的時候會調(diào)用 notifyAll() 方法 while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
從源碼中可以看出 join(long millis) 方法是通過 wait(long timeout) (Object 提供的方法)方法實現(xiàn)的,調(diào)用 wait 方法之前,當前線程必須獲得對象的鎖,所以此 join 方法使用了 synchronized 加鎖,鎖對象是線程的實例對象。其中 wait(0)方法會讓當前線程阻塞等待,直到另一個線程調(diào)用此對象的 notify() 或者 notifyAll() 方法才會繼續(xù)執(zhí)行。當調(diào)用 join 方法的線程結(jié)束的時候會調(diào)用 notifyAll() 方法,所以 join() 方法可以實現(xiàn)一個線程等待另一個調(diào)用 join() 的線程結(jié)束后再執(zhí)行。
虛假喚醒:一個線程在沒有被通知、中斷、超時的情況下被喚醒;
虛假喚醒可能導(dǎo)致條件不成立的情況下執(zhí)行代碼,破壞被鎖保護的約束關(guān)系;
為什么使用 while 循環(huán)來避免虛假喚醒:
在 if 塊中使用 wait 方法,是非常危險的,因為一旦線程被喚醒,并得到鎖,就不會再判斷 if 條件而執(zhí)行 if 語句塊外的代碼,所以建議凡是先要做條件判斷,再 wait 的地方,都使用 while 循環(huán)來做,循環(huán)會在等待之前和之后對條件進行測試。
2. 如何讓兩個線程按照指定的方式有序相交?
如果現(xiàn)在我們希望 B線程在 A 線程打印 1 后立即打印 1,2,3,然后 A 線程繼續(xù)打印 2,3,那么我們需要更細粒度的鎖來控制執(zhí)行順序。
在這里,我們可以利用 object.wait() 和 object.notify() 方法,代碼如下:
public static void demo3() { Object lock = new Object(); Thread A = new Thread(() -> { synchronized (lock) { System.out.println("A 1"); try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("A 2"); System.out.println("A 3"); } }); Thread B = new Thread(() -> { synchronized (lock) { System.out.println("B 1"); System.out.println("B 2"); System.out.println("B 3"); lock.notify(); } }); A.start(); B.start(); }
得到的結(jié)果如下:
A 1
B 1
B 2
B 3
A 2
A 3
上述代碼的執(zhí)行流程如下:
- 首先我們創(chuàng)建一個由 A 和 B 共享的對象鎖: lock = new Object();
- 當A拿到鎖時,先打印1,然后調(diào)用lock.wait()方法進入等待狀態(tài),然后交出鎖的控制權(quán);
- B 不會被執(zhí)行,直到 A 調(diào)用該lock.wait()方法釋放控制權(quán)并且 B 獲得鎖;
- B拿到鎖后打印1,2,3,然后調(diào)用lock.notify()方法喚醒正在等待的A;
- A 喚醒后繼續(xù)打印剩余的 2,3。
為了便于理解,我將上面的代碼添加了日志,代碼如下:
public static void demo3() { Object lock = new Object(); Thread A = new Thread(() -> { System.out.println("INFO:A 等待獲取鎖"); synchronized (lock) { System.out.println("INFO:A 獲取到鎖"); System.out.println("A 1"); try { System.out.println("INFO:A 進入 waiting 狀態(tài),放棄鎖的控制權(quán)"); lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("INFO:A 被 B 喚醒繼續(xù)執(zhí)行"); System.out.println("A 2"); System.out.println("A 3"); } }); Thread B = new Thread(() -> { System.out.println("INFO:B 等待獲取鎖"); synchronized (lock) { System.out.println("INFO:B 獲取到鎖"); System.out.println("B 1"); System.out.println("B 2"); System.out.println("B 3"); System.out.println("INFO:B 執(zhí)行結(jié)束,調(diào)用 notify 方法喚醒 A"); lock.notify(); } }); A.start(); B.start(); }
得到的結(jié)果如下:
INFO:A 等待獲取鎖
INFO:A 獲取到鎖
A 1
INFO:A 進入 waiting 狀態(tài),放棄鎖的控制權(quán)
INFO:B 等待獲取鎖
INFO:B 獲取到鎖
B 1
B 2
B 3
INFO:B 執(zhí)行結(jié)束,調(diào)用 notify 方法喚醒 A
INFO:A 被 B 喚醒繼續(xù)執(zhí)行
A 2
A 3
3. 線程 D 在A、B、C都同步執(zhí)行完畢后執(zhí)行
thread.join() 前面介紹的方法允許一個線程在等待另一個線程完成運行后繼續(xù)執(zhí)行。但是如果我們將A、B、C依次加入到D線程中,就會讓A、B、C依次執(zhí)行,而我們希望它們?nèi)齻€同步運行。
我們要實現(xiàn)的目標是:A、B、C三個線程可以同時開始運行,各自獨立運行完成后通知D;D 不會開始運行,直到 A、B 和 C 都運行完畢。所以我們 CountdownLatch 用來實現(xiàn)這種類型的通信。它的基本用法是:
- 創(chuàng)建一個計數(shù)器,并設(shè)置一個初始值, CountdownLatch countDownLatch = new CountDownLatch(3);
- 調(diào)用countDownLatch.await()進入等待狀態(tài),直到計數(shù)值變?yōu)?;
- 在其他線程調(diào)用countDownLatch.countDown(),該方法會將計數(shù)值減一;
- 當計數(shù)器的值變?yōu)?0 時,countDownLatch.await()等待線程中的方法會繼續(xù)執(zhí)行下面的代碼。
實現(xiàn)代碼如下:
public static void runDAfterABC() { int count = 3; CountDownLatch countDownLatch = new CountDownLatch(count); new Thread(() -> { System.out.println("INFO: D 等待 A B C 運行完成"); try { countDownLatch.await(); System.out.println("INFO: A B C 運行完成,D 開始運行"); System.out.println("D is working"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); for (char threadName = 'A'; threadName <= 'C' ; threadName++) { final String name = String.valueOf(threadName); new Thread(() -> { System.out.println(name + " is working"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name + " finished"); countDownLatch.countDown(); }).start(); } }
得到的結(jié)果如下:
INFO: D 等待 A B C 運行完成
A is working
B is working
C is working
C finished
B finished
A finished
INFO: A B C 運行完成,D 開始運行
D is working
其實CountDownLatch它本身就是一個倒數(shù)計數(shù)器,我們把初始的count值設(shè)置為3。D運行的時候,首先調(diào)用該countDownLatch.await()方法檢查計數(shù)器的值是否為0,如果不是0則保持等待狀態(tài). A、B、C 運行完畢后,分別使用countDownLatch.countDown()方法將倒數(shù)計數(shù)器減1。計數(shù)器將減為 0,然后通知await()方法結(jié)束等待,D開始繼續(xù)執(zhí)行。
因此,CountDownLatch適用于一個線程需要等待多個線程的情況。
4. 三個運動員分開準備同時開跑
這一次,A、B、C這三個線程都需要分別準備,等三個線程都準備好后開始同時運行,我們應(yīng)該如何做到這一點?
CountDownLatch可以用來計數(shù),但完成計數(shù)的時候,只有一個線程的一個await()方法會得到響應(yīng),所以多線程不能在同一時間被觸發(fā)。為了達到線程相互等待的效果,我們可以使用該CyclicBarrier,其基本用法為:
- 首先創(chuàng)建一個公共對象CyclicBarrier,并設(shè)置同時等待的線程數(shù),CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
- 這些線程同時開始準備,準備好后,需要等待別人準備好,所以調(diào)用cyclicBarrier.await()方法等待別人;
- 當指定的需要同時等待的線程都調(diào)用了該cyclicBarrier.await()方法時,意味著這些線程準備好了,那么這些線程就會開始同時繼續(xù)執(zhí)行。
想象一下有三個跑步者需要同時開始跑步,所以他們需要等待其他人都準備好,實現(xiàn)代碼如下:
public static void runABCWhenAllReady() { int count = 3; CyclicBarrier cyclicBarrier = new CyclicBarrier(count); Random random = new Random(); for (char threadName = 'A'; threadName <= 'C' ; threadName++) { final String name = String.valueOf(threadName); new Thread(() -> { int prepareTime = random.nextInt(10000); System.out.println(name + " 準備時間:" + prepareTime); try { Thread.sleep(prepareTime); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name + " 準備好了,等待其他人"); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println(name + " 開始跑步"); }).start(); } }
得到結(jié)果如下:
A 準備時間:1085
B 準備時間:7729
C 準備時間:8444
A 準備好了,等待其他人
B 準備好了,等待其他人
C 準備好了,等待其他人
C 開始跑步
A 開始跑步
B 開始跑步
CyclicBarrier 的作用就是等待多個線程同時執(zhí)行。
5. 子線程將結(jié)果返回給主線程
在實際開發(fā)中,往往我們需要創(chuàng)建子線程來做一些耗時的任務(wù),然后將執(zhí)行結(jié)果傳回主線程。那么如何在 Java 中實現(xiàn)呢?
一般在創(chuàng)建線程的時候,我們會把 Runnable 對象傳遞給 Thread 執(zhí)行,Runable 的源碼如下:
@FunctionalInterface public interface Runnable { public abstract void run(); }
可以看到 Runable 是一個函數(shù)式接口,該接口中的 run 方法沒有返回值,那么如果要返回結(jié)果,可以使用另一個類似的接口 Callable。
函數(shù)式接口:只有一個方法的接口
Callable 接口的源碼如下:
@FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
可以看出,最大的區(qū)別Callable在于它返回的是泛型。
那么接下來的問題是,如何將子線程的結(jié)果傳回去呢?Java 有一個類,F(xiàn)utureTask,它可以與 一起工作Callable,但請注意,get用于獲取結(jié)果的方法會阻塞主線程。FutureTask 本質(zhì)上還是一個 Runnable,所以可以直接傳到 Thread 中。
比如我們想讓子線程計算1到100的總和,并將結(jié)果返回給主線程,代碼如下:
public static void getResultInWorker() { Callable<Integer> callable = () -> { System.out.println("子任務(wù)開始執(zhí)行"); Thread.sleep(1000); int result = 0; for (int i = 0; i <= 100; i++) { result += i; } System.out.println("子任務(wù)執(zhí)行完成并返回結(jié)果"); return result; }; FutureTask<Integer> futureTask = new FutureTask<>(callable); new Thread(futureTask).start(); try { System.out.println("開始執(zhí)行 futureTask.get()"); Integer result = futureTask.get(); System.out.println("執(zhí)行的結(jié)果:" + result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }
得到的結(jié)果如下:
開始執(zhí)行 futureTask.get()
子任務(wù)開始執(zhí)行
子任務(wù)執(zhí)行完成并返回結(jié)果
執(zhí)行的結(jié)果:5050
可以看出在主線程調(diào)用futureTask.get()方法時阻塞了主線程;然后Callable開始在內(nèi)部執(zhí)行并返回操作的結(jié)果;然后futureTask.get()得到結(jié)果,主線程恢復(fù)運行。
在這里我們可以了解到,F(xiàn)utureTask和Callable可以直接在主線程中獲取子線程的結(jié)果,但是它們會阻塞主線程。當然,如果你不希望阻塞主線程,可以考慮使用ExecutorService把FutureTask到線程池來管理執(zhí)行。
參考文章:
www.tutorialdocs.com/article/jav…
總結(jié)
到此這篇關(guān)于Java中實現(xiàn)線程間通信的文章就介紹到這了,更多相關(guān)Java線程間通信內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java基于SpringBoot和tk.mybatis實現(xiàn)事務(wù)讀寫分離代碼實例
這篇文章主要介紹了Java基于SpringBoot和tk.mybatis實現(xiàn)事務(wù)讀寫分離代碼實例,讀寫分離,基本的原理是讓主數(shù)據(jù)庫處理事務(wù)性增、改、刪操作,而從數(shù)據(jù)庫處理SELECT查詢操作,數(shù)據(jù)庫復(fù)制被用來把事務(wù)性操作導(dǎo)致的變更同步到集群中的從數(shù)據(jù)庫,需要的朋友可以參考下2023-10-10java調(diào)用webService接口的代碼實現(xiàn)
本文主要介紹了java調(diào)用webService接口的代碼實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-02-02基于java file 文件操作operate file of java的應(yīng)用
本篇文章介紹了,基于java file 文件操作operate file of java的應(yīng)用。需要的朋友參考下2013-05-05對Mybatis?Plus中@TableField的使用正解
這篇文章主要介紹了對Mybatis?Plus中@TableField的使用正解,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01springsecurity?登錄認證流程分析一(ajax)
這篇文章主要介紹了springsecurity?登錄認證一(ajax篇),本文通過實例代碼圖文相結(jié)合給大家介紹的非常詳細,感興趣的朋友跟隨小編一起看看吧2024-08-08java并發(fā)編程關(guān)鍵字volatile保證可見性不保證原子性詳解
這篇文章主要為大家介紹了java并發(fā)編程關(guān)鍵字volatile保證可見性不保證原子性詳解,文中附含詳細示例說明,有需要的朋友可以借鑒參考下,希望能夠有所幫助2022-02-02