Java線程間通訊的幾種方法小結(jié)
一、使用同一個(gè)共享變量控制
1、Synchronized、wait、notify
public class Demo1 { private final List<Integer> list =new ArrayList<>(); public static void main(String[] args) { Demo1 demo =new Demo1(); new Thread(()->{ for (int i=0;i<10;i++){ synchronized (demo.list){ if(demo.list.size()%2==1){ try { demo.list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } demo.list.add(i); System.out.print(Thread.currentThread().getName()); System.out.println(demo.list); demo.list.notify(); } } }).start(); new Thread(()->{ for (int i=0;i<10;i++){ synchronized (demo.list){ if(demo.list.size()%2==0){ try { demo.list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } demo.list.add(i); System.out.print(Thread.currentThread().getName()); System.out.println(demo.list); demo.list.notify(); } } }).start(); } }
這段代碼演示了如何使用synchronized
、wait
和notify
來實(shí)現(xiàn)兩個(gè)線程間的通信,以確保它們交替地向一個(gè)ArrayList
中添加數(shù)字。以下是代碼的詳細(xì)解釋:
類定義:
Demo1
類中有一個(gè)私有的、不可變的(由final
修飾)成員變量list
,它是ArrayList<Integer>
類型的。
主函數(shù):
- 在
main
方法中,首先創(chuàng)建了Demo1
的一個(gè)實(shí)例demo
。 - 然后啟動(dòng)了兩個(gè)線程,每個(gè)線程都執(zhí)行一個(gè)特定的任務(wù)。
- 在
第一個(gè)線程的任務(wù):
- 使用一個(gè)for循環(huán),循環(huán)10次。
- 在每次循環(huán)中,首先獲得
demo.list
的鎖,這是通過synchronized (demo.list)
實(shí)現(xiàn)的。 - 檢查當(dāng)前列表的大小是否為奇數(shù)(通過
demo.list.size()%2==1
)。如果是,則調(diào)用demo.list.wait()
使當(dāng)前線程進(jìn)入等待狀態(tài),并釋放demo.list
的鎖,這樣其他線程可以獲取該鎖并執(zhí)行其任務(wù)。 - 當(dāng)線程從等待狀態(tài)被喚醒時(shí)(通過另一個(gè)線程的
notify
調(diào)用),它會(huì)繼續(xù)執(zhí)行,并將當(dāng)前的數(shù)字添加到列表中。 - 打印當(dāng)前線程的名稱和更新后的列表。
- 通過調(diào)用
demo.list.notify()
喚醒可能正在等待的另一個(gè)線程。
第二個(gè)線程的任務(wù):
- 它的工作方式與第一個(gè)線程非常相似,但有一個(gè)關(guān)鍵的區(qū)別:它檢查列表的大小是否為偶數(shù),并在這種情況下使線程進(jìn)入等待狀態(tài)。
交替執(zhí)行:
- 由于兩個(gè)線程的工作方式,它們將交替地向列表中添加數(shù)字。當(dāng)一個(gè)線程發(fā)現(xiàn)列表的大小是其期望的(奇數(shù)或偶數(shù))時(shí),它會(huì)暫停并等待另一個(gè)線程添加一個(gè)數(shù)字。然后,它會(huì)被另一個(gè)線程的
notify
調(diào)用喚醒,繼續(xù)其執(zhí)行,并再次使另一個(gè)線程等待。
- 由于兩個(gè)線程的工作方式,它們將交替地向列表中添加數(shù)字。當(dāng)一個(gè)線程發(fā)現(xiàn)列表的大小是其期望的(奇數(shù)或偶數(shù))時(shí),它會(huì)暫停并等待另一個(gè)線程添加一個(gè)數(shù)字。然后,它會(huì)被另一個(gè)線程的
注意事項(xiàng):
- 使用
wait
和notify
時(shí),必須在同步塊或方法中這樣做,否則會(huì)拋出IllegalMonitorStateException
。 - 當(dāng)多個(gè)線程可能訪問共享資源(在這里是
demo.list
)時(shí),使用同步是必要的,以確保數(shù)據(jù)的完整性和一致性。 - 雖然在這個(gè)特定的例子中只有兩個(gè)線程,但這種方法可以擴(kuò)展到更多的線程,只要它們遵循相同的通信和同步協(xié)議。
- 使用
潛在問題:
- 這個(gè)代碼可能存在一個(gè)潛在的問題,即“假喚醒”。理論上,一個(gè)線程可能會(huì)無故地(或由于系統(tǒng)中的其他原因)從
wait
方法中喚醒,即使沒有其他線程明確地調(diào)用了notify
或notifyAll
。為了避免這種情況導(dǎo)致的問題,通常在wait
調(diào)用周圍使用一個(gè)循環(huán)來檢查預(yù)期的條件是否仍然成立。如果條件不滿足,則繼續(xù)等待。這通常被稱為“條件變量”的使用模式。
- 這個(gè)代碼可能存在一個(gè)潛在的問題,即“假喚醒”。理論上,一個(gè)線程可能會(huì)無故地(或由于系統(tǒng)中的其他原因)從
2、Lock、Condition
import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Task { // 創(chuàng)建一個(gè)可重入鎖,用于同步訪問共享資源(即列表) private final Lock lock = new ReentrantLock(); // 創(chuàng)建兩個(gè)條件變量,一個(gè)用于表示列表未滿,一個(gè)用于表示列表非空 private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); // 定義列表的最大容量 private static final int MAX_SIZE = 10; // 創(chuàng)建一個(gè)ArrayList作為共享資源,用于在兩個(gè)線程之間傳遞數(shù)據(jù) private final List<String> list = new ArrayList<>(MAX_SIZE); // add方法用于向列表中添加元素 public void add() { for (int i = 0; i < 10; i++) { lock.lock(); // 獲取鎖,開始同步代碼塊 try { // 如果列表已滿,則當(dāng)前線程等待,直到其他線程從列表中移除元素 while (list.size() == MAX_SIZE) { notFull.await(); // 等待列表不滿的條件成立 } // 模擬耗時(shí)操作(比如網(wǎng)絡(luò)請(qǐng)求或數(shù)據(jù)處理) Thread.sleep(100); // 向列表中添加一個(gè)新元素,并打印相關(guān)信息 list.add("add " + (i + 1)); System.out.println("The list size is " + list.size()); System.out.println("The add thread is " + Thread.currentThread().getName()); System.out.println("-------------"); // 通知可能在等待的移除線程,現(xiàn)在列表不為空,可以執(zhí)行移除操作了 notEmpty.signal(); } catch (InterruptedException e) { // 打印異常信息,實(shí)際開發(fā)中可能需要更復(fù)雜的錯(cuò)誤處理邏輯 e.printStackTrace(); } finally { lock.unlock(); // 釋放鎖,允許其他線程訪問同步代碼塊 } } } // sub方法用于從列表中移除元素 public void sub() { for (int i = 0; i < 10; i++) { lock.lock(); // 獲取鎖,開始同步代碼塊 try { // 如果列表為空,則當(dāng)前線程等待,直到其他線程向列表中添加元素 while (list.isEmpty()) { notEmpty.await(); // 等待列表非空的條件成立 } // 模擬耗時(shí)操作(比如網(wǎng)絡(luò)請(qǐng)求或數(shù)據(jù)處理) Thread.sleep(100); // 從列表中移除第一個(gè)元素,并打印相關(guān)信息 list.remove(0); System.out.println("The list size is " + list.size()); System.out.println("The sub thread is " + Thread.currentThread().getName()); System.out.println("-------------"); // 通知可能在等待的添加線程,現(xiàn)在列表不滿,可以執(zhí)行添加操作了 notFull.signal(); } catch (InterruptedException e) { // 打印異常信息,實(shí)際開發(fā)中可能需要更復(fù)雜的錯(cuò)誤處理邏輯 e.printStackTrace(); } finally { lock.unlock(); // 釋放鎖,允許其他線程訪問同步代碼塊 } } } // main方法作為程序的入口點(diǎn),創(chuàng)建Task對(duì)象并啟動(dòng)兩個(gè)線程來執(zhí)行add和sub方法 public static void main(String[] args) { Task task = new Task(); // 創(chuàng)建Task對(duì)象,它包含共享資源和同步機(jī)制 // 使用Lambda表達(dá)式和方法引用啟動(dòng)兩個(gè)線程,分別執(zhí)行add和sub方法,并為它們?cè)O(shè)置名稱以便區(qū)分輸出中的信息來源 new Thread(task::add, "AddThread").start(); // 啟動(dòng)添加線程 new Thread(task::sub, "SubThread").start(); // 啟動(dòng)移除線程 } }
這段代碼定義了一個(gè)名為Task
的類,它主要實(shí)現(xiàn)了線程安全的列表添加和移除操作。類內(nèi)部使用了java.util.concurrent.locks
包下的ReentrantLock
可重入鎖以及相關(guān)的Condition
條件變量來同步訪問共享資源(即一個(gè)ArrayList
)。
在Task
類中,有兩個(gè)主要的方法:add
和sub
。add
方法用于向列表中添加元素,而sub
方法用于從列表中移除元素。這兩個(gè)方法在被調(diào)用時(shí)都需要獲取鎖,以確保同一時(shí)間只有一個(gè)線程可以訪問共享資源。
當(dāng)添加線程調(diào)用add
方法時(shí),它首先檢查列表是否已滿。如果已滿,則通過調(diào)用notFull.await()
使當(dāng)前線程等待,直到其他線程從列表中移除元素并發(fā)出通知。一旦列表不滿,添加線程就會(huì)向列表中添加一個(gè)新元素,并通過調(diào)用notEmpty.signal()
通知可能在等待的移除線程。
類似地,當(dāng)移除線程調(diào)用sub
方法時(shí),它首先檢查列表是否為空。如果為空,則通過調(diào)用notEmpty.await()
使當(dāng)前線程等待,直到其他線程向列表中添加元素并發(fā)出通知。一旦列表非空,移除線程就會(huì)從列表中移除一個(gè)元素,并通過調(diào)用notFull.signal()
通知可能在等待的添加線程。
這種使用鎖和條件變量的方式實(shí)現(xiàn)了線程間的同步和通信,確保了共享資源(即列表)在任何時(shí)候都不會(huì)被多個(gè)線程同時(shí)修改,從而避免了數(shù)據(jù)競(jìng)爭和不一致的問題。同時(shí),通過條件變量的等待和通知機(jī)制,有效地協(xié)調(diào)了添加線程和移除線程的執(zhí)行順序,使得它們能夠按照預(yù)期的方式交替進(jìn)行添加和移除操作。
3、利用volatile
volatile修飾的變量值直接存在主內(nèi)存里面,子線程對(duì)該變量的讀寫直接寫住內(nèi)存,而不是像其它變量一樣在local thread里面產(chǎn)生一份copy。volatile能保證所修飾的變量對(duì)于多個(gè)線程可見性,即只要被修改,其它線程讀到的一定是最新的值。
public class Demo2 { private volatile List<Integer> list =new ArrayList<>(); public static void main(String[] args) { Demo2 demo =new Demo2(); new Thread(()->{ for (int i=0;i<10;i++){ demo.list.add(i); System.out.print(Thread.currentThread().getName()); System.out.println(demo.list); } }).start(); new Thread(()->{ for (int i=0;i<10;i++){ demo.list.add(i); System.out.print(Thread.currentThread().getName()); System.out.println(demo.list); } }).start(); } }
4、利用AtomicInteger
和volatile類似
二、PipedInputStream、PipedOutputStream
這里用流在兩個(gè)線程間通信,但是Java中的Stream是單向的,所以在兩個(gè)線程中分別建了一個(gè)input和output
public class PipedDemo { private final PipedInputStream inputStream1; private final PipedOutputStream outputStream1; private final PipedInputStream inputStream2; private final PipedOutputStream outputStream2; public PipedDemo(){ inputStream1 = new PipedInputStream(); outputStream1 = new PipedOutputStream(); inputStream2 = new PipedInputStream(); outputStream2 = new PipedOutputStream(); try { inputStream1.connect(outputStream2); inputStream2.connect(outputStream1); } catch (IOException e) { e.printStackTrace(); } } /**程序退出時(shí),需要關(guān)閉stream*/ public void shutdown() throws IOException { inputStream1.close(); inputStream2.close(); outputStream1.close(); outputStream2.close(); } public static void main(String[] args) throws IOException { PipedDemo demo =new PipedDemo(); new Thread(()->{ PipedInputStream in = demo.inputStream2; PipedOutputStream out = demo.outputStream2; for (int i = 0; i < 10; i++) { try { byte[] inArr = new byte[2]; in.read(inArr); System.out.print(Thread.currentThread().getName()+": "+i+" "); System.out.println(new String(inArr)); while(true){ if("go".equals(new String(inArr))) break; } out.write("ok".getBytes()); } catch (IOException e) { e.printStackTrace(); } } }).start(); new Thread(()->{ PipedInputStream in = demo.inputStream1; PipedOutputStream out = demo.outputStream1; for (int i = 0; i < 10; i++) { try { out.write("go".getBytes()); byte[] inArr = new byte[2]; in.read(inArr); System.out.print(Thread.currentThread().getName()+": "+i+" "); System.out.println(new String(inArr)); while(true){ if("ok".equals(new String(inArr))) break; } } catch (IOException e) { e.printStackTrace(); } } }).start(); // demo.shutdown(); } }
輸出
Thread-0: 0 go
Thread-1: 0 ok
Thread-0: 1 go
Thread-1: 1 ok
Thread-0: 2 go
Thread-1: 2 ok
Thread-0: 3 go
Thread-1: 3 ok
Thread-0: 4 go
Thread-1: 4 ok
Thread-0: 5 go
Thread-1: 5 ok
Thread-0: 6 go
Thread-1: 6 ok
Thread-0: 7 go
Thread-1: 7 ok
Thread-0: 8 go
Thread-1: 8 ok
Thread-0: 9 go
Thread-1: 9 ok
三、利用BlockingQueue
BlockingQueue定義的常用方法如下:
- add(Object):把Object加到BlockingQueue里,如果BlockingQueue可以容納,則返回true,否則拋出異常。
- offer(Object):表示如果可能的話,將Object加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false。
- put(Object):把Object加到BlockingQueue里,如果BlockingQueue沒有空間,則調(diào)用此方法的線程被阻斷直到BlockingQueue里有空間再繼續(xù)。
- poll(time):獲取并刪除BlockingQueue里排在首位的對(duì)象,若不能立即取出,則可以等time參數(shù)規(guī)定的時(shí)間,取不到時(shí)返回null。當(dāng)不傳入time值時(shí),立刻返回。
- peek():立刻獲取BlockingQueue里排在首位的對(duì)象,但不從隊(duì)列里刪除,如果隊(duì)列為空,則返回null。
- take():獲取并刪除BlockingQueue里排在首位的對(duì)象,若BlockingQueue為空,阻斷進(jìn)入等待狀態(tài)直到BlockingQueue有新的對(duì)象被加入為止。
BlockingQueue有四個(gè)具體的實(shí)現(xiàn)類:
- ArrayBlockingQueue:數(shù)組阻塞隊(duì)列,規(guī)定大小,其構(gòu)造函數(shù)必須帶一個(gè)int參數(shù)來指明其大小。其所含的對(duì)象是以FIFO(先入先出)順序排序的。
- LinkedBlockingQueue:鏈阻塞隊(duì)列,大小不定,若其構(gòu)造函數(shù)帶一個(gè)規(guī)定大小的參數(shù),生成的BlockingQueue有大小限制,若不帶大小參數(shù),所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定。其所含的對(duì)象是以FIFO順序排序的。
- PriorityBlockingQueue:類似于LinkedBlockingQueue,但其所含對(duì)象的排序不是FIFO,而是依據(jù)對(duì)象的自然排序順序或者是構(gòu)造函數(shù)所帶的Comparator決定的順序。
- SynchronousQueue:特殊的BlockingQueue,它的內(nèi)部同時(shí)只能夠容納單個(gè)元素,對(duì)其的操作必須是放和取交替完成的。
- DelayQueue:延遲隊(duì)列,注入其中的元素必須實(shí)現(xiàn) java.util.concurrent.Delayed 接口
所有BlockingQueue的使用方式類似,以下例子一個(gè)線程寫入,一個(gè)線程讀取,操作的是同一個(gè)Queue:
public class BlockingQueueDemo { public static void main(String[] args) { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); //讀線程 new Thread(() -> { int i =0; while (true) { try { String item = queue.take(); System.out.print(Thread.currentThread().getName() + ": " + i + " "); System.out.println(item); i++; } catch (Exception e) { e.printStackTrace(); } } }).start(); //寫線程 new Thread(() -> { for (int i = 0; i < 10; i++) { try { String item = "go"+i; System.out.print(Thread.currentThread().getName() + ": " + i + " "); System.out.println(item); queue.put(item); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
到此這篇關(guān)于Java線程間通訊的幾種方法小結(jié)的文章就介紹到這了,更多相關(guān)Java線程間通訊內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
當(dāng)mybatis返回值遇見內(nèi)部類的問題
這篇文章主要介紹了當(dāng)mybatis返回值遇見內(nèi)部類的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12Jmeter自定義函數(shù)base64加密實(shí)現(xiàn)過程解析
這篇文章主要介紹了Jmeter自定義函數(shù)base64加密實(shí)現(xiàn)過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07SpringBoot如何整合redis實(shí)現(xiàn)過期key監(jiān)聽事件
這篇文章主要介紹了SpringBoot如何整合redis實(shí)現(xiàn)過期key監(jiān)聽事件,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09java ExecutorService CompletionService線程池區(qū)別與選擇
這篇文章主要為大家介紹了java ExecutorService CompletionService線程池區(qū)別與選擇使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-09-09使用springmvc的controller層獲取到請(qǐng)求的數(shù)據(jù)方式
這篇文章主要介紹了使用springmvc的controller層獲取到請(qǐng)求的數(shù)據(jù)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08Java8 中使用Stream 讓List 轉(zhuǎn) Map使用問題小結(jié)
這篇文章主要介紹了Java8 中使用Stream 讓List 轉(zhuǎn) Map使用總結(jié),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-06-06Java NumberFormat格式化float類型的bug
今天小編就為大家分享一篇關(guān)于Java NumberFormat格式化float類型的bug,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2018-10-10IntelliJ IDEA 2020最新激活碼(親測(cè)有效,可激活至 2089 年
這篇文章主要介紹了IntelliJ IDEA 2021最新激活碼(親測(cè)有效,可激活至 2089 年),非常不錯(cuò),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-04-04