Java線程間通訊的幾種方法小結
一、使用同一個共享變量控制
1、Synchronized、wait、notify
public class Demo1 {
private final List<Integer> list =new ArrayList<>();
public static void main(String[] args) {
Demo1 demo =new Demo1();
new Thread(()->{
for (int i=0;i<10;i++){
synchronized (demo.list){
if(demo.list.size()%2==1){
try {
demo.list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
demo.list.add(i);
System.out.print(Thread.currentThread().getName());
System.out.println(demo.list);
demo.list.notify();
}
}
}).start();
new Thread(()->{
for (int i=0;i<10;i++){
synchronized (demo.list){
if(demo.list.size()%2==0){
try {
demo.list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
demo.list.add(i);
System.out.print(Thread.currentThread().getName());
System.out.println(demo.list);
demo.list.notify();
}
}
}).start();
}
}
這段代碼演示了如何使用synchronized、wait和notify來實現(xiàn)兩個線程間的通信,以確保它們交替地向一個ArrayList中添加數(shù)字。以下是代碼的詳細解釋:
類定義:
Demo1類中有一個私有的、不可變的(由final修飾)成員變量list,它是ArrayList<Integer>類型的。
主函數(shù):
- 在
main方法中,首先創(chuàng)建了Demo1的一個實例demo。 - 然后啟動了兩個線程,每個線程都執(zhí)行一個特定的任務。
- 在
第一個線程的任務:
- 使用一個for循環(huán),循環(huán)10次。
- 在每次循環(huán)中,首先獲得
demo.list的鎖,這是通過synchronized (demo.list)實現(xiàn)的。 - 檢查當前列表的大小是否為奇數(shù)(通過
demo.list.size()%2==1)。如果是,則調用demo.list.wait()使當前線程進入等待狀態(tài),并釋放demo.list的鎖,這樣其他線程可以獲取該鎖并執(zhí)行其任務。 - 當線程從等待狀態(tài)被喚醒時(通過另一個線程的
notify調用),它會繼續(xù)執(zhí)行,并將當前的數(shù)字添加到列表中。 - 打印當前線程的名稱和更新后的列表。
- 通過調用
demo.list.notify()喚醒可能正在等待的另一個線程。
第二個線程的任務:
- 它的工作方式與第一個線程非常相似,但有一個關鍵的區(qū)別:它檢查列表的大小是否為偶數(shù),并在這種情況下使線程進入等待狀態(tài)。
交替執(zhí)行:
- 由于兩個線程的工作方式,它們將交替地向列表中添加數(shù)字。當一個線程發(fā)現(xiàn)列表的大小是其期望的(奇數(shù)或偶數(shù))時,它會暫停并等待另一個線程添加一個數(shù)字。然后,它會被另一個線程的
notify調用喚醒,繼續(xù)其執(zhí)行,并再次使另一個線程等待。
- 由于兩個線程的工作方式,它們將交替地向列表中添加數(shù)字。當一個線程發(fā)現(xiàn)列表的大小是其期望的(奇數(shù)或偶數(shù))時,它會暫停并等待另一個線程添加一個數(shù)字。然后,它會被另一個線程的
注意事項:
- 使用
wait和notify時,必須在同步塊或方法中這樣做,否則會拋出IllegalMonitorStateException。 - 當多個線程可能訪問共享資源(在這里是
demo.list)時,使用同步是必要的,以確保數(shù)據(jù)的完整性和一致性。 - 雖然在這個特定的例子中只有兩個線程,但這種方法可以擴展到更多的線程,只要它們遵循相同的通信和同步協(xié)議。
- 使用
潛在問題:
- 這個代碼可能存在一個潛在的問題,即“假喚醒”。理論上,一個線程可能會無故地(或由于系統(tǒng)中的其他原因)從
wait方法中喚醒,即使沒有其他線程明確地調用了notify或notifyAll。為了避免這種情況導致的問題,通常在wait調用周圍使用一個循環(huán)來檢查預期的條件是否仍然成立。如果條件不滿足,則繼續(xù)等待。這通常被稱為“條件變量”的使用模式。
- 這個代碼可能存在一個潛在的問題,即“假喚醒”。理論上,一個線程可能會無故地(或由于系統(tǒng)中的其他原因)從
2、Lock、Condition
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Task {
// 創(chuàng)建一個可重入鎖,用于同步訪問共享資源(即列表)
private final Lock lock = new ReentrantLock();
// 創(chuàng)建兩個條件變量,一個用于表示列表未滿,一個用于表示列表非空
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
// 定義列表的最大容量
private static final int MAX_SIZE = 10;
// 創(chuàng)建一個ArrayList作為共享資源,用于在兩個線程之間傳遞數(shù)據(jù)
private final List<String> list = new ArrayList<>(MAX_SIZE);
// add方法用于向列表中添加元素
public void add() {
for (int i = 0; i < 10; i++) {
lock.lock(); // 獲取鎖,開始同步代碼塊
try {
// 如果列表已滿,則當前線程等待,直到其他線程從列表中移除元素
while (list.size() == MAX_SIZE) {
notFull.await(); // 等待列表不滿的條件成立
}
// 模擬耗時操作(比如網(wǎng)絡請求或數(shù)據(jù)處理)
Thread.sleep(100);
// 向列表中添加一個新元素,并打印相關信息
list.add("add " + (i + 1));
System.out.println("The list size is " + list.size());
System.out.println("The add thread is " + Thread.currentThread().getName());
System.out.println("-------------");
// 通知可能在等待的移除線程,現(xiàn)在列表不為空,可以執(zhí)行移除操作了
notEmpty.signal();
} catch (InterruptedException e) {
// 打印異常信息,實際開發(fā)中可能需要更復雜的錯誤處理邏輯
e.printStackTrace();
} finally {
lock.unlock(); // 釋放鎖,允許其他線程訪問同步代碼塊
}
}
}
// sub方法用于從列表中移除元素
public void sub() {
for (int i = 0; i < 10; i++) {
lock.lock(); // 獲取鎖,開始同步代碼塊
try {
// 如果列表為空,則當前線程等待,直到其他線程向列表中添加元素
while (list.isEmpty()) {
notEmpty.await(); // 等待列表非空的條件成立
}
// 模擬耗時操作(比如網(wǎng)絡請求或數(shù)據(jù)處理)
Thread.sleep(100);
// 從列表中移除第一個元素,并打印相關信息
list.remove(0);
System.out.println("The list size is " + list.size());
System.out.println("The sub thread is " + Thread.currentThread().getName());
System.out.println("-------------");
// 通知可能在等待的添加線程,現(xiàn)在列表不滿,可以執(zhí)行添加操作了
notFull.signal();
} catch (InterruptedException e) {
// 打印異常信息,實際開發(fā)中可能需要更復雜的錯誤處理邏輯
e.printStackTrace();
} finally {
lock.unlock(); // 釋放鎖,允許其他線程訪問同步代碼塊
}
}
}
// main方法作為程序的入口點,創(chuàng)建Task對象并啟動兩個線程來執(zhí)行add和sub方法
public static void main(String[] args) {
Task task = new Task(); // 創(chuàng)建Task對象,它包含共享資源和同步機制
// 使用Lambda表達式和方法引用啟動兩個線程,分別執(zhí)行add和sub方法,并為它們設置名稱以便區(qū)分輸出中的信息來源
new Thread(task::add, "AddThread").start(); // 啟動添加線程
new Thread(task::sub, "SubThread").start(); // 啟動移除線程
}
}
這段代碼定義了一個名為Task的類,它主要實現(xiàn)了線程安全的列表添加和移除操作。類內(nèi)部使用了java.util.concurrent.locks包下的ReentrantLock可重入鎖以及相關的Condition條件變量來同步訪問共享資源(即一個ArrayList)。
在Task類中,有兩個主要的方法:add和sub。add方法用于向列表中添加元素,而sub方法用于從列表中移除元素。這兩個方法在被調用時都需要獲取鎖,以確保同一時間只有一個線程可以訪問共享資源。
當添加線程調用add方法時,它首先檢查列表是否已滿。如果已滿,則通過調用notFull.await()使當前線程等待,直到其他線程從列表中移除元素并發(fā)出通知。一旦列表不滿,添加線程就會向列表中添加一個新元素,并通過調用notEmpty.signal()通知可能在等待的移除線程。
類似地,當移除線程調用sub方法時,它首先檢查列表是否為空。如果為空,則通過調用notEmpty.await()使當前線程等待,直到其他線程向列表中添加元素并發(fā)出通知。一旦列表非空,移除線程就會從列表中移除一個元素,并通過調用notFull.signal()通知可能在等待的添加線程。
這種使用鎖和條件變量的方式實現(xiàn)了線程間的同步和通信,確保了共享資源(即列表)在任何時候都不會被多個線程同時修改,從而避免了數(shù)據(jù)競爭和不一致的問題。同時,通過條件變量的等待和通知機制,有效地協(xié)調了添加線程和移除線程的執(zhí)行順序,使得它們能夠按照預期的方式交替進行添加和移除操作。
3、利用volatile
volatile修飾的變量值直接存在主內(nèi)存里面,子線程對該變量的讀寫直接寫住內(nèi)存,而不是像其它變量一樣在local thread里面產(chǎn)生一份copy。volatile能保證所修飾的變量對于多個線程可見性,即只要被修改,其它線程讀到的一定是最新的值。
public class Demo2 {
private volatile List<Integer> list =new ArrayList<>();
public static void main(String[] args) {
Demo2 demo =new Demo2();
new Thread(()->{
for (int i=0;i<10;i++){
demo.list.add(i);
System.out.print(Thread.currentThread().getName());
System.out.println(demo.list);
}
}).start();
new Thread(()->{
for (int i=0;i<10;i++){
demo.list.add(i);
System.out.print(Thread.currentThread().getName());
System.out.println(demo.list);
}
}).start();
}
}
4、利用AtomicInteger
和volatile類似
二、PipedInputStream、PipedOutputStream
這里用流在兩個線程間通信,但是Java中的Stream是單向的,所以在兩個線程中分別建了一個input和output
public class PipedDemo {
private final PipedInputStream inputStream1;
private final PipedOutputStream outputStream1;
private final PipedInputStream inputStream2;
private final PipedOutputStream outputStream2;
public PipedDemo(){
inputStream1 = new PipedInputStream();
outputStream1 = new PipedOutputStream();
inputStream2 = new PipedInputStream();
outputStream2 = new PipedOutputStream();
try {
inputStream1.connect(outputStream2);
inputStream2.connect(outputStream1);
} catch (IOException e) {
e.printStackTrace();
}
}
/**程序退出時,需要關閉stream*/
public void shutdown() throws IOException {
inputStream1.close();
inputStream2.close();
outputStream1.close();
outputStream2.close();
}
public static void main(String[] args) throws IOException {
PipedDemo demo =new PipedDemo();
new Thread(()->{
PipedInputStream in = demo.inputStream2;
PipedOutputStream out = demo.outputStream2;
for (int i = 0; i < 10; i++) {
try {
byte[] inArr = new byte[2];
in.read(inArr);
System.out.print(Thread.currentThread().getName()+": "+i+" ");
System.out.println(new String(inArr));
while(true){
if("go".equals(new String(inArr)))
break;
}
out.write("ok".getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
new Thread(()->{
PipedInputStream in = demo.inputStream1;
PipedOutputStream out = demo.outputStream1;
for (int i = 0; i < 10; i++) {
try {
out.write("go".getBytes());
byte[] inArr = new byte[2];
in.read(inArr);
System.out.print(Thread.currentThread().getName()+": "+i+" ");
System.out.println(new String(inArr));
while(true){
if("ok".equals(new String(inArr)))
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
// demo.shutdown();
}
}
輸出
Thread-0: 0 go
Thread-1: 0 ok
Thread-0: 1 go
Thread-1: 1 ok
Thread-0: 2 go
Thread-1: 2 ok
Thread-0: 3 go
Thread-1: 3 ok
Thread-0: 4 go
Thread-1: 4 ok
Thread-0: 5 go
Thread-1: 5 ok
Thread-0: 6 go
Thread-1: 6 ok
Thread-0: 7 go
Thread-1: 7 ok
Thread-0: 8 go
Thread-1: 8 ok
Thread-0: 9 go
Thread-1: 9 ok
三、利用BlockingQueue
BlockingQueue定義的常用方法如下:
- add(Object):把Object加到BlockingQueue里,如果BlockingQueue可以容納,則返回true,否則拋出異常。
- offer(Object):表示如果可能的話,將Object加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false。
- put(Object):把Object加到BlockingQueue里,如果BlockingQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue里有空間再繼續(xù)。
- poll(time):獲取并刪除BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數(shù)規(guī)定的時間,取不到時返回null。當不傳入time值時,立刻返回。
- peek():立刻獲取BlockingQueue里排在首位的對象,但不從隊列里刪除,如果隊列為空,則返回null。
- take():獲取并刪除BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態(tài)直到BlockingQueue有新的對象被加入為止。
BlockingQueue有四個具體的實現(xiàn)類:
- ArrayBlockingQueue:數(shù)組阻塞隊列,規(guī)定大小,其構造函數(shù)必須帶一個int參數(shù)來指明其大小。其所含的對象是以FIFO(先入先出)順序排序的。
- LinkedBlockingQueue:鏈阻塞隊列,大小不定,若其構造函數(shù)帶一個規(guī)定大小的參數(shù),生成的BlockingQueue有大小限制,若不帶大小參數(shù),所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定。其所含的對象是以FIFO順序排序的。
- PriorityBlockingQueue:類似于LinkedBlockingQueue,但其所含對象的排序不是FIFO,而是依據(jù)對象的自然排序順序或者是構造函數(shù)所帶的Comparator決定的順序。
- SynchronousQueue:特殊的BlockingQueue,它的內(nèi)部同時只能夠容納單個元素,對其的操作必須是放和取交替完成的。
- DelayQueue:延遲隊列,注入其中的元素必須實現(xiàn) java.util.concurrent.Delayed 接口
所有BlockingQueue的使用方式類似,以下例子一個線程寫入,一個線程讀取,操作的是同一個Queue:
public class BlockingQueueDemo {
public static void main(String[] args) {
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
//讀線程
new Thread(() -> {
int i =0;
while (true) {
try {
String item = queue.take();
System.out.print(Thread.currentThread().getName() + ": " + i + " ");
System.out.println(item);
i++;
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
//寫線程
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
String item = "go"+i;
System.out.print(Thread.currentThread().getName() + ": " + i + " ");
System.out.println(item);
queue.put(item);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}到此這篇關于Java線程間通訊的幾種方法小結的文章就介紹到這了,更多相關Java線程間通訊內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Jmeter自定義函數(shù)base64加密實現(xiàn)過程解析
這篇文章主要介紹了Jmeter自定義函數(shù)base64加密實現(xiàn)過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-07-07
SpringBoot如何整合redis實現(xiàn)過期key監(jiān)聽事件
這篇文章主要介紹了SpringBoot如何整合redis實現(xiàn)過期key監(jiān)聽事件,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-09-09
java ExecutorService CompletionService線程池區(qū)別與選擇
這篇文章主要為大家介紹了java ExecutorService CompletionService線程池區(qū)別與選擇使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-09-09
使用springmvc的controller層獲取到請求的數(shù)據(jù)方式
這篇文章主要介紹了使用springmvc的controller層獲取到請求的數(shù)據(jù)方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08
Java8 中使用Stream 讓List 轉 Map使用問題小結
這篇文章主要介紹了Java8 中使用Stream 讓List 轉 Map使用總結,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-06-06
Java NumberFormat格式化float類型的bug
今天小編就為大家分享一篇關于Java NumberFormat格式化float類型的bug,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-10-10
IntelliJ IDEA 2020最新激活碼(親測有效,可激活至 2089 年
這篇文章主要介紹了IntelliJ IDEA 2021最新激活碼(親測有效,可激活至 2089 年),非常不錯,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-04-04

