Java生產(chǎn)者消費者的三種實現(xiàn)方式
Java生產(chǎn)者消費者的實現(xiàn)
Java生產(chǎn)者消費者是最基礎(chǔ)的線程同步問題,java崗面試中還是很容易遇到的,之前沒寫過多線程的代碼,面試中被問到很尬啊,面完回來惡補下。
在網(wǎng)上查到大概有5種生產(chǎn)者消費者的寫法,分別如下。
- 用synchronized對存儲加鎖,然后用object原生的wait() 和 notify()做同步。
- 用concurrent.locks.Lock,然后用condition的await() 和signal()做同步。
- 直接使用concurrent.BlockingQueue。
- 使用PipedInputStream/PipedOutputStream。
- 使用信號量semaphore?! ?/li>
我的理解,生產(chǎn)者消費者模式,其實只要保證在存儲端同一時刻只有一個線程讀或?qū)懢筒粫袉栴},然后再去考慮線程同步。
方法1 2 5都比較類似,都是加鎖來限制同一時刻只能有一個讀或?qū)?。而方? 4其實是在存儲內(nèi)部去保證讀和寫的唯一的,最低層肯定還是通過鎖機制來實現(xiàn)的,java底層代碼都封裝好了而已?! ?/p>
我自己嘗試寫了下前三種,代碼如下:
synchronized版本
import java.util.LinkedList; import java.util.Queue; public class ProducerAndConsumer { private final int MAX_LEN = 10; private Queue<Integer> queue = new LinkedList<Integer>(); class Producer extends Thread { @Override public void run() { producer(); } private void producer() { while(true) { synchronized (queue) { while (queue.size() == MAX_LEN) { queue.notify(); System.out.println("當(dāng)前隊列滿"); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.add(1); queue.notify(); System.out.println("生產(chǎn)者生產(chǎn)一條任務(wù),當(dāng)前隊列長度為" + queue.size()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } } } class Consumer extends Thread { @Override public void run() { consumer(); } private void consumer() { while (true) { synchronized (queue) { while (queue.size() == 0) { queue.notify(); System.out.println("當(dāng)前隊列為空"); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.poll(); queue.notify(); System.out.println("消費者消費一條任務(wù),當(dāng)前隊列長度為" + queue.size()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } } } public static void main(String[] args) { ProducerAndConsumer pc = new ProducerAndConsumer(); Producer producer = pc.new Producer(); Consumer consumer = pc.new Consumer(); producer.start(); consumer.start(); } }
lock版實現(xiàn),使用了condition做線程之間的同步
import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * version 1 doesn't use synchronized to improve performance */ public class ProducerAndConsumer1 { private final int MAX_LEN = 10; private Queue<Integer> queue = new LinkedList<Integer>(); private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); class Producer extends Thread { @Override public void run() { producer(); } private void producer() { while(true) { lock.lock(); try { while (queue.size() == MAX_LEN) { System.out.println("當(dāng)前隊列滿"); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.add(1); condition.signal(); System.out.println("生產(chǎn)者生產(chǎn)一條任務(wù),當(dāng)前隊列長度為" + queue.size()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } finally { lock.unlock(); } } } } class Consumer extends Thread { @Override public void run() { consumer(); } private void consumer() { while (true) { lock.lock(); try { while (queue.size() == 0) { System.out.println("當(dāng)前隊列為空"); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.poll(); condition.signal(); System.out.println("消費者消費一條任務(wù),當(dāng)前隊列長度為" + queue.size()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } finally { lock.unlock(); } } } } public static void main(String[] args) { ProducerAndConsumer pc = new ProducerAndConsumer(); Producer producer = pc.new Producer(); Consumer consumer = pc.new Consumer(); producer.start(); consumer.start(); } }
BlockingQueue版實現(xiàn)
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class ProducerAndConsumer { private BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(10); class Producer extends Thread { @Override public void run() { producer(); } private void producer() { while(true) { try { queue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("生產(chǎn)者生產(chǎn)一條任務(wù),當(dāng)前隊列長度為" + queue.size()); try { Thread.sleep(new Random().nextInt(1000)+500); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer extends Thread { @Override public void run() { consumer(); } private void consumer() { while (true) { try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消費者消費一條任務(wù),當(dāng)前隊列長度為" + queue.size()); try { Thread.sleep(new Random().nextInt(1000)+500); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { ProducerAndConsumer pc = new ProducerAndConsumer(); Producer producer = pc.new Producer(); Consumer consumer = pc.new Consumer(); producer.start(); consumer.start(); } }
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Maven?Web項目使用Cargo插件實現(xiàn)自動化部署的詳細(xì)步驟
cargo ,它是一組幫助用戶實現(xiàn)自動化部署,操作Web容器的工具,并且?guī)缀踔С炙械腤eb容器,這篇文章主要介紹了Maven?Web項目使用Cargo實現(xiàn)自動化部署,需要的朋友可以參考下2023-02-02詳解Java8?CompletableFuture的并行處理用法
Java8中有一個工具非常有用,那就是CompletableFuture,本章主要講解CompletableFuture的并行處理用法,感興趣的小伙伴可以了解一下2022-04-04Spring如何集成ibatis項目并實現(xiàn)dao層基類封裝
這篇文章主要介紹了Spring如何集成ibatis項目并實現(xiàn)dao層基類封裝,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-09-09Spring 使用JavaConfig實現(xiàn)配置的方法步驟
這篇文章主要介紹了Spring 使用JavaConfig實現(xiàn)配置的方法步驟,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-01-01Java報錯Java.net.SocketTimeoutException的幾種解決方法
在 Java 網(wǎng)絡(luò)編程中,SocketTimeoutException 通常表示在進行網(wǎng)絡(luò)操作時,等待響應(yīng)的時間超過了設(shè)定的超時時間,本文將深入探討 Java.net.SocketTimeoutException 的問題,并為開發(fā)者和環(huán)境配置者提供詳細(xì)的解決方案,需要的朋友可以參考下2024-10-10