Java concurrency集合之ArrayBlockingQueue_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
ArrayBlockingQueue介紹
ArrayBlockingQueue是數(shù)組實(shí)現(xiàn)的線程安全的有界的阻塞隊(duì)列。
線程安全是指,ArrayBlockingQueue內(nèi)部通過(guò)“互斥鎖”保護(hù)競(jìng)爭(zhēng)資源,實(shí)現(xiàn)了多線程對(duì)競(jìng)爭(zhēng)資源的互斥訪問(wèn)。而有界,則是指ArrayBlockingQueue對(duì)應(yīng)的數(shù)組是有界限的。 阻塞隊(duì)列,是指多線程訪問(wèn)競(jìng)爭(zhēng)資源時(shí),當(dāng)競(jìng)爭(zhēng)資源已被某線程獲取時(shí),其它要獲取該資源的線程需要阻塞等待;而且,ArrayBlockingQueue是按 FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序,元素都是從尾部插入到隊(duì)列,從頭部開始返回。
注意:ArrayBlockingQueue不同于ConcurrentLinkedQueue,ArrayBlockingQueue是數(shù)組實(shí)現(xiàn)的,并且是有界限的;而ConcurrentLinkedQueue是鏈表實(shí)現(xiàn)的,是無(wú)界限的。
ArrayBlockingQueue原理和數(shù)據(jù)結(jié)構(gòu)
ArrayBlockingQueue的數(shù)據(jù)結(jié)構(gòu),如下圖所示:

說(shuō)明:
1. ArrayBlockingQueue繼承于AbstractQueue,并且它實(shí)現(xiàn)了BlockingQueue接口。
2. ArrayBlockingQueue內(nèi)部是通過(guò)Object[]數(shù)組保存數(shù)據(jù)的,也就是說(shuō)ArrayBlockingQueue本質(zhì)上是通過(guò)數(shù)組實(shí)現(xiàn)的。ArrayBlockingQueue的大小,即數(shù)組的容量是創(chuàng)建ArrayBlockingQueue時(shí)指定的。
3. ArrayBlockingQueue與ReentrantLock是組合關(guān)系,ArrayBlockingQueue中包含一個(gè)ReentrantLock對(duì)象(lock)。
ReentrantLock是可重入的互斥鎖,ArrayBlockingQueue就是根據(jù)該互斥鎖實(shí)現(xiàn)“多線程對(duì)競(jìng)爭(zhēng)資源的互斥訪問(wèn)”。而且,ReentrantLock分為公平鎖和非公平鎖,關(guān)于具體使用公平鎖還是非公平鎖,在創(chuàng)建ArrayBlockingQueue時(shí)可以指定;而且,ArrayBlockingQueue默認(rèn)會(huì)使用非公平鎖。
4. ArrayBlockingQueue與Condition是組合關(guān)系,ArrayBlockingQueue中包含兩個(gè)Condition對(duì)象(notEmpty和notFull)。而且,Condition又依賴于ArrayBlockingQueue而存在,通過(guò)Condition可以實(shí)現(xiàn)對(duì)ArrayBlockingQueue的更精確的訪問(wèn) -- (01)若某線程(線程A)要取數(shù)據(jù)時(shí),數(shù)組正好為空,則該線程會(huì)執(zhí)行notEmpty.await()進(jìn)行等待;當(dāng)其它某個(gè)線程(線程B)向數(shù)組中插入了數(shù)據(jù)之后,會(huì)調(diào)用notEmpty.signal()喚醒“notEmpty上的等待線程”。此時(shí),線程A會(huì)被喚醒從而得以繼續(xù)運(yùn)行。(02)若某線程(線程H)要插入數(shù)據(jù)時(shí),數(shù)組已滿,則該線程會(huì)它執(zhí)行notFull.await()進(jìn)行等待;當(dāng)其它某個(gè)線程(線程I)取出數(shù)據(jù)之后,會(huì)調(diào)用notFull.signal()喚醒“notFull上的等待線程”。此時(shí),線程H就會(huì)被喚醒從而得以繼續(xù)運(yùn)行。
ArrayBlockingQueue函數(shù)列表
// 創(chuàng)建一個(gè)帶有給定的(固定)容量和默認(rèn)訪問(wèn)策略的 ArrayBlockingQueue。 ArrayBlockingQueue(int capacity) // 創(chuàng)建一個(gè)具有給定的(固定)容量和指定訪問(wèn)策略的 ArrayBlockingQueue。 ArrayBlockingQueue(int capacity, boolean fair) // 創(chuàng)建一個(gè)具有給定的(固定)容量和指定訪問(wèn)策略的 ArrayBlockingQueue,它最初包含給定 collection 的元素,并以 collection 迭代器的遍歷順序添加元素。 ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) // 將指定的元素插入到此隊(duì)列的尾部(如果立即可行且不會(huì)超過(guò)該隊(duì)列的容量),在成功時(shí)返回 true,如果此隊(duì)列已滿,則拋出 IllegalStateException。 boolean add(E e) // 自動(dòng)移除此隊(duì)列中的所有元素。 void clear() // 如果此隊(duì)列包含指定的元素,則返回 true。 boolean contains(Object o) // 移除此隊(duì)列中所有可用的元素,并將它們添加到給定 collection 中。 int drainTo(Collection<? super E> c) // 最多從此隊(duì)列中移除給定數(shù)量的可用元素,并將這些元素添加到給定 collection 中。 int drainTo(Collection<? super E> c, int maxElements) // 返回在此隊(duì)列中的元素上按適當(dāng)順序進(jìn)行迭代的迭代器。 Iterator<E> iterator() // 將指定的元素插入到此隊(duì)列的尾部(如果立即可行且不會(huì)超過(guò)該隊(duì)列的容量),在成功時(shí)返回 true,如果此隊(duì)列已滿,則返回 false。 boolean offer(E e) // 將指定的元素插入此隊(duì)列的尾部,如果該隊(duì)列已滿,則在到達(dá)指定的等待時(shí)間之前等待可用的空間。 boolean offer(E e, long timeout, TimeUnit unit) // 獲取但不移除此隊(duì)列的頭;如果此隊(duì)列為空,則返回 null。 E peek() // 獲取并移除此隊(duì)列的頭,如果此隊(duì)列為空,則返回 null。 E poll() // 獲取并移除此隊(duì)列的頭部,在指定的等待時(shí)間前等待可用的元素(如果有必要)。 E poll(long timeout, TimeUnit unit) // 將指定的元素插入此隊(duì)列的尾部,如果該隊(duì)列已滿,則等待可用的空間。 void put(E e) // 返回在無(wú)阻塞的理想情況下(不存在內(nèi)存或資源約束)此隊(duì)列能接受的其他元素?cái)?shù)量。 int remainingCapacity() // 從此隊(duì)列中移除指定元素的單個(gè)實(shí)例(如果存在)。 boolean remove(Object o) // 返回此隊(duì)列中元素的數(shù)量。 int size() // 獲取并移除此隊(duì)列的頭部,在元素變得可用之前一直等待(如果有必要)。 E take() // 返回一個(gè)按適當(dāng)順序包含此隊(duì)列中所有元素的數(shù)組。 Object[] toArray() // 返回一個(gè)按適當(dāng)順序包含此隊(duì)列中所有元素的數(shù)組;返回?cái)?shù)組的運(yùn)行時(shí)類型是指定數(shù)組的運(yùn)行時(shí)類型。 <T> T[] toArray(T[] a) // 返回此 collection 的字符串表示形式。 String toString()
下面從ArrayBlockingQueue的創(chuàng)建,添加,取出,遍歷這幾個(gè)方面對(duì)ArrayBlockingQueue進(jìn)行分析。
1. 創(chuàng)建
下面以ArrayBlockingQueue(int capacity, boolean fair)來(lái)進(jìn)行說(shuō)明。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
說(shuō)明:
(01) items是保存“阻塞隊(duì)列”數(shù)據(jù)的數(shù)組。它的定義如下:
final Object[] items;
(02) fair是“可重入的獨(dú)占鎖(ReentrantLock)”的類型。fair為true,表示是公平鎖;fair為false,表示是非公平鎖。
notEmpty和notFull是鎖的兩個(gè)Condition條件。它們的定義如下:
final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull;
Lock的作用是提供獨(dú)占鎖機(jī)制,來(lái)保護(hù)競(jìng)爭(zhēng)資源;而Condition是為了更加精細(xì)的對(duì)鎖進(jìn)行控制,它依賴于Lock,通過(guò)某個(gè)條件對(duì)多線程進(jìn)行控制。
notEmpty表示“鎖的非空條件”。當(dāng)某線程想從隊(duì)列中取數(shù)據(jù)時(shí),而此時(shí)又沒有數(shù)據(jù),則該線程通過(guò)notEmpty.await()進(jìn)行等待;當(dāng)其它線程向隊(duì)列中插入了元素之后,就調(diào)用notEmpty.signal()喚醒“之前通過(guò)notEmpty.await()進(jìn)入等待狀態(tài)的線程”。
同理,notFull表示“鎖的滿條件”。當(dāng)某線程想向隊(duì)列中插入元素,而此時(shí)隊(duì)列已滿時(shí),該線程等待;當(dāng)其它線程從隊(duì)列中取出元素之后,就喚醒該等待的線程。
2. 添加
下面以offer(E e)為例,對(duì)ArrayBlockingQueue的添加方法進(jìn)行說(shuō)明。
public boolean offer(E e) {
// 創(chuàng)建插入的元素是否為null,是的話拋出NullPointerException異常
checkNotNull(e);
// 獲取“該阻塞隊(duì)列的獨(dú)占鎖”
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果隊(duì)列已滿,則返回false。
if (count == items.length)
return false;
else {
// 如果隊(duì)列未滿,則插入e,并返回true。
insert(e);
return true;
}
} finally {
// 釋放鎖
lock.unlock();
}
}
說(shuō)明:offer(E e)的作用是將e插入阻塞隊(duì)列的尾部。如果隊(duì)列已滿,則返回false,表示插入失敗;否則,插入元素,并返回true。
(01) count表示”隊(duì)列中的元素個(gè)數(shù)“。除此之外,隊(duì)列中還有另外兩個(gè)遍歷takeIndex和putIndex。takeIndex表示下一個(gè)被取出元素的索引,putIndex表示下一個(gè)被添加元素的索引。它們的定義如下:
// 隊(duì)列中的元素個(gè)數(shù) int takeIndex; // 下一個(gè)被取出元素的索引 int putIndex; // 下一個(gè)被添加元素的索引 int count;
(02) insert()的源碼如下:
private void insert(E x) {
// 將x添加到”隊(duì)列“中
items[putIndex] = x;
// 設(shè)置”下一個(gè)被取出元素的索引“
putIndex = inc(putIndex);
// 將”隊(duì)列中的元素個(gè)數(shù)”+1
++count;
// 喚醒notEmpty上的等待線程
notEmpty.signal();
}
insert()在插入元素之后,會(huì)喚醒notEmpty上面的等待線程。
inc()的源碼如下:
final int inc(int i) {
return (++i == items.length) ? 0 : i;
}
若i+1的值等于“隊(duì)列的長(zhǎng)度”,即添加元素之后,隊(duì)列滿;則設(shè)置“下一個(gè)被添加元素的索引”為0。
3. 取出
下面以take()為例,對(duì)ArrayBlockingQueue的取出方法進(jìn)行說(shuō)明。
public E take() throws InterruptedException {
// 獲取“隊(duì)列的獨(dú)占鎖”
final ReentrantLock lock = this.lock;
// 獲取“鎖”,若當(dāng)前線程是中斷狀態(tài),則拋出InterruptedException異常
lock.lockInterruptibly();
try {
// 若“隊(duì)列為空”,則一直等待。
while (count == 0)
notEmpty.await();
// 取出元素
return extract();
} finally {
// 釋放“鎖”
lock.unlock();
}
}
說(shuō)明:take()的作用是取出并返回隊(duì)列的頭。若隊(duì)列為空,則一直等待。
extract()的源碼如下:
private E extract() {
final Object[] items = this.items;
// 強(qiáng)制將元素轉(zhuǎn)換為“泛型E”
E x = this.<E>cast(items[takeIndex]);
// 將第takeIndex元素設(shè)為null,即刪除。同時(shí),幫助GC回收。
items[takeIndex] = null;
// 設(shè)置“下一個(gè)被取出元素的索引”
takeIndex = inc(takeIndex);
// 將“隊(duì)列中元素?cái)?shù)量”-1
--count;
// 喚醒notFull上的等待線程。
notFull.signal();
return x;
}
說(shuō)明:extract()在刪除元素之后,會(huì)喚醒notFull上的等待線程。
4. 遍歷
下面對(duì)ArrayBlockingQueue的遍歷方法進(jìn)行說(shuō)明。
public Iterator<E> iterator() {
return new Itr();
}
Itr是實(shí)現(xiàn)了Iterator接口的類,它的源碼如下:
private class Itr implements Iterator<E> {
// 隊(duì)列中剩余元素的個(gè)數(shù)
private int remaining; // Number of elements yet to be returned
// 下一次調(diào)用next()返回的元素的索引
private int nextIndex; // Index of element to be returned by next
// 下一次調(diào)用next()返回的元素
private E nextItem; // Element to be returned by next call to next
// 上一次調(diào)用next()返回的元素
private E lastItem; // Element returned by last call to next
// 上一次調(diào)用next()返回的元素的索引
private int lastRet; // Index of last element returned, or -1 if none
Itr() {
// 獲取“阻塞隊(duì)列”的鎖
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
lastRet = -1;
if ((remaining = count) > 0)
nextItem = itemAt(nextIndex = takeIndex);
} finally {
// 釋放“鎖”
lock.unlock();
}
}
public boolean hasNext() {
return remaining > 0;
}
public E next() {
// 獲取“阻塞隊(duì)列”的鎖
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
// 若“剩余元素<=0”,則拋出異常。
if (remaining <= 0)
throw new NoSuchElementException();
lastRet = nextIndex;
// 獲取第nextIndex位置的元素
E x = itemAt(nextIndex); // check for fresher value
if (x == null) {
x = nextItem; // we are forced to report old value
lastItem = null; // but ensure remove fails
}
else
lastItem = x;
while (--remaining > 0 && // skip over nulls
(nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
;
return x;
} finally {
lock.unlock();
}
}
public void remove() {
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
int i = lastRet;
if (i == -1)
throw new IllegalStateException();
lastRet = -1;
E x = lastItem;
lastItem = null;
// only remove if item still at index
if (x != null && x == items[i]) {
boolean removingHead = (i == takeIndex);
removeAt(i);
if (!removingHead)
nextIndex = dec(nextIndex);
}
} finally {
lock.unlock();
}
}
}
ArrayBlockingQueue示例
import java.util.*;
import java.util.concurrent.*;
/*
* ArrayBlockingQueue是“線程安全”的隊(duì)列,而LinkedList是非線程安全的。
*
* 下面是“多個(gè)線程同時(shí)操作并且遍歷queue”的示例
* (01) 當(dāng)queue是ArrayBlockingQueue對(duì)象時(shí),程序能正常運(yùn)行。
* (02) 當(dāng)queue是LinkedList對(duì)象時(shí),程序會(huì)產(chǎn)生ConcurrentModificationException異常。
*
*
*/
public class ArrayBlockingQueueDemo1{
// TODO: queue是LinkedList對(duì)象時(shí),程序會(huì)出錯(cuò)。
//private static Queue<String> queue = new LinkedList<String>();
private static Queue<String> queue = new ArrayBlockingQueue<String>(20);
public static void main(String[] args) {
// 同時(shí)啟動(dòng)兩個(gè)線程對(duì)queue進(jìn)行操作!
new MyThread("ta").start();
new MyThread("tb").start();
}
private static void printAll() {
String value;
Iterator iter = queue.iterator();
while(iter.hasNext()) {
value = (String)iter.next();
System.out.print(value+", ");
}
System.out.println();
}
private static class MyThread extends Thread {
MyThread(String name) {
super(name);
}
@Override
public void run() {
int i = 0;
while (i++ < 6) {
// “線程名” + "-" + "序號(hào)"
String val = Thread.currentThread().getName()+i;
queue.add(val);
// 通過(guò)“Iterator”遍歷queue。
printAll();
}
}
}
}
(某一次)運(yùn)行結(jié)果:
ta1, ta1, tb1, ta1, tb1, ta1, ta2, tb1, ta1, ta2, tb1, tb2, ta2, ta1, tb2, tb1, ta3, ta2, ta1, tb2, tb1, ta3, ta2, tb3, tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, tb3, ta1, ta4, tb1, tb4, ta2, ta5, tb2, tb5, ta3, ta6, tb3, ta4, tb4, ta5, tb5, ta6, tb6,
結(jié)果說(shuō)明:如果將源碼中的queue改成LinkedList對(duì)象時(shí),程序會(huì)產(chǎn)生ConcurrentModificationException異常。
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue介紹
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue常用方法
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue功能簡(jiǎn)介
- 詳細(xì)分析Java并發(fā)集合ArrayBlockingQueue的用法
- java并發(fā)之ArrayBlockingQueue詳細(xì)介紹
- java中LinkedBlockingQueue與ArrayBlockingQueue的異同
- java集合框架 arrayblockingqueue應(yīng)用分析
- java ArrayBlockingQueue的方法及缺點(diǎn)分析
相關(guān)文章
Spring實(shí)戰(zhàn)之依賴關(guān)系注入之后的行為示例
這篇文章主要介紹了Spring實(shí)戰(zhàn)之依賴關(guān)系注入之后的行為,結(jié)合實(shí)例形式分析了Spring依賴關(guān)系注入之后的行為實(shí)現(xiàn)與使用相關(guān)操作技巧,需要的朋友可以參考下2019-11-11
SpringBoot配置SSL同時(shí)支持http和https訪問(wèn)實(shí)現(xiàn)
本文主要介紹了SpringBoot配置SSL同時(shí)支持http和https訪問(wèn)實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07
Springboot使用POI實(shí)現(xiàn)導(dǎo)出Excel文件示例
本篇文章主要介紹了Springboot使用POI實(shí)現(xiàn)導(dǎo)出Excel文件示例,非常具有實(shí)用價(jià)值,需要的朋友可以參考下。2017-02-02
idea查看properties中文變成unicode碼的解決方案
這篇文章主要介紹了idea查看properties中文變成unicode碼的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06
解決Java執(zhí)行Cmd命令出現(xiàn)的死鎖問(wèn)題
這篇文章主要介紹了關(guān)于Java執(zhí)行Cmd命令出現(xiàn)的死鎖問(wèn)題解決,解決方法就是在waitfor()方法之前讀出窗口的標(biāo)準(zhǔn)輸出、輸出、錯(cuò)誤緩沖區(qū)中的內(nèi)容,本文給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-07-07
利用Redis實(shí)現(xiàn)延時(shí)處理的方法實(shí)例
這篇文章主要給大家介紹了關(guān)于利用Redis實(shí)現(xiàn)延時(shí)處理的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者使用Redis具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03
解決Spring?AOP攔截抽象類(父類)中方法失效問(wèn)題
這篇文章主要介紹了解決Spring?AOP攔截抽象類(父類)中方法失效問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11

