Java 阻塞隊(duì)列的7種類型小結(jié)
在 Java 中,阻塞隊(duì)列(BlockingQueue) 是一種線程安全的隊(duì)列結(jié)構(gòu),用于實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模式。它的核心特性是在隊(duì)列為空時(shí)阻塞消費(fèi)者線程,在隊(duì)列滿時(shí)阻塞生產(chǎn)者線程,從而自動(dòng)協(xié)調(diào)線程之間的協(xié)作。
阻塞隊(duì)列的核心特性
- 線程安全:所有操作(如
put、take)都是線程安全的。 - 阻塞操作:
- 隊(duì)列滿時(shí):生產(chǎn)者線程阻塞(等待消費(fèi)者消費(fèi))。
- 隊(duì)列空時(shí):消費(fèi)者線程阻塞(等待生產(chǎn)者生產(chǎn))。
- 超時(shí)控制:支持帶超時(shí)時(shí)間的操作(如
offer(timeout, unit)和poll(timeout, unit))。 - 公平策略:部分實(shí)現(xiàn)(如
ArrayBlockingQueue)支持公平鎖,防止線程饑餓。
Java 中的 7 種阻塞隊(duì)列
以下是 Java 提供的 7 種阻塞隊(duì)列及其特點(diǎn)和適用場(chǎng)景:
1. ArrayBlockingQueue(有界隊(duì)列)
- 特點(diǎn):
- 基于數(shù)組實(shí)現(xiàn),容量固定(初始化時(shí)指定)。
- 支持公平鎖(默認(rèn)非公平鎖)。
- 適用場(chǎng)景:
- 需要嚴(yán)格限制隊(duì)列容量的場(chǎng)景(如任務(wù)量可控的線程池)。
- 示例代碼:
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3); queue.put("A"); // 隊(duì)列滿時(shí)阻塞 String task = queue.take(); // 隊(duì)列空時(shí)阻塞
2. LinkedBlockingQueue(有界/無界隊(duì)列)
- 特點(diǎn):
- 基于鏈表實(shí)現(xiàn),默認(rèn)容量為
Integer.MAX_VALUE(可視為無界)。 - 入隊(duì)和出隊(duì)使用獨(dú)立鎖(
putLock和takeLock),提高并發(fā)性能。
- 基于鏈表實(shí)現(xiàn),默認(rèn)容量為
- 適用場(chǎng)景:
- 任務(wù)量不可預(yù)測(cè)的高吞吐量場(chǎng)景(如線程池默認(rèn)隊(duì)列)。
- 示例代碼:
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(100); // 指定容量 queue.put(1); // 隊(duì)列滿時(shí)阻塞 Integer task = queue.take(); // 隊(duì)列空時(shí)阻塞
3. PriorityBlockingQueue(無界優(yōu)先級(jí)隊(duì)列)
- 特點(diǎn):
- 無界隊(duì)列,基于優(yōu)先級(jí)堆實(shí)現(xiàn)。
- 元素必須實(shí)現(xiàn)
Comparable接口或提供比較器。
- 適用場(chǎng)景:
- 需要按優(yōu)先級(jí)處理任務(wù)(如緊急任務(wù)優(yōu)先)。
- 示例代碼:
BlockingQueue<PriorityTask> queue = new PriorityBlockingQueue<>(); queue.put(new PriorityTask(1)); // 任務(wù)優(yōu)先級(jí)由 compareTo() 決定 PriorityTask task = queue.take();
4. DelayQueue(延遲隊(duì)列)
- 特點(diǎn):
- 無界隊(duì)列,元素必須實(shí)現(xiàn)
Delayed接口。 - 只有在延遲時(shí)間到達(dá)后,任務(wù)才能被取出。
- 無界隊(duì)列,元素必須實(shí)現(xiàn)
- 適用場(chǎng)景:
- 定時(shí)任務(wù)(如緩存過期、訂單超時(shí))。
- 示例代碼:
DelayQueue<DelayedTask> queue = new DelayQueue<>(); queue.put(new DelayedTask(5000)); // 5 秒后可用 DelayedTask task = queue.take(); // 等待任務(wù)延遲時(shí)間到期
5. SynchronousQueue(同步隊(duì)列)
- 特點(diǎn):
- 無容量隊(duì)列,每個(gè)插入操作必須等待一個(gè)對(duì)應(yīng)的移除操作。
- 生產(chǎn)者和消費(fèi)者直接傳遞數(shù)據(jù)。
- 適用場(chǎng)景:
- 高吞吐量的短任務(wù)場(chǎng)景(如
newCachedThreadPool)。
- 高吞吐量的短任務(wù)場(chǎng)景(如
- 示例代碼:
BlockingQueue<String> queue = new SynchronousQueue<>(); queue.put("X"); // 阻塞直到有消費(fèi)者調(diào)用 take() String task = queue.take(); // 阻塞直到有生產(chǎn)者調(diào)用 put()
6. LinkedTransferQueue(無界隊(duì)列)
- 特點(diǎn):
- 支持“預(yù)占模式”(生產(chǎn)者和消費(fèi)者直接交互)。
- 高性能的無界隊(duì)列。
- 適用場(chǎng)景:
- 需要高效傳遞任務(wù)的場(chǎng)景(如快速響應(yīng)的系統(tǒng))。
- 示例代碼:
BlockingQueue<Integer> queue = new LinkedTransferQueue<>(); queue.put(100); // 直接傳遞給消費(fèi)者 Integer task = queue.take(); // 立即獲取任務(wù)
7. LinkedBlockingDeque(雙向阻塞隊(duì)列)
- 特點(diǎn):
- 雙端隊(duì)列,支持從兩端插入/移除元素。
- 可作為有界或無界隊(duì)列。
- 適用場(chǎng)景:
- 工作竊取算法(如
ForkJoinPool)。
- 工作竊取算法(如
- 示例代碼:
BlockingQueue<String> queue = new LinkedBlockingDeque<>(100); queue.put("A"); // 從尾部插入 String task = queue.take(); // 從頭部移除
阻塞隊(duì)列的核心方法
| 方法 | 行為 | 拋出異常 | 返回布爾值 | 阻塞 | 超時(shí)阻塞 |
|---|---|---|---|---|---|
| add(E e) | 插入元素 | 隊(duì)列滿時(shí)拋異常 | - | 否 | 否 |
| offer(E e) | 插入元素 | 隊(duì)列滿時(shí)返回 false | - | 否 | 否 |
| offer(E e, long timeout, TimeUnit unit) | 插入元素 | 超時(shí)后返回 false | - | 是 | 是 |
| put(E e) | 插入元素 | - | - | 是 | 否 |
| remove() | 移除元素 | 隊(duì)列空時(shí)拋異常 | - | 否 | 否 |
| poll() | 移除元素 | 隊(duì)列空時(shí)返回 null | - | 否 | 否 |
| poll(long timeout, TimeUnit unit) | 移除元素 | 超時(shí)后返回 null | - | 是 | 是 |
| take() | 移除元素 | - | - | 是 | 否 |
如何選擇阻塞隊(duì)列?
- 有界 vs 無界:
- 有界隊(duì)列(如 ArrayBlockingQueue):防止內(nèi)存溢出,需合理設(shè)置容量。
- 無界隊(duì)列(如 LinkedBlockingQueue):可能導(dǎo)致任務(wù)堆積,需結(jié)合拒絕策略使用。
- 優(yōu)先級(jí)需求:
- 使用 PriorityBlockingQueue 實(shí)現(xiàn)優(yōu)先級(jí)排序。
- 延遲任務(wù):
- 使用 DelayQueue 實(shí)現(xiàn)定時(shí)或延遲執(zhí)行。
- 高性能場(chǎng)景:
- SynchronousQueue 適合高吞吐量的短任務(wù)。
- LinkedTransferQueue 適合快速傳遞任務(wù)的場(chǎng)景。
示例:使用阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模型
import java.util.concurrent.*;
public class ProducerConsumerExample {
public static void main(String[] args) {
BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
// 生產(chǎn)者線程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
String task = "Task-" + i;
queue.put(task); // 隊(duì)列滿時(shí)阻塞
System.out.println("Produced: " + task);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消費(fèi)者線程
Thread consumer = new Thread(() -> {
try {
while (true) {
String task = queue.take(); // 隊(duì)列空時(shí)阻塞
System.out.println("Consumed: " + task);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
注意事項(xiàng)
- 避免內(nèi)存泄漏:
- 使用無界隊(duì)列時(shí),需結(jié)合線程池的拒絕策略(如
CallerRunsPolicy)。
- 使用無界隊(duì)列時(shí),需結(jié)合線程池的拒絕策略(如
- 公平性:
ArrayBlockingQueue支持公平鎖(構(gòu)造函數(shù)傳入true)。
- 線程中斷:
- 阻塞操作(如
put/take)會(huì)響應(yīng)中斷,需捕獲InterruptedException。
- 阻塞操作(如
通過合理選擇阻塞隊(duì)列類型,可以高效地實(shí)現(xiàn)線程間的協(xié)作,解決生產(chǎn)者-消費(fèi)者問題。
到此這篇關(guān)于Java 阻塞隊(duì)列的7種類型小結(jié)的文章就介紹到這了,更多相關(guān)Java 阻塞隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot整合mybatis分頁攔截器的問題小結(jié)
springboot整合mybatis分頁攔截器,分頁攔截實(shí)際上就是獲取sql后將sql拼接limit,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2021-07-07
多模塊的springboot項(xiàng)目發(fā)布指定模塊的腳本方式
該文章主要介紹了如何在多模塊的SpringBoot項(xiàng)目中發(fā)布指定模塊的腳本,作者原先的腳本會(huì)清理并編譯所有模塊,導(dǎo)致發(fā)布時(shí)間過長(zhǎng),通過簡(jiǎn)化腳本,只使用`mvn clean install`命令,可以快速發(fā)布指定模塊及其依賴的模塊2025-01-01
Resilience4J通過yml設(shè)置circuitBreaker的方法
Resilience4j是一個(gè)輕量級(jí)、易于使用的容錯(cuò)庫,其靈感來自Netflix Hystrix,但專為Java 8和函數(shù)式編程設(shè)計(jì),這篇文章主要介紹了Resilience4J通過yml設(shè)置circuitBreaker的方法,需要的朋友可以參考下2022-10-10

