淺談java線程中生產(chǎn)者與消費(fèi)者的問(wèn)題
一、概念
生產(chǎn)者與消費(fèi)者問(wèn)題是一個(gè)金典的多線程協(xié)作的問(wèn)題.生產(chǎn)者負(fù)責(zé)生產(chǎn)產(chǎn)品,并將產(chǎn)品存放到倉(cāng)庫(kù);消費(fèi)者從倉(cāng)庫(kù)中獲取產(chǎn)品并消費(fèi)。當(dāng)倉(cāng)庫(kù)滿時(shí),生產(chǎn)者必須停止生產(chǎn),直到倉(cāng)庫(kù)有位置存放產(chǎn)品;當(dāng)倉(cāng)庫(kù)空時(shí),消費(fèi)者必須停止消費(fèi),直到倉(cāng)庫(kù)中有產(chǎn)品。
解決生產(chǎn)者/消費(fèi)者問(wèn)題主要用到如下幾個(gè)技術(shù):1.用線程模擬生產(chǎn)者,在run方法中不斷地往倉(cāng)庫(kù)中存放產(chǎn)品。2.用線程模擬消費(fèi)者,在run方法中不斷地從倉(cāng)庫(kù)中獲取產(chǎn)品。3
. 倉(cāng)庫(kù)類保存產(chǎn)品,當(dāng)產(chǎn)品數(shù)量為0時(shí),調(diào)用wait方法,使得當(dāng)前消費(fèi)者線程進(jìn)入等待狀態(tài),當(dāng)有新產(chǎn)品存入時(shí),調(diào)用notify方法,喚醒等待的消費(fèi)者線程。當(dāng)倉(cāng)庫(kù)滿時(shí),調(diào)用wait方法,使得當(dāng)前生產(chǎn)者線程進(jìn)入等待狀態(tài),當(dāng)有消費(fèi)者獲取產(chǎn)品時(shí),調(diào)用notify方法,喚醒等待的生產(chǎn)者線程。
二、實(shí)例
package book.thread.product; public class Consumer extends Thread{ private Warehouse warehouse;//消費(fèi)者獲取產(chǎn)品的倉(cāng)庫(kù) private boolean running = false;//是否需要結(jié)束線程的標(biāo)志位 public Consumer(Warehouse warehouse,String name){ super(name); this.warehouse = warehouse; } public void start(){ this.running = true; super.start(); } public void run(){ Product product; try { while(running){ //從倉(cāng)庫(kù)中獲取產(chǎn)品 product = warehouse.getProduct(); sleep(500); } } catch (InterruptedException e) { e.printStackTrace(); } } //停止消費(fèi)者線程 public void stopConsumer(){ synchronized(warehouse){ this.running = false; warehouse.notifyAll();//通知等待倉(cāng)庫(kù)的線程 } } //消費(fèi)者線程是否在運(yùn)行 public boolean isRunning(){ return running; } } package book.thread.product; public class Producer extends Thread{ private Warehouse warehouse;//生產(chǎn)者存儲(chǔ)產(chǎn)品的倉(cāng)庫(kù) private static int produceName = 0;//產(chǎn)品的名字 private boolean running = false;//是否需要結(jié)束線程的標(biāo)志位 public Producer(Warehouse warehouse,String name){ super(name); this.warehouse = warehouse; } public void start(){ this.running = true; super.start(); } public void run(){ Product product; //生產(chǎn)并存儲(chǔ)產(chǎn)品 try { while(running){ product = new Product((++produceName)+""); this.warehouse.storageProduct(product); sleep(300); } } catch (InterruptedException e) { e.printStackTrace(); } } //停止生產(chǎn)者線程 public void stopProducer(){ synchronized(warehouse){ this.running = false; //通知等待倉(cāng)庫(kù)的線程 warehouse.notifyAll(); } } //生產(chǎn)者線程是否在運(yùn)行 public boolean isRunning(){ return running; } } package book.thread.product; public class Product { private String name;//產(chǎn)品名 public Product(String name){ this.name = name; } public String toString(){ return "Product-"+name; } } package book.thread.product; //產(chǎn)品的倉(cāng)庫(kù)類,內(nèi)部采用數(shù)組來(lái)表示循環(huán)隊(duì)列,以存放產(chǎn)品 public class Warehouse { private static int CAPACITY = 11;//倉(cāng)庫(kù)的容量 private Product[] products;//倉(cāng)庫(kù)里的產(chǎn)品 //[front,rear]區(qū)間的產(chǎn)品未被消費(fèi) private int front = 0;//當(dāng)前倉(cāng)庫(kù)中第一個(gè)未被消費(fèi)的產(chǎn)品的下標(biāo) private int rear = 0;//倉(cāng)庫(kù)中最后一個(gè)未被消費(fèi)的產(chǎn)品下標(biāo)加1 public Warehouse(){ this.products = new Product[CAPACITY]; } public Warehouse(int capacity){ this(); if(capacity > 0){ CAPACITY = capacity +1; this.products = new Product[CAPACITY]; } } //從倉(cāng)庫(kù)獲取一個(gè)產(chǎn)品 public Product getProduct() throws InterruptedException{ synchronized(this){ boolean consumerRunning = true;//標(biāo)志消費(fèi)者線程是否還在運(yùn)行 Thread currentThread = Thread.currentThread();//獲取當(dāng)前線程 if(currentThread instanceof Consumer){ consumerRunning = ((Consumer)currentThread).isRunning(); }else{ return null;//非消費(fèi)者不能獲取產(chǎn)品 } //若消費(fèi)者線程在運(yùn)行中,但倉(cāng)庫(kù)中沒(méi)有產(chǎn)品了,則消費(fèi)者線程繼續(xù)等待 while((front==rear) && consumerRunning){ wait(); consumerRunning = ((Consumer)currentThread).isRunning(); } //如果消費(fèi)者線程已經(jīng)停止運(yùn)行,則退出該方法,取消獲取產(chǎn)品 if(!consumerRunning){ return null; } //獲取當(dāng)前未被消費(fèi)的第一個(gè)產(chǎn)品 Product product = products[front]; System.out.println("Consumer[" + currentThread.getName()+"] getProduct:"+product); //將當(dāng)前未被消費(fèi)產(chǎn)品的下標(biāo)后移一位,如果到了數(shù)組末尾,則移動(dòng)到首部 front = (front+1+CAPACITY)%CAPACITY; System.out.println("倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:"+(rear+CAPACITY-front)%CAPACITY); //通知其他等待線程 notify(); return product; } } //向倉(cāng)庫(kù)存儲(chǔ)一個(gè)產(chǎn)品 public void storageProduct(Product product) throws InterruptedException{ synchronized(this){ boolean producerRunning = true;//標(biāo)志生產(chǎn)者線程是否在運(yùn)行 Thread currentThread = Thread.currentThread(); if(currentThread instanceof Producer){ producerRunning = ((Producer)currentThread).isRunning(); }else{ return; } //如果最后一個(gè)未被消費(fèi)的產(chǎn)品與第一個(gè)未被消費(fèi)的產(chǎn)品的下標(biāo)緊挨著,則說(shuō)明沒(méi)有存儲(chǔ)空間了。 //如果沒(méi)有存儲(chǔ)空間了,而生產(chǎn)者線程還在運(yùn)行,則生產(chǎn)者線程等待倉(cāng)庫(kù)釋放產(chǎn)品 while(((rear+1)%CAPACITY == front) && producerRunning){ wait(); producerRunning = ((Producer)currentThread).isRunning(); } //如果生產(chǎn)線程已經(jīng)停止運(yùn)行了,則停止產(chǎn)品的存儲(chǔ) if(!producerRunning){ return; } //保存產(chǎn)品到倉(cāng)庫(kù) products[rear] = product; System.out.println("Producer[" + Thread.currentThread().getName()+"] storageProduct:" + product); //將rear下標(biāo)循環(huán)后移一位 rear = (rear + 1)%CAPACITY; System.out.println("倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:"+(rear + CAPACITY -front)%CAPACITY); notify(); } } } package book.thread.product; public class TestProduct { public static void main(String[] args) { Warehouse warehouse = new Warehouse(10);//建立一個(gè)倉(cāng)庫(kù),容量為10 //建立生產(chǎn)者線程和消費(fèi)者 Producer producers1 = new Producer(warehouse,"producer-1"); Producer producers2 = new Producer(warehouse,"producer-2"); Producer producers3 = new Producer(warehouse,"producer-3"); Consumer consumer1 = new Consumer(warehouse,"consumer-1"); Consumer consumer2 = new Consumer(warehouse,"consumer-2"); Consumer consumer3 = new Consumer(warehouse,"consumer-3"); Consumer consumer4 = new Consumer(warehouse,"consumer-4"); //啟動(dòng)生產(chǎn)者線程和消費(fèi)者線程 producers1.start(); producers2.start(); consumer1.start(); producers3.start(); consumer2.start(); consumer3.start(); consumer4.start(); //讓生產(chǎn)者/消費(fèi)者程序運(yùn)行1600ms try { Thread.sleep(1600); } catch (InterruptedException e) { e.printStackTrace(); } //停止消費(fèi)者線程 producers1.stopProducer(); consumer1.stopConsumer(); producers2.stopProducer(); consumer2.stopConsumer(); producers3.stopProducer(); consumer3.stopConsumer(); consumer4.stopConsumer(); } }
輸出結(jié)果:
Producer[producer-1] storageProduct:Product-1 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1 Consumer[consumer-2] getProduct:Product-1 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:0 Producer[producer-3] storageProduct:Product-3 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1 Producer[producer-2] storageProduct:Product-2 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2 Consumer[consumer-3] getProduct:Product-3 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1 Consumer[consumer-1] getProduct:Product-2 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:0 Producer[producer-1] storageProduct:Product-4 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1 Consumer[consumer-4] getProduct:Product-4 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:0 Producer[producer-3] storageProduct:Product-6 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1 Producer[producer-2] storageProduct:Product-5 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2 Consumer[consumer-1] getProduct:Product-6 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1 Consumer[consumer-2] getProduct:Product-5 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:0 Producer[producer-1] storageProduct:Product-7 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1 Consumer[consumer-3] getProduct:Product-7 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:0 Producer[producer-3] storageProduct:Product-8 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1 Producer[producer-2] storageProduct:Product-9 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2 Consumer[consumer-4] getProduct:Product-8 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1 Producer[producer-1] storageProduct:Product-10 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2 Producer[producer-3] storageProduct:Product-11 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:3 Producer[producer-2] storageProduct:Product-12 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:4 Consumer[consumer-1] getProduct:Product-9 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:3 Consumer[consumer-2] getProduct:Product-10 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2 Consumer[consumer-3] getProduct:Product-11 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1 Producer[producer-3] storageProduct:Product-13 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2 Producer[producer-1] storageProduct:Product-14 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:3 Producer[producer-2] storageProduct:Product-15 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:4 Consumer[consumer-4] getProduct:Product-12 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:3 Consumer[consumer-1] getProduct:Product-13 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2 Consumer[consumer-2] getProduct:Product-14 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:1 Producer[producer-1] storageProduct:Product-16 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:2 Producer[producer-3] storageProduct:Product-17 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:3 Producer[producer-2] storageProduct:Product-18 倉(cāng)庫(kù)中還沒(méi)有被消費(fèi)的產(chǎn)品數(shù)量:4
分析:在main方法中建立了一個(gè)產(chǎn)品倉(cāng)庫(kù),并未該倉(cāng)庫(kù)關(guān)聯(lián)了3個(gè)生產(chǎn)者線程和4個(gè)消費(fèi)者線程,啟動(dòng)這些線程,使生產(chǎn) 者/消費(fèi)者模型運(yùn)作起來(lái),當(dāng)程序運(yùn)行1600ms時(shí),所有的生產(chǎn)者停止生產(chǎn)產(chǎn)品,消費(fèi)者停止消費(fèi)產(chǎn)品。
生產(chǎn)者線程Product在run方法中沒(méi)300ms便生產(chǎn)一個(gè)產(chǎn)品,并存入倉(cāng)庫(kù);消費(fèi)者線程Consumer在run方法中沒(méi)500ms便從倉(cāng)庫(kù)中取一個(gè)產(chǎn)品。
倉(cāng)庫(kù)類Warehouse負(fù)責(zé)存放產(chǎn)品和發(fā)放產(chǎn)品。storageProduct方法負(fù)責(zé)存儲(chǔ)產(chǎn)品,當(dāng)倉(cāng)庫(kù)滿時(shí),當(dāng)前線程進(jìn)入等待狀態(tài),即如果生產(chǎn)者線程A在調(diào)用storageProduct方法以存儲(chǔ)產(chǎn)品時(shí),發(fā)現(xiàn)倉(cāng)庫(kù)已滿,無(wú)法存儲(chǔ)時(shí),便會(huì)進(jìn)入等待狀態(tài)。當(dāng)存儲(chǔ)產(chǎn)品成功時(shí),調(diào)用notify方法,喚醒等待的消費(fèi)者線程。
getProduct方法負(fù)責(zé)提前產(chǎn)品,當(dāng)倉(cāng)庫(kù)空時(shí),當(dāng)前線程進(jìn)入等待狀態(tài),即如果消費(fèi)者線程B在調(diào)用getProduct方法以獲取產(chǎn)品時(shí),發(fā)現(xiàn)倉(cāng)庫(kù)空了,便會(huì)進(jìn)入等待狀態(tài)。當(dāng)提取產(chǎn)品成功時(shí),調(diào)用notify方法,喚醒等待的生產(chǎn)者線程。
以上這篇淺談java線程中生產(chǎn)者與消費(fèi)者的問(wèn)題就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- Java多線程并發(fā)生產(chǎn)者消費(fèi)者設(shè)計(jì)模式實(shí)例解析
- Java多線程生產(chǎn)者消費(fèi)者模式實(shí)現(xiàn)過(guò)程解析
- Java多線程 生產(chǎn)者消費(fèi)者模型實(shí)例詳解
- Java多線程 BlockingQueue實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型詳解
- Java多線程之線程通信生產(chǎn)者消費(fèi)者模式及等待喚醒機(jī)制代碼詳解
- java 中多線程生產(chǎn)者消費(fèi)者問(wèn)題詳細(xì)介紹
- JAVA多線程實(shí)現(xiàn)生產(chǎn)者消費(fèi)者的實(shí)例詳解
- java多線程解決生產(chǎn)者消費(fèi)者問(wèn)題
- JAVA生產(chǎn)者消費(fèi)者(線程同步)代碼學(xué)習(xí)示例
- Java如何通過(guò)線程解決生產(chǎn)者/消費(fèi)者問(wèn)題
相關(guān)文章
java計(jì)算值所占的百分比,結(jié)果為100%問(wèn)題
這篇文章主要介紹了java計(jì)算值所占的百分比,結(jié)果為100%問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-11-11springmvc數(shù)據(jù)的封裝過(guò)程詳解
這篇文章主要介紹了springmvc數(shù)據(jù)的封裝過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-09-09一文詳解Java如何系統(tǒng)地避免空指針問(wèn)題
新手Java開(kāi)發(fā)總是經(jīng)常空指針檢查,甚至某些老手也會(huì)犯這樣的問(wèn)題,所以這篇文章小編就帶大家一起來(lái)看看如何系統(tǒng)地避免空指針問(wèn)題,希望對(duì)大家有所幫助2024-01-01Spring?boot?RedisTemplate?序列化服務(wù)化配置方式
這篇文章主要介紹了Springboot?RedisTemplate序列化服務(wù)化配置方式,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-07-07ElasticSearch創(chuàng)建后索引修改數(shù)據(jù)類型方法步驟
Elasticsearch存儲(chǔ)數(shù)據(jù)之前需要先創(chuàng)建索引,類似于結(jié)構(gòu)型數(shù)據(jù)庫(kù)建庫(kù)建表,創(chuàng)建索引時(shí)定義了每個(gè)字段的索引方式和數(shù)據(jù)類型,這篇文章主要給大家介紹了關(guān)于ElasticSearch創(chuàng)建后索引修改數(shù)據(jù)類型的方法步驟,需要的朋友可以參考下2023-09-09使用JAVA實(shí)現(xiàn)郵件發(fā)送功能的圖文教程
郵件發(fā)送其實(shí)是一個(gè)非常常見(jiàn)的需求,用戶注冊(cè),找回密碼等地方,都會(huì)用到,下面這篇文章主要給大家介紹了關(guān)于使用JAVA實(shí)現(xiàn)郵件發(fā)送功能的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-06-06