欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java多線程中不同條件下編寫生產(chǎn)消費(fèi)者模型方法介紹

 更新時(shí)間:2017年11月21日 10:51:49   作者:nullzx  
這篇文章主要介紹了Java多線程中不同條件下編寫生產(chǎn)消費(fèi)者模型方法介紹,介紹了生產(chǎn)消費(fèi)者模型,然后分享了相關(guān)代碼示例,具有一定參考價(jià)值,需要的朋友可以了解下。

簡(jiǎn)介:

生產(chǎn)者、消費(fèi)者模型是多線程編程的常見問題,最簡(jiǎn)單的一個(gè)生產(chǎn)者、一個(gè)消費(fèi)者線程模型大多數(shù)人都能夠?qū)懗鰜?lái),但是一旦條件發(fā)生變化,我們就很容易掉進(jìn)多線程的bug中。這篇文章主要講解了生產(chǎn)者和消費(fèi)者的數(shù)量,商品緩存位置數(shù)量,商品數(shù)量等多個(gè)條件的不同組合下,寫出正確的生產(chǎn)者消費(fèi)者模型的方法。

歡迎探討,如有錯(cuò)誤敬請(qǐng)指正

生產(chǎn)消費(fèi)者模型

    生產(chǎn)者消費(fèi)者模型具體來(lái)講,就是在一個(gè)系統(tǒng)中,存在生產(chǎn)者和消費(fèi)者兩種角色,他們通過內(nèi)存緩沖區(qū)進(jìn)行通信,生產(chǎn)者生產(chǎn)消費(fèi)者需要的資料,消費(fèi)者把資料做成產(chǎn)品。生產(chǎn)消費(fèi)者模式如下圖。

定義商品類

package demo;
/*定義商品*/
public class Goods {
	public final String name;
	public final int price;
	public final int id;
	public Goods(String name, int price, int id){
		this.name = name;
		/*類型*/
		this.price = price;
		/*價(jià)格*/
		this.id = id;
		/*商品序列號(hào)*/
	}
	@Override
	  public String toString(){
		return "name: " + name + ",  price:"+ price + ",  id: " + id;
	}
}

基本要求:

1)生產(chǎn)者不能重復(fù)生產(chǎn)一個(gè)商品,也就是說不能有兩個(gè)id相同的商品

2)生產(chǎn)者不能覆蓋一個(gè)商品(當(dāng)前商品還未被消費(fèi),就被下一個(gè)新商品覆蓋)。也就是說消費(fèi)商品時(shí),商品的id屬性可以不連續(xù),但不能出現(xiàn)缺號(hào)的情況

3)消費(fèi)者不能重復(fù)消費(fèi)一個(gè)商品

1.生產(chǎn)者線程無(wú)線生產(chǎn),消費(fèi)者線程無(wú)限消費(fèi)的模式

1.1使用線程對(duì)象,一個(gè)生產(chǎn)者線程,一個(gè)消費(fèi)者線程,一個(gè)商品存儲(chǔ)位置

package demo;
import java.util.Random;
/*使用線程對(duì)象,一個(gè)緩存位置,一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者,無(wú)限生產(chǎn)商品消費(fèi)商品*/
public class ProducterComsumerDemo1 {
	/*定義一個(gè)商品緩存位置*/
	private volatile Goods goods;
	/*定義一個(gè)對(duì)象作為鎖,不使用goods作為鎖是因?yàn)樯a(chǎn)者每次會(huì)產(chǎn)生一個(gè)新的對(duì)象*/
	private Object obj = new Object();
	/*isFull == true 生產(chǎn)者線程休息,消費(fèi)者線程消費(fèi) 
   *isFull == false 消費(fèi)者線程休息,生產(chǎn)者線程生產(chǎn)*/
	private volatile Boolean isFull = false;
	/*商品的id編號(hào),生產(chǎn)者制造的每個(gè)商品的id都不一樣,每生產(chǎn)一個(gè)id自增1*/
	private int id = 1;
	/*隨機(jī)產(chǎn)生一個(gè)sleep時(shí)間*/
	private Random rnd = new Random();
	/*=================定義消費(fèi)者線程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		    public void run(){
			try{
				while(true){
					/*獲取obj對(duì)象的鎖, id 和 isFull 的操作都在同步代碼塊中*/
					synchronized(obj){
						if(!isFull){
							/*wait方法使當(dāng)前線程阻塞,并釋放鎖*/
							obj.wait();
						}
						/*隨機(jī)延時(shí)一段時(shí)間*/
						Thread.sleep(rnd.nextint(250));
						/*模擬消費(fèi)商品*/
						System.out.println(goods);
						/*隨機(jī)延時(shí)一段時(shí)間*/
						Thread.sleep(rnd.nextint(250));
						isFull = false;
						/*喚醒阻塞obj上的生產(chǎn)者線程*/
						obj.notify();
					}
					/*隨機(jī)延時(shí)一段時(shí)間*/
					Thread.sleep(rnd.nextint(250));
				}
			}
			catch (InterruptedException e){
				/*什么都不做*/
			}
		}
	}
	/*=================定義生產(chǎn)者線程==================*/
	public class ProductThread implements Runnable{
		@Override
		    public void run(){
			try {
				while(true){
					synchronized(obj){
						if(isFull){
							obj.wait();
						}
						Thread.sleep(rnd.nextint(500));
						/*如果id為偶數(shù),生產(chǎn)價(jià)格為2的產(chǎn)品A
             *如果id為奇數(shù),生產(chǎn)價(jià)格為1的產(chǎn)品B*/
						if(id % 2 == 0){
							goods = new Goods("A", 2, id);
						} else{
							goods = new Goods("B", 1, id);
						}
						Thread.sleep(rnd.nextint(250));
						id++;
						isFull = true;
						/*喚醒阻塞的消費(fèi)者線程*/
						obj.notify();
					}
				}
			}
			catch (InterruptedException e) {
				/*什么都不做*/
			}
		}
	}
	public static void main(String[] args) throws InterruptedException{
		ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();
		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();
		new Thread(p).start();
		new Thread(c).start();
	}
}

運(yùn)行結(jié)果

name: B,  price:1,  id: 1
name: A,  price:2,  id: 2
name: B,  price:1,  id: 3
name: A,  price:2,  id: 4
name: B,  price:1,  id: 5
name: A,  price:2,  id: 6
name: B,  price:1,  id: 7
name: A,  price:2,  id: 8
name: B,  price:1,  id: 9
name: A,  price:2,  id: 10
name: B,  price:1,  id: 11
name: A,  price:2,  id: 12
name: B,  price:1,  id: 13
……

從結(jié)果看出,商品類型交替生產(chǎn),每個(gè)商品的id都不相同,且不會(huì)漏過任何一個(gè)id,生產(chǎn)者沒有重復(fù)生產(chǎn),消費(fèi)者沒有重復(fù)消費(fèi),結(jié)果完全正確。

1.2.使用線程對(duì)象,多個(gè)生產(chǎn)者線程,多個(gè)消費(fèi)者線程,1個(gè)緩存位置

1.2.1一個(gè)經(jīng)典的bug

對(duì)于多生產(chǎn)者,多消費(fèi)者這個(gè)問題,看起來(lái)我們似乎不用修改代碼,只需在main方法中多添加幾個(gè)線程就好。假設(shè)我們需要三個(gè)消費(fèi)者,一個(gè)生產(chǎn)者,那么我們只需要在main方法中再添加兩個(gè)消費(fèi)者線程。

public static void main(String[] args) throws InterruptedException{
	ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();
	Runnable c = pcd.new ComsumeThread();
	Runnable p = pcd.new ProductThread();
	new Thread(c).start();
	new Thread(p).start();
	new Thread(c).start();
	new Thread(c).start();
}

運(yùn)行結(jié)果

name: B,  price:1,  id: 1
name: A,  price:2,  id: 2
name: A,  price:2,  id: 2
name: B,  price:1,  id: 3
name: B,  price:1,  id: 3
name: A,  price:2,  id: 4
name: A,  price:2,  id: 4
name: B,  price:1,  id: 5
name: B,  price:1,  id: 5
name: A,  price:2,  id: 6
……

從結(jié)果中,我們發(fā)現(xiàn)消費(fèi)者重復(fù)消費(fèi)了商品,所以這樣做顯然是錯(cuò)誤的。這里我們定義多個(gè)消費(fèi)者,一個(gè)生產(chǎn)者,所以遇到了重復(fù)消費(fèi)的問題,如果定義成一個(gè)消費(fèi)者,多個(gè)生產(chǎn)者就會(huì)遇到id覆蓋的問題。如果我們定義多個(gè)消費(fèi)者,多個(gè)生產(chǎn)者,那么即會(huì)遇到重復(fù)消費(fèi),也會(huì)遇到id覆蓋的問題。注意,上面的代碼使用的notifyAll喚醒方法,如果使用notify方法喚醒bug仍然可能發(fā)生。

現(xiàn)在我們來(lái)分析一下原因。當(dāng)生產(chǎn)者生產(chǎn)好了商品,會(huì)喚醒因沒有商品而阻塞消費(fèi)者線程,假設(shè)喚醒的消費(fèi)者線程超過兩個(gè),這兩個(gè)線程會(huì)競(jìng)爭(zhēng)獲取鎖,獲取到鎖的線程就會(huì)從obj.wait()方法中返回,然后消費(fèi)商品,并把isFull置為false,然后釋放鎖。當(dāng)被喚醒的另一個(gè)線程競(jìng)爭(zhēng)獲取到鎖了以后也會(huì)從obj.wait()方法中返回。會(huì)再次消費(fèi)同一個(gè)商品。顯然,每一個(gè)被喚醒的線程應(yīng)該再次檢查isFull這個(gè)條件。所以無(wú)論是消費(fèi)者,還是生產(chǎn)者,isFull的判斷必須改成while循環(huán),這樣才能得到正確的結(jié)果而不受生產(chǎn)者的線程數(shù)和消費(fèi)者的線程數(shù)的影響。

而對(duì)于只有一個(gè)生產(chǎn)者線程,一個(gè)消費(fèi)者線程,用if判斷是沒有問題的,但是仍然強(qiáng)烈建議改成while語(yǔ)句進(jìn)行判斷。

1.2.2正確的姿勢(shì)

package demo;
import java.util.Random;
/*使用線程對(duì)象,一個(gè)緩存位置,一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者,無(wú)限生產(chǎn)商品消費(fèi)商品*/
public class ProducterComsumerDemo1 {
	/*定義一個(gè)商品緩存位置*/
	private volatile Goods goods;
	/*定義一個(gè)對(duì)象作為鎖,不使用goods作為鎖是因?yàn)樯a(chǎn)者每次會(huì)產(chǎn)生一個(gè)新的對(duì)象*/
	private Object obj = new Object();
	/*isFull == true 生產(chǎn)者線程休息,消費(fèi)者線程消費(fèi) 
   *isFull == false 消費(fèi)者線程消費(fèi),生產(chǎn)者線程生產(chǎn)*/
	private volatile Boolean isFull = false;
	/*商品的id編號(hào),生產(chǎn)者制造的每個(gè)商品的id都不一樣,每生產(chǎn)一個(gè)id自增1*/
	private int id = 1;
	/*隨機(jī)產(chǎn)生一個(gè)sleep時(shí)間*/
	private Random rnd = new Random();
	/*=================定義消費(fèi)者線程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		    public void run(){
			try{
				while(true){
					/*獲取obj對(duì)象的鎖, id 和 isFull 的操作都在同步代碼塊中*/
					synchronized(obj){
						while(!isFull){
							/*wait方法使當(dāng)前線程阻塞,并釋放鎖*/
							obj.wait();
						}
						/*隨機(jī)延時(shí)一段時(shí)間*/
						Thread.sleep(rnd.nextint(250));
						/*模擬消費(fèi)商品*/
						System.out.println(goods);
						/*隨機(jī)延時(shí)一段時(shí)間*/
						Thread.sleep(rnd.nextint(250));
						isFull = false;
						/*喚醒阻塞obj上的生產(chǎn)者線程*/
						obj.notifyAll();
					}
					/*隨機(jī)延時(shí)一段時(shí)間*/
					Thread.sleep(rnd.nextint(250));
				}
			}
			catch (InterruptedException e){
				/*我就是任性,這里什么都不做*/
			}
		}
	}
	/*=================定義生產(chǎn)者線程==================*/
	public class ProductThread implements Runnable{
		@Override
		    public void run(){
			try {
				while(true){
					synchronized(obj){
						while(isFull){
							obj.wait();
						}
						Thread.sleep(rnd.nextint(500));
						/*如果id為偶數(shù),生產(chǎn)價(jià)格為2的產(chǎn)品A
               如果id為奇數(shù),生產(chǎn)價(jià)格為1的產(chǎn)品B*/
						if(id % 2 == 0){
							goods = new Goods("A", 2, id);
						} else{
							goods = new Goods("B", 1, id);
						}
						Thread.sleep(rnd.nextint(250));
						id++;
						isFull = true;
						/*喚醒阻塞的消費(fèi)者線程*/
						obj.notifyAll();
					}
				}
			}
			catch (InterruptedException e) {
				/*我就是任性,這里什么都不做*/
			}
		}
	}
	public static void main(String[] args) throws InterruptedException{
		ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();
		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();
		new Thread(p).start();
		new Thread(p).start();
		new Thread(p).start();
		new Thread(c).start();
		new Thread(c).start();
		new Thread(c).start();
	}
}

1.3使用線程對(duì)象,多個(gè)緩存位置(有界),多生產(chǎn)者,多消費(fèi)者

1)當(dāng)緩存位置滿時(shí),我們應(yīng)該阻塞生產(chǎn)者線程

2)當(dāng)緩存位置空時(shí),我們應(yīng)該阻塞消費(fèi)者線程

下面的代碼我沒有用java對(duì)象內(nèi)置的鎖,而是用了ReentrantLock對(duì)象。是因?yàn)槠胀▽?duì)象的鎖只有一個(gè)阻塞隊(duì)列,如果使用notify方式,無(wú)法保證喚醒的就是特定類型的線程(消費(fèi)者線程或生產(chǎn)者線程),而notifyAll方法會(huì)喚醒所有的線程,當(dāng)剩余的緩存商品的數(shù)量小于生產(chǎn)者線程數(shù)量或已緩存商品的數(shù)量小于消費(fèi)者線程時(shí)效率就比較低。所以這里我們通過ReentrantLock對(duì)象構(gòu)造兩個(gè)阻塞隊(duì)列提高效率。

1.3.1普通方式

package demo;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/*使用線程對(duì)象,多個(gè)緩存位置(有界),多生產(chǎn)者,多消費(fèi)者,無(wú)限循環(huán)模式*/
public class ProducterComsumerDemo2 {
	/*最大緩存商品數(shù)*/
	private final int MAX_SLOT = 2;
	/*定義緩存商品的容器*/
	private LinkedList<Goods> queue = new LinkedList<Goods>();
	/*定義線程鎖和鎖對(duì)應(yīng)的阻塞隊(duì)列*/
	private Lock lock = new ReentrantLock();
	private Condition full = lock.newCondition();
	private Condition empty = lock.newCondition();
	/*商品的id編號(hào),生產(chǎn)者制造的每個(gè)商品的id都不一樣,每生產(chǎn)一個(gè)id自增1*/
	private int id = 1;
	/*隨機(jī)產(chǎn)生一個(gè)sleep時(shí)間*/
	private Random rnd = new Random();
	/*=================定義消費(fèi)者線程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		    public void run(){
			while(true){
				/*加鎖,queue的出列操作都在同步代碼塊中*/
				lock.lock();
				try {
					while(queue.isEmpty()){
						System.out.println("queue is empty");
						empty.await();
					}
					/*隨機(jī)延時(shí)一段時(shí)間*/
					Thread.sleep(rnd.nextint(200));
					/*模擬消費(fèi)商品*/
					Goods goods = queue.remove();
					System.out.println(goods);
					/*隨機(jī)延時(shí)一段時(shí)間*/
					Thread.sleep(rnd.nextint(200));
					/*喚醒阻塞的生產(chǎn)者線程*/
					full.signal();
				}
				catch (InterruptedException e) {
					/*什么都不做*/
				}
				finally{
					lock.unlock();
				}
				/*釋放鎖后隨機(jī)延時(shí)一段時(shí)間*/
				try {
					Thread.sleep(rnd.nextint(200));
				}
				catch (InterruptedException e) {
					/*什么都不做*/
				}
			}
		}
	}
	/*=================定義生產(chǎn)者線程==================*/
	public class ProductThread implements Runnable{
		@Override
		    public void run(){
			while(true){
				/*加鎖,queue的入列操作,id操作都在同步代碼塊中*/
				lock.lock();
				try{
					while(queue.size() == MAX_SLOT){
						System.out.println("queue is full");
						full.await();
					}
					Thread.sleep(rnd.nextint(200));
					Goods goods = null;
					/*根據(jù)序號(hào)產(chǎn)生不同的商品*/
					switch(id%3){
						case 0 : goods = new Goods("A", 1, id);
						break;
						case 1 : goods = new Goods("B", 2, id);
						break;
						case 2 : goods = new Goods("C", 3, id);
						break;
					}
					Thread.sleep(rnd.nextint(200));
					queue.add(goods);
					id++;
					/*喚醒阻塞的消費(fèi)者線程*/
					empty.signal();
				}
				catch(InterruptedException e){
					/*什么都不做*/
				}
				finally{
					lock.unlock();
				}
				/*釋放鎖后隨機(jī)延時(shí)一段時(shí)間*/
				try {
					Thread.sleep(rnd.nextint(100));
				}
				catch (InterruptedException e) {
					/*什么都不做*/
				}
			}
		}
	}
	/*=================main==================*/
	public static void main(String[] args) throws InterruptedException{
		ProducterComsumerDemo2 pcd = new ProducterComsumerDemo2();
		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();
		/*兩個(gè)生產(chǎn)者線程,兩個(gè)消費(fèi)者線程*/
		new Thread(p).start();
		new Thread(p).start();
		new Thread(c).start();
		new Thread(c).start();
	}
}

運(yùn)行結(jié)果

queue is empty
queue is empty
name: B,  price:2,  id: 1
name: C,  price:3,  id: 2
name: A,  price:1,  id: 3
queue is full
name: B,  price:2,  id: 4
name: C,  price:3,  id: 5
queue is full
name: A,  price:1,  id: 6
name: B,  price:2,  id: 7
name: C,  price:3,  id: 8
name: A,  price:1,  id: 9
name: B,  price:2,  id: 10
name: C,  price:3,  id: 11
name: A,  price:1,  id: 12
name: B,  price:2,  id: 13
name: C,  price:3,  id: 14
……

1.3.2 更優(yōu)雅的實(shí)現(xiàn)方式

下面使用線程池(ThreadPool)和阻塞隊(duì)列(LinkedBlockingQueue)原子類(AtomicInteger)以更加優(yōu)雅的方式實(shí)現(xiàn)上述功能。LinkedBlockingQueue阻塞隊(duì)列僅在take和put方法上鎖,所以id必須定義為原子類。

package demo;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/*使用線程對(duì)象,多個(gè)緩存位置(有界),多生產(chǎn)者,多消費(fèi)者,無(wú)限循環(huán)模式*/
public class ProducterComsumerDemo4 {
	/*最大緩存商品數(shù)*/
	private final int MAX_SLOT = 3;
	/*定義緩存商品的容器*/
	private LinkedBlockingQueue<Goods> queue = new LinkedBlockingQueue<Goods>(MAX_SLOT);
	/*商品的id編號(hào),生產(chǎn)者制造的每個(gè)商品的id都不一樣,每生產(chǎn)一個(gè)id自增1*/
	private AtomicInteger id = new AtomicInteger(1);
	/*隨機(jī)產(chǎn)生一個(gè)sleep時(shí)間*/
	private Random rnd = new Random();
	/*=================定義消費(fèi)者線程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		    public void run(){
			while(true){
				try {
					/*隨機(jī)延時(shí)一段時(shí)間*/
					Thread.sleep(rnd.nextint(200));
					/*模擬消費(fèi)商品*/
					Goods goods = queue.take();
					System.out.println(goods);
					/*隨機(jī)延時(shí)一段時(shí)間*/
					Thread.sleep(rnd.nextint(200));
				}
				catch (InterruptedException e) {
					/*什么都不做*/
				}
			}
		}
	}
	/*=================定義生產(chǎn)者線程==================*/
	public class ProductThread implements Runnable{
		@Override
		    public void run(){
			while(true){
				try{
					int x = id.getAndIncrement();
					Goods goods = null;
					Thread.sleep(rnd.nextint(200));
					/*根據(jù)序號(hào)產(chǎn)生不同的商品*/
					switch(x%3){
						case 0 : goods = new Goods("A", 1, x);
						break;
						case 1 : goods = new Goods("B", 2, x);
						break;
						case 2 : goods = new Goods("C", 3, x);
						break;
					}
					Thread.sleep(rnd.nextint(200));
					queue.put(goods);
					Thread.sleep(rnd.nextint(100));
				}
				catch(InterruptedException e){
					/*什么都不做*/
				}
			}
		}
	}
	/*=================main==================*/
	public static void main(String[] args) throws InterruptedException{
		ProducterComsumerDemo4 pcd = new ProducterComsumerDemo4();
		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();
		/*定義線程池*/
		ExecutorService es = Executors.newCachedThreadPool();
		/*三個(gè)生產(chǎn)者線程,兩個(gè)消費(fèi)者線程*/
		es.execute(p);
		es.execute(p);
		es.execute(p);
		es.execute(c);
		es.execute(c);
		es.shutdown();
	}
}

2.有限商品個(gè)數(shù)

這個(gè)問題顯然比上面的問題要復(fù)雜不少,原因在于要保證緩存區(qū)的商品要全部消費(fèi)掉,沒有重復(fù)消費(fèi)商品,沒有覆蓋商品,同時(shí)還要保證所有線程能夠正常結(jié)束,防止存在一直阻塞的線程。

2.1使用線程對(duì)象,多個(gè)緩存位置(有界),多生產(chǎn)者,多消費(fèi)者

思路定義一下三個(gè)變量

/*需要生產(chǎn)的總商品數(shù)*/
  private final int TOTAL_NUM = 30;
   
  /*已產(chǎn)生的數(shù)量*/
  private volatile int productNum = 0;
   
  /*已消耗的商品數(shù)*/
  private volatile int comsumedNum = 0;

每生產(chǎn)一個(gè)商品 productNum 自增1,直到TOTAL_NUM為止,如果不滿足條件 productNum < TOTAL_NUM 則結(jié)束進(jìn)程,自增操作必須在full.await()方法調(diào)用之前,防止生產(chǎn)者線程無(wú)法喚醒。

同理,每消費(fèi)一個(gè)商品 comsumedNum 自增1,直到TOTAL_NUM為止,如果不滿足條件 comsumedNum < TOTAL_NUM 則結(jié)束進(jìn)程,自增操作必須在empty.await()方法調(diào)用之前,防止消費(fèi)者線程無(wú)法喚醒。

comsumedNum和productNum相當(dāng)于計(jì)劃經(jīng)濟(jì)時(shí)代的糧票一樣,有了它能夠保證生產(chǎn)者線程在喚醒后一定需要生產(chǎn)一個(gè)商品,消費(fèi)者線程在喚醒以后一定能夠消費(fèi)一個(gè)商品

package demo;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/*使用線程對(duì)象,多個(gè)緩存位置(有界),多生產(chǎn)者,多消費(fèi)者, 有限商品個(gè)數(shù)*/
public class ProducterComsumerDemo3 {
	/*需要生產(chǎn)的總商品數(shù)*/
	private final int TOTAL_NUM = 30;
	/*已產(chǎn)生的數(shù)量*/
	private volatile int productNum = 0;
	/*已消耗的商品數(shù)*/
	private volatile int comsumedNum = 0;
	/*最大緩存商品數(shù)*/
	private final int MAX_SLOT = 2;
	/*定義線程公用的鎖和條件*/
	private Lock lock = new ReentrantLock();
	private Condition full = lock.newCondition();
	private Condition empty = lock.newCondition();
	/*定義緩存商品的容器*/
	private LinkedList<Goods> queue = new LinkedList<Goods>();
	/*商品的id編號(hào),生產(chǎn)者制造的每個(gè)商品的id都不一樣,每生產(chǎn)一個(gè)id自增1*/
	private int id = 1;
	/*隨機(jī)產(chǎn)生一個(gè)sleep時(shí)間*/
	private Random rnd = new Random();
	/*=================定義消費(fèi)者線程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		    public void run(){
			while(true){
				/*加鎖, id、comsumedNum 操作都在同步代碼塊中*/
				lock.lock();
				try {
					/*隨機(jī)延時(shí)一段時(shí)間*/
					Thread.sleep(rnd.nextint(250));
					if(comsumedNum < TOTAL_NUM){
						comsumedNum++;
					} else{
						/*這里會(huì)自動(dòng)執(zhí)行finally的語(yǔ)句,釋放鎖*/
						break;
					}
					while(queue.isEmpty()){
						System.out.println("queue is empty");
						empty.await();
					}
					/*隨機(jī)延時(shí)一段時(shí)間*/
					Thread.sleep(rnd.nextint(250));
					/*模擬消費(fèi)商品*/
					Goods goods = queue.remove();
					System.out.println(goods);
					/*隨機(jī)延時(shí)一段時(shí)間*/
					Thread.sleep(rnd.nextint(250));
					/*喚醒阻塞的生產(chǎn)者線程*/
					full.signal();
				}
				catch (InterruptedException e) {
				}
				finally{
					lock.unlock();
				}
				/*釋放鎖后,隨機(jī)延時(shí)一段時(shí)間*/
				try {
					Thread.sleep(rnd.nextint(250));
				}
				catch (InterruptedException e) {
				}
			}
			System.out.println(
			          "customer "
			          + Thread.currentThread().getName()
			          + " is over");
		}
	}
	/*=================定義生產(chǎn)者線程==================*/
	public class ProductThread implements Runnable{
		@Override
		    public void run(){
			while(true){
				lock.lock();
				try{
					/*隨機(jī)延時(shí)一段時(shí)間*/
					Thread.sleep(rnd.nextint(250));
					if(productNum < TOTAL_NUM){
						productNum++;
					} else{
						/*這里會(huì)自動(dòng)執(zhí)行finally的語(yǔ)句,釋放鎖*/
						break;
					}
					Thread.sleep(rnd.nextint(250));
					while(queue.size() == MAX_SLOT){
						System.out.println("queue is full");
						full.await();
					}
					Thread.sleep(rnd.nextint(250));
					Goods goods = null;
					/*根據(jù)序號(hào)產(chǎn)生不同的商品*/
					switch(id%3){
						case 0 : goods = new Goods("A", 1, id);
						break;
						case 1 : goods = new Goods("B", 2, id);
						break;
						case 2 : goods = new Goods("C", 3, id);
						break;
					}
					queue.add(goods);
					id++;
					/*喚醒阻塞的消費(fèi)者線程*/
					empty.signal();
				}
				catch(InterruptedException e){
				}
				finally{
					lock.unlock();
				}
				/*釋放鎖后,隨機(jī)延時(shí)一段時(shí)間*/
				try {
					Thread.sleep(rnd.nextint(250));
				}
				catch (InterruptedException e) {
					/*什么都不做*/
				}
			}
			System.out.println(
			          "producter "
			          + Thread.currentThread().getName()
			          + " is over");
		}
	}
	/*=================main==================*/
	public static void main(String[] args) throws InterruptedException{
		ProducterComsumerDemo3 pcd = new ProducterComsumerDemo3();
		ComsumeThread c = pcd.new ComsumeThread();
		ProductThread p = pcd.new ProductThread();
		new Thread(p).start();
		new Thread(p).start();
		new Thread(p).start();
		new Thread(c).start();
		new Thread(c).start();
		new Thread(c).start();
		System.out.println("main Thread is over");
	}
}

2.2利用線程池,原子類,阻塞隊(duì)列,以更優(yōu)雅的方式實(shí)現(xiàn)

LinkedBlockingQueue阻塞隊(duì)列僅在take和put方法上鎖,所以productNum和comsumedNum必須定義為原子類。

package demo;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/*使用線程池,多個(gè)緩存位置(有界),多生產(chǎn)者,多消費(fèi)者, 有限商品個(gè)數(shù)*/
public class LinkedBlockingQueueDemo {
	/*需要生產(chǎn)的總商品數(shù)*/
	private final int TOTAL_NUM = 20;
	/*已產(chǎn)生商品的數(shù)量*/
	volatile AtomicInteger productNum = new AtomicInteger(0);
	/*已消耗的商品數(shù)*/
	volatile AtomicInteger comsumedNum = new AtomicInteger(0);
	/*最大緩存商品數(shù)*/
	private final int MAX_SLOT = 5;
	/*同步阻塞隊(duì)列,隊(duì)列容量為MAX_SLOT*/
	private LinkedBlockingQueue<Goods> lbq = new LinkedBlockingQueue<Goods>(MAX_SLOT);
	/*隨機(jī)數(shù)*/
	private Random rnd = new Random();
	/*pn表示產(chǎn)品的編號(hào),產(chǎn)品編號(hào)從1開始*/
	private volatile AtomicInteger pn = new AtomicInteger(1);
	/*=================定義消費(fèi)者線程==================*/
	public class CustomerThread implements Runnable{
		@Override
		    public void run(){
			while(comsumedNum.getAndIncrement() < TOTAL_NUM){
				try{
					/*隨機(jī)延時(shí)一段時(shí)間*/
					Thread.sleep(rnd.nextint(500));
					/*從隊(duì)列中取出商品,隊(duì)列空時(shí)發(fā)生阻塞*/
					Goods goods = lbq.take();
					/*隨機(jī)延時(shí)一段時(shí)間*/
					Thread.sleep(rnd.nextint(500));
					/*模擬消耗商品*/
					System.out.println(goods);
					/*隨機(jī)延時(shí)一段時(shí)間*/
					Thread.sleep(rnd.nextint(500));
				}
				catch(InterruptedException e){
				}
			}
			System.out.println(
			          "customer "
			          + Thread.currentThread().getName()
			          + " is over");
		}
	}
	/*=================定義生產(chǎn)者線程==================*/
	public class ProducerThread implements Runnable{
		@Override
		    public void run(){
			while(productNum.getAndIncrement() < TOTAL_NUM){
				try {
					int x = pn.getAndIncrement();
					Goods goods = null;
					/*根據(jù)序號(hào)產(chǎn)生不同的商品*/
					switch(x%3){
						case 0 : goods = new Goods("A", 1, x);
						break;
						case 1 : goods = new Goods("B", 2, x);
						break;
						case 2 : goods = new Goods("C", 3, x);
						break;
					}
					/*隨機(jī)延時(shí)一段時(shí)間*/
					Thread.sleep(rnd.nextint(500));
					/*產(chǎn)生的新產(chǎn)品入列,隊(duì)列滿時(shí)發(fā)生阻塞*/
					lbq.put(goods);
					/*隨機(jī)延時(shí)一段時(shí)間*/
					Thread.sleep(rnd.nextint(500));
				}
				catch (InterruptedException e1) {
					/*什么都不做*/
				}
			}
			System.out.println(
			          "producter "
			          + Thread.currentThread().getName()
			          + " is over ");
		}
	}
	/*=================main==================*/
	public static void main(String[] args){
		LinkedBlockingQueueDemo lbqd = new LinkedBlockingQueueDemo();
		Runnable c = lbqd.new CustomerThread();
		Runnable p = lbqd.new ProducerThread();
		ExecutorService es = Executors.newCachedThreadPool();
		es.execute(c);
		es.execute(c);
		es.execute(c);
		es.execute(p);
		es.execute(p);
		es.execute(p);
		es.shutdown();
		System.out.println("main Thread is over");
	}
}

總結(jié)

以上就是本文關(guān)于Java多線程中不同條件下編寫生產(chǎn)消費(fèi)者模型方法介紹的全部?jī)?nèi)容,希望對(duì)大家有所幫助。感興趣的朋友可以繼續(xù)參閱本站:

Java設(shè)計(jì)模式之代理模式原理及實(shí)現(xiàn)代碼分享

快速理解Java設(shè)計(jì)模式中的組合模式

Java設(shè)計(jì)模式之訪問者模式使用場(chǎng)景及代碼示例

如有不足之處,歡迎留言指出。

相關(guān)文章

  • 關(guān)于Java反編譯字節(jié)碼文件

    關(guān)于Java反編譯字節(jié)碼文件

    將高級(jí)語(yǔ)言翻譯成匯編語(yǔ)言或機(jī)器語(yǔ)言的過程Java語(yǔ)言中的編譯一般指將Java文件轉(zhuǎn)換成class文件顧名思義反編譯就是編譯的逆向過程其實(shí)我們常用的開發(fā)工具(例如:IDEA、Eclipse)都帶有反編譯功能,需要的朋友可以參考下
    2023-05-05
  • 詳解如何使用Spring的@FeignClient注解實(shí)現(xiàn)通信功能

    詳解如何使用Spring的@FeignClient注解實(shí)現(xiàn)通信功能

    SpringBoot是一個(gè)非常流行的Java框架,它提供了一系列工具來(lái)使這種交互無(wú)縫且高效,在這些工具中,@FeignClient注解因其易用性和強(qiáng)大的功能而脫穎而出, 在這篇文章中,我們將探討如何使用Spring的@FeignClient注解進(jìn)行客戶端-服務(wù)器通信,需要的朋友可以參考下
    2023-11-11
  • 一文帶你玩轉(zhuǎn)Java異常處理

    一文帶你玩轉(zhuǎn)Java異常處理

    這篇文章主要為大家介紹一下Java中的異常處理機(jī)制,文中通過示例為大家進(jìn)行了詳細(xì)的介紹,對(duì)我們學(xué)習(xí)有一定的幫助,感興趣的可以了解一下
    2022-08-08
  • Java 詳解異常的處理機(jī)制

    Java 詳解異常的處理機(jī)制

    異常是程序中的一些錯(cuò)誤,但并不是所有的錯(cuò)誤都是異常,并且錯(cuò)誤有時(shí)候是可以避免的。比如你的代碼少一個(gè)分號(hào),那運(yùn)行出來(lái)結(jié)果是提示是錯(cuò)誤 java.lang.Error;如果你用System.out.println(11/0),那你是因?yàn)橛?做了除數(shù),會(huì)拋出 java.lang.ArithmeticException 的異常
    2021-11-11
  • Spring高級(jí)之注解@PropertySource的原理

    Spring高級(jí)之注解@PropertySource的原理

    這篇文章主要介紹了Spring高級(jí)之注解@PropertySource的原理,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • java教程之二個(gè)arraylist排序的示例分享

    java教程之二個(gè)arraylist排序的示例分享

    常常遇到數(shù)組排序的問題,下面提供二個(gè)java的arraylist排序示例,需要的朋友可以參考下
    2014-03-03
  • 對(duì)arraylist中元素進(jìn)行排序?qū)嵗a

    對(duì)arraylist中元素進(jìn)行排序?qū)嵗a

    這篇文章主要介紹了對(duì)arraylist中元素進(jìn)行排序?qū)嵗a,還是比較不錯(cuò)的,這里分享給大家,供需要的朋友參考。
    2017-11-11
  • MyEclipse配置JDK的全過程

    MyEclipse配置JDK的全過程

    這篇文章主要介紹了MyEclipse配置JDK的全過程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-04-04
  • SpringBoot整合Dubbo框架,實(shí)現(xiàn)RPC服務(wù)遠(yuǎn)程調(diào)用

    SpringBoot整合Dubbo框架,實(shí)現(xiàn)RPC服務(wù)遠(yuǎn)程調(diào)用

    Dubbo是一款高性能、輕量級(jí)的開源Java RPC框架,它提供了三大核心能力:面向接口的遠(yuǎn)程方法調(diào)用,智能容錯(cuò)和負(fù)載均衡,以及服務(wù)自動(dòng)注冊(cè)和發(fā)現(xiàn)。今天就來(lái)看下SpringBoot整合Dubbo框架的步驟
    2021-06-06
  • Intellij IDEA實(shí)現(xiàn)springboot熱部署過程解析

    Intellij IDEA實(shí)現(xiàn)springboot熱部署過程解析

    這篇文章主要介紹了Intellij IDEA實(shí)現(xiàn)springboot熱部署過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-08-08

最新評(píng)論