Java并發(fā)編程ArrayBlockingQueue的使用
一、ArrayBlockingQueue概述
ArrayBlockingQueue是一個(gè)基于數(shù)組的有界阻塞隊(duì)列。它在創(chuàng)建時(shí)需要指定隊(duì)列的大小,并且這個(gè)大小在之后是不能改變的。隊(duì)列中的元素按照FIFO(先進(jìn)先出)的原則進(jìn)行排序。ArrayBlockingQueue是線(xiàn)程安全的,可以在多線(xiàn)程環(huán)境下安全地使用。
二、內(nèi)部機(jī)制
2.1. 數(shù)據(jù)結(jié)構(gòu)
ArrayBlockingQueue內(nèi)部使用一個(gè)循環(huán)數(shù)組作為存儲(chǔ)結(jié)構(gòu)。它有兩個(gè)關(guān)鍵索引:takeIndex和putIndex,分別用于從隊(duì)列中取出元素和向隊(duì)列中添加元素。當(dāng)添加元素時(shí),putIndex會(huì)遞增;當(dāng)取出元素時(shí),takeIndex會(huì)遞增。當(dāng)索引達(dá)到數(shù)組的末尾時(shí),它們會(huì)回到數(shù)組的開(kāi)頭,形成一個(gè)循環(huán)。
2.2. 鎖和條件變量
為了保證線(xiàn)程安全,ArrayBlockingQueue使用了一個(gè)重入鎖(ReentrantLock)以及與之關(guān)聯(lián)的條件變量(Condition)。鎖用于保護(hù)隊(duì)列的狀態(tài),而條件變量用于在隊(duì)列為空或滿(mǎn)時(shí)等待和通知線(xiàn)程。具體來(lái)說(shuō),ArrayBlockingQueue內(nèi)部有兩個(gè)條件變量:notEmpty和notFull。當(dāng)隊(duì)列滿(mǎn)時(shí),生產(chǎn)者線(xiàn)程會(huì)等待在notFull條件變量上;當(dāng)隊(duì)列空時(shí),消費(fèi)者線(xiàn)程會(huì)等待在notEmpty條件變量上。
2.3. 入隊(duì)和出隊(duì)操作
- 入隊(duì)操作(put):當(dāng)調(diào)用
put方法向隊(duì)列中添加元素時(shí),如果隊(duì)列已滿(mǎn),生產(chǎn)者線(xiàn)程會(huì)被阻塞,直到隊(duì)列中有空閑位置。一旦有空閑位置,生產(chǎn)者線(xiàn)程會(huì)將元素添加到隊(duì)列中,并通知可能在等待的消費(fèi)者線(xiàn)程。 - 出隊(duì)操作(take):當(dāng)調(diào)用
take方法從隊(duì)列中取出元素時(shí),如果隊(duì)列為空,消費(fèi)者線(xiàn)程會(huì)被阻塞,直到隊(duì)列中有元素可供消費(fèi)。一旦有元素可供消費(fèi),消費(fèi)者線(xiàn)程會(huì)從隊(duì)列中取出元素,并通知可能在等待的生產(chǎn)者線(xiàn)程。
三、使用場(chǎng)景
- 生產(chǎn)者-消費(fèi)者模式:
ArrayBlockingQueue非常適合實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模式。生產(chǎn)者線(xiàn)程將元素添加到隊(duì)列中,消費(fèi)者線(xiàn)程從隊(duì)列中取出元素進(jìn)行處理。通過(guò)阻塞隊(duì)列,可以很好地協(xié)調(diào)生產(chǎn)者和消費(fèi)者之間的速率差異,避免資源的浪費(fèi)。 - 限流:由于
ArrayBlockingQueue是一個(gè)有界隊(duì)列,它可以用于實(shí)現(xiàn)限流功能。當(dāng)隊(duì)列已滿(mǎn)時(shí),新的請(qǐng)求會(huì)被阻塞或拒絕,從而保護(hù)系統(tǒng)免受過(guò)多的請(qǐng)求沖擊。 - 任務(wù)調(diào)度:在并發(fā)編程中,
ArrayBlockingQueue可以用作任務(wù)調(diào)度器的一部分。將任務(wù)作為元素添加到隊(duì)列中,然后由工作線(xiàn)程從隊(duì)列中取出任務(wù)進(jìn)行處理。這種方式可以實(shí)現(xiàn)任務(wù)的異步執(zhí)行和資源的有效利用。
四、最佳實(shí)踐
- 合理設(shè)置隊(duì)列大小:在使用
ArrayBlockingQueue時(shí),應(yīng)根據(jù)實(shí)際需求合理設(shè)置隊(duì)列的大小。過(guò)小的隊(duì)列可能導(dǎo)致頻繁的阻塞和上下文切換;過(guò)大的隊(duì)列可能導(dǎo)致內(nèi)存浪費(fèi)和長(zhǎng)時(shí)間的等待。 - 避免在隊(duì)列中存儲(chǔ)大量數(shù)據(jù):由于
ArrayBlockingQueue是基于數(shù)組的實(shí)現(xiàn),每個(gè)元素都會(huì)占用一定的內(nèi)存空間。因此,應(yīng)避免在隊(duì)列中存儲(chǔ)大量數(shù)據(jù),以減少內(nèi)存消耗和垃圾回收的壓力??梢詫?shù)據(jù)拆分成較小的單元進(jìn)行傳輸和處理。 - 注意線(xiàn)程安全:雖然
ArrayBlockingQueue本身是線(xiàn)程安全的,但在使用過(guò)程中仍需注意線(xiàn)程安全的問(wèn)題。例如,在多個(gè)線(xiàn)程同時(shí)訪(fǎng)問(wèn)隊(duì)列時(shí),應(yīng)確保對(duì)隊(duì)列的訪(fǎng)問(wèn)是原子的,以避免競(jìng)態(tài)條件和數(shù)據(jù)不一致的問(wèn)題。 - 優(yōu)雅地處理中斷:當(dāng)線(xiàn)程在等待從隊(duì)列中取出元素或向隊(duì)列中添加元素時(shí),可能會(huì)被中斷。在編寫(xiě)代碼時(shí),應(yīng)優(yōu)雅地處理這些中斷情況,例如通過(guò)捕獲
InterruptedException并適當(dāng)?shù)仨憫?yīng)中斷請(qǐng)求。 - 使用try-with-resources語(yǔ)句:在使用
ArrayBlockingQueue的迭代器時(shí),建議使用try-with-resources語(yǔ)句來(lái)自動(dòng)關(guān)閉迭代器。這樣可以確保在迭代過(guò)程中及時(shí)釋放資源,避免資源泄漏的問(wèn)題。
五、ArrayBlockingQueue實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者
下面是一個(gè)使用ArrayBlockingQueue實(shí)現(xiàn)的稍微復(fù)雜的生產(chǎn)者-消費(fèi)者示例。代碼中模擬一個(gè)生產(chǎn)者線(xiàn)程生產(chǎn)數(shù)據(jù),多個(gè)消費(fèi)者線(xiàn)程消費(fèi)數(shù)據(jù)的場(chǎng)景,并且消費(fèi)者在處理完數(shù)據(jù)后會(huì)將結(jié)果存回另一個(gè)阻塞隊(duì)列中以供后續(xù)處理。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ProducerConsumerWithArrayBlockingQueue {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)容量為10的ArrayBlockingQueue作為生產(chǎn)者和消費(fèi)者的共享隊(duì)列
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// 創(chuàng)建一個(gè)容量為5的ArrayBlockingQueue用于存儲(chǔ)消費(fèi)者的處理結(jié)果
BlockingQueue<Integer> resultQueue = new ArrayBlockingQueue<>(5);
// 創(chuàng)建一個(gè)AtomicInteger作為數(shù)據(jù)生成的計(jì)數(shù)器
AtomicInteger counter = new AtomicInteger();
// 創(chuàng)建一個(gè)生產(chǎn)者線(xiàn)程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
int item = counter.incrementAndGet();
System.out.println("生產(chǎn)者生產(chǎn)數(shù)據(jù):" + item);
// 將數(shù)據(jù)放入隊(duì)列中
queue.put(item);
// 稍微延遲一下,模擬生產(chǎn)數(shù)據(jù)的時(shí)間消耗
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 創(chuàng)建一個(gè)固定線(xiàn)程池的ExecutorService用于執(zhí)行消費(fèi)者任務(wù)
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 提交3個(gè)消費(fèi)者任務(wù)到線(xiàn)程池中
for (int i = 0; i < 3; i++) {
executorService.submit(() -> {
try {
while (true) {
// 從隊(duì)列中取出數(shù)據(jù)
int item = queue.take();
// 處理數(shù)據(jù)(此處僅打印作為示例)
System.out.println("消費(fèi)者" + Thread.currentThread().getId() + "消費(fèi)數(shù)據(jù):" + item);
// 假設(shè)處理后的數(shù)據(jù)是原始數(shù)據(jù)的平方
int processedItem = item * item;
// 將處理后的結(jié)果存入結(jié)果隊(duì)列中
resultQueue.put(processedItem);
// 稍微延遲一下,模擬處理數(shù)據(jù)的時(shí)間消耗
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 啟動(dòng)生產(chǎn)者線(xiàn)程
producer.start();
// 等待生產(chǎn)者線(xiàn)程完成
try {
producer.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 關(guān)閉ExecutorService(這將導(dǎo)致消費(fèi)者線(xiàn)程中斷)
executorService.shutdown();
try {
// 等待一段時(shí)間,讓消費(fèi)者線(xiàn)程處理剩余的數(shù)據(jù)
if (!executorService.awaitTermination(2, TimeUnit.SECONDS)) {
executorService.shutdownNow(); // 如果超時(shí)則強(qiáng)制關(guān)閉消費(fèi)者線(xiàn)程
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
// 處理結(jié)果隊(duì)列中的數(shù)據(jù)(此處僅打印作為示例)
while (!resultQueue.isEmpty()) {
System.out.println("處理結(jié)果:" + resultQueue.poll());
}
}
}
在上面的代碼中,我們定義了兩個(gè)阻塞隊(duì)列
queue和resultQueue,一個(gè)用于生產(chǎn)者和消費(fèi)者之間傳遞數(shù)據(jù),另一個(gè)用于存儲(chǔ)消費(fèi)者的處理結(jié)果。我們還使用了一個(gè)
AtomicInteger作為數(shù)據(jù)生成的計(jì)數(shù)器。生產(chǎn)者線(xiàn)程每次生產(chǎn)一個(gè)數(shù)據(jù)就將其放入queue中,而消費(fèi)者線(xiàn)程則從queue中取出數(shù)據(jù)進(jìn)行處理,并將處理結(jié)果放入resultQueue中。最后,我們?cè)谥骶€(xiàn)程中等待生產(chǎn)者線(xiàn)程完成后,關(guān)閉消費(fèi)者線(xiàn)程的
ExecutorService,并處理resultQueue中的剩余數(shù)據(jù)。
需要注意的是,在實(shí)際的生產(chǎn)環(huán)境中,消費(fèi)者線(xiàn)程通常會(huì)有退出條件,而不是無(wú)限循環(huán)地處理數(shù)據(jù)。在這個(gè)示例中,由于我們?cè)O(shè)置了executorService.awaitTermination的超時(shí)時(shí)間,所以當(dāng)超時(shí)發(fā)生時(shí),會(huì)強(qiáng)制關(guān)閉消費(fèi)者線(xiàn)程。但是,在更復(fù)雜的場(chǎng)景下,我們可能需要使用其他機(jī)制來(lái)優(yōu)雅地關(guān)閉消費(fèi)者線(xiàn)程,例如使用一個(gè)特殊的結(jié)束信號(hào)或定期檢查某個(gè)關(guān)閉標(biāo)志。
請(qǐng)注意,在ArrayBlockingQueue中,queue.isEmpty()并不是一個(gè)可靠的退出條件,因?yàn)樵诙嗑€(xiàn)程環(huán)境下,你可能會(huì)遇到競(jìng)態(tài)條件的問(wèn)題。更可靠的方式是使用一個(gè)特殊的結(jié)束信號(hào)或定期檢查某個(gè)關(guān)閉標(biāo)志來(lái)退出循環(huán)。
六、總結(jié)
ArrayBlockingQueue是Java并發(fā)編程中一個(gè)非常有用的數(shù)據(jù)結(jié)構(gòu)。它提供了一個(gè)高效、線(xiàn)程安全的有界阻塞隊(duì)列實(shí)現(xiàn),適用于多種場(chǎng)景如生產(chǎn)者-消費(fèi)者模式、限流和任務(wù)調(diào)度等。在使用過(guò)程中,我們應(yīng)注意合理設(shè)置隊(duì)列大小、避免存儲(chǔ)大量數(shù)據(jù)、注意線(xiàn)程安全、優(yōu)雅地處理中斷以及使用try-with-resources語(yǔ)句等最佳實(shí)踐。通過(guò)深入了解ArrayBlockingQueue的內(nèi)部機(jī)制和最佳實(shí)踐,我們可以更好地利用它來(lái)解決并發(fā)編程中的挑戰(zhàn)。
到此這篇關(guān)于Java并發(fā)編程ArrayBlockingQueue的使用的文章就介紹到這了,更多相關(guān)Java ArrayBlockingQueue內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- java ArrayBlockingQueue阻塞隊(duì)列的實(shí)現(xiàn)示例
- Java中ArrayBlockingQueue和LinkedBlockingQueue
- Java 并發(fā)編程ArrayBlockingQueue的實(shí)現(xiàn)
- java ArrayBlockingQueue的方法及缺點(diǎn)分析
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue介紹
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue常用方法
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue功能簡(jiǎn)介
- 詳細(xì)分析Java并發(fā)集合ArrayBlockingQueue的用法
- java并發(fā)之ArrayBlockingQueue詳細(xì)介紹
相關(guān)文章
Spring Boot 2.0多數(shù)據(jù)源配置方法實(shí)例詳解
這篇文章主要介紹了Spring Boot 2.0多數(shù)據(jù)源配置方法實(shí)例詳解,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2018-09-09
Springboot上傳文件時(shí)提示405問(wèn)題及排坑過(guò)程
這篇文章主要介紹了Springboot上傳文件時(shí)提示405問(wèn)題及排坑過(guò)程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07
一篇文中細(xì)看Java多線(xiàn)程的創(chuàng)建方式
隨著計(jì)算機(jī)的配置越來(lái)越高,我們需要將進(jìn)程進(jìn)一步優(yōu)化,細(xì)分為線(xiàn)程,充分提高圖形化界面的多線(xiàn)程的開(kāi)發(fā),這篇文章主要給大家介紹了如何通過(guò)一篇文中細(xì)看Java多線(xiàn)程的創(chuàng)建方式,需要的朋友可以參考下2021-07-07
Java連接合并2個(gè)數(shù)組(Array)的5種方法例子
最近在寫(xiě)代碼時(shí)遇到了需要合并兩個(gè)數(shù)組的需求,突然發(fā)現(xiàn)以前沒(méi)用過(guò),于是研究了一下合并數(shù)組的方式,這篇文章主要給大家介紹了關(guān)于Java連接合并2個(gè)數(shù)組(Array)的5種方法,需要的朋友可以參考下2023-12-12
詳解Java如何使用集合來(lái)實(shí)現(xiàn)一個(gè)客戶(hù)信息管理系統(tǒng)
讀萬(wàn)卷書(shū)不如行萬(wàn)里路,只學(xué)書(shū)上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用Java 集合實(shí)現(xiàn)一個(gè)客戶(hù)信息管理系統(tǒng),大家可以在過(guò)程中查缺補(bǔ)漏,提升水平2021-11-11
SpringBoot設(shè)置默認(rèn)主頁(yè)的方法步驟
這篇文章主要介紹了SpringBoot設(shè)置默認(rèn)主頁(yè)的方法步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12

