java ArrayBlockingQueue阻塞隊列的實現示例
在Java并發(fā)編程中,ArrayBlockingQueue
是一個非常常用的工具類。它是一個由數組支持的有界阻塞隊列,提供了線程安全的隊列操作。
1.ArrayBlockingQueue概述
ArrayBlockingQueue
是一個基于數組實現的阻塞隊列,它繼承自AbstractQueue
并實現了BlockingQueue
接口。這個隊列在創(chuàng)建時需要指定一個固定的大小,之后這個大小就不能再改變了。當隊列滿時,如果再有新的元素試圖加入隊列,那么這個操作會被阻塞;同樣地,如果隊列為空,那么從隊列中取元素的操作也會被阻塞。這種特性使得ArrayBlockingQueue
非常適合作為生產者-消費者模式中的緩沖區(qū)。
2.ArrayBlockingQueue的核心特性
2.1.線程安全性
ArrayBlockingQueue
是線程安全的,它通過內部鎖機制保證了在多線程環(huán)境下的安全性。因此,在多線程環(huán)境中,你可以放心地使用它而不需要擔心數據的一致性問題。
2.2.阻塞控制
ArrayBlockingQueue
提供了阻塞控制機制。當隊列滿時,嘗試向隊列中添加元素的線程會被阻塞,直到隊列中有空間可用;同樣,當隊列為空時,嘗試從隊列中取出元素的線程也會被阻塞,直到隊列中有元素可供消費。這種機制可以有效地控制生產者和消費者的速度,避免資源的浪費。
2.3.有界性
ArrayBlockingQueue
的有界性可以防止隊列無限制地增長,從而避免內存溢出。在實際應用中,這種有界性可以作為系統(tǒng)的一個流量控制閥,當系統(tǒng)過載時,通過阻塞或拒絕請求來保護系統(tǒng)。
3.ArrayBlockingQueue的使用
3.1.創(chuàng)建ArrayBlockingQueue
創(chuàng)建一個ArrayBlockingQueue
非常簡單,只需要指定隊列的大小即可:
int queueSize = 10; BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize);
3.2.生產者-消費者模式
ArrayBlockingQueue
常用于生產者-消費者模式。生產者負責生成數據并添加到隊列中,而消費者則從隊列中取出數據并處理。下面是一個簡單的生產者-消費者示例:
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class ProducerConsumerExample { public static void main(String[] args) { BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); Thread producer = new Thread(() -> { for (int i = 0; i < 10; i++) { try { System.out.println("生產者生產了數據:" + i); queue.put(i); Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread consumer = new Thread(() -> { while (true) { try { Integer data = queue.take(); System.out.println("消費者消費了數據:" + data); } catch (InterruptedException e) { e.printStackTrace(); } } }); producer.start(); consumer.start(); } }
運行結果:
生產者生產了數據:0
消費者消費了數據:0
生產者生產了數據:1
消費者消費了數據:1
生產者生產了數據:2
消費者消費了數據:2
生產者生產了數據:3
消費者消費了數據:3
生產者生產了數據:4
消費者消費了數據:4
生產者生產了數據:5
消費者消費了數據:5
生產者生產了數據:6
消費者消費了數據:6
生產者生產了數據:7
消費者消費了數據:7
生產者生產了數據:8
消費者消費了數據:8
生產者生產了數據:9
消費者消費了數據:9
在這個示例中,我們創(chuàng)建了一個大小為5的ArrayBlockingQueue
,然后啟動了一個生產者線程和一個消費者線程。生產者線程會生成10個數據,并嘗試將它們添加到隊列中;消費者線程則會不斷地從隊列中取出數據并處理。由于隊列的大小只有5,因此當生產者生產了5個數據后,它會被阻塞,直到消費者消費了一些數據釋放出空間。同樣地,當隊列為空時,消費者線程也會被阻塞,直到生產者生產了新的數據。
4.ArrayBlockingQueue的最佳實踐
4.1.選擇合適的隊列大小
隊列的大小應根據具體的應用場景來設置。如果設置得太小,可能會導致頻繁的阻塞和上下文切換,影響性能;如果設置得太大,可能會浪費內存資源。因此,在選擇隊列大小時,需要綜合考慮系統(tǒng)的負載、內存資源和性能要求等因素。
4.2.合理使用阻塞方法
ArrayBlockingQueue
提供了多種阻塞方法,如put
、take
、offer
和poll
等。在使用這些方法時,需要根據具體的需求來選擇合適的方法。例如,如果你希望當隊列滿時生產者線程能夠阻塞等待空間可用,那么可以使用put
方法;如果你希望生產者線程在隊列滿時能夠立即返回并做其他處理,那么可以使用offer
方法。
4.3.避免死鎖
在使用ArrayBlockingQueue
時,需要注意避免死鎖的發(fā)生。例如,不要在持有其他鎖的情況下調用ArrayBlockingQueue
的阻塞方法,否則可能會導致死鎖。此外,還需要注意避免循環(huán)等待和饑餓等問題。
4.4.考慮使用公平策略
ArrayBlockingQueue
的構造函數允許指定一個公平性參數。如果設置為true,等待時間最長的線程將優(yōu)先獲得訪問隊列的機會。但需要注意的是,公平性可能會降低性能。因此,在決定是否使用公平策略時,需要綜合考慮系統(tǒng)的性能和公平性要求。
5.源碼詳解
5.1.主要屬性
// 用于存儲隊列元素的數組 final Object[] items; // 隊列的容量 int count; // 控制并發(fā)訪問的鎖 final ReentrantLock lock; // 隊列不滿時的等待條件 private final Condition notFull; // 隊列不為空時的等待條件 private final Condition notEmpty; // 隊列中等待取數據的線程數 final AtomicInteger waitingConsumers = new AtomicInteger(); // 隊列中等待插入數據的線程數 final AtomicInteger waitingProducers = new AtomicInteger();
5.2.構造函數
ArrayBlockingQueue
提供了幾種構造函數,其中最基本的兩個是接受隊列容量和指定是否公平的構造函數。
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
5.3.入隊操作
put(E e)
和 offer(E e)
是兩種入隊操作,其中 put
方法在隊列滿時會阻塞,而 offer
方法在隊列滿時會立即返回失敗或者根據提供的超時時間等待。
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } } private void enqueue(E x) { // 隊列尾部插入元素 final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; // 通知可能在等待的消費者線程 notEmpty.signal(); }
5.4.出隊操作
take()
和 poll()
是兩種出隊操作,其中 take
方法在隊列空時會阻塞,而 poll
方法在隊列空時會立即返回 null
或者根據提供的超時時間等待。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // 隊列頭部取出元素 final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); // 通知可能在等待的生產者線程 notFull.signal(); return x; }
6.總結
ArrayBlockingQueue
是Java并發(fā)編程中一個非常實用的工具類。它提供了線程安全的阻塞隊列實現,支持生產者-消費者模式,并允許通過隊列的大小來控制系統(tǒng)的流量。在使用ArrayBlockingQueue
時,需要注意選擇合適的隊列大小、合理使用阻塞方法、避免死鎖和考慮使用公平策略等問題。通過合理地使用ArrayBlockingQueue
,可以有效地提高系統(tǒng)的并發(fā)性能和穩(wěn)定性。
到此這篇關于java ArrayBlockingQueue阻塞隊列的實現示例的文章就介紹到這了,更多相關java ArrayBlockingQueue阻塞隊列內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
- Java中ArrayBlockingQueue和LinkedBlockingQueue
- Java 并發(fā)編程ArrayBlockingQueue的實現
- java ArrayBlockingQueue的方法及缺點分析
- Java源碼解析阻塞隊列ArrayBlockingQueue介紹
- Java源碼解析阻塞隊列ArrayBlockingQueue常用方法
- Java源碼解析阻塞隊列ArrayBlockingQueue功能簡介
- 詳細分析Java并發(fā)集合ArrayBlockingQueue的用法
- java并發(fā)之ArrayBlockingQueue詳細介紹
- Java并發(fā)編程ArrayBlockingQueue的使用