java wait()/notify() 實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式詳解
java wait()/notify() 實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式
java中的多線程會(huì)涉及到線程間通信,常見的線程通信方式,例如共享變量、管道流等,這里我們要實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式,也需要涉及到線程通信,不過這里我們用到了java中的wait()、notify()方法:
wait():進(jìn)入臨界區(qū)的線程在運(yùn)行到一部分后,發(fā)現(xiàn)進(jìn)行后面的任務(wù)所需的資源還沒有準(zhǔn)備充分,所以調(diào)用wait()方法,讓線程阻塞,等待資源,同時(shí)釋放臨界區(qū)的鎖,此時(shí)線程的狀態(tài)也從RUNNABLE狀態(tài)變?yōu)閃AITING狀態(tài);
notify():準(zhǔn)備資源的線程在準(zhǔn)備好資源后,調(diào)用notify()方法通知需要使用資源的線程,同時(shí)釋放臨界區(qū)的鎖,將臨界區(qū)的鎖交給使用資源的線程。
wait()、notify()這兩個(gè)方法,都必須要在臨界區(qū)中調(diào)用,即是在synchronized同步塊中調(diào)用,不然會(huì)拋出IllegalMonitorStateException的異常。
實(shí)現(xiàn)源碼:
生產(chǎn)者線程類:
package threads;
import java.util.List;
import java.util.UUID;
public class Producer extends Thread{
private List<String> storage;//生產(chǎn)者倉庫
public Producer(List<String> storage) {
this.storage = storage;
}
public void run(){
//生產(chǎn)者每隔1s生產(chǎn)1~100消息
long oldTime = System.currentTimeMillis();
while(true){
synchronized(storage){
if (System.currentTimeMillis() - oldTime >= 1000) {
oldTime = System.currentTimeMillis();
int size = (int)(Math.random()*100) + 1;
for (int i = 0; i < size; i++) {
String msg = UUID.randomUUID().toString();
storage.add(msg);
}
System.out.println("線程"+this.getName()+"生產(chǎn)消息"+size+"條");
storage.notify();
}
}
}
}
}
消費(fèi)者線程類:
package threads;
import java.util.List;
public class Consumer extends Thread{
private List<String> storage;//倉庫
public Consumer(List<String> storage) {
this.storage = storage;
}
public void run(){
while(true){
synchronized(storage){
//消費(fèi)者去倉庫拿消息的時(shí)候,如果發(fā)現(xiàn)倉庫數(shù)據(jù)為空,則等待
if (storage.isEmpty()) {
try {
storage.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int size = storage.size();
for (int i = size - 1; i >= 0; i--) {
storage.remove(i);
}
System.out.println("線程"+this.getName()+"成功消費(fèi)"+size+"條消息");
}
}
}
}
倉庫類:
package threads;
import java.util.ArrayList;
import java.util.List;
public class Storage {
private List<String> storage;//生產(chǎn)者和消費(fèi)者共享的倉庫
public Storage() {
storage = new ArrayList<String>();
}
public List<String> getStorage() {
return storage;
}
public void setStorage(List<String> storage) {
this.storage = storage;
}
}
main方法類:
package threads;
public class App {
public static void main(String[] args) {
Storage storage = new Storage();
Producer producer = new Producer(storage.getStorage());
Consumer consumer = new Consumer(storage.getStorage());
producer.start();
consumer.start();
}
}
生產(chǎn)消費(fèi)效果:

Wait/Notify通知機(jī)制解析
前言
我們知道,java的wait/notify的通知機(jī)制可以用來實(shí)現(xiàn)線程間通信。wait表示線程的等待,調(diào)用該方法會(huì)導(dǎo)致線程阻塞,直至另一線程調(diào)用notify或notifyAll方法才可另其繼續(xù)執(zhí)行。經(jīng)典的生產(chǎn)者、消費(fèi)者模式即是使用wait/notify機(jī)制得以完成。在這篇文章中,我們將深入解析這一機(jī)制,了解其背后的原理。
線程的狀態(tài)
在了解wait/notify機(jī)制前,先熟悉一下java線程的幾個(gè)生命周期。分別為初始(NEW)、運(yùn)行(RUNNABLE)、阻塞(BLOCKED)、等待(WAITING)、超時(shí)等待(TIMED_WAITING)、終止(TERMINATED)等狀態(tài)(位于java.lang.Thread.State枚舉類中)。
以下是對(duì)這幾個(gè)狀態(tài)的簡要說明,詳細(xì)說明見該類注釋。
| 狀態(tài)名稱 | 說明 |
|---|---|
| NEW | 初始狀態(tài),線程被構(gòu)建,但未調(diào)用start()方法 |
| RUNNABLE | 運(yùn)行狀態(tài),調(diào)用start()方法后。在java線程中,將操作系統(tǒng)線程的就緒和運(yùn)行統(tǒng)稱運(yùn)行狀態(tài) |
| BLOCKED | 阻塞狀態(tài),線程等待進(jìn)入synchronized代碼塊或方法中,等待獲取鎖 |
| WAITING | 等待狀態(tài),線程可調(diào)用wait、join等操作使自己陷入等待狀態(tài),并等待其他線程做出特定操作(如notify或中斷) |
| TIMED_WAITING | 超時(shí)等待,線程調(diào)用sleep(timeout)、wait(timeout)等操作進(jìn)入超時(shí)等待狀態(tài),超時(shí)后自行返回 |
| TERMINATED | 終止?fàn)顟B(tài),線程運(yùn)行結(jié)束 |
對(duì)于以上線程間的狀態(tài)及轉(zhuǎn)化關(guān)系,我們需要知道
- WAITING(等待狀態(tài))和TIMED_WAITING(超時(shí)等待)都會(huì)令線程進(jìn)入等待狀態(tài),不同的是TIMED_WAITING會(huì)在超時(shí)后自行返回,而WAITING則需要等待至條件改變。
- 進(jìn)入阻塞狀態(tài)的唯一前提是在等待獲取同步鎖。java注釋說的很明白,只有兩種情況可以使線程進(jìn)入阻塞狀態(tài):一是等待進(jìn)入synchronized塊或方法,另一個(gè)是在調(diào)用wait()方法后重新進(jìn)入synchronized塊或方法。下文會(huì)有詳細(xì)解釋。
- Lock類對(duì)于鎖的實(shí)現(xiàn)不會(huì)令線程進(jìn)入阻塞狀態(tài),Lock底層調(diào)用LockSupport.park()方法,使線程進(jìn)入的是等待狀態(tài)。
wait/notify用例
讓我們先通過一個(gè)示例解析
wait()方法可以使線程進(jìn)入等待狀態(tài),而notify()可以使等待的狀態(tài)喚醒。這樣的同步機(jī)制十分適合生產(chǎn)者、消費(fèi)者模式:消費(fèi)者消費(fèi)某個(gè)資源,而生產(chǎn)者生產(chǎn)該資源。當(dāng)該資源缺失時(shí),消費(fèi)者調(diào)用wait()方法進(jìn)行自我阻塞,等待生產(chǎn)者的生產(chǎn);生產(chǎn)者生產(chǎn)完畢后調(diào)用notify/notifyAll()喚醒消費(fèi)者進(jìn)行消費(fèi)。
以下是代碼示例,其中flag標(biāo)志表示資源的有無。
public class ThreadTest {
static final Object obj = new Object();
private static boolean flag = false;
public static void main(String[] args) throws Exception {
Thread consume = new Thread(new Consume(), "Consume");
Thread produce = new Thread(new Produce(), "Produce");
consume.start();
Thread.sleep(1000);
produce.start();
try {
produce.join();
consume.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生產(chǎn)者線程
static class Produce implements Runnable {
@Override
public void run() {
synchronized (obj) {
System.out.println("進(jìn)入生產(chǎn)者線程");
System.out.println("生產(chǎn)");
try {
TimeUnit.MILLISECONDS.sleep(2000); //模擬生產(chǎn)過程
flag = true;
obj.notify(); //通知消費(fèi)者
TimeUnit.MILLISECONDS.sleep(1000); //模擬其他耗時(shí)操作
System.out.println("退出生產(chǎn)者線程");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//消費(fèi)者線程
static class Consume implements Runnable {
@Override
public void run() {
synchronized (obj) {
System.out.println("進(jìn)入消費(fèi)者線程");
System.out.println("wait flag 1:" + flag);
while (!flag) { //判斷條件是否滿足,若不滿足則等待
try {
System.out.println("還沒生產(chǎn),進(jìn)入等待");
obj.wait();
System.out.println("結(jié)束等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("wait flag 2:" + flag);
System.out.println("消費(fèi)");
System.out.println("退出消費(fèi)者線程");
}
}
}
}
輸出結(jié)果為:
進(jìn)入消費(fèi)者線程
wait flag 1:false
還沒生產(chǎn),進(jìn)入等待
進(jìn)入生產(chǎn)者線程
生產(chǎn)
退出生產(chǎn)者線程
結(jié)束等待
wait flag 2:true
消費(fèi)
退出消費(fèi)者線程
理解了輸出結(jié)果的順序,也就明白了wait/notify的基本用法。有以下幾點(diǎn)需要知道:
- 在示例中沒有體現(xiàn)但很重要的是,wait/notify方法的調(diào)用必須處在該對(duì)象的鎖(Monitor)中,也即,在調(diào)用這些方法時(shí)首先需要獲得該對(duì)象的鎖。否則會(huì)爬出IllegalMonitorStateException異常。
- 從輸出結(jié)果來看,在生產(chǎn)者調(diào)用notify()后,消費(fèi)者并沒有立即被喚醒,而是等到生產(chǎn)者退出同步塊后才喚醒執(zhí)行。(這點(diǎn)其實(shí)也好理解,synchronized同步方法(塊)同一時(shí)刻只允許一個(gè)線程在里面,生產(chǎn)者不退出,消費(fèi)者也進(jìn)不去)
- 注意,消費(fèi)者被喚醒后是從wait()方法(被阻塞的地方)后面執(zhí)行,而不是重新從同步塊開頭。
深入了解
這一節(jié)我們探討wait/notify與線程狀態(tài)之間的關(guān)系。深入了解線程的生命周期。
由前面線程的狀態(tài)轉(zhuǎn)化圖可知,當(dāng)調(diào)用wait()方法后,線程會(huì)進(jìn)入WAITING(等待狀態(tài)),后續(xù)被notify()后,并沒有立即被執(zhí)行,而是進(jìn)入等待獲取鎖的阻塞隊(duì)列。
對(duì)于每個(gè)對(duì)象來說,都有自己的等待隊(duì)列和阻塞隊(duì)列。以前面的生產(chǎn)者、消費(fèi)者為例,我們拿obj對(duì)象作為對(duì)象鎖,配合圖示。內(nèi)部流程如下
- 當(dāng)線程A(消費(fèi)者)調(diào)用wait()方法后,線程A讓出鎖,自己進(jìn)入等待狀態(tài),同時(shí)加入鎖對(duì)象的等待隊(duì)列。
- 線程B(生產(chǎn)者)獲取鎖后,調(diào)用notify方法通知鎖對(duì)象的等待隊(duì)列,使得線程A從等待隊(duì)列進(jìn)入阻塞隊(duì)列。
- 線程A進(jìn)入阻塞隊(duì)列后,直至線程B釋放鎖后,線程A競爭得到鎖繼續(xù)從wait()方法后執(zhí)行。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot詳細(xì)講解通過自定義classloader加密保護(hù)class文件
這篇文章主要介紹了SpringBoot通過自定義classloader加密class文件,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-04-04
Java中g(shù)etSuperclass()方法的使用與原理解讀
文章介紹了Java中的getSuperclass()方法,該方法用于獲取一個(gè)類的直接父類,通過理解其使用方式、工作原理以及實(shí)際應(yīng)用場景,可以更好地利用反射機(jī)制處理類的繼承關(guān)系,實(shí)現(xiàn)動(dòng)態(tài)類型檢查、類加載以及序列化等功能2025-01-01
java實(shí)現(xiàn)字符串和日期類型相互轉(zhuǎn)換的方法
這篇文章主要介紹了java實(shí)現(xiàn)字符串和日期類型相互轉(zhuǎn)換的方法,涉及java針對(duì)日期與字符串的轉(zhuǎn)換與運(yùn)算相關(guān)操作技巧,需要的朋友可以參考下2017-02-02
SpringBoot實(shí)現(xiàn)二維碼掃碼登錄的原理及項(xiàng)目實(shí)踐
本文主要介紹了SpringBoot實(shí)現(xiàn)二維碼掃碼登錄的原理及項(xiàng)目實(shí)踐,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04
Java使用自動(dòng)化部署工具Gradle中的任務(wù)設(shè)定教程
Grandle使用同樣運(yùn)行于JVM上的Groovy語言編寫,本文會(huì)對(duì)此進(jìn)行初步夠用的講解,接下來我們就一起來看一下Java使用自動(dòng)化部署工具Gradle中的任務(wù)設(shè)定教程:2016-06-06
java累加和校驗(yàn)實(shí)現(xiàn)方式16進(jìn)制(推薦)
下面小編就為大家?guī)硪黄猨ava累加和校驗(yàn)實(shí)現(xiàn)方式16進(jìn)制(推薦)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-11-11
Intellij IDEA基于Springboot的遠(yuǎn)程調(diào)試(圖文)
這篇文章主要介紹了Intellij IDEA基于Springboot的遠(yuǎn)程調(diào)試(圖文),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-10-10
Java HashMap 如何正確遍歷并刪除元素的方法小結(jié)
這篇文章主要介紹了Java HashMap 如何正確遍歷并刪除元素的方法小結(jié),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-05-05

