java之阻塞隊列BlockingQueue解析
一、阻塞隊列概念
阻塞隊列,顧名思義,首先它是一個隊列(先進先出),而一個阻塞隊列在數(shù)據(jù)結(jié)構(gòu)所起到的作用大致如下圖:

- 線程1往阻塞隊列中添加元素,而線程2從阻塞隊列中移除元素
- 當阻塞隊列是空是,從隊列中獲取元素的操作會被阻塞
- 當阻塞隊列是滿時,從隊列中添加元素的操作會被阻塞
- 試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其他的線程往空的隊列插入新的元素。
- 試圖網(wǎng)已滿的阻塞隊列中添加新元素的線程同樣會被阻塞,直到其他的線程從列中移除一個或者多個元素或者完全清空隊列后使隊列重新變得空閑起來并后續(xù)新增
二、阻塞隊列的好處
1.在多線程領(lǐng)域:所謂阻塞,在某些情況下會掛起線程,一旦滿足條件,被掛起的線程又會自動被喚醒。
2.我們不需要關(guān)心什么時候需要阻塞線程,什么時候需要喚醒線程,因為這一切BlockingQueue(阻塞隊列)都給你一手包辦了
PS:在concurrent包發(fā)布以前,在多線程環(huán)境下,我們每個程序員都必須自己控制這些細節(jié),尤其還要兼顧效率和線程安全,而這回給我們程序帶來不小的復雜度
三、阻塞隊列種類
- ArrayBlockingQueue:由數(shù)據(jù)結(jié)構(gòu)組成的有界阻塞隊列
- LinkedBlockingQueue:由鏈表結(jié)構(gòu)組成的有界(但大小默認值為 Integer.MAX_VALUE )阻塞隊列
- PriorityBlockingQueue:支持優(yōu)先級排序的無界阻塞隊列
- DelayQueue:使用優(yōu)先級隊列實現(xiàn)的延遲無界阻塞隊列
- SynchronousQueue:不存儲元素的阻塞隊列,也即單個元素的隊列
- LinkedTransferQueue:由鏈表結(jié)構(gòu)組成的無界阻塞隊列
- LinkedBlockingDeque:由歷覽表結(jié)構(gòu)組成的雙向阻塞隊列
PS:重點掌握ArrayBlockingQueue、LinkedBlockingQueue、SychronousQueue三種
四、BlockingQueue的核心方法
| 方法類型 | 拋出異常 | 特殊值 | 一直阻塞 | 超時退出 |
| 插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 移除 | remove() | poll() | take | poll(time,unit) |
| 檢查 | element() | peek() | 不可用 | 不可用 |
說明:
| 方法類型 | 情況 |
| 拋出異常 | 當阻塞隊列滿時,再往隊列中add會拋 IllegalStateException: Queue full; 當阻塞隊列空時,再從隊列里remove會拋 NoSuchElementException |
| 特殊值 | offer(e)插入方法,成功true失敗false poll() 移除方法,成功返回出隊列的元素,隊列里沒有就返回null |
| 一直阻塞 | 當阻塞隊列滿時,生產(chǎn)者線程繼續(xù)往隊列里put元素,隊列會一直阻塞線程直到take數(shù)據(jù)或響應(yīng)中斷退出 當阻塞隊列空時,消費者線程試圖從隊列take元素,隊列會一直阻塞消費者線程直到隊列可用 |
| 超時退出 | 當阻塞隊列滿時,隊列會阻塞生產(chǎn)者線程一定時間,超過限時后生產(chǎn)者線程會退出 |
五、示例代碼
1.使用SychronousQueue隊列
package com.jian8.juc.queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/*
* 阻塞隊列SynchronousQueue演示
* */
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() + "\t put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() + "\t put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "AAA").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "\t take" + blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "\t take" + blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "\t take" + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "BBB").start();
}
}
運行結(jié)果:

2.傳統(tǒng)版生產(chǎn)者消費者模式
package com.jian8.juc.queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 一個初始值為零的變量,兩個線程對其交替操作,一個加1一個減1,來5輪
* 1. 線程 操作 資源類
* 2. 判斷 干活 通知
* 3. 防止虛假喚起機制
*/
public class ProdConsumer_TraditionDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}, "ProductorA " + i).start();
}
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}, "ConsumerA " + i).start();
}
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}, "ProductorB " + i).start();
}
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}, "ConsumerB " + i).start();
}
}
}
//資源類
class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
//生產(chǎn)
public void increment() throws Exception {
lock.lock();
try {
//1.判斷
while (number != 0) {
//等待中,不能生產(chǎn)
condition.await();
}
//2.干活
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3.通知
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
//消費
public void decrement() throws Exception {
lock.lock();
try {
//1.判斷
while (number == 0) {
//等待中,不能消費
condition.await();
}
//2.消費
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3.通知
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
運行結(jié)果(部分):

3.阻塞隊列版(以ArrayBlockingQueue為例)生產(chǎn)者消費者模式
package com.jian8.juc.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ProdConsumer_BlockQueueDemo {
public static void main(String[] args) {
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t生產(chǎn)線程啟動");
try {
myResource.myProd();
} catch (Exception e) {
e.printStackTrace();
}
}, "Prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t消費線程啟動");
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "Consumer").start();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("5s后main叫停,線程結(jié)束");
try {
myResource.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
//資源類
class MyResource {
private volatile boolean flag = true;//默認開啟,進行“生產(chǎn)+消費”活動
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;//消息隊列
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
//生產(chǎn)
public void myProd() throws Exception {
String data = null;
boolean retValue;
while (flag) {
data = atomicInteger.incrementAndGet() + "";
//超過2s沒生產(chǎn)成功,退出生產(chǎn)
retValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName() + "\t插入隊列" + data + "成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t插入隊列" + data + "失敗");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "\t大老板叫停了,flag=false,生產(chǎn)結(jié)束");
}
//消費
public void myConsumer() throws Exception {
String result = null;
while (flag) {
//超過2s沒從隊列獲取數(shù)據(jù),消費退出
result = blockingQueue.poll(2, TimeUnit.SECONDS);
if (null == result || result.equalsIgnoreCase("")) {
flag = false;
System.out.println(Thread.currentThread().getName() + "\t超過2s沒有取到蛋糕,消費退出");
System.out.println();
return;
}
System.out.println(Thread.currentThread().getName() + "\t消費隊列" + result + "成功");
}
}
public void stop() throws Exception {
flag = false;
}
}
運行結(jié)果:

到此這篇關(guān)于java之阻塞隊列BlockingQueue解析的文章就介紹到這了,更多相關(guān)java阻塞隊列BlockingQueue內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot如何通過controller層實現(xiàn)頁面切換
在Spring Boot中,通過Controller層實現(xiàn)頁面切換背景,Spring Boot的默認注解是@RestController,它包含了@Controller和@ResponseBody,@ResponseBody會將返回值轉(zhuǎn)換為字符串返回,因此無法實現(xiàn)頁面切換,將@RestController換成@Controller2024-12-12
使用try-with-resource的輸入輸出流自動關(guān)閉
這篇文章主要介紹了使用try-with-resource的輸入輸出流自動關(guān)閉方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07

